Socket
Socket
Sign inDemoInstall

@effect/stream

Package Overview
Dependencies
Maintainers
3
Versions
62
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@effect/stream - npm Package Compare versions

Comparing version 0.29.0 to 0.30.0

69

Channel.js

@@ -6,4 +6,4 @@ "use strict";

});
exports.unwrap = exports.unit = exports.toStream = exports.toSink = exports.toQueue = exports.toPull = exports.toHub = exports.sync = exports.succeed = exports.scoped = exports.runDrain = exports.runCollect = exports.run = exports.repeated = exports.readWithCause = exports.readWith = exports.readOrFail = exports.read = exports.provideSomeLayer = exports.provideService = exports.provideLayer = exports.provideContext = exports.pipeToOrFail = exports.pipeTo = exports.orElse = exports.orDieWith = exports.orDie = exports.never = exports.mergeWith = exports.mergeOutWith = exports.mergeOut = exports.mergeMapStrategy = exports.mergeMapBufferStrategy = exports.mergeMapBuffer = exports.mergeMap = exports.mergeAllWith = exports.mergeAllUnboundedWith = exports.mergeAllUnbounded = exports.mergeAll = exports.mapOutEffectPar = exports.mapOutEffect = exports.mapOut = exports.mapErrorCause = exports.mapError = exports.mapEffect = exports.map = exports.isChannelException = exports.interruptWhenDeferred = exports.interruptWhen = exports.identity = exports.fromQueue = exports.fromOption = exports.fromInput = exports.fromHubScoped = exports.fromHub = exports.fromEither = exports.fromEffect = exports.foldChannel = exports.foldCauseChannel = exports.flatten = exports.flatMap = exports.failSync = exports.failCauseSync = exports.failCause = exports.fail = exports.ensuringWith = exports.ensuring = exports.emitCollect = exports.embedInput = exports.drain = exports.doneCollect = exports.contramapInEffect = exports.contramapIn = exports.contramapErrorEffect = exports.contramapError = exports.contramapEffect = exports.contramapContext = exports.contramap = exports.contextWithEffect = exports.contextWithChannel = exports.contextWith = exports.context = exports.concatOut = exports.concatMapWithCustom = exports.concatMapWith = exports.concatMap = exports.concatAllWith = exports.concatAll = exports.collect = exports.catchAllCause = exports.catchAll = exports.bufferChunk = exports.buffer = exports.asUnit = exports.as = exports.acquireUseRelease = exports.acquireReleaseOut = exports.ChannelTypeId = exports.ChannelExceptionTypeId = exports.ChannelException = void 0;
exports.zipRight = exports.zipParRight = exports.zipParLeft = exports.zipPar = exports.zipLeft = exports.zip = exports.writeChunk = exports.writeAll = exports.write = exports.updateService = exports.unwrapScoped = void 0;
exports.write = exports.updateService = exports.unwrapScoped = exports.unwrap = exports.unit = exports.toStream = exports.toSink = exports.toQueue = exports.toPull = exports.toHub = exports.sync = exports.succeed = exports.scoped = exports.runDrain = exports.runCollect = exports.run = exports.repeated = exports.readWithCause = exports.readWith = exports.readOrFail = exports.read = exports.provideSomeLayer = exports.provideService = exports.provideLayer = exports.provideContext = exports.pipeToOrFail = exports.pipeTo = exports.orElse = exports.orDieWith = exports.orDie = exports.never = exports.mergeWith = exports.mergeOutWith = exports.mergeOut = exports.mergeMap = exports.mergeAllWith = exports.mergeAllUnboundedWith = exports.mergeAllUnbounded = exports.mergeAll = exports.mapOutEffectPar = exports.mapOutEffect = exports.mapOut = exports.mapErrorCause = exports.mapError = exports.mapEffect = exports.map = exports.isChannelException = exports.interruptWhenDeferred = exports.interruptWhen = exports.identity = exports.fromQueue = exports.fromOption = exports.fromInput = exports.fromHubScoped = exports.fromHub = exports.fromEither = exports.fromEffect = exports.foldChannel = exports.foldCauseChannel = exports.flatten = exports.flatMap = exports.failSync = exports.failCauseSync = exports.failCause = exports.fail = exports.ensuringWith = exports.ensuring = exports.emitCollect = exports.embedInput = exports.drain = exports.doneCollect = exports.contramapInEffect = exports.contramapIn = exports.contramapErrorEffect = exports.contramapError = exports.contramapEffect = exports.contramapContext = exports.contramap = exports.contextWithEffect = exports.contextWithChannel = exports.contextWith = exports.context = exports.concatOut = exports.concatMapWithCustom = exports.concatMapWith = exports.concatMap = exports.concatAllWith = exports.concatAll = exports.collect = exports.catchAllCause = exports.catchAll = exports.bufferChunk = exports.buffer = exports.asUnit = exports.as = exports.acquireUseRelease = exports.acquireReleaseOut = exports.ChannelTypeId = exports.ChannelExceptionTypeId = exports.ChannelException = void 0;
exports.zipRight = exports.zipLeft = exports.zip = exports.writeChunk = exports.writeAll = void 0;
var channel = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/stream/internal/channel"));

@@ -576,36 +576,2 @@ var core = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/stream/internal/core"));

/**
* Like `mergeMap`, but with a configurable `mergeStrategy` parameter.
*
* @param n The maximum number of channels to merge.
* @param mergeStrategy The `MergeStrategy` to use (either `BackPressure` or `Sliding`.
* @param f The function that creates a new channel from each emitted element.
* @since 1.0.0
* @category mapping
*/
exports.mergeMap = mergeMap;
const mergeMapStrategy = channel.mergeMapStrategy;
/**
* Like `mergeMap`, but with a configurable `bufferSize` parameter.
*
* @param n The maximum number of channels to merge.
* @param bufferSize The number of elements that can be buffered from upstream for the merging.
* @param f The function that creates a new channel from each emitted element.
* @since 1.0.0
* @category mapping
*/
exports.mergeMapStrategy = mergeMapStrategy;
const mergeMapBuffer = channel.mergeMapBuffer;
/**
* Like `mergeMap`, but with a configurable `bufferSize` and `mergeStrategy` parameter.
*
* @param n The maximum number of channels to merge.
* @param bufferSize The number of elements that can be buffered from upstream for the merging.
* @param mergeStrategy The `MergeStrategy` to use (either `BackPressure` or `Sliding`.
* @param f The function that creates a new channel from each emitted element.
* @since 1.0.0
* @category mapping
*/
exports.mergeMapBuffer = mergeMapBuffer;
const mergeMapBufferStrategy = channel.mergeMapBufferStrategy;
/**
* Returns a new channel which merges a number of channels emitted by this

@@ -617,3 +583,3 @@ * channel using the back pressuring merge strategy. See `Channel.mergeAll`.

*/
exports.mergeMapBufferStrategy = mergeMapBufferStrategy;
exports.mergeMap = mergeMap;
const mergeOut = channel.mergeOut;

@@ -953,29 +919,2 @@ /**

/**
* Creates a new channel which runs in parallel this and the other channel and
* when both succeeds finishes with a tuple of both channel's done value.
*
* @since 1.0.0
* @category zipping
*/
exports.zipRight = zipRight;
const zipPar = channel.zipPar;
/**
* Creates a new channel which runs in parallel this and the other channel and
* when both succeeds finishes with the first one's done value.
*
* @since 1.0.0
* @category zipping
*/
exports.zipPar = zipPar;
const zipParLeft = channel.zipParLeft;
/**
* Creates a new channel which runs in parallel this and the other channel and
* when both succeeds finishes with the second one's done value.
*
* @since 1.0.0
* @category zipping
*/
exports.zipParLeft = zipParLeft;
const zipParRight = channel.zipParRight;
/**
* Represents a generic checked exception which occurs when a `Channel` is

@@ -987,3 +926,3 @@ * executed.

*/
exports.zipParRight = zipParRight;
exports.zipRight = zipRight;
const ChannelException = channel.ChannelException;

@@ -990,0 +929,0 @@ /**

12

Channel/ChildExecutorDecision.d.ts

@@ -112,5 +112,13 @@ /**

export declare const match: {
<A>(onContinue: () => A, onClose: (value: unknown) => A, onYield: () => A): (self: ChildExecutorDecision) => A;
<A>(self: ChildExecutorDecision, onContinue: () => A, onClose: (value: unknown) => A, onYield: () => A): A;
<A>(options: {
readonly onContinue: () => A;
readonly onClose: (value: unknown) => A;
readonly onYield: () => A;
}): (self: ChildExecutorDecision) => A;
<A>(self: ChildExecutorDecision, options: {
readonly onContinue: () => A;
readonly onClose: (value: unknown) => A;
readonly onYield: () => A;
}): A;
};
//# sourceMappingURL=ChildExecutorDecision.d.ts.map

@@ -68,5 +68,11 @@ /**

export declare const match: {
<R, E0, Z0, E, Z, Z2>(onDone: (effect: Effect.Effect<R, E, Z>) => Z2, onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2): (self: MergeDecision<R, E0, Z0, E, Z>) => Z2;
<R, E0, Z0, E, Z, Z2>(self: MergeDecision<R, E0, Z0, E, Z>, onDone: (effect: Effect.Effect<R, E, Z>) => Z2, onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2): Z2;
<R, E0, Z0, E, Z, Z2>(options: {
readonly onDone: (effect: Effect.Effect<R, E, Z>) => Z2;
readonly onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2;
}): (self: MergeDecision<R, E0, Z0, E, Z>) => Z2;
<R, E0, Z0, E, Z, Z2>(self: MergeDecision<R, E0, Z0, E, Z>, options: {
readonly onDone: (effect: Effect.Effect<R, E, Z>) => Z2;
readonly onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2;
}): Z2;
};
//# sourceMappingURL=MergeDecision.d.ts.map

@@ -111,5 +111,13 @@ /**

export declare const match: {
<Env, Err, Err1, Err2, Elem, Done, Done1, Done2, Z>(onBothRunning: (left: Fiber.Fiber<Err, Either.Either<Done, Elem>>, right: Fiber.Fiber<Err1, Either.Either<Done1, Elem>>) => Z, onLeftDone: (f: (exit: Exit.Exit<Err1, Done1>) => Effect.Effect<Env, Err2, Done2>) => Z, onRightDone: (f: (exit: Exit.Exit<Err, Done>) => Effect.Effect<Env, Err2, Done2>) => Z): (self: MergeState<Env, Err, Err1, Err2, Elem, Done, Done1, Done2>) => Z;
<Env, Err, Err1, Err2, Elem, Done, Done1, Done2, Z>(self: MergeState<Env, Err, Err1, Err2, Elem, Done, Done1, Done2>, onBothRunning: (left: Fiber.Fiber<Err, Either.Either<Done, Elem>>, right: Fiber.Fiber<Err1, Either.Either<Done1, Elem>>) => Z, onLeftDone: (f: (exit: Exit.Exit<Err1, Done1>) => Effect.Effect<Env, Err2, Done2>) => Z, onRightDone: (f: (exit: Exit.Exit<Err, Done>) => Effect.Effect<Env, Err2, Done2>) => Z): Z;
<Env, Err, Err1, Err2, Elem, Done, Done1, Done2, Z>(options: {
readonly onBothRunning: (left: Fiber.Fiber<Err, Either.Either<Done, Elem>>, right: Fiber.Fiber<Err1, Either.Either<Done1, Elem>>) => Z;
readonly onLeftDone: (f: (exit: Exit.Exit<Err1, Done1>) => Effect.Effect<Env, Err2, Done2>) => Z;
readonly onRightDone: (f: (exit: Exit.Exit<Err, Done>) => Effect.Effect<Env, Err2, Done2>) => Z;
}): (self: MergeState<Env, Err, Err1, Err2, Elem, Done, Done1, Done2>) => Z;
<Env, Err, Err1, Err2, Elem, Done, Done1, Done2, Z>(self: MergeState<Env, Err, Err1, Err2, Elem, Done, Done1, Done2>, options: {
readonly onBothRunning: (left: Fiber.Fiber<Err, Either.Either<Done, Elem>>, right: Fiber.Fiber<Err1, Either.Either<Done1, Elem>>) => Z;
readonly onLeftDone: (f: (exit: Exit.Exit<Err1, Done1>) => Effect.Effect<Env, Err2, Done2>) => Z;
readonly onRightDone: (f: (exit: Exit.Exit<Err, Done>) => Effect.Effect<Env, Err2, Done2>) => Z;
}): Z;
};
//# sourceMappingURL=MergeState.d.ts.map

@@ -83,5 +83,11 @@ /**

export declare const match: {
<A>(onBackPressure: () => A, onBufferSliding: () => A): (self: MergeStrategy) => A;
<A>(self: MergeStrategy, onBackPressure: () => A, onBufferSliding: () => A): A;
<A>(options: {
readonly onBackPressure: () => A;
readonly onBufferSliding: () => A;
}): (self: MergeStrategy) => A;
<A>(self: MergeStrategy, options: {
readonly onBackPressure: () => A;
readonly onBufferSliding: () => A;
}): A;
};
//# sourceMappingURL=MergeStrategy.d.ts.map

@@ -87,5 +87,11 @@ /**

export declare const match: {
<A, Z>(onPulled: (value: A) => Z, onNoUpstream: (activeDownstreamCount: number) => Z): (self: UpstreamPullRequest<A>) => Z;
<A, Z>(self: UpstreamPullRequest<A>, onPulled: (value: A) => Z, onNoUpstream: (activeDownstreamCount: number) => Z): Z;
<A, Z>(options: {
readonly onPulled: (value: A) => Z;
readonly onNoUpstream: (activeDownstreamCount: number) => Z;
}): (self: UpstreamPullRequest<A>) => Z;
<A, Z>(self: UpstreamPullRequest<A>, options: {
readonly onPulled: (value: A) => Z;
readonly onNoUpstream: (activeDownstreamCount: number) => Z;
}): Z;
};
//# sourceMappingURL=UpstreamPullRequest.d.ts.map

@@ -91,5 +91,11 @@ /**

export declare const match: {
<A, Z>(onPullAfterNext: (emitSeparator: Option.Option<A>) => Z, onPullAfterAllEnqueued: (emitSeparator: Option.Option<A>) => Z): (self: UpstreamPullStrategy<A>) => Z;
<A, Z>(self: UpstreamPullStrategy<A>, onPullAfterNext: (emitSeparator: Option.Option<A>) => Z, onPullAfterAllEnqueued: (emitSeparator: Option.Option<A>) => Z): Z;
<A, Z>(options: {
readonly onNext: (emitSeparator: Option.Option<A>) => Z;
readonly onAllEnqueued: (emitSeparator: Option.Option<A>) => Z;
}): (self: UpstreamPullStrategy<A>) => Z;
<A, Z>(self: UpstreamPullStrategy<A>, options: {
readonly onNext: (emitSeparator: Option.Option<A>) => Z;
readonly onAllEnqueued: (emitSeparator: Option.Option<A>) => Z;
}): Z;
};
//# sourceMappingURL=UpstreamPullStrategy.d.ts.map

@@ -6,3 +6,3 @@ "use strict";

});
exports.zipRight = exports.zipParRight = exports.zipParLeft = exports.zipPar = exports.zipLeft = exports.zip = exports.writeChunk = exports.writeAll = exports.updateService = exports.unwrapScoped = exports.unwrap = exports.toQueue = exports.toPull = exports.toHub = exports.serviceWithEffect = exports.serviceWithChannel = exports.serviceWith = exports.service = exports.scoped = exports.runDrain = exports.runCollect = exports.run = exports.repeated = exports.read = exports.provideSomeLayer = exports.provideService = exports.provideLayer = exports.pipeToOrFail = exports.orElse = exports.orDieWith = exports.orDie = exports.never = exports.mergeWith = exports.mergeOutWith = exports.mergeOut = exports.mergeMapStrategy = exports.mergeMapBufferStrategy = exports.mergeMapBuffer = exports.mergeMap = exports.mergeAllWith = exports.mergeAllUnboundedWith = exports.mergeAllUnbounded = exports.mergeAll = exports.mapOutEffectPar = exports.mapOutEffect = exports.mapOut = exports.mapErrorCause = exports.mapError = exports.mapEffect = exports.map = exports.isChannelException = exports.interruptWhenDeferred = exports.interruptWhen = exports.identityChannel = exports.fromQueue = exports.fromOption = exports.fromInput = exports.fromHubScoped = exports.fromHub = exports.fromEither = exports.foldChannel = exports.flatten = exports.ensuring = exports.emitCollect = exports.drain = exports.doneCollect = exports.contramapInEffect = exports.contramapIn = exports.contramapErrorEffect = exports.contramapError = exports.contramapEffect = exports.contramapContext = exports.contramap = exports.contextWithEffect = exports.contextWithChannel = exports.contextWith = exports.context = exports.concatOut = exports.concatMap = exports.collect = exports.catchAll = exports.bufferChunk = exports.buffer = exports.asUnit = exports.as = exports.acquireUseRelease = exports.ChannelExceptionTypeId = exports.ChannelException = void 0;
exports.zipRight = exports.zipLeft = exports.zip = exports.writeChunk = exports.writeAll = exports.updateService = exports.unwrapScoped = exports.unwrap = exports.toQueue = exports.toPull = exports.toHub = exports.serviceWithEffect = exports.serviceWithChannel = exports.serviceWith = exports.service = exports.scoped = exports.runDrain = exports.runCollect = exports.run = exports.repeated = exports.read = exports.provideSomeLayer = exports.provideService = exports.provideLayer = exports.pipeToOrFail = exports.orElse = exports.orDieWith = exports.orDie = exports.never = exports.mergeWith = exports.mergeOutWith = exports.mergeOut = exports.mergeMap = exports.mergeAllWith = exports.mergeAllUnboundedWith = exports.mergeAllUnbounded = exports.mergeAll = exports.mapOutEffectPar = exports.mapOutEffect = exports.mapOut = exports.mapErrorCause = exports.mapError = exports.mapEffect = exports.map = exports.isChannelException = exports.interruptWhenDeferred = exports.interruptWhen = exports.identityChannel = exports.fromQueue = exports.fromOption = exports.fromInput = exports.fromHubScoped = exports.fromHub = exports.fromEither = exports.foldChannel = exports.flatten = exports.ensuring = exports.emitCollect = exports.drain = exports.doneCollect = exports.contramapInEffect = exports.contramapIn = exports.contramapErrorEffect = exports.contramapError = exports.contramapEffect = exports.contramapContext = exports.contramap = exports.contextWithEffect = exports.contextWithChannel = exports.contextWith = exports.context = exports.concatOut = exports.concatMap = exports.collect = exports.catchAll = exports.bufferChunk = exports.buffer = exports.asUnit = exports.as = exports.acquireUseRelease = exports.ChannelExceptionTypeId = exports.ChannelException = void 0;
var Chunk = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/data/Chunk"));

@@ -45,9 +45,17 @@ var Context = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/data/Context"));

exports.asUnit = asUnit;
const buffer = (empty, isEmpty, ref) => core.suspend(() => {
const doBuffer = (empty, isEmpty, ref) => unwrap(Ref.modify(ref, inElem => isEmpty(inElem) ? [core.readWith(input => core.flatMap(core.write(input), () => doBuffer(empty, isEmpty, ref)), error => core.fail(error), done => core.succeedNow(done)), inElem] : [core.flatMap(core.write(inElem), () => doBuffer(empty, isEmpty, ref)), empty]));
return doBuffer(empty, isEmpty, ref);
const buffer = options => core.suspend(() => {
const doBuffer = (empty, isEmpty, ref) => unwrap(Ref.modify(ref, inElem => isEmpty(inElem) ? [core.readWith({
onInput: input => core.flatMap(core.write(input), () => doBuffer(empty, isEmpty, ref)),
onFailure: error => core.fail(error),
onDone: done => core.succeedNow(done)
}), inElem] : [core.flatMap(core.write(inElem), () => doBuffer(empty, isEmpty, ref)), empty]));
return doBuffer(options.empty, options.isEmpty, options.ref);
});
/** @internal */
exports.buffer = buffer;
const bufferChunk = ref => buffer(Chunk.empty(), Chunk.isEmpty, ref);
const bufferChunk = ref => buffer({
empty: Chunk.empty(),
isEmpty: Chunk.isEmpty,
ref
});
/** @internal */

@@ -65,6 +73,10 @@ exports.bufferChunk = bufferChunk;

const collect = /*#__PURE__*/(0, _Function.dual)(2, (self, pf) => {
const collector = core.readWith(out => Option.match(pf(out), {
onNone: () => collector,
onSome: out2 => core.flatMap(core.write(out2), () => collector)
}), core.fail, core.succeedNow);
const collector = core.readWith({
onInput: out => Option.match(pf(out), {
onNone: () => collector,
onSome: out2 => core.flatMap(core.write(out2), () => collector)
}),
onFailure: core.fail,
onDone: core.succeedNow
});
return core.pipeTo(self, collector);

@@ -78,3 +90,7 @@ });

const contramap = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => {
const reader = core.readWith(inElem => core.flatMap(core.write(inElem), () => reader), core.fail, done => core.succeedNow(f(done)));
const reader = core.readWith({
onInput: inElem => core.flatMap(core.write(inElem), () => reader),
onFailure: core.fail,
onDone: done => core.succeedNow(f(done))
});
return core.pipeTo(reader, self);

@@ -85,3 +101,7 @@ });

const contramapEffect = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => {
const reader = core.readWith(inElem => core.flatMap(core.write(inElem), () => reader), core.fail, done => core.fromEffect(f(done)));
const reader = core.readWith({
onInput: inElem => core.flatMap(core.write(inElem), () => reader),
onFailure: core.fail,
onDone: done => core.fromEffect(f(done))
});
return core.pipeTo(reader, self);

@@ -92,3 +112,7 @@ });

const contramapError = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => {
const reader = core.readWith(inElem => core.flatMap(core.write(inElem), () => reader), error => core.fail(f(error)), core.succeedNow);
const reader = core.readWith({
onInput: inElem => core.flatMap(core.write(inElem), () => reader),
onFailure: error => core.fail(f(error)),
onDone: core.succeedNow
});
return core.pipeTo(reader, self);

@@ -99,3 +123,7 @@ });

const contramapErrorEffect = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => {
const reader = core.readWith(inElem => core.flatMap(core.write(inElem), () => reader), error => core.fromEffect(f(error)), core.succeedNow);
const reader = core.readWith({
onInput: inElem => core.flatMap(core.write(inElem), () => reader),
onFailure: error => core.fromEffect(f(error)),
onDone: core.succeedNow
});
return core.pipeTo(reader, self);

@@ -106,3 +134,7 @@ });

const contramapIn = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => {
const reader = core.readWith(inElem => core.flatMap(core.write(f(inElem)), () => reader), core.fail, core.succeedNow);
const reader = core.readWith({
onInput: inElem => core.flatMap(core.write(f(inElem)), () => reader),
onFailure: core.fail,
onDone: core.succeedNow
});
return core.pipeTo(reader, self);

@@ -112,3 +144,7 @@ });

const contramapInEffect = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => {
const reader = core.readWith(inElem => core.flatMap(core.flatMap(core.fromEffect(f(inElem)), core.write), () => reader), core.fail, core.succeedNow);
const reader = core.readWith({
onInput: inElem => core.flatMap(core.flatMap(core.fromEffect(f(inElem)), core.write), () => reader),
onFailure: core.fail,
onDone: core.succeedNow
});
return core.pipeTo(reader, self);

@@ -125,9 +161,17 @@ });

const doneCollectReader = builder => {
return core.readWith(outElem => core.flatMap(core.sync(() => {
builder.push(outElem);
}), () => doneCollectReader(builder)), core.fail, core.succeed);
return core.readWith({
onInput: outElem => core.flatMap(core.sync(() => {
builder.push(outElem);
}), () => doneCollectReader(builder)),
onFailure: core.fail,
onDone: core.succeed
});
};
/** @internal */
const drain = self => {
const drainer = core.readWithCause(() => drainer, core.failCause, core.succeed);
const drainer = core.readWithCause({
onInput: () => drainer,
onFailure: core.failCause,
onDone: core.succeed
});
return core.pipeTo(self, drainer);

@@ -158,15 +202,18 @@ };

exports.flatten = flatten;
const foldChannel = /*#__PURE__*/(0, _Function.dual)(3, (self, onError, onSuccess) => core.foldCauseChannel(self, cause => {
const either = Cause.failureOrCause(cause);
switch (either._tag) {
case "Left":
{
return onError(either.left);
}
case "Right":
{
return core.failCause(either.right);
}
}
}, onSuccess));
const foldChannel = /*#__PURE__*/(0, _Function.dual)(2, (self, options) => core.foldCauseChannel(self, {
onFailure: cause => {
const either = Cause.failureOrCause(cause);
switch (either._tag) {
case "Left":
{
return options.onFailure(either.left);
}
case "Right":
{
return core.failCause(either.right);
}
}
},
onSuccess: options.onSuccess
}));
/** @internal */

@@ -206,6 +253,14 @@ exports.foldChannel = foldChannel;

/** @internal */
const identityChannel = () => core.readWith(input => core.flatMap(core.write(input), () => identityChannel()), core.fail, core.succeedNow);
const identityChannel = () => core.readWith({
onInput: input => core.flatMap(core.write(input), () => identityChannel()),
onFailure: core.fail,
onDone: core.succeedNow
});
/** @internal */
exports.identityChannel = identityChannel;
const interruptWhen = /*#__PURE__*/(0, _Function.dual)(2, (self, effect) => mergeWith(self, core.fromEffect(effect), selfDone => mergeDecision.Done(Effect.suspend(() => selfDone)), effectDone => mergeDecision.Done(Effect.suspend(() => effectDone))));
const interruptWhen = /*#__PURE__*/(0, _Function.dual)(2, (self, effect) => mergeWith(self, {
other: core.fromEffect(effect),
onSelfDone: selfDone => mergeDecision.Done(Effect.suspend(() => selfDone)),
onOtherDone: effectDone => mergeDecision.Done(Effect.suspend(() => effectDone))
}));
/** @internal */

@@ -229,3 +284,7 @@ exports.interruptWhen = interruptWhen;

const mapOut = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => {
const reader = core.readWith(outElem => core.flatMap(core.write(f(outElem)), () => reader), core.fail, core.succeedNow);
const reader = core.readWith({
onInput: outElem => core.flatMap(core.write(f(outElem)), () => reader),
onFailure: core.fail,
onDone: core.succeedNow
});
return core.pipeTo(self, reader);

@@ -236,3 +295,7 @@ });

const mapOutEffect = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => {
const reader = core.readWithCause(outElem => core.flatMap(() => reader)(core.flatMap(core.write)(core.fromEffect(f(outElem)))), core.failCause, core.succeedNow);
const reader = core.readWithCause({
onInput: outElem => core.flatMap(() => reader)(core.flatMap(core.write)(core.fromEffect(f(outElem)))),
onFailure: core.failCause,
onDone: core.succeedNow
});
return core.pipeTo(self, reader);

@@ -278,14 +341,22 @@ });

exports.mapOutEffectPar = mapOutEffectPar;
const mergeAll = (n, bufferSize = 16, mergeStrategy = _mergeStrategy.BackPressure()) => {
return channels => mergeAllWith(n, bufferSize, mergeStrategy)(channels, _Function.constVoid);
const mergeAll = options => {
return channels => mergeAllWith(options)(channels, _Function.constVoid);
};
/** @internal */
exports.mergeAll = mergeAll;
const mergeAllUnbounded = channels => mergeAllWith(Number.POSITIVE_INFINITY)(channels, _Function.constVoid);
const mergeAllUnbounded = channels => mergeAllWith({
concurrency: "unbounded"
})(channels, _Function.constVoid);
/** @internal */
exports.mergeAllUnbounded = mergeAllUnbounded;
const mergeAllUnboundedWith = (channels, f) => mergeAllWith(Number.POSITIVE_INFINITY)(channels, f);
const mergeAllUnboundedWith = (channels, f) => mergeAllWith({
concurrency: "unbounded"
})(channels, f);
/** @internal */
exports.mergeAllUnboundedWith = mergeAllUnboundedWith;
const mergeAllWith = (n, bufferSize = 16, mergeStrategy = _mergeStrategy.BackPressure()) => (channels, f) => unwrapScoped(Effect.map(([queue, input]) => {
const mergeAllWith = ({
bufferSize = 16,
concurrency,
mergeStrategy = _mergeStrategy.BackPressure()
}) => (channels, f) => unwrapScoped(Effect.map(([queue, input]) => {
const consumer = unwrap(Effect.matchCause({

@@ -306,3 +377,3 @@ onFailure: core.failCause,

const errorSignal = yield* $(Deferred.make());
const withPermits = n === Number.POSITIVE_INFINITY ? _ => _Function.identity : (yield* $(Effect.makeSemaphore(n))).withPermits;
const withPermits = concurrency === "unbounded" ? _ => _Function.identity : (yield* $(Effect.makeSemaphore(concurrency))).withPermits;
const pull = yield* $(toPull(channels));

@@ -323,3 +394,3 @@ const evaluatePull = pull => Effect.catchAllCause(cause => Cause.isInterrupted(cause) ? Effect.failCause(cause) : Effect.asUnit(Effect.zipRight(Deferred.succeed(errorSignal, void 0))(Queue.offer(queue, Effect.failCause(cause)))))(Effect.flatMap(Option.match({

onLeft: outDone => Effect.raceWith(Deferred.await(errorSignal), {
other: withPermits(n)(Effect.unit),
other: withPermits(concurrency)(Effect.unit),
onSelfDone: (_, permitAcquisition) => Effect.as(false)(Fiber.interrupt(permitAcquisition)),

@@ -331,21 +402,24 @@ onOtherDone: (_, failureAwait) => Effect.zipRight(Effect.as(false)(Effect.flatMap(Option.match({

}),
onRight: channel => _mergeStrategy.match(() => Effect.gen(function* ($) {
const latch = yield* $(Deferred.make());
const raceEffects = Effect.scoped(Effect.flatMap(pull => Effect.raceAwait(Deferred.await(errorSignal))(evaluatePull(pull)))(toPull(core.pipeTo(channel)(queueReader))));
yield* $(Effect.forkScoped(withPermits(1)(Effect.zipRight(raceEffects)(Deferred.succeed(latch, void 0)))));
yield* $(Deferred.await(latch));
const errored = yield* $(Deferred.isDone(errorSignal));
return !errored;
}), () => Effect.gen(function* ($) {
const canceler = yield* $(Deferred.make());
const latch = yield* $(Deferred.make());
const size = yield* $(Queue.size(cancelers));
yield* $(Effect.when(() => size >= n)(Effect.flatMap(_ => Deferred.succeed(_, void 0))(Queue.take(cancelers))));
yield* $(Queue.offer(cancelers, canceler));
const raceEffects = Effect.scoped(Effect.flatMap(pull => Effect.raceAwait(Deferred.await(canceler))(Effect.raceAwait(Deferred.await(errorSignal))(evaluatePull(pull))))(toPull(core.pipeTo(channel)(queueReader))));
yield* $(Effect.forkScoped(withPermits(1)(Effect.zipRight(raceEffects)(Deferred.succeed(latch, void 0)))));
yield* $(Deferred.await(latch));
const errored = yield* $(Deferred.isDone(errorSignal));
return !errored;
}))(mergeStrategy)
onRight: channel => _mergeStrategy.match({
onBackPressure: () => Effect.gen(function* ($) {
const latch = yield* $(Deferred.make());
const raceEffects = Effect.scoped(Effect.flatMap(pull => Effect.raceAwait(Deferred.await(errorSignal))(evaluatePull(pull)))(toPull(core.pipeTo(channel)(queueReader))));
yield* $(Effect.forkScoped(withPermits(1)(Effect.zipRight(raceEffects)(Deferred.succeed(latch, void 0)))));
yield* $(Deferred.await(latch));
const errored = yield* $(Deferred.isDone(errorSignal));
return !errored;
}),
onBufferSliding: () => Effect.gen(function* ($) {
const canceler = yield* $(Deferred.make());
const latch = yield* $(Deferred.make());
const size = yield* $(Queue.size(cancelers));
yield* $(Effect.when(() => concurrency === "unbounded" ? false : size >= concurrency)(Effect.flatMap(_ => Deferred.succeed(_, void 0))(Queue.take(cancelers))));
yield* $(Queue.offer(cancelers, canceler));
const raceEffects = Effect.scoped(Effect.flatMap(pull => Effect.raceAwait(Deferred.await(canceler))(Effect.raceAwait(Deferred.await(errorSignal))(evaluatePull(pull))))(toPull(core.pipeTo(channel)(queueReader))));
yield* $(Effect.forkScoped(withPermits(1)(Effect.zipRight(raceEffects)(Deferred.succeed(latch, void 0)))));
yield* $(Deferred.await(latch));
const errored = yield* $(Deferred.isDone(errorSignal));
return !errored;
})
})(mergeStrategy)
})

@@ -357,23 +431,18 @@ }))));

exports.mergeAllWith = mergeAllWith;
const mergeMap = /*#__PURE__*/(0, _Function.dual)(3, (self, f, n) => mergeMapBufferStrategy(self, f, n, 16, _mergeStrategy.BackPressure()));
const mergeMap = /*#__PURE__*/(0, _Function.dual)(3, (self, f, options) => mergeAll(options)(mapOut(self, f)));
/** @internal */
exports.mergeMap = mergeMap;
const mergeMapBuffer = /*#__PURE__*/(0, _Function.dual)(4, (self, f, n, bufferSize) => mergeMapBufferStrategy(self, f, n, bufferSize, _mergeStrategy.BackPressure()));
const mergeOut = /*#__PURE__*/(0, _Function.dual)(2, (self, n) => mergeAll({
concurrency: n
})(mapOut(self, _Function.identity)));
/** @internal */
exports.mergeMapBuffer = mergeMapBuffer;
const mergeMapStrategy = /*#__PURE__*/(0, _Function.dual)(4, (self, f, n, mergeStrategy) => mergeMapBufferStrategy(self, f, n, 16, mergeStrategy));
/** @internal */
exports.mergeMapStrategy = mergeMapStrategy;
const mergeMapBufferStrategy = /*#__PURE__*/(0, _Function.dual)(5, (self, f, n, bufferSize, mergeStrategy) => mergeAll(n, bufferSize, mergeStrategy)(mapOut(self, f)));
/** @internal */
exports.mergeMapBufferStrategy = mergeMapBufferStrategy;
const mergeOut = /*#__PURE__*/(0, _Function.dual)(2, (self, n) => mergeAll(n)(mapOut(self, _Function.identity)));
/** @internal */
exports.mergeOut = mergeOut;
const mergeOutWith = /*#__PURE__*/(0, _Function.dual)(3, (self, n, f) => mergeAllWith(n)(mapOut(self, _Function.identity), f));
const mergeOutWith = /*#__PURE__*/(0, _Function.dual)(3, (self, n, f) => mergeAllWith({
concurrency: n
})(mapOut(self, _Function.identity), f));
/** @internal */
exports.mergeOutWith = mergeOutWith;
const mergeWith = /*#__PURE__*/(0, _Function.dual)(4, (self, that, leftDone, rightDone) => unwrapScoped(Effect.flatMap(input => {
const mergeWith = /*#__PURE__*/(0, _Function.dual)(2, (self, options) => unwrapScoped(Effect.flatMap(singleProducerAsyncInput.make(), input => {
const queueReader = fromInput(input);
return Effect.map(Effect.zip(toPull(core.pipeTo(queueReader, self)), toPull(core.pipeTo(queueReader, that))), ([pullL, pullR]) => {
return Effect.map(Effect.zip(toPull(core.pipeTo(queueReader, self)), toPull(core.pipeTo(queueReader, options.other))), ([pullL, pullR]) => {
const handleSide = (exit, fiber, pull) => (done, both, single) => {

@@ -409,4 +478,4 @@ const onDecision = decision => {

other: rightJoin,
onSelfDone: (leftExit, rf) => Effect.zipRight(Fiber.interrupt(rf), handleSide(leftExit, state.right, pullL)(leftDone, mergeState.BothRunning, f => mergeState.LeftDone(f))),
onOtherDone: (rightExit, lf) => Effect.zipRight(Fiber.interrupt(lf), handleSide(rightExit, state.left, pullR)(rightDone, (left, right) => mergeState.BothRunning(right, left), f => mergeState.RightDone(f)))
onSelfDone: (leftExit, rf) => Effect.zipRight(Fiber.interrupt(rf), handleSide(leftExit, state.right, pullL)(options.onSelfDone, mergeState.BothRunning, f => mergeState.LeftDone(f))),
onOtherDone: (rightExit, lf) => Effect.zipRight(Fiber.interrupt(lf), handleSide(rightExit, state.left, pullR)(options.onOtherDone, (left, right) => mergeState.BothRunning(right, left), f => mergeState.RightDone(f)))
}));

@@ -438,3 +507,3 @@ }

});
})(singleProducerAsyncInput.make())));
})));
/** @internal */

@@ -458,10 +527,18 @@ exports.mergeWith = mergeWith;

let channelException = undefined;
const reader = core.readWith(outElem => core.flatMap(core.write(outElem), () => reader), outErr => {
channelException = ChannelException(outErr);
return core.failCause(Cause.die(channelException));
}, core.succeedNow);
const writer = core.readWithCause(outElem => core.flatMap(() => writer)(core.write(outElem)), annotatedCause => {
const unannotated = Cause.unannotate(annotatedCause);
return Cause.isDieType(unannotated) && isChannelException(unannotated.defect) && Equal.equals(unannotated.defect, channelException) ? core.fail(unannotated.defect.error) : core.failCause(annotatedCause);
}, core.succeedNow);
const reader = core.readWith({
onInput: outElem => core.flatMap(core.write(outElem), () => reader),
onFailure: outErr => {
channelException = ChannelException(outErr);
return core.failCause(Cause.die(channelException));
},
onDone: core.succeedNow
});
const writer = core.readWithCause({
onInput: outElem => core.flatMap(() => writer)(core.write(outElem)),
onFailure: annotatedCause => {
const unannotated = Cause.unannotate(annotatedCause);
return Cause.isDieType(unannotated) && isChannelException(unannotated.defect) && Equal.equals(unannotated.defect, channelException) ? core.fail(unannotated.defect.error) : core.failCause(annotatedCause);
},
onDone: core.succeedNow
});
return core.pipeTo(core.pipeTo(core.pipeTo(self, reader), that), writer);

@@ -555,3 +632,7 @@ }));

const toQueueInternal = queue => {
return core.readWithCause(elem => core.flatMap(core.fromEffect(Queue.offer(queue, Either.right(elem))), () => toQueueInternal(queue)), cause => core.fromEffect(Queue.offer(queue, Either.left(Exit.failCause(cause)))), done => core.fromEffect(Queue.offer(queue, Either.left(Exit.succeed(done)))));
return core.readWithCause({
onInput: elem => core.flatMap(core.fromEffect(Queue.offer(queue, Either.right(elem))), () => toQueueInternal(queue)),
onFailure: cause => core.fromEffect(Queue.offer(queue, Either.left(Exit.failCause(cause)))),
onDone: done => core.fromEffect(Queue.offer(queue, Either.left(Exit.succeed(done))))
});
};

@@ -578,20 +659,19 @@ /** @internal */

/** @internal */
const zip = /*#__PURE__*/(0, _Function.dual)(2, (self, that) => core.flatMap(self, a => map(that, b => [a, b])));
const zip = /*#__PURE__*/(0, _Function.dual)(args => core.isChannel(args[1]), (self, that, options) => options?.concurrent ? mergeWith(self, {
other: that,
onSelfDone: exit1 => mergeDecision.Await(exit2 => Effect.suspend(() => Exit.zip(exit1, exit2))),
onOtherDone: exit2 => mergeDecision.Await(exit1 => Effect.suspend(() => Exit.zip(exit1, exit2)))
}) : core.flatMap(self, a => map(that, b => [a, b])));
/** @internal */
exports.zip = zip;
const zipLeft = /*#__PURE__*/(0, _Function.dual)(2, (self, that) => core.flatMap(self, z => as(that, z)));
const zipLeft = /*#__PURE__*/(0, _Function.dual)(args => core.isChannel(args[1]), (self, that, options) => options?.concurrent ? map(zip(self, that, {
concurrent: true
}), tuple => tuple[0]) : core.flatMap(self, z => as(that, z)));
/** @internal */
exports.zipLeft = zipLeft;
const zipRight = /*#__PURE__*/(0, _Function.dual)(2, (self, that) => core.flatMap(self, () => that));
const zipRight = /*#__PURE__*/(0, _Function.dual)(args => core.isChannel(args[1]), (self, that, options) => options?.concurrent ? map(zip(self, that, {
concurrent: true
}), tuple => tuple[1]) : core.flatMap(self, () => that));
/** @internal */
exports.zipRight = zipRight;
const zipPar = /*#__PURE__*/(0, _Function.dual)(2, (self, that) => mergeWith(self, that, exit1 => mergeDecision.Await(exit2 => Effect.suspend(() => Exit.zip(exit1, exit2))), exit2 => mergeDecision.Await(exit1 => Effect.suspend(() => Exit.zip(exit1, exit2)))));
/** @internal */
exports.zipPar = zipPar;
const zipParLeft = /*#__PURE__*/(0, _Function.dual)(2, (self, that) => map(zipPar(self, that), tuple => tuple[0]));
/** @internal */
exports.zipParLeft = zipParLeft;
const zipParRight = /*#__PURE__*/(0, _Function.dual)(2, (self, that) => map(zipPar(self, that), tuple => tuple[1]));
/** @internal */
exports.zipParRight = zipParRight;
const ChannelExceptionTypeId = /*#__PURE__*/Symbol.for("@effect/stream/Channel/errors/ChannelException");

@@ -598,0 +678,0 @@ /** @internal */

@@ -55,3 +55,7 @@ "use strict";

exports.isYield = isYield;
const match = /*#__PURE__*/(0, _Function.dual)(4, (self, onContinue, onClose, onYield) => {
const match = /*#__PURE__*/(0, _Function.dual)(2, (self, {
onClose,
onContinue,
onYield
}) => {
switch (self._tag) {

@@ -58,0 +62,0 @@ case OpCodes.OP_CONTINUE:

@@ -49,3 +49,6 @@ "use strict";

exports.isMergeDecision = isMergeDecision;
const match = /*#__PURE__*/(0, _Function.dual)(3, (self, onDone, onAwait) => {
const match = /*#__PURE__*/(0, _Function.dual)(2, (self, {
onAwait,
onDone
}) => {
const op = self;

@@ -52,0 +55,0 @@ switch (op._tag) {

@@ -66,3 +66,7 @@ "use strict";

exports.isRightDone = isRightDone;
const match = /*#__PURE__*/(0, _Function.dual)(4, (self, onBothRunning, onLeftDone, onRightDone) => {
const match = /*#__PURE__*/(0, _Function.dual)(2, (self, {
onBothRunning,
onLeftDone,
onRightDone
}) => {
switch (self._tag) {

@@ -69,0 +73,0 @@ case OpCodes.OP_BOTH_RUNNING:

@@ -44,3 +44,6 @@ "use strict";

exports.isBufferSliding = isBufferSliding;
const match = /*#__PURE__*/(0, _Function.dual)(3, (self, onBackPressure, onBufferSliding) => {
const match = /*#__PURE__*/(0, _Function.dual)(2, (self, {
onBackPressure,
onBufferSliding
}) => {
switch (self._tag) {

@@ -47,0 +50,0 @@ case OpCodes.OP_BACK_PRESSURE:

@@ -50,3 +50,6 @@ "use strict";

exports.isNoUpstream = isNoUpstream;
const match = /*#__PURE__*/(0, _Function.dual)(3, (self, onPulled, onNoUpstream) => {
const match = /*#__PURE__*/(0, _Function.dual)(2, (self, {
onNoUpstream,
onPulled
}) => {
switch (self._tag) {

@@ -53,0 +56,0 @@ case OpCodes.OP_PULLED:

@@ -50,11 +50,14 @@ "use strict";

exports.isPullAfterAllEnqueued = isPullAfterAllEnqueued;
const match = /*#__PURE__*/(0, _Function.dual)(3, (self, onPullAfterNext, onPullAfterAllEnqueued) => {
const match = /*#__PURE__*/(0, _Function.dual)(2, (self, {
onAllEnqueued,
onNext
}) => {
switch (self._tag) {
case OpCodes.OP_PULL_AFTER_NEXT:
{
return onPullAfterNext(self.emitSeparator);
return onNext(self.emitSeparator);
}
case OpCodes.OP_PULL_AFTER_ALL_ENQUEUED:
{
return onPullAfterAllEnqueued(self.emitSeparator);
return onAllEnqueued(self.emitSeparator);
}

@@ -61,0 +64,0 @@ }

@@ -6,3 +6,3 @@ "use strict";

});
exports.write = exports.unit = exports.sync = exports.suspend = exports.succeedNow = exports.succeed = exports.readWithCause = exports.readWith = exports.readOrFail = exports.provideContext = exports.pipeTo = exports.fromEffect = exports.foldCauseChannel = exports.flatMap = exports.failSync = exports.failCauseSync = exports.failCause = exports.fail = exports.ensuringWith = exports.embedInput = exports.concatMapWithCustom = exports.concatMapWith = exports.concatAllWith = exports.concatAll = exports.collectElements = exports.catchAllCause = exports.acquireReleaseOut = exports.ChannelTypeId = void 0;
exports.write = exports.unit = exports.sync = exports.suspend = exports.succeedNow = exports.succeed = exports.readWithCause = exports.readWith = exports.readOrFail = exports.provideContext = exports.pipeTo = exports.isChannel = exports.fromEffect = exports.foldCauseChannel = exports.flatMap = exports.failSync = exports.failCauseSync = exports.failCause = exports.fail = exports.ensuringWith = exports.embedInput = exports.concatMapWithCustom = exports.concatMapWith = exports.concatAllWith = exports.concatAll = exports.collectElements = exports.catchAllCause = exports.acquireReleaseOut = exports.ChannelTypeId = void 0;
var Chunk = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/data/Chunk"));

@@ -14,2 +14,3 @@ var Either = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/data/Either"));

var Cause = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/io/Cause"));
var Effect = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/io/Effect"));
var childExecutorDecision = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/stream/internal/channel/childExecutorDecision"));

@@ -44,2 +45,5 @@ var _continuation = /*#__PURE__*/require("@effect/stream/internal/channel/continuation");

/** @internal */
const isChannel = u => typeof u === "object" && u != null && ChannelTypeId in u || Effect.isEffect(u);
/** @internal */
exports.isChannel = isChannel;
const acquireReleaseOut = /*#__PURE__*/(0, _Function.dual)(2, (self, release) => {

@@ -71,7 +75,9 @@ const op = Object.create(proto);

exports.collectElements = collectElements;
const collectElementsReader = builder => {
return readWith(outElem => flatMap(sync(() => {
const collectElementsReader = builder => readWith({
onInput: outElem => flatMap(sync(() => {
builder.push(outElem);
}), () => collectElementsReader(builder)), fail, succeedNow);
};
}), () => collectElementsReader(builder)),
onFailure: fail,
onDone: succeedNow
});
/** @internal */

@@ -164,7 +170,7 @@ const concatAll = channels => concatAllWith(channels, _Function.constVoid, _Function.constVoid);

exports.flatMap = flatMap;
const foldCauseChannel = /*#__PURE__*/(0, _Function.dual)(3, (self, onError, onSuccess) => {
const foldCauseChannel = /*#__PURE__*/(0, _Function.dual)(2, (self, options) => {
const op = Object.create(proto);
op._tag = OpCodes.OP_FOLD;
op.channel = self;
op.k = new _continuation.ContinuationKImpl(onSuccess, onError);
op.k = new _continuation.ContinuationKImpl(options.onSuccess, options.onFailure);
return op;

@@ -209,13 +215,17 @@ });

exports.readOrFail = readOrFail;
const readWith = (input, error, done) => readWithCause(input, cause => Either.match(Cause.failureOrCause(cause), {
onLeft: error,
onRight: failCause
}), done);
const readWith = options => readWithCause({
onInput: options.onInput,
onFailure: cause => Either.match(Cause.failureOrCause(cause), {
onLeft: options.onFailure,
onRight: failCause
}),
onDone: options.onDone
});
/** @internal */
exports.readWith = readWith;
const readWithCause = (input, halt, done) => {
const readWithCause = options => {
const op = Object.create(proto);
op._tag = OpCodes.OP_READ;
op.more = input;
op.done = new _continuation.ContinuationKImpl(done, halt);
op.more = options.onInput;
op.done = new _continuation.ContinuationKImpl(options.onDone, options.onFailure);
return op;

@@ -222,0 +232,0 @@ };

@@ -6,3 +6,3 @@ "use strict";

});
exports.mapEffectParByKeyBuffer = exports.mapEffectParByKey = exports.make = exports.groupByKeyBuffer = exports.groupByKey = exports.groupByIterable = exports.groupByBuffer = exports.groupBy = exports.first = exports.filter = exports.evaluateBuffer = exports.evaluate = exports.GroupByTypeId = void 0;
exports.mapEffectOptions = exports.make = exports.groupByKey = exports.groupByIterable = exports.groupBy = exports.first = exports.filter = exports.evaluateBuffer = exports.evaluate = exports.bindEffect = exports.GroupByTypeId = void 0;
var Chunk = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/data/Chunk"));

@@ -42,3 +42,8 @@ var _Function = /*#__PURE__*/require("@effect/data/Function");

exports.evaluate = evaluate;
const evaluateBuffer = /*#__PURE__*/(0, _Function.dual)(3, (self, f, bufferSize) => stream.flatMapParBuffer(self.grouped, Number.POSITIVE_INFINITY, bufferSize, ([key, queue]) => f(key, stream.flattenTake(stream.fromQueueWithShutdown(queue)))));
const evaluateBuffer = /*#__PURE__*/(0, _Function.dual)(3, (self, f, bufferSize) => stream.flatMap(self.grouped, ([key, queue]) => f(key, stream.flattenTake(stream.fromQueue(queue, {
shutdown: true
}))), {
concurrency: "unbounded",
bufferSize
}));
/** @internal */

@@ -74,10 +79,7 @@ exports.evaluateBuffer = evaluateBuffer;

exports.make = make;
const groupBy = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => groupByBuffer(self, f, 16));
/** @internal */
exports.groupBy = groupBy;
const groupByBuffer = /*#__PURE__*/(0, _Function.dual)(3, (self, f, bufferSize) => make(stream.unwrapScoped(Effect.gen(function* ($) {
const groupBy = /*#__PURE__*/(0, _Function.dual)(args => typeof args[0] !== "function", (self, f, options) => make(stream.unwrapScoped(Effect.gen(function* ($) {
const decider = yield* $(Deferred.make());
const output = yield* $(Effect.acquireRelease(Queue.bounded(bufferSize), queue => Queue.shutdown(queue)));
const output = yield* $(Effect.acquireRelease(Queue.bounded(options?.bufferSize ?? 16), queue => Queue.shutdown(queue)));
const ref = yield* $(Ref.make(new Map()));
const add = yield* $(stream.distributedWithDynamicCallback(bufferSize, ([key, value]) => Effect.flatMap(Deferred.await(decider), f => f(key, value)), exit => Queue.offer(output, exit))(stream.mapEffect(self, f)));
const add = yield* $(stream.distributedWithDynamicCallback(options?.bufferSize ?? 16, ([key, value]) => Effect.flatMap(Deferred.await(decider), f => f(key, value)), exit => Queue.offer(output, exit))(stream.mapEffectSequential(self, f)));
yield* $(Deferred.succeed(decider, (key, _) => Effect.flatMap(Option.match({

@@ -87,5 +89,25 @@ onNone: () => Effect.flatMap(add, ([index, queue]) => Effect.zipRight(Ref.update(ref, map => map.set(key, index)), Effect.as(n => n === index)(Queue.offer(output, Exit.succeed([key, mapDequeue(queue, exit => new take.TakeImpl(Exit.map(tuple => Chunk.of(tuple[1]))(exit)))]))))),

}))(Effect.map(map => Option.fromNullable(map.get(key)))(Ref.get(ref)))));
return stream.flattenExitOption(stream.fromQueueWithShutdown(output));
return stream.flattenExitOption(stream.fromQueue(output, {
shutdown: true
}));
}))));
exports.groupByBuffer = groupByBuffer;
/** @internal */
exports.groupBy = groupBy;
const mapEffectOptions = /*#__PURE__*/(0, _Function.dual)(args => typeof args[0] !== "function", (self, f, options) => {
if (options?.key) {
return evaluate(groupByKey(self, options.key, {
bufferSize: options.bufferSize
}), (_, s) => stream.mapEffectSequential(f)(s));
}
return stream.matchConcurrency(options?.concurrency, () => stream.mapEffectSequential(self, f), n => options?.unordered ? stream.flatMap(self, a => stream.fromEffect(f(a)), {
concurrency: n
}) : stream.mapEffectPar(self, n, f));
});
/** @internal */
exports.mapEffectOptions = mapEffectOptions;
const bindEffect = /*#__PURE__*/(0, _Function.dual)(args => typeof args[0] !== "string", (self, tag, f, options) => mapEffectOptions(self, k => Effect.map(f(k), a => ({
...k,
[tag]: a
})), options));
exports.bindEffect = bindEffect;
const mapDequeue = (dequeue, f) => new MapDequeue(dequeue, f);

@@ -148,27 +170,24 @@ class MapDequeue {

/** @internal */
const groupByKey = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => groupByKeyBuffer(self, f, 16));
/** @internal */
exports.groupByKey = groupByKey;
const groupByKeyBuffer = /*#__PURE__*/(0, _Function.dual)(3, (self, f, bufferSize) => {
const loop = (map, outerQueue) => core.readWithCause(input => core.flatMap(() => loop(map, outerQueue))(core.fromEffect(Effect.forEach(([key, values]) => {
const innerQueue = map.get(key);
if (innerQueue === undefined) {
return Effect.flatMap(innerQueue => Effect.zipRight(Effect.catchSomeCause(cause => Cause.isInterruptedOnly(cause) ? Option.some(Effect.unit) : Option.none())(Queue.offer(innerQueue, take.chunk(values))))(Effect.zipRight(Queue.offer(outerQueue, take.of([key, innerQueue])))(Effect.sync(() => {
map.set(key, innerQueue);
}))))(Queue.bounded(bufferSize));
}
return Effect.catchSomeCause(cause => Cause.isInterruptedOnly(cause) ? Option.some(Effect.unit) : Option.none())(Queue.offer(innerQueue, take.chunk(values)));
}, {
discard: true
})(groupByIterable(f)(input)))), cause => core.fromEffect(Queue.offer(outerQueue, take.failCause(cause))), () => core.fromEffect(Effect.zipRight(Queue.offer(outerQueue, take.end))(Effect.forEach(([_, innerQueue]) => Effect.catchSomeCause(cause => Cause.isInterruptedOnly(cause) ? Option.some(Effect.unit) : Option.none())(Queue.offer(innerQueue, take.end)), {
discard: true
})(map.entries()))));
return make(stream.unwrapScoped(Effect.flatMap(map => Effect.flatMap(queue => Effect.as(stream.flattenTake(stream.fromQueueWithShutdown(queue)))(Effect.forkScoped(channelExecutor.runScoped(channel.drain(core.pipeTo(loop(map, queue))(stream.toChannel(self)))))))(Effect.acquireRelease(Queue.unbounded(), queue => Queue.shutdown(queue))))(Effect.sync(() => new Map()))));
const groupByKey = /*#__PURE__*/(0, _Function.dual)(args => typeof args[0] !== "function", (self, f, options) => {
const loop = (map, outerQueue) => core.readWithCause({
onInput: input => core.flatMap(core.fromEffect(Effect.forEach(groupByIterable(input, f), ([key, values]) => {
const innerQueue = map.get(key);
if (innerQueue === undefined) {
return Effect.flatMap(innerQueue => Effect.zipRight(Effect.catchSomeCause(cause => Cause.isInterruptedOnly(cause) ? Option.some(Effect.unit) : Option.none())(Queue.offer(innerQueue, take.chunk(values))))(Effect.zipRight(Queue.offer(outerQueue, take.of([key, innerQueue])))(Effect.sync(() => {
map.set(key, innerQueue);
}))))(Queue.bounded(options?.bufferSize ?? 16));
}
return Effect.catchSomeCause(Queue.offer(innerQueue, take.chunk(values)), cause => Cause.isInterruptedOnly(cause) ? Option.some(Effect.unit) : Option.none());
}, {
discard: true
})), () => loop(map, outerQueue)),
onFailure: cause => core.fromEffect(Queue.offer(outerQueue, take.failCause(cause))),
onDone: () => core.fromEffect(Effect.zipRight(Queue.offer(outerQueue, take.end))(Effect.forEach(map.entries(), ([_, innerQueue]) => Effect.catchSomeCause(cause => Cause.isInterruptedOnly(cause) ? Option.some(Effect.unit) : Option.none())(Queue.offer(innerQueue, take.end)), {
discard: true
})))
});
return make(stream.unwrapScoped(Effect.flatMap(map => Effect.flatMap(queue => Effect.as(stream.flattenTake(stream.fromQueue(queue, {
shutdown: true
})))(Effect.forkScoped(channelExecutor.runScoped(channel.drain(core.pipeTo(loop(map, queue))(stream.toChannel(self)))))))(Effect.acquireRelease(Queue.unbounded(), queue => Queue.shutdown(queue))))(Effect.sync(() => new Map()))));
});
/** @internal */
exports.groupByKeyBuffer = groupByKeyBuffer;
const mapEffectParByKey = /*#__PURE__*/(0, _Function.dual)(3, (self, keyBy, f) => mapEffectParByKeyBuffer(self, keyBy, 16, f));
/** @internal */
exports.mapEffectParByKey = mapEffectParByKey;
const mapEffectParByKeyBuffer = /*#__PURE__*/(0, _Function.dual)(4, (self, keyBy, bufferSize, f) => evaluate((_, s) => stream.mapEffect(f)(s))(groupByKeyBuffer(self, keyBy, bufferSize)));
/**

@@ -179,3 +198,3 @@ * A variant of `groupBy` that retains the insertion order of keys.

*/
exports.mapEffectParByKeyBuffer = mapEffectParByKeyBuffer;
exports.groupByKey = groupByKey;
const groupByIterable = /*#__PURE__*/(0, _Function.dual)(2, (iterable, f) => {

@@ -182,0 +201,0 @@ const builder = [];

@@ -6,4 +6,4 @@ "use strict";

});
exports.some = exports.serviceWithSink = exports.serviceWithEffect = exports.serviceWith = exports.service = exports.refineOrDieWith = exports.refineOrDie = exports.raceWithCapacity = exports.raceWith = exports.raceBothCapacity = exports.raceBoth = exports.race = exports.provideContext = exports.orElse = exports.never = exports.mkString = exports.mapLeftover = exports.mapError = exports.mapEffect = exports.map = exports.leftover = exports.last = exports.ignoreLeftover = exports.head = exports.fromQueueWithShutdown = exports.fromQueue = exports.fromPush = exports.fromHubWithShutdown = exports.fromHub = exports.fromEffect = exports.fromChannel = exports.forEachWhile = exports.forEachChunkWhile = exports.forEachChunk = exports.forEach = exports.foldWeightedEffect = exports.foldWeightedDecomposeEffect = exports.foldWeightedDecompose = exports.foldWeighted = exports.foldUntilEffect = exports.foldUntil = exports.foldSink = exports.foldLeftEffect = exports.foldLeftChunksEffect = exports.foldLeftChunks = exports.foldLeft = exports.foldEffect = exports.foldChunksEffect = exports.foldChunks = exports.fold = exports.flatMap = exports.findEffect = exports.filterInputEffect = exports.filterInput = exports.failSync = exports.failCauseSync = exports.failCause = exports.fail = exports.every = exports.ensuringWith = exports.ensuring = exports.dropWhileEffect = exports.dropWhile = exports.dropUntilEffect = exports.dropUntil = exports.drop = exports.drain = exports.dimapEffect = exports.dimapChunksEffect = exports.dimapChunks = exports.dimap = exports.dieSync = exports.dieMessage = exports.die = exports.count = exports.contramapEffect = exports.contramapChunksEffect = exports.contramapChunks = exports.contramap = exports.contextWithSink = exports.contextWithEffect = exports.contextWith = exports.context = exports.collectLeftover = exports.collectAllWhileWith = exports.collectAllWhileEffect = exports.collectAllWhile = exports.collectAllUntilEffect = exports.collectAllUntil = exports.collectAllToSetN = exports.collectAllToSet = exports.collectAllToMapN = exports.collectAllToMap = exports.collectAllN = exports.collectAllFrom = exports.collectAll = exports.channelToSink = exports.as = exports.SinkTypeId = exports.SinkImpl = void 0;
exports.zipWithPar = exports.zipWith = exports.zipRight = exports.zipParRight = exports.zipParLeft = exports.zipPar = exports.zipLeft = exports.zip = exports.withDuration = exports.unwrapScoped = exports.unwrap = exports.toChannel = exports.timed = exports.take = exports.sync = exports.suspend = exports.summarized = exports.sum = exports.succeed = exports.splitWhere = void 0;
exports.sum = exports.succeed = exports.splitWhere = exports.some = exports.serviceWithSink = exports.serviceWithEffect = exports.serviceWith = exports.service = exports.refineOrDieWith = exports.refineOrDie = exports.raceWith = exports.raceBoth = exports.race = exports.provideContext = exports.orElse = exports.never = exports.mkString = exports.mapLeftover = exports.mapError = exports.mapEffect = exports.map = exports.leftover = exports.last = exports.isSink = exports.ignoreLeftover = exports.head = exports.fromQueue = exports.fromPush = exports.fromHub = exports.fromEffect = exports.fromChannel = exports.forEachWhile = exports.forEachChunkWhile = exports.forEachChunk = exports.forEach = exports.foldWeightedEffect = exports.foldWeightedDecomposeEffect = exports.foldWeightedDecompose = exports.foldWeighted = exports.foldUntilEffect = exports.foldUntil = exports.foldSink = exports.foldLeftEffect = exports.foldLeftChunksEffect = exports.foldLeftChunks = exports.foldLeft = exports.foldEffect = exports.foldChunksEffect = exports.foldChunks = exports.fold = exports.flatMap = exports.findEffect = exports.filterInputEffect = exports.filterInput = exports.failSync = exports.failCauseSync = exports.failCause = exports.fail = exports.every = exports.ensuringWith = exports.ensuring = exports.dropWhileEffect = exports.dropWhile = exports.dropUntilEffect = exports.dropUntil = exports.drop = exports.drain = exports.dimapEffect = exports.dimapChunksEffect = exports.dimapChunks = exports.dimap = exports.dieSync = exports.dieMessage = exports.die = exports.count = exports.contramapEffect = exports.contramapChunksEffect = exports.contramapChunks = exports.contramap = exports.contextWithSink = exports.contextWithEffect = exports.contextWith = exports.context = exports.collectLeftover = exports.collectAllWhileWith = exports.collectAllWhileEffect = exports.collectAllWhile = exports.collectAllUntilEffect = exports.collectAllUntil = exports.collectAllToSetN = exports.collectAllToSet = exports.collectAllToMapN = exports.collectAllToMap = exports.collectAllN = exports.collectAllFrom = exports.collectAll = exports.channelToSink = exports.as = exports.SinkTypeId = exports.SinkImpl = void 0;
exports.zipWith = exports.zipRight = exports.zipLeft = exports.zip = exports.withDuration = exports.unwrapScoped = exports.unwrap = exports.toChannel = exports.timed = exports.take = exports.sync = exports.suspend = exports.summarized = void 0;
var Chunk = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/data/Chunk"));

@@ -57,2 +57,5 @@ var Duration = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/data/Duration"));

/** @internal */
const isSink = u => typeof u === "object" && u != null && SinkTypeId in u;
/** @internal */
exports.isSink = isSink;
const as = /*#__PURE__*/(0, _Function.dual)(2, (self, z) => map(() => z)(self));

@@ -64,3 +67,7 @@ /** @internal */

exports.collectAll = collectAll;
const collectAllLoop = acc => core.readWithCause(chunk => collectAllLoop(Chunk.appendAll(chunk)(acc)), core.failCause, () => core.succeed(acc));
const collectAllLoop = acc => core.readWithCause({
onInput: chunk => collectAllLoop(Chunk.appendAll(chunk)(acc)),
onFailure: core.failCause,
onDone: () => core.succeed(acc)
});
/** @internal */

@@ -70,14 +77,22 @@ const collectAllN = n => suspend(() => fromChannel(collectAllNLoop(n, Chunk.empty())));

exports.collectAllN = collectAllN;
const collectAllNLoop = (n, acc) => core.readWithCause(chunk => {
const [collected, leftovers] = Chunk.splitAt(chunk, n);
if (collected.length < n) {
return collectAllNLoop(n - collected.length, Chunk.appendAll(acc, collected));
}
if (Chunk.isEmpty(leftovers)) {
return core.succeed(Chunk.appendAll(acc, collected));
}
return core.flatMap(core.write(leftovers), () => core.succeed(Chunk.appendAll(acc, collected)));
}, core.failCause, () => core.succeed(acc));
const collectAllNLoop = (n, acc) => core.readWithCause({
onInput: chunk => {
const [collected, leftovers] = Chunk.splitAt(chunk, n);
if (collected.length < n) {
return collectAllNLoop(n - collected.length, Chunk.appendAll(acc, collected));
}
if (Chunk.isEmpty(leftovers)) {
return core.succeed(Chunk.appendAll(acc, collected));
}
return core.flatMap(core.write(leftovers), () => core.succeed(Chunk.appendAll(acc, collected)));
},
onFailure: core.failCause,
onDone: () => core.succeed(acc)
});
/** @internal */
const collectAllFrom = self => collectAllWhileWith(Chunk.empty(), _Function.constTrue, (chunk, z) => Chunk.append(z)(chunk))(self);
const collectAllFrom = self => collectAllWhileWith(self, {
initial: Chunk.empty(),
while: _Function.constTrue,
body: (chunk, z) => Chunk.append(z)(chunk)
});
/** @internal */

@@ -95,6 +110,11 @@ exports.collectAllFrom = collectAllFrom;

const collectAllToMapN = (n, key, merge) => {
return foldWeighted(HashMap.empty(), n, (acc, input) => HashMap.has(key(input))(acc) ? 0 : 1, (acc, input) => {
const k = key(input);
const v = HashMap.has(k)(acc) ? merge(HashMap.unsafeGet(k)(acc), input) : input;
return HashMap.set(k, v)(acc);
return foldWeighted({
initial: HashMap.empty(),
maxCost: n,
cost: (acc, input) => HashMap.has(key(input))(acc) ? 0 : 1,
body: (acc, input) => {
const k = key(input);
const v = HashMap.has(k)(acc) ? merge(HashMap.unsafeGet(k)(acc), input) : input;
return HashMap.set(k, v)(acc);
}
});

@@ -107,3 +127,8 @@ };

exports.collectAllToSet = collectAllToSet;
const collectAllToSetN = n => foldWeighted(HashSet.empty(), n, (acc, input) => HashSet.has(input)(acc) ? 0 : 1, (acc, input) => HashSet.add(input)(acc));
const collectAllToSetN = n => foldWeighted({
initial: HashSet.empty(),
maxCost: n,
cost: (acc, input) => HashSet.has(acc, input) ? 0 : 1,
body: (acc, input) => HashSet.add(acc, input)
});
/** @internal */

@@ -124,9 +149,13 @@ exports.collectAllToSetN = collectAllToSetN;

exports.collectAllWhile = collectAllWhile;
const collectAllWhileReader = (predicate, done) => core.readWith(input => {
const [collected, leftovers] = ReadonlyArray.span(predicate)(Chunk.toReadonlyArray(input));
if (leftovers.length === 0) {
return collectAllWhileReader(predicate, Chunk.appendAll(Chunk.unsafeFromArray(collected))(done));
}
return channel.zipRight(core.succeed(Chunk.appendAll(Chunk.unsafeFromArray(collected))(done)))(core.write(Chunk.unsafeFromArray(leftovers)));
}, core.fail, () => core.succeed(done));
const collectAllWhileReader = (predicate, done) => core.readWith({
onInput: input => {
const [collected, leftovers] = ReadonlyArray.span(predicate)(Chunk.toReadonlyArray(input));
if (leftovers.length === 0) {
return collectAllWhileReader(predicate, Chunk.appendAll(Chunk.unsafeFromArray(collected))(done));
}
return channel.zipRight(core.succeed(Chunk.appendAll(Chunk.unsafeFromArray(collected))(done)))(core.write(Chunk.unsafeFromArray(leftovers)));
},
onFailure: core.fail,
onDone: () => core.succeed(done)
});
/** @internal */

@@ -136,15 +165,23 @@ const collectAllWhileEffect = predicate => fromChannel(collectAllWhileEffectReader(predicate, Chunk.empty()));

exports.collectAllWhileEffect = collectAllWhileEffect;
const collectAllWhileEffectReader = (predicate, done) => core.readWith(input => core.flatMap(collected => {
const leftovers = Chunk.drop(collected.length)(input);
if (Chunk.isEmpty(leftovers)) {
return collectAllWhileEffectReader(predicate, Chunk.appendAll(collected)(done));
}
return channel.zipRight(core.succeed(Chunk.appendAll(collected)(done)))(core.write(leftovers));
})(core.fromEffect(Effect.map(Chunk.unsafeFromArray)(Effect.takeWhile(predicate)(input)))), core.fail, () => core.succeed(done));
const collectAllWhileEffectReader = (predicate, done) => core.readWith({
onInput: input => core.flatMap(collected => {
const leftovers = Chunk.drop(collected.length)(input);
if (Chunk.isEmpty(leftovers)) {
return collectAllWhileEffectReader(predicate, Chunk.appendAll(collected)(done));
}
return channel.zipRight(core.succeed(Chunk.appendAll(collected)(done)))(core.write(leftovers));
})(core.fromEffect(Effect.map(Chunk.unsafeFromArray)(Effect.takeWhile(predicate)(input)))),
onFailure: core.fail,
onDone: () => core.succeed(done)
});
/** @internal */
const collectAllWhileWith = /*#__PURE__*/(0, _Function.dual)(4, (self, z, p, f) => {
const collectAllWhileWith = /*#__PURE__*/(0, _Function.dual)(2, (self, options) => {
const refs = Effect.zip(Ref.make(false))(Ref.make(Chunk.empty()));
const newChannel = core.flatMap(([leftoversRef, upstreamDoneRef]) => {
const upstreamMarker = core.readWith(input => core.flatMap(() => upstreamMarker)(core.write(input)), core.fail, done => channel.as(done)(core.fromEffect(Ref.set(upstreamDoneRef, true))));
return core.pipeTo(collectAllWhileWithLoop(self, leftoversRef, upstreamDoneRef, z, p, f))(core.pipeTo(channel.bufferChunk(leftoversRef))(upstreamMarker));
const upstreamMarker = core.readWith({
onInput: input => core.flatMap(() => upstreamMarker)(core.write(input)),
onFailure: core.fail,
onDone: done => channel.as(done)(core.fromEffect(Ref.set(upstreamDoneRef, true)))
});
return core.pipeTo(collectAllWhileWithLoop(self, leftoversRef, upstreamDoneRef, options.initial, options.while, options.body))(core.pipeTo(channel.bufferChunk(leftoversRef))(upstreamMarker));
})(core.fromEffect(refs));

@@ -156,6 +193,9 @@ return new SinkImpl(newChannel);

const collectAllWhileWithLoop = (self, leftoversRef, upstreamDoneRef, currentResult, p, f) => {
return channel.foldChannel(core.fail, ([leftovers, doneValue]) => p(doneValue) ? core.flatMap(() => core.flatMap(upstreamDone => {
const accumulatedResult = f(currentResult, doneValue);
return upstreamDone ? channel.as(accumulatedResult)(core.write(Chunk.flatten(leftovers))) : collectAllWhileWithLoop(self, leftoversRef, upstreamDoneRef, accumulatedResult, p, f);
})(core.fromEffect(Ref.get(upstreamDoneRef))))(core.fromEffect(Ref.set(leftoversRef, Chunk.flatten(leftovers)))) : channel.as(currentResult)(core.write(Chunk.flatten(leftovers))))(channel.doneCollect(toChannel(self)));
return channel.foldChannel({
onFailure: core.fail,
onSuccess: ([leftovers, doneValue]) => p(doneValue) ? core.flatMap(() => core.flatMap(upstreamDone => {
const accumulatedResult = f(currentResult, doneValue);
return upstreamDone ? channel.as(accumulatedResult)(core.write(Chunk.flatten(leftovers))) : collectAllWhileWithLoop(self, leftoversRef, upstreamDoneRef, accumulatedResult, p, f);
})(core.fromEffect(Ref.get(upstreamDoneRef))))(core.fromEffect(Ref.set(leftoversRef, Chunk.flatten(leftovers)))) : channel.as(currentResult)(core.write(Chunk.flatten(leftovers)))
})(channel.doneCollect(toChannel(self)));
};

@@ -173,3 +213,7 @@ /** @internal */

const contramapChunks = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => {
const loop = core.readWith(chunk => core.flatMap(() => loop)(core.write(f(chunk))), core.fail, core.succeed);
const loop = core.readWith({
onInput: chunk => core.flatMap(() => loop)(core.write(f(chunk))),
onFailure: core.fail,
onDone: core.succeed
});
return new SinkImpl(core.pipeTo(toChannel(self))(loop));

@@ -180,3 +224,7 @@ });

const contramapChunksEffect = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => {
const loop = core.readWith(chunk => core.flatMap(() => loop)(core.flatMap(core.write)(core.fromEffect(f(chunk)))), core.fail, core.succeed);
const loop = core.readWith({
onInput: chunk => core.flatMap(() => loop)(core.flatMap(core.write)(core.fromEffect(f(chunk)))),
onFailure: core.fail,
onDone: core.succeed
});
return new SinkImpl(channel.pipeToOrFail(toChannel(self))(loop));

@@ -198,12 +246,12 @@ });

exports.dieSync = dieSync;
const dimap = /*#__PURE__*/(0, _Function.dual)(3, (self, f, g) => map(g)(contramap(f)(self)));
const dimap = /*#__PURE__*/(0, _Function.dual)(2, (self, options) => map(contramap(self, options.onInput), options.onDone));
/** @internal */
exports.dimap = dimap;
const dimapEffect = /*#__PURE__*/(0, _Function.dual)(3, (self, f, g) => mapEffect(g)(contramapEffect(f)(self)));
const dimapEffect = /*#__PURE__*/(0, _Function.dual)(2, (self, options) => mapEffect(contramapEffect(self, options.onInput), options.onDone));
/** @internal */
exports.dimapEffect = dimapEffect;
const dimapChunks = /*#__PURE__*/(0, _Function.dual)(3, (self, f, g) => map(g)(contramapChunks(f)(self)));
const dimapChunks = /*#__PURE__*/(0, _Function.dual)(2, (self, options) => map(contramapChunks(self, options.onInput), options.onDone));
/** @internal */
exports.dimapChunks = dimapChunks;
const dimapChunksEffect = /*#__PURE__*/(0, _Function.dual)(3, (self, f, g) => mapEffect(g)(contramapChunksEffect(f)(self)));
const dimapChunksEffect = /*#__PURE__*/(0, _Function.dual)(2, (self, options) => mapEffect(contramapChunksEffect(self, options.onInput), options.onDone));
/** @internal */

@@ -217,11 +265,15 @@ exports.dimapChunksEffect = dimapChunksEffect;

exports.drop = drop;
const dropLoop = n => core.readWith(input => {
const dropped = Chunk.drop(n)(input);
const leftover = Math.max(n - input.length, 0);
const more = Chunk.isEmpty(input) || leftover > 0;
if (more) {
return dropLoop(leftover);
}
return channel.zipRight(channel.identityChannel())(core.write(dropped));
}, core.fail, core.unit);
const dropLoop = n => core.readWith({
onInput: input => {
const dropped = Chunk.drop(n)(input);
const leftover = Math.max(n - input.length, 0);
const more = Chunk.isEmpty(input) || leftover > 0;
if (more) {
return dropLoop(leftover);
}
return channel.zipRight(channel.identityChannel())(core.write(dropped));
},
onFailure: core.fail,
onDone: core.unit
});
/** @internal */

@@ -234,6 +286,10 @@ const dropUntil = predicate => new SinkImpl(channel.pipeToOrFail(toChannel(drop(1)))(toChannel(dropWhile(input => !predicate(input)))));

exports.dropUntilEffect = dropUntilEffect;
const dropUntilEffectReader = predicate => core.readWith(input => channel.unwrap(Effect.map(leftover => {
const more = leftover.length === 0;
return more ? dropUntilEffectReader(predicate) : channel.zipRight(channel.identityChannel())(core.write(Chunk.unsafeFromArray(leftover)));
})(Effect.dropUntil(predicate)(input))), core.fail, core.unit);
const dropUntilEffectReader = predicate => core.readWith({
onInput: input => channel.unwrap(Effect.map(leftover => {
const more = leftover.length === 0;
return more ? dropUntilEffectReader(predicate) : channel.zipRight(channel.identityChannel())(core.write(Chunk.unsafeFromArray(leftover)));
})(Effect.dropUntil(predicate)(input))),
onFailure: core.fail,
onDone: core.unit
});
/** @internal */

@@ -243,9 +299,13 @@ const dropWhile = predicate => new SinkImpl(dropWhileReader(predicate));

exports.dropWhile = dropWhile;
const dropWhileReader = predicate => core.readWith(input => {
const out = Chunk.dropWhile(predicate)(input);
if (Chunk.isEmpty(out)) {
return dropWhileReader(predicate);
}
return channel.zipRight(channel.identityChannel())(core.write(out));
}, core.fail, core.succeedNow);
const dropWhileReader = predicate => core.readWith({
onInput: input => {
const out = Chunk.dropWhile(predicate)(input);
if (Chunk.isEmpty(out)) {
return dropWhileReader(predicate);
}
return channel.zipRight(channel.identityChannel())(core.write(out));
},
onFailure: core.fail,
onDone: core.succeedNow
});
/** @internal */

@@ -255,6 +315,10 @@ const dropWhileEffect = predicate => suspend(() => new SinkImpl(dropWhileEffectReader(predicate)));

exports.dropWhileEffect = dropWhileEffect;
const dropWhileEffectReader = predicate => core.readWith(input => channel.unwrap(Effect.map(leftover => {
const more = leftover.length === 0;
return more ? dropWhileEffectReader(predicate) : channel.zipRight(channel.identityChannel())(core.write(Chunk.unsafeFromArray(leftover)));
})(Effect.dropWhile(predicate)(input))), core.fail, core.unit);
const dropWhileEffectReader = predicate => core.readWith({
onInput: input => channel.unwrap(Effect.map(leftover => {
const more = leftover.length === 0;
return more ? dropWhileEffectReader(predicate) : channel.zipRight(channel.identityChannel())(core.write(Chunk.unsafeFromArray(leftover)));
})(Effect.dropWhile(predicate)(input))),
onFailure: core.fail,
onDone: core.unit
});
/** @internal */

@@ -304,12 +368,19 @@ const ensuring = /*#__PURE__*/(0, _Function.dual)(2, (self, finalizer) => new SinkImpl(channel.ensuring(finalizer)(toChannel(self))));

const newChannel = core.flatMap(([leftoversRef, upstreamDoneRef]) => {
const upstreamMarker = core.readWith(input => core.flatMap(() => upstreamMarker)(core.write(input)), core.fail, done => channel.as(done)(core.fromEffect(Ref.set(upstreamDoneRef, true))));
const loop = channel.foldChannel(core.fail, ([leftovers, doneValue]) => core.flatMap(satisfied => channel.zipRight(core.flatMap(upstreamDone => {
if (satisfied) {
return channel.as(Option.some(doneValue))(core.write(Chunk.flatten(leftovers)));
}
if (upstreamDone) {
return channel.as(Option.none())(core.write(Chunk.flatten(leftovers)));
}
return loop;
})(core.fromEffect(Ref.get(upstreamDoneRef))))(core.fromEffect(Ref.set(leftoversRef, Chunk.flatten(leftovers)))))(core.fromEffect(f(doneValue))))(core.collectElements(toChannel(self)));
const upstreamMarker = core.readWith({
onInput: input => core.flatMap(() => upstreamMarker)(core.write(input)),
onFailure: core.fail,
onDone: done => channel.as(done)(core.fromEffect(Ref.set(upstreamDoneRef, true)))
});
const loop = channel.foldChannel(core.collectElements(toChannel(self)), {
onFailure: core.fail,
onSuccess: ([leftovers, doneValue]) => core.flatMap(satisfied => channel.zipRight(core.flatMap(upstreamDone => {
if (satisfied) {
return channel.as(Option.some(doneValue))(core.write(Chunk.flatten(leftovers)));
}
if (upstreamDone) {
return channel.as(Option.none())(core.write(Chunk.flatten(leftovers)));
}
return loop;
})(core.fromEffect(Ref.get(upstreamDoneRef))))(core.fromEffect(Ref.set(leftoversRef, Chunk.flatten(leftovers)))))(core.fromEffect(f(doneValue)))
});
return core.pipeTo(loop)(core.pipeTo(channel.bufferChunk(leftoversRef))(upstreamMarker));

@@ -328,9 +399,13 @@ })(core.fromEffect(Effect.zip(Ref.make(false))(Ref.make(Chunk.empty()))));

}
return core.readWith(input => {
const [nextS, leftovers] = foldChunkSplit(s, input, contFn, f, 0, input.length);
if (Chunk.isNonEmpty(leftovers)) {
return channel.as(nextS)(core.write(leftovers));
}
return foldReader(nextS, contFn, f);
}, core.fail, () => core.succeedNow(s));
return core.readWith({
onInput: input => {
const [nextS, leftovers] = foldChunkSplit(s, input, contFn, f, 0, input.length);
if (Chunk.isNonEmpty(leftovers)) {
return channel.as(nextS)(core.write(leftovers));
}
return foldReader(nextS, contFn, f);
},
onFailure: core.fail,
onDone: () => core.succeedNow(s)
});
};

@@ -349,19 +424,22 @@ /** @internal */

/** @internal */
const foldSink = /*#__PURE__*/(0, _Function.dual)(3, (self, onFailure, onSuccess) => {
const newChannel = channel.foldChannel(error => toChannel(onFailure(error)), ([leftovers, z]) => core.suspend(() => {
const leftoversRef = {
ref: Chunk.filter(Chunk.isNonEmpty)(leftovers)
};
const refReader =
// This cast is safe because of the L1 >: L <: In1 bound. It follows that
// L <: In1 and therefore Chunk[L] can be safely cast to Chunk[In1].
core.flatMap(chunk => channel.writeChunk(chunk))(core.sync(() => {
const ref = leftoversRef.ref;
leftoversRef.ref = Chunk.empty();
return ref;
}));
const passthrough = channel.identityChannel();
const continuationSink = core.pipeTo(toChannel(onSuccess(z)))(channel.zipRight(passthrough)(refReader));
return core.flatMap(([newLeftovers, z1]) => channel.as(z1)(channel.zipRight(channel.writeChunk(newLeftovers))(core.flatMap(channel.writeChunk)(core.succeed(leftoversRef.ref)))))(core.collectElements(continuationSink));
}))(core.collectElements(toChannel(self)));
const foldSink = /*#__PURE__*/(0, _Function.dual)(2, (self, options) => {
const newChannel = channel.foldChannel({
onFailure: error => toChannel(options.onFailure(error)),
onSuccess: ([leftovers, z]) => core.suspend(() => {
const leftoversRef = {
ref: Chunk.filter(Chunk.isNonEmpty)(leftovers)
};
const refReader =
// This cast is safe because of the L1 >: L <: In1 bound. It follows that
// L <: In1 and therefore Chunk[L] can be safely cast to Chunk[In1].
core.flatMap(chunk => channel.writeChunk(chunk))(core.sync(() => {
const ref = leftoversRef.ref;
leftoversRef.ref = Chunk.empty();
return ref;
}));
const passthrough = channel.identityChannel();
const continuationSink = core.pipeTo(toChannel(options.onSuccess(z)))(channel.zipRight(passthrough)(refReader));
return core.flatMap(core.collectElements(continuationSink), ([newLeftovers, z1]) => channel.as(z1)(channel.zipRight(channel.writeChunk(newLeftovers))(core.flatMap(channel.writeChunk)(core.succeed(leftoversRef.ref)))));
})
})(core.collectElements(toChannel(self)));
return new SinkImpl(newChannel);

@@ -378,3 +456,7 @@ });

}
return core.readWith(input => foldChunksReader(f(s, input), contFn, f), core.fail, () => core.succeedNow(s));
return core.readWith({
onInput: input => foldChunksReader(f(s, input), contFn, f),
onFailure: core.fail,
onDone: () => core.succeedNow(s)
});
};

@@ -389,3 +471,7 @@ /** @internal */

}
return core.readWith(input => core.flatMap(s => foldChunksEffectReader(s, contFn, f))(core.fromEffect(f(s, input))), core.fail, () => core.succeedNow(s));
return core.readWith({
onInput: input => core.flatMap(s => foldChunksEffectReader(s, contFn, f))(core.fromEffect(f(s, input))),
onFailure: core.fail,
onDone: () => core.succeedNow(s)
});
};

@@ -400,6 +486,10 @@ /** @internal */

}
return core.readWith(input => core.flatMap(([nextS, leftovers]) => Option.match({
onNone: () => foldEffectReader(nextS, contFn, f),
onSome: leftover => channel.as(nextS)(core.write(leftover))
})(leftovers))(core.fromEffect(foldChunkSplitEffect(s, input, contFn, f))), core.fail, () => core.succeedNow(s));
return core.readWith({
onInput: input => core.flatMap(([nextS, leftovers]) => Option.match({
onNone: () => foldEffectReader(nextS, contFn, f),
onSome: leftover => channel.as(nextS)(core.write(leftover))
})(leftovers))(core.fromEffect(foldChunkSplitEffect(s, input, contFn, f))),
onFailure: core.fail,
onDone: () => core.succeedNow(s)
});
};

@@ -434,18 +524,25 @@ /** @internal */

exports.foldUntilEffect = foldUntilEffect;
const foldWeighted = (s, max, costFn, f) => foldWeightedDecompose(s, max, costFn, Chunk.of, f);
const foldWeighted = options => foldWeightedDecompose({
...options,
decompose: Chunk.of
});
/** @internal */
exports.foldWeighted = foldWeighted;
const foldWeightedDecompose = (s, max, costFn, decompose, f) => suspend(() => new SinkImpl(foldWeightedDecomposeLoop(s, 0, false, max, costFn, decompose, f)));
const foldWeightedDecompose = options => suspend(() => new SinkImpl(foldWeightedDecomposeLoop(options.initial, 0, false, options.maxCost, options.cost, options.decompose, options.body)));
/** @internal */
exports.foldWeightedDecompose = foldWeightedDecompose;
const foldWeightedDecomposeLoop = (s, cost, dirty, max, costFn, decompose, f) => core.readWith(input => {
const [nextS, nextCost, nextDirty, leftovers] = foldWeightedDecomposeFold(input, 0, s, cost, dirty, max, costFn, decompose, f);
if (Chunk.isNonEmpty(leftovers)) {
return channel.zipRight(core.succeedNow(nextS))(core.write(leftovers));
}
if (cost > max) {
return core.succeedNow(nextS);
}
return foldWeightedDecomposeLoop(nextS, nextCost, nextDirty, max, costFn, decompose, f);
}, core.fail, () => core.succeedNow(s));
const foldWeightedDecomposeLoop = (s, cost, dirty, max, costFn, decompose, f) => core.readWith({
onInput: input => {
const [nextS, nextCost, nextDirty, leftovers] = foldWeightedDecomposeFold(input, 0, s, cost, dirty, max, costFn, decompose, f);
if (Chunk.isNonEmpty(leftovers)) {
return channel.zipRight(core.succeedNow(nextS))(core.write(leftovers));
}
if (cost > max) {
return core.succeedNow(nextS);
}
return foldWeightedDecomposeLoop(nextS, nextCost, nextDirty, max, costFn, decompose, f);
},
onFailure: core.fail,
onDone: () => core.succeedNow(s)
});
/** @internal */

@@ -479,17 +576,24 @@ const foldWeightedDecomposeFold = (input, index, s, cost, dirty, max, costFn, decompose, f) => {

/** @internal */
const foldWeightedDecomposeEffect = (s, max, costFn, decompose, f) => suspend(() => new SinkImpl(foldWeightedDecomposeEffectLoop(s, max, costFn, decompose, f, 0, false)));
const foldWeightedDecomposeEffect = options => suspend(() => new SinkImpl(foldWeightedDecomposeEffectLoop(options.initial, options.maxCost, options.cost, options.decompose, options.body, 0, false)));
/** @internal */
exports.foldWeightedDecomposeEffect = foldWeightedDecomposeEffect;
const foldWeightedEffect = (s, max, costFn, f) => foldWeightedDecomposeEffect(s, max, costFn, input => Effect.succeed(Chunk.of(input)), f);
const foldWeightedEffect = options => foldWeightedDecomposeEffect({
...options,
decompose: input => Effect.succeed(Chunk.of(input))
});
/** @internal */
exports.foldWeightedEffect = foldWeightedEffect;
const foldWeightedDecomposeEffectLoop = (s, max, costFn, decompose, f, cost, dirty) => core.readWith(input => core.flatMap(([nextS, nextCost, nextDirty, leftovers]) => {
if (Chunk.isNonEmpty(leftovers)) {
return channel.zipRight(core.succeedNow(nextS))(core.write(leftovers));
}
if (cost > max) {
return core.succeedNow(nextS);
}
return foldWeightedDecomposeEffectLoop(nextS, max, costFn, decompose, f, nextCost, nextDirty);
})(core.fromEffect(foldWeightedDecomposeEffectFold(s, max, costFn, decompose, f, input, dirty, cost, 0))), core.fail, () => core.succeedNow(s));
const foldWeightedDecomposeEffectLoop = (s, max, costFn, decompose, f, cost, dirty) => core.readWith({
onInput: input => core.flatMap(([nextS, nextCost, nextDirty, leftovers]) => {
if (Chunk.isNonEmpty(leftovers)) {
return channel.zipRight(core.succeedNow(nextS))(core.write(leftovers));
}
if (cost > max) {
return core.succeedNow(nextS);
}
return foldWeightedDecomposeEffectLoop(nextS, max, costFn, decompose, f, nextCost, nextDirty);
})(core.fromEffect(foldWeightedDecomposeEffectFold(s, max, costFn, decompose, f, input, dirty, cost, 0))),
onFailure: core.fail,
onDone: () => core.succeedNow(s)
});
/** @internal */

@@ -525,9 +629,16 @@ const foldWeightedDecomposeEffectFold = (s, max, costFn, decompose, f, input, dirty, cost, index) => {

/** @internal */
const flatMap = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => foldSink(fail, f)(self));
const flatMap = /*#__PURE__*/(0, _Function.dual)(2, (self, f) => foldSink(self, {
onFailure: fail,
onSuccess: f
}));
/** @internal */
exports.flatMap = flatMap;
const forEach = f => {
const process = core.readWithCause(input => core.flatMap(() => process)(core.fromEffect(Effect.forEach(input, f, {
discard: true
}))), core.failCause, core.unit);
const process = core.readWithCause({
onInput: input => core.flatMap(() => process)(core.fromEffect(Effect.forEach(input, f, {
discard: true
}))),
onFailure: core.failCause,
onDone: core.unit
});
return new SinkImpl(process);

@@ -538,3 +649,7 @@ };

const forEachChunk = f => {
const process = core.readWithCause(input => core.flatMap(() => process)(core.fromEffect(f(input))), core.failCause, core.unit);
const process = core.readWithCause({
onInput: input => core.flatMap(() => process)(core.fromEffect(f(input))),
onFailure: core.failCause,
onDone: core.unit
});
return new SinkImpl(process);

@@ -545,3 +660,7 @@ };

const forEachWhile = f => {
const process = core.readWithCause(input => forEachWhileReader(f, input, 0, input.length, process), core.failCause, core.unit);
const process = core.readWithCause({
onInput: input => forEachWhileReader(f, input, 0, input.length, process),
onFailure: core.failCause,
onDone: core.unit
});
return new SinkImpl(process);

@@ -559,3 +678,7 @@ };

const forEachChunkWhile = f => {
const reader = core.readWith(input => core.flatMap(cont => cont ? reader : core.unit())(core.fromEffect(f(input))), core.fail, core.unit);
const reader = core.readWith({
onInput: input => core.flatMap(cont => cont ? reader : core.unit())(core.fromEffect(f(input))),
onFailure: core.fail,
onDone: core.unit
});
return new SinkImpl(reader);

@@ -571,24 +694,28 @@ };

exports.fromEffect = fromEffect;
const fromHub = hub => fromQueue(hub);
const fromHub = (hub, options) => fromQueue(hub, options);
/** @internal */
exports.fromHub = fromHub;
const fromHubWithShutdown = hub => fromQueueWithShutdown(hub);
/** @internal */
exports.fromHubWithShutdown = fromHubWithShutdown;
const fromPush = push => new SinkImpl(channel.unwrapScoped(Effect.map(fromPushPull)(push)));
exports.fromPush = fromPush;
const fromPushPull = push => core.readWith(input => channel.foldChannel(([either, leftovers]) => Either.match({
onLeft: error => channel.zipRight(core.fail(error))(core.write(leftovers)),
onRight: z => channel.zipRight(core.succeedNow(z))(core.write(leftovers))
})(either), () => fromPushPull(push))(core.fromEffect(push(Option.some(input)))), core.fail, () => channel.foldChannel(([either, leftovers]) => Either.match({
onLeft: error => channel.zipRight(core.fail(error))(core.write(leftovers)),
onRight: z => channel.zipRight(core.succeedNow(z))(core.write(leftovers))
})(either), () => core.fromEffect(Effect.dieMessage("BUG: Sink.fromPush - please report an issue at https://github.com/Effect-TS/stream/issues")))(core.fromEffect(push(Option.none()))));
const fromPushPull = push => core.readWith({
onInput: input => channel.foldChannel(core.fromEffect(push(Option.some(input))), {
onFailure: ([either, leftovers]) => Either.match(either, {
onLeft: error => channel.zipRight(core.fail(error))(core.write(leftovers)),
onRight: z => channel.zipRight(core.succeedNow(z))(core.write(leftovers))
}),
onSuccess: () => fromPushPull(push)
}),
onFailure: core.fail,
onDone: () => channel.foldChannel(core.fromEffect(push(Option.none())), {
onFailure: ([either, leftovers]) => Either.match(either, {
onLeft: error => channel.zipRight(core.fail(error))(core.write(leftovers)),
onRight: z => channel.zipRight(core.succeedNow(z))(core.write(leftovers))
}),
onSuccess: () => core.fromEffect(Effect.dieMessage("BUG: Sink.fromPush - please report an issue at https://github.com/Effect-TS/stream/issues"))
})
});
/** @internal */
const fromQueue = queue => forEachChunk(input => Queue.offerAll(queue, input));
const fromQueue = (queue, options) => options?.shutdown ? unwrapScoped(Effect.map(Effect.acquireRelease(Effect.succeed(queue), Queue.shutdown), fromQueue)) : forEachChunk(input => Queue.offerAll(queue, input));
/** @internal */
exports.fromQueue = fromQueue;
const fromQueueWithShutdown = queue => unwrapScoped(Effect.map(fromQueue)(Effect.acquireRelease(Effect.succeed(queue), Queue.shutdown)));
/** @internal */
exports.fromQueueWithShutdown = fromQueueWithShutdown;
const head = () => fold(Option.none(), Option.isNone, (option, input) => Option.match(option, {

@@ -643,21 +770,26 @@ onNone: () => Option.some(input),

exports.race = race;
const raceBoth = /*#__PURE__*/(0, _Function.dual)(2, (self, that) => raceBothCapacity(self, that, 16));
const raceBoth = /*#__PURE__*/(0, _Function.dual)(args => isSink(args[1]), (self, that, options) => raceWith(self, {
other: that,
onSelfDone: selfDone => mergeDecision.Done(Effect.map(selfDone, Either.left)),
onOtherDone: thatDone => mergeDecision.Done(Effect.map(thatDone, Either.right)),
capacity: options?.capacity ?? 16
}));
/** @internal */
exports.raceBoth = raceBoth;
const raceBothCapacity = /*#__PURE__*/(0, _Function.dual)(3, (self, that, capacity) => {
return raceWithCapacity(self, that, selfDone => mergeDecision.Done(Effect.map(Either.left)(Effect.suspend(() => selfDone))), thatDone => mergeDecision.Done(Effect.map(Either.right)(Effect.suspend(() => thatDone))), capacity);
});
/** @internal */
exports.raceBothCapacity = raceBothCapacity;
const raceWith = /*#__PURE__*/(0, _Function.dual)(4, (self, that, leftDone, rightDone) => raceWithCapacity(self, that, leftDone, rightDone, 16));
/** @internal */
exports.raceWith = raceWith;
const raceWithCapacity = /*#__PURE__*/(0, _Function.dual)(5, (self, that, leftDone, rightDone, capacity) => {
const raceWith = /*#__PURE__*/(0, _Function.dual)(2, (self, options) => {
const scoped = Effect.gen(function* ($) {
const hub = yield* $(Hub.bounded(capacity));
const hub = yield* $(Hub.bounded(options?.capacity ?? 16));
const channel1 = yield* $(channel.fromHubScoped(hub));
const channel2 = yield* $(channel.fromHubScoped(hub));
const reader = channel.toHub(hub);
const writer = channel.mergeWith(core.pipeTo(toChannel(that))(channel2), leftDone, rightDone)(core.pipeTo(toChannel(self))(channel1));
const racedChannel = channel.mergeWith(writer, () => mergeDecision.Await(exit => Effect.suspend(() => exit)), done => mergeDecision.Done(Effect.suspend(() => done)))(reader);
const writer = channel.mergeWith({
other: core.pipeTo(toChannel(options.other))(channel2),
onSelfDone: options.onSelfDone,
onOtherDone: options.onOtherDone
})(core.pipeTo(toChannel(self))(channel1));
const racedChannel = channel.mergeWith(reader, {
other: writer,
onSelfDone: _ => mergeDecision.Await(exit => Effect.suspend(() => exit)),
onOtherDone: done => mergeDecision.Done(Effect.suspend(() => done))
});
return new SinkImpl(racedChannel);

@@ -668,3 +800,3 @@ });

/** @internal */
exports.raceWithCapacity = raceWithCapacity;
exports.raceWith = raceWith;
const refineOrDie = /*#__PURE__*/(0, _Function.dual)(2, (self, pf) => refineOrDieWith(pf, _Function.identity)(self));

@@ -703,21 +835,25 @@ /** @internal */

exports.splitWhere = splitWhere;
const splitWhereSplitter = (written, leftovers, f) => core.readWithCause(input => {
if (Chunk.isEmpty(input)) {
return splitWhereSplitter(written, leftovers, f);
}
if (written) {
const index = indexWhere(input, f);
const splitWhereSplitter = (written, leftovers, f) => core.readWithCause({
onInput: input => {
if (Chunk.isEmpty(input)) {
return splitWhereSplitter(written, leftovers, f);
}
if (written) {
const index = indexWhere(input, f);
if (index === -1) {
return channel.zipRight(core.write(input), splitWhereSplitter(true, leftovers, f));
}
const [left, right] = Chunk.splitAt(input, index);
return channel.zipRight(core.write(left), core.fromEffect(Ref.set(leftovers, right)));
}
const index = indexWhere(input, f, 1);
if (index === -1) {
return channel.zipRight(splitWhereSplitter(true, leftovers, f))(core.write(input));
return channel.zipRight(core.write(input), splitWhereSplitter(true, leftovers, f));
}
const [left, right] = Chunk.splitAt(index)(input);
return channel.zipRight(core.fromEffect(Ref.set(leftovers, right)))(core.write(left));
}
const index = indexWhere(input, f, 1);
if (index === -1) {
return channel.zipRight(splitWhereSplitter(true, leftovers, f))(core.write(input));
}
const [left, right] = Chunk.splitAt(Math.max(index, 1))(input);
return channel.zipRight(core.fromEffect(Ref.set(leftovers, right)))(core.write(left));
}, core.failCause, core.succeed);
const [left, right] = Chunk.splitAt(Math.max(index, 1))(input);
return channel.zipRight(core.write(left), core.fromEffect(Ref.set(leftovers, right)));
},
onFailure: core.failCause,
onDone: core.succeed
});
/** @internal */

@@ -780,25 +916,14 @@ const indexWhere = (self, predicate, from = 0) => {

exports.withDuration = withDuration;
const zip = /*#__PURE__*/(0, _Function.dual)(2, (self, that) => zipWith(that, (z, z2) => [z, z2])(self));
const zip = /*#__PURE__*/(0, _Function.dual)(args => isSink(args[1]), (self, that, options) => zipWith(self, that, (z, z2) => [z, z2], options));
/** @internal */
exports.zip = zip;
const zipLeft = /*#__PURE__*/(0, _Function.dual)(2, (self, that) => zipWith(that, (z, _) => z)(self));
const zipLeft = /*#__PURE__*/(0, _Function.dual)(args => isSink(args[1]), (self, that, options) => zipWith(self, that, (z, _) => z, options));
/** @internal */
exports.zipLeft = zipLeft;
const zipRight = /*#__PURE__*/(0, _Function.dual)(2, (self, that) => zipWith(that, (_, z2) => z2)(self));
const zipRight = /*#__PURE__*/(0, _Function.dual)(args => isSink(args[1]), (self, that, options) => zipWith(self, that, (_, z2) => z2, options));
/** @internal */
exports.zipRight = zipRight;
const zipWith = /*#__PURE__*/(0, _Function.dual)(3, (self, that, f) => flatMap(z => map(z2 => f(z, z2))(that))(self));
/** @internal */
exports.zipWith = zipWith;
const zipPar = /*#__PURE__*/(0, _Function.dual)(2, (self, that) => zipWithPar(that, (z, z2) => [z, z2])(self));
/** @internal */
exports.zipPar = zipPar;
const zipParLeft = /*#__PURE__*/(0, _Function.dual)(2, (self, that) => zipWithPar(that, (z, _) => z)(self));
/** @internal */
exports.zipParLeft = zipParLeft;
const zipParRight = /*#__PURE__*/(0, _Function.dual)(2, (self, that) => zipWithPar(that, (_, z2) => z2)(self));
/** @internal */
exports.zipParRight = zipParRight;
const zipWithPar = /*#__PURE__*/(0, _Function.dual)(3, (self, that, f) => {
return raceWith(that, Exit.match({
const zipWith = /*#__PURE__*/(0, _Function.dual)(args => isSink(args[1]), (self, that, f, options) => options?.concurrent ? raceWith(self, {
other: that,
onSelfDone: Exit.match({
onFailure: cause => mergeDecision.Done(Effect.failCause(cause)),

@@ -809,3 +934,4 @@ onSuccess: leftZ => mergeDecision.Await(Exit.match({

}))
}), Exit.match({
}),
onOtherDone: Exit.match({
onFailure: cause => mergeDecision.Done(Effect.failCause(cause)),

@@ -816,9 +942,9 @@ onSuccess: rightZ => mergeDecision.Await(Exit.match({

}))
}))(self);
});
})
}) : flatMap(self, z => map(that, z2 => f(z, z2))));
// Circular with Channel
/** @internal */
exports.zipWithPar = zipWithPar;
exports.zipWith = zipWith;
const channelToSink = self => new SinkImpl(self);
exports.channelToSink = channelToSink;
//# sourceMappingURL=sink.js.map

@@ -10,7 +10,2 @@ import * as Option from "@effect/data/Option";

export declare const distributedWithDynamicCallback: (<E, A, _>(maximumLag: number, decide: (a: A) => Effect.Effect<never, never, Predicate<number>>, done: (exit: Exit.Exit<Option.Option<E>, never>) => Effect.Effect<never, never, _>) => <R>(self: Stream.Stream<R, E, A>) => Effect.Effect<Scope.Scope | R, never, Effect.Effect<never, never, readonly [number, Queue.Dequeue<Exit.Exit<Option.Option<E>, A>>]>>) & (<R_1, E_1, A_1, __1>(self: Stream.Stream<R_1, E_1, A_1>, maximumLag: number, decide: (a: A_1) => Effect.Effect<never, never, Predicate<number>>, done: (exit: Exit.Exit<Option.Option<E_1>, never>) => Effect.Effect<never, never, __1>) => Effect.Effect<Scope.Scope | R_1, never, Effect.Effect<never, never, readonly [number, Queue.Dequeue<Exit.Exit<Option.Option<E_1>, A_1>>]>>);
export declare class ReadableStreamError {
readonly reason: unknown;
readonly _tag = "ReadableStreamError";
constructor(reason: unknown);
}
/**

@@ -17,0 +12,0 @@ * Repeats the value using the provided schedule.

@@ -6,3 +6,3 @@ "use strict";

});
exports.match = exports.isRight = exports.isLeft = exports.isEither = exports.isBoth = exports.Right = exports.Left = exports.Either = exports.Both = void 0;
exports.match = exports.isRight = exports.isLeft = exports.isEither = exports.isBoth = exports.fromInput = exports.Right = exports.Left = exports.Either = exports.Both = void 0;
var _Function = /*#__PURE__*/require("@effect/data/Function");

@@ -33,2 +33,18 @@ var OpCodes = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/stream/internal/opCodes/haltStrategy"));

exports.Either = Either;
const fromInput = input => {
switch (input) {
case "left":
return Left;
case "right":
return Right;
case "both":
return Both;
case "either":
return Either;
default:
return input;
}
};
/** @internal */
exports.fromInput = fromInput;
const isLeft = self => self._tag === OpCodes.OP_LEFT;

@@ -35,0 +51,0 @@ /** @internal */

@@ -44,3 +44,5 @@ "use strict";

get changes() {
return stream.unwrapScoped(this.semaphore.withPermits(1)(Effect.flatMap(a => Effect.map(s => stream.concat(s)(stream.make(a)))(stream.fromHubScoped(this.hub)))(Ref.get(this.ref))));
return stream.unwrapScoped(this.semaphore.withPermits(1)(Effect.flatMap(a => Effect.map(stream.fromHub(this.hub, {
scoped: true
}), s => stream.concat(stream.make(a), s)))(Ref.get(this.ref))));
}

@@ -47,0 +49,0 @@ modify(f) {

@@ -103,6 +103,10 @@ "use strict";

exports.make = make;
const match = /*#__PURE__*/(0, _Function.dual)(4, (self, onEnd, onError, onSuccess) => Exit.match(self.exit, {
const match = /*#__PURE__*/(0, _Function.dual)(2, (self, {
onEnd,
onFailure,
onSuccess
}) => Exit.match(self.exit, {
onFailure: cause => Option.match(Cause.flipCauseOption(cause), {
onNone: onEnd,
onSome: onError
onSome: onFailure
}),

@@ -113,6 +117,10 @@ onSuccess

exports.match = match;
const matchEffect = /*#__PURE__*/(0, _Function.dual)(4, (self, onEnd, onError, onSuccess) => Exit.matchEffect(self.exit, {
const matchEffect = /*#__PURE__*/(0, _Function.dual)(2, (self, {
onEnd,
onFailure,
onSuccess
}) => Exit.matchEffect(self.exit, {
onFailure: cause => Option.match(Cause.flipCauseOption(cause), {
onNone: onEnd,
onSome: onError
onSome: onFailure
}),

@@ -119,0 +127,0 @@ onSuccess

{
"name": "@effect/stream",
"version": "0.29.0",
"version": "0.30.0",
"license": "MIT",

@@ -11,3 +11,3 @@ "repository": {

"@effect/data": "^0.16.0",
"@effect/io": "^0.35.0"
"@effect/io": "^0.35.1"
},

@@ -14,0 +14,0 @@ "publishConfig": {

@@ -193,4 +193,12 @@ /**

export declare const collectAllWhileWith: {
<Z, S>(z: S, p: Predicate<Z>, f: (s: S, z: Z) => S): <R, E, In, L extends In>(self: Sink<R, E, In, L, Z>) => Sink<R, E, In, L, S>;
<R, E, In, L extends In, Z, S>(self: Sink<R, E, In, L, Z>, z: S, p: Predicate<Z>, f: (s: S, z: Z) => S): Sink<R, E, In, L, S>;
<Z, S>(options: {
readonly initial: S;
readonly while: Predicate<Z>;
readonly body: (s: S, z: Z) => S;
}): <R, E, In, L extends In>(self: Sink<R, E, In, L, Z>) => Sink<R, E, In, L, S>;
<R, E, In, L extends In, Z, S>(self: Sink<R, E, In, L, Z>, options: {
readonly initial: S;
readonly while: Predicate<Z>;
readonly body: (s: S, z: Z) => S;
}): Sink<R, E, In, L, S>;
};

@@ -277,4 +285,10 @@ /**

export declare const dimap: {
<In0, In, Z, Z2>(f: (input: In0) => In, g: (z: Z) => Z2): <R, E, L>(self: Sink<R, E, In, L, Z>) => Sink<R, E, In0, L, Z2>;
<R, E, L, In0, In, Z, Z2>(self: Sink<R, E, In, L, Z>, f: (input: In0) => In, g: (z: Z) => Z2): Sink<R, E, In0, L, Z2>;
<In0, In, Z, Z2>(options: {
readonly onInput: (input: In0) => In;
readonly onDone: (z: Z) => Z2;
}): <R, E, L>(self: Sink<R, E, In, L, Z>) => Sink<R, E, In0, L, Z2>;
<R, E, L, In0, In, Z, Z2>(self: Sink<R, E, In, L, Z>, options: {
readonly onInput: (input: In0) => In;
readonly onDone: (z: Z) => Z2;
}): Sink<R, E, In0, L, Z2>;
};

@@ -289,4 +303,10 @@ /**

export declare const dimapEffect: {
<In0, R2, E2, In, Z, R3, E3, Z2>(f: (input: In0) => Effect.Effect<R2, E2, In>, g: (z: Z) => Effect.Effect<R3, E3, Z2>): <R, E, L>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R3 | R, E2 | E3 | E, In0, L, Z2>;
<R, E, L, In0, R2, E2, In, Z, R3, E3, Z2>(self: Sink<R, E, In, L, Z>, f: (input: In0) => Effect.Effect<R2, E2, In>, g: (z: Z) => Effect.Effect<R3, E3, Z2>): Sink<R | R2 | R3, E | E2 | E3, In0, L, Z2>;
<In0, R2, E2, In, Z, R3, E3, Z2>(options: {
readonly onInput: (input: In0) => Effect.Effect<R2, E2, In>;
readonly onDone: (z: Z) => Effect.Effect<R3, E3, Z2>;
}): <R, E, L>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R3 | R, E2 | E3 | E, In0, L, Z2>;
<R, E, L, In0, R2, E2, In, Z, R3, E3, Z2>(self: Sink<R, E, In, L, Z>, options: {
readonly onInput: (input: In0) => Effect.Effect<R2, E2, In>;
readonly onDone: (z: Z) => Effect.Effect<R3, E3, Z2>;
}): Sink<R | R2 | R3, E | E2 | E3, In0, L, Z2>;
};

@@ -301,4 +321,10 @@ /**

export declare const dimapChunks: {
<In0, In, Z, Z2>(f: (chunk: Chunk.Chunk<In0>) => Chunk.Chunk<In>, g: (z: Z) => Z2): <R, E, L>(self: Sink<R, E, In, L, Z>) => Sink<R, E, In0, L, Z2>;
<R, E, L, In0, In, Z, Z2>(self: Sink<R, E, In, L, Z>, f: (chunk: Chunk.Chunk<In0>) => Chunk.Chunk<In>, g: (z: Z) => Z2): Sink<R, E, In0, L, Z2>;
<In0, In, Z, Z2>(options: {
readonly onInput: (chunk: Chunk.Chunk<In0>) => Chunk.Chunk<In>;
readonly onDone: (z: Z) => Z2;
}): <R, E, L>(self: Sink<R, E, In, L, Z>) => Sink<R, E, In0, L, Z2>;
<R, E, L, In0, In, Z, Z2>(self: Sink<R, E, In, L, Z>, options: {
readonly onInput: (chunk: Chunk.Chunk<In0>) => Chunk.Chunk<In>;
readonly onDone: (z: Z) => Z2;
}): Sink<R, E, In0, L, Z2>;
};

@@ -313,4 +339,10 @@ /**

export declare const dimapChunksEffect: {
<In0, R2, E2, In, Z, R3, E3, Z2>(f: (chunk: Chunk.Chunk<In0>) => Effect.Effect<R2, E2, Chunk.Chunk<In>>, g: (z: Z) => Effect.Effect<R3, E3, Z2>): <R, E, L>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R3 | R, E2 | E3 | E, In0, L, Z2>;
<R, E, L, In0, R2, E2, In, Z, R3, E3, Z2>(self: Sink<R, E, In, L, Z>, f: (chunk: Chunk.Chunk<In0>) => Effect.Effect<R2, E2, Chunk.Chunk<In>>, g: (z: Z) => Effect.Effect<R3, E3, Z2>): Sink<R | R2 | R3, E | E2 | E3, In0, L, Z2>;
<In0, R2, E2, In, Z, R3, E3, Z2>(options: {
readonly onInput: (chunk: Chunk.Chunk<In0>) => Effect.Effect<R2, E2, Chunk.Chunk<In>>;
readonly onDone: (z: Z) => Effect.Effect<R3, E3, Z2>;
}): <R, E, L>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R3 | R, E2 | E3 | E, In0, L, Z2>;
<R, E, L, In0, R2, E2, In, Z, R3, E3, Z2>(self: Sink<R, E, In, L, Z>, options: {
readonly onInput: (chunk: Chunk.Chunk<In0>) => Effect.Effect<R2, E2, Chunk.Chunk<In>>;
readonly onDone: (z: Z) => Effect.Effect<R3, E3, Z2>;
}): Sink<R | R2 | R3, E | E2 | E3, In0, L, Z2>;
};

@@ -491,4 +523,10 @@ /**

export declare const foldSink: {
<R1, R2, E, E1, E2, In, In1 extends In, In2 extends In, L, L1, L2, Z, Z1, Z2>(onFailure: (err: E) => Sink<R1, E1, In1, L1, Z1>, onSuccess: (z: Z) => Sink<R2, E2, In2, L2, Z2>): <R>(self: Sink<R, E, In, L, Z>) => Sink<R1 | R2 | R, E1 | E2, In1 & In2, L1 | L2, Z1 | Z2>;
<R, R1, R2, E, E1, E2, In, In1 extends In, In2 extends In, L, L1, L2, Z, Z1, Z2>(self: Sink<R, E, In, L, Z>, onFailure: (err: E) => Sink<R1, E1, In1, L1, Z1>, onSuccess: (z: Z) => Sink<R2, E2, In2, L2, Z2>): Sink<R | R1 | R2, E1 | E2, In1 & In2, L1 | L2, Z1 | Z2>;
<R1, R2, E, E1, E2, In, In1 extends In, In2 extends In, L, L1, L2, Z, Z1, Z2>(options: {
readonly onFailure: (err: E) => Sink<R1, E1, In1, L1, Z1>;
readonly onSuccess: (z: Z) => Sink<R2, E2, In2, L2, Z2>;
}): <R>(self: Sink<R, E, In, L, Z>) => Sink<R1 | R2 | R, E1 | E2, In1 & In2, L1 | L2, Z1 | Z2>;
<R, R1, R2, E, E1, E2, In, In1 extends In, In2 extends In, L, L1, L2, Z, Z1, Z2>(self: Sink<R, E, In, L, Z>, options: {
readonly onFailure: (err: E) => Sink<R1, E1, In1, L1, Z1>;
readonly onSuccess: (z: Z) => Sink<R2, E2, In2, L2, Z2>;
}): Sink<R | R1 | R2, E1 | E2, In1 & In2, L1 | L2, Z1 | Z2>;
};

@@ -587,3 +625,8 @@ /**

*/
export declare const foldWeighted: <S, In>(s: S, max: number, costFn: (s: S, input: In) => number, f: (s: S, input: In) => S) => Sink<never, never, In, In, S>;
export declare const foldWeighted: <S, In>(options: {
readonly initial: S;
readonly maxCost: number;
readonly cost: (s: S, input: In) => number;
readonly body: (s: S, input: In) => S;
}) => Sink<never, never, In, In, S>;
/**

@@ -626,3 +669,9 @@ * Creates a sink that folds elements of type `In` into a structure of type

*/
export declare const foldWeightedDecompose: <S, In>(s: S, max: number, costFn: (s: S, input: In) => number, decompose: (input: In) => Chunk.Chunk<In>, f: (s: S, input: In) => S) => Sink<never, never, In, In, S>;
export declare const foldWeightedDecompose: <S, In>(options: {
readonly initial: S;
readonly maxCost: number;
readonly cost: (s: S, input: In) => number;
readonly decompose: (input: In) => Chunk.Chunk<In>;
readonly body: (s: S, input: In) => S;
}) => Sink<never, never, In, In, S>;
/**

@@ -645,3 +694,9 @@ * Creates a sink that effectfully folds elements of type `In` into a

*/
export declare const foldWeightedDecomposeEffect: <S, In, R, E, R2, E2, R3, E3>(s: S, max: number, costFn: (s: S, input: In) => Effect.Effect<R, E, number>, decompose: (input: In) => Effect.Effect<R2, E2, Chunk.Chunk<In>>, f: (s: S, input: In) => Effect.Effect<R3, E3, S>) => Sink<R | R2 | R3, E | E2 | E3, In, In, S>;
export declare const foldWeightedDecomposeEffect: <S, In, R, E, R2, E2, R3, E3>(options: {
readonly initial: S;
readonly maxCost: number;
readonly cost: (s: S, input: In) => Effect.Effect<R, E, number>;
readonly decompose: (input: In) => Effect.Effect<R2, E2, Chunk.Chunk<In>>;
readonly body: (s: S, input: In) => Effect.Effect<R3, E3, S>;
}) => Sink<R | R2 | R3, E | E2 | E3, In, In, S>;
/**

@@ -660,3 +715,8 @@ * Creates a sink that effectfully folds elements of type `In` into a

*/
export declare const foldWeightedEffect: <S, In, R, E, R2, E2>(s: S, max: number, costFn: (s: S, input: In) => Effect.Effect<R, E, number>, f: (s: S, input: In) => Effect.Effect<R2, E2, S>) => Sink<R | R2, E | E2, In, In, S>;
export declare const foldWeightedEffect: <S, In, R, E, R2, E2>(options: {
readonly initial: S;
readonly maxCost: number;
readonly cost: (s: S, input: In) => Effect.Effect<R, E, number>;
readonly body: (s: S, input: In) => Effect.Effect<R2, E2, S>;
}) => Sink<R | R2, E | E2, In, In, S>;
/**

@@ -735,12 +795,6 @@ * A sink that executes the provided effectful function for every element fed

*/
export declare const fromHub: <In>(hub: Hub.Hub<In>) => Sink<never, never, In, never, void>;
export declare const fromHub: <In>(hub: Hub.Hub<In>, options?: {
readonly shutdown?: boolean;
}) => Sink<never, never, In, never, void>;
/**
* Create a sink which publishes each element to the specified hub. The hub
* will be shutdown once the stream is closed.
*
* @since 1.0.0
* @category constructors
*/
export declare const fromHubWithShutdown: <In>(hub: Hub.Hub<In>) => Sink<never, never, In, never, void>;
/**
* Creates a sink from a chunk processing function.

@@ -758,12 +812,6 @@ *

*/
export declare const fromQueue: <In>(queue: Queue.Enqueue<In>) => Sink<never, never, In, never, void>;
export declare const fromQueue: <In>(queue: Queue.Enqueue<In>, options?: {
readonly shutdown?: boolean;
}) => Sink<never, never, In, never, void>;
/**
* Create a sink which enqueues each element into the specified queue. The
* queue will be shutdown once the stream is closed.
*
* @since 1.0.0
* @category constructors
*/
export declare const fromQueueWithShutdown: <In>(queue: Queue.Enqueue<In>) => Sink<never, never, In, never, void>;
/**
* Creates a sink containing the first value.

@@ -891,16 +939,10 @@ *

export declare const raceBoth: {
<R1, E1, In1, L1, Z1>(that: Sink<R1, E1, In1, L1, Z1>): <R, E, In, L, Z>(self: Sink<R, E, In, L, Z>) => Sink<R1 | R, E1 | E, In & In1, L1 | L, Either.Either<Z, Z1>>;
<R, E, In, L, Z, R1, E1, In1, L1, Z1>(self: Sink<R, E, In, L, Z>, that: Sink<R1, E1, In1, L1, Z1>): Sink<R | R1, E | E1, In & In1, L | L1, Either.Either<Z, Z1>>;
<R1, E1, In1, L1, Z1>(that: Sink<R1, E1, In1, L1, Z1>, options?: {
readonly capacity?: number;
}): <R, E, In, L, Z>(self: Sink<R, E, In, L, Z>) => Sink<R1 | R, E1 | E, In & In1, L1 | L, Either.Either<Z, Z1>>;
<R, E, In, L, Z, R1, E1, In1, L1, Z1>(self: Sink<R, E, In, L, Z>, that: Sink<R1, E1, In1, L1, Z1>, options?: {
readonly capacity?: number;
}): Sink<R | R1, E | E1, In & In1, L | L1, Either.Either<Z, Z1>>;
};
/**
* Like `raceBoth`, but with a configurable `capacity` parameter.
*
* @since 1.0.0
* @category utils
*/
export declare const raceBothCapacity: {
<R1, E1, In1, L1, Z1>(that: Sink<R1, E1, In1, L1, Z1>, capacity: number): <R, E, In, L, Z>(self: Sink<R, E, In, L, Z>) => Sink<R1 | R, E1 | E, In & In1, L1 | L, Either.Either<Z, Z1>>;
<R, E, In, L, Z, R1, E1, In1, L1, Z1>(self: Sink<R, E, In, L, Z>, that: Sink<R1, E1, In1, L1, Z1>, capacity: number): Sink<R | R1, E | E1, In & In1, L | L1, Either.Either<Z, Z1>>;
};
/**
* Runs both sinks in parallel on the input, using the specified merge

@@ -913,17 +955,17 @@ * function as soon as one result or the other has been computed.

export declare const raceWith: {
<R2, E2, In2, L2, Z2, E, Z, Z3, Z4>(that: Sink<R2, E2, In2, L2, Z2>, leftDone: (exit: Exit.Exit<E, Z>) => MergeDecision.MergeDecision<R2, E2, Z2, E2 | E, Z3>, rightDone: (exit: Exit.Exit<E2, Z2>) => MergeDecision.MergeDecision<R2, E, Z, E2 | E, Z4>): <R, In, L>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L2 | L, Z3 | Z4>;
<R, In, L, R2, E2, In2, L2, Z2, E, Z, Z3, Z4>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>, leftDone: (exit: Exit.Exit<E, Z>) => MergeDecision.MergeDecision<R2, E2, Z2, E2 | E, Z3>, rightDone: (exit: Exit.Exit<E2, Z2>) => MergeDecision.MergeDecision<R2, E, Z, E2 | E, Z4>): Sink<R | R2, E2 | E, In & In2, L | L2, Z3 | Z4>;
<R2, E2, In2, L2, Z2, E, Z, Z3, Z4>(options: {
readonly other: Sink<R2, E2, In2, L2, Z2>;
readonly onSelfDone: (exit: Exit.Exit<E, Z>) => MergeDecision.MergeDecision<R2, E2, Z2, E2 | E, Z3>;
readonly onOtherDone: (exit: Exit.Exit<E2, Z2>) => MergeDecision.MergeDecision<R2, E, Z, E2 | E, Z4>;
readonly capacity?: number;
}): <R, In, L>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L2 | L, Z3 | Z4>;
<R, In, L, R2, E2, In2, L2, Z2, E, Z, Z3, Z4>(self: Sink<R, E, In, L, Z>, options: {
readonly other: Sink<R2, E2, In2, L2, Z2>;
readonly onSelfDone: (exit: Exit.Exit<E, Z>) => MergeDecision.MergeDecision<R2, E2, Z2, E2 | E, Z3>;
readonly onOtherDone: (exit: Exit.Exit<E2, Z2>) => MergeDecision.MergeDecision<R2, E, Z, E2 | E, Z4>;
readonly capacity?: number;
}): Sink<R | R2, E2 | E, In & In2, L | L2, Z3 | Z4>;
};
/**
* Like `raceWith`, but with a configurable `capacity` parameter.
*
* @since 1.0.0
* @category utils
*/
export declare const raceWithCapacity: {
<R2, E2, In2, L2, Z2, E, Z, Z3, Z4>(that: Sink<R2, E2, In2, L2, Z2>, leftDone: (exit: Exit.Exit<E, Z>) => MergeDecision.MergeDecision<R2, E2, Z2, E2 | E, Z3>, rightDone: (exit: Exit.Exit<E2, Z2>) => MergeDecision.MergeDecision<R2, E, Z, E2 | E, Z4>, capacity: number): <R, In, L>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L2 | L, Z3 | Z4>;
<R, In, L, R2, E2, In2, L2, Z2, E, Z, Z3, Z4>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>, leftDone: (exit: Exit.Exit<E, Z>) => MergeDecision.MergeDecision<R2, E2, Z2, E2 | E, Z3>, rightDone: (exit: Exit.Exit<E2, Z2>) => MergeDecision.MergeDecision<R2, E, Z, E2 | E, Z4>, capacity: number): Sink<R | R2, E2 | E, In & In2, L | L2, Z3 | Z4>;
};
/**
* @since 1.0.0
* @category error handling

@@ -1041,4 +1083,8 @@ */

export declare const zip: {
<R2, E2, In, In2 extends In, L, L2, Z, Z2>(that: Sink<R2, E2, In2, L2, Z2>): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, readonly [Z, Z2]>;
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>): Sink<R | R2, E | E2, In & In2, L | L2, readonly [Z, Z2]>;
<R2, E2, In, In2 extends In, L, L2, Z, Z2>(that: Sink<R2, E2, In2, L2, Z2>, options?: {
readonly concurrent?: boolean;
}): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, readonly [Z, Z2]>;
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>, options?: {
readonly concurrent?: boolean;
}): Sink<R | R2, E | E2, In & In2, L | L2, readonly [Z, Z2]>;
};

@@ -1052,4 +1098,8 @@ /**

export declare const zipLeft: {
<R2, E2, In, In2 extends In, L, L2, Z, Z2>(that: Sink<R2, E2, In2, L2, Z2>): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z>;
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>): Sink<R | R2, E | E2, In & In2, L | L2, Z>;
<R2, E2, In, In2 extends In, L, L2, Z, Z2>(that: Sink<R2, E2, In2, L2, Z2>, options?: {
readonly concurrent?: boolean;
}): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z>;
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>, options?: {
readonly concurrent?: boolean;
}): Sink<R | R2, E | E2, In & In2, L | L2, Z>;
};

@@ -1063,4 +1113,8 @@ /**

export declare const zipRight: {
<R2, E2, In, In2 extends In, L, L2, Z, Z2>(that: Sink<R2, E2, In2, L2, Z2>): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z2>;
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>): Sink<R | R2, E | E2, In & In2, L | L2, Z2>;
<R2, E2, In, In2 extends In, L, L2, Z, Z2>(that: Sink<R2, E2, In2, L2, Z2>, options?: {
readonly concurrent?: boolean;
}): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z2>;
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>, options?: {
readonly concurrent?: boolean;
}): Sink<R | R2, E | E2, In & In2, L | L2, Z2>;
};

@@ -1076,47 +1130,9 @@ /**

export declare const zipWith: {
<R2, E2, In, In2 extends In, L, L2, Z, Z2, Z3>(that: Sink<R2, E2, In2, L2, Z2>, f: (z: Z, z1: Z2) => Z3): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z3>;
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2, Z3>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>, f: (z: Z, z1: Z2) => Z3): Sink<R | R2, E | E2, In & In2, L | L2, Z3>;
<R2, E2, In, In2 extends In, L, L2, Z, Z2, Z3>(that: Sink<R2, E2, In2, L2, Z2>, f: (z: Z, z1: Z2) => Z3, options?: {
readonly concurrent?: boolean;
}): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z3>;
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2, Z3>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>, f: (z: Z, z1: Z2) => Z3, options?: {
readonly concurrent?: boolean;
}): Sink<R | R2, E | E2, In & In2, L | L2, Z3>;
};
/**
* Runs both sinks in parallel on the input and combines the results in a
* tuple.
*
* @since 1.0.0
* @category zipping
*/
export declare const zipPar: {
<R2, E2, In, In2 extends In, L, L2, Z, Z2>(that: Sink<R2, E2, In2, L2, Z2>): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, readonly [Z, Z2]>;
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>): Sink<R | R2, E | E2, In & In2, L | L2, readonly [Z, Z2]>;
};
/**
* Like `Sink.zipPar` but keeps only the result from this sink.
*
* @since 1.0.0
* @category zipping
*/
export declare const zipParLeft: {
<R2, E2, In, In2 extends In, L, L2, Z, Z2>(that: Sink<R2, E2, In2, L2, Z2>): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z>;
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>): Sink<R | R2, E | E2, In & In2, L | L2, Z>;
};
/**
* Like `Sink.zipPar` but keeps only the result from `that` sink.
*
* @since 1.0.0
* @category zipping
*/
export declare const zipParRight: {
<R2, E2, In, In2 extends In, L, L2, Z, Z2>(that: Sink<R2, E2, In2, L2, Z2>): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z2>;
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>): Sink<R | R2, E | E2, In & In2, L | L2, Z2>;
};
/**
* Runs both sinks in parallel on the input and combines the results using the
* provided function.
*
* @since 1.0.0
* @category zipping
*/
export declare const zipWithPar: {
<R2, E2, In, In2 extends In, L, L2, Z, Z2, Z3>(that: Sink<R2, E2, In2, L2, Z2>, f: (z: Z, z1: Z2) => Z3): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z3>;
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2, Z3>(self: Sink<R, E, In, L, Z>, that: Sink<R2, E2, In2, L2, Z2>, f: (z: Z, z1: Z2) => Z3): Sink<R | R2, E | E2, In & In2, L | L2, Z3>;
};
//# sourceMappingURL=Sink.d.ts.map

@@ -6,4 +6,4 @@ "use strict";

});
exports.sync = exports.suspend = exports.summarized = exports.sum = exports.succeed = exports.splitWhere = exports.some = exports.refineOrDieWith = exports.refineOrDie = exports.raceWithCapacity = exports.raceWith = exports.raceBothCapacity = exports.raceBoth = exports.race = exports.provideContext = exports.orElse = exports.never = exports.mkString = exports.mapLeftover = exports.mapError = exports.mapEffect = exports.map = exports.leftover = exports.last = exports.ignoreLeftover = exports.head = exports.fromQueueWithShutdown = exports.fromQueue = exports.fromPush = exports.fromHubWithShutdown = exports.fromHub = exports.fromEffect = exports.fromChannel = exports.forEachWhile = exports.forEachChunkWhile = exports.forEachChunk = exports.forEach = exports.foldWeightedEffect = exports.foldWeightedDecomposeEffect = exports.foldWeightedDecompose = exports.foldWeighted = exports.foldUntilEffect = exports.foldUntil = exports.foldSink = exports.foldLeftEffect = exports.foldLeftChunksEffect = exports.foldLeftChunks = exports.foldLeft = exports.foldEffect = exports.foldChunksEffect = exports.foldChunks = exports.fold = exports.flatMap = exports.findEffect = exports.filterInputEffect = exports.filterInput = exports.failSync = exports.failCauseSync = exports.failCause = exports.fail = exports.every = exports.ensuringWith = exports.ensuring = exports.dropWhileEffect = exports.dropWhile = exports.dropUntilEffect = exports.dropUntil = exports.drop = exports.drain = exports.dimapEffect = exports.dimapChunksEffect = exports.dimapChunks = exports.dimap = exports.dieSync = exports.dieMessage = exports.die = exports.count = exports.contramapEffect = exports.contramapChunksEffect = exports.contramapChunks = exports.contramap = exports.contextWithSink = exports.contextWithEffect = exports.contextWith = exports.context = exports.collectLeftover = exports.collectAllWhileWith = exports.collectAllWhileEffect = exports.collectAllWhile = exports.collectAllUntilEffect = exports.collectAllUntil = exports.collectAllToSetN = exports.collectAllToSet = exports.collectAllToMapN = exports.collectAllToMap = exports.collectAllN = exports.collectAllFrom = exports.collectAll = exports.as = exports.SinkTypeId = void 0;
exports.zipWithPar = exports.zipWith = exports.zipRight = exports.zipParRight = exports.zipParLeft = exports.zipPar = exports.zipLeft = exports.zip = exports.withDuration = exports.unwrapScoped = exports.unwrap = exports.toChannel = exports.timed = exports.take = void 0;
exports.unwrap = exports.toChannel = exports.timed = exports.take = exports.sync = exports.suspend = exports.summarized = exports.sum = exports.succeed = exports.splitWhere = exports.some = exports.refineOrDieWith = exports.refineOrDie = exports.raceWith = exports.raceBoth = exports.race = exports.provideContext = exports.orElse = exports.never = exports.mkString = exports.mapLeftover = exports.mapError = exports.mapEffect = exports.map = exports.leftover = exports.last = exports.ignoreLeftover = exports.head = exports.fromQueue = exports.fromPush = exports.fromHub = exports.fromEffect = exports.fromChannel = exports.forEachWhile = exports.forEachChunkWhile = exports.forEachChunk = exports.forEach = exports.foldWeightedEffect = exports.foldWeightedDecomposeEffect = exports.foldWeightedDecompose = exports.foldWeighted = exports.foldUntilEffect = exports.foldUntil = exports.foldSink = exports.foldLeftEffect = exports.foldLeftChunksEffect = exports.foldLeftChunks = exports.foldLeft = exports.foldEffect = exports.foldChunksEffect = exports.foldChunks = exports.fold = exports.flatMap = exports.findEffect = exports.filterInputEffect = exports.filterInput = exports.failSync = exports.failCauseSync = exports.failCause = exports.fail = exports.every = exports.ensuringWith = exports.ensuring = exports.dropWhileEffect = exports.dropWhile = exports.dropUntilEffect = exports.dropUntil = exports.drop = exports.drain = exports.dimapEffect = exports.dimapChunksEffect = exports.dimapChunks = exports.dimap = exports.dieSync = exports.dieMessage = exports.die = exports.count = exports.contramapEffect = exports.contramapChunksEffect = exports.contramapChunks = exports.contramap = exports.contextWithSink = exports.contextWithEffect = exports.contextWith = exports.context = exports.collectLeftover = exports.collectAllWhileWith = exports.collectAllWhileEffect = exports.collectAllWhile = exports.collectAllUntilEffect = exports.collectAllUntil = exports.collectAllToSetN = exports.collectAllToSet = exports.collectAllToMapN = exports.collectAllToMap = exports.collectAllN = exports.collectAllFrom = exports.collectAll = exports.as = exports.SinkTypeId = void 0;
exports.zipWith = exports.zipRight = exports.zipLeft = exports.zip = exports.withDuration = exports.unwrapScoped = void 0;
var internal = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/stream/internal/sink"));

@@ -679,4 +679,3 @@ function _getRequireWildcardCache(nodeInterop) { if (typeof WeakMap !== "function") return null; var cacheBabelInterop = new WeakMap(); var cacheNodeInterop = new WeakMap(); return (_getRequireWildcardCache = function (nodeInterop) { return nodeInterop ? cacheNodeInterop : cacheBabelInterop; })(nodeInterop); }

/**
* Create a sink which publishes each element to the specified hub. The hub
* will be shutdown once the stream is closed.
* Creates a sink from a chunk processing function.
*

@@ -687,10 +686,2 @@ * @since 1.0.0

exports.fromHub = fromHub;
const fromHubWithShutdown = internal.fromHubWithShutdown;
/**
* Creates a sink from a chunk processing function.
*
* @since 1.0.0
* @category constructors
*/
exports.fromHubWithShutdown = fromHubWithShutdown;
const fromPush = internal.fromPush;

@@ -706,4 +697,3 @@ /**

/**
* Create a sink which enqueues each element into the specified queue. The
* queue will be shutdown once the stream is closed.
* Creates a sink containing the first value.
*

@@ -714,10 +704,2 @@ * @since 1.0.0

exports.fromQueue = fromQueue;
const fromQueueWithShutdown = internal.fromQueueWithShutdown;
/**
* Creates a sink containing the first value.
*
* @since 1.0.0
* @category constructors
*/
exports.fromQueueWithShutdown = fromQueueWithShutdown;
const head = internal.head;

@@ -833,10 +815,2 @@ /**

/**
* Like `raceBoth`, but with a configurable `capacity` parameter.
*
* @since 1.0.0
* @category utils
*/
exports.raceBoth = raceBoth;
const raceBothCapacity = internal.raceBothCapacity;
/**
* Runs both sinks in parallel on the input, using the specified merge

@@ -848,17 +822,9 @@ * function as soon as one result or the other has been computed.

*/
exports.raceBothCapacity = raceBothCapacity;
exports.raceBoth = raceBoth;
const raceWith = internal.raceWith;
/**
* Like `raceWith`, but with a configurable `capacity` parameter.
*
* @since 1.0.0
* @category utils
* @category error handling
*/
exports.raceWith = raceWith;
const raceWithCapacity = internal.raceWithCapacity;
/**
* @since 1.0.0
* @category error handling
*/
exports.raceWithCapacity = raceWithCapacity;
const refineOrDie = internal.refineOrDie;

@@ -1005,37 +971,3 @@ /**

const zipWith = internal.zipWith;
/**
* Runs both sinks in parallel on the input and combines the results in a
* tuple.
*
* @since 1.0.0
* @category zipping
*/
exports.zipWith = zipWith;
const zipPar = internal.zipPar;
/**
* Like `Sink.zipPar` but keeps only the result from this sink.
*
* @since 1.0.0
* @category zipping
*/
exports.zipPar = zipPar;
const zipParLeft = internal.zipParLeft;
/**
* Like `Sink.zipPar` but keeps only the result from `that` sink.
*
* @since 1.0.0
* @category zipping
*/
exports.zipParLeft = zipParLeft;
const zipParRight = internal.zipParRight;
/**
* Runs both sinks in parallel on the input and combines the results using the
* provided function.
*
* @since 1.0.0
* @category zipping
*/
exports.zipParRight = zipParRight;
const zipWithPar = internal.zipWithPar;
exports.zipWithPar = zipWithPar;
//# sourceMappingURL=Sink.js.map

@@ -131,4 +131,9 @@ /**

export const match: {
<A>(onContinue: () => A, onClose: (value: unknown) => A, onYield: () => A): (self: ChildExecutorDecision) => A
<A>(self: ChildExecutorDecision, onContinue: () => A, onClose: (value: unknown) => A, onYield: () => A): A
<A>(
options: { readonly onContinue: () => A; readonly onClose: (value: unknown) => A; readonly onYield: () => A }
): (self: ChildExecutorDecision) => A
<A>(
self: ChildExecutorDecision,
options: { readonly onContinue: () => A; readonly onClose: (value: unknown) => A; readonly onYield: () => A }
): A
} = internal.match

@@ -82,10 +82,14 @@ /**

<R, E0, Z0, E, Z, Z2>(
onDone: (effect: Effect.Effect<R, E, Z>) => Z2,
onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2
options: {
readonly onDone: (effect: Effect.Effect<R, E, Z>) => Z2
readonly onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2
}
): (self: MergeDecision<R, E0, Z0, E, Z>) => Z2
<R, E0, Z0, E, Z, Z2>(
self: MergeDecision<R, E0, Z0, E, Z>,
onDone: (effect: Effect.Effect<R, E, Z>) => Z2,
onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2
options: {
readonly onDone: (effect: Effect.Effect<R, E, Z>) => Z2
readonly onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2
}
): Z2
} = internal.match

@@ -146,18 +146,22 @@ /**

<Env, Err, Err1, Err2, Elem, Done, Done1, Done2, Z>(
onBothRunning: (
left: Fiber.Fiber<Err, Either.Either<Done, Elem>>,
right: Fiber.Fiber<Err1, Either.Either<Done1, Elem>>
) => Z,
onLeftDone: (f: (exit: Exit.Exit<Err1, Done1>) => Effect.Effect<Env, Err2, Done2>) => Z,
onRightDone: (f: (exit: Exit.Exit<Err, Done>) => Effect.Effect<Env, Err2, Done2>) => Z
options: {
readonly onBothRunning: (
left: Fiber.Fiber<Err, Either.Either<Done, Elem>>,
right: Fiber.Fiber<Err1, Either.Either<Done1, Elem>>
) => Z
readonly onLeftDone: (f: (exit: Exit.Exit<Err1, Done1>) => Effect.Effect<Env, Err2, Done2>) => Z
readonly onRightDone: (f: (exit: Exit.Exit<Err, Done>) => Effect.Effect<Env, Err2, Done2>) => Z
}
): (self: MergeState<Env, Err, Err1, Err2, Elem, Done, Done1, Done2>) => Z
<Env, Err, Err1, Err2, Elem, Done, Done1, Done2, Z>(
self: MergeState<Env, Err, Err1, Err2, Elem, Done, Done1, Done2>,
onBothRunning: (
left: Fiber.Fiber<Err, Either.Either<Done, Elem>>,
right: Fiber.Fiber<Err1, Either.Either<Done1, Elem>>
) => Z,
onLeftDone: (f: (exit: Exit.Exit<Err1, Done1>) => Effect.Effect<Env, Err2, Done2>) => Z,
onRightDone: (f: (exit: Exit.Exit<Err, Done>) => Effect.Effect<Env, Err2, Done2>) => Z
options: {
readonly onBothRunning: (
left: Fiber.Fiber<Err, Either.Either<Done, Elem>>,
right: Fiber.Fiber<Err1, Either.Either<Done1, Elem>>
) => Z
readonly onLeftDone: (f: (exit: Exit.Exit<Err1, Done1>) => Effect.Effect<Env, Err2, Done2>) => Z
readonly onRightDone: (f: (exit: Exit.Exit<Err, Done>) => Effect.Effect<Env, Err2, Done2>) => Z
}
): Z
} = internal.match

@@ -99,4 +99,10 @@ /**

export const match: {
<A>(onBackPressure: () => A, onBufferSliding: () => A): (self: MergeStrategy) => A
<A>(self: MergeStrategy, onBackPressure: () => A, onBufferSliding: () => A): A
<A>(options: {
readonly onBackPressure: () => A
readonly onBufferSliding: () => A
}): (self: MergeStrategy) => A
<A>(self: MergeStrategy, options: {
readonly onBackPressure: () => A
readonly onBufferSliding: () => A
}): A
} = internal.match

@@ -104,6 +104,8 @@ /**

<A, Z>(
onPulled: (value: A) => Z,
onNoUpstream: (activeDownstreamCount: number) => Z
options: { readonly onPulled: (value: A) => Z; readonly onNoUpstream: (activeDownstreamCount: number) => Z }
): (self: UpstreamPullRequest<A>) => Z
<A, Z>(self: UpstreamPullRequest<A>, onPulled: (value: A) => Z, onNoUpstream: (activeDownstreamCount: number) => Z): Z
<A, Z>(
self: UpstreamPullRequest<A>,
options: { readonly onPulled: (value: A) => Z; readonly onNoUpstream: (activeDownstreamCount: number) => Z }
): Z
} = internal.match

@@ -108,10 +108,14 @@ /**

<A, Z>(
onPullAfterNext: (emitSeparator: Option.Option<A>) => Z,
onPullAfterAllEnqueued: (emitSeparator: Option.Option<A>) => Z
options: {
readonly onNext: (emitSeparator: Option.Option<A>) => Z
readonly onAllEnqueued: (emitSeparator: Option.Option<A>) => Z
}
): (self: UpstreamPullStrategy<A>) => Z
<A, Z>(
self: UpstreamPullStrategy<A>,
onPullAfterNext: (emitSeparator: Option.Option<A>) => Z,
onPullAfterAllEnqueued: (emitSeparator: Option.Option<A>) => Z
options: {
readonly onNext: (emitSeparator: Option.Option<A>) => Z
readonly onAllEnqueued: (emitSeparator: Option.Option<A>) => Z
}
): Z
} = internal.match

@@ -62,17 +62,23 @@ import { dual } from "@effect/data/Function"

<A>(
onContinue: () => A,
onClose: (value: unknown) => A,
onYield: () => A
options: {
readonly onContinue: () => A
readonly onClose: (value: unknown) => A
readonly onYield: () => A
}
) => (self: ChildExecutorDecision.ChildExecutorDecision) => A,
<A>(
self: ChildExecutorDecision.ChildExecutorDecision,
onContinue: () => A,
onClose: (value: unknown) => A,
onYield: () => A
options: {
readonly onContinue: () => A
readonly onClose: (value: unknown) => A
readonly onYield: () => A
}
) => A
>(4, <A>(
>(2, <A>(
self: ChildExecutorDecision.ChildExecutorDecision,
onContinue: () => A,
onClose: (value: unknown) => A,
onYield: () => A
{ onClose, onContinue, onYield }: {
readonly onContinue: () => A
readonly onClose: (value: unknown) => A
readonly onYield: () => A
}
): A => {

@@ -79,0 +85,0 @@ switch (self._tag) {

@@ -87,14 +87,20 @@ import { dual } from "@effect/data/Function"

<R, E0, Z0, E, Z, Z2>(
onDone: (effect: Effect.Effect<R, E, Z>) => Z2,
onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2
options: {
readonly onDone: (effect: Effect.Effect<R, E, Z>) => Z2
readonly onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2
}
) => (self: MergeDecision.MergeDecision<R, E0, Z0, E, Z>) => Z2,
<R, E0, Z0, E, Z, Z2>(
self: MergeDecision.MergeDecision<R, E0, Z0, E, Z>,
onDone: (effect: Effect.Effect<R, E, Z>) => Z2,
onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2
options: {
readonly onDone: (effect: Effect.Effect<R, E, Z>) => Z2
readonly onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2
}
) => Z2
>(3, <R, E0, Z0, E, Z, Z2>(
>(2, <R, E0, Z0, E, Z, Z2>(
self: MergeDecision.MergeDecision<R, E0, Z0, E, Z>,
onDone: (effect: Effect.Effect<R, E, Z>) => Z2,
onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2
{ onAwait, onDone }: {
readonly onDone: (effect: Effect.Effect<R, E, Z>) => Z2
readonly onAwait: (f: (exit: Exit.Exit<E0, Z0>) => Effect.Effect<R, E, Z>) => Z2
}
): Z2 => {

@@ -101,0 +107,0 @@ const op = self as Primitive

@@ -85,23 +85,25 @@ import type * as Either from "@effect/data/Either"

<Env, Err, Err1, Err2, Elem, Done, Done1, Done2, Z>(
onBothRunning: (
left: Fiber.Fiber<Err, Either.Either<Done, Elem>>,
right: Fiber.Fiber<Err1, Either.Either<Done1, Elem>>
) => Z,
onLeftDone: (f: (exit: Exit.Exit<Err1, Done1>) => Effect.Effect<Env, Err2, Done2>) => Z,
onRightDone: (f: (exit: Exit.Exit<Err, Done>) => Effect.Effect<Env, Err2, Done2>) => Z
options: {
readonly onBothRunning: (
left: Fiber.Fiber<Err, Either.Either<Done, Elem>>,
right: Fiber.Fiber<Err1, Either.Either<Done1, Elem>>
) => Z
readonly onLeftDone: (f: (exit: Exit.Exit<Err1, Done1>) => Effect.Effect<Env, Err2, Done2>) => Z
readonly onRightDone: (f: (exit: Exit.Exit<Err, Done>) => Effect.Effect<Env, Err2, Done2>) => Z
}
) => (self: MergeState.MergeState<Env, Err, Err1, Err2, Elem, Done, Done1, Done2>) => Z,
<Env, Err, Err1, Err2, Elem, Done, Done1, Done2, Z>(
self: MergeState.MergeState<Env, Err, Err1, Err2, Elem, Done, Done1, Done2>,
onBothRunning: (
left: Fiber.Fiber<Err, Either.Either<Done, Elem>>,
right: Fiber.Fiber<Err1, Either.Either<Done1, Elem>>
) => Z,
onLeftDone: (f: (exit: Exit.Exit<Err1, Done1>) => Effect.Effect<Env, Err2, Done2>) => Z,
onRightDone: (f: (exit: Exit.Exit<Err, Done>) => Effect.Effect<Env, Err2, Done2>) => Z
options: {
readonly onBothRunning: (
left: Fiber.Fiber<Err, Either.Either<Done, Elem>>,
right: Fiber.Fiber<Err1, Either.Either<Done1, Elem>>
) => Z
readonly onLeftDone: (f: (exit: Exit.Exit<Err1, Done1>) => Effect.Effect<Env, Err2, Done2>) => Z
readonly onRightDone: (f: (exit: Exit.Exit<Err, Done>) => Effect.Effect<Env, Err2, Done2>) => Z
}
) => Z
>(4, (
>(2, (
self,
onBothRunning,
onLeftDone,
onRightDone
{ onBothRunning, onLeftDone, onRightDone }
) => {

@@ -108,0 +110,0 @@ switch (self._tag) {

@@ -46,12 +46,19 @@ import { dual } from "@effect/data/Function"

export const match = dual<
<A>(onBackPressure: () => A, onBufferSliding: () => A) => (self: MergeStrategy.MergeStrategy) => A,
<A>(options: {
readonly onBackPressure: () => A
readonly onBufferSliding: () => A
}) => (self: MergeStrategy.MergeStrategy) => A,
<A>(
self: MergeStrategy.MergeStrategy,
onBackPressure: () => A,
onBufferSliding: () => A
options: {
readonly onBackPressure: () => A
readonly onBufferSliding: () => A
}
) => A
>(3, <A>(
>(2, <A>(
self: MergeStrategy.MergeStrategy,
onBackPressure: () => A,
onBufferSliding: () => A
{ onBackPressure, onBufferSliding }: {
readonly onBackPressure: () => A
readonly onBufferSliding: () => A
}
): A => {

@@ -58,0 +65,0 @@ switch (self._tag) {

@@ -56,14 +56,20 @@ import { dual } from "@effect/data/Function"

<A, Z>(
onPulled: (value: A) => Z,
onNoUpstream: (activeDownstreamCount: number) => Z
options: {
readonly onPulled: (value: A) => Z
readonly onNoUpstream: (activeDownstreamCount: number) => Z
}
) => (self: UpstreamPullRequest.UpstreamPullRequest<A>) => Z,
<A, Z>(
self: UpstreamPullRequest.UpstreamPullRequest<A>,
onPulled: (value: A) => Z,
onNoUpstream: (activeDownstreamCount: number) => Z
options: {
readonly onPulled: (value: A) => Z
readonly onNoUpstream: (activeDownstreamCount: number) => Z
}
) => Z
>(3, <A, Z>(
>(2, <A, Z>(
self: UpstreamPullRequest.UpstreamPullRequest<A>,
onPulled: (value: A) => Z,
onNoUpstream: (activeDownstreamCount: number) => Z
{ onNoUpstream, onPulled }: {
readonly onPulled: (value: A) => Z
readonly onNoUpstream: (activeDownstreamCount: number) => Z
}
): Z => {

@@ -70,0 +76,0 @@ switch (self._tag) {

@@ -59,23 +59,29 @@ import { dual } from "@effect/data/Function"

<A, Z>(
onPullAfterNext: (emitSeparator: Option.Option<A>) => Z,
onPullAfterAllEnqueued: (emitSeparator: Option.Option<A>) => Z
options: {
readonly onNext: (emitSeparator: Option.Option<A>) => Z
readonly onAllEnqueued: (emitSeparator: Option.Option<A>) => Z
}
) => (self: UpstreamPullStrategy.UpstreamPullStrategy<A>) => Z,
<A, Z>(
self: UpstreamPullStrategy.UpstreamPullStrategy<A>,
onPullAfterNext: (emitSeparator: Option.Option<A>) => Z,
onPullAfterAllEnqueued: (emitSeparator: Option.Option<A>) => Z
options: {
readonly onNext: (emitSeparator: Option.Option<A>) => Z
readonly onAllEnqueued: (emitSeparator: Option.Option<A>) => Z
}
) => Z
>(3, <A, Z>(
>(2, <A, Z>(
self: UpstreamPullStrategy.UpstreamPullStrategy<A>,
onPullAfterNext: (emitSeparator: Option.Option<A>) => Z,
onPullAfterAllEnqueued: (emitSeparator: Option.Option<A>) => Z
{ onAllEnqueued, onNext }: {
readonly onNext: (emitSeparator: Option.Option<A>) => Z
readonly onAllEnqueued: (emitSeparator: Option.Option<A>) => Z
}
): Z => {
switch (self._tag) {
case OpCodes.OP_PULL_AFTER_NEXT: {
return onPullAfterNext(self.emitSeparator)
return onNext(self.emitSeparator)
}
case OpCodes.OP_PULL_AFTER_ALL_ENQUEUED: {
return onPullAfterAllEnqueued(self.emitSeparator)
return onAllEnqueued(self.emitSeparator)
}
}
})

@@ -9,3 +9,3 @@ import * as Chunk from "@effect/data/Chunk"

import * as Cause from "@effect/io/Cause"
import type * as Effect from "@effect/io/Effect"
import * as Effect from "@effect/io/Effect"
import type * as Exit from "@effect/io/Exit"

@@ -191,2 +191,13 @@ import type * as Channel from "@effect/stream/Channel"

/** @internal */
export const isChannel = (u: unknown): u is Channel.Channel<
unknown,
unknown,
unknown,
unknown,
unknown,
unknown,
unknown
> => (typeof u === "object" && u != null && ChannelTypeId in u) || Effect.isEffect(u)
/** @internal */
export const acquireReleaseOut = dual<

@@ -281,5 +292,5 @@ <R2, Z>(

builder: Array<OutElem>
): Channel.Channel<never, OutErr, OutElem, OutDone, OutErr, never, OutDone> => {
return readWith(
(outElem) =>
): Channel.Channel<never, OutErr, OutElem, OutDone, OutErr, never, OutDone> =>
readWith({
onInput: (outElem) =>
flatMap(

@@ -291,6 +302,5 @@ sync(() => {

),
fail,
succeedNow
)
}
onFailure: fail,
onDone: succeedNow
})

@@ -693,4 +703,8 @@ /** @internal */

>(
onError: (c: Cause.Cause<OutErr>) => Channel.Channel<Env1, InErr1, InElem1, InDone1, OutErr2, OutElem1, OutDone2>,
onSuccess: (o: OutDone) => Channel.Channel<Env2, InErr2, InElem2, InDone2, OutErr3, OutElem2, OutDone3>
options: {
readonly onFailure: (
c: Cause.Cause<OutErr>
) => Channel.Channel<Env1, InErr1, InElem1, InDone1, OutErr2, OutElem1, OutDone2>
readonly onSuccess: (o: OutDone) => Channel.Channel<Env2, InErr2, InElem2, InDone2, OutErr3, OutElem2, OutDone3>
}
) => <Env, InErr, InElem, InDone, OutElem>(

@@ -731,4 +745,8 @@ self: Channel.Channel<Env, InErr, InElem, InDone, OutErr, OutElem, OutDone>

self: Channel.Channel<Env, InErr, InElem, InDone, OutErr, OutElem, OutDone>,
onError: (c: Cause.Cause<OutErr>) => Channel.Channel<Env1, InErr1, InElem1, InDone1, OutErr2, OutElem1, OutDone2>,
onSuccess: (o: OutDone) => Channel.Channel<Env2, InErr2, InElem2, InDone2, OutErr3, OutElem2, OutDone3>
options: {
readonly onFailure: (
c: Cause.Cause<OutErr>
) => Channel.Channel<Env1, InErr1, InElem1, InDone1, OutErr2, OutElem1, OutDone2>
readonly onSuccess: (o: OutDone) => Channel.Channel<Env2, InErr2, InElem2, InDone2, OutErr3, OutElem2, OutDone3>
}
) => Channel.Channel<

@@ -744,3 +762,3 @@ Env1 | Env2 | Env,

>(
3,
2,
<

@@ -770,8 +788,8 @@ Env,

self: Channel.Channel<Env, InErr, InElem, InDone, OutErr, OutElem, OutDone>,
onError: (
c: Cause.Cause<OutErr>
) => Channel.Channel<Env1, InErr1, InElem1, InDone1, OutErr2, OutElem1, OutDone2>,
onSuccess: (
o: OutDone
) => Channel.Channel<Env2, InErr2, InElem2, InDone2, OutErr3, OutElem2, OutDone3>
options: {
readonly onFailure: (
c: Cause.Cause<OutErr>
) => Channel.Channel<Env1, InErr1, InElem1, InDone1, OutErr2, OutElem1, OutDone2>
readonly onSuccess: (o: OutDone) => Channel.Channel<Env2, InErr2, InElem2, InDone2, OutErr3, OutElem2, OutDone3>
}
): Channel.Channel<

@@ -789,3 +807,3 @@ Env | Env1 | Env2,

op.channel = self
op.k = new ContinuationKImpl(onSuccess, onError as any)
op.k = new ContinuationKImpl(options.onSuccess, options.onFailure as any)
return op

@@ -884,5 +902,7 @@ }

>(
input: (input: InElem) => Channel.Channel<Env, InErr, InElem, InDone, OutErr, OutElem, OutDone>,
error: (error: InErr) => Channel.Channel<Env2, InErr, InElem, InDone, OutErr2, OutElem2, OutDone2>,
done: (done: InDone) => Channel.Channel<Env3, InErr, InElem, InDone, OutErr3, OutElem3, OutDone3>
options: {
readonly onInput: (input: InElem) => Channel.Channel<Env, InErr, InElem, InDone, OutErr, OutElem, OutDone>
readonly onFailure: (error: InErr) => Channel.Channel<Env2, InErr, InElem, InDone, OutErr2, OutElem2, OutDone2>
readonly onDone: (done: InDone) => Channel.Channel<Env3, InErr, InElem, InDone, OutErr3, OutElem3, OutDone3>
}
): Channel.Channel<

@@ -897,7 +917,7 @@ Env | Env2 | Env3,

> =>
readWithCause(
input,
(cause) => Either.match(Cause.failureOrCause(cause), { onLeft: error, onRight: failCause }),
done
)
readWithCause({
onInput: options.onInput,
onFailure: (cause) => Either.match(Cause.failureOrCause(cause), { onLeft: options.onFailure, onRight: failCause }),
onDone: options.onDone
})

@@ -922,5 +942,9 @@ /** @internal */

>(
input: (input: InElem) => Channel.Channel<Env, InErr, InElem, InDone, OutErr, OutElem, OutDone>,
halt: (cause: Cause.Cause<InErr>) => Channel.Channel<Env2, InErr, InElem, InDone, OutErr2, OutElem2, OutDone2>,
done: (done: InDone) => Channel.Channel<Env3, InErr, InElem, InDone, OutErr3, OutElem3, OutDone3>
options: {
readonly onInput: (input: InElem) => Channel.Channel<Env, InErr, InElem, InDone, OutErr, OutElem, OutDone>
readonly onFailure: (
cause: Cause.Cause<InErr>
) => Channel.Channel<Env2, InErr, InElem, InDone, OutErr2, OutElem2, OutDone2>
readonly onDone: (done: InDone) => Channel.Channel<Env3, InErr, InElem, InDone, OutErr3, OutElem3, OutDone3>
}
): Channel.Channel<

@@ -937,4 +961,4 @@ Env | Env2 | Env3,

op._tag = OpCodes.OP_READ
op.more = input
op.done = new ContinuationKImpl(done, halt as any)
op.more = options.onInput
op.done = new ContinuationKImpl(options.onDone, options.onFailure as any)
return op

@@ -941,0 +965,0 @@ }

@@ -73,7 +73,6 @@ import * as Chunk from "@effect/data/Chunk"

): Stream.Stream<R | R2, E | E2, A> =>
stream.flatMapParBuffer(
stream.flatMap(
self.grouped,
Number.POSITIVE_INFINITY,
bufferSize,
([key, queue]) => f(key, stream.flattenTake(stream.fromQueueWithShutdown(queue)))
([key, queue]) => f(key, stream.flattenTake(stream.fromQueue(queue, { shutdown: true }))),
{ concurrency: "unbounded", bufferSize }
)

@@ -135,21 +134,4 @@ )

<A, R2, E2, K, V>(
f: (a: A) => Effect.Effect<R2, E2, readonly [K, V]>
) => <R, E>(self: Stream.Stream<R, E, A>) => GroupBy.GroupBy<R2 | R, E2 | E, K, V>,
<R, E, A, R2, E2, K, V>(
self: Stream.Stream<R, E, A>,
f: (a: A) => Effect.Effect<R2, E2, readonly [K, V]>
) => GroupBy.GroupBy<R2 | R, E2 | E, K, V>
>(
2,
<R, E, A, R2, E2, K, V>(
self: Stream.Stream<R, E, A>,
f: (a: A) => Effect.Effect<R2, E2, readonly [K, V]>
): GroupBy.GroupBy<R | R2, E | E2, K, V> => groupByBuffer(self, f, 16)
)
/** @internal */
export const groupByBuffer = dual<
<A, R2, E2, K, V>(
f: (a: A) => Effect.Effect<R2, E2, readonly [K, V]>,
bufferSize: number
options?: { readonly bufferSize?: number }
) => <R, E>(self: Stream.Stream<R, E, A>) => GroupBy.GroupBy<R2 | R, E2 | E, K, V>,

@@ -159,10 +141,10 @@ <R, E, A, R2, E2, K, V>(

f: (a: A) => Effect.Effect<R2, E2, readonly [K, V]>,
bufferSize: number
options?: { readonly bufferSize?: number }
) => GroupBy.GroupBy<R2 | R, E2 | E, K, V>
>(
3,
(args) => typeof args[0] !== "function",
<R, E, A, R2, E2, K, V>(
self: Stream.Stream<R, E, A>,
f: (a: A) => Effect.Effect<R2, E2, readonly [K, V]>,
bufferSize: number
options?: { readonly bufferSize?: number }
): GroupBy.GroupBy<R | R2, E | E2, K, V> =>

@@ -180,3 +162,3 @@ make(

Queue.bounded<Exit.Exit<Option.Option<E | E2>, readonly [K, Queue.Dequeue<Take.Take<E | E2, V>>]>>(
bufferSize
options?.bufferSize ?? 16
),

@@ -188,5 +170,5 @@ (queue) => Queue.shutdown(queue)

pipe(
stream.mapEffect(self, f),
stream.mapEffectSequential(self, f),
stream.distributedWithDynamicCallback(
bufferSize,
options?.bufferSize ?? 16,
([key, value]) => Effect.flatMap(Deferred.await(decider), (f) => f(key, value)),

@@ -228,3 +210,3 @@ (exit) => Queue.offer(output, exit)

)
return stream.flattenExitOption(stream.fromQueueWithShutdown(output))
return stream.flattenExitOption(stream.fromQueue(output, { shutdown: true }))
})

@@ -235,2 +217,110 @@ )

/** @internal */
export const mapEffectOptions = dual<
{
<A, R2, E2, A2>(
f: (a: A) => Effect.Effect<R2, E2, A2>,
options?: {
readonly concurrency?: number | "unbounded"
readonly unordered?: boolean
}
): <R, E>(self: Stream.Stream<R, E, A>) => Stream.Stream<R2 | R, E2 | E, A2>
<A, R2, E2, A2, K>(
f: (a: A) => Effect.Effect<R2, E2, A2>,
options: {
readonly key: (a: A) => K
readonly bufferSize?: number
}
): <R, E>(self: Stream.Stream<R, E, A>) => Stream.Stream<R2 | R, E2 | E, A2>
},
{
<R, E, A, R2, E2, A2>(
self: Stream.Stream<R, E, A>,
f: (a: A) => Effect.Effect<R2, E2, A2>,
options?: {
readonly concurrency?: number | "unbounded"
readonly unordered?: boolean
}
): Stream.Stream<R2 | R, E2 | E, A2>
<R, E, A, R2, E2, A2, K>(
self: Stream.Stream<R, E, A>,
f: (a: A) => Effect.Effect<R2, E2, A2>,
options: {
readonly key: (a: A) => K
readonly bufferSize?: number
}
): Stream.Stream<R2 | R, E2 | E, A2>
}
>(
(args) => typeof args[0] !== "function",
(<R, E, A, R2, E2, A2, K>(
self: Stream.Stream<R, E, A>,
f: (a: A) => Effect.Effect<R2, E2, A2>,
options?: {
readonly key?: (a: A) => K
readonly concurrency?: number | "unbounded"
readonly unordered?: boolean
readonly bufferSize?: number
}
): Stream.Stream<R | R2, E | E2, A2> => {
if (options?.key) {
return evaluate(
groupByKey(self, options.key, { bufferSize: options.bufferSize }),
(_, s) => pipe(s, stream.mapEffectSequential(f))
)
}
return stream.matchConcurrency(
options?.concurrency,
() => stream.mapEffectSequential(self, f),
(n) =>
options?.unordered ?
stream.flatMap(self, (a) => stream.fromEffect(f(a)), { concurrency: n }) :
stream.mapEffectPar(self, n, f)
)
}) as any
)
/** @internal */
export const bindEffect = dual<
<N extends string, K, R2, E2, A>(
tag: Exclude<N, keyof K>,
f: (_: K) => Effect.Effect<R2, E2, A>,
options?: {
readonly concurrency?: number | "unbounded"
readonly bufferSize?: number
}
) => <R, E>(self: Stream.Stream<R, E, K>) => Stream.Stream<
R | R2,
E | E2,
Effect.MergeRecord<K, { [k in N]: A }>
>,
<R, E, N extends string, K, R2, E2, A>(
self: Stream.Stream<R, E, K>,
tag: Exclude<N, keyof K>,
f: (_: K) => Effect.Effect<R2, E2, A>,
options?: {
readonly concurrency?: number | "unbounded"
readonly unordered?: boolean
}
) => Stream.Stream<
R | R2,
E | E2,
Effect.MergeRecord<K, { [k in N]: A }>
>
>((args) => typeof args[0] !== "string", <R, E, N extends string, K, R2, E2, A>(
self: Stream.Stream<R, E, K>,
tag: Exclude<N, keyof K>,
f: (_: K) => Effect.Effect<R2, E2, A>,
options?: {
readonly concurrency?: number | "unbounded"
readonly unordered?: boolean
}
) =>
mapEffectOptions(self, (k) =>
Effect.map(
f(k),
(a): Effect.MergeRecord<K, { [k in N]: A }> => ({ ...k, [tag]: a } as any)
), options))
const mapDequeue = <A, B>(dequeue: Queue.Dequeue<A>, f: (a: A) => B): Queue.Dequeue<B> => new MapDequeue(dequeue, f)

@@ -312,31 +402,31 @@

export const groupByKey = dual<
<A, K>(f: (a: A) => K) => <R, E>(self: Stream.Stream<R, E, A>) => GroupBy.GroupBy<R, E, K, A>,
<R, E, A, K>(self: Stream.Stream<R, E, A>, f: (a: A) => K) => GroupBy.GroupBy<R, E, K, A>
<A, K>(
f: (a: A) => K,
options?: { readonly bufferSize?: number }
) => <R, E>(self: Stream.Stream<R, E, A>) => GroupBy.GroupBy<R, E, K, A>,
<R, E, A, K>(
self: Stream.Stream<R, E, A>,
f: (a: A) => K,
options?: { readonly bufferSize?: number }
) => GroupBy.GroupBy<R, E, K, A>
>(
2,
<R, E, A, K>(self: Stream.Stream<R, E, A>, f: (a: A) => K): GroupBy.GroupBy<R, E, K, A> =>
groupByKeyBuffer(self, f, 16)
)
/** @internal */
export const groupByKeyBuffer = dual<
<A, K>(f: (a: A) => K, bufferSize: number) => <R, E>(self: Stream.Stream<R, E, A>) => GroupBy.GroupBy<R, E, K, A>,
<R, E, A, K>(self: Stream.Stream<R, E, A>, f: (a: A) => K, bufferSize: number) => GroupBy.GroupBy<R, E, K, A>
>(3, <R, E, A, K>(self: Stream.Stream<R, E, A>, f: (a: A) => K, bufferSize: number): GroupBy.GroupBy<R, E, K, A> => {
const loop = (
map: Map<K, Queue.Queue<Take.Take<E, A>>>,
outerQueue: Queue.Queue<Take.Take<E, readonly [K, Queue.Queue<Take.Take<E, A>>]>>
): Channel.Channel<R, E, Chunk.Chunk<A>, unknown, E, never, unknown> =>
core.readWithCause(
(input: Chunk.Chunk<A>) =>
pipe(
core.fromEffect(
pipe(
input,
groupByIterable(f),
Effect.forEach(([key, values]) => {
(args) => typeof args[0] !== "function",
<R, E, A, K>(
self: Stream.Stream<R, E, A>,
f: (a: A) => K,
options?: { readonly bufferSize?: number }
): GroupBy.GroupBy<R, E, K, A> => {
const loop = (
map: Map<K, Queue.Queue<Take.Take<E, A>>>,
outerQueue: Queue.Queue<Take.Take<E, readonly [K, Queue.Queue<Take.Take<E, A>>]>>
): Channel.Channel<R, E, Chunk.Chunk<A>, unknown, E, never, unknown> =>
core.readWithCause({
onInput: (input: Chunk.Chunk<A>) =>
core.flatMap(
core.fromEffect(
Effect.forEach(groupByIterable(input, f), ([key, values]) => {
const innerQueue = map.get(key)
if (innerQueue === undefined) {
return pipe(
Queue.bounded<Take.Take<E, A>>(bufferSize),
Queue.bounded<Take.Take<E, A>>(options?.bufferSize ?? 16),
Effect.flatMap((innerQueue) =>

@@ -364,53 +454,51 @@ pipe(

}
return pipe(
return Effect.catchSomeCause(
Queue.offer(innerQueue, take.chunk(values)),
Effect.catchSomeCause((cause) =>
(cause) =>
Cause.isInterruptedOnly(cause) ?
Option.some(Effect.unit) :
Option.none()
)
)
}, { discard: true })
)
),
() => loop(map, outerQueue)
),
core.flatMap(() => loop(map, outerQueue))
),
(cause) => core.fromEffect(Queue.offer(outerQueue, take.failCause(cause))),
() =>
pipe(
core.fromEffect(
pipe(
map.entries(),
Effect.forEach(([_, innerQueue]) =>
pipe(
Queue.offer(innerQueue, take.end),
Effect.catchSomeCause((cause) =>
Cause.isInterruptedOnly(cause) ?
Option.some(Effect.unit) :
Option.none()
)
), { discard: true }),
Effect.zipRight(Queue.offer(outerQueue, take.end))
onFailure: (cause) => core.fromEffect(Queue.offer(outerQueue, take.failCause(cause))),
onDone: () =>
pipe(
core.fromEffect(
pipe(
Effect.forEach(map.entries(), ([_, innerQueue]) =>
pipe(
Queue.offer(innerQueue, take.end),
Effect.catchSomeCause((cause) =>
Cause.isInterruptedOnly(cause) ?
Option.some(Effect.unit) :
Option.none()
)
), { discard: true }),
Effect.zipRight(Queue.offer(outerQueue, take.end))
)
)
)
)
)
return make(stream.unwrapScoped(
pipe(
Effect.sync(() => new Map<K, Queue.Queue<Take.Take<E, A>>>()),
Effect.flatMap((map) =>
pipe(
Effect.acquireRelease(
Queue.unbounded<Take.Take<E, readonly [K, Queue.Queue<Take.Take<E, A>>]>>(),
(queue) => Queue.shutdown(queue)
),
Effect.flatMap((queue) =>
pipe(
self,
stream.toChannel,
core.pipeTo(loop(map, queue)),
channel.drain,
channelExecutor.runScoped,
Effect.forkScoped,
Effect.as(stream.flattenTake(stream.fromQueueWithShutdown(queue)))
})
return make(stream.unwrapScoped(
pipe(
Effect.sync(() => new Map<K, Queue.Queue<Take.Take<E, A>>>()),
Effect.flatMap((map) =>
pipe(
Effect.acquireRelease(
Queue.unbounded<Take.Take<E, readonly [K, Queue.Queue<Take.Take<E, A>>]>>(),
(queue) => Queue.shutdown(queue)
),
Effect.flatMap((queue) =>
pipe(
self,
stream.toChannel,
core.pipeTo(loop(map, queue)),
channel.drain,
channelExecutor.runScoped,
Effect.forkScoped,
Effect.as(stream.flattenTake(stream.fromQueue(queue, { shutdown: true })))
)
)

@@ -420,53 +508,6 @@ )

)
)
))
})
/** @internal */
export const mapEffectParByKey = dual<
<R2, E2, A2, A, K>(
keyBy: (a: A) => K,
f: (a: A) => Effect.Effect<R2, E2, A2>
) => <R, E>(self: Stream.Stream<R, E, A>) => Stream.Stream<R2 | R, E2 | E, A2>,
<R, E, R2, E2, A2, A, K>(
self: Stream.Stream<R, E, A>,
keyBy: (a: A) => K,
f: (a: A) => Effect.Effect<R2, E2, A2>
) => Stream.Stream<R2 | R, E2 | E, A2>
>(
3,
<R, E, R2, E2, A2, A, K>(
self: Stream.Stream<R, E, A>,
keyBy: (a: A) => K,
f: (a: A) => Effect.Effect<R2, E2, A2>
): Stream.Stream<R | R2, E | E2, A2> => mapEffectParByKeyBuffer(self, keyBy, 16, f)
))
}
)
/** @internal */
export const mapEffectParByKeyBuffer = dual<
<R2, E2, A2, A, K>(
keyBy: (a: A) => K,
bufferSize: number,
f: (a: A) => Effect.Effect<R2, E2, A2>
) => <R, E>(self: Stream.Stream<R, E, A>) => Stream.Stream<R2 | R, E2 | E, A2>,
<R, E, R2, E2, A2, A, K>(
self: Stream.Stream<R, E, A>,
keyBy: (a: A) => K,
bufferSize: number,
f: (a: A) => Effect.Effect<R2, E2, A2>
) => Stream.Stream<R2 | R, E2 | E, A2>
>(
4,
<R, E, R2, E2, A2, A, K>(
self: Stream.Stream<R, E, A>,
keyBy: (a: A) => K,
bufferSize: number,
f: (a: A) => Effect.Effect<R2, E2, A2>
): Stream.Stream<R | R2, E | E2, A2> =>
pipe(
groupByKeyBuffer(self, keyBy, bufferSize),
evaluate((_, s) => pipe(s, stream.mapEffect(f)))
)
)
/**

@@ -473,0 +514,0 @@ * A variant of `groupBy` that retains the insertion order of keys.

@@ -26,2 +26,18 @@ import { dual } from "@effect/data/Function"

/** @internal */
export const fromInput = (input: HaltStrategy.HaltStrategyInput): HaltStrategy.HaltStrategy => {
switch (input) {
case "left":
return Left
case "right":
return Right
case "both":
return Both
case "either":
return Either
default:
return input
}
}
/** @internal */
export const isLeft = (self: HaltStrategy.HaltStrategy): self is HaltStrategy.Left => self._tag === OpCodes.OP_LEFT

@@ -28,0 +44,0 @@

@@ -46,10 +46,9 @@ import { dual, pipe } from "@effect/data/Function"

Effect.flatMap((a) =>
pipe(
stream.fromHubScoped(this.hub),
Effect.map((s) =>
pipe(
Effect.map(
stream.fromHub(this.hub, { scoped: true }),
(s) =>
stream.concat(
stream.make(a),
stream.concat(s)
s
)
)
)

@@ -56,0 +55,0 @@ ),

@@ -108,17 +108,23 @@ import * as Chunk from "@effect/data/Chunk"

<Z, E, Z2, A, Z3>(
onEnd: () => Z,
onError: (cause: Cause.Cause<E>) => Z2,
onSuccess: (chunk: Chunk.Chunk<A>) => Z3
options: {
readonly onEnd: () => Z
readonly onFailure: (cause: Cause.Cause<E>) => Z2
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Z3
}
) => (self: Take.Take<E, A>) => Z | Z2 | Z3,
<Z, E, Z2, A, Z3>(
self: Take.Take<E, A>,
onEnd: () => Z,
onError: (cause: Cause.Cause<E>) => Z2,
onSuccess: (chunk: Chunk.Chunk<A>) => Z3
options: {
readonly onEnd: () => Z
readonly onFailure: (cause: Cause.Cause<E>) => Z2
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Z3
}
) => Z | Z2 | Z3
>(4, <Z, E, Z2, A, Z3>(
>(2, <Z, E, Z2, A, Z3>(
self: Take.Take<E, A>,
onEnd: () => Z,
onError: (cause: Cause.Cause<E>) => Z2,
onSuccess: (chunk: Chunk.Chunk<A>) => Z3
{ onEnd, onFailure, onSuccess }: {
readonly onEnd: () => Z
readonly onFailure: (cause: Cause.Cause<E>) => Z2
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Z3
}
): Z | Z2 | Z3 =>

@@ -129,3 +135,3 @@ Exit.match<Option.Option<E>, Chunk.Chunk<A>, Z | Z2 | Z3>(self.exit, {

onNone: onEnd,
onSome: onError
onSome: onFailure
}),

@@ -138,17 +144,23 @@ onSuccess

<R, E2, Z, R2, E, Z2, A, R3, E3, Z3>(
onEnd: () => Effect.Effect<R, E2, Z>,
onError: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>,
onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>
options: {
readonly onEnd: () => Effect.Effect<R, E2, Z>
readonly onFailure: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>
}
) => (self: Take.Take<E, A>) => Effect.Effect<R | R2 | R3, E2 | E | E3, Z | Z2 | Z3>,
<R, E2, Z, R2, E, Z2, A, R3, E3, Z3>(
self: Take.Take<E, A>,
onEnd: () => Effect.Effect<R, E2, Z>,
onError: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>,
onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>
options: {
readonly onEnd: () => Effect.Effect<R, E2, Z>
readonly onFailure: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>
}
) => Effect.Effect<R | R2 | R3, E2 | E | E3, Z | Z2 | Z3>
>(4, <R, E2, Z, R2, E, Z2, A, R3, E3, Z3>(
>(2, <R, E2, Z, R2, E, Z2, A, R3, E3, Z3>(
self: Take.Take<E, A>,
onEnd: () => Effect.Effect<R, E2, Z>,
onError: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>,
onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>
{ onEnd, onFailure, onSuccess }: {
readonly onEnd: () => Effect.Effect<R, E2, Z>
readonly onFailure: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>
}
): Effect.Effect<R | R2 | R3, E | E2 | E3, Z | Z2 | Z3> =>

@@ -159,3 +171,3 @@ Exit.matchEffect<Option.Option<E>, Chunk.Chunk<A>, R | R2, E | E2, Z | Z2, R3, E3, Z3>(self.exit, {

onNone: onEnd,
onSome: onError
onSome: onFailure
}),

@@ -162,0 +174,0 @@ onSuccess

@@ -236,11 +236,15 @@ /**

<Z, S>(
z: S,
p: Predicate<Z>,
f: (s: S, z: Z) => S
options: {
readonly initial: S
readonly while: Predicate<Z>
readonly body: (s: S, z: Z) => S
}
): <R, E, In, L extends In>(self: Sink<R, E, In, L, Z>) => Sink<R, E, In, L, S>
<R, E, In, L extends In, Z, S>(
self: Sink<R, E, In, L, Z>,
z: S,
p: Predicate<Z>,
f: (s: S, z: Z) => S
options: {
readonly initial: S
readonly while: Predicate<Z>
readonly body: (s: S, z: Z) => S
}
): Sink<R, E, In, L, S>

@@ -351,6 +355,8 @@ } = internal.collectAllWhileWith as any

<In0, In, Z, Z2>(
f: (input: In0) => In,
g: (z: Z) => Z2
options: { readonly onInput: (input: In0) => In; readonly onDone: (z: Z) => Z2 }
): <R, E, L>(self: Sink<R, E, In, L, Z>) => Sink<R, E, In0, L, Z2>
<R, E, L, In0, In, Z, Z2>(self: Sink<R, E, In, L, Z>, f: (input: In0) => In, g: (z: Z) => Z2): Sink<R, E, In0, L, Z2>
<R, E, L, In0, In, Z, Z2>(
self: Sink<R, E, In, L, Z>,
options: { readonly onInput: (input: In0) => In; readonly onDone: (z: Z) => Z2 }
): Sink<R, E, In0, L, Z2>
} = internal.dimap

@@ -367,9 +373,13 @@

<In0, R2, E2, In, Z, R3, E3, Z2>(
f: (input: In0) => Effect.Effect<R2, E2, In>,
g: (z: Z) => Effect.Effect<R3, E3, Z2>
options: {
readonly onInput: (input: In0) => Effect.Effect<R2, E2, In>
readonly onDone: (z: Z) => Effect.Effect<R3, E3, Z2>
}
): <R, E, L>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R3 | R, E2 | E3 | E, In0, L, Z2>
<R, E, L, In0, R2, E2, In, Z, R3, E3, Z2>(
self: Sink<R, E, In, L, Z>,
f: (input: In0) => Effect.Effect<R2, E2, In>,
g: (z: Z) => Effect.Effect<R3, E3, Z2>
options: {
readonly onInput: (input: In0) => Effect.Effect<R2, E2, In>
readonly onDone: (z: Z) => Effect.Effect<R3, E3, Z2>
}
): Sink<R | R2 | R3, E | E2 | E3, In0, L, Z2>

@@ -387,9 +397,7 @@ } = internal.dimapEffect

<In0, In, Z, Z2>(
f: (chunk: Chunk.Chunk<In0>) => Chunk.Chunk<In>,
g: (z: Z) => Z2
options: { readonly onInput: (chunk: Chunk.Chunk<In0>) => Chunk.Chunk<In>; readonly onDone: (z: Z) => Z2 }
): <R, E, L>(self: Sink<R, E, In, L, Z>) => Sink<R, E, In0, L, Z2>
<R, E, L, In0, In, Z, Z2>(
self: Sink<R, E, In, L, Z>,
f: (chunk: Chunk.Chunk<In0>) => Chunk.Chunk<In>,
g: (z: Z) => Z2
options: { readonly onInput: (chunk: Chunk.Chunk<In0>) => Chunk.Chunk<In>; readonly onDone: (z: Z) => Z2 }
): Sink<R, E, In0, L, Z2>

@@ -407,9 +415,13 @@ } = internal.dimapChunks

<In0, R2, E2, In, Z, R3, E3, Z2>(
f: (chunk: Chunk.Chunk<In0>) => Effect.Effect<R2, E2, Chunk.Chunk<In>>,
g: (z: Z) => Effect.Effect<R3, E3, Z2>
options: {
readonly onInput: (chunk: Chunk.Chunk<In0>) => Effect.Effect<R2, E2, Chunk.Chunk<In>>
readonly onDone: (z: Z) => Effect.Effect<R3, E3, Z2>
}
): <R, E, L>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R3 | R, E2 | E3 | E, In0, L, Z2>
<R, E, L, In0, R2, E2, In, Z, R3, E3, Z2>(
self: Sink<R, E, In, L, Z>,
f: (chunk: Chunk.Chunk<In0>) => Effect.Effect<R2, E2, Chunk.Chunk<In>>,
g: (z: Z) => Effect.Effect<R3, E3, Z2>
options: {
readonly onInput: (chunk: Chunk.Chunk<In0>) => Effect.Effect<R2, E2, Chunk.Chunk<In>>
readonly onDone: (z: Z) => Effect.Effect<R3, E3, Z2>
}
): Sink<R | R2 | R3, E | E2 | E3, In0, L, Z2>

@@ -644,9 +656,13 @@ } = internal.dimapChunksEffect

<R1, R2, E, E1, E2, In, In1 extends In, In2 extends In, L, L1, L2, Z, Z1, Z2>(
onFailure: (err: E) => Sink<R1, E1, In1, L1, Z1>,
onSuccess: (z: Z) => Sink<R2, E2, In2, L2, Z2>
options: {
readonly onFailure: (err: E) => Sink<R1, E1, In1, L1, Z1>
readonly onSuccess: (z: Z) => Sink<R2, E2, In2, L2, Z2>
}
): <R>(self: Sink<R, E, In, L, Z>) => Sink<R1 | R2 | R, E1 | E2, In1 & In2, L1 | L2, Z1 | Z2>
<R, R1, R2, E, E1, E2, In, In1 extends In, In2 extends In, L, L1, L2, Z, Z1, Z2>(
self: Sink<R, E, In, L, Z>,
onFailure: (err: E) => Sink<R1, E1, In1, L1, Z1>,
onSuccess: (z: Z) => Sink<R2, E2, In2, L2, Z2>
options: {
readonly onFailure: (err: E) => Sink<R1, E1, In1, L1, Z1>
readonly onSuccess: (z: Z) => Sink<R2, E2, In2, L2, Z2>
}
): Sink<R | R1 | R2, E1 | E2, In1 & In2, L1 | L2, Z1 | Z2>

@@ -781,6 +797,8 @@ } = internal.foldSink

export const foldWeighted: <S, In>(
s: S,
max: number,
costFn: (s: S, input: In) => number,
f: (s: S, input: In) => S
options: {
readonly initial: S
readonly maxCost: number
readonly cost: (s: S, input: In) => number
readonly body: (s: S, input: In) => S
}
) => Sink<never, never, In, In, S> = internal.foldWeighted

@@ -826,7 +844,9 @@

export const foldWeightedDecompose: <S, In>(
s: S,
max: number,
costFn: (s: S, input: In) => number,
decompose: (input: In) => Chunk.Chunk<In>,
f: (s: S, input: In) => S
options: {
readonly initial: S
readonly maxCost: number
readonly cost: (s: S, input: In) => number
readonly decompose: (input: In) => Chunk.Chunk<In>
readonly body: (s: S, input: In) => S
}
) => Sink<never, never, In, In, S> = internal.foldWeightedDecompose

@@ -852,7 +872,9 @@

export const foldWeightedDecomposeEffect: <S, In, R, E, R2, E2, R3, E3>(
s: S,
max: number,
costFn: (s: S, input: In) => Effect.Effect<R, E, number>,
decompose: (input: In) => Effect.Effect<R2, E2, Chunk.Chunk<In>>,
f: (s: S, input: In) => Effect.Effect<R3, E3, S>
options: {
readonly initial: S
readonly maxCost: number
readonly cost: (s: S, input: In) => Effect.Effect<R, E, number>
readonly decompose: (input: In) => Effect.Effect<R2, E2, Chunk.Chunk<In>>
readonly body: (s: S, input: In) => Effect.Effect<R3, E3, S>
}
) => Sink<R | R2 | R3, E | E2 | E3, In, In, S> = internal.foldWeightedDecomposeEffect

@@ -874,6 +896,8 @@

export const foldWeightedEffect: <S, In, R, E, R2, E2>(
s: S,
max: number,
costFn: (s: S, input: In) => Effect.Effect<R, E, number>,
f: (s: S, input: In) => Effect.Effect<R2, E2, S>
options: {
readonly initial: S
readonly maxCost: number
readonly cost: (s: S, input: In) => Effect.Effect<R, E, number>
readonly body: (s: S, input: In) => Effect.Effect<R2, E2, S>
}
) => Sink<R | R2, E | E2, In, In, S> = internal.foldWeightedEffect

@@ -978,15 +1002,8 @@

*/
export const fromHub: <In>(hub: Hub.Hub<In>) => Sink<never, never, In, never, void> = internal.fromHub
export const fromHub: <In>(
hub: Hub.Hub<In>,
options?: { readonly shutdown?: boolean }
) => Sink<never, never, In, never, void> = internal.fromHub
/**
* Create a sink which publishes each element to the specified hub. The hub
* will be shutdown once the stream is closed.
*
* @since 1.0.0
* @category constructors
*/
export const fromHubWithShutdown: <In>(hub: Hub.Hub<In>) => Sink<never, never, In, never, void> =
internal.fromHubWithShutdown
/**
* Creates a sink from a chunk processing function.

@@ -1011,15 +1028,8 @@ *

*/
export const fromQueue: <In>(queue: Queue.Enqueue<In>) => Sink<never, never, In, never, void> = internal.fromQueue
export const fromQueue: <In>(
queue: Queue.Enqueue<In>,
options?: { readonly shutdown?: boolean }
) => Sink<never, never, In, never, void> = internal.fromQueue
/**
* Create a sink which enqueues each element into the specified queue. The
* queue will be shutdown once the stream is closed.
*
* @since 1.0.0
* @category constructors
*/
export const fromQueueWithShutdown: <In>(queue: Queue.Enqueue<In>) => Sink<never, never, In, never, void> =
internal.fromQueueWithShutdown
/**
* Creates a sink containing the first value.

@@ -1177,20 +1187,4 @@ *

<R1, E1, In1, L1, Z1>(
that: Sink<R1, E1, In1, L1, Z1>
): <R, E, In, L, Z>(self: Sink<R, E, In, L, Z>) => Sink<R1 | R, E1 | E, In & In1, L1 | L, Either.Either<Z, Z1>>
<R, E, In, L, Z, R1, E1, In1, L1, Z1>(
self: Sink<R, E, In, L, Z>,
that: Sink<R1, E1, In1, L1, Z1>
): Sink<R | R1, E | E1, In & In1, L | L1, Either.Either<Z, Z1>>
} = internal.raceBoth
/**
* Like `raceBoth`, but with a configurable `capacity` parameter.
*
* @since 1.0.0
* @category utils
*/
export const raceBothCapacity: {
<R1, E1, In1, L1, Z1>(
that: Sink<R1, E1, In1, L1, Z1>,
capacity: number
options?: { readonly capacity?: number }
): <R, E, In, L, Z>(self: Sink<R, E, In, L, Z>) => Sink<R1 | R, E1 | E, In & In1, L1 | L, Either.Either<Z, Z1>>

@@ -1200,5 +1194,5 @@ <R, E, In, L, Z, R1, E1, In1, L1, Z1>(

that: Sink<R1, E1, In1, L1, Z1>,
capacity: number
options?: { readonly capacity?: number }
): Sink<R | R1, E | E1, In & In1, L | L1, Either.Either<Z, Z1>>
} = internal.raceBothCapacity
} = internal.raceBoth

@@ -1214,11 +1208,17 @@ /**

<R2, E2, In2, L2, Z2, E, Z, Z3, Z4>(
that: Sink<R2, E2, In2, L2, Z2>,
leftDone: (exit: Exit.Exit<E, Z>) => MergeDecision.MergeDecision<R2, E2, Z2, E2 | E, Z3>,
rightDone: (exit: Exit.Exit<E2, Z2>) => MergeDecision.MergeDecision<R2, E, Z, E2 | E, Z4>
options: {
readonly other: Sink<R2, E2, In2, L2, Z2>
readonly onSelfDone: (exit: Exit.Exit<E, Z>) => MergeDecision.MergeDecision<R2, E2, Z2, E2 | E, Z3>
readonly onOtherDone: (exit: Exit.Exit<E2, Z2>) => MergeDecision.MergeDecision<R2, E, Z, E2 | E, Z4>
readonly capacity?: number
}
): <R, In, L>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L2 | L, Z3 | Z4>
<R, In, L, R2, E2, In2, L2, Z2, E, Z, Z3, Z4>(
self: Sink<R, E, In, L, Z>,
that: Sink<R2, E2, In2, L2, Z2>,
leftDone: (exit: Exit.Exit<E, Z>) => MergeDecision.MergeDecision<R2, E2, Z2, E2 | E, Z3>,
rightDone: (exit: Exit.Exit<E2, Z2>) => MergeDecision.MergeDecision<R2, E, Z, E2 | E, Z4>
options: {
readonly other: Sink<R2, E2, In2, L2, Z2>
readonly onSelfDone: (exit: Exit.Exit<E, Z>) => MergeDecision.MergeDecision<R2, E2, Z2, E2 | E, Z3>
readonly onOtherDone: (exit: Exit.Exit<E2, Z2>) => MergeDecision.MergeDecision<R2, E, Z, E2 | E, Z4>
readonly capacity?: number
}
): Sink<R | R2, E2 | E, In & In2, L | L2, Z3 | Z4>

@@ -1228,25 +1228,3 @@ } = internal.raceWith

/**
* Like `raceWith`, but with a configurable `capacity` parameter.
*
* @since 1.0.0
* @category utils
*/
export const raceWithCapacity: {
<R2, E2, In2, L2, Z2, E, Z, Z3, Z4>(
that: Sink<R2, E2, In2, L2, Z2>,
leftDone: (exit: Exit.Exit<E, Z>) => MergeDecision.MergeDecision<R2, E2, Z2, E2 | E, Z3>,
rightDone: (exit: Exit.Exit<E2, Z2>) => MergeDecision.MergeDecision<R2, E, Z, E2 | E, Z4>,
capacity: number
): <R, In, L>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L2 | L, Z3 | Z4>
<R, In, L, R2, E2, In2, L2, Z2, E, Z, Z3, Z4>(
self: Sink<R, E, In, L, Z>,
that: Sink<R2, E2, In2, L2, Z2>,
leftDone: (exit: Exit.Exit<E, Z>) => MergeDecision.MergeDecision<R2, E2, Z2, E2 | E, Z3>,
rightDone: (exit: Exit.Exit<E2, Z2>) => MergeDecision.MergeDecision<R2, E, Z, E2 | E, Z4>,
capacity: number
): Sink<R | R2, E2 | E, In & In2, L | L2, Z3 | Z4>
} = internal.raceWithCapacity
/**
* @since 1.0.0
* @category error handling

@@ -1396,7 +1374,9 @@ */

<R2, E2, In, In2 extends In, L, L2, Z, Z2>(
that: Sink<R2, E2, In2, L2, Z2>
that: Sink<R2, E2, In2, L2, Z2>,
options?: { readonly concurrent?: boolean }
): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, readonly [Z, Z2]>
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(
self: Sink<R, E, In, L, Z>,
that: Sink<R2, E2, In2, L2, Z2>
that: Sink<R2, E2, In2, L2, Z2>,
options?: { readonly concurrent?: boolean }
): Sink<R | R2, E | E2, In & In2, L | L2, readonly [Z, Z2]>

@@ -1413,7 +1393,9 @@ } = internal.zip

<R2, E2, In, In2 extends In, L, L2, Z, Z2>(
that: Sink<R2, E2, In2, L2, Z2>
that: Sink<R2, E2, In2, L2, Z2>,
options?: { readonly concurrent?: boolean }
): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z>
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(
self: Sink<R, E, In, L, Z>,
that: Sink<R2, E2, In2, L2, Z2>
that: Sink<R2, E2, In2, L2, Z2>,
options?: { readonly concurrent?: boolean }
): Sink<R | R2, E | E2, In & In2, L | L2, Z>

@@ -1430,7 +1412,9 @@ } = internal.zipLeft

<R2, E2, In, In2 extends In, L, L2, Z, Z2>(
that: Sink<R2, E2, In2, L2, Z2>
that: Sink<R2, E2, In2, L2, Z2>,
options?: { readonly concurrent?: boolean }
): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z2>
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(
self: Sink<R, E, In, L, Z>,
that: Sink<R2, E2, In2, L2, Z2>
that: Sink<R2, E2, In2, L2, Z2>,
options?: { readonly concurrent?: boolean }
): Sink<R | R2, E | E2, In & In2, L | L2, Z2>

@@ -1450,3 +1434,4 @@ } = internal.zipRight

that: Sink<R2, E2, In2, L2, Z2>,
f: (z: Z, z1: Z2) => Z3
f: (z: Z, z1: Z2) => Z3,
options?: { readonly concurrent?: boolean }
): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z3>

@@ -1456,72 +1441,5 @@ <R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2, Z3>(

that: Sink<R2, E2, In2, L2, Z2>,
f: (z: Z, z1: Z2) => Z3
f: (z: Z, z1: Z2) => Z3,
options?: { readonly concurrent?: boolean }
): Sink<R | R2, E | E2, In & In2, L | L2, Z3>
} = internal.zipWith
/**
* Runs both sinks in parallel on the input and combines the results in a
* tuple.
*
* @since 1.0.0
* @category zipping
*/
export const zipPar: {
<R2, E2, In, In2 extends In, L, L2, Z, Z2>(
that: Sink<R2, E2, In2, L2, Z2>
): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, readonly [Z, Z2]>
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(
self: Sink<R, E, In, L, Z>,
that: Sink<R2, E2, In2, L2, Z2>
): Sink<R | R2, E | E2, In & In2, L | L2, readonly [Z, Z2]>
} = internal.zipPar
/**
* Like `Sink.zipPar` but keeps only the result from this sink.
*
* @since 1.0.0
* @category zipping
*/
export const zipParLeft: {
<R2, E2, In, In2 extends In, L, L2, Z, Z2>(
that: Sink<R2, E2, In2, L2, Z2>
): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z>
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(
self: Sink<R, E, In, L, Z>,
that: Sink<R2, E2, In2, L2, Z2>
): Sink<R | R2, E | E2, In & In2, L | L2, Z>
} = internal.zipParLeft
/**
* Like `Sink.zipPar` but keeps only the result from `that` sink.
*
* @since 1.0.0
* @category zipping
*/
export const zipParRight: {
<R2, E2, In, In2 extends In, L, L2, Z, Z2>(
that: Sink<R2, E2, In2, L2, Z2>
): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z2>
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2>(
self: Sink<R, E, In, L, Z>,
that: Sink<R2, E2, In2, L2, Z2>
): Sink<R | R2, E | E2, In & In2, L | L2, Z2>
} = internal.zipParRight
/**
* Runs both sinks in parallel on the input and combines the results using the
* provided function.
*
* @since 1.0.0
* @category zipping
*/
export const zipWithPar: {
<R2, E2, In, In2 extends In, L, L2, Z, Z2, Z3>(
that: Sink<R2, E2, In2, L2, Z2>,
f: (z: Z, z1: Z2) => Z3
): <R, E>(self: Sink<R, E, In, L, Z>) => Sink<R2 | R, E2 | E, In & In2, L | L2, Z3>
<R, E, R2, E2, In, In2 extends In, L, L2, Z, Z2, Z3>(
self: Sink<R, E, In, L, Z>,
that: Sink<R2, E2, In2, L2, Z2>,
f: (z: Z, z1: Z2) => Z3
): Sink<R | R2, E | E2, In & In2, L | L2, Z3>
} = internal.zipWithPar

@@ -16,2 +16,8 @@ /**

*/
export type HaltStrategyInput = HaltStrategy | "left" | "right" | "both" | "either"
/**
* @since 1.0.0
* @category models
*/
export interface Left {

@@ -71,2 +77,8 @@ readonly _tag: "Left"

* @since 1.0.0
* @category constructors
*/
export const fromInput: (input: HaltStrategyInput) => HaltStrategy = internal.fromInput
/**
* @since 1.0.0
* @category refinements

@@ -73,0 +85,0 @@ */

@@ -192,11 +192,15 @@ /**

<Z, E, Z2, A, Z3>(
onEnd: () => Z,
onError: (cause: Cause.Cause<E>) => Z2,
onSuccess: (chunk: Chunk.Chunk<A>) => Z3
options: {
readonly onEnd: () => Z
readonly onFailure: (cause: Cause.Cause<E>) => Z2
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Z3
}
): (self: Take<E, A>) => Z | Z2 | Z3
<Z, E, Z2, A, Z3>(
self: Take<E, A>,
onEnd: () => Z,
onError: (cause: Cause.Cause<E>) => Z2,
onSuccess: (chunk: Chunk.Chunk<A>) => Z3
options: {
readonly onEnd: () => Z
readonly onFailure: (cause: Cause.Cause<E>) => Z2
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Z3
}
): Z | Z2 | Z3

@@ -216,11 +220,15 @@ } = internal.match

<R, E2, Z, R2, E, Z2, A, R3, E3, Z3>(
onEnd: () => Effect.Effect<R, E2, Z>,
onError: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>,
onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>
options: {
readonly onEnd: () => Effect.Effect<R, E2, Z>
readonly onFailure: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>
}
): (self: Take<E, A>) => Effect.Effect<R | R2 | R3, E2 | E | E3, Z | Z2 | Z3>
<R, E2, Z, R2, E, Z2, A, R3, E3, Z3>(
self: Take<E, A>,
onEnd: () => Effect.Effect<R, E2, Z>,
onError: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>,
onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>
options: {
readonly onEnd: () => Effect.Effect<R, E2, Z>
readonly onFailure: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>
}
): Effect.Effect<R | R2 | R3, E2 | E | E3, Z | Z2 | Z3>

@@ -227,0 +235,0 @@ } = internal.matchEffect

@@ -10,2 +10,7 @@ /**

*/
export type HaltStrategyInput = HaltStrategy | "left" | "right" | "both" | "either";
/**
* @since 1.0.0
* @category models
*/
export interface Left {

@@ -57,2 +62,7 @@ readonly _tag: "Left";

* @since 1.0.0
* @category constructors
*/
export declare const fromInput: (input: HaltStrategyInput) => HaltStrategy;
/**
* @since 1.0.0
* @category refinements

@@ -59,0 +69,0 @@ */

@@ -6,3 +6,3 @@ "use strict";

});
exports.match = exports.isRight = exports.isLeft = exports.isEither = exports.isBoth = exports.Right = exports.Left = exports.Either = exports.Both = void 0;
exports.match = exports.isRight = exports.isLeft = exports.isEither = exports.isBoth = exports.fromInput = exports.Right = exports.Left = exports.Either = exports.Both = void 0;
var internal = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/stream/internal/stream/haltStrategy"));

@@ -40,5 +40,11 @@ function _getRequireWildcardCache(nodeInterop) { if (typeof WeakMap !== "function") return null; var cacheBabelInterop = new WeakMap(); var cacheNodeInterop = new WeakMap(); return (_getRequireWildcardCache = function (nodeInterop) { return nodeInterop ? cacheNodeInterop : cacheBabelInterop; })(nodeInterop); }

* @since 1.0.0
* @category constructors
*/
exports.Either = Either;
const fromInput = internal.fromInput;
/**
* @since 1.0.0
* @category refinements
*/
exports.Either = Either;
exports.fromInput = fromInput;
const isLeft = internal.isLeft;

@@ -45,0 +51,0 @@ /**

@@ -165,4 +165,12 @@ /**

export declare const match: {
<Z, E, Z2, A, Z3>(onEnd: () => Z, onError: (cause: Cause.Cause<E>) => Z2, onSuccess: (chunk: Chunk.Chunk<A>) => Z3): (self: Take<E, A>) => Z | Z2 | Z3;
<Z, E, Z2, A, Z3>(self: Take<E, A>, onEnd: () => Z, onError: (cause: Cause.Cause<E>) => Z2, onSuccess: (chunk: Chunk.Chunk<A>) => Z3): Z | Z2 | Z3;
<Z, E, Z2, A, Z3>(options: {
readonly onEnd: () => Z;
readonly onFailure: (cause: Cause.Cause<E>) => Z2;
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Z3;
}): (self: Take<E, A>) => Z | Z2 | Z3;
<Z, E, Z2, A, Z3>(self: Take<E, A>, options: {
readonly onEnd: () => Z;
readonly onFailure: (cause: Cause.Cause<E>) => Z2;
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Z3;
}): Z | Z2 | Z3;
};

@@ -179,4 +187,12 @@ /**

export declare const matchEffect: {
<R, E2, Z, R2, E, Z2, A, R3, E3, Z3>(onEnd: () => Effect.Effect<R, E2, Z>, onError: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>, onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>): (self: Take<E, A>) => Effect.Effect<R | R2 | R3, E2 | E | E3, Z | Z2 | Z3>;
<R, E2, Z, R2, E, Z2, A, R3, E3, Z3>(self: Take<E, A>, onEnd: () => Effect.Effect<R, E2, Z>, onError: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>, onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>): Effect.Effect<R | R2 | R3, E2 | E | E3, Z | Z2 | Z3>;
<R, E2, Z, R2, E, Z2, A, R3, E3, Z3>(options: {
readonly onEnd: () => Effect.Effect<R, E2, Z>;
readonly onFailure: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>;
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>;
}): (self: Take<E, A>) => Effect.Effect<R | R2 | R3, E2 | E | E3, Z | Z2 | Z3>;
<R, E2, Z, R2, E, Z2, A, R3, E3, Z3>(self: Take<E, A>, options: {
readonly onEnd: () => Effect.Effect<R, E2, Z>;
readonly onFailure: (cause: Cause.Cause<E>) => Effect.Effect<R2, E2, Z2>;
readonly onSuccess: (chunk: Chunk.Chunk<A>) => Effect.Effect<R3, E3, Z3>;
}): Effect.Effect<R | R2 | R3, E2 | E | E3, Z | Z2 | Z3>;
};

@@ -183,0 +199,0 @@ /**

@@ -9,3 +9,2 @@ {

"vitest/globals",
"node"
],

@@ -12,0 +11,0 @@ "paths": {

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc