@effect/platform-node-shared
Advanced tools
Comparing version 0.0.0-snapshot-78b767c2b1625186e17131761a0edbac25d21850 to 0.0.0-snapshot-79dc882eafd6ddb60780764e2f663a6087b52622
@@ -47,6 +47,3 @@ "use strict"; | ||
} | ||
const inputToStdioOption = stdin => Option.match(stdin, { | ||
onNone: () => "inherit", | ||
onSome: () => "pipe" | ||
}); | ||
const inputToStdioOption = stdin => typeof stdin === "string" ? stdin : "pipe"; | ||
const outputToStdioOption = output => typeof output === "string" ? output : "pipe"; | ||
@@ -140,6 +137,3 @@ const toError = err => err instanceof globalThis.Error ? err : new globalThis.Error(String(err)); | ||
}); | ||
}), Effect.tap(process => Option.match(command.stdin, { | ||
onNone: () => Effect.void, | ||
onSome: stdin => Effect.forkDaemon(Stream.run(stdin, process.stdin)) | ||
}))); | ||
}), typeof command.stdin === "string" ? _Function.identity : Effect.tap(process => Effect.forkDaemon(Stream.run(command.stdin, process.stdin)))); | ||
} | ||
@@ -146,0 +140,0 @@ case "PipedCommand": |
@@ -10,3 +10,2 @@ "use strict"; | ||
var FileSystem = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("@effect/platform/FileSystem")); | ||
var Chunk = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("effect/Chunk")); | ||
var Effect = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("effect/Effect")); | ||
@@ -254,4 +253,6 @@ var _Function = /*#__PURE__*/require("effect/Function"); | ||
const makeFile = /*#__PURE__*/makeTempFileFactory("makeTempFileScoped"); | ||
const removeFile = /*#__PURE__*/removeFactory("makeTempFileScoped"); | ||
return options => Effect.acquireRelease(makeFile(options), file => Effect.orDie(removeFile(file))); | ||
const removeDirectory = /*#__PURE__*/removeFactory("makeTempFileScoped"); | ||
return options => Effect.acquireRelease(makeFile(options), file => Effect.orDie(removeDirectory(Path.dirname(file), { | ||
recursive: true | ||
}))); | ||
})(); | ||
@@ -331,45 +332,2 @@ // == readDirectory | ||
// == watch | ||
const watchParcel = (Watcher, path) => Stream.asyncScoped(emit => Effect.acquireRelease(Effect.tryPromise({ | ||
try: () => Watcher.subscribe(path, (error, events) => { | ||
if (error) { | ||
emit.fail(Error.SystemError({ | ||
reason: "Unknown", | ||
module: "FileSystem", | ||
method: "watch", | ||
pathOrDescriptor: path, | ||
message: error.message | ||
})); | ||
} else { | ||
emit.chunk(Chunk.unsafeFromArray(events.map(event => { | ||
switch (event.type) { | ||
case "create": | ||
{ | ||
return FileSystem.WatchEventCreate({ | ||
path: event.path | ||
}); | ||
} | ||
case "update": | ||
{ | ||
return FileSystem.WatchEventUpdate({ | ||
path: event.path | ||
}); | ||
} | ||
case "delete": | ||
{ | ||
return FileSystem.WatchEventRemove({ | ||
path: event.path | ||
}); | ||
} | ||
} | ||
}))); | ||
} | ||
}), | ||
catch: error => Error.SystemError({ | ||
reason: "Unknown", | ||
module: "FileSystem", | ||
method: "watch", | ||
pathOrDescriptor: path, | ||
message: error.message | ||
}) | ||
}), sub => Effect.promise(() => sub.unsubscribe()))); | ||
const watchNode = path => Stream.asyncScoped(emit => Effect.acquireRelease(Effect.sync(() => { | ||
@@ -414,6 +372,3 @@ const watcher = NFS.watch(path, {}, (event, path) => { | ||
}), watcher => Effect.sync(() => watcher.close()))); | ||
const watch = path => stat(path).pipe(Effect.flatMap(stat => stat.type === "Directory" ? Effect.matchCause(Effect.promise(() => import("@parcel/watcher")), { | ||
onSuccess: Watcher => watchParcel(Watcher, path), | ||
onFailure: _ => watchNode(path) | ||
}) : Effect.succeed(watchNode(path))), Stream.unwrap); | ||
const watch = (backend, path) => stat(path).pipe(Effect.map(stat => backend.pipe(Option.flatMap(_ => _.register(path, stat)), Option.getOrElse(() => watchNode(path)))), Stream.unwrap); | ||
// == writeFile | ||
@@ -437,3 +392,3 @@ const writeFile = (path, data, options) => Effect.async((resume, signal) => { | ||
}); | ||
const fileSystemImpl = /*#__PURE__*/FileSystem.make({ | ||
const makeFileSystem = /*#__PURE__*/Effect.map( /*#__PURE__*/Effect.serviceOption(FileSystem.WatchBackend), backend => FileSystem.make({ | ||
access, | ||
@@ -461,7 +416,9 @@ chmod, | ||
utimes, | ||
watch, | ||
watch(path) { | ||
return watch(backend, path); | ||
}, | ||
writeFile | ||
}); | ||
})); | ||
/** @internal */ | ||
const layer = exports.layer = /*#__PURE__*/Layer.succeed(FileSystem.FileSystem, fileSystemImpl); | ||
const layer = exports.layer = /*#__PURE__*/Layer.effect(FileSystem.FileSystem, makeFileSystem); | ||
//# sourceMappingURL=fileSystem.js.map |
@@ -11,2 +11,3 @@ "use strict"; | ||
var Effect = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("effect/Effect")); | ||
var FiberRef = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("effect/FiberRef")); | ||
var FiberSet = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("effect/FiberSet")); | ||
@@ -16,2 +17,3 @@ var Layer = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("effect/Layer")); | ||
var Queue = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("effect/Queue")); | ||
var Scope = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("effect/Scope")); | ||
var Net = /*#__PURE__*/_interopRequireWildcard( /*#__PURE__*/require("node:net")); | ||
@@ -89,5 +91,7 @@ function _getRequireWildcardCache(e) { | ||
const fromNetSocket = open => Effect.gen(function* (_) { | ||
const sendQueue = yield* _(Queue.unbounded()); | ||
const sendQueue = yield* _(Queue.dropping(yield* FiberRef.get(Socket.currentSendQueueCapacity))); | ||
const openContext = yield* Effect.context(); | ||
const run = handler => Effect.gen(function* (_) { | ||
const conn = yield* _(open); | ||
const scope = yield* Effect.scope; | ||
const conn = yield* open.pipe(Effect.provide(Context.add(openContext, Scope.Scope, scope))); | ||
const fiberSet = yield* _(FiberSet.make()); | ||
@@ -108,2 +112,3 @@ const run = yield* _(FiberSet.runtime(fiberSet)(), Effect.provideService(NetSocket, conn)); | ||
} | ||
return Effect.void; | ||
})), Effect.forever, Effect.withUnhandledErrorLogLevel(Option.none()), FiberSet.run(fiberSet)); | ||
@@ -114,6 +119,6 @@ conn.on("data", chunk => { | ||
yield* _(Effect.async(resume => { | ||
conn.on("end", () => { | ||
function onEnd() { | ||
resume(Effect.void); | ||
}); | ||
conn.on("error", error => { | ||
} | ||
function onError(error) { | ||
resume(Effect.fail(new Socket.SocketGenericError({ | ||
@@ -123,4 +128,4 @@ reason: "Read", | ||
}))); | ||
}); | ||
conn.on("close", hadError => { | ||
} | ||
function onClose(hadError) { | ||
resume(Effect.fail(new Socket.SocketCloseError({ | ||
@@ -130,2 +135,10 @@ reason: "Close", | ||
}))); | ||
} | ||
conn.on("end", onEnd); | ||
conn.on("error", onError); | ||
conn.on("close", onClose); | ||
return Effect.sync(() => { | ||
conn.off("end", onEnd); | ||
conn.off("error", onError); | ||
conn.off("close", onClose); | ||
}); | ||
@@ -154,4 +167,4 @@ }), Effect.raceFirst(FiberSet.join(fiberSet))); | ||
exports.makeNetChannel = makeNetChannel; | ||
const layerNet = options => Layer.scoped(Socket.Socket, makeNet(options)); | ||
const layerNet = options => Layer.effect(Socket.Socket, makeNet(options)); | ||
exports.layerNet = layerNet; | ||
//# sourceMappingURL=NodeSocket.js.map |
@@ -11,3 +11,3 @@ /// <reference types="node" resolution-mode="require"/> | ||
import * as Layer from "effect/Layer"; | ||
import type * as Scope from "effect/Scope"; | ||
import * as Scope from "effect/Scope"; | ||
import * as Net from "node:net"; | ||
@@ -30,3 +30,3 @@ /** | ||
*/ | ||
export declare const makeNet: (options: Net.NetConnectOpts) => Effect.Effect<Socket.Socket, Socket.SocketError, Scope.Scope>; | ||
export declare const makeNet: (options: Net.NetConnectOpts) => Effect.Effect<Socket.Socket, Socket.SocketError>; | ||
/** | ||
@@ -36,3 +36,3 @@ * @since 1.0.0 | ||
*/ | ||
export declare const fromNetSocket: (open: Effect.Effect<Net.Socket, Socket.SocketError, Scope.Scope>) => Effect.Effect<Socket.Socket>; | ||
export declare const fromNetSocket: <RO>(open: Effect.Effect<Net.Socket, Socket.SocketError, RO>) => Effect.Effect<Socket.Socket, never, Exclude<RO, Scope.Scope>>; | ||
/** | ||
@@ -39,0 +39,0 @@ * @since 1.0.0 |
@@ -6,3 +6,3 @@ import * as Command from "@effect/platform/Command"; | ||
import * as Effect from "effect/Effect"; | ||
import { constUndefined, pipe } from "effect/Function"; | ||
import { constUndefined, identity, pipe } from "effect/Function"; | ||
import * as Inspectable from "effect/Inspectable"; | ||
@@ -17,6 +17,3 @@ import * as Layer from "effect/Layer"; | ||
import { fromReadable } from "./stream.js"; | ||
const inputToStdioOption = stdin => Option.match(stdin, { | ||
onNone: () => "inherit", | ||
onSome: () => "pipe" | ||
}); | ||
const inputToStdioOption = stdin => typeof stdin === "string" ? stdin : "pipe"; | ||
const outputToStdioOption = output => typeof output === "string" ? output : "pipe"; | ||
@@ -110,6 +107,3 @@ const toError = err => err instanceof globalThis.Error ? err : new globalThis.Error(String(err)); | ||
}); | ||
}), Effect.tap(process => Option.match(command.stdin, { | ||
onNone: () => Effect.void, | ||
onSome: stdin => Effect.forkDaemon(Stream.run(stdin, process.stdin)) | ||
}))); | ||
}), typeof command.stdin === "string" ? identity : Effect.tap(process => Effect.forkDaemon(Stream.run(command.stdin, process.stdin)))); | ||
} | ||
@@ -116,0 +110,0 @@ case "PipedCommand": |
import { effectify } from "@effect/platform/Effectify"; | ||
import * as Error from "@effect/platform/Error"; | ||
import * as FileSystem from "@effect/platform/FileSystem"; | ||
import * as Chunk from "effect/Chunk"; | ||
import * as Effect from "effect/Effect"; | ||
@@ -222,4 +221,6 @@ import { pipe } from "effect/Function"; | ||
const makeFile = /*#__PURE__*/makeTempFileFactory("makeTempFileScoped"); | ||
const removeFile = /*#__PURE__*/removeFactory("makeTempFileScoped"); | ||
return options => Effect.acquireRelease(makeFile(options), file => Effect.orDie(removeFile(file))); | ||
const removeDirectory = /*#__PURE__*/removeFactory("makeTempFileScoped"); | ||
return options => Effect.acquireRelease(makeFile(options), file => Effect.orDie(removeDirectory(Path.dirname(file), { | ||
recursive: true | ||
}))); | ||
})(); | ||
@@ -299,45 +300,2 @@ // == readDirectory | ||
// == watch | ||
const watchParcel = (Watcher, path) => Stream.asyncScoped(emit => Effect.acquireRelease(Effect.tryPromise({ | ||
try: () => Watcher.subscribe(path, (error, events) => { | ||
if (error) { | ||
emit.fail(Error.SystemError({ | ||
reason: "Unknown", | ||
module: "FileSystem", | ||
method: "watch", | ||
pathOrDescriptor: path, | ||
message: error.message | ||
})); | ||
} else { | ||
emit.chunk(Chunk.unsafeFromArray(events.map(event => { | ||
switch (event.type) { | ||
case "create": | ||
{ | ||
return FileSystem.WatchEventCreate({ | ||
path: event.path | ||
}); | ||
} | ||
case "update": | ||
{ | ||
return FileSystem.WatchEventUpdate({ | ||
path: event.path | ||
}); | ||
} | ||
case "delete": | ||
{ | ||
return FileSystem.WatchEventRemove({ | ||
path: event.path | ||
}); | ||
} | ||
} | ||
}))); | ||
} | ||
}), | ||
catch: error => Error.SystemError({ | ||
reason: "Unknown", | ||
module: "FileSystem", | ||
method: "watch", | ||
pathOrDescriptor: path, | ||
message: error.message | ||
}) | ||
}), sub => Effect.promise(() => sub.unsubscribe()))); | ||
const watchNode = path => Stream.asyncScoped(emit => Effect.acquireRelease(Effect.sync(() => { | ||
@@ -382,6 +340,3 @@ const watcher = NFS.watch(path, {}, (event, path) => { | ||
}), watcher => Effect.sync(() => watcher.close()))); | ||
const watch = path => stat(path).pipe(Effect.flatMap(stat => stat.type === "Directory" ? Effect.matchCause(Effect.promise(() => import("@parcel/watcher")), { | ||
onSuccess: Watcher => watchParcel(Watcher, path), | ||
onFailure: _ => watchNode(path) | ||
}) : Effect.succeed(watchNode(path))), Stream.unwrap); | ||
const watch = (backend, path) => stat(path).pipe(Effect.map(stat => backend.pipe(Option.flatMap(_ => _.register(path, stat)), Option.getOrElse(() => watchNode(path)))), Stream.unwrap); | ||
// == writeFile | ||
@@ -405,3 +360,3 @@ const writeFile = (path, data, options) => Effect.async((resume, signal) => { | ||
}); | ||
const fileSystemImpl = /*#__PURE__*/FileSystem.make({ | ||
const makeFileSystem = /*#__PURE__*/Effect.map( /*#__PURE__*/Effect.serviceOption(FileSystem.WatchBackend), backend => FileSystem.make({ | ||
access, | ||
@@ -429,7 +384,9 @@ chmod, | ||
utimes, | ||
watch, | ||
watch(path) { | ||
return watch(backend, path); | ||
}, | ||
writeFile | ||
}); | ||
})); | ||
/** @internal */ | ||
export const layer = /*#__PURE__*/Layer.succeed(FileSystem.FileSystem, fileSystemImpl); | ||
export const layer = /*#__PURE__*/Layer.effect(FileSystem.FileSystem, makeFileSystem); | ||
//# sourceMappingURL=fileSystem.js.map |
@@ -8,2 +8,3 @@ /** | ||
import * as Effect from "effect/Effect"; | ||
import * as FiberRef from "effect/FiberRef"; | ||
import * as FiberSet from "effect/FiberSet"; | ||
@@ -13,2 +14,3 @@ import * as Layer from "effect/Layer"; | ||
import * as Queue from "effect/Queue"; | ||
import * as Scope from "effect/Scope"; | ||
import * as Net from "node:net"; | ||
@@ -56,5 +58,7 @@ /** | ||
export const fromNetSocket = open => Effect.gen(function* (_) { | ||
const sendQueue = yield* _(Queue.unbounded()); | ||
const sendQueue = yield* _(Queue.dropping(yield* FiberRef.get(Socket.currentSendQueueCapacity))); | ||
const openContext = yield* Effect.context(); | ||
const run = handler => Effect.gen(function* (_) { | ||
const conn = yield* _(open); | ||
const scope = yield* Effect.scope; | ||
const conn = yield* open.pipe(Effect.provide(Context.add(openContext, Scope.Scope, scope))); | ||
const fiberSet = yield* _(FiberSet.make()); | ||
@@ -75,2 +79,3 @@ const run = yield* _(FiberSet.runtime(fiberSet)(), Effect.provideService(NetSocket, conn)); | ||
} | ||
return Effect.void; | ||
})), Effect.forever, Effect.withUnhandledErrorLogLevel(Option.none()), FiberSet.run(fiberSet)); | ||
@@ -81,6 +86,6 @@ conn.on("data", chunk => { | ||
yield* _(Effect.async(resume => { | ||
conn.on("end", () => { | ||
function onEnd() { | ||
resume(Effect.void); | ||
}); | ||
conn.on("error", error => { | ||
} | ||
function onError(error) { | ||
resume(Effect.fail(new Socket.SocketGenericError({ | ||
@@ -90,4 +95,4 @@ reason: "Read", | ||
}))); | ||
}); | ||
conn.on("close", hadError => { | ||
} | ||
function onClose(hadError) { | ||
resume(Effect.fail(new Socket.SocketCloseError({ | ||
@@ -97,2 +102,10 @@ reason: "Close", | ||
}))); | ||
} | ||
conn.on("end", onEnd); | ||
conn.on("error", onError); | ||
conn.on("close", onClose); | ||
return Effect.sync(() => { | ||
conn.off("end", onEnd); | ||
conn.off("error", onError); | ||
conn.off("close", onClose); | ||
}); | ||
@@ -119,3 +132,3 @@ }), Effect.raceFirst(FiberSet.join(fiberSet))); | ||
*/ | ||
export const layerNet = options => Layer.scoped(Socket.Socket, makeNet(options)); | ||
export const layerNet = options => Layer.effect(Socket.Socket, makeNet(options)); | ||
//# sourceMappingURL=NodeSocket.js.map |
{ | ||
"name": "@effect/platform-node-shared", | ||
"version": "0.0.0-snapshot-78b767c2b1625186e17131761a0edbac25d21850", | ||
"version": "0.0.0-snapshot-79dc882eafd6ddb60780764e2f663a6087b52622", | ||
"description": "Unified interfaces for common platform-specific services", | ||
@@ -8,3 +8,3 @@ "license": "MIT", | ||
"type": "git", | ||
"url": "https://github.com/effect-ts/effect.git", | ||
"url": "https://github.com/Effect-TS/effect.git", | ||
"directory": "packages/platform-node-shared" | ||
@@ -15,8 +15,11 @@ }, | ||
"@parcel/watcher": "^2.4.1", | ||
"multipasta": "^0.2.0" | ||
"multipasta": "^0.2.1" | ||
}, | ||
"peerDependencies": { | ||
"@effect/platform": "^0.0.0-snapshot-78b767c2b1625186e17131761a0edbac25d21850", | ||
"effect": "^0.0.0-snapshot-78b767c2b1625186e17131761a0edbac25d21850" | ||
"@effect/platform": "^0.0.0-snapshot-79dc882eafd6ddb60780764e2f663a6087b52622", | ||
"effect": "^0.0.0-snapshot-79dc882eafd6ddb60780764e2f663a6087b52622" | ||
}, | ||
"publishConfig": { | ||
"provenance": true | ||
}, | ||
"exports": { | ||
@@ -23,0 +26,0 @@ "./package.json": "./package.json", |
@@ -7,3 +7,3 @@ import * as Command from "@effect/platform/Command" | ||
import * as Effect from "effect/Effect" | ||
import { constUndefined, pipe } from "effect/Function" | ||
import { constUndefined, identity, pipe } from "effect/Function" | ||
import * as Inspectable from "effect/Inspectable" | ||
@@ -20,4 +20,4 @@ import * as Layer from "effect/Layer" | ||
const inputToStdioOption = (stdin: Option.Option<Command.Command.Input>): "pipe" | "inherit" => | ||
Option.match(stdin, { onNone: () => "inherit", onSome: () => "pipe" }) | ||
const inputToStdioOption = (stdin: Command.Command.Input): "pipe" | "inherit" => | ||
typeof stdin === "string" ? stdin : "pipe" | ||
@@ -173,8 +173,7 @@ const outputToStdioOption = (output: Command.Command.Output): "pipe" | "inherit" => | ||
}), | ||
Effect.tap((process) => | ||
Option.match(command.stdin, { | ||
onNone: () => Effect.void, | ||
onSome: (stdin) => Effect.forkDaemon(Stream.run(stdin, process.stdin)) | ||
}) | ||
) | ||
typeof command.stdin === "string" | ||
? identity | ||
: Effect.tap((process) => | ||
Effect.forkDaemon(Stream.run(command.stdin as Stream.Stream<Uint8Array>, process.stdin)) | ||
) | ||
) | ||
@@ -181,0 +180,0 @@ } |
import { effectify } from "@effect/platform/Effectify" | ||
import * as Error from "@effect/platform/Error" | ||
import * as FileSystem from "@effect/platform/FileSystem" | ||
import type * as ParcelWatcher from "@parcel/watcher" | ||
import * as Chunk from "effect/Chunk" | ||
import type * as Context from "effect/Context" | ||
import * as Effect from "effect/Effect" | ||
@@ -384,7 +383,7 @@ import { pipe } from "effect/Function" | ||
const makeFile = makeTempFileFactory("makeTempFileScoped") | ||
const removeFile = removeFactory("makeTempFileScoped") | ||
const removeDirectory = removeFactory("makeTempFileScoped") | ||
return (options?: FileSystem.MakeTempFileOptions) => | ||
Effect.acquireRelease( | ||
makeFile(options), | ||
(file) => Effect.orDie(removeFile(file)) | ||
(file) => Effect.orDie(removeDirectory(Path.dirname(file), { recursive: true })) | ||
) | ||
@@ -528,45 +527,2 @@ })() | ||
const watchParcel = (Watcher: typeof ParcelWatcher, path: string) => | ||
Stream.asyncScoped<FileSystem.WatchEvent, Error.PlatformError>((emit) => | ||
Effect.acquireRelease( | ||
Effect.tryPromise({ | ||
try: () => | ||
Watcher.subscribe(path, (error, events) => { | ||
if (error) { | ||
emit.fail(Error.SystemError({ | ||
reason: "Unknown", | ||
module: "FileSystem", | ||
method: "watch", | ||
pathOrDescriptor: path, | ||
message: error.message | ||
})) | ||
} else { | ||
emit.chunk(Chunk.unsafeFromArray(events.map((event) => { | ||
switch (event.type) { | ||
case "create": { | ||
return FileSystem.WatchEventCreate({ path: event.path }) | ||
} | ||
case "update": { | ||
return FileSystem.WatchEventUpdate({ path: event.path }) | ||
} | ||
case "delete": { | ||
return FileSystem.WatchEventRemove({ path: event.path }) | ||
} | ||
} | ||
}))) | ||
} | ||
}), | ||
catch: (error) => | ||
Error.SystemError({ | ||
reason: "Unknown", | ||
module: "FileSystem", | ||
method: "watch", | ||
pathOrDescriptor: path, | ||
message: (error as Error).message | ||
}) | ||
}), | ||
(sub) => Effect.promise(() => sub.unsubscribe()) | ||
) | ||
) | ||
const watchNode = (path: string) => | ||
@@ -610,11 +566,9 @@ Stream.asyncScoped<FileSystem.WatchEvent, Error.PlatformError>((emit) => | ||
const watch = (path: string) => | ||
const watch = (backend: Option.Option<Context.Tag.Service<FileSystem.WatchBackend>>, path: string) => | ||
stat(path).pipe( | ||
Effect.flatMap((stat) => | ||
stat.type === "Directory" ? | ||
Effect.matchCause(Effect.promise(() => import("@parcel/watcher")), { | ||
onSuccess: (Watcher) => watchParcel(Watcher, path), | ||
onFailure: (_) => watchNode(path) | ||
}) : | ||
Effect.succeed(watchNode(path)) | ||
Effect.map((stat) => | ||
backend.pipe( | ||
Option.flatMap((_) => _.register(path, stat)), | ||
Option.getOrElse(() => watchNode(path)) | ||
) | ||
), | ||
@@ -645,30 +599,33 @@ Stream.unwrap | ||
const fileSystemImpl = FileSystem.make({ | ||
access, | ||
chmod, | ||
chown, | ||
copy, | ||
copyFile, | ||
link, | ||
makeDirectory, | ||
makeTempDirectory, | ||
makeTempDirectoryScoped, | ||
makeTempFile, | ||
makeTempFileScoped, | ||
open, | ||
readDirectory, | ||
readFile, | ||
readLink, | ||
realPath, | ||
remove, | ||
rename, | ||
stat, | ||
symlink, | ||
truncate, | ||
utimes, | ||
watch, | ||
writeFile | ||
}) | ||
const makeFileSystem = Effect.map(Effect.serviceOption(FileSystem.WatchBackend), (backend) => | ||
FileSystem.make({ | ||
access, | ||
chmod, | ||
chown, | ||
copy, | ||
copyFile, | ||
link, | ||
makeDirectory, | ||
makeTempDirectory, | ||
makeTempDirectoryScoped, | ||
makeTempFile, | ||
makeTempFileScoped, | ||
open, | ||
readDirectory, | ||
readFile, | ||
readLink, | ||
realPath, | ||
remove, | ||
rename, | ||
stat, | ||
symlink, | ||
truncate, | ||
utimes, | ||
watch(path) { | ||
return watch(backend, path) | ||
}, | ||
writeFile | ||
})) | ||
/** @internal */ | ||
export const layer = Layer.succeed(FileSystem.FileSystem, fileSystemImpl) | ||
export const layer = Layer.effect(FileSystem.FileSystem, makeFileSystem) |
@@ -9,2 +9,3 @@ /** | ||
import * as Effect from "effect/Effect" | ||
import * as FiberRef from "effect/FiberRef" | ||
import * as FiberSet from "effect/FiberSet" | ||
@@ -14,3 +15,3 @@ import * as Layer from "effect/Layer" | ||
import * as Queue from "effect/Queue" | ||
import type * as Scope from "effect/Scope" | ||
import * as Scope from "effect/Scope" | ||
import * as Net from "node:net" | ||
@@ -41,3 +42,3 @@ | ||
options: Net.NetConnectOpts | ||
): Effect.Effect<Socket.Socket, Socket.SocketError, Scope.Scope> => | ||
): Effect.Effect<Socket.Socket, Socket.SocketError> => | ||
fromNetSocket( | ||
@@ -76,11 +77,17 @@ Effect.acquireRelease( | ||
*/ | ||
export const fromNetSocket = ( | ||
open: Effect.Effect<Net.Socket, Socket.SocketError, Scope.Scope> | ||
): Effect.Effect<Socket.Socket> => | ||
export const fromNetSocket = <RO>( | ||
open: Effect.Effect<Net.Socket, Socket.SocketError, RO> | ||
): Effect.Effect<Socket.Socket, never, Exclude<RO, Scope.Scope>> => | ||
Effect.gen(function*(_) { | ||
const sendQueue = yield* _(Queue.unbounded<Uint8Array | string | Socket.CloseEvent | typeof EOF>()) | ||
const sendQueue = yield* _(Queue.dropping<Uint8Array | string | Socket.CloseEvent | typeof EOF>( | ||
yield* FiberRef.get(Socket.currentSendQueueCapacity) | ||
)) | ||
const openContext = yield* Effect.context<Exclude<RO, Scope.Scope>>() | ||
const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<_, E, R>) => | ||
Effect.gen(function*(_) { | ||
const conn = yield* _(open) | ||
const scope = yield* Effect.scope | ||
const conn = yield* open.pipe( | ||
Effect.provide(Context.add(openContext, Scope.Scope, scope)) | ||
) as Effect.Effect<Net.Socket> | ||
const fiberSet = yield* _(FiberSet.make<any, E | Socket.SocketError>()) | ||
@@ -104,2 +111,3 @@ const run = yield* _( | ||
} | ||
return Effect.void | ||
}) | ||
@@ -116,9 +124,9 @@ ), | ||
Effect.async<void, Socket.SocketError, never>((resume) => { | ||
conn.on("end", () => { | ||
function onEnd() { | ||
resume(Effect.void) | ||
}) | ||
conn.on("error", (error) => { | ||
} | ||
function onError(error: Error) { | ||
resume(Effect.fail(new Socket.SocketGenericError({ reason: "Read", error }))) | ||
}) | ||
conn.on("close", (hadError) => { | ||
} | ||
function onClose(hadError: boolean) { | ||
resume( | ||
@@ -132,2 +140,10 @@ Effect.fail( | ||
) | ||
} | ||
conn.on("end", onEnd) | ||
conn.on("error", onError) | ||
conn.on("close", onClose) | ||
return Effect.sync(() => { | ||
conn.off("end", onEnd) | ||
conn.off("error", onError) | ||
conn.off("close", onClose) | ||
}) | ||
@@ -179,5 +195,5 @@ }), | ||
export const layerNet = (options: Net.NetConnectOpts): Layer.Layer<Socket.Socket, Socket.SocketError> => | ||
Layer.scoped( | ||
Layer.effect( | ||
Socket.Socket, | ||
makeNet(options) | ||
) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
399923
176
6031
Updatedmultipasta@^0.2.1