Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@effect/platform

Package Overview
Dependencies
Maintainers
3
Versions
377
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@effect/platform - npm Package Compare versions

Comparing version 0.69.15 to 0.69.16

26

dist/cjs/HttpApiClient.js

@@ -67,10 +67,20 @@ "use strict";

const encodeUrlParams = endpoint.urlParamsSchema.pipe(Option.map(Schema.encodeUnknown));
(group.topLevel ? client : client[group.identifier])[endpoint.name] = request => {
const url = request && request.path ? makeUrl(request.path) : endpoint.path;
const baseRequest = HttpClientRequest.make(endpoint.method)(url);
return (isMultipart ? Effect.succeed(baseRequest.pipe(HttpClientRequest.bodyFormData(request.payload))) : encodePayload._tag === "Some" ? encodePayload.value(request.payload).pipe(Effect.flatMap(payload => HttpMethod.hasBody(endpoint.method) ? HttpClientRequest.bodyJson(baseRequest, payload) : Effect.succeed(HttpClientRequest.setUrlParams(baseRequest, payload))), Effect.orDie) : Effect.succeed(baseRequest)).pipe(encodeHeaders._tag === "Some" ? Effect.flatMap(httpRequest => encodeHeaders.value(request.headers).pipe(Effect.orDie, Effect.map(headers => HttpClientRequest.setHeaders(httpRequest, headers)))) : _Function.identity, encodeUrlParams._tag === "Some" ? Effect.flatMap(httpRequest => encodeUrlParams.value(request.urlParams).pipe(Effect.orDie, Effect.map(params => HttpClientRequest.appendUrlParams(httpRequest, params)))) : _Function.identity, Effect.flatMap(httpClient.execute), Effect.flatMap(response => {
const value = options?.transformResponse === undefined ? decodeResponse(response) : options.transformResponse(decodeResponse(response));
return request?.withResponse === true ? Effect.map(value, value => [value, response]) : value;
}), Effect.scoped, Effect.catchIf(ParseResult.isParseError, Effect.die), Effect.mapInputContext(input => Context.merge(context, input)));
};
(group.topLevel ? client : client[group.identifier])[endpoint.name] = request => Effect.gen(function* () {
let httpRequest = HttpClientRequest.make(endpoint.method)(request && request.path ? makeUrl(request.path) : endpoint.path);
if (isMultipart) {
httpRequest = HttpClientRequest.bodyFormData(httpRequest, request.payload);
} else if (encodePayload._tag === "Some") {
const payload = yield* encodePayload.value(request.payload);
httpRequest = HttpMethod.hasBody(endpoint.method) ? yield* Effect.orDie(HttpClientRequest.bodyJson(httpRequest, payload)) : HttpClientRequest.setUrlParams(httpRequest, payload);
}
if (encodeHeaders._tag === "Some") {
httpRequest = HttpClientRequest.setHeaders(httpRequest, yield* encodeHeaders.value(request.headers));
}
if (encodeUrlParams._tag === "Some") {
httpRequest = HttpClientRequest.appendUrlParams(httpRequest, yield* encodeUrlParams.value(request.urlParams));
}
const response = yield* httpClient.execute(httpRequest);
const value = yield* options?.transformResponse === undefined ? decodeResponse(response) : options.transformResponse(decodeResponse(response));
return request?.withResponse === true ? [value, response] : value;
}).pipe(Effect.scoped, Effect.catchIf(ParseResult.isParseError, Effect.die), Effect.mapInputContext(input => Context.merge(context, input)));
}

@@ -77,0 +87,0 @@ });

@@ -13,2 +13,3 @@ "use strict";

var Exit = _interopRequireWildcard(require("effect/Exit"));
var Fiber = _interopRequireWildcard(require("effect/Fiber"));
var FiberRef = _interopRequireWildcard(require("effect/FiberRef"));

@@ -20,4 +21,4 @@ var FiberSet = _interopRequireWildcard(require("effect/FiberSet"));

var Mailbox = _interopRequireWildcard(require("effect/Mailbox"));
var Option = _interopRequireWildcard(require("effect/Option"));
var Predicate = _interopRequireWildcard(require("effect/Predicate"));
var Queue = _interopRequireWildcard(require("effect/Queue"));
var Scope = _interopRequireWildcard(require("effect/Scope"));

@@ -218,4 +219,8 @@ var _Error = require("./Error.js");

exports.makeWebSocket = makeWebSocket;
const fromWebSocket = (acquire, options) => Effect.withFiberRuntime(fiber => Effect.gen(function* () {
const sendQueue = yield* Queue.dropping(fiber.getFiberRef(currentSendQueueCapacity));
const fromWebSocket = (acquire, options) => Effect.gen(function* () {
const fiber = Option.getOrThrow(Fiber.getCurrentFiber());
const sendQueue = yield* Mailbox.make({
capacity: fiber.getFiberRef(currentSendQueueCapacity),
strategy: "dropping"
});
const acquireContext = fiber.currentContext;

@@ -278,11 +283,8 @@ const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError;

open = true;
yield* Queue.take(sendQueue).pipe(Effect.tap(chunk => isCloseEvent(chunk) ? Effect.failSync(() => {
ws.close(chunk.code, chunk.reason);
return new SocketCloseError({
reason: "Close",
code: chunk.code,
closeReason: chunk.reason
});
}) : Effect.try({
try: () => ws.send(chunk),
yield* sendQueue.takeAll.pipe(Effect.tap(([chunk]) => Effect.try({
try: () => {
for (const item of chunk) {
ws.send(item);
}
},
catch: cause => new SocketGenericError({

@@ -292,3 +294,6 @@ reason: "Write",

})
})), Effect.forever, FiberSet.run(fiberSet));
})), Effect.forever, Effect.catchIf(SocketCloseError.is, error => {
ws.close(error.code, error.closeReason);
return Effect.fail(error);
}), FiberSet.run(fiberSet));
return yield* FiberSet.join(fiberSet).pipe(Effect.catchIf(SocketCloseError.isClean(_ => !closeCodeIsError(_)), _ => Effect.void));

@@ -298,3 +303,7 @@ }).pipe(Effect.mapInputContext(input => Context.merge(acquireContext, input)), Effect.scoped, Effect.interruptible);

const run = handler => runRaw(data => typeof data === "string" ? handler(encoder.encode(data)) : handler(data));
const write = chunk => Queue.offer(sendQueue, chunk);
const write = chunk => isCloseEvent(chunk) ? sendQueue.fail(new SocketCloseError({
reason: "Close",
code: chunk.code,
closeReason: chunk.reason
})) : sendQueue.offer(chunk);
const writer = Effect.succeed(write);

@@ -307,3 +316,3 @@ return Socket.of({

});
}));
});
/**

@@ -331,5 +340,8 @@ * @since 1.0.0

*/
const fromTransformStream = (acquire, options) => Effect.withFiberRuntime(fiber => Effect.gen(function* () {
const EOF = Symbol();
const sendQueue = yield* Queue.dropping(fiber.getFiberRef(currentSendQueueCapacity));
const fromTransformStream = (acquire, options) => Effect.gen(function* () {
const fiber = Option.getOrThrow(Fiber.getCurrentFiber());
const sendQueue = yield* Mailbox.make({
capacity: fiber.getFiberRef(currentSendQueueCapacity),
strategy: "dropping"
});
const acquireContext = fiber.currentContext;

@@ -345,16 +357,11 @@ const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError;

const encoder = new TextEncoder();
yield* Queue.take(sendQueue).pipe(Effect.tap(chunk => {
if (chunk === EOF || isCloseEvent(chunk)) {
return Effect.zipRight(Effect.promise(() => writer.close()), chunk === EOF ? Effect.interrupt : Effect.fail(new SocketCloseError({
reason: "Close",
code: chunk.code,
closeReason: chunk.reason
})));
}
return Effect.try({
yield* sendQueue.takeAll.pipe(Effect.flatMap(([chunk, done]) => {
const write = Effect.try({
try: () => {
if (typeof chunk === "string") {
writer.write(encoder.encode(chunk));
} else {
writer.write(chunk);
for (const item of chunk) {
if (typeof item === "string") {
writer.write(encoder.encode(item));
} else {
writer.write(item);
}
}

@@ -367,3 +374,4 @@ },

});
}), Effect.forever, FiberSet.run(fiberSet));
return done ? Effect.zipRight(write, Effect.interrupt) : write;
}), Effect.forever, Effect.ensuring(Effect.promise(() => writer.close())), FiberSet.run(fiberSet));
yield* Effect.tryPromise({

@@ -388,4 +396,8 @@ try: () => reader.read(),

const run = handler => runRaw(data => typeof data === "string" ? handler(encoder.encode(data)) : handler(data));
const write = chunk => Queue.offer(sendQueue, chunk);
const writer = Effect.acquireRelease(Effect.succeed(write), () => Queue.offer(sendQueue, EOF));
const write = chunk => isCloseEvent(chunk) ? sendQueue.fail(new SocketCloseError({
reason: "Close",
code: chunk.code,
closeReason: chunk.reason
})) : sendQueue.offer(chunk);
const writer = Effect.acquireRelease(Effect.succeed(write), () => sendQueue.end);
return Socket.of({

@@ -397,4 +409,4 @@ [TypeId]: TypeId,

});
}));
});
exports.fromTransformStream = fromTransformStream;
//# sourceMappingURL=Socket.js.map

@@ -58,10 +58,20 @@ /**

const encodeUrlParams = endpoint.urlParamsSchema.pipe(Option.map(Schema.encodeUnknown));
(group.topLevel ? client : client[group.identifier])[endpoint.name] = request => {
const url = request && request.path ? makeUrl(request.path) : endpoint.path;
const baseRequest = HttpClientRequest.make(endpoint.method)(url);
return (isMultipart ? Effect.succeed(baseRequest.pipe(HttpClientRequest.bodyFormData(request.payload))) : encodePayload._tag === "Some" ? encodePayload.value(request.payload).pipe(Effect.flatMap(payload => HttpMethod.hasBody(endpoint.method) ? HttpClientRequest.bodyJson(baseRequest, payload) : Effect.succeed(HttpClientRequest.setUrlParams(baseRequest, payload))), Effect.orDie) : Effect.succeed(baseRequest)).pipe(encodeHeaders._tag === "Some" ? Effect.flatMap(httpRequest => encodeHeaders.value(request.headers).pipe(Effect.orDie, Effect.map(headers => HttpClientRequest.setHeaders(httpRequest, headers)))) : identity, encodeUrlParams._tag === "Some" ? Effect.flatMap(httpRequest => encodeUrlParams.value(request.urlParams).pipe(Effect.orDie, Effect.map(params => HttpClientRequest.appendUrlParams(httpRequest, params)))) : identity, Effect.flatMap(httpClient.execute), Effect.flatMap(response => {
const value = options?.transformResponse === undefined ? decodeResponse(response) : options.transformResponse(decodeResponse(response));
return request?.withResponse === true ? Effect.map(value, value => [value, response]) : value;
}), Effect.scoped, Effect.catchIf(ParseResult.isParseError, Effect.die), Effect.mapInputContext(input => Context.merge(context, input)));
};
(group.topLevel ? client : client[group.identifier])[endpoint.name] = request => Effect.gen(function* () {
let httpRequest = HttpClientRequest.make(endpoint.method)(request && request.path ? makeUrl(request.path) : endpoint.path);
if (isMultipart) {
httpRequest = HttpClientRequest.bodyFormData(httpRequest, request.payload);
} else if (encodePayload._tag === "Some") {
const payload = yield* encodePayload.value(request.payload);
httpRequest = HttpMethod.hasBody(endpoint.method) ? yield* Effect.orDie(HttpClientRequest.bodyJson(httpRequest, payload)) : HttpClientRequest.setUrlParams(httpRequest, payload);
}
if (encodeHeaders._tag === "Some") {
httpRequest = HttpClientRequest.setHeaders(httpRequest, yield* encodeHeaders.value(request.headers));
}
if (encodeUrlParams._tag === "Some") {
httpRequest = HttpClientRequest.appendUrlParams(httpRequest, yield* encodeUrlParams.value(request.urlParams));
}
const response = yield* httpClient.execute(httpRequest);
const value = yield* options?.transformResponse === undefined ? decodeResponse(response) : options.transformResponse(decodeResponse(response));
return request?.withResponse === true ? [value, response] : value;
}).pipe(Effect.scoped, Effect.catchIf(ParseResult.isParseError, Effect.die), Effect.mapInputContext(input => Context.merge(context, input)));
}

@@ -68,0 +78,0 @@ });

@@ -10,2 +10,3 @@ /**

import * as Exit from "effect/Exit";
import * as Fiber from "effect/Fiber";
import * as FiberRef from "effect/FiberRef";

@@ -17,4 +18,4 @@ import * as FiberSet from "effect/FiberSet";

import * as Mailbox from "effect/Mailbox";
import * as Option from "effect/Option";
import * as Predicate from "effect/Predicate";
import * as Queue from "effect/Queue";
import * as Scope from "effect/Scope";

@@ -197,4 +198,8 @@ import { TypeIdError } from "./Error.js";

*/
export const fromWebSocket = (acquire, options) => Effect.withFiberRuntime(fiber => Effect.gen(function* () {
const sendQueue = yield* Queue.dropping(fiber.getFiberRef(currentSendQueueCapacity));
export const fromWebSocket = (acquire, options) => Effect.gen(function* () {
const fiber = Option.getOrThrow(Fiber.getCurrentFiber());
const sendQueue = yield* Mailbox.make({
capacity: fiber.getFiberRef(currentSendQueueCapacity),
strategy: "dropping"
});
const acquireContext = fiber.currentContext;

@@ -257,11 +262,8 @@ const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError;

open = true;
yield* Queue.take(sendQueue).pipe(Effect.tap(chunk => isCloseEvent(chunk) ? Effect.failSync(() => {
ws.close(chunk.code, chunk.reason);
return new SocketCloseError({
reason: "Close",
code: chunk.code,
closeReason: chunk.reason
});
}) : Effect.try({
try: () => ws.send(chunk),
yield* sendQueue.takeAll.pipe(Effect.tap(([chunk]) => Effect.try({
try: () => {
for (const item of chunk) {
ws.send(item);
}
},
catch: cause => new SocketGenericError({

@@ -271,3 +273,6 @@ reason: "Write",

})
})), Effect.forever, FiberSet.run(fiberSet));
})), Effect.forever, Effect.catchIf(SocketCloseError.is, error => {
ws.close(error.code, error.closeReason);
return Effect.fail(error);
}), FiberSet.run(fiberSet));
return yield* FiberSet.join(fiberSet).pipe(Effect.catchIf(SocketCloseError.isClean(_ => !closeCodeIsError(_)), _ => Effect.void));

@@ -277,3 +282,7 @@ }).pipe(Effect.mapInputContext(input => Context.merge(acquireContext, input)), Effect.scoped, Effect.interruptible);

const run = handler => runRaw(data => typeof data === "string" ? handler(encoder.encode(data)) : handler(data));
const write = chunk => Queue.offer(sendQueue, chunk);
const write = chunk => isCloseEvent(chunk) ? sendQueue.fail(new SocketCloseError({
reason: "Close",
code: chunk.code,
closeReason: chunk.reason
})) : sendQueue.offer(chunk);
const writer = Effect.succeed(write);

@@ -286,3 +295,3 @@ return Socket.of({

});
}));
});
/**

@@ -307,5 +316,8 @@ * @since 1.0.0

*/
export const fromTransformStream = (acquire, options) => Effect.withFiberRuntime(fiber => Effect.gen(function* () {
const EOF = Symbol();
const sendQueue = yield* Queue.dropping(fiber.getFiberRef(currentSendQueueCapacity));
export const fromTransformStream = (acquire, options) => Effect.gen(function* () {
const fiber = Option.getOrThrow(Fiber.getCurrentFiber());
const sendQueue = yield* Mailbox.make({
capacity: fiber.getFiberRef(currentSendQueueCapacity),
strategy: "dropping"
});
const acquireContext = fiber.currentContext;

@@ -321,16 +333,11 @@ const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError;

const encoder = new TextEncoder();
yield* Queue.take(sendQueue).pipe(Effect.tap(chunk => {
if (chunk === EOF || isCloseEvent(chunk)) {
return Effect.zipRight(Effect.promise(() => writer.close()), chunk === EOF ? Effect.interrupt : Effect.fail(new SocketCloseError({
reason: "Close",
code: chunk.code,
closeReason: chunk.reason
})));
}
return Effect.try({
yield* sendQueue.takeAll.pipe(Effect.flatMap(([chunk, done]) => {
const write = Effect.try({
try: () => {
if (typeof chunk === "string") {
writer.write(encoder.encode(chunk));
} else {
writer.write(chunk);
for (const item of chunk) {
if (typeof item === "string") {
writer.write(encoder.encode(item));
} else {
writer.write(item);
}
}

@@ -343,3 +350,4 @@ },

});
}), Effect.forever, FiberSet.run(fiberSet));
return done ? Effect.zipRight(write, Effect.interrupt) : write;
}), Effect.forever, Effect.ensuring(Effect.promise(() => writer.close())), FiberSet.run(fiberSet));
yield* Effect.tryPromise({

@@ -364,4 +372,8 @@ try: () => reader.read(),

const run = handler => runRaw(data => typeof data === "string" ? handler(encoder.encode(data)) : handler(data));
const write = chunk => Queue.offer(sendQueue, chunk);
const writer = Effect.acquireRelease(Effect.succeed(write), () => Queue.offer(sendQueue, EOF));
const write = chunk => isCloseEvent(chunk) ? sendQueue.fail(new SocketCloseError({
reason: "Close",
code: chunk.code,
closeReason: chunk.reason
})) : sendQueue.offer(chunk);
const writer = Effect.acquireRelease(Effect.succeed(write), () => sendQueue.end);
return Socket.of({

@@ -373,3 +385,3 @@ [TypeId]: TypeId,

});
}));
});
//# sourceMappingURL=Socket.js.map
{
"name": "@effect/platform",
"version": "0.69.15",
"version": "0.69.16",
"description": "Unified interfaces for common platform-specific services",

@@ -17,3 +17,3 @@ "license": "MIT",

"peerDependencies": {
"effect": "^3.10.9"
"effect": "^3.10.10"
},

@@ -20,0 +20,0 @@ "publishConfig": {

@@ -155,47 +155,37 @@ /**

readonly withResponse?: boolean
}) => {
const url = request && request.path ? makeUrl(request.path) : endpoint.path
const baseRequest = HttpClientRequest.make(endpoint.method)(url)
return (isMultipart ?
Effect.succeed(baseRequest.pipe(
HttpClientRequest.bodyFormData(request.payload)
))
: encodePayload._tag === "Some"
? encodePayload.value(request.payload).pipe(
Effect.flatMap((payload) =>
HttpMethod.hasBody(endpoint.method)
? HttpClientRequest.bodyJson(baseRequest, payload)
: Effect.succeed(HttpClientRequest.setUrlParams(baseRequest, payload as any))
),
Effect.orDie
}) =>
Effect.gen(function*() {
let httpRequest = HttpClientRequest.make(endpoint.method)(
request && request.path ? makeUrl(request.path) : endpoint.path
)
: Effect.succeed(baseRequest)).pipe(
encodeHeaders._tag === "Some"
? Effect.flatMap((httpRequest) =>
encodeHeaders.value(request.headers).pipe(
Effect.orDie,
Effect.map((headers) => HttpClientRequest.setHeaders(httpRequest, headers as any))
)
)
: identity,
encodeUrlParams._tag === "Some"
? Effect.flatMap((httpRequest) =>
encodeUrlParams.value(request.urlParams).pipe(
Effect.orDie,
Effect.map((params) => HttpClientRequest.appendUrlParams(httpRequest, params as any))
)
)
: identity,
Effect.flatMap(httpClient.execute),
Effect.flatMap((response) => {
const value = options?.transformResponse === undefined
? decodeResponse(response)
: options.transformResponse(decodeResponse(response))
return request?.withResponse === true ? Effect.map(value, (value) => [value, response]) : value
}),
Effect.scoped,
Effect.catchIf(ParseResult.isParseError, Effect.die),
Effect.mapInputContext((input) => Context.merge(context, input))
)
}
if (isMultipart) {
httpRequest = HttpClientRequest.bodyFormData(httpRequest, request.payload)
} else if (encodePayload._tag === "Some") {
const payload = yield* encodePayload.value(request.payload)
httpRequest = HttpMethod.hasBody(endpoint.method)
? yield* Effect.orDie(HttpClientRequest.bodyJson(httpRequest, payload))
: HttpClientRequest.setUrlParams(httpRequest, payload as any)
}
if (encodeHeaders._tag === "Some") {
httpRequest = HttpClientRequest.setHeaders(
httpRequest,
(yield* encodeHeaders.value(request.headers)) as any
)
}
if (encodeUrlParams._tag === "Some") {
httpRequest = HttpClientRequest.appendUrlParams(
httpRequest,
(yield* encodeUrlParams.value(request.urlParams)) as any
)
}
const response = yield* httpClient.execute(httpRequest)
const value = yield* (options?.transformResponse === undefined
? decodeResponse(response)
: options.transformResponse(decodeResponse(response)))
return request?.withResponse === true ? [value, response] : value
}).pipe(
Effect.scoped,
Effect.catchIf(ParseResult.isParseError, Effect.die),
Effect.mapInputContext((input) => Context.merge(context, input))
)
}

@@ -202,0 +192,0 @@ })

@@ -12,2 +12,3 @@ /**

import * as Exit from "effect/Exit"
import * as Fiber from "effect/Fiber"
import * as FiberRef from "effect/FiberRef"

@@ -19,4 +20,4 @@ import * as FiberSet from "effect/FiberSet"

import * as Mailbox from "effect/Mailbox"
import * as Option from "effect/Option"
import * as Predicate from "effect/Predicate"
import * as Queue from "effect/Queue"
import * as Scope from "effect/Scope"

@@ -399,122 +400,129 @@ import type * as AsyncProducer from "effect/SingleProducerAsyncInput"

): Effect.Effect<Socket, never, Exclude<RO, Scope.Scope>> =>
Effect.withFiberRuntime<Socket, never, Exclude<RO, Scope.Scope>>((fiber) =>
Effect.gen(function*() {
const sendQueue = yield* Queue.dropping<Uint8Array | string | CloseEvent>(
fiber.getFiberRef(currentSendQueueCapacity)
)
const acquireContext = fiber.currentContext as Context.Context<RO>
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError
Effect.gen(function*() {
const fiber = Option.getOrThrow(Fiber.getCurrentFiber())
const sendQueue = yield* Mailbox.make<Uint8Array | string, SocketError>({
capacity: fiber.getFiberRef(currentSendQueueCapacity),
strategy: "dropping"
})
const acquireContext = fiber.currentContext as Context.Context<RO>
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void) =>
Effect.gen(function*() {
const fiberSet = yield* FiberSet.make<any, E | SocketError>()
const ws = yield* acquire
const run = yield* Effect.provideService(FiberSet.runtime(fiberSet)<R>(), WebSocket, ws)
let open = false
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void) =>
Effect.gen(function*() {
const fiberSet = yield* FiberSet.make<any, E | SocketError>()
const ws = yield* acquire
const run = yield* Effect.provideService(FiberSet.runtime(fiberSet)<R>(), WebSocket, ws)
let open = false
function onMessage(event: MessageEvent) {
if (event.data instanceof Blob) {
return Effect.promise(() => event.data.arrayBuffer() as Promise<ArrayBuffer>).pipe(
Effect.andThen((buffer) => handler(new Uint8Array(buffer))),
run
)
}
const result = handler(event.data)
if (Effect.isEffect(result)) {
run(result)
}
}
function onError(cause: Event) {
ws.removeEventListener("message", onMessage)
ws.removeEventListener("close", onClose)
Deferred.unsafeDone(
fiberSet.deferred,
Effect.fail(new SocketGenericError({ reason: open ? "Read" : "Open", cause }))
function onMessage(event: MessageEvent) {
if (event.data instanceof Blob) {
return Effect.promise(() => event.data.arrayBuffer() as Promise<ArrayBuffer>).pipe(
Effect.andThen((buffer) => handler(new Uint8Array(buffer))),
run
)
}
function onClose(event: globalThis.CloseEvent) {
ws.removeEventListener("message", onMessage)
ws.removeEventListener("error", onError)
Deferred.unsafeDone(
fiberSet.deferred,
Effect.fail(
new SocketCloseError({
reason: "Close",
code: event.code,
closeReason: event.reason
})
)
const result = handler(event.data)
if (Effect.isEffect(result)) {
run(result)
}
}
function onError(cause: Event) {
ws.removeEventListener("message", onMessage)
ws.removeEventListener("close", onClose)
Deferred.unsafeDone(
fiberSet.deferred,
Effect.fail(new SocketGenericError({ reason: open ? "Read" : "Open", cause }))
)
}
function onClose(event: globalThis.CloseEvent) {
ws.removeEventListener("message", onMessage)
ws.removeEventListener("error", onError)
Deferred.unsafeDone(
fiberSet.deferred,
Effect.fail(
new SocketCloseError({
reason: "Close",
code: event.code,
closeReason: event.reason
})
)
}
)
}
ws.addEventListener("close", onClose, { once: true })
ws.addEventListener("error", onError, { once: true })
ws.addEventListener("message", onMessage)
ws.addEventListener("close", onClose, { once: true })
ws.addEventListener("error", onError, { once: true })
ws.addEventListener("message", onMessage)
if (ws.readyState !== 1) {
const openDeferred = Deferred.unsafeMake<void>(fiber.id())
ws.addEventListener("open", () => {
open = true
Deferred.unsafeDone(openDeferred, Effect.void)
}, { once: true })
yield* Deferred.await(openDeferred).pipe(
Effect.timeoutFail({
duration: options?.openTimeout ?? 10000,
onTimeout: () =>
new SocketGenericError({ reason: "OpenTimeout", cause: "timeout waiting for \"open\"" })
}),
Effect.raceFirst(FiberSet.join(fiberSet))
)
}
open = true
yield* Queue.take(sendQueue).pipe(
Effect.tap((chunk) =>
isCloseEvent(chunk) ?
Effect.failSync(() => {
ws.close(chunk.code, chunk.reason)
return new SocketCloseError({
reason: "Close",
code: chunk.code,
closeReason: chunk.reason
})
}) :
Effect.try({
try: () => ws.send(chunk),
catch: (cause) => new SocketGenericError({ reason: "Write", cause })
})
),
Effect.forever,
FiberSet.run(fiberSet)
if (ws.readyState !== 1) {
const openDeferred = Deferred.unsafeMake<void>(fiber.id())
ws.addEventListener("open", () => {
open = true
Deferred.unsafeDone(openDeferred, Effect.void)
}, { once: true })
yield* Deferred.await(openDeferred).pipe(
Effect.timeoutFail({
duration: options?.openTimeout ?? 10000,
onTimeout: () => new SocketGenericError({ reason: "OpenTimeout", cause: "timeout waiting for \"open\"" })
}),
Effect.raceFirst(FiberSet.join(fiberSet))
)
return yield* FiberSet.join(fiberSet).pipe(
Effect.catchIf(
SocketCloseError.isClean((_) => !closeCodeIsError(_)),
(_) => Effect.void
)
}
open = true
yield* sendQueue.takeAll.pipe(
Effect.tap(([chunk]) =>
Effect.try({
try: () => {
for (const item of chunk) {
ws.send(item)
}
},
catch: (cause) => new SocketGenericError({ reason: "Write", cause })
})
),
Effect.forever,
Effect.catchIf(SocketCloseError.is, (error) => {
ws.close(error.code, error.closeReason)
return Effect.fail(error)
}),
FiberSet.run(fiberSet)
)
return yield* FiberSet.join(fiberSet).pipe(
Effect.catchIf(
SocketCloseError.isClean((_) => !closeCodeIsError(_)),
(_) => Effect.void
)
}).pipe(
Effect.mapInputContext((input: Context.Context<R | Scope.Scope>) => Context.merge(acquireContext, input)),
Effect.scoped,
Effect.interruptible
)
}).pipe(
Effect.mapInputContext((input: Context.Context<R | Scope.Scope>) => Context.merge(acquireContext, input)),
Effect.scoped,
Effect.interruptible
)
const encoder = new TextEncoder()
const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) =>
runRaw((data) =>
typeof data === "string"
? handler(encoder.encode(data))
: handler(data)
const encoder = new TextEncoder()
const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) =>
runRaw((data) =>
typeof data === "string"
? handler(encoder.encode(data))
: handler(data)
)
const write = (chunk: Uint8Array | string | CloseEvent) =>
isCloseEvent(chunk)
? sendQueue.fail(
new SocketCloseError({
reason: "Close",
code: chunk.code,
closeReason: chunk.reason
})
)
: sendQueue.offer(chunk)
const writer = Effect.succeed(write)
const write = (chunk: Uint8Array | string | CloseEvent) => Queue.offer(sendQueue, chunk)
const writer = Effect.succeed(write)
return Socket.of({
[TypeId]: TypeId,
run,
runRaw,
writer
})
return Socket.of({
[TypeId]: TypeId,
run,
runRaw,
writer
})
)
})

@@ -580,107 +588,104 @@ /**

}): Effect.Effect<Socket, never, Exclude<R, Scope.Scope>> =>
Effect.withFiberRuntime<Socket, never, Exclude<R, Scope.Scope>>((fiber) =>
Effect.gen(function*() {
const EOF = Symbol()
const sendQueue = yield* Queue.dropping<Uint8Array | string | CloseEvent | typeof EOF>(
fiber.getFiberRef(currentSendQueueCapacity)
)
const acquireContext = fiber.currentContext as Context.Context<R>
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void) =>
Effect.gen(function*() {
const stream = yield* acquire
const reader = yield* Effect.acquireRelease(
Effect.sync(() => stream.readable.getReader()),
(reader) =>
Effect.promise(() => reader.cancel()).pipe(
Effect.tap(() => {
reader.releaseLock()
})
)
)
const writer = yield* Effect.acquireRelease(
Effect.sync(() => stream.writable.getWriter()),
(reader) => Effect.sync(() => reader.releaseLock())
)
const fiberSet = yield* FiberSet.make<any, E | SocketError>()
const encoder = new TextEncoder()
yield* Queue.take(sendQueue).pipe(
Effect.tap((chunk) => {
if (
chunk === EOF ||
isCloseEvent(chunk)
) {
return Effect.zipRight(
Effect.promise(() => writer.close()),
chunk === EOF ? Effect.interrupt : Effect.fail(
new SocketCloseError({
reason: "Close",
code: chunk.code,
closeReason: chunk.reason
})
)
)
}
return Effect.try({
try: () => {
if (typeof chunk === "string") {
writer.write(encoder.encode(chunk))
Effect.gen(function*() {
const fiber = Option.getOrThrow(Fiber.getCurrentFiber())
const sendQueue = yield* Mailbox.make<Uint8Array | string, SocketError>({
capacity: fiber.getFiberRef(currentSendQueueCapacity),
strategy: "dropping"
})
const acquireContext = fiber.currentContext as Context.Context<R>
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void) =>
Effect.gen(function*() {
const stream = yield* acquire
const reader = yield* Effect.acquireRelease(
Effect.sync(() => stream.readable.getReader()),
(reader) =>
Effect.promise(() => reader.cancel()).pipe(
Effect.tap(() => {
reader.releaseLock()
})
)
)
const writer = yield* Effect.acquireRelease(
Effect.sync(() => stream.writable.getWriter()),
(reader) => Effect.sync(() => reader.releaseLock())
)
const fiberSet = yield* FiberSet.make<any, E | SocketError>()
const encoder = new TextEncoder()
yield* sendQueue.takeAll.pipe(
Effect.flatMap(([chunk, done]) => {
const write = Effect.try({
try: () => {
for (const item of chunk) {
if (typeof item === "string") {
writer.write(encoder.encode(item))
} else {
writer.write(chunk)
writer.write(item)
}
},
catch: (cause) => new SocketGenericError({ reason: "Write", cause })
})
}),
Effect.forever,
FiberSet.run(fiberSet)
)
}
},
catch: (cause) => new SocketGenericError({ reason: "Write", cause })
})
return done ? Effect.zipRight(write, Effect.interrupt) : write
}),
Effect.forever,
Effect.ensuring(Effect.promise(() => writer.close())),
FiberSet.run(fiberSet)
)
yield* Effect.tryPromise({
try: () => reader.read(),
catch: (cause) => new SocketGenericError({ reason: "Read", cause })
}).pipe(
Effect.tap((result) => {
if (result.done) {
return Effect.fail(new SocketCloseError({ reason: "Close", code: 1000 }))
}
return handler(result.value)
}),
Effect.forever,
FiberSet.run(fiberSet)
)
return yield* FiberSet.join(fiberSet).pipe(
Effect.catchIf(
SocketCloseError.isClean((_) => !closeCodeIsError(_)),
(_) => Effect.void
)
)
yield* Effect.tryPromise({
try: () => reader.read(),
catch: (cause) => new SocketGenericError({ reason: "Read", cause })
}).pipe(
Effect.mapInputContext((input: Context.Context<R | Scope.Scope>) => Context.merge(acquireContext, input)),
Effect.scoped,
Effect.interruptible
Effect.tap((result) => {
if (result.done) {
return Effect.fail(new SocketCloseError({ reason: "Close", code: 1000 }))
}
return handler(result.value)
}),
Effect.forever,
FiberSet.run(fiberSet)
)
const encoder = new TextEncoder()
const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) =>
runRaw((data) =>
typeof data === "string"
? handler(encoder.encode(data))
: handler(data)
return yield* FiberSet.join(fiberSet).pipe(
Effect.catchIf(
SocketCloseError.isClean((_) => !closeCodeIsError(_)),
(_) => Effect.void
)
)
}).pipe(
Effect.mapInputContext((input: Context.Context<R | Scope.Scope>) => Context.merge(acquireContext, input)),
Effect.scoped,
Effect.interruptible
)
const write = (chunk: Uint8Array | string | CloseEvent) => Queue.offer(sendQueue, chunk)
const writer = Effect.acquireRelease(
Effect.succeed(write),
() => Queue.offer(sendQueue, EOF)
const encoder = new TextEncoder()
const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) =>
runRaw((data) =>
typeof data === "string"
? handler(encoder.encode(data))
: handler(data)
)
return Socket.of({
[TypeId]: TypeId,
run,
runRaw,
writer
})
const write = (chunk: Uint8Array | string | CloseEvent) =>
isCloseEvent(chunk) ?
sendQueue.fail(
new SocketCloseError({
reason: "Close",
code: chunk.code,
closeReason: chunk.reason
})
) :
sendQueue.offer(chunk)
const writer = Effect.acquireRelease(
Effect.succeed(write),
() => sendQueue.end
)
return Socket.of({
[TypeId]: TypeId,
run,
runRaw,
writer
})
)
})

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc