@effect/platform
Advanced tools
Comparing version 0.69.15 to 0.69.16
@@ -67,10 +67,20 @@ "use strict"; | ||
const encodeUrlParams = endpoint.urlParamsSchema.pipe(Option.map(Schema.encodeUnknown)); | ||
(group.topLevel ? client : client[group.identifier])[endpoint.name] = request => { | ||
const url = request && request.path ? makeUrl(request.path) : endpoint.path; | ||
const baseRequest = HttpClientRequest.make(endpoint.method)(url); | ||
return (isMultipart ? Effect.succeed(baseRequest.pipe(HttpClientRequest.bodyFormData(request.payload))) : encodePayload._tag === "Some" ? encodePayload.value(request.payload).pipe(Effect.flatMap(payload => HttpMethod.hasBody(endpoint.method) ? HttpClientRequest.bodyJson(baseRequest, payload) : Effect.succeed(HttpClientRequest.setUrlParams(baseRequest, payload))), Effect.orDie) : Effect.succeed(baseRequest)).pipe(encodeHeaders._tag === "Some" ? Effect.flatMap(httpRequest => encodeHeaders.value(request.headers).pipe(Effect.orDie, Effect.map(headers => HttpClientRequest.setHeaders(httpRequest, headers)))) : _Function.identity, encodeUrlParams._tag === "Some" ? Effect.flatMap(httpRequest => encodeUrlParams.value(request.urlParams).pipe(Effect.orDie, Effect.map(params => HttpClientRequest.appendUrlParams(httpRequest, params)))) : _Function.identity, Effect.flatMap(httpClient.execute), Effect.flatMap(response => { | ||
const value = options?.transformResponse === undefined ? decodeResponse(response) : options.transformResponse(decodeResponse(response)); | ||
return request?.withResponse === true ? Effect.map(value, value => [value, response]) : value; | ||
}), Effect.scoped, Effect.catchIf(ParseResult.isParseError, Effect.die), Effect.mapInputContext(input => Context.merge(context, input))); | ||
}; | ||
(group.topLevel ? client : client[group.identifier])[endpoint.name] = request => Effect.gen(function* () { | ||
let httpRequest = HttpClientRequest.make(endpoint.method)(request && request.path ? makeUrl(request.path) : endpoint.path); | ||
if (isMultipart) { | ||
httpRequest = HttpClientRequest.bodyFormData(httpRequest, request.payload); | ||
} else if (encodePayload._tag === "Some") { | ||
const payload = yield* encodePayload.value(request.payload); | ||
httpRequest = HttpMethod.hasBody(endpoint.method) ? yield* Effect.orDie(HttpClientRequest.bodyJson(httpRequest, payload)) : HttpClientRequest.setUrlParams(httpRequest, payload); | ||
} | ||
if (encodeHeaders._tag === "Some") { | ||
httpRequest = HttpClientRequest.setHeaders(httpRequest, yield* encodeHeaders.value(request.headers)); | ||
} | ||
if (encodeUrlParams._tag === "Some") { | ||
httpRequest = HttpClientRequest.appendUrlParams(httpRequest, yield* encodeUrlParams.value(request.urlParams)); | ||
} | ||
const response = yield* httpClient.execute(httpRequest); | ||
const value = yield* options?.transformResponse === undefined ? decodeResponse(response) : options.transformResponse(decodeResponse(response)); | ||
return request?.withResponse === true ? [value, response] : value; | ||
}).pipe(Effect.scoped, Effect.catchIf(ParseResult.isParseError, Effect.die), Effect.mapInputContext(input => Context.merge(context, input))); | ||
} | ||
@@ -77,0 +87,0 @@ }); |
@@ -13,2 +13,3 @@ "use strict"; | ||
var Exit = _interopRequireWildcard(require("effect/Exit")); | ||
var Fiber = _interopRequireWildcard(require("effect/Fiber")); | ||
var FiberRef = _interopRequireWildcard(require("effect/FiberRef")); | ||
@@ -20,4 +21,4 @@ var FiberSet = _interopRequireWildcard(require("effect/FiberSet")); | ||
var Mailbox = _interopRequireWildcard(require("effect/Mailbox")); | ||
var Option = _interopRequireWildcard(require("effect/Option")); | ||
var Predicate = _interopRequireWildcard(require("effect/Predicate")); | ||
var Queue = _interopRequireWildcard(require("effect/Queue")); | ||
var Scope = _interopRequireWildcard(require("effect/Scope")); | ||
@@ -218,4 +219,8 @@ var _Error = require("./Error.js"); | ||
exports.makeWebSocket = makeWebSocket; | ||
const fromWebSocket = (acquire, options) => Effect.withFiberRuntime(fiber => Effect.gen(function* () { | ||
const sendQueue = yield* Queue.dropping(fiber.getFiberRef(currentSendQueueCapacity)); | ||
const fromWebSocket = (acquire, options) => Effect.gen(function* () { | ||
const fiber = Option.getOrThrow(Fiber.getCurrentFiber()); | ||
const sendQueue = yield* Mailbox.make({ | ||
capacity: fiber.getFiberRef(currentSendQueueCapacity), | ||
strategy: "dropping" | ||
}); | ||
const acquireContext = fiber.currentContext; | ||
@@ -278,11 +283,8 @@ const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError; | ||
open = true; | ||
yield* Queue.take(sendQueue).pipe(Effect.tap(chunk => isCloseEvent(chunk) ? Effect.failSync(() => { | ||
ws.close(chunk.code, chunk.reason); | ||
return new SocketCloseError({ | ||
reason: "Close", | ||
code: chunk.code, | ||
closeReason: chunk.reason | ||
}); | ||
}) : Effect.try({ | ||
try: () => ws.send(chunk), | ||
yield* sendQueue.takeAll.pipe(Effect.tap(([chunk]) => Effect.try({ | ||
try: () => { | ||
for (const item of chunk) { | ||
ws.send(item); | ||
} | ||
}, | ||
catch: cause => new SocketGenericError({ | ||
@@ -292,3 +294,6 @@ reason: "Write", | ||
}) | ||
})), Effect.forever, FiberSet.run(fiberSet)); | ||
})), Effect.forever, Effect.catchIf(SocketCloseError.is, error => { | ||
ws.close(error.code, error.closeReason); | ||
return Effect.fail(error); | ||
}), FiberSet.run(fiberSet)); | ||
return yield* FiberSet.join(fiberSet).pipe(Effect.catchIf(SocketCloseError.isClean(_ => !closeCodeIsError(_)), _ => Effect.void)); | ||
@@ -298,3 +303,7 @@ }).pipe(Effect.mapInputContext(input => Context.merge(acquireContext, input)), Effect.scoped, Effect.interruptible); | ||
const run = handler => runRaw(data => typeof data === "string" ? handler(encoder.encode(data)) : handler(data)); | ||
const write = chunk => Queue.offer(sendQueue, chunk); | ||
const write = chunk => isCloseEvent(chunk) ? sendQueue.fail(new SocketCloseError({ | ||
reason: "Close", | ||
code: chunk.code, | ||
closeReason: chunk.reason | ||
})) : sendQueue.offer(chunk); | ||
const writer = Effect.succeed(write); | ||
@@ -307,3 +316,3 @@ return Socket.of({ | ||
}); | ||
})); | ||
}); | ||
/** | ||
@@ -331,5 +340,8 @@ * @since 1.0.0 | ||
*/ | ||
const fromTransformStream = (acquire, options) => Effect.withFiberRuntime(fiber => Effect.gen(function* () { | ||
const EOF = Symbol(); | ||
const sendQueue = yield* Queue.dropping(fiber.getFiberRef(currentSendQueueCapacity)); | ||
const fromTransformStream = (acquire, options) => Effect.gen(function* () { | ||
const fiber = Option.getOrThrow(Fiber.getCurrentFiber()); | ||
const sendQueue = yield* Mailbox.make({ | ||
capacity: fiber.getFiberRef(currentSendQueueCapacity), | ||
strategy: "dropping" | ||
}); | ||
const acquireContext = fiber.currentContext; | ||
@@ -345,16 +357,11 @@ const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError; | ||
const encoder = new TextEncoder(); | ||
yield* Queue.take(sendQueue).pipe(Effect.tap(chunk => { | ||
if (chunk === EOF || isCloseEvent(chunk)) { | ||
return Effect.zipRight(Effect.promise(() => writer.close()), chunk === EOF ? Effect.interrupt : Effect.fail(new SocketCloseError({ | ||
reason: "Close", | ||
code: chunk.code, | ||
closeReason: chunk.reason | ||
}))); | ||
} | ||
return Effect.try({ | ||
yield* sendQueue.takeAll.pipe(Effect.flatMap(([chunk, done]) => { | ||
const write = Effect.try({ | ||
try: () => { | ||
if (typeof chunk === "string") { | ||
writer.write(encoder.encode(chunk)); | ||
} else { | ||
writer.write(chunk); | ||
for (const item of chunk) { | ||
if (typeof item === "string") { | ||
writer.write(encoder.encode(item)); | ||
} else { | ||
writer.write(item); | ||
} | ||
} | ||
@@ -367,3 +374,4 @@ }, | ||
}); | ||
}), Effect.forever, FiberSet.run(fiberSet)); | ||
return done ? Effect.zipRight(write, Effect.interrupt) : write; | ||
}), Effect.forever, Effect.ensuring(Effect.promise(() => writer.close())), FiberSet.run(fiberSet)); | ||
yield* Effect.tryPromise({ | ||
@@ -388,4 +396,8 @@ try: () => reader.read(), | ||
const run = handler => runRaw(data => typeof data === "string" ? handler(encoder.encode(data)) : handler(data)); | ||
const write = chunk => Queue.offer(sendQueue, chunk); | ||
const writer = Effect.acquireRelease(Effect.succeed(write), () => Queue.offer(sendQueue, EOF)); | ||
const write = chunk => isCloseEvent(chunk) ? sendQueue.fail(new SocketCloseError({ | ||
reason: "Close", | ||
code: chunk.code, | ||
closeReason: chunk.reason | ||
})) : sendQueue.offer(chunk); | ||
const writer = Effect.acquireRelease(Effect.succeed(write), () => sendQueue.end); | ||
return Socket.of({ | ||
@@ -397,4 +409,4 @@ [TypeId]: TypeId, | ||
}); | ||
})); | ||
}); | ||
exports.fromTransformStream = fromTransformStream; | ||
//# sourceMappingURL=Socket.js.map |
@@ -58,10 +58,20 @@ /** | ||
const encodeUrlParams = endpoint.urlParamsSchema.pipe(Option.map(Schema.encodeUnknown)); | ||
(group.topLevel ? client : client[group.identifier])[endpoint.name] = request => { | ||
const url = request && request.path ? makeUrl(request.path) : endpoint.path; | ||
const baseRequest = HttpClientRequest.make(endpoint.method)(url); | ||
return (isMultipart ? Effect.succeed(baseRequest.pipe(HttpClientRequest.bodyFormData(request.payload))) : encodePayload._tag === "Some" ? encodePayload.value(request.payload).pipe(Effect.flatMap(payload => HttpMethod.hasBody(endpoint.method) ? HttpClientRequest.bodyJson(baseRequest, payload) : Effect.succeed(HttpClientRequest.setUrlParams(baseRequest, payload))), Effect.orDie) : Effect.succeed(baseRequest)).pipe(encodeHeaders._tag === "Some" ? Effect.flatMap(httpRequest => encodeHeaders.value(request.headers).pipe(Effect.orDie, Effect.map(headers => HttpClientRequest.setHeaders(httpRequest, headers)))) : identity, encodeUrlParams._tag === "Some" ? Effect.flatMap(httpRequest => encodeUrlParams.value(request.urlParams).pipe(Effect.orDie, Effect.map(params => HttpClientRequest.appendUrlParams(httpRequest, params)))) : identity, Effect.flatMap(httpClient.execute), Effect.flatMap(response => { | ||
const value = options?.transformResponse === undefined ? decodeResponse(response) : options.transformResponse(decodeResponse(response)); | ||
return request?.withResponse === true ? Effect.map(value, value => [value, response]) : value; | ||
}), Effect.scoped, Effect.catchIf(ParseResult.isParseError, Effect.die), Effect.mapInputContext(input => Context.merge(context, input))); | ||
}; | ||
(group.topLevel ? client : client[group.identifier])[endpoint.name] = request => Effect.gen(function* () { | ||
let httpRequest = HttpClientRequest.make(endpoint.method)(request && request.path ? makeUrl(request.path) : endpoint.path); | ||
if (isMultipart) { | ||
httpRequest = HttpClientRequest.bodyFormData(httpRequest, request.payload); | ||
} else if (encodePayload._tag === "Some") { | ||
const payload = yield* encodePayload.value(request.payload); | ||
httpRequest = HttpMethod.hasBody(endpoint.method) ? yield* Effect.orDie(HttpClientRequest.bodyJson(httpRequest, payload)) : HttpClientRequest.setUrlParams(httpRequest, payload); | ||
} | ||
if (encodeHeaders._tag === "Some") { | ||
httpRequest = HttpClientRequest.setHeaders(httpRequest, yield* encodeHeaders.value(request.headers)); | ||
} | ||
if (encodeUrlParams._tag === "Some") { | ||
httpRequest = HttpClientRequest.appendUrlParams(httpRequest, yield* encodeUrlParams.value(request.urlParams)); | ||
} | ||
const response = yield* httpClient.execute(httpRequest); | ||
const value = yield* options?.transformResponse === undefined ? decodeResponse(response) : options.transformResponse(decodeResponse(response)); | ||
return request?.withResponse === true ? [value, response] : value; | ||
}).pipe(Effect.scoped, Effect.catchIf(ParseResult.isParseError, Effect.die), Effect.mapInputContext(input => Context.merge(context, input))); | ||
} | ||
@@ -68,0 +78,0 @@ }); |
@@ -10,2 +10,3 @@ /** | ||
import * as Exit from "effect/Exit"; | ||
import * as Fiber from "effect/Fiber"; | ||
import * as FiberRef from "effect/FiberRef"; | ||
@@ -17,4 +18,4 @@ import * as FiberSet from "effect/FiberSet"; | ||
import * as Mailbox from "effect/Mailbox"; | ||
import * as Option from "effect/Option"; | ||
import * as Predicate from "effect/Predicate"; | ||
import * as Queue from "effect/Queue"; | ||
import * as Scope from "effect/Scope"; | ||
@@ -197,4 +198,8 @@ import { TypeIdError } from "./Error.js"; | ||
*/ | ||
export const fromWebSocket = (acquire, options) => Effect.withFiberRuntime(fiber => Effect.gen(function* () { | ||
const sendQueue = yield* Queue.dropping(fiber.getFiberRef(currentSendQueueCapacity)); | ||
export const fromWebSocket = (acquire, options) => Effect.gen(function* () { | ||
const fiber = Option.getOrThrow(Fiber.getCurrentFiber()); | ||
const sendQueue = yield* Mailbox.make({ | ||
capacity: fiber.getFiberRef(currentSendQueueCapacity), | ||
strategy: "dropping" | ||
}); | ||
const acquireContext = fiber.currentContext; | ||
@@ -257,11 +262,8 @@ const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError; | ||
open = true; | ||
yield* Queue.take(sendQueue).pipe(Effect.tap(chunk => isCloseEvent(chunk) ? Effect.failSync(() => { | ||
ws.close(chunk.code, chunk.reason); | ||
return new SocketCloseError({ | ||
reason: "Close", | ||
code: chunk.code, | ||
closeReason: chunk.reason | ||
}); | ||
}) : Effect.try({ | ||
try: () => ws.send(chunk), | ||
yield* sendQueue.takeAll.pipe(Effect.tap(([chunk]) => Effect.try({ | ||
try: () => { | ||
for (const item of chunk) { | ||
ws.send(item); | ||
} | ||
}, | ||
catch: cause => new SocketGenericError({ | ||
@@ -271,3 +273,6 @@ reason: "Write", | ||
}) | ||
})), Effect.forever, FiberSet.run(fiberSet)); | ||
})), Effect.forever, Effect.catchIf(SocketCloseError.is, error => { | ||
ws.close(error.code, error.closeReason); | ||
return Effect.fail(error); | ||
}), FiberSet.run(fiberSet)); | ||
return yield* FiberSet.join(fiberSet).pipe(Effect.catchIf(SocketCloseError.isClean(_ => !closeCodeIsError(_)), _ => Effect.void)); | ||
@@ -277,3 +282,7 @@ }).pipe(Effect.mapInputContext(input => Context.merge(acquireContext, input)), Effect.scoped, Effect.interruptible); | ||
const run = handler => runRaw(data => typeof data === "string" ? handler(encoder.encode(data)) : handler(data)); | ||
const write = chunk => Queue.offer(sendQueue, chunk); | ||
const write = chunk => isCloseEvent(chunk) ? sendQueue.fail(new SocketCloseError({ | ||
reason: "Close", | ||
code: chunk.code, | ||
closeReason: chunk.reason | ||
})) : sendQueue.offer(chunk); | ||
const writer = Effect.succeed(write); | ||
@@ -286,3 +295,3 @@ return Socket.of({ | ||
}); | ||
})); | ||
}); | ||
/** | ||
@@ -307,5 +316,8 @@ * @since 1.0.0 | ||
*/ | ||
export const fromTransformStream = (acquire, options) => Effect.withFiberRuntime(fiber => Effect.gen(function* () { | ||
const EOF = Symbol(); | ||
const sendQueue = yield* Queue.dropping(fiber.getFiberRef(currentSendQueueCapacity)); | ||
export const fromTransformStream = (acquire, options) => Effect.gen(function* () { | ||
const fiber = Option.getOrThrow(Fiber.getCurrentFiber()); | ||
const sendQueue = yield* Mailbox.make({ | ||
capacity: fiber.getFiberRef(currentSendQueueCapacity), | ||
strategy: "dropping" | ||
}); | ||
const acquireContext = fiber.currentContext; | ||
@@ -321,16 +333,11 @@ const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError; | ||
const encoder = new TextEncoder(); | ||
yield* Queue.take(sendQueue).pipe(Effect.tap(chunk => { | ||
if (chunk === EOF || isCloseEvent(chunk)) { | ||
return Effect.zipRight(Effect.promise(() => writer.close()), chunk === EOF ? Effect.interrupt : Effect.fail(new SocketCloseError({ | ||
reason: "Close", | ||
code: chunk.code, | ||
closeReason: chunk.reason | ||
}))); | ||
} | ||
return Effect.try({ | ||
yield* sendQueue.takeAll.pipe(Effect.flatMap(([chunk, done]) => { | ||
const write = Effect.try({ | ||
try: () => { | ||
if (typeof chunk === "string") { | ||
writer.write(encoder.encode(chunk)); | ||
} else { | ||
writer.write(chunk); | ||
for (const item of chunk) { | ||
if (typeof item === "string") { | ||
writer.write(encoder.encode(item)); | ||
} else { | ||
writer.write(item); | ||
} | ||
} | ||
@@ -343,3 +350,4 @@ }, | ||
}); | ||
}), Effect.forever, FiberSet.run(fiberSet)); | ||
return done ? Effect.zipRight(write, Effect.interrupt) : write; | ||
}), Effect.forever, Effect.ensuring(Effect.promise(() => writer.close())), FiberSet.run(fiberSet)); | ||
yield* Effect.tryPromise({ | ||
@@ -364,4 +372,8 @@ try: () => reader.read(), | ||
const run = handler => runRaw(data => typeof data === "string" ? handler(encoder.encode(data)) : handler(data)); | ||
const write = chunk => Queue.offer(sendQueue, chunk); | ||
const writer = Effect.acquireRelease(Effect.succeed(write), () => Queue.offer(sendQueue, EOF)); | ||
const write = chunk => isCloseEvent(chunk) ? sendQueue.fail(new SocketCloseError({ | ||
reason: "Close", | ||
code: chunk.code, | ||
closeReason: chunk.reason | ||
})) : sendQueue.offer(chunk); | ||
const writer = Effect.acquireRelease(Effect.succeed(write), () => sendQueue.end); | ||
return Socket.of({ | ||
@@ -373,3 +385,3 @@ [TypeId]: TypeId, | ||
}); | ||
})); | ||
}); | ||
//# sourceMappingURL=Socket.js.map |
{ | ||
"name": "@effect/platform", | ||
"version": "0.69.15", | ||
"version": "0.69.16", | ||
"description": "Unified interfaces for common platform-specific services", | ||
@@ -17,3 +17,3 @@ "license": "MIT", | ||
"peerDependencies": { | ||
"effect": "^3.10.9" | ||
"effect": "^3.10.10" | ||
}, | ||
@@ -20,0 +20,0 @@ "publishConfig": { |
@@ -155,47 +155,37 @@ /** | ||
readonly withResponse?: boolean | ||
}) => { | ||
const url = request && request.path ? makeUrl(request.path) : endpoint.path | ||
const baseRequest = HttpClientRequest.make(endpoint.method)(url) | ||
return (isMultipart ? | ||
Effect.succeed(baseRequest.pipe( | ||
HttpClientRequest.bodyFormData(request.payload) | ||
)) | ||
: encodePayload._tag === "Some" | ||
? encodePayload.value(request.payload).pipe( | ||
Effect.flatMap((payload) => | ||
HttpMethod.hasBody(endpoint.method) | ||
? HttpClientRequest.bodyJson(baseRequest, payload) | ||
: Effect.succeed(HttpClientRequest.setUrlParams(baseRequest, payload as any)) | ||
), | ||
Effect.orDie | ||
}) => | ||
Effect.gen(function*() { | ||
let httpRequest = HttpClientRequest.make(endpoint.method)( | ||
request && request.path ? makeUrl(request.path) : endpoint.path | ||
) | ||
: Effect.succeed(baseRequest)).pipe( | ||
encodeHeaders._tag === "Some" | ||
? Effect.flatMap((httpRequest) => | ||
encodeHeaders.value(request.headers).pipe( | ||
Effect.orDie, | ||
Effect.map((headers) => HttpClientRequest.setHeaders(httpRequest, headers as any)) | ||
) | ||
) | ||
: identity, | ||
encodeUrlParams._tag === "Some" | ||
? Effect.flatMap((httpRequest) => | ||
encodeUrlParams.value(request.urlParams).pipe( | ||
Effect.orDie, | ||
Effect.map((params) => HttpClientRequest.appendUrlParams(httpRequest, params as any)) | ||
) | ||
) | ||
: identity, | ||
Effect.flatMap(httpClient.execute), | ||
Effect.flatMap((response) => { | ||
const value = options?.transformResponse === undefined | ||
? decodeResponse(response) | ||
: options.transformResponse(decodeResponse(response)) | ||
return request?.withResponse === true ? Effect.map(value, (value) => [value, response]) : value | ||
}), | ||
Effect.scoped, | ||
Effect.catchIf(ParseResult.isParseError, Effect.die), | ||
Effect.mapInputContext((input) => Context.merge(context, input)) | ||
) | ||
} | ||
if (isMultipart) { | ||
httpRequest = HttpClientRequest.bodyFormData(httpRequest, request.payload) | ||
} else if (encodePayload._tag === "Some") { | ||
const payload = yield* encodePayload.value(request.payload) | ||
httpRequest = HttpMethod.hasBody(endpoint.method) | ||
? yield* Effect.orDie(HttpClientRequest.bodyJson(httpRequest, payload)) | ||
: HttpClientRequest.setUrlParams(httpRequest, payload as any) | ||
} | ||
if (encodeHeaders._tag === "Some") { | ||
httpRequest = HttpClientRequest.setHeaders( | ||
httpRequest, | ||
(yield* encodeHeaders.value(request.headers)) as any | ||
) | ||
} | ||
if (encodeUrlParams._tag === "Some") { | ||
httpRequest = HttpClientRequest.appendUrlParams( | ||
httpRequest, | ||
(yield* encodeUrlParams.value(request.urlParams)) as any | ||
) | ||
} | ||
const response = yield* httpClient.execute(httpRequest) | ||
const value = yield* (options?.transformResponse === undefined | ||
? decodeResponse(response) | ||
: options.transformResponse(decodeResponse(response))) | ||
return request?.withResponse === true ? [value, response] : value | ||
}).pipe( | ||
Effect.scoped, | ||
Effect.catchIf(ParseResult.isParseError, Effect.die), | ||
Effect.mapInputContext((input) => Context.merge(context, input)) | ||
) | ||
} | ||
@@ -202,0 +192,0 @@ }) |
@@ -12,2 +12,3 @@ /** | ||
import * as Exit from "effect/Exit" | ||
import * as Fiber from "effect/Fiber" | ||
import * as FiberRef from "effect/FiberRef" | ||
@@ -19,4 +20,4 @@ import * as FiberSet from "effect/FiberSet" | ||
import * as Mailbox from "effect/Mailbox" | ||
import * as Option from "effect/Option" | ||
import * as Predicate from "effect/Predicate" | ||
import * as Queue from "effect/Queue" | ||
import * as Scope from "effect/Scope" | ||
@@ -399,122 +400,129 @@ import type * as AsyncProducer from "effect/SingleProducerAsyncInput" | ||
): Effect.Effect<Socket, never, Exclude<RO, Scope.Scope>> => | ||
Effect.withFiberRuntime<Socket, never, Exclude<RO, Scope.Scope>>((fiber) => | ||
Effect.gen(function*() { | ||
const sendQueue = yield* Queue.dropping<Uint8Array | string | CloseEvent>( | ||
fiber.getFiberRef(currentSendQueueCapacity) | ||
) | ||
const acquireContext = fiber.currentContext as Context.Context<RO> | ||
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError | ||
Effect.gen(function*() { | ||
const fiber = Option.getOrThrow(Fiber.getCurrentFiber()) | ||
const sendQueue = yield* Mailbox.make<Uint8Array | string, SocketError>({ | ||
capacity: fiber.getFiberRef(currentSendQueueCapacity), | ||
strategy: "dropping" | ||
}) | ||
const acquireContext = fiber.currentContext as Context.Context<RO> | ||
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError | ||
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void) => | ||
Effect.gen(function*() { | ||
const fiberSet = yield* FiberSet.make<any, E | SocketError>() | ||
const ws = yield* acquire | ||
const run = yield* Effect.provideService(FiberSet.runtime(fiberSet)<R>(), WebSocket, ws) | ||
let open = false | ||
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void) => | ||
Effect.gen(function*() { | ||
const fiberSet = yield* FiberSet.make<any, E | SocketError>() | ||
const ws = yield* acquire | ||
const run = yield* Effect.provideService(FiberSet.runtime(fiberSet)<R>(), WebSocket, ws) | ||
let open = false | ||
function onMessage(event: MessageEvent) { | ||
if (event.data instanceof Blob) { | ||
return Effect.promise(() => event.data.arrayBuffer() as Promise<ArrayBuffer>).pipe( | ||
Effect.andThen((buffer) => handler(new Uint8Array(buffer))), | ||
run | ||
) | ||
} | ||
const result = handler(event.data) | ||
if (Effect.isEffect(result)) { | ||
run(result) | ||
} | ||
} | ||
function onError(cause: Event) { | ||
ws.removeEventListener("message", onMessage) | ||
ws.removeEventListener("close", onClose) | ||
Deferred.unsafeDone( | ||
fiberSet.deferred, | ||
Effect.fail(new SocketGenericError({ reason: open ? "Read" : "Open", cause })) | ||
function onMessage(event: MessageEvent) { | ||
if (event.data instanceof Blob) { | ||
return Effect.promise(() => event.data.arrayBuffer() as Promise<ArrayBuffer>).pipe( | ||
Effect.andThen((buffer) => handler(new Uint8Array(buffer))), | ||
run | ||
) | ||
} | ||
function onClose(event: globalThis.CloseEvent) { | ||
ws.removeEventListener("message", onMessage) | ||
ws.removeEventListener("error", onError) | ||
Deferred.unsafeDone( | ||
fiberSet.deferred, | ||
Effect.fail( | ||
new SocketCloseError({ | ||
reason: "Close", | ||
code: event.code, | ||
closeReason: event.reason | ||
}) | ||
) | ||
const result = handler(event.data) | ||
if (Effect.isEffect(result)) { | ||
run(result) | ||
} | ||
} | ||
function onError(cause: Event) { | ||
ws.removeEventListener("message", onMessage) | ||
ws.removeEventListener("close", onClose) | ||
Deferred.unsafeDone( | ||
fiberSet.deferred, | ||
Effect.fail(new SocketGenericError({ reason: open ? "Read" : "Open", cause })) | ||
) | ||
} | ||
function onClose(event: globalThis.CloseEvent) { | ||
ws.removeEventListener("message", onMessage) | ||
ws.removeEventListener("error", onError) | ||
Deferred.unsafeDone( | ||
fiberSet.deferred, | ||
Effect.fail( | ||
new SocketCloseError({ | ||
reason: "Close", | ||
code: event.code, | ||
closeReason: event.reason | ||
}) | ||
) | ||
} | ||
) | ||
} | ||
ws.addEventListener("close", onClose, { once: true }) | ||
ws.addEventListener("error", onError, { once: true }) | ||
ws.addEventListener("message", onMessage) | ||
ws.addEventListener("close", onClose, { once: true }) | ||
ws.addEventListener("error", onError, { once: true }) | ||
ws.addEventListener("message", onMessage) | ||
if (ws.readyState !== 1) { | ||
const openDeferred = Deferred.unsafeMake<void>(fiber.id()) | ||
ws.addEventListener("open", () => { | ||
open = true | ||
Deferred.unsafeDone(openDeferred, Effect.void) | ||
}, { once: true }) | ||
yield* Deferred.await(openDeferred).pipe( | ||
Effect.timeoutFail({ | ||
duration: options?.openTimeout ?? 10000, | ||
onTimeout: () => | ||
new SocketGenericError({ reason: "OpenTimeout", cause: "timeout waiting for \"open\"" }) | ||
}), | ||
Effect.raceFirst(FiberSet.join(fiberSet)) | ||
) | ||
} | ||
open = true | ||
yield* Queue.take(sendQueue).pipe( | ||
Effect.tap((chunk) => | ||
isCloseEvent(chunk) ? | ||
Effect.failSync(() => { | ||
ws.close(chunk.code, chunk.reason) | ||
return new SocketCloseError({ | ||
reason: "Close", | ||
code: chunk.code, | ||
closeReason: chunk.reason | ||
}) | ||
}) : | ||
Effect.try({ | ||
try: () => ws.send(chunk), | ||
catch: (cause) => new SocketGenericError({ reason: "Write", cause }) | ||
}) | ||
), | ||
Effect.forever, | ||
FiberSet.run(fiberSet) | ||
if (ws.readyState !== 1) { | ||
const openDeferred = Deferred.unsafeMake<void>(fiber.id()) | ||
ws.addEventListener("open", () => { | ||
open = true | ||
Deferred.unsafeDone(openDeferred, Effect.void) | ||
}, { once: true }) | ||
yield* Deferred.await(openDeferred).pipe( | ||
Effect.timeoutFail({ | ||
duration: options?.openTimeout ?? 10000, | ||
onTimeout: () => new SocketGenericError({ reason: "OpenTimeout", cause: "timeout waiting for \"open\"" }) | ||
}), | ||
Effect.raceFirst(FiberSet.join(fiberSet)) | ||
) | ||
return yield* FiberSet.join(fiberSet).pipe( | ||
Effect.catchIf( | ||
SocketCloseError.isClean((_) => !closeCodeIsError(_)), | ||
(_) => Effect.void | ||
) | ||
} | ||
open = true | ||
yield* sendQueue.takeAll.pipe( | ||
Effect.tap(([chunk]) => | ||
Effect.try({ | ||
try: () => { | ||
for (const item of chunk) { | ||
ws.send(item) | ||
} | ||
}, | ||
catch: (cause) => new SocketGenericError({ reason: "Write", cause }) | ||
}) | ||
), | ||
Effect.forever, | ||
Effect.catchIf(SocketCloseError.is, (error) => { | ||
ws.close(error.code, error.closeReason) | ||
return Effect.fail(error) | ||
}), | ||
FiberSet.run(fiberSet) | ||
) | ||
return yield* FiberSet.join(fiberSet).pipe( | ||
Effect.catchIf( | ||
SocketCloseError.isClean((_) => !closeCodeIsError(_)), | ||
(_) => Effect.void | ||
) | ||
}).pipe( | ||
Effect.mapInputContext((input: Context.Context<R | Scope.Scope>) => Context.merge(acquireContext, input)), | ||
Effect.scoped, | ||
Effect.interruptible | ||
) | ||
}).pipe( | ||
Effect.mapInputContext((input: Context.Context<R | Scope.Scope>) => Context.merge(acquireContext, input)), | ||
Effect.scoped, | ||
Effect.interruptible | ||
) | ||
const encoder = new TextEncoder() | ||
const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) => | ||
runRaw((data) => | ||
typeof data === "string" | ||
? handler(encoder.encode(data)) | ||
: handler(data) | ||
const encoder = new TextEncoder() | ||
const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) => | ||
runRaw((data) => | ||
typeof data === "string" | ||
? handler(encoder.encode(data)) | ||
: handler(data) | ||
) | ||
const write = (chunk: Uint8Array | string | CloseEvent) => | ||
isCloseEvent(chunk) | ||
? sendQueue.fail( | ||
new SocketCloseError({ | ||
reason: "Close", | ||
code: chunk.code, | ||
closeReason: chunk.reason | ||
}) | ||
) | ||
: sendQueue.offer(chunk) | ||
const writer = Effect.succeed(write) | ||
const write = (chunk: Uint8Array | string | CloseEvent) => Queue.offer(sendQueue, chunk) | ||
const writer = Effect.succeed(write) | ||
return Socket.of({ | ||
[TypeId]: TypeId, | ||
run, | ||
runRaw, | ||
writer | ||
}) | ||
return Socket.of({ | ||
[TypeId]: TypeId, | ||
run, | ||
runRaw, | ||
writer | ||
}) | ||
) | ||
}) | ||
@@ -580,107 +588,104 @@ /** | ||
}): Effect.Effect<Socket, never, Exclude<R, Scope.Scope>> => | ||
Effect.withFiberRuntime<Socket, never, Exclude<R, Scope.Scope>>((fiber) => | ||
Effect.gen(function*() { | ||
const EOF = Symbol() | ||
const sendQueue = yield* Queue.dropping<Uint8Array | string | CloseEvent | typeof EOF>( | ||
fiber.getFiberRef(currentSendQueueCapacity) | ||
) | ||
const acquireContext = fiber.currentContext as Context.Context<R> | ||
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError | ||
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void) => | ||
Effect.gen(function*() { | ||
const stream = yield* acquire | ||
const reader = yield* Effect.acquireRelease( | ||
Effect.sync(() => stream.readable.getReader()), | ||
(reader) => | ||
Effect.promise(() => reader.cancel()).pipe( | ||
Effect.tap(() => { | ||
reader.releaseLock() | ||
}) | ||
) | ||
) | ||
const writer = yield* Effect.acquireRelease( | ||
Effect.sync(() => stream.writable.getWriter()), | ||
(reader) => Effect.sync(() => reader.releaseLock()) | ||
) | ||
const fiberSet = yield* FiberSet.make<any, E | SocketError>() | ||
const encoder = new TextEncoder() | ||
yield* Queue.take(sendQueue).pipe( | ||
Effect.tap((chunk) => { | ||
if ( | ||
chunk === EOF || | ||
isCloseEvent(chunk) | ||
) { | ||
return Effect.zipRight( | ||
Effect.promise(() => writer.close()), | ||
chunk === EOF ? Effect.interrupt : Effect.fail( | ||
new SocketCloseError({ | ||
reason: "Close", | ||
code: chunk.code, | ||
closeReason: chunk.reason | ||
}) | ||
) | ||
) | ||
} | ||
return Effect.try({ | ||
try: () => { | ||
if (typeof chunk === "string") { | ||
writer.write(encoder.encode(chunk)) | ||
Effect.gen(function*() { | ||
const fiber = Option.getOrThrow(Fiber.getCurrentFiber()) | ||
const sendQueue = yield* Mailbox.make<Uint8Array | string, SocketError>({ | ||
capacity: fiber.getFiberRef(currentSendQueueCapacity), | ||
strategy: "dropping" | ||
}) | ||
const acquireContext = fiber.currentContext as Context.Context<R> | ||
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError | ||
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void) => | ||
Effect.gen(function*() { | ||
const stream = yield* acquire | ||
const reader = yield* Effect.acquireRelease( | ||
Effect.sync(() => stream.readable.getReader()), | ||
(reader) => | ||
Effect.promise(() => reader.cancel()).pipe( | ||
Effect.tap(() => { | ||
reader.releaseLock() | ||
}) | ||
) | ||
) | ||
const writer = yield* Effect.acquireRelease( | ||
Effect.sync(() => stream.writable.getWriter()), | ||
(reader) => Effect.sync(() => reader.releaseLock()) | ||
) | ||
const fiberSet = yield* FiberSet.make<any, E | SocketError>() | ||
const encoder = new TextEncoder() | ||
yield* sendQueue.takeAll.pipe( | ||
Effect.flatMap(([chunk, done]) => { | ||
const write = Effect.try({ | ||
try: () => { | ||
for (const item of chunk) { | ||
if (typeof item === "string") { | ||
writer.write(encoder.encode(item)) | ||
} else { | ||
writer.write(chunk) | ||
writer.write(item) | ||
} | ||
}, | ||
catch: (cause) => new SocketGenericError({ reason: "Write", cause }) | ||
}) | ||
}), | ||
Effect.forever, | ||
FiberSet.run(fiberSet) | ||
) | ||
} | ||
}, | ||
catch: (cause) => new SocketGenericError({ reason: "Write", cause }) | ||
}) | ||
return done ? Effect.zipRight(write, Effect.interrupt) : write | ||
}), | ||
Effect.forever, | ||
Effect.ensuring(Effect.promise(() => writer.close())), | ||
FiberSet.run(fiberSet) | ||
) | ||
yield* Effect.tryPromise({ | ||
try: () => reader.read(), | ||
catch: (cause) => new SocketGenericError({ reason: "Read", cause }) | ||
}).pipe( | ||
Effect.tap((result) => { | ||
if (result.done) { | ||
return Effect.fail(new SocketCloseError({ reason: "Close", code: 1000 })) | ||
} | ||
return handler(result.value) | ||
}), | ||
Effect.forever, | ||
FiberSet.run(fiberSet) | ||
) | ||
return yield* FiberSet.join(fiberSet).pipe( | ||
Effect.catchIf( | ||
SocketCloseError.isClean((_) => !closeCodeIsError(_)), | ||
(_) => Effect.void | ||
) | ||
) | ||
yield* Effect.tryPromise({ | ||
try: () => reader.read(), | ||
catch: (cause) => new SocketGenericError({ reason: "Read", cause }) | ||
}).pipe( | ||
Effect.mapInputContext((input: Context.Context<R | Scope.Scope>) => Context.merge(acquireContext, input)), | ||
Effect.scoped, | ||
Effect.interruptible | ||
Effect.tap((result) => { | ||
if (result.done) { | ||
return Effect.fail(new SocketCloseError({ reason: "Close", code: 1000 })) | ||
} | ||
return handler(result.value) | ||
}), | ||
Effect.forever, | ||
FiberSet.run(fiberSet) | ||
) | ||
const encoder = new TextEncoder() | ||
const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) => | ||
runRaw((data) => | ||
typeof data === "string" | ||
? handler(encoder.encode(data)) | ||
: handler(data) | ||
return yield* FiberSet.join(fiberSet).pipe( | ||
Effect.catchIf( | ||
SocketCloseError.isClean((_) => !closeCodeIsError(_)), | ||
(_) => Effect.void | ||
) | ||
) | ||
}).pipe( | ||
Effect.mapInputContext((input: Context.Context<R | Scope.Scope>) => Context.merge(acquireContext, input)), | ||
Effect.scoped, | ||
Effect.interruptible | ||
) | ||
const write = (chunk: Uint8Array | string | CloseEvent) => Queue.offer(sendQueue, chunk) | ||
const writer = Effect.acquireRelease( | ||
Effect.succeed(write), | ||
() => Queue.offer(sendQueue, EOF) | ||
const encoder = new TextEncoder() | ||
const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) => | ||
runRaw((data) => | ||
typeof data === "string" | ||
? handler(encoder.encode(data)) | ||
: handler(data) | ||
) | ||
return Socket.of({ | ||
[TypeId]: TypeId, | ||
run, | ||
runRaw, | ||
writer | ||
}) | ||
const write = (chunk: Uint8Array | string | CloseEvent) => | ||
isCloseEvent(chunk) ? | ||
sendQueue.fail( | ||
new SocketCloseError({ | ||
reason: "Close", | ||
code: chunk.code, | ||
closeReason: chunk.reason | ||
}) | ||
) : | ||
sendQueue.offer(chunk) | ||
const writer = Effect.acquireRelease( | ||
Effect.succeed(write), | ||
() => sendQueue.end | ||
) | ||
return Socket.of({ | ||
[TypeId]: TypeId, | ||
run, | ||
runRaw, | ||
writer | ||
}) | ||
) | ||
}) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
107364
16757857