diff --git a/.changeset/brown-peas-compare.md b/.changeset/brown-peas-compare.md
new file mode 100644
index 00000000000..7c5bf4cc302
--- /dev/null
+++ b/.changeset/brown-peas-compare.md
@@ -0,0 +1,5 @@
+---
+"effect": major
+---
+
+make Logger.prettyLogger the default logger
diff --git a/.changeset/chilled-ducks-sniff.md b/.changeset/chilled-ducks-sniff.md
new file mode 100644
index 00000000000..2f65736f7ee
--- /dev/null
+++ b/.changeset/chilled-ducks-sniff.md
@@ -0,0 +1,24 @@
+---
+"effect": major
+---
+
+Use object options for Stream.async apis
+
+Instead of:
+
+```ts
+Stream.async((emit) => {
+ //...
+}, 16);
+```
+
+You can now write:
+
+```ts
+Stream.async(
+ (emit) => {
+ //...
+ },
+ { bufferSize: 16 },
+);
+```
diff --git a/.changeset/cold-cougars-pretend.md b/.changeset/cold-cougars-pretend.md
new file mode 100644
index 00000000000..df705b7505a
--- /dev/null
+++ b/.changeset/cold-cougars-pretend.md
@@ -0,0 +1,5 @@
+---
+"effect": minor
+---
+
+add TSubscriptionRef
diff --git a/.changeset/happy-jeans-invent.md b/.changeset/happy-jeans-invent.md
new file mode 100644
index 00000000000..f5e93e0f140
--- /dev/null
+++ b/.changeset/happy-jeans-invent.md
@@ -0,0 +1,6 @@
+---
+"effect": major
+---
+
+For `Data.TaggedEnum` added capitalized constructor naming convention.
+Helper functions `$is` and `$match` have been renamed to `is` and `match`.
diff --git a/.changeset/poor-tools-switch.md b/.changeset/poor-tools-switch.md
new file mode 100644
index 00000000000..8ef566daa9c
--- /dev/null
+++ b/.changeset/poor-tools-switch.md
@@ -0,0 +1,5 @@
+---
+"effect": minor
+---
+
+add Logger.string Layer, for setting the logger to Logger.stringLogger
diff --git a/.changeset/shiny-squids-sell.md b/.changeset/shiny-squids-sell.md
new file mode 100644
index 00000000000..0e68f8a95fd
--- /dev/null
+++ b/.changeset/shiny-squids-sell.md
@@ -0,0 +1,5 @@
+---
+"effect": minor
+---
+
+add Stream.fromTQueue & Stream.fromTPubSub
diff --git a/.changeset/tough-lobsters-guess.md b/.changeset/tough-lobsters-guess.md
new file mode 100644
index 00000000000..50a130bd188
--- /dev/null
+++ b/.changeset/tough-lobsters-guess.md
@@ -0,0 +1,5 @@
+---
+"effect": minor
+---
+
+add option to .releaseLock a ReadableStream on finalization
diff --git a/.changeset/young-knives-poke.md b/.changeset/young-knives-poke.md
new file mode 100644
index 00000000000..a14be98752d
--- /dev/null
+++ b/.changeset/young-knives-poke.md
@@ -0,0 +1,7 @@
+---
+"effect": major
+"@effect/platform-node-shared": minor
+"@effect/cli": minor
+---
+
+depreciate Secret module in favour of the Redacted module
diff --git a/packages/cli/src/Args.ts b/packages/cli/src/Args.ts
index dd2e7aca7a9..8c09ec3b909 100644
--- a/packages/cli/src/Args.ts
+++ b/packages/cli/src/Args.ts
@@ -11,7 +11,6 @@ import type { Effect } from "effect/Effect"
import type { Option } from "effect/Option"
import type { Pipeable } from "effect/Pipeable"
import type { Redacted } from "effect/Redacted"
-import type { Secret } from "effect/Secret"
import type { CliConfig } from "./CliConfig.js"
import type { HelpDoc } from "./HelpDoc.js"
import * as InternalArgs from "./internal/args.js"
@@ -387,16 +386,6 @@ export const repeated: (self: Args) => Args> = InternalArgs.repea
*/
export const redacted: (config?: Args.BaseArgsConfig) => Args = InternalArgs.redacted
-/**
- * Creates a text argument.
- *
- * Can optionally provide a custom argument name (defaults to `"secret"`).
- *
- * @since 1.0.0
- * @category constructors
- */
-export const secret: (config?: Args.BaseArgsConfig) => Args = InternalArgs.secret
-
/**
* Creates a text argument.
*
diff --git a/packages/cli/src/Options.ts b/packages/cli/src/Options.ts
index 4d68c3bdc5f..78aa69f4734 100644
--- a/packages/cli/src/Options.ts
+++ b/packages/cli/src/Options.ts
@@ -13,7 +13,6 @@ import type { HashMap } from "effect/HashMap"
import type { Option } from "effect/Option"
import type { Pipeable } from "effect/Pipeable"
import type { Redacted } from "effect/Redacted"
-import type { Secret } from "effect/Secret"
import type { CliConfig } from "./CliConfig.js"
import type { HelpDoc } from "./HelpDoc.js"
import * as InternalOptions from "./internal/options.js"
@@ -315,13 +314,6 @@ export const none: Options = InternalOptions.none
*/
export const redacted: (name: string) => Options = InternalOptions.redacted
-/**
- * @since 1.0.0
- * @category constructors
- * @deprecated
- */
-export const secret: (name: string) => Options = InternalOptions.secret
-
/**
* @since 1.0.0
* @category constructors
diff --git a/packages/cli/src/internal/args.ts b/packages/cli/src/internal/args.ts
index c7f404ee4f4..8a8cf4f627a 100644
--- a/packages/cli/src/internal/args.ts
+++ b/packages/cli/src/internal/args.ts
@@ -13,7 +13,6 @@ import * as Option from "effect/Option"
import { pipeArguments } from "effect/Pipeable"
import type * as Redacted from "effect/Redacted"
import * as Ref from "effect/Ref"
-import type * as Secret from "effect/Secret"
import type * as Args from "../Args.js"
import type * as CliConfig from "../CliConfig.js"
import type * as HelpDoc from "../HelpDoc.js"
@@ -272,11 +271,6 @@ export const redacted = (
config?: Args.Args.BaseArgsConfig
): Args.Args => makeSingle(Option.fromNullable(config?.name), InternalPrimitive.redacted)
-/** @internal */
-export const secret = (
- config?: Args.Args.BaseArgsConfig
-): Args.Args => makeSingle(Option.fromNullable(config?.name), InternalPrimitive.secret)
-
/** @internal */
export const text = (config?: Args.Args.BaseArgsConfig): Args.Args =>
makeSingle(Option.fromNullable(config?.name), InternalPrimitive.text)
diff --git a/packages/cli/src/internal/options.ts b/packages/cli/src/internal/options.ts
index 3151ff4371b..9eb0fb6008a 100644
--- a/packages/cli/src/internal/options.ts
+++ b/packages/cli/src/internal/options.ts
@@ -16,7 +16,6 @@ import { pipeArguments } from "effect/Pipeable"
import * as Predicate from "effect/Predicate"
import type * as Redacted from "effect/Redacted"
import * as Ref from "effect/Ref"
-import type * as Secret from "effect/Secret"
import type * as CliConfig from "../CliConfig.js"
import type * as HelpDoc from "../HelpDoc.js"
import type * as Options from "../Options.js"
@@ -386,10 +385,6 @@ export const none: Options.Options = (() => {
export const redacted = (name: string): Options.Options =>
makeSingle(name, Arr.empty(), InternalPrimitive.redacted)
-/** @internal */
-export const secret = (name: string): Options.Options =>
- makeSingle(name, Arr.empty(), InternalPrimitive.secret)
-
/** @internal */
export const text = (name: string): Options.Options => makeSingle(name, Arr.empty(), InternalPrimitive.text)
diff --git a/packages/cli/src/internal/primitive.ts b/packages/cli/src/internal/primitive.ts
index ccd033eb2ae..d684ab7a8f9 100644
--- a/packages/cli/src/internal/primitive.ts
+++ b/packages/cli/src/internal/primitive.ts
@@ -6,7 +6,6 @@ import { dual, pipe } from "effect/Function"
import * as Option from "effect/Option"
import { pipeArguments } from "effect/Pipeable"
import * as EffectRedacted from "effect/Redacted"
-import * as EffectSecret from "effect/Secret"
import type * as CliConfig from "../CliConfig.js"
import type * as HelpDoc from "../HelpDoc.js"
import type * as Span from "../HelpDoc/Span.js"
@@ -52,7 +51,6 @@ export type Instruction =
| Float
| Integer
| Path
- | Secret
| Redacted
| Text
@@ -90,9 +88,6 @@ export interface Path extends
/** @internal */
export interface Redacted extends Op<"Redacted", {}> {}
-/** @internal */
-export interface Secret extends Op<"Secret", {}> {}
-
/** @internal */
export interface Text extends Op<"Text", {}> {}
@@ -125,9 +120,6 @@ export const isIntegerType = (self: Instruction): self is Integer => self._tag =
/** @internal */
export const isPathType = (self: Instruction): self is Path => self._tag === "Path"
-/** @internal */
-export const isSecretType = (self: Instruction): self is Path => self._tag === "Path"
-
/** @internal */
export const isTextType = (self: Instruction): self is Text => self._tag === "Text"
@@ -205,13 +197,6 @@ export const redacted: Primitive.Primitive = (() => {
return op
})()
-/** @internal */
-export const secret: Primitive.Primitive = (() => {
- const op = Object.create(proto)
- op._tag = "Secret"
- return op
-})()
-
/** @internal */
export const text: Primitive.Primitive = (() => {
const op = Object.create(proto)
@@ -283,7 +268,6 @@ const getChoicesInternal = (self: Instruction): Option.Option => {
case "Integer":
case "Path":
case "Redacted":
- case "Secret":
case "Text": {
return Option.none()
}
@@ -346,7 +330,6 @@ const getHelpInternal = (self: Instruction): Span.Span => {
`('${self.pathType}') and path existence ('${self.pathExists}')`
)
}
- case "Secret":
case "Redacted": {
return InternalSpan.text("A user-defined piece of text that is confidential.")
}
@@ -382,9 +365,6 @@ const getTypeNameInternal = (self: Instruction): string => {
case "Redacted": {
return "redacted"
}
- case "Secret": {
- return "secret"
- }
case "Text": {
return "text"
}
@@ -467,11 +447,6 @@ const validateInternal = (
Effect.map((value) => EffectRedacted.make(value))
)
}
- case "Secret": {
- return attempt(value, getTypeNameInternal(self), Schema.decodeUnknown(Schema.String)).pipe(
- Effect.map((value) => EffectSecret.fromString(value))
- )
- }
case "Text": {
return attempt(value, getTypeNameInternal(self), Schema.decodeUnknown(Schema.String))
}
@@ -601,13 +576,6 @@ const wizardInternal = (self: Instruction, help: HelpDoc.HelpDoc): Prompt.Prompt
message: InternalHelpDoc.toAnsiText(message).trimEnd()
})
}
- case "Secret": {
- const primitiveHelp = InternalHelpDoc.p("Enter some text (value will be redacted)")
- const message = InternalHelpDoc.sequence(help, primitiveHelp)
- return InternalTextPrompt.hidden({
- message: InternalHelpDoc.toAnsiText(message).trimEnd()
- })
- }
case "Text": {
const primitiveHelp = InternalHelpDoc.p("Enter some text")
const message = InternalHelpDoc.sequence(help, primitiveHelp)
@@ -631,7 +599,6 @@ export const getBashCompletions = (self: Instruction): string => {
case "DateTime":
case "Float":
case "Integer":
- case "Secret":
case "Redacted":
case "Text": {
return "$(compgen -f \"${cur}\")"
@@ -675,7 +642,6 @@ export const getFishCompletions = (self: Instruction): Array => {
case "Float":
case "Integer":
case "Redacted":
- case "Secret":
case "Text": {
return Arr.make("-r", "-f")
}
@@ -755,7 +721,6 @@ export const getZshCompletions = (self: Instruction): string => {
}
}
case "Redacted":
- case "Secret":
case "Text": {
return ""
}
diff --git a/packages/cli/src/internal/prompt/confirm.ts b/packages/cli/src/internal/prompt/confirm.ts
index 1bab440afc4..75515305d6f 100644
--- a/packages/cli/src/internal/prompt/confirm.ts
+++ b/packages/cli/src/internal/prompt/confirm.ts
@@ -100,7 +100,7 @@ function renderSubmission(value: boolean, options: Options) {
function handleRender(options: Options) {
return (_: State, action: Prompt.Prompt.Action) => {
- return Action.$match(action, {
+ return Action.match(action, {
Beep: () => Effect.succeed(renderBeep),
NextFrame: ({ state }) => renderNextFrame(state, options),
Submit: ({ value }) => renderSubmission(value, options)
diff --git a/packages/cli/src/internal/prompt/date.ts b/packages/cli/src/internal/prompt/date.ts
index 8aca15a4094..114d80c89fd 100644
--- a/packages/cli/src/internal/prompt/date.ts
+++ b/packages/cli/src/internal/prompt/date.ts
@@ -233,7 +233,7 @@ const defaultLocales: Prompt.Prompt.DateOptions["locales"] = {
function handleRender(options: DateOptions) {
return (state: State, action: Prompt.Prompt.Action) => {
- return Action.$match(action, {
+ return Action.match(action, {
Beep: () => Effect.succeed(renderBeep),
NextFrame: ({ state }) => renderNextFrame(state, options),
Submit: () => renderSubmission(state, options)
diff --git a/packages/cli/src/internal/prompt/file.ts b/packages/cli/src/internal/prompt/file.ts
index a47f23d3b46..c415373324c 100644
--- a/packages/cli/src/internal/prompt/file.ts
+++ b/packages/cli/src/internal/prompt/file.ts
@@ -33,7 +33,7 @@ type Confirm = Data.TaggedEnum<{
}>
const Confirm = Data.taggedEnum()
-const showConfirmation = Confirm.$is("Show")
+const showConfirmation = Confirm.is("Show")
const renderBeep = Doc.render(Doc.beep, { style: "pretty" })
@@ -235,7 +235,7 @@ function renderSubmission(value: string, options: FileOptions) {
function handleRender(options: FileOptions) {
return (_: State, action: Prompt.Prompt.Action) => {
- return Action.$match(action, {
+ return Action.match(action, {
Beep: () => Effect.succeed(renderBeep),
NextFrame: ({ state }) => renderNextFrame(state, options),
Submit: ({ value }) => renderSubmission(value, options)
diff --git a/packages/cli/src/internal/prompt/number.ts b/packages/cli/src/internal/prompt/number.ts
index f2d69334bfd..586bb81a8de 100644
--- a/packages/cli/src/internal/prompt/number.ts
+++ b/packages/cli/src/internal/prompt/number.ts
@@ -202,7 +202,7 @@ const initialState: State = {
function handleRenderInteger(options: IntegerOptions) {
return (state: State, action: Prompt.Prompt.Action) => {
- return Action.$match(action, {
+ return Action.match(action, {
Beep: () => Effect.succeed(renderBeep),
NextFrame: ({ state }) => renderNextFrame(state, options),
Submit: () => renderSubmission(state, options)
@@ -298,7 +298,7 @@ export const integer = (options: Prompt.Prompt.IntegerOptions): Prompt.Prompt) => {
- return Action.$match(action, {
+ return Action.match(action, {
Beep: () => Effect.succeed(renderBeep),
NextFrame: ({ state }) => renderNextFrame(state, options),
Submit: () => renderSubmission(state, options)
diff --git a/packages/cli/src/internal/prompt/select.ts b/packages/cli/src/internal/prompt/select.ts
index fde52276a6f..da951c6aa12 100644
--- a/packages/cli/src/internal/prompt/select.ts
+++ b/packages/cli/src/internal/prompt/select.ts
@@ -182,7 +182,7 @@ function processNext(state: State, choices: Prompt.Prompt.SelectOptions["c
function handleRender(options: SelectOptions) {
return (state: State, action: Prompt.Prompt.Action) => {
- return Action.$match(action, {
+ return Action.match(action, {
Beep: () => Effect.succeed(renderBeep),
NextFrame: ({ state }) => renderNextFrame(state, options),
Submit: () => renderSubmission(state, options)
diff --git a/packages/cli/src/internal/prompt/text.ts b/packages/cli/src/internal/prompt/text.ts
index a78867d85cb..3d0dae69bc6 100644
--- a/packages/cli/src/internal/prompt/text.ts
+++ b/packages/cli/src/internal/prompt/text.ts
@@ -238,7 +238,7 @@ const initialState: State = {
function handleRender(options: Options) {
return (state: State, action: Prompt.Prompt.Action) => {
- return Action.$match(action, {
+ return Action.match(action, {
Beep: () => Effect.succeed(renderBeep),
NextFrame: ({ state }) => renderNextFrame(state, options),
Submit: () => renderSubmission(state, options)
diff --git a/packages/effect/src/Config.ts b/packages/effect/src/Config.ts
index 04f82fcccd6..f08e445baaa 100644
--- a/packages/effect/src/Config.ts
+++ b/packages/effect/src/Config.ts
@@ -14,7 +14,6 @@ import type * as LogLevel from "./LogLevel.js"
import type * as Option from "./Option.js"
import type { Predicate, Refinement } from "./Predicate.js"
import type * as Redacted from "./Redacted.js"
-import type * as Secret from "./Secret.js"
import type * as Types from "./Types.js"
/**
@@ -334,15 +333,6 @@ export const primitive: (
*/
export const repeat: (self: Config) => Config> = internal.repeat
-/**
- * Constructs a config for a secret value.
- *
- * @since 2.0.0
- * @category constructors
- * @deprecated
- */
-export const secret: (name?: string) => Config = internal.secret
-
/**
* Constructs a config for a redacted value.
*
diff --git a/packages/effect/src/Data.ts b/packages/effect/src/Data.ts
index 6b02cc2df75..73a671a5521 100644
--- a/packages/effect/src/Data.ts
+++ b/packages/effect/src/Data.ts
@@ -260,12 +260,22 @@ export const Structural: new(
* @category models
*/
export type TaggedEnum<
- A extends Record> & UntaggedChildren
+ A extends
+ & Record>
+ & CapitalConstructorNames
+ & UntaggedChildren
> = keyof A extends infer Tag ?
Tag extends keyof A ? Types.Simplify<{ readonly _tag: Tag } & { readonly [K in keyof A[Tag]]: A[Tag][K] }>
: never
: never
+type CapitalConstructorNames = Record<
+ Uncapitalize,
+ keyof A extends infer X extends string
+ ? X extends Uncapitalize ? `Use capitalized constructor name. Did you mean "${Capitalize}"?` : never
+ : never
+>
+
type ChildrenAreTagged = keyof A extends infer K ? K extends keyof A ? "_tag" extends keyof A[K] ? true
: false
: never
@@ -336,8 +346,8 @@ export declare namespace TaggedEnum {
readonly [Tag in A["_tag"]]: Case.Constructor, "_tag">
}
& {
- readonly $is: (tag: Tag) => (u: unknown) => u is Extract
- readonly $match: {
+ readonly is: (tag: Tag) => (u: unknown) => u is Extract
+ readonly match: {
<
Cases extends {
readonly [Tag in A["_tag"]]: (args: Extract) => any
@@ -356,7 +366,7 @@ export declare namespace TaggedEnum {
* @since 3.2.0
*/
export interface GenericMatchers> {
- readonly $is: (
+ readonly is: (
tag: Tag
) => {
>(
@@ -364,7 +374,7 @@ export declare namespace TaggedEnum {
): u is T & { readonly _tag: Tag }
(u: unknown): u is Extract, { readonly _tag: Tag }>
}
- readonly $match: {
+ readonly match: {
<
A,
B,
@@ -482,9 +492,9 @@ export const taggedEnum: {
} = () =>
new Proxy({}, {
get(_target, tag, _receiver) {
- if (tag === "$is") {
+ if (tag === "is") {
return Predicate.isTagged
- } else if (tag === "$match") {
+ } else if (tag === "match") {
return taggedMatch
}
return tagged(tag as string)
diff --git a/packages/effect/src/Logger.ts b/packages/effect/src/Logger.ts
index 60a06251853..1a3fb41bc81 100644
--- a/packages/effect/src/Logger.ts
+++ b/packages/effect/src/Logger.ts
@@ -497,14 +497,6 @@ export const prettyLogger: (
}
) => Logger = internal.prettyLogger
-/**
- * A default version of the pretty logger.
- *
- * @since 3.8.0
- * @category constructors
- */
-export const prettyLoggerDefault: Logger = internal.prettyLoggerDefault
-
/**
* The structured logger provides detailed log outputs, structured in a way that
* retains comprehensive traceability of the events, suitable for deeper
@@ -592,31 +584,10 @@ export const json: Layer.Layer = replace(fiberRuntime.defaultLogger, fibe
export const logFmt: Layer.Layer = replace(fiberRuntime.defaultLogger, fiberRuntime.logFmtLogger)
/**
- * The pretty logger utilizes the capabilities of the console API to generate
- * visually engaging and color-enhanced log outputs. This feature is
- * particularly useful for improving the readability of log messages during
- * development and debugging processes.
- *
- * @example
- * import { Effect, Logger } from "effect"
- *
- * const program = Effect.log("message1", "message2").pipe(
- * Effect.annotateLogs({ key1: "value1", key2: "value2" }),
- * Effect.withLogSpan("myspan")
- * )
- *
- * // Effect.runFork(program.pipe(Effect.provide(Logger.pretty)))
- * // green --v v-- bold and cyan
- * // [07:51:54.434] INFO (#0) myspan=1ms: message1
- * // message2
- * // v-- bold
- * // key2: value2
- * // key1: value1
- *
- * @since 3.5.0
+ * @since 4.0.0
* @category constructors
*/
-export const pretty: Layer.Layer = replace(fiberRuntime.defaultLogger, fiberRuntime.prettyLogger)
+export const string: Layer.Layer = replace(fiberRuntime.defaultLogger, fiberRuntime.stringLogger)
/**
* The structured logger provides detailed log outputs, structured in a way that
diff --git a/packages/effect/src/Secret.ts b/packages/effect/src/Secret.ts
deleted file mode 100644
index c19270fbe3d..00000000000
--- a/packages/effect/src/Secret.ts
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * @since 2.0.0
- * @deprecated
- */
-import type * as Equal from "./Equal.js"
-import * as InternalSecret from "./internal/secret.js"
-import type * as Redacted from "./Redacted.js"
-
-/**
- * @since 2.0.0
- * @category symbols
- * @deprecated
- */
-export const SecretTypeId: unique symbol = InternalSecret.SecretTypeId
-
-/**
- * @since 2.0.0
- * @category symbols
- * @deprecated
- */
-export type SecretTypeId = typeof SecretTypeId
-
-/**
- * @since 2.0.0
- * @category models
- * @deprecated
- */
-export interface Secret extends Redacted.Redacted, Secret.Proto, Equal.Equal {
- /** @internal */
- readonly raw: Array
-}
-
-/**
- * @since 2.0.0
- * @deprecated
- */
-export declare namespace Secret {
- /**
- * @since 2.0.0
- * @category models
- * @deprecated
- */
- export interface Proto {
- readonly [SecretTypeId]: SecretTypeId
- }
-}
-
-/**
- * @since 2.0.0
- * @category refinements
- * @deprecated
- */
-export const isSecret: (u: unknown) => u is Secret = InternalSecret.isSecret
-
-/**
- * @since 2.0.0
- * @category constructors
- * @deprecated
- */
-export const make: (bytes: Array) => Secret = InternalSecret.make
-
-/**
- * @since 2.0.0
- * @category constructors
- * @deprecated
- */
-export const fromIterable: (iterable: Iterable) => Secret = InternalSecret.fromIterable
-
-/**
- * @since 2.0.0
- * @category constructors
- * @deprecated
- */
-export const fromString: (text: string) => Secret = InternalSecret.fromString
-
-/**
- * @since 2.0.0
- * @category getters
- * @deprecated
- */
-export const value: (self: Secret) => string = InternalSecret.value
-
-/**
- * @since 2.0.0
- * @category unsafe
- * @deprecated
- */
-export const unsafeWipe: (self: Secret) => void = InternalSecret.unsafeWipe
diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts
index 9a4dc084397..d4763020ea1 100644
--- a/packages/effect/src/Stream.ts
+++ b/packages/effect/src/Stream.ts
@@ -29,6 +29,8 @@ import type * as Sink from "./Sink.js"
import type * as Emit from "./StreamEmit.js"
import type * as HaltStrategy from "./StreamHaltStrategy.js"
import type * as Take from "./Take.js"
+import type { TPubSub } from "./TPubSub.js"
+import type { TDequeue } from "./TQueue.js"
import type * as Tracer from "./Tracer.js"
import type { Covariant, NoInfer, TupleOf } from "./Types.js"
import type * as Unify from "./Unify.js"
@@ -312,7 +314,7 @@ export const as: {
const _async: (
register: (emit: Emit.Emit) => Effect.Effect | void,
- bufferSize?: number | "unbounded" | {
+ options?: { readonly bufferSize: "unbounded" } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
@@ -367,7 +369,7 @@ export {
*/
export const asyncEffect: (
register: (emit: Emit.Emit) => Effect.Effect,
- bufferSize?: number | "unbounded" | {
+ options?: { readonly bufferSize: "unbounded" } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
@@ -424,7 +426,7 @@ export const asyncPush: (
*/
export const asyncScoped: (
register: (emit: Emit.Emit) => Effect.Effect,
- bufferSize?: number | "unbounded" | {
+ options?: { readonly bufferSize: "unbounded" } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
@@ -2013,6 +2015,14 @@ export const fromPubSub: {
): Stream
} = internal.fromPubSub
+/**
+ * Creates a stream from a subscription to a `TPubSub`.
+ *
+ * @since 3.10.0
+ * @category constructors
+ */
+export const fromTPubSub: (pubsub: TPubSub) => Stream = internal.fromTPubSub
+
/**
* Creates a new `Stream` from an iterable collection of values.
*
@@ -2094,6 +2104,14 @@ export const fromQueue: (
}
) => Stream = internal.fromQueue
+/**
+ * Creates a stream from a TQueue of values
+ *
+ * @since 3.10.0
+ * @category constructors
+ */
+export const fromTQueue: (queue: TDequeue) => Stream = internal.fromTQueue
+
/**
* Creates a stream from a `ReadableStream`.
*
@@ -2102,10 +2120,16 @@ export const fromQueue: (
* @since 2.0.0
* @category constructors
*/
-export const fromReadableStream: (
- evaluate: LazyArg>,
- onError: (error: unknown) => E
-) => Stream = internal.fromReadableStream
+export const fromReadableStream: {
+ (
+ options: {
+ readonly evaluate: LazyArg>
+ readonly onError: (error: unknown) => E
+ readonly releaseLockOnEnd?: boolean | undefined
+ }
+ ): Stream
+ (evaluate: LazyArg>, onError: (error: unknown) => E): Stream
+} = internal.fromReadableStream
/**
* Creates a stream from a `ReadableStreamBYOBReader`.
@@ -2116,11 +2140,21 @@ export const fromReadableStream: (
* @since 2.0.0
* @category constructors
*/
-export const fromReadableStreamByob: (
- evaluate: LazyArg>,
- onError: (error: unknown) => E,
- allocSize?: number
-) => Stream = internal.fromReadableStreamByob
+export const fromReadableStreamByob: {
+ (
+ options: {
+ readonly evaluate: LazyArg>
+ readonly onError: (error: unknown) => E
+ readonly bufferSize?: number | undefined
+ readonly releaseLockOnEnd?: boolean | undefined
+ }
+ ): Stream
+ (
+ evaluate: LazyArg>,
+ onError: (error: unknown) => E,
+ allocSize?: number
+ ): Stream
+} = internal.fromReadableStreamByob
/**
* Creates a stream from a `Schedule` that does not require any further
diff --git a/packages/effect/src/TPubSub.ts b/packages/effect/src/TPubSub.ts
index 1a15cfa1b6a..f6b7b045ce6 100644
--- a/packages/effect/src/TPubSub.ts
+++ b/packages/effect/src/TPubSub.ts
@@ -107,6 +107,15 @@ export const isEmpty: (self: TPubSub) => STM.STM = internal.isEmp
*/
export const isFull: (self: TPubSub) => STM.STM = internal.isFull
+/**
+ * Interrupts any fibers that are suspended on `offer` or `take`. Future calls
+ * to `offer*` and `take*` will be interrupted immediately.
+ *
+ * @since 2.0.0
+ * @category utils
+ */
+export const shutdown: (self: TPubSub) => STM.STM = internal.shutdown
+
/**
* Returns `true` if `shutdown` has been called, otherwise returns `false`.
*
diff --git a/packages/effect/src/TQueue.ts b/packages/effect/src/TQueue.ts
index 800555b2c8a..e8b9b465faf 100644
--- a/packages/effect/src/TQueue.ts
+++ b/packages/effect/src/TQueue.ts
@@ -206,7 +206,7 @@ export const isTEnqueue: (u: unknown) => u is TEnqueue = internal.isTEn
* @since 2.0.0
* @category mutations
*/
-export const awaitShutdown: (self: TQueue) => STM.STM = internal.awaitShutdown
+export const awaitShutdown: (self: TDequeue | TEnqueue) => STM.STM = internal.awaitShutdown
/**
* Creates a bounded queue with the back pressure strategy. The queue will
@@ -226,7 +226,7 @@ export const bounded: (requestedCapacity: number) => STM.STM> = int
* @since 2.0.0
* @category getters
*/
-export const capacity: (self: TQueue) => number = internal.capacity
+export const capacity: (self: TDequeue | TEnqueue) => number = internal.capacity
/**
* Creates a bounded queue with the dropping strategy. The queue will drop new
@@ -245,7 +245,7 @@ export const dropping: (requestedCapacity: number) => STM.STM> = in
* @since 2.0.0
* @category getters
*/
-export const isEmpty: (self: TQueue) => STM.STM = internal.isEmpty
+export const isEmpty: (self: TDequeue | TEnqueue) => STM.STM = internal.isEmpty
/**
* Returns `true` if the `TQueue` contains at least one element, `false`
@@ -254,7 +254,7 @@ export const isEmpty: (self: TQueue) => STM.STM = internal.isEmpt
* @since 2.0.0
* @category getters
*/
-export const isFull: (self: TQueue) => STM.STM = internal.isFull
+export const isFull: (self: TDequeue | TEnqueue) => STM.STM = internal.isFull
/**
* Returns `true` if `shutdown` has been called, otherwise returns `false`.
@@ -262,7 +262,7 @@ export const isFull: (self: TQueue) => STM.STM = internal.isFull
* @since 2.0.0
* @category getters
*/
-export const isShutdown: (self: TQueue) => STM.STM = internal.isShutdown
+export const isShutdown: (self: TDequeue | TEnqueue) => STM.STM = internal.isShutdown
/**
* Places one value in the queue.
@@ -345,7 +345,7 @@ export const seek: {
* @since 2.0.0
* @category mutations
*/
-export const shutdown: (self: TQueue) => STM.STM = internal.shutdown
+export const shutdown: (self: TDequeue | TEnqueue) => STM.STM = internal.shutdown
/**
* Retrieves the size of the queue, which is equal to the number of elements
@@ -355,7 +355,7 @@ export const shutdown: (self: TQueue) => STM.STM = internal.shutdown
* @since 2.0.0
* @category getters
*/
-export const size: (self: TQueue) => STM.STM = internal.size
+export const size: (self: TDequeue | TEnqueue) => STM.STM = internal.size
/**
* Creates a bounded queue with the sliding strategy. The queue will add new
diff --git a/packages/effect/src/TRef.ts b/packages/effect/src/TRef.ts
index 5b98a7c6536..1dd83e9c4ed 100644
--- a/packages/effect/src/TRef.ts
+++ b/packages/effect/src/TRef.ts
@@ -7,6 +7,7 @@ import type * as TxnId from "./internal/stm/stm/txnId.js"
import type * as Versioned from "./internal/stm/stm/versioned.js"
import * as internal from "./internal/stm/tRef.js"
import type * as Option from "./Option.js"
+import type { Pipeable } from "./Pipeable.js"
import type * as STM from "./STM.js"
import type * as Types from "./Types.js"
@@ -34,7 +35,7 @@ export type TRefTypeId = typeof TRefTypeId
* @since 2.0.0
* @category models
*/
-export interface TRef extends TRef.Variance {
+export interface TRef extends TRef.Variance, Pipeable {
/**
* Note: the method is unbound, exposed only for potential extensions.
*/
diff --git a/packages/effect/src/TSubscriptionRef.ts b/packages/effect/src/TSubscriptionRef.ts
new file mode 100644
index 00000000000..dfc6ccb5ffe
--- /dev/null
+++ b/packages/effect/src/TSubscriptionRef.ts
@@ -0,0 +1,192 @@
+/**
+ * @since 3.10.0
+ */
+import type * as Effect from "./Effect.js"
+import * as internal from "./internal/stm/tSubscriptionRef.js"
+import type * as Option from "./Option.js"
+import type * as Scope from "./Scope.js"
+import type * as STM from "./STM.js"
+import type * as Stream from "./Stream.js"
+import type * as TPubSub from "./TPubSub.js"
+import type * as TQueue from "./TQueue.js"
+import type * as TRef from "./TRef.js"
+import type * as Types from "./Types.js"
+
+/**
+ * @since 3.10.0
+ * @category symbols
+ */
+export const TSubscriptionRefTypeId: unique symbol = internal.TSubscriptionRefTypeId
+
+/**
+ * @since 3.10.0
+ * @category symbols
+ */
+export type TSubscriptionRefTypeId = typeof TSubscriptionRefTypeId
+
+/**
+ * A `TSubscriptionRef` is a `TRef` that can be subscribed to in order to
+ * receive a `TDequeue` of the current value and all committed changes to the value.
+ *
+ * @since 3.10.0
+ * @category models
+ */
+export interface TSubscriptionRef extends TSubscriptionRef.Variance, TRef.TRef {
+ /** @internal */
+ readonly ref: TRef.TRef
+ /** @internal */
+ readonly pubsub: TPubSub.TPubSub
+ /** @internal */
+ modify(f: (a: A) => readonly [B, A]): STM.STM
+
+ /**
+ * A TDequeue containing the current value of the `Ref` as well as all changes
+ * to that value.
+ */
+ readonly changes: STM.STM>
+}
+
+/**
+ * @since 3.10.0
+ */
+export declare namespace TSubscriptionRef {
+ /**
+ * @since 3.10.0
+ * @category models
+ */
+ export interface Variance {
+ readonly [TSubscriptionRefTypeId]: {
+ readonly _A: Types.Invariant
+ }
+ }
+}
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const get: (self: TSubscriptionRef) => STM.STM = internal.get
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const getAndSet: {
+ (value: A): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, value: A): STM.STM
+} = internal.getAndSet
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const getAndUpdate: {
+ (f: (a: A) => A): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => A): STM.STM
+} = internal.getAndUpdate
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const getAndUpdateSome: {
+ (f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => Option.Option): STM.STM
+} = internal.getAndUpdateSome
+
+/**
+ * @since 3.10.0
+ * @category constructors
+ */
+export const make: (value: A) => STM.STM> = internal.make
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const modify: {
+ (f: (a: A) => readonly [B, A]): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => readonly [B, A]): STM.STM
+} = internal.modify
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const modifySome: {
+ (fallback: B, f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, fallback: B, f: (a: A) => Option.Option): STM.STM
+} = internal.modifySome
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const set: {
+ (value: A): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, value: A): STM.STM
+} = internal.set
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const setAndGet: {
+ (value: A): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, value: A): STM.STM
+} = internal.setAndGet
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const update: {
+ (f: (a: A) => A): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => A): STM.STM
+} = internal.update
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const updateAndGet: {
+ (f: (a: A) => A): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => A): STM.STM
+} = internal.updateAndGet
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const updateSome: {
+ (f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => Option.Option): STM.STM
+} = internal.updateSome
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const updateSomeAndGet: {
+ (f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => Option.Option): STM.STM
+} = internal.updateSomeAndGet
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const changesScoped: (self: TSubscriptionRef) => Effect.Effect, never, Scope.Scope> =
+ internal.changesScoped
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const changesStream: (self: TSubscriptionRef) => Stream.Stream = internal.changesStream
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const changes: (self: TSubscriptionRef) => STM.STM> = (self) => self.changes
diff --git a/packages/effect/src/index.ts b/packages/effect/src/index.ts
index cb90a4ac728..9e581032c95 100644
--- a/packages/effect/src/index.ts
+++ b/packages/effect/src/index.ts
@@ -750,12 +750,6 @@ export * as ScopedCache from "./ScopedCache.js"
*/
export * as ScopedRef from "./ScopedRef.js"
-/**
- * @since 2.0.0
- * @deprecated
- */
-export * as Secret from "./Secret.js"
-
/**
* @since 2.0.0
*/
@@ -895,6 +889,11 @@ export * as TSemaphore from "./TSemaphore.js"
*/
export * as TSet from "./TSet.js"
+/**
+ * @since 3.10.0
+ */
+export * as TSubscriptionRef from "./TSubscriptionRef.js"
+
/**
* @since 2.0.0
*/
diff --git a/packages/effect/src/internal/config.ts b/packages/effect/src/internal/config.ts
index 5cbf28f1af9..861dae5aba0 100644
--- a/packages/effect/src/internal/config.ts
+++ b/packages/effect/src/internal/config.ts
@@ -11,14 +11,12 @@ import type * as LogLevel from "../LogLevel.js"
import * as Option from "../Option.js"
import { hasProperty, type Predicate, type Refinement } from "../Predicate.js"
import type * as Redacted from "../Redacted.js"
-import type * as Secret from "../Secret.js"
import * as configError from "./configError.js"
import * as core from "./core.js"
import * as defaultServices from "./defaultServices.js"
import * as effectable from "./effectable.js"
import * as OpCodes from "./opCodes/config.js"
import * as redacted_ from "./redacted.js"
-import * as InternalSecret from "./secret.js"
const ConfigSymbolKey = "effect/Config"
@@ -414,15 +412,6 @@ export const repeat = (self: Config.Config): Config.Config> => {
return repeat
}
-/** @internal */
-export const secret = (name?: string): Config.Config => {
- const config = primitive(
- "a secret property",
- (text) => Either.right(InternalSecret.fromString(text))
- )
- return name === undefined ? config : nested(config, name)
-}
-
/** @internal */
export const redacted = (name?: string): Config.Config => {
const config = primitive(
diff --git a/packages/effect/src/internal/fiberRuntime.ts b/packages/effect/src/internal/fiberRuntime.ts
index 428e9cdf082..b54534333c7 100644
--- a/packages/effect/src/internal/fiberRuntime.ts
+++ b/packages/effect/src/internal/fiberRuntime.ts
@@ -1432,7 +1432,7 @@ export const loggerWithConsoleError = (self: Logger): Logger = globalValue(
Symbol.for("effect/Logger/defaultLogger"),
- () => loggerWithConsoleLog(internalLogger.stringLogger)
+ () => internalLogger.prettyLogger()
)
/** @internal */
@@ -1448,9 +1448,9 @@ export const logFmtLogger: Logger = globalValue(
)
/** @internal */
-export const prettyLogger: Logger = globalValue(
- Symbol.for("effect/Logger/prettyLogger"),
- () => internalLogger.prettyLoggerDefault
+export const stringLogger: Logger = globalValue(
+ Symbol.for("effect/Logger/stringLogger"),
+ () => loggerWithConsoleLog(internalLogger.stringLogger)
)
/** @internal */
diff --git a/packages/effect/src/internal/logger.ts b/packages/effect/src/internal/logger.ts
index 5bb919236a9..5fc774c9612 100644
--- a/packages/effect/src/internal/logger.ts
+++ b/packages/effect/src/internal/logger.ts
@@ -3,7 +3,6 @@ import * as Context from "../Context.js"
import * as FiberRefs from "../FiberRefs.js"
import type { LazyArg } from "../Function.js"
import { constVoid, dual, pipe } from "../Function.js"
-import { globalValue } from "../GlobalValue.js"
import * as HashMap from "../HashMap.js"
import * as Inspectable from "../Inspectable.js"
import * as List from "../List.js"
@@ -571,6 +570,3 @@ const prettyLoggerBrowser = (options: {
}
)
}
-
-/** @internal */
-export const prettyLoggerDefault = globalValue("effect/Logger/prettyLoggerDefault", () => prettyLogger())
diff --git a/packages/effect/src/internal/secret.ts b/packages/effect/src/internal/secret.ts
deleted file mode 100644
index 3ef59924ceb..00000000000
--- a/packages/effect/src/internal/secret.ts
+++ /dev/null
@@ -1,87 +0,0 @@
-import * as Arr from "../Array.js"
-import { hasProperty } from "../Predicate.js"
-import type * as Secret from "../Secret.js"
-import * as redacted_ from "./redacted.js"
-
-/**
- * @internal
- * @deprecated
- */
-const SecretSymbolKey = "effect/Secret"
-
-/**
- * @internal
- * @deprecated
- */
-export const SecretTypeId: Secret.SecretTypeId = Symbol.for(
- SecretSymbolKey
-) as Secret.SecretTypeId
-
-/**
- * @internal
- * @deprecated
- */
-export const isSecret = (u: unknown): u is Secret.Secret => hasProperty(u, SecretTypeId)
-
-/**
- * @internal
- * @deprecated
- */
-export const make = (bytes: Array): Secret.Secret => {
- const secret = Object.create({
- ...redacted_.proto,
- [SecretTypeId]: SecretTypeId
- })
- Object.defineProperty(secret, "toString", {
- enumerable: false,
- value() {
- return "Secret()"
- }
- })
- Object.defineProperty(secret, "toJSON", {
- enumerable: false,
- value() {
- return ""
- }
- })
- Object.defineProperty(secret, "raw", {
- enumerable: false,
- value: bytes
- })
- redacted_.redactedRegistry.set(secret, bytes.map((byte) => String.fromCharCode(byte)).join(""))
- return secret
-}
-
-/**
- * @internal
- * @deprecated
- */
-export const fromIterable = (iterable: Iterable): Secret.Secret =>
- make(Arr.fromIterable(iterable).map((char) => char.charCodeAt(0)))
-
-/**
- * @internal
- * @deprecated
- */
-export const fromString = (text: string): Secret.Secret => {
- return make(text.split("").map((char) => char.charCodeAt(0)))
-}
-
-/**
- * @internal
- * @deprecated
- */
-export const value = (self: Secret.Secret): string => {
- return self.raw.map((byte) => String.fromCharCode(byte)).join("")
-}
-
-/**
- * @internal
- * @deprecated
- */
-export const unsafeWipe = (self: Secret.Secret): void => {
- for (let i = 0; i < self.raw.length; i++) {
- self.raw[i] = 0
- }
- redacted_.redactedRegistry.delete(self)
-}
diff --git a/packages/effect/src/internal/stm/core.ts b/packages/effect/src/internal/stm/core.ts
index 7c0f5587152..12030907524 100644
--- a/packages/effect/src/internal/stm/core.ts
+++ b/packages/effect/src/internal/stm/core.ts
@@ -15,11 +15,10 @@ import { pipeArguments } from "../../Pipeable.js"
import { hasProperty } from "../../Predicate.js"
import type * as Scheduler from "../../Scheduler.js"
import type * as STM from "../../STM.js"
-import { StreamTypeId } from "../../Stream.js"
import { YieldWrap } from "../../Utils.js"
import { ChannelTypeId } from "../core-stream.js"
import { withFiberRuntime } from "../core.js"
-import { effectVariance } from "../effectable.js"
+import { effectVariance, StreamTypeId } from "../effectable.js"
import { OP_COMMIT } from "../opCodes/effect.js"
import { SingleShotGen } from "../singleShotGen.js"
import { SinkTypeId } from "../sink.js"
diff --git a/packages/effect/src/internal/stm/tPubSub.ts b/packages/effect/src/internal/stm/tPubSub.ts
index 089be12ce55..b838ddb90a2 100644
--- a/packages/effect/src/internal/stm/tPubSub.ts
+++ b/packages/effect/src/internal/stm/tPubSub.ts
@@ -201,6 +201,7 @@ class TPubSubSubscriptionImpl implements TQueue.TDequeue {
capacity(): number {
return this.requestedCapacity
}
+
size: STM.STM = core.withSTMRuntime((runtime) => {
let currentSubscriberHead = tRef.unsafeGet(this.subscriberHead, runtime.journal)
if (currentSubscriberHead === undefined) {
diff --git a/packages/effect/src/internal/stm/tQueue.ts b/packages/effect/src/internal/stm/tQueue.ts
index 90039374f32..8ca27b9865c 100644
--- a/packages/effect/src/internal/stm/tQueue.ts
+++ b/packages/effect/src/internal/stm/tQueue.ts
@@ -3,7 +3,7 @@ import * as Chunk from "../../Chunk.js"
import { dual, pipe } from "../../Function.js"
import * as Option from "../../Option.js"
import { hasProperty, type Predicate } from "../../Predicate.js"
-import * as STM from "../../STM.js"
+import type * as STM from "../../STM.js"
import type * as TQueue from "../../TQueue.js"
import type * as TRef from "../../TRef.js"
import * as core from "./core.js"
@@ -99,7 +99,7 @@ class TQueueImpl implements TQueue.TQueue {
size: STM.STM = core.withSTMRuntime((runtime) => {
const queue = tRef.unsafeGet(this.ref, runtime.journal)
if (queue === undefined) {
- return STM.interruptAs(runtime.fiberId)
+ return core.interruptAs(runtime.fiberId)
}
return core.succeed(queue.length)
})
diff --git a/packages/effect/src/internal/stm/tRef.ts b/packages/effect/src/internal/stm/tRef.ts
index c7805093604..3162fc252b0 100644
--- a/packages/effect/src/internal/stm/tRef.ts
+++ b/packages/effect/src/internal/stm/tRef.ts
@@ -1,5 +1,7 @@
import { dual } from "../../Function.js"
import * as Option from "../../Option.js"
+import type { Pipeable } from "../../Pipeable.js"
+import { pipeArguments } from "../../Pipeable.js"
import type * as STM from "../../STM.js"
import type * as TRef from "../../TRef.js"
import * as core from "./core.js"
@@ -16,13 +18,13 @@ export const TRefTypeId: TRef.TRefTypeId = Symbol.for(
TRefSymbolKey
) as TRef.TRefTypeId
-const tRefVariance = {
+export const tRefVariance = {
/* c8 ignore next */
_A: (_: any) => _
}
/** @internal */
-export class TRefImpl implements TRef.TRef {
+export class TRefImpl implements TRef.TRef, Pipeable {
readonly [TRefTypeId] = tRefVariance
/** @internal */
todos: Map
@@ -40,6 +42,9 @@ export class TRefImpl implements TRef.TRef {
return retValue
})
}
+ pipe() {
+ return pipeArguments(this, arguments)
+ }
}
/** @internal */
diff --git a/packages/effect/src/internal/stm/tSubscriptionRef.ts b/packages/effect/src/internal/stm/tSubscriptionRef.ts
new file mode 100644
index 00000000000..94a49240553
--- /dev/null
+++ b/packages/effect/src/internal/stm/tSubscriptionRef.ts
@@ -0,0 +1,286 @@
+import * as Effect from "../../Effect.js"
+import { dual, pipe } from "../../Function.js"
+import * as Option from "../../Option.js"
+import { pipeArguments } from "../../Pipeable.js"
+import * as STM from "../../STM.js"
+import * as TPubSub from "../../TPubSub.js"
+import * as TQueue from "../../TQueue.js"
+import * as TRef from "../../TRef.js"
+import type * as TSubscriptionRef from "../../TSubscriptionRef.js"
+import * as stream from "../stream.js"
+import { tDequeueVariance } from "./tQueue.js"
+import { tRefVariance } from "./tRef.js"
+
+/** @internal */
+const TSubscriptionRefSymbolKey = "effect/TSubscriptionRef"
+
+/** @internal */
+export const TSubscriptionRefTypeId: TSubscriptionRef.TSubscriptionRefTypeId = Symbol.for(
+ TSubscriptionRefSymbolKey
+) as TSubscriptionRef.TSubscriptionRefTypeId
+
+const TSubscriptionRefVariance = {
+ /* c8 ignore next */
+ _A: (_: any) => _
+}
+
+class TDequeueMerge implements TQueue.TDequeue {
+ [TQueue.TDequeueTypeId] = tDequeueVariance
+
+ constructor(
+ readonly first: TQueue.TDequeue,
+ readonly second: TQueue.TDequeue
+ ) {}
+
+ peek: STM.STM = STM.gen(this, function*() {
+ const first = yield* this.peekOption
+ if (first._tag === "Some") {
+ return first.value
+ }
+ return yield* STM.retry
+ })
+
+ peekOption: STM.STM> = STM.gen(this, function*() {
+ const first = yield* this.first.peekOption
+ if (first._tag === "Some") {
+ return first
+ }
+ const second = yield* this.second.peekOption
+ if (second._tag === "Some") {
+ return second
+ }
+ return Option.none()
+ })
+
+ take: STM.STM = STM.gen(this, function*() {
+ if (!(yield* this.first.isEmpty)) {
+ return yield* this.first.take
+ }
+ if (!(yield* this.second.isEmpty)) {
+ return yield* this.second.take
+ }
+ return yield* STM.retry
+ })
+
+ takeAll: STM.STM> = STM.gen(this, function*() {
+ return [...yield* this.first.takeAll, ...yield* this.second.takeAll]
+ })
+
+ takeUpTo(max: number): STM.STM> {
+ return STM.gen(this, function*() {
+ const first = yield* this.first.takeUpTo(max)
+ if (first.length >= max) {
+ return first
+ }
+ return [...first, ...yield* this.second.takeUpTo(max - first.length)]
+ })
+ }
+
+ capacity(): number {
+ return this.first.capacity() + this.second.capacity()
+ }
+
+ size: STM.STM = STM.gen(this, function*() {
+ return (yield* this.first.size) + (yield* this.second.size)
+ })
+
+ isFull: STM.STM = STM.gen(this, function*() {
+ return (yield* this.first.isFull) && (yield* this.second.isFull)
+ })
+
+ isEmpty: STM.STM = STM.gen(this, function*() {
+ return (yield* this.first.isEmpty) && (yield* this.second.isEmpty)
+ })
+
+ shutdown: STM.STM = STM.gen(this, function*() {
+ yield* this.first.shutdown
+ yield* this.second.shutdown
+ })
+
+ isShutdown: STM.STM = STM.gen(this, function*() {
+ return (yield* this.first.isShutdown) && (yield* this.second.isShutdown)
+ })
+
+ awaitShutdown: STM.STM = STM.gen(this, function*() {
+ yield* this.first.awaitShutdown
+ yield* this.second.awaitShutdown
+ })
+}
+
+/** @internal */
+class TSubscriptionRefImpl implements TSubscriptionRef.TSubscriptionRef {
+ readonly [TSubscriptionRefTypeId] = TSubscriptionRefVariance
+ readonly [TRef.TRefTypeId] = tRefVariance
+
+ constructor(
+ readonly ref: TRef.TRef,
+ readonly pubsub: TPubSub.TPubSub
+ ) {}
+
+ get todos() {
+ return this.ref.todos
+ }
+
+ get versioned() {
+ return this.ref.versioned
+ }
+
+ pipe() {
+ return pipeArguments(this, arguments)
+ }
+
+ get changes(): STM.STM> {
+ return STM.gen(this, function*() {
+ const first = yield* TQueue.unbounded()
+ yield* TQueue.offer(first, yield* TRef.get(this.ref))
+ return new TDequeueMerge(first, yield* TPubSub.subscribe(this.pubsub))
+ })
+ }
+
+ modify(f: (a: A) => readonly [B, A]): STM.STM {
+ return pipe(
+ TRef.get(this.ref),
+ STM.map(f),
+ STM.flatMap(([b, a]) =>
+ pipe(
+ TRef.set(this.ref, a),
+ STM.as(b),
+ STM.zipLeft(TPubSub.publish(this.pubsub, a))
+ )
+ )
+ )
+ }
+}
+
+/** @internal */
+export const make = (value: A): STM.STM> =>
+ pipe(
+ STM.all([
+ TPubSub.unbounded(),
+ TRef.make(value)
+ ]),
+ STM.map(([pubsub, ref]) => new TSubscriptionRefImpl(ref, pubsub))
+ )
+
+/** @internal */
+export const get = (self: TSubscriptionRef.TSubscriptionRef) => TRef.get(self.ref)
+
+/** @internal */
+export const set = dual<
+ (value: A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, value: A) => STM.STM
+>(
+ 2,
+ (self: TSubscriptionRef.TSubscriptionRef, value: A): STM.STM =>
+ self.modify((): [void, A] => [void 0, value])
+)
+
+/** @internal */
+export const getAndSet = dual<
+ (value: A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, value: A) => STM.STM
+>(2, (self, value) => self.modify((a) => [a, value]))
+
+/** @internal */
+export const getAndUpdate = dual<
+ (f: (a: A) => A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => A) => STM.STM
+>(2, (self, f) => self.modify((a) => [a, f(a)]))
+
+/** @internal */
+export const getAndUpdateSome = dual<
+ (f: (a: A) => Option.Option) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => Option.Option) => STM.STM
+>(2, (self, f) =>
+ self.modify((a) =>
+ Option.match(f(a), {
+ onNone: () => [a, a],
+ onSome: (b) => [a, b]
+ })
+ ))
+
+/** @internal */
+export const setAndGet = dual<
+ (value: A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, value: A) => STM.STM
+>(2, (self, value) => self.modify(() => [value, value]))
+
+/** @internal */
+export const modify = dual<
+ (f: (a: A) => readonly [B, A]) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => readonly [B, A]) => STM.STM
+>(2, (self, f) => self.modify(f))
+
+/** @internal */
+export const modifySome = dual<
+ (
+ fallback: B,
+ f: (a: A) => Option.Option
+ ) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (
+ self: TSubscriptionRef.TSubscriptionRef,
+ fallback: B,
+ f: (a: A) => Option.Option
+ ) => STM.STM
+>(3, (self, fallback, f) =>
+ self.modify((a) =>
+ Option.match(f(a), {
+ onNone: () => [fallback, a],
+ onSome: (b) => b
+ })
+ ))
+
+/** @internal */
+export const update = dual<
+ (f: (a: A) => A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => A) => STM.STM
+>(2, (self, f) => self.modify((a) => [void 0, f(a)]))
+
+/** @internal */
+export const updateAndGet = dual<
+ (f: (a: A) => A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => A) => STM.STM
+>(2, (self, f) =>
+ self.modify((a) => {
+ const b = f(a)
+ return [b, b]
+ }))
+
+/** @internal */
+export const updateSome = dual<
+ (f: (a: A) => Option.Option) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => Option.Option) => STM.STM
+>(
+ 2,
+ (self, f) =>
+ self.modify((a) => [
+ void 0,
+ Option.match(f(a), {
+ onNone: () => a,
+ onSome: (b) => b
+ })
+ ])
+)
+
+/** @internal */
+export const updateSomeAndGet = dual<
+ (f: (a: A) => Option.Option) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => Option.Option) => STM.STM
+>(
+ 2,
+ (self, f) =>
+ self.modify((a) =>
+ Option.match(f(a), {
+ onNone: () => [a, a],
+ onSome: (b) => [b, b]
+ })
+ )
+)
+
+/** @internal */
+export const changesScoped = (self: TSubscriptionRef.TSubscriptionRef) =>
+ Effect.acquireRelease(self.changes, TQueue.shutdown)
+
+/** @internal */
+export const changesStream = (self: TSubscriptionRef.TSubscriptionRef) =>
+ stream.unwrap(Effect.map(self.changes, stream.fromTQueue))
diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts
index 3e677bfac92..82627f4988c 100644
--- a/packages/effect/src/internal/stream.ts
+++ b/packages/effect/src/internal/stream.ts
@@ -18,7 +18,7 @@ import * as MergeDecision from "../MergeDecision.js"
import * as Option from "../Option.js"
import type * as Order from "../Order.js"
import { pipeArguments } from "../Pipeable.js"
-import { hasProperty, isTagged, type Predicate, type Refinement } from "../Predicate.js"
+import { hasProperty, type Predicate, type Refinement } from "../Predicate.js"
import * as PubSub from "../PubSub.js"
import * as Queue from "../Queue.js"
import * as RcRef from "../RcRef.js"
@@ -31,6 +31,8 @@ import type * as Stream from "../Stream.js"
import type * as Emit from "../StreamEmit.js"
import * as HaltStrategy from "../StreamHaltStrategy.js"
import type * as Take from "../Take.js"
+import * as TPubSub from "../TPubSub.js"
+import * as TQueue from "../TQueue.js"
import type * as Tracer from "../Tracer.js"
import * as Tuple from "../Tuple.js"
import type { NoInfer, TupleOf } from "../Types.js"
@@ -464,23 +466,23 @@ export const as = dual<
>(2, (self: Stream.Stream, value: B): Stream.Stream => map(self, () => value))
const queueFromBufferOptions = (
- bufferSize?: number | "unbounded" | {
+ options?: {
+ readonly bufferSize: "unbounded"
+ } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Effect.Effect>> => {
- if (bufferSize === "unbounded") {
+ if (options?.bufferSize === "unbounded") {
return Queue.unbounded()
- } else if (typeof bufferSize === "number" || bufferSize === undefined) {
- return Queue.bounded(bufferSize ?? 16)
}
- switch (bufferSize.strategy) {
+ switch (options?.strategy) {
case "dropping":
- return Queue.dropping(bufferSize.bufferSize ?? 16)
+ return Queue.dropping(options.bufferSize ?? 16)
case "sliding":
- return Queue.sliding(bufferSize.bufferSize ?? 16)
+ return Queue.sliding(options.bufferSize ?? 16)
default:
- return Queue.bounded(bufferSize.bufferSize ?? 16)
+ return Queue.bounded(options?.bufferSize ?? 16)
}
}
@@ -489,13 +491,15 @@ export const _async = (
register: (
emit: Emit.Emit
) => Effect.Effect | void,
- bufferSize?: number | "unbounded" | {
+ options?: {
+ readonly bufferSize: "unbounded"
+ } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Stream.Stream =>
Effect.acquireRelease(
- queueFromBufferOptions(bufferSize),
+ queueFromBufferOptions(options),
(queue) => Queue.shutdown(queue)
).pipe(
Effect.flatMap((output) =>
@@ -544,14 +548,16 @@ export const _async = (
/** @internal */
export const asyncEffect = (
register: (emit: Emit.Emit) => Effect.Effect,
- bufferSize?: number | "unbounded" | {
+ options?: {
+ readonly bufferSize: "unbounded"
+ } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Stream.Stream =>
pipe(
Effect.acquireRelease(
- queueFromBufferOptions(bufferSize),
+ queueFromBufferOptions(options),
(queue) => Queue.shutdown(queue)
),
Effect.flatMap((output) =>
@@ -647,14 +653,16 @@ export const asyncPush = (
/** @internal */
export const asyncScoped = (
register: (emit: Emit.Emit) => Effect.Effect,
- bufferSize?: number | "unbounded" | {
+ options?: {
+ readonly bufferSize: "unbounded"
+ } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Stream.Stream> =>
pipe(
Effect.acquireRelease(
- queueFromBufferOptions(bufferSize),
+ queueFromBufferOptions(options),
(queue) => Queue.shutdown(queue)
),
Effect.flatMap((output) =>
@@ -3133,6 +3141,14 @@ export const fromPubSub: {
return options?.shutdown ? ensuring(stream, PubSub.shutdown(pubsub)) : stream
}
+/** @internal */
+export const fromTPubSub = (pubsub: TPubSub.TPubSub): Stream.Stream => {
+ return unwrapScoped(Effect.map(
+ TPubSub.subscribeScoped(pubsub),
+ (queue) => fromTQueue(queue)
+ ))
+}
+
/** @internal */
export const fromIterable = (iterable: Iterable): Stream.Stream =>
suspend(() =>
@@ -3224,6 +3240,24 @@ export const fromQueue = (
options?.shutdown ? ensuring(Queue.shutdown(queue)) : identity
)
+/** @internal */
+export const fromTQueue = (queue: TQueue.TDequeue): Stream.Stream =>
+ pipe(
+ TQueue.take(queue),
+ Effect.map(Chunk.of),
+ Effect.catchAllCause((cause) =>
+ pipe(
+ TQueue.isShutdown(queue),
+ Effect.flatMap((isShutdown) =>
+ isShutdown && Cause.isInterrupted(cause) ?
+ pull.end() :
+ pull.failCause(cause)
+ )
+ )
+ ),
+ repeatEffectChunkOption
+ )
+
/** @internal */
export const fromSchedule = (schedule: Schedule.Schedule): Stream.Stream =>
pipe(
@@ -3233,14 +3267,38 @@ export const fromSchedule = (schedule: Schedule.Schedule):
)
/** @internal */
-export const fromReadableStream = (
- evaluate: LazyArg>,
- onError: (error: unknown) => E
-): Stream.Stream =>
- unwrapScoped(Effect.map(
+export const fromReadableStream: {
+ (
+ options: {
+ readonly evaluate: LazyArg>
+ readonly onError: (error: unknown) => E
+ readonly releaseLockOnEnd?: boolean | undefined
+ }
+ ): Stream.Stream
+ (
+ evaluate: LazyArg>,
+ onError: (error: unknown) => E
+ ): Stream.Stream
+} = (
+ ...args: [options: {
+ readonly evaluate: LazyArg