Socket
Socket
Sign inDemoInstall

@effect/stream

Package Overview
Dependencies
Maintainers
3
Versions
62
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@effect/stream - npm Package Compare versions

Comparing version 0.25.0 to 0.25.1

3

GroupBy.d.ts
/**
* @since 1.0.0
*/
import type { Pipeable } from "@effect/data/Pipeable";
import type { Predicate } from "@effect/data/Predicate";

@@ -26,3 +27,3 @@ import type * as Queue from "@effect/io/Queue";

*/
export interface GroupBy<R, E, K, V> extends GroupBy.Variance<R, E, K, V> {
export interface GroupBy<R, E, K, V> extends GroupBy.Variance<R, E, K, V>, Pipeable<GroupBy<R, E, K, V>> {
readonly grouped: Stream.Stream<R, E, readonly [K, Queue.Dequeue<Take.Take<E, V>>]>;

@@ -29,0 +30,0 @@ }

@@ -238,6 +238,3 @@ "use strict";

})(Effect.gen(function* ($) {
const queue = yield* $(Effect.acquireRelease({
acquire: Queue.bounded(n),
release: queue => Queue.shutdown(queue)
}));
const queue = yield* $(Effect.acquireRelease(Queue.bounded(n), queue => Queue.shutdown(queue)));
const errorSignal = yield* $(Deferred.make());

@@ -291,10 +288,4 @@ const withPermits = n === Number.POSITIVE_INFINITY ? _ => _Function.identity : (yield* $(Effect.makeSemaphore(n))).withPermits;

const queueReader = fromInput(input);
const queue = yield* $(Effect.acquireRelease({
acquire: Queue.bounded(bufferSize),
release: queue => Queue.shutdown(queue)
}));
const cancelers = yield* $(Effect.acquireRelease({
acquire: Queue.unbounded(),
release: queue => Queue.shutdown(queue)
}));
const queue = yield* $(Effect.acquireRelease(Queue.bounded(bufferSize), queue => Queue.shutdown(queue)));
const cancelers = yield* $(Effect.acquireRelease(Queue.unbounded(), queue => Queue.shutdown(queue)));
const lastDone = yield* $(Ref.make(Option.none()));

@@ -509,8 +500,5 @@ const errorSignal = yield* $(Deferred.make());

exports.toHub = toHub;
const toPull = self => Effect.map(Effect.acquireRelease({
acquire: Effect.sync(() => new executor.ChannelExecutor(self, void 0, _Function.identity)),
release: (exec, exit) => {
const finalize = exec.close(exit);
return finalize === undefined ? Effect.unit : finalize;
}
const toPull = self => Effect.map(Effect.acquireRelease(Effect.sync(() => new executor.ChannelExecutor(self, void 0, _Function.identity)), (exec, exit) => {
const finalize = exec.close(exit);
return finalize === undefined ? Effect.unit : finalize;
}), exec => Effect.suspend(() => interpretToPull(exec.run(), exec)));

@@ -517,0 +505,0 @@ /** @internal */

@@ -663,12 +663,8 @@ "use strict";

const runScoped = self => {
const run = (channelDeferred, scopeDeferred, scope) => Effect.acquireUseRelease({
acquire: Effect.sync(() => new ChannelExecutor(self, void 0, _Function.identity)),
use: exec => Effect.suspend(() => Effect.zipLeft(Deferred.await(scopeDeferred))(Effect.zipRight(Deferred.await(channelDeferred))(Effect.intoDeferred(channelDeferred)(runScopedInterpret(exec.run(), exec))))),
release: (exec, exit) => {
const finalize = exec.close(exit);
if (finalize === undefined) {
return Effect.unit;
}
return Effect.tapErrorCause(finalize, cause => Scope.addFinalizer(scope, Effect.failCause(cause)));
const run = (channelDeferred, scopeDeferred, scope) => Effect.acquireUseRelease(Effect.sync(() => new ChannelExecutor(self, void 0, _Function.identity)), exec => Effect.suspend(() => Effect.zipLeft(Deferred.await(scopeDeferred))(Effect.zipRight(Deferred.await(channelDeferred))(Effect.intoDeferred(channelDeferred)(runScopedInterpret(exec.run(), exec))))), (exec, exit) => {
const finalize = exec.close(exit);
if (finalize === undefined) {
return Effect.unit;
}
return Effect.tapErrorCause(finalize, cause => Scope.addFinalizer(scope, Effect.failCause(cause)));
});

@@ -675,0 +671,0 @@ return Effect.uninterruptibleMask(restore => Effect.flatMap(Effect.scope, parent => Effect.flatMap(([child, channelDeferred, scopeDeferred]) => Effect.flatMap(fiber => Effect.zipLeft(Fiber.inheritAll(fiber))(Effect.zipRight(restore(Deferred.await(channelDeferred)))(Effect.addFinalizer(() => Deferred.succeed(scopeDeferred, void 0)))))(Effect.forkScoped(restore(run(channelDeferred, scopeDeferred, child)))))(Effect.all(Scope.fork(parent, ExecutionStrategy.sequential), Deferred.make(), Deferred.make()))));

@@ -11,2 +11,3 @@ "use strict";

var Option = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/data/Option"));
var _Pipeable = /*#__PURE__*/require("@effect/data/Pipeable");
var Cause = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/io/Cause"));

@@ -36,3 +37,6 @@ var childExecutorDecision = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/stream/internal/channel/childExecutorDecision"));

const proto = {
[ChannelTypeId]: channelVariance
[ChannelTypeId]: channelVariance,
pipe() {
return (0, _Pipeable.pipeArguments)(this, arguments);
}
};

@@ -39,0 +43,0 @@ /** @internal */

@@ -10,2 +10,3 @@ "use strict";

var Option = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/data/Option"));
var _Pipeable = /*#__PURE__*/require("@effect/data/Pipeable");
var Cause = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/io/Cause"));

@@ -64,2 +65,5 @@ var Deferred = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/io/Deferred"));

[GroupByTypeId]: groupByVariance,
pipe() {
return (0, _Pipeable.pipeArguments)(this, arguments);
},
grouped

@@ -75,6 +79,3 @@ });

const decider = yield* $(Deferred.make());
const output = yield* $(Effect.acquireRelease({
acquire: Queue.bounded(bufferSize),
release: queue => Queue.shutdown(queue)
}));
const output = yield* $(Effect.acquireRelease(Queue.bounded(bufferSize), queue => Queue.shutdown(queue)));
const ref = yield* $(Ref.make(new Map()));

@@ -140,2 +141,5 @@ const add = yield* $(stream.distributedWithDynamicCallback(bufferSize, ([key, value]) => Effect.flatMap(Deferred.await(decider), f => f(key, value)), exit => Queue.offer(output, exit))(stream.mapEffect(self, f)));

}
pipe() {
return (0, _Pipeable.pipeArguments)(this, arguments);
}
}

@@ -161,6 +165,3 @@ _a = Queue.DequeueTypeId;

})(map.entries()))));
return make(stream.unwrapScoped(Effect.flatMap(map => Effect.flatMap(queue => Effect.as(stream.flattenTake(stream.fromQueueWithShutdown(queue)))(Effect.forkScoped(channelExecutor.runScoped(channel.drain(core.pipeTo(loop(map, queue))(stream.toChannel(self)))))))(Effect.acquireRelease({
acquire: Queue.unbounded(),
release: queue => Queue.shutdown(queue)
})))(Effect.sync(() => new Map()))));
return make(stream.unwrapScoped(Effect.flatMap(map => Effect.flatMap(queue => Effect.as(stream.flattenTake(stream.fromQueueWithShutdown(queue)))(Effect.forkScoped(channelExecutor.runScoped(channel.drain(core.pipeTo(loop(map, queue))(stream.toChannel(self)))))))(Effect.acquireRelease(Queue.unbounded(), queue => Queue.shutdown(queue))))(Effect.sync(() => new Map()))));
});

@@ -167,0 +168,0 @@ /** @internal */

@@ -15,2 +15,3 @@ "use strict";

var Option = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/data/Option"));
var _Pipeable = /*#__PURE__*/require("@effect/data/Pipeable");
var ReadonlyArray = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/data/ReadonlyArray"));

@@ -49,2 +50,5 @@ var Cause = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/io/Cause"));

}
pipe() {
return (0, _Pipeable.pipeArguments)(this, arguments);
}
}

@@ -560,6 +564,3 @@ exports.SinkImpl = SinkImpl;

exports.fromQueue = fromQueue;
const fromQueueWithShutdown = queue => unwrapScoped(Effect.map(fromQueue)(Effect.acquireRelease({
acquire: Effect.succeed(queue),
release: Queue.shutdown
})));
const fromQueueWithShutdown = queue => unwrapScoped(Effect.map(fromQueue)(Effect.acquireRelease(Effect.succeed(queue), Queue.shutdown)));
/** @internal */

@@ -566,0 +567,0 @@ exports.fromQueueWithShutdown = fromQueueWithShutdown;

@@ -8,2 +8,3 @@ "use strict";

var _Function = /*#__PURE__*/require("@effect/data/Function");
var _Pipeable = /*#__PURE__*/require("@effect/data/Pipeable");
var Effect = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/io/Effect"));

@@ -40,2 +41,5 @@ var Hub = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/io/Hub"));

}
pipe() {
return (0, _Pipeable.pipeArguments)(this, arguments);
}
get changes() {

@@ -42,0 +46,0 @@ return stream.unwrapScoped(this.semaphore.withPermits(1)(Effect.flatMap(a => Effect.map(s => stream.concat(s)(stream.make(a)))(stream.fromHubScoped(this.hub)))(Ref.get(this.ref))));

@@ -10,2 +10,3 @@ "use strict";

var Option = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/data/Option"));
var _Pipeable = /*#__PURE__*/require("@effect/data/Pipeable");
var Cause = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/io/Cause"));

@@ -33,2 +34,5 @@ var Effect = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/io/Effect"));

}
pipe() {
return (0, _Pipeable.pipeArguments)(this, arguments);
}
}

@@ -35,0 +39,0 @@ exports.TakeImpl = TakeImpl;

{
"name": "@effect/stream",
"version": "0.25.0",
"version": "0.25.1",
"license": "MIT",

@@ -10,4 +10,4 @@ "repository": {

"dependencies": {
"@effect/data": "~0.13.0",
"@effect/io": "~0.31.1"
"@effect/data": "~0.13.5",
"@effect/io": "~0.31.3"
},

@@ -14,0 +14,0 @@ "publishConfig": {

@@ -12,2 +12,3 @@ /**

import type * as Option from "@effect/data/Option";
import type { Pipeable } from "@effect/data/Pipeable";
import type { Predicate, Refinement } from "@effect/data/Predicate";

@@ -43,3 +44,3 @@ import type * as Unify from "@effect/data/Unify";

*/
export interface Sink<R, E, In, L, Z> extends Sink.Variance<R, E, In, L, Z> {
export interface Sink<R, E, In, L, Z> extends Sink.Variance<R, E, In, L, Z>, Pipeable<Sink<R, E, In, L, Z>> {
}

@@ -67,3 +68,3 @@ /**

declare module "@effect/io/Effect" {
interface Effect<R, E, A> extends Sink<R, E, unknown, never, A> {
interface Effect<R, E, A> extends Omit<Sink<R, E, unknown, never, A>, "pipe"> {
}

@@ -70,0 +71,0 @@ interface EffectUnifyBlacklist {

/**
* @since 1.0.0
*/
import type { Pipeable } from "@effect/data/Pipeable"
import type { Predicate } from "@effect/data/Predicate"

@@ -30,3 +31,3 @@ import type * as Queue from "@effect/io/Queue"

*/
export interface GroupBy<R, E, K, V> extends GroupBy.Variance<R, E, K, V> {
export interface GroupBy<R, E, K, V> extends GroupBy.Variance<R, E, K, V>, Pipeable<GroupBy<R, E, K, V>> {
readonly grouped: Stream.Stream<R, E, readonly [K, Queue.Dequeue<Take.Take<E, V>>]>

@@ -33,0 +34,0 @@ }

@@ -1116,5 +1116,5 @@ import type * as Context from "@effect/data/Context"

) =>
Effect.acquireUseRelease({
acquire: Effect.sync(() => new ChannelExecutor(self, void 0, identity)),
use: (exec) =>
Effect.acquireUseRelease(
Effect.sync(() => new ChannelExecutor(self, void 0, identity)),
(exec) =>
Effect.suspend(() =>

@@ -1128,3 +1128,3 @@ pipe(

),
release: (exec, exit) => {
(exec, exit) => {
const finalize = exec.close(exit)

@@ -1139,3 +1139,3 @@ if (finalize === undefined) {

}
})
)
return Effect.uninterruptibleMask((restore) =>

@@ -1142,0 +1142,0 @@ Effect.flatMap(Effect.scope, (parent) =>

@@ -7,2 +7,3 @@ import * as Chunk from "@effect/data/Chunk"

import * as Option from "@effect/data/Option"
import { pipeArguments } from "@effect/data/Pipeable"
import * as Cause from "@effect/io/Cause"

@@ -43,3 +44,6 @@ import type * as Effect from "@effect/io/Effect"

const proto = {
[ChannelTypeId]: channelVariance
[ChannelTypeId]: channelVariance,
pipe() {
return pipeArguments(this, arguments)
}
}

@@ -46,0 +50,0 @@

import * as Chunk from "@effect/data/Chunk"
import { dual, pipe } from "@effect/data/Function"
import * as Option from "@effect/data/Option"
import { pipeArguments } from "@effect/data/Pipeable"
import type { Predicate } from "@effect/data/Predicate"

@@ -122,2 +123,5 @@ import * as Cause from "@effect/io/Cause"

[GroupByTypeId]: groupByVariance,
pipe() {
return pipeArguments(this, arguments)
},
grouped

@@ -172,8 +176,8 @@ })

)
const output = yield* $(Effect.acquireRelease({
acquire: Queue.bounded<Exit.Exit<Option.Option<E | E2>, readonly [K, Queue.Dequeue<Take.Take<E | E2, V>>]>>(
const output = yield* $(Effect.acquireRelease(
Queue.bounded<Exit.Exit<Option.Option<E | E2>, readonly [K, Queue.Dequeue<Take.Take<E | E2, V>>]>>(
bufferSize
),
release: (queue) => Queue.shutdown(queue)
}))
(queue) => Queue.shutdown(queue)
))
const ref = yield* $(Ref.make<Map<K, number>>(new Map()))

@@ -295,2 +299,6 @@ const add = yield* $(

}
pipe() {
return pipeArguments(this, arguments)
}
}

@@ -389,6 +397,6 @@

pipe(
Effect.acquireRelease({
acquire: Queue.unbounded<Take.Take<E, readonly [K, Queue.Queue<Take.Take<E, A>>]>>(),
release: (queue) => Queue.shutdown(queue)
}),
Effect.acquireRelease(
Queue.unbounded<Take.Take<E, readonly [K, Queue.Queue<Take.Take<E, A>>]>>(),
(queue) => Queue.shutdown(queue)
),
Effect.flatMap((queue) =>

@@ -395,0 +403,0 @@ pipe(

import { dual, pipe } from "@effect/data/Function"
import { pipeArguments } from "@effect/data/Pipeable"
import * as Effect from "@effect/io/Effect"

@@ -38,2 +39,5 @@ import * as Hub from "@effect/io/Hub"

}
pipe() {
return pipeArguments(this, arguments)
}
get changes(): Stream<never, never, A> {

@@ -40,0 +44,0 @@ return pipe(

import * as Chunk from "@effect/data/Chunk"
import { constFalse, constTrue, dual, pipe } from "@effect/data/Function"
import * as Option from "@effect/data/Option"
import { pipeArguments } from "@effect/data/Pipeable"
import * as Cause from "@effect/io/Cause"

@@ -28,2 +29,5 @@ import * as Effect from "@effect/io/Effect"

}
pipe() {
return pipeArguments(this, arguments)
}
}

@@ -30,0 +34,0 @@

@@ -12,2 +12,3 @@ /**

import type * as Option from "@effect/data/Option"
import type { Pipeable } from "@effect/data/Pipeable"
import type { Predicate, Refinement } from "@effect/data/Predicate"

@@ -47,3 +48,3 @@ import type * as Unify from "@effect/data/Unify"

*/
export interface Sink<R, E, In, L, Z> extends Sink.Variance<R, E, In, L, Z> {}
export interface Sink<R, E, In, L, Z> extends Sink.Variance<R, E, In, L, Z>, Pipeable<Sink<R, E, In, L, Z>> {}

@@ -80,3 +81,3 @@ /**

declare module "@effect/io/Effect" {
interface Effect<R, E, A> extends Sink<R, E, unknown, never, A> {}
interface Effect<R, E, A> extends Omit<Sink<R, E, unknown, never, A>, "pipe"> {}
interface EffectUnifyBlacklist {

@@ -83,0 +84,0 @@ Sink?: true

@@ -5,2 +5,3 @@ /**

import type * as Option from "@effect/data/Option"
import type { PipeableOverride } from "@effect/data/Pipeable"
import type * as Effect from "@effect/io/Effect"

@@ -32,3 +33,5 @@ import type * as Hub from "@effect/io/Hub"

*/
export interface SubscriptionRef<A> extends SubscriptionRef.Variance<A>, Synchronized.Synchronized<A> {
export interface SubscriptionRef<A>
extends SubscriptionRef.Variance<A>, PipeableOverride<Synchronized.Synchronized<A>, SubscriptionRef<A>>
{
/** @internal */

@@ -35,0 +38,0 @@ readonly ref: Ref.Ref<A>

@@ -6,2 +6,3 @@ /**

import type * as Option from "@effect/data/Option"
import type { Pipeable } from "@effect/data/Pipeable"
import type * as Cause from "@effect/io/Cause"

@@ -32,3 +33,3 @@ import type * as Effect from "@effect/io/Effect"

*/
export interface Take<E, A> extends Take.Variance<E, A> {
export interface Take<E, A> extends Take.Variance<E, A>, Pipeable<Take<E, A>> {
/** @internal */

@@ -35,0 +36,0 @@ readonly exit: Exit.Exit<Option.Option<E>, Chunk.Chunk<A>>

@@ -5,2 +5,3 @@ /**

import type * as Option from "@effect/data/Option";
import type { PipeableOverride } from "@effect/data/Pipeable";
import type * as Effect from "@effect/io/Effect";

@@ -26,3 +27,3 @@ import * as Synchronized from "@effect/io/Ref/Synchronized";

*/
export interface SubscriptionRef<A> extends SubscriptionRef.Variance<A>, Synchronized.Synchronized<A> {
export interface SubscriptionRef<A> extends SubscriptionRef.Variance<A>, PipeableOverride<Synchronized.Synchronized<A>, SubscriptionRef<A>> {
/**

@@ -29,0 +30,0 @@ * A stream containing the current value of the `Ref` as well as all changes

@@ -6,2 +6,3 @@ /**

import type * as Option from "@effect/data/Option";
import type { Pipeable } from "@effect/data/Pipeable";
import type * as Cause from "@effect/io/Cause";

@@ -28,3 +29,3 @@ import type * as Effect from "@effect/io/Effect";

*/
export interface Take<E, A> extends Take.Variance<E, A> {
export interface Take<E, A> extends Take.Variance<E, A>, Pipeable<Take<E, A>> {
}

@@ -31,0 +32,0 @@ /**

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc