@effect/platform-node-shared
Advanced tools
Comparing version
@@ -184,3 +184,3 @@ "use strict"; | ||
this.readLatch = Effect.unsafeMakeLatch(false); | ||
this.fiber = Runtime.runFork(runtime)(Stream.runForEachChunk(stream, chunk => this.readLatch.whenOpen(Effect.sync(() => { | ||
this.fiber = Runtime.runFork(runtime)(this.readLatch.whenOpen(Stream.runForEachChunk(stream, chunk => this.readLatch.whenOpen(Effect.sync(() => { | ||
if (chunk.length === 0) return; | ||
@@ -195,3 +195,3 @@ this.readLatch.unsafeClose(); | ||
} | ||
})))); | ||
}))))); | ||
this.fiber.addObserver(exit => { | ||
@@ -202,3 +202,3 @@ this.fiber = undefined; | ||
} else { | ||
this._destroy(Cause.squash(exit.cause), _Function.constVoid); | ||
this.destroy(Cause.squash(exit.cause)); | ||
} | ||
@@ -208,11 +208,10 @@ }); | ||
_read(_size) { | ||
// TODO: refactor to use unsafeOpen when added to Effect | ||
Effect.runSync(this.readLatch.open); | ||
this.readLatch.unsafeOpen(); | ||
} | ||
_destroy(_error, callback) { | ||
_destroy(error, callback) { | ||
if (!this.fiber) { | ||
return callback(null); | ||
return callback(error); | ||
} | ||
Effect.runFork(Fiber.interrupt(this.fiber)).addObserver(exit => { | ||
callback(exit._tag === "Failure" ? Cause.squash(exit.cause) : null); | ||
callback(exit._tag === "Failure" ? Cause.squash(exit.cause) : error); | ||
}); | ||
@@ -219,0 +218,0 @@ } |
@@ -8,3 +8,3 @@ import { SystemError } from "@effect/platform/Error"; | ||
import * as Fiber from "effect/Fiber"; | ||
import { constVoid, dual } from "effect/Function"; | ||
import { dual } from "effect/Function"; | ||
import * as Mailbox from "effect/Mailbox"; | ||
@@ -170,3 +170,3 @@ import * as Runtime from "effect/Runtime"; | ||
this.readLatch = Effect.unsafeMakeLatch(false); | ||
this.fiber = Runtime.runFork(runtime)(Stream.runForEachChunk(stream, chunk => this.readLatch.whenOpen(Effect.sync(() => { | ||
this.fiber = Runtime.runFork(runtime)(this.readLatch.whenOpen(Stream.runForEachChunk(stream, chunk => this.readLatch.whenOpen(Effect.sync(() => { | ||
if (chunk.length === 0) return; | ||
@@ -181,3 +181,3 @@ this.readLatch.unsafeClose(); | ||
} | ||
})))); | ||
}))))); | ||
this.fiber.addObserver(exit => { | ||
@@ -188,3 +188,3 @@ this.fiber = undefined; | ||
} else { | ||
this._destroy(Cause.squash(exit.cause), constVoid); | ||
this.destroy(Cause.squash(exit.cause)); | ||
} | ||
@@ -194,11 +194,10 @@ }); | ||
_read(_size) { | ||
// TODO: refactor to use unsafeOpen when added to Effect | ||
Effect.runSync(this.readLatch.open); | ||
this.readLatch.unsafeOpen(); | ||
} | ||
_destroy(_error, callback) { | ||
_destroy(error, callback) { | ||
if (!this.fiber) { | ||
return callback(null); | ||
return callback(error); | ||
} | ||
Effect.runFork(Fiber.interrupt(this.fiber)).addObserver(exit => { | ||
callback(exit._tag === "Failure" ? Cause.squash(exit.cause) : null); | ||
callback(exit._tag === "Failure" ? Cause.squash(exit.cause) : error); | ||
}); | ||
@@ -205,0 +204,0 @@ } |
{ | ||
"name": "@effect/platform-node-shared", | ||
"version": "0.31.8", | ||
"version": "0.31.9", | ||
"description": "Unified interfaces for common platform-specific services", | ||
@@ -19,3 +19,3 @@ "license": "MIT", | ||
"peerDependencies": { | ||
"@effect/cluster": "^0.30.8", | ||
"@effect/cluster": "^0.30.9", | ||
"@effect/platform": "^0.80.19", | ||
@@ -22,0 +22,0 @@ "@effect/rpc": "^0.56.7", |
@@ -10,3 +10,3 @@ import { type PlatformError, SystemError } from "@effect/platform/Error" | ||
import type { LazyArg } from "effect/Function" | ||
import { constVoid, dual } from "effect/Function" | ||
import { dual } from "effect/Function" | ||
import * as Mailbox from "effect/Mailbox" | ||
@@ -321,4 +321,4 @@ import * as Runtime from "effect/Runtime" | ||
class StreamAdapter<E, R> extends Readable { | ||
private readonly readLatch: Effect.Latch | ||
private fiber: Fiber.RuntimeFiber<void, E> | undefined = undefined | ||
readonly readLatch: Effect.Latch | ||
fiber: Fiber.RuntimeFiber<void, E> | undefined = undefined | ||
@@ -332,14 +332,16 @@ constructor( | ||
this.fiber = Runtime.runFork(runtime)( | ||
Stream.runForEachChunk(stream, (chunk) => | ||
this.readLatch.whenOpen(Effect.sync(() => { | ||
if (chunk.length === 0) return | ||
this.readLatch.unsafeClose() | ||
for (const item of chunk) { | ||
if (typeof item === "string") { | ||
this.push(item, "utf8") | ||
} else { | ||
this.push(item) | ||
this.readLatch.whenOpen( | ||
Stream.runForEachChunk(stream, (chunk) => | ||
this.readLatch.whenOpen(Effect.sync(() => { | ||
if (chunk.length === 0) return | ||
this.readLatch.unsafeClose() | ||
for (const item of chunk) { | ||
if (typeof item === "string") { | ||
this.push(item, "utf8") | ||
} else { | ||
this.push(item) | ||
} | ||
} | ||
} | ||
}))) | ||
}))) | ||
) | ||
) | ||
@@ -351,3 +353,3 @@ this.fiber.addObserver((exit) => { | ||
} else { | ||
this._destroy(Cause.squash(exit.cause) as any, constVoid) | ||
this.destroy(Cause.squash(exit.cause) as any) | ||
} | ||
@@ -358,12 +360,11 @@ }) | ||
_read(_size: number): void { | ||
// TODO: refactor to use unsafeOpen when added to Effect | ||
Effect.runSync(this.readLatch.open) | ||
this.readLatch.unsafeOpen() | ||
} | ||
_destroy(_error: Error | null, callback: (error?: Error | null | undefined) => void): void { | ||
_destroy(error: Error | null, callback: (error?: Error | null | undefined) => void): void { | ||
if (!this.fiber) { | ||
return callback(null) | ||
return callback(error) | ||
} | ||
Effect.runFork(Fiber.interrupt(this.fiber)).addObserver((exit) => { | ||
callback(exit._tag === "Failure" ? Cause.squash(exit.cause) as any : null) | ||
callback(exit._tag === "Failure" ? Cause.squash(exit.cause) as any : error) | ||
}) | ||
@@ -370,0 +371,0 @@ } |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
443435
-0.05%6323
-0.02%