You're Invited:Meet the Socket Team at BlackHat and DEF CON in Las Vegas, Aug 4-6.RSVP
Socket
Book a DemoInstallSign in
Socket

@effect/platform-node-shared

Package Overview
Dependencies
Maintainers
0
Versions
388
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@effect/platform-node-shared - npm Package Compare versions

Comparing version

to
0.45.0

95

dist/cjs/internal/stream.js

@@ -15,3 +15,3 @@ "use strict";

var _Function = require("effect/Function");
var Mailbox = _interopRequireWildcard(require("effect/Mailbox"));
var MutableRef = _interopRequireWildcard(require("effect/MutableRef"));
var Runtime = _interopRequireWildcard(require("effect/Runtime"));

@@ -22,5 +22,3 @@ var Stream = _interopRequireWildcard(require("effect/Stream"));

/** @internal */
const fromReadable = (evaluate, onError, {
chunkSize
} = {}) => Stream.fromChannel(fromReadableChannel(evaluate, onError, chunkSize ? Number(chunkSize) : undefined));
const fromReadable = (evaluate, onError, options) => Stream.fromChannel(fromReadableChannel(evaluate, onError, options));
/** @internal */

@@ -84,7 +82,10 @@ exports.fromReadable = fromReadable;

exports.toUint8Array = toUint8Array;
const fromDuplex = (evaluate, onError, options = {}) => Channel.acquireUseRelease(Effect.tap(Effect.zip(Effect.sync(evaluate), Mailbox.make()), ([duplex, mailbox]) => readableOffer(duplex, mailbox, onError)), ([duplex, mailbox]) => Channel.embedInput(readableTake(duplex, mailbox, options.chunkSize ? Number(options.chunkSize) : undefined), writeInput(duplex, cause => mailbox.failCause(cause), options)), ([duplex, mailbox]) => Effect.zipRight(Effect.sync(() => {
if (!duplex.closed) {
duplex.destroy();
const fromDuplex = (evaluate, onError, options) => Channel.suspend(() => {
const duplex = evaluate();
if (!duplex.readable) {
return Channel.void;
}
}), mailbox.shutdown));
const exit = MutableRef.make(undefined);
return Channel.embedInput(unsafeReadableRead(duplex, onError, exit, options), writeInput(duplex, cause => Effect.sync(() => MutableRef.set(exit, Exit.failCause(cause))), options));
});
/** @internal */

@@ -101,7 +102,3 @@ exports.fromDuplex = fromDuplex;

/** @internal */
const fromReadableChannel = (evaluate, onError, chunkSize) => Channel.acquireUseRelease(Effect.tap(Effect.zip(Effect.sync(evaluate), Mailbox.make()), ([readable, mailbox]) => readableOffer(readable, mailbox, onError)), ([readable, mailbox]) => readableTake(readable, mailbox, chunkSize), ([readable, mailbox]) => Effect.zipRight(Effect.sync(() => {
if ("closed" in readable && !readable.closed) {
readable.destroy();
}
}), mailbox.shutdown));
const fromReadableChannel = (evaluate, onError, options) => Channel.suspend(() => unsafeReadableRead(evaluate(), onError, MutableRef.make(undefined), options));
/** @internal */

@@ -149,33 +146,49 @@ exports.fromReadableChannel = fromReadableChannel;

exports.writeEffect = writeEffect;
const readableOffer = (readable, mailbox, onError) => Effect.sync(() => {
readable.on("readable", () => {
mailbox.unsafeOffer(void 0);
});
readable.on("error", err => {
mailbox.unsafeDone(Exit.fail(onError(err)));
});
readable.on("end", () => {
mailbox.unsafeDone(Exit.void);
});
if (readable.readable) {
mailbox.unsafeOffer(void 0);
}
});
const readableTake = (readable, mailbox, chunkSize) => {
const read = readChunkChannel(readable, chunkSize);
const loop = Channel.flatMap(mailbox.takeAll, ([, done]) => done ? read : Channel.zipRight(read, loop));
return loop;
};
const readChunkChannel = (readable, chunkSize) => Channel.suspend(() => {
const arr = [];
let chunk = readable.read(chunkSize);
if (chunk === null) {
const unsafeReadableRead = (readable, onError, exit, options) => {
if (!readable.readable) {
return Channel.void;
}
while (chunk !== null) {
arr.push(chunk);
chunk = readable.read(chunkSize);
const latch = Effect.unsafeMakeLatch(false);
function onReadable() {
latch.unsafeOpen();
}
return Channel.write(Chunk.unsafeFromArray(arr));
});
function onErr(err) {
exit.current = Exit.fail(onError(err));
latch.unsafeOpen();
}
function onEnd() {
exit.current = Exit.void;
latch.unsafeOpen();
}
readable.on("readable", onReadable);
readable.on("error", onErr);
readable.on("end", onEnd);
const chunkSize = options?.chunkSize ? Number(options.chunkSize) : undefined;
const read = Channel.suspend(function loop() {
let item = readable.read(chunkSize);
if (item === null) {
if (exit.current) {
return Channel.fromEffect(exit.current);
}
latch.unsafeClose();
return Channel.flatMap(latch.await, loop);
}
const arr = [item];
while (true) {
item = readable.read(chunkSize);
if (item === null) {
return Channel.flatMap(Channel.write(Chunk.unsafeFromArray(arr)), loop);
}
arr.push(item);
}
});
return Channel.ensuring(read, Effect.sync(() => {
readable.off("readable", onReadable);
readable.off("error", onErr);
readable.off("end", onEnd);
if (options?.closeOnDone !== false && "closed" in readable && !readable.closed) {
readable.destroy();
}
}));
};
class StreamAdapter extends _nodeStream.Readable {

@@ -182,0 +195,0 @@ readLatch;

@@ -59,3 +59,5 @@ "use strict";

*/
const stdin = exports.stdin = /*#__PURE__*/internal.fromReadable(() => process.stdin, err => err).pipe(Stream.orDie);
const stdin = exports.stdin = /*#__PURE__*/internal.fromReadable(() => process.stdin, err => err, {
closeOnDone: false
}).pipe(Stream.orDie);
//# sourceMappingURL=NodeStream.js.map

@@ -19,2 +19,4 @@ /**

readonly chunkSize?: SizeInput;
/** Default to true, which means the stream will be closed when done */
readonly closeOnDone?: boolean | undefined;
}

@@ -33,3 +35,3 @@ /**

*/
export declare const fromReadable: <E, A = Uint8Array>(evaluate: LazyArg<Readable | NodeJS.ReadableStream>, onError: (error: unknown) => E, { chunkSize }?: FromReadableOptions) => Stream.Stream<A, E>;
export declare const fromReadable: <E, A = Uint8Array<ArrayBufferLike>>(evaluate: LazyArg<Readable | NodeJS.ReadableStream>, onError: (error: unknown) => E, options?: FromReadableOptions) => Stream.Stream<A, E>;
/**

@@ -39,3 +41,3 @@ * @category constructors

*/
export declare const fromReadableChannel: <E, A = Uint8Array>(evaluate: LazyArg<Readable | NodeJS.ReadableStream>, onError: (error: unknown) => E, chunkSize: number | undefined) => Channel<Chunk<A>, unknown, E, unknown, void, unknown>;
export declare const fromReadableChannel: <E, A = Uint8Array<ArrayBufferLike>>(evaluate: LazyArg<Readable | NodeJS.ReadableStream>, onError: (error: unknown) => E, options?: FromReadableOptions | undefined) => Channel<Chunk<A>, unknown, E>;
/**

@@ -42,0 +44,0 @@ * @category constructors

@@ -9,3 +9,3 @@ import { SystemError } from "@effect/platform/Error";

import { dual } from "effect/Function";
import * as Mailbox from "effect/Mailbox";
import * as MutableRef from "effect/MutableRef";
import * as Runtime from "effect/Runtime";

@@ -15,5 +15,3 @@ import * as Stream from "effect/Stream";

/** @internal */
export const fromReadable = (evaluate, onError, {
chunkSize
} = {}) => Stream.fromChannel(fromReadableChannel(evaluate, onError, chunkSize ? Number(chunkSize) : undefined));
export const fromReadable = (evaluate, onError, options) => Stream.fromChannel(fromReadableChannel(evaluate, onError, options));
/** @internal */

@@ -74,7 +72,10 @@ export const toString = (readable, options) => {

/** @internal */
export const fromDuplex = (evaluate, onError, options = {}) => Channel.acquireUseRelease(Effect.tap(Effect.zip(Effect.sync(evaluate), Mailbox.make()), ([duplex, mailbox]) => readableOffer(duplex, mailbox, onError)), ([duplex, mailbox]) => Channel.embedInput(readableTake(duplex, mailbox, options.chunkSize ? Number(options.chunkSize) : undefined), writeInput(duplex, cause => mailbox.failCause(cause), options)), ([duplex, mailbox]) => Effect.zipRight(Effect.sync(() => {
if (!duplex.closed) {
duplex.destroy();
export const fromDuplex = (evaluate, onError, options) => Channel.suspend(() => {
const duplex = evaluate();
if (!duplex.readable) {
return Channel.void;
}
}), mailbox.shutdown));
const exit = MutableRef.make(undefined);
return Channel.embedInput(unsafeReadableRead(duplex, onError, exit, options), writeInput(duplex, cause => Effect.sync(() => MutableRef.set(exit, Exit.failCause(cause))), options));
});
/** @internal */

@@ -90,7 +91,3 @@ export const pipeThroughDuplex = /*#__PURE__*/dual(args => Stream.StreamTypeId in args[0], (self, duplex, onError, options) => Stream.pipeThroughChannelOrFail(self, fromDuplex(duplex, onError, options)));

/** @internal */
export const fromReadableChannel = (evaluate, onError, chunkSize) => Channel.acquireUseRelease(Effect.tap(Effect.zip(Effect.sync(evaluate), Mailbox.make()), ([readable, mailbox]) => readableOffer(readable, mailbox, onError)), ([readable, mailbox]) => readableTake(readable, mailbox, chunkSize), ([readable, mailbox]) => Effect.zipRight(Effect.sync(() => {
if ("closed" in readable && !readable.closed) {
readable.destroy();
}
}), mailbox.shutdown));
export const fromReadableChannel = (evaluate, onError, options) => Channel.suspend(() => unsafeReadableRead(evaluate(), onError, MutableRef.make(undefined), options));
/** @internal */

@@ -135,33 +132,49 @@ export const writeInput = (writable, onFailure, {

});
const readableOffer = (readable, mailbox, onError) => Effect.sync(() => {
readable.on("readable", () => {
mailbox.unsafeOffer(void 0);
});
readable.on("error", err => {
mailbox.unsafeDone(Exit.fail(onError(err)));
});
readable.on("end", () => {
mailbox.unsafeDone(Exit.void);
});
if (readable.readable) {
mailbox.unsafeOffer(void 0);
}
});
const readableTake = (readable, mailbox, chunkSize) => {
const read = readChunkChannel(readable, chunkSize);
const loop = Channel.flatMap(mailbox.takeAll, ([, done]) => done ? read : Channel.zipRight(read, loop));
return loop;
};
const readChunkChannel = (readable, chunkSize) => Channel.suspend(() => {
const arr = [];
let chunk = readable.read(chunkSize);
if (chunk === null) {
const unsafeReadableRead = (readable, onError, exit, options) => {
if (!readable.readable) {
return Channel.void;
}
while (chunk !== null) {
arr.push(chunk);
chunk = readable.read(chunkSize);
const latch = Effect.unsafeMakeLatch(false);
function onReadable() {
latch.unsafeOpen();
}
return Channel.write(Chunk.unsafeFromArray(arr));
});
function onErr(err) {
exit.current = Exit.fail(onError(err));
latch.unsafeOpen();
}
function onEnd() {
exit.current = Exit.void;
latch.unsafeOpen();
}
readable.on("readable", onReadable);
readable.on("error", onErr);
readable.on("end", onEnd);
const chunkSize = options?.chunkSize ? Number(options.chunkSize) : undefined;
const read = Channel.suspend(function loop() {
let item = readable.read(chunkSize);
if (item === null) {
if (exit.current) {
return Channel.fromEffect(exit.current);
}
latch.unsafeClose();
return Channel.flatMap(latch.await, loop);
}
const arr = [item];
while (true) {
item = readable.read(chunkSize);
if (item === null) {
return Channel.flatMap(Channel.write(Chunk.unsafeFromArray(arr)), loop);
}
arr.push(item);
}
});
return Channel.ensuring(read, Effect.sync(() => {
readable.off("readable", onReadable);
readable.off("error", onErr);
readable.off("end", onEnd);
if (options?.closeOnDone !== false && "closed" in readable && !readable.closed) {
readable.destroy();
}
}));
};
class StreamAdapter extends Readable {

@@ -168,0 +181,0 @@ readLatch;

@@ -52,3 +52,5 @@ import * as Stream from "effect/Stream";

*/
export const stdin = /*#__PURE__*/internal.fromReadable(() => process.stdin, err => err).pipe(Stream.orDie);
export const stdin = /*#__PURE__*/internal.fromReadable(() => process.stdin, err => err, {
closeOnDone: false
}).pipe(Stream.orDie);
//# sourceMappingURL=NodeStream.js.map
{
"name": "@effect/platform-node-shared",
"version": "0.44.0",
"version": "0.45.0",
"description": "Unified interfaces for common platform-specific services",

@@ -15,11 +15,11 @@ "license": "MIT",

"@parcel/watcher": "^2.5.1",
"multipasta": "^0.2.5",
"multipasta": "^0.2.7",
"ws": "^8.18.2"
},
"peerDependencies": {
"@effect/platform": "^0.88.1",
"@effect/cluster": "^0.43.0",
"@effect/rpc": "^0.65.1",
"@effect/sql": "^0.42.0",
"effect": "^3.16.14"
"@effect/cluster": "^0.44.0",
"@effect/platform": "^0.89.0",
"@effect/rpc": "^0.66.0",
"@effect/sql": "^0.43.0",
"effect": "^3.17.0"
},

@@ -26,0 +26,0 @@ "publishConfig": {

@@ -11,3 +11,3 @@ import { type PlatformError, SystemError } from "@effect/platform/Error"

import { dual } from "effect/Function"
import * as Mailbox from "effect/Mailbox"
import * as MutableRef from "effect/MutableRef"
import * as Runtime from "effect/Runtime"

@@ -24,6 +24,6 @@ import type * as AsyncInput from "effect/SingleProducerAsyncInput"

onError: (error: unknown) => E,
{ chunkSize }: FromReadableOptions = {}
options?: FromReadableOptions
): Stream.Stream<A, E> =>
Stream.fromChannel(
fromReadableChannel<E, A>(evaluate, onError, chunkSize ? Number(chunkSize) : undefined)
fromReadableChannel<E, A>(evaluate, onError, options)
)

@@ -113,34 +113,27 @@

/** @internal */
export const fromDuplex = <IE, E, I = Uint8Array | string, O = Uint8Array>(
export const fromDuplex = <IE, E, I = string | Uint8Array<ArrayBufferLike>, O = Uint8Array<ArrayBufferLike>>(
evaluate: LazyArg<Duplex>,
onError: (error: unknown) => E,
options: FromReadableOptions & FromWritableOptions = {}
): Channel.Channel<Chunk.Chunk<O>, Chunk.Chunk<I>, IE | E, IE, void, unknown> =>
Channel.acquireUseRelease(
Effect.tap(
Effect.zip(
Effect.sync(evaluate),
Mailbox.make<void, IE | E>()
),
([duplex, mailbox]) => readableOffer(duplex, mailbox, onError)
),
([duplex, mailbox]) =>
Channel.embedInput(
readableTake(duplex, mailbox, options.chunkSize ? Number(options.chunkSize) : undefined),
writeInput(
duplex,
(cause) => mailbox.failCause(cause),
options
)
),
([duplex, mailbox]) =>
Effect.zipRight(
Effect.sync(() => {
if (!duplex.closed) {
duplex.destroy()
}
}),
mailbox.shutdown
options?: FromReadableOptions & FromWritableOptions
): Channel.Channel<
Chunk.Chunk<O>,
Chunk.Chunk<I>,
IE | E,
IE
> =>
Channel.suspend(() => {
const duplex = evaluate()
if (!duplex.readable) {
return Channel.void
}
const exit = MutableRef.make<Exit.Exit<void, IE | E> | undefined>(undefined)
return Channel.embedInput(
unsafeReadableRead<O, IE | E>(duplex, onError, exit, options),
writeInput<IE, I>(
duplex,
(cause) => Effect.sync(() => MutableRef.set(exit, Exit.failCause(cause))),
options
)
)
)
})

@@ -197,22 +190,15 @@ /** @internal */

onError: (error: unknown) => E,
chunkSize: number | undefined
): Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> =>
Channel.acquireUseRelease(
Effect.tap(
Effect.zip(
Effect.sync(evaluate),
Mailbox.make<void, E>()
),
([readable, mailbox]) => readableOffer(readable, mailbox, onError)
),
([readable, mailbox]) => readableTake(readable, mailbox, chunkSize),
([readable, mailbox]) =>
Effect.zipRight(
Effect.sync(() => {
if ("closed" in readable && !readable.closed) {
readable.destroy()
}
}),
mailbox.shutdown
)
options?: FromReadableOptions | undefined
): Channel.Channel<
Chunk.Chunk<A>,
unknown,
E
> =>
Channel.suspend(() =>
unsafeReadableRead(
evaluate(),
onError,
MutableRef.make(undefined),
options
)
)

@@ -272,52 +258,61 @@

const readableOffer = <E>(
const unsafeReadableRead = <A, E>(
readable: Readable | NodeJS.ReadableStream,
mailbox: Mailbox.Mailbox<void, E>,
onError: (error: unknown) => E
) =>
Effect.sync(() => {
readable.on("readable", () => {
mailbox.unsafeOffer(void 0)
})
readable.on("error", (err) => {
mailbox.unsafeDone(Exit.fail(onError(err)))
})
readable.on("end", () => {
mailbox.unsafeDone(Exit.void)
})
if (readable.readable) {
mailbox.unsafeOffer(void 0)
onError: (error: unknown) => E,
exit: MutableRef.MutableRef<Exit.Exit<void, E> | undefined>,
options: FromReadableOptions | undefined
) => {
if (!readable.readable) {
return Channel.void
}
const latch = Effect.unsafeMakeLatch(false)
function onReadable() {
latch.unsafeOpen()
}
function onErr(err: unknown) {
exit.current = Exit.fail(onError(err))
latch.unsafeOpen()
}
function onEnd() {
exit.current = Exit.void
latch.unsafeOpen()
}
readable.on("readable", onReadable)
readable.on("error", onErr)
readable.on("end", onEnd)
const chunkSize = options?.chunkSize ? Number(options.chunkSize) : undefined
const read = Channel.suspend(function loop(): Channel.Channel<Chunk.Chunk<A>, unknown, E> {
let item = readable.read(chunkSize) as A | null
if (item === null) {
if (exit.current) {
return Channel.fromEffect(exit.current)
}
latch.unsafeClose()
return Channel.flatMap(latch.await, loop)
}
const arr = [item as A]
while (true) {
item = readable.read(chunkSize)
if (item === null) {
return Channel.flatMap(Channel.write(Chunk.unsafeFromArray(arr)), loop)
}
arr.push(item as A)
}
})
const readableTake = <E, A>(
readable: Readable | NodeJS.ReadableStream,
mailbox: Mailbox.Mailbox<void, E>,
chunkSize: number | undefined
) => {
const read = readChunkChannel<A>(readable, chunkSize)
const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E> = Channel.flatMap(
mailbox.takeAll,
([, done]) => done ? read : Channel.zipRight(read, loop)
return Channel.ensuring(
read,
Effect.sync(() => {
readable.off("readable", onReadable)
readable.off("error", onErr)
readable.off("end", onEnd)
if (options?.closeOnDone !== false && "closed" in readable && !readable.closed) {
readable.destroy()
}
})
)
return loop
}
const readChunkChannel = <A>(
readable: Readable | NodeJS.ReadableStream,
chunkSize: number | undefined
) =>
Channel.suspend(() => {
const arr: Array<A> = []
let chunk = readable.read(chunkSize)
if (chunk === null) {
return Channel.void
}
while (chunk !== null) {
arr.push(chunk)
chunk = readable.read(chunkSize)
}
return Channel.write(Chunk.unsafeFromArray(arr))
})
class StreamAdapter<E, R> extends Readable {

@@ -324,0 +319,0 @@ readonly readLatch: Effect.Latch

@@ -21,2 +21,4 @@ /**

readonly chunkSize?: SizeInput
/** Default to true, which means the stream will be closed when done */
readonly closeOnDone?: boolean | undefined
}

@@ -37,6 +39,6 @@

*/
export const fromReadable: <E, A = Uint8Array>(
export const fromReadable: <E, A = Uint8Array<ArrayBufferLike>>(
evaluate: LazyArg<Readable | NodeJS.ReadableStream>,
onError: (error: unknown) => E,
{ chunkSize }?: FromReadableOptions
options?: FromReadableOptions
) => Stream.Stream<A, E> = internal.fromReadable

@@ -48,7 +50,7 @@

*/
export const fromReadableChannel: <E, A = Uint8Array>(
export const fromReadableChannel: <E, A = Uint8Array<ArrayBufferLike>>(
evaluate: LazyArg<Readable | NodeJS.ReadableStream>,
onError: (error: unknown) => E,
chunkSize: number | undefined
) => Channel<Chunk<A>, unknown, E, unknown, void, unknown> = internal.fromReadableChannel
options?: FromReadableOptions | undefined
) => Channel<Chunk<A>, unknown, E> = internal.fromReadableChannel

@@ -148,4 +150,4 @@ /**

*/
export const stdin: Stream.Stream<Uint8Array> = internal.fromReadable(() => process.stdin, (err) => err).pipe(
Stream.orDie
)
export const stdin: Stream.Stream<Uint8Array> = internal.fromReadable(() => process.stdin, (err) => err, {
closeOnDone: false
}).pipe(Stream.orDie)

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet