@effect/platform-node-shared
Advanced tools
Comparing version 0.2.5 to 0.3.0
@@ -36,5 +36,6 @@ "use strict"; | ||
/** @internal */ | ||
const runMain = (effect, teardown = _Runtime.defaultTeardown) => { | ||
const runMain = (effect, options) => { | ||
const teardown = options?.teardown ?? _Runtime.defaultTeardown; | ||
const keepAlive = setInterval(() => {}, 2 ** 31 - 1); | ||
const fiber = Effect.runFork(Effect.tapErrorCause(effect, cause => { | ||
const fiber = Effect.runFork(options?.disableErrorReporting === true ? effect : Effect.tapErrorCause(effect, cause => { | ||
if (Cause.isInterruptedOnly(cause)) { | ||
@@ -41,0 +42,0 @@ return Effect.unit; |
@@ -62,3 +62,3 @@ "use strict"; | ||
conn.on("error", error => { | ||
resume(Effect.fail(new Socket.SocketError({ | ||
resume(Effect.fail(new Socket.SocketGenericError({ | ||
reason: "Open", | ||
@@ -89,7 +89,9 @@ error | ||
const writeFiber = yield* _(Queue.take(sendQueue), Effect.tap(chunk => Effect.async(resume => { | ||
if (chunk === EOF) { | ||
if (Socket.isCloseEvent(chunk)) { | ||
conn.destroy(chunk.code > 1000 ? new Error(`closed with code ${chunk.code}`) : undefined); | ||
} else if (chunk === EOF) { | ||
conn.end(() => resume(Effect.unit)); | ||
} else { | ||
conn.write(chunk, error => { | ||
resume(error ? Effect.fail(new Socket.SocketError({ | ||
resume(error ? Effect.fail(new Socket.SocketGenericError({ | ||
reason: "Write", | ||
@@ -109,3 +111,3 @@ error | ||
conn.on("error", error => { | ||
resume(Effect.fail(new Socket.SocketError({ | ||
resume(Effect.fail(new Socket.SocketGenericError({ | ||
reason: "Read", | ||
@@ -115,2 +117,8 @@ error | ||
}); | ||
conn.on("close", hadError => { | ||
resume(Effect.fail(new Socket.SocketCloseError({ | ||
reason: "Close", | ||
code: hadError ? 1006 : 1000 | ||
}))); | ||
}); | ||
}), Effect.raceFirst(Fiber.join(writeFiber)), Effect.raceFirst(FiberSet.join(fiberSet))); | ||
@@ -121,3 +129,3 @@ }).pipe(Effect.scoped); | ||
return Socket.Socket.of({ | ||
[Socket.SocketTypeId]: Socket.SocketTypeId, | ||
[Socket.TypeId]: Socket.TypeId, | ||
run, | ||
@@ -124,0 +132,0 @@ writer |
@@ -39,3 +39,3 @@ /// <reference types="node" resolution-mode="require"/> | ||
*/ | ||
export declare const makeNetChannel: <IE = never>(options: Net.NetConnectOpts) => Channel.Channel<Chunk.Chunk<Uint8Array>, Chunk.Chunk<Uint8Array>, Socket.SocketError | IE, IE, void, unknown, never>; | ||
export declare const makeNetChannel: <IE = never>(options: Net.NetConnectOpts) => Channel.Channel<Chunk.Chunk<Uint8Array>, Chunk.Chunk<Uint8Array | Socket.CloseEvent>, Socket.SocketError | IE, IE, void, unknown, never>; | ||
/** | ||
@@ -42,0 +42,0 @@ * @since 1.0.0 |
@@ -5,5 +5,6 @@ import { defaultTeardown } from "@effect/platform/Runtime"; | ||
/** @internal */ | ||
export const runMain = (effect, teardown = defaultTeardown) => { | ||
export const runMain = (effect, options) => { | ||
const teardown = options?.teardown ?? defaultTeardown; | ||
const keepAlive = setInterval(() => {}, 2 ** 31 - 1); | ||
const fiber = Effect.runFork(Effect.tapErrorCause(effect, cause => { | ||
const fiber = Effect.runFork(options?.disableErrorReporting === true ? effect : Effect.tapErrorCause(effect, cause => { | ||
if (Cause.isInterruptedOnly(cause)) { | ||
@@ -10,0 +11,0 @@ return Effect.unit; |
@@ -30,3 +30,3 @@ /** | ||
conn.on("error", error => { | ||
resume(Effect.fail(new Socket.SocketError({ | ||
resume(Effect.fail(new Socket.SocketGenericError({ | ||
reason: "Open", | ||
@@ -56,7 +56,9 @@ error | ||
const writeFiber = yield* _(Queue.take(sendQueue), Effect.tap(chunk => Effect.async(resume => { | ||
if (chunk === EOF) { | ||
if (Socket.isCloseEvent(chunk)) { | ||
conn.destroy(chunk.code > 1000 ? new Error(`closed with code ${chunk.code}`) : undefined); | ||
} else if (chunk === EOF) { | ||
conn.end(() => resume(Effect.unit)); | ||
} else { | ||
conn.write(chunk, error => { | ||
resume(error ? Effect.fail(new Socket.SocketError({ | ||
resume(error ? Effect.fail(new Socket.SocketGenericError({ | ||
reason: "Write", | ||
@@ -76,3 +78,3 @@ error | ||
conn.on("error", error => { | ||
resume(Effect.fail(new Socket.SocketError({ | ||
resume(Effect.fail(new Socket.SocketGenericError({ | ||
reason: "Read", | ||
@@ -82,2 +84,8 @@ error | ||
}); | ||
conn.on("close", hadError => { | ||
resume(Effect.fail(new Socket.SocketCloseError({ | ||
reason: "Close", | ||
code: hadError ? 1006 : 1000 | ||
}))); | ||
}); | ||
}), Effect.raceFirst(Fiber.join(writeFiber)), Effect.raceFirst(FiberSet.join(fiberSet))); | ||
@@ -88,3 +96,3 @@ }).pipe(Effect.scoped); | ||
return Socket.Socket.of({ | ||
[Socket.SocketTypeId]: Socket.SocketTypeId, | ||
[Socket.TypeId]: Socket.TypeId, | ||
run, | ||
@@ -91,0 +99,0 @@ writer |
{ | ||
"name": "@effect/platform-node-shared", | ||
"version": "0.2.5", | ||
"version": "0.3.0", | ||
"description": "Unified interfaces for common platform-specific services", | ||
@@ -16,4 +16,4 @@ "license": "MIT", | ||
"peerDependencies": { | ||
"@effect/platform": "^0.47.1", | ||
"effect": "^2.4.3" | ||
"@effect/platform": "^0.48.0", | ||
"effect": "^2.4.4" | ||
}, | ||
@@ -20,0 +20,0 @@ "exports": { |
@@ -6,15 +6,18 @@ import { defaultTeardown, type RunMain } from "@effect/platform/Runtime" | ||
/** @internal */ | ||
export const runMain: RunMain = <E, A>( | ||
effect: Effect.Effect<A, E>, | ||
teardown = defaultTeardown | ||
export const runMain: RunMain = ( | ||
effect, | ||
options | ||
) => { | ||
const teardown = options?.teardown ?? defaultTeardown | ||
const keepAlive = setInterval(() => {}, 2 ** 31 - 1) | ||
const fiber = Effect.runFork( | ||
Effect.tapErrorCause(effect, (cause) => { | ||
if (Cause.isInterruptedOnly(cause)) { | ||
return Effect.unit | ||
} | ||
return Effect.logError(cause) | ||
}) | ||
options?.disableErrorReporting === true ? | ||
effect : | ||
Effect.tapErrorCause(effect, (cause) => { | ||
if (Cause.isInterruptedOnly(cause)) { | ||
return Effect.unit | ||
} | ||
return Effect.logError(cause) | ||
}) | ||
) | ||
@@ -21,0 +24,0 @@ |
@@ -31,3 +31,2 @@ /** | ||
) | ||
const EOF = Symbol.for("@effect/experimental/Socket/Node/EOF") | ||
@@ -51,3 +50,3 @@ | ||
conn.on("error", (error) => { | ||
resume(Effect.fail(new Socket.SocketError({ reason: "Open", error }))) | ||
resume(Effect.fail(new Socket.SocketGenericError({ reason: "Open", error }))) | ||
}) | ||
@@ -76,3 +75,3 @@ return Effect.sync(() => { | ||
Effect.gen(function*(_) { | ||
const sendQueue = yield* _(Queue.unbounded<Uint8Array | typeof EOF>()) | ||
const sendQueue = yield* _(Queue.unbounded<Uint8Array | Socket.CloseEvent | typeof EOF>()) | ||
@@ -91,7 +90,9 @@ const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<_, E, R>) => | ||
Effect.async<void, Socket.SocketError, never>((resume) => { | ||
if (chunk === EOF) { | ||
if (Socket.isCloseEvent(chunk)) { | ||
conn.destroy(chunk.code > 1000 ? new Error(`closed with code ${chunk.code}`) : undefined) | ||
} else if (chunk === EOF) { | ||
conn.end(() => resume(Effect.unit)) | ||
} else { | ||
conn.write(chunk, (error) => { | ||
resume(error ? Effect.fail(new Socket.SocketError({ reason: "Write", error })) : Effect.unit) | ||
resume(error ? Effect.fail(new Socket.SocketGenericError({ reason: "Write", error })) : Effect.unit) | ||
}) | ||
@@ -113,4 +114,14 @@ } | ||
conn.on("error", (error) => { | ||
resume(Effect.fail(new Socket.SocketError({ reason: "Read", error }))) | ||
resume(Effect.fail(new Socket.SocketGenericError({ reason: "Read", error }))) | ||
}) | ||
conn.on("close", (hadError) => { | ||
resume( | ||
Effect.fail( | ||
new Socket.SocketCloseError({ | ||
reason: "Close", | ||
code: hadError ? 1006 : 1000 | ||
}) | ||
) | ||
) | ||
}) | ||
}), | ||
@@ -122,3 +133,3 @@ Effect.raceFirst(Fiber.join(writeFiber)), | ||
const write = (chunk: Uint8Array) => Queue.offer(sendQueue, chunk) | ||
const write = (chunk: Uint8Array | Socket.CloseEvent) => Queue.offer(sendQueue, chunk) | ||
const writer = Effect.acquireRelease( | ||
@@ -130,3 +141,3 @@ Effect.succeed(write), | ||
return Socket.Socket.of({ | ||
[Socket.SocketTypeId]: Socket.SocketTypeId, | ||
[Socket.TypeId]: Socket.TypeId, | ||
run, | ||
@@ -143,3 +154,10 @@ writer | ||
options: Net.NetConnectOpts | ||
): Channel.Channel<Chunk.Chunk<Uint8Array>, Chunk.Chunk<Uint8Array>, Socket.SocketError | IE, IE, void, unknown> => | ||
): Channel.Channel< | ||
Chunk.Chunk<Uint8Array>, | ||
Chunk.Chunk<Uint8Array | Socket.CloseEvent>, | ||
Socket.SocketError | IE, | ||
IE, | ||
void, | ||
unknown | ||
> => | ||
Channel.unwrapScoped( | ||
@@ -146,0 +164,0 @@ Effect.map(makeNet(options), Socket.toChannelWith<IE>()) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
349644
5178