@effect/platform-node-shared
Advanced tools
Comparing version
@@ -12,6 +12,5 @@ "use strict"; | ||
var Effect = _interopRequireWildcard(require("effect/Effect")); | ||
var Either = _interopRequireWildcard(require("effect/Either")); | ||
var Exit = _interopRequireWildcard(require("effect/Exit")); | ||
var _Function = require("effect/Function"); | ||
var Queue = _interopRequireWildcard(require("effect/Queue")); | ||
var Mailbox = _interopRequireWildcard(require("effect/Mailbox")); | ||
var Runtime = _interopRequireWildcard(require("effect/Runtime")); | ||
@@ -85,7 +84,7 @@ var Scope = _interopRequireWildcard(require("effect/Scope")); | ||
exports.toUint8Array = toUint8Array; | ||
const fromDuplex = (evaluate, onError, options = {}) => Channel.acquireUseRelease(Effect.tap(Effect.zip(Effect.sync(evaluate), Queue.unbounded()), ([duplex, queue]) => readableOffer(duplex, queue, onError)), ([duplex, queue]) => Channel.embedInput(readableTake(duplex, queue, options.chunkSize ? Number(options.chunkSize) : undefined), writeInput(duplex, cause => Queue.offer(queue, Either.left(Exit.failCause(cause))), options)), ([duplex, queue]) => Effect.zipRight(Effect.sync(() => { | ||
const fromDuplex = (evaluate, onError, options = {}) => Channel.acquireUseRelease(Effect.tap(Effect.zip(Effect.sync(evaluate), Mailbox.make()), ([duplex, mailbox]) => readableOffer(duplex, mailbox, onError)), ([duplex, mailbox]) => Channel.embedInput(readableTake(duplex, mailbox, options.chunkSize ? Number(options.chunkSize) : undefined), writeInput(duplex, cause => mailbox.failCause(cause), options)), ([duplex, mailbox]) => Effect.zipRight(Effect.sync(() => { | ||
if (!duplex.closed) { | ||
duplex.destroy(); | ||
} | ||
}), Queue.shutdown(queue))); | ||
}), mailbox.shutdown)); | ||
/** @internal */ | ||
@@ -103,7 +102,7 @@ exports.fromDuplex = fromDuplex; | ||
/** @internal */ | ||
const fromReadableChannel = (evaluate, onError, chunkSize) => Channel.acquireUseRelease(Effect.tap(Effect.zip(Effect.sync(evaluate), Queue.unbounded()), ([readable, queue]) => readableOffer(readable, queue, onError)), ([readable, queue]) => readableTake(readable, queue, chunkSize), ([readable, queue]) => Effect.zipRight(Effect.sync(() => { | ||
const fromReadableChannel = (evaluate, onError, chunkSize) => Channel.acquireUseRelease(Effect.tap(Effect.zip(Effect.sync(evaluate), Mailbox.make()), ([readable, mailbox]) => readableOffer(readable, mailbox, onError)), ([readable, mailbox]) => readableTake(readable, mailbox, chunkSize), ([readable, mailbox]) => Effect.zipRight(Effect.sync(() => { | ||
if ("closed" in readable && !readable.closed) { | ||
readable.destroy(); | ||
} | ||
}), Queue.shutdown(queue))); | ||
}), mailbox.shutdown)); | ||
/** @internal */ | ||
@@ -151,28 +150,19 @@ exports.fromReadableChannel = fromReadableChannel; | ||
exports.writeEffect = writeEffect; | ||
const readableOffer = (readable, queue, onError) => Effect.sync(() => { | ||
const readableOffer = (readable, mailbox, onError) => Effect.sync(() => { | ||
readable.on("readable", () => { | ||
const size = queue.unsafeSize(); | ||
if (size._tag === "Some" && size.value <= 0) { | ||
queue.unsafeOffer(Either.right(void 0)); | ||
} | ||
mailbox.unsafeOffer(void 0); | ||
}); | ||
readable.on("error", err => { | ||
queue.unsafeOffer(Either.left(Exit.fail(onError(err)))); | ||
mailbox.unsafeDone(Exit.fail(onError(err))); | ||
}); | ||
readable.on("end", () => { | ||
queue.unsafeOffer(Either.left(Exit.void)); | ||
mailbox.unsafeDone(Exit.void); | ||
}); | ||
if (readable.readable) { | ||
queue.unsafeOffer(Either.right(void 0)); | ||
mailbox.unsafeOffer(void 0); | ||
} | ||
}); | ||
const readableTake = (readable, queue, chunkSize) => { | ||
const readableTake = (readable, mailbox, chunkSize) => { | ||
const read = readChunkChannel(readable, chunkSize); | ||
const loop = Channel.flatMap(Queue.take(queue), Either.match({ | ||
onLeft: Exit.match({ | ||
onFailure: Channel.failCause, | ||
onSuccess: _ => Channel.void | ||
}), | ||
onRight: _ => Channel.flatMap(read, () => loop) | ||
})); | ||
const loop = Channel.flatMap(mailbox.takeAll, ([, done]) => done ? read : Channel.zipRight(read, loop)); | ||
return loop; | ||
@@ -183,2 +173,5 @@ }; | ||
let chunk = readable.read(chunkSize); | ||
if (chunk === null) { | ||
return Channel.void; | ||
} | ||
while (chunk !== null) { | ||
@@ -185,0 +178,0 @@ arr.push(chunk); |
@@ -6,6 +6,5 @@ import { SystemError } from "@effect/platform/Error"; | ||
import * as Effect from "effect/Effect"; | ||
import * as Either from "effect/Either"; | ||
import * as Exit from "effect/Exit"; | ||
import { dual } from "effect/Function"; | ||
import * as Queue from "effect/Queue"; | ||
import * as Mailbox from "effect/Mailbox"; | ||
import * as Runtime from "effect/Runtime"; | ||
@@ -74,7 +73,7 @@ import * as Scope from "effect/Scope"; | ||
/** @internal */ | ||
export const fromDuplex = (evaluate, onError, options = {}) => Channel.acquireUseRelease(Effect.tap(Effect.zip(Effect.sync(evaluate), Queue.unbounded()), ([duplex, queue]) => readableOffer(duplex, queue, onError)), ([duplex, queue]) => Channel.embedInput(readableTake(duplex, queue, options.chunkSize ? Number(options.chunkSize) : undefined), writeInput(duplex, cause => Queue.offer(queue, Either.left(Exit.failCause(cause))), options)), ([duplex, queue]) => Effect.zipRight(Effect.sync(() => { | ||
export const fromDuplex = (evaluate, onError, options = {}) => Channel.acquireUseRelease(Effect.tap(Effect.zip(Effect.sync(evaluate), Mailbox.make()), ([duplex, mailbox]) => readableOffer(duplex, mailbox, onError)), ([duplex, mailbox]) => Channel.embedInput(readableTake(duplex, mailbox, options.chunkSize ? Number(options.chunkSize) : undefined), writeInput(duplex, cause => mailbox.failCause(cause), options)), ([duplex, mailbox]) => Effect.zipRight(Effect.sync(() => { | ||
if (!duplex.closed) { | ||
duplex.destroy(); | ||
} | ||
}), Queue.shutdown(queue))); | ||
}), mailbox.shutdown)); | ||
/** @internal */ | ||
@@ -91,7 +90,7 @@ export const pipeThroughDuplex = /*#__PURE__*/dual(args => Stream.StreamTypeId in args[0], (self, duplex, onError, options) => Stream.pipeThroughChannelOrFail(self, fromDuplex(duplex, onError, options))); | ||
/** @internal */ | ||
export const fromReadableChannel = (evaluate, onError, chunkSize) => Channel.acquireUseRelease(Effect.tap(Effect.zip(Effect.sync(evaluate), Queue.unbounded()), ([readable, queue]) => readableOffer(readable, queue, onError)), ([readable, queue]) => readableTake(readable, queue, chunkSize), ([readable, queue]) => Effect.zipRight(Effect.sync(() => { | ||
export const fromReadableChannel = (evaluate, onError, chunkSize) => Channel.acquireUseRelease(Effect.tap(Effect.zip(Effect.sync(evaluate), Mailbox.make()), ([readable, mailbox]) => readableOffer(readable, mailbox, onError)), ([readable, mailbox]) => readableTake(readable, mailbox, chunkSize), ([readable, mailbox]) => Effect.zipRight(Effect.sync(() => { | ||
if ("closed" in readable && !readable.closed) { | ||
readable.destroy(); | ||
} | ||
}), Queue.shutdown(queue))); | ||
}), mailbox.shutdown)); | ||
/** @internal */ | ||
@@ -136,28 +135,19 @@ export const writeInput = (writable, onFailure, { | ||
}); | ||
const readableOffer = (readable, queue, onError) => Effect.sync(() => { | ||
const readableOffer = (readable, mailbox, onError) => Effect.sync(() => { | ||
readable.on("readable", () => { | ||
const size = queue.unsafeSize(); | ||
if (size._tag === "Some" && size.value <= 0) { | ||
queue.unsafeOffer(Either.right(void 0)); | ||
} | ||
mailbox.unsafeOffer(void 0); | ||
}); | ||
readable.on("error", err => { | ||
queue.unsafeOffer(Either.left(Exit.fail(onError(err)))); | ||
mailbox.unsafeDone(Exit.fail(onError(err))); | ||
}); | ||
readable.on("end", () => { | ||
queue.unsafeOffer(Either.left(Exit.void)); | ||
mailbox.unsafeDone(Exit.void); | ||
}); | ||
if (readable.readable) { | ||
queue.unsafeOffer(Either.right(void 0)); | ||
mailbox.unsafeOffer(void 0); | ||
} | ||
}); | ||
const readableTake = (readable, queue, chunkSize) => { | ||
const readableTake = (readable, mailbox, chunkSize) => { | ||
const read = readChunkChannel(readable, chunkSize); | ||
const loop = Channel.flatMap(Queue.take(queue), Either.match({ | ||
onLeft: Exit.match({ | ||
onFailure: Channel.failCause, | ||
onSuccess: _ => Channel.void | ||
}), | ||
onRight: _ => Channel.flatMap(read, () => loop) | ||
})); | ||
const loop = Channel.flatMap(mailbox.takeAll, ([, done]) => done ? read : Channel.zipRight(read, loop)); | ||
return loop; | ||
@@ -168,2 +158,5 @@ }; | ||
let chunk = readable.read(chunkSize); | ||
if (chunk === null) { | ||
return Channel.void; | ||
} | ||
while (chunk !== null) { | ||
@@ -170,0 +163,0 @@ arr.push(chunk); |
{ | ||
"name": "@effect/platform-node-shared", | ||
"version": "0.15.1", | ||
"version": "0.15.2", | ||
"description": "Unified interfaces for common platform-specific services", | ||
@@ -17,4 +17,4 @@ "license": "MIT", | ||
"peerDependencies": { | ||
"@effect/platform": "^0.65.1", | ||
"effect": "^3.8.0" | ||
"@effect/platform": "^0.65.2", | ||
"effect": "^3.8.1" | ||
}, | ||
@@ -21,0 +21,0 @@ "publishConfig": { |
@@ -7,7 +7,6 @@ import { type PlatformError, SystemError } from "@effect/platform/Error" | ||
import * as Effect from "effect/Effect" | ||
import * as Either from "effect/Either" | ||
import * as Exit from "effect/Exit" | ||
import type { LazyArg } from "effect/Function" | ||
import { dual } from "effect/Function" | ||
import * as Queue from "effect/Queue" | ||
import * as Mailbox from "effect/Mailbox" | ||
import * as Runtime from "effect/Runtime" | ||
@@ -122,16 +121,16 @@ import * as Scope from "effect/Scope" | ||
Effect.sync(evaluate), | ||
Queue.unbounded<Either.Either<void, Exit.Exit<void, IE | E>>>() | ||
Mailbox.make<void, IE | E>() | ||
), | ||
([duplex, queue]) => readableOffer(duplex, queue, onError) | ||
([duplex, mailbox]) => readableOffer(duplex, mailbox, onError) | ||
), | ||
([duplex, queue]) => | ||
([duplex, mailbox]) => | ||
Channel.embedInput( | ||
readableTake(duplex, queue, options.chunkSize ? Number(options.chunkSize) : undefined), | ||
readableTake(duplex, mailbox, options.chunkSize ? Number(options.chunkSize) : undefined), | ||
writeInput( | ||
duplex, | ||
(cause) => Queue.offer(queue, Either.left(Exit.failCause(cause))), | ||
(cause) => mailbox.failCause(cause), | ||
options | ||
) | ||
), | ||
([duplex, queue]) => | ||
([duplex, mailbox]) => | ||
Effect.zipRight( | ||
@@ -143,3 +142,3 @@ Effect.sync(() => { | ||
}), | ||
Queue.shutdown(queue) | ||
mailbox.shutdown | ||
) | ||
@@ -205,8 +204,8 @@ ) | ||
Effect.sync(evaluate), | ||
Queue.unbounded<Either.Either<void, Exit.Exit<void, E>>>() | ||
Mailbox.make<void, E>() | ||
), | ||
([readable, queue]) => readableOffer(readable, queue, onError) | ||
([readable, mailbox]) => readableOffer(readable, mailbox, onError) | ||
), | ||
([readable, queue]) => readableTake(readable, queue, chunkSize), | ||
([readable, queue]) => | ||
([readable, mailbox]) => readableTake(readable, mailbox, chunkSize), | ||
([readable, mailbox]) => | ||
Effect.zipRight( | ||
@@ -218,3 +217,3 @@ Effect.sync(() => { | ||
}), | ||
Queue.shutdown(queue) | ||
mailbox.shutdown | ||
) | ||
@@ -277,3 +276,3 @@ ) | ||
readable: Readable | NodeJS.ReadableStream, | ||
queue: Queue.Queue<Either.Either<void, Exit.Exit<void, E>>>, | ||
mailbox: Mailbox.Mailbox<void, E>, | ||
onError: (error: unknown) => E | ||
@@ -283,15 +282,12 @@ ) => | ||
readable.on("readable", () => { | ||
const size = queue.unsafeSize() | ||
if (size._tag === "Some" && size.value <= 0) { | ||
queue.unsafeOffer(Either.right(void 0)) | ||
} | ||
mailbox.unsafeOffer(void 0) | ||
}) | ||
readable.on("error", (err) => { | ||
queue.unsafeOffer(Either.left(Exit.fail(onError(err)))) | ||
mailbox.unsafeDone(Exit.fail(onError(err))) | ||
}) | ||
readable.on("end", () => { | ||
queue.unsafeOffer(Either.left(Exit.void)) | ||
mailbox.unsafeDone(Exit.void) | ||
}) | ||
if (readable.readable) { | ||
queue.unsafeOffer(Either.right(void 0)) | ||
mailbox.unsafeOffer(void 0) | ||
} | ||
@@ -302,15 +298,9 @@ }) | ||
readable: Readable | NodeJS.ReadableStream, | ||
queue: Queue.Queue<Either.Either<void, Exit.Exit<void, E>>>, | ||
mailbox: Mailbox.Mailbox<void, E>, | ||
chunkSize: number | undefined | ||
) => { | ||
const read = readChunkChannel<A>(readable, chunkSize) | ||
const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = Channel.flatMap( | ||
Queue.take(queue), | ||
Either.match({ | ||
onLeft: Exit.match({ | ||
onFailure: Channel.failCause, | ||
onSuccess: (_) => Channel.void | ||
}), | ||
onRight: (_) => Channel.flatMap(read, () => loop) | ||
}) | ||
const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E> = Channel.flatMap( | ||
mailbox.takeAll, | ||
([, done]) => done ? read : Channel.zipRight(read, loop) | ||
) | ||
@@ -327,2 +317,5 @@ return loop | ||
let chunk = readable.read(chunkSize) | ||
if (chunk === null) { | ||
return Channel.void | ||
} | ||
while (chunk !== null) { | ||
@@ -329,0 +322,0 @@ arr.push(chunk) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
369964
-0.57%5316
-0.39%