@effect/rpc
Advanced tools
Comparing version 0.38.1 to 0.39.0
@@ -16,5 +16,5 @@ "use strict"; | ||
var _Function = require("effect/Function"); | ||
var Mailbox = _interopRequireWildcard(require("effect/Mailbox")); | ||
var _Pipeable = require("effect/Pipeable"); | ||
var Predicate = _interopRequireWildcard(require("effect/Predicate")); | ||
var Queue = _interopRequireWildcard(require("effect/Queue")); | ||
var Stream = _interopRequireWildcard(require("effect/Stream")); | ||
@@ -69,12 +69,2 @@ var _rpc = require("./internal/rpc.js"); | ||
const provideService = exports.provideService = /*#__PURE__*/(0, _Function.dual)(3, (self, tag, service) => fromSet(new Set([...self.rpcs].map(Rpc.provideService(tag, service))))); | ||
const EOF = /*#__PURE__*/Symbol.for("@effect/rpc/Router/EOF"); | ||
const channelFromQueue = queue => { | ||
const loop = Channel.flatMap(Queue.takeBetween(queue, 1, Number.MAX_SAFE_INTEGER), chunk => { | ||
if (Chunk.unsafeLast(chunk) === EOF) { | ||
return Channel.write(Chunk.dropRight(chunk, 1)); | ||
} | ||
return Channel.zipRight(Channel.write(chunk), loop); | ||
}); | ||
return loop; | ||
}; | ||
const emptyExit = /*#__PURE__*/Schema.encodeSync(Schema.Exit({ | ||
@@ -100,3 +90,3 @@ failure: Schema.Never, | ||
const getEncodeChunk = (0, _rpc.withRequestTag)(req => Schema.encode(Schema.Chunk(Serializable.exitSchema(req)))); | ||
return u => (0, _Function.pipe)(decode(u), Effect.zip(Queue.bounded(4)), Effect.tap(([requests, queue]) => (0, _Function.pipe)(Effect.forEach(requests, (req, index) => { | ||
return u => (0, _Function.pipe)(decode(u), Effect.zip(Mailbox.make(4)), Effect.tap(([requests, mailbox]) => (0, _Function.pipe)(Effect.forEach(requests, (req, index) => { | ||
const [request, rpc] = req.request; | ||
@@ -106,4 +96,4 @@ if (rpc._tag === "Effect") { | ||
return (0, _Function.pipe)(Effect.exit(rpc.handler(request)), Effect.flatMap(encode), Effect.orDie, Effect.matchCauseEffect({ | ||
onSuccess: response => Queue.offer(queue, [index, response]), | ||
onFailure: cause => Effect.flatMap(encode(Exit.failCause(cause)), response => Queue.offer(queue, [index, response])) | ||
onSuccess: response => mailbox.offer([index, response]), | ||
onFailure: cause => Effect.flatMap(encode(Exit.failCause(cause)), response => mailbox.offer([index, response])) | ||
}), Effect.locally(Rpc.currentHeaders, req.headers), Effect.withSpan(`${spanPrefix}${request._tag}`, { | ||
@@ -122,5 +112,5 @@ kind: "server", | ||
const encode = getEncodeChunk(request); | ||
return (0, _Function.pipe)(rpc.handler(request), Stream.toChannel, Channel.mapOutEffect(chunk => Effect.flatMap(encode(Chunk.map(chunk, Exit.succeed)), response => Queue.offer(queue, [index, response]))), Channel.runDrain, Effect.matchCauseEffect({ | ||
onSuccess: () => Queue.offer(queue, [index, [emptyExit]]), | ||
onFailure: cause => Effect.flatMap(encode(Chunk.of(Exit.failCause(cause))), response => Queue.offer(queue, [index, response])) | ||
return (0, _Function.pipe)(rpc.handler(request), Stream.toChannel, Channel.mapOutEffect(chunk => Effect.flatMap(encode(Chunk.map(chunk, Exit.succeed)), response => mailbox.offer([index, response]))), Channel.runDrain, Effect.matchCauseEffect({ | ||
onSuccess: () => mailbox.offer([index, [emptyExit]]), | ||
onFailure: cause => Effect.flatMap(encode(Chunk.of(Exit.failCause(cause))), response => mailbox.offer([index, response])) | ||
}), Effect.locally(Rpc.currentHeaders, req.headers), Effect.withSpan(`${spanPrefix}${request._tag}`, { | ||
@@ -140,3 +130,3 @@ kind: "server", | ||
discard: true | ||
}), Effect.ensuring(Queue.offer(queue, EOF)), Effect.forkScoped)), Effect.map(([_, queue]) => Stream.fromChannel(channelFromQueue(queue))), Stream.unwrapScoped); | ||
}), Effect.ensuring(mailbox.end), Effect.forkScoped)), Effect.map(([_, mailbox]) => Mailbox.toStream(mailbox)), Stream.unwrapScoped); | ||
}; | ||
@@ -143,0 +133,0 @@ /** |
@@ -10,5 +10,5 @@ import * as Schema from "@effect/schema/Schema"; | ||
import { dual, pipe } from "effect/Function"; | ||
import * as Mailbox from "effect/Mailbox"; | ||
import { pipeArguments } from "effect/Pipeable"; | ||
import * as Predicate from "effect/Predicate"; | ||
import * as Queue from "effect/Queue"; | ||
import * as Stream from "effect/Stream"; | ||
@@ -59,12 +59,2 @@ import { StreamRequestTypeId, withRequestTag } from "./internal/rpc.js"; | ||
export const provideService = /*#__PURE__*/dual(3, (self, tag, service) => fromSet(new Set([...self.rpcs].map(Rpc.provideService(tag, service))))); | ||
const EOF = /*#__PURE__*/Symbol.for("@effect/rpc/Router/EOF"); | ||
const channelFromQueue = queue => { | ||
const loop = Channel.flatMap(Queue.takeBetween(queue, 1, Number.MAX_SAFE_INTEGER), chunk => { | ||
if (Chunk.unsafeLast(chunk) === EOF) { | ||
return Channel.write(Chunk.dropRight(chunk, 1)); | ||
} | ||
return Channel.zipRight(Channel.write(chunk), loop); | ||
}); | ||
return loop; | ||
}; | ||
const emptyExit = /*#__PURE__*/Schema.encodeSync(Schema.Exit({ | ||
@@ -90,3 +80,3 @@ failure: Schema.Never, | ||
const getEncodeChunk = withRequestTag(req => Schema.encode(Schema.Chunk(Serializable.exitSchema(req)))); | ||
return u => pipe(decode(u), Effect.zip(Queue.bounded(4)), Effect.tap(([requests, queue]) => pipe(Effect.forEach(requests, (req, index) => { | ||
return u => pipe(decode(u), Effect.zip(Mailbox.make(4)), Effect.tap(([requests, mailbox]) => pipe(Effect.forEach(requests, (req, index) => { | ||
const [request, rpc] = req.request; | ||
@@ -96,4 +86,4 @@ if (rpc._tag === "Effect") { | ||
return pipe(Effect.exit(rpc.handler(request)), Effect.flatMap(encode), Effect.orDie, Effect.matchCauseEffect({ | ||
onSuccess: response => Queue.offer(queue, [index, response]), | ||
onFailure: cause => Effect.flatMap(encode(Exit.failCause(cause)), response => Queue.offer(queue, [index, response])) | ||
onSuccess: response => mailbox.offer([index, response]), | ||
onFailure: cause => Effect.flatMap(encode(Exit.failCause(cause)), response => mailbox.offer([index, response])) | ||
}), Effect.locally(Rpc.currentHeaders, req.headers), Effect.withSpan(`${spanPrefix}${request._tag}`, { | ||
@@ -112,5 +102,5 @@ kind: "server", | ||
const encode = getEncodeChunk(request); | ||
return pipe(rpc.handler(request), Stream.toChannel, Channel.mapOutEffect(chunk => Effect.flatMap(encode(Chunk.map(chunk, Exit.succeed)), response => Queue.offer(queue, [index, response]))), Channel.runDrain, Effect.matchCauseEffect({ | ||
onSuccess: () => Queue.offer(queue, [index, [emptyExit]]), | ||
onFailure: cause => Effect.flatMap(encode(Chunk.of(Exit.failCause(cause))), response => Queue.offer(queue, [index, response])) | ||
return pipe(rpc.handler(request), Stream.toChannel, Channel.mapOutEffect(chunk => Effect.flatMap(encode(Chunk.map(chunk, Exit.succeed)), response => mailbox.offer([index, response]))), Channel.runDrain, Effect.matchCauseEffect({ | ||
onSuccess: () => mailbox.offer([index, [emptyExit]]), | ||
onFailure: cause => Effect.flatMap(encode(Chunk.of(Exit.failCause(cause))), response => mailbox.offer([index, response])) | ||
}), Effect.locally(Rpc.currentHeaders, req.headers), Effect.withSpan(`${spanPrefix}${request._tag}`, { | ||
@@ -130,3 +120,3 @@ kind: "server", | ||
discard: true | ||
}), Effect.ensuring(Queue.offer(queue, EOF)), Effect.forkScoped)), Effect.map(([_, queue]) => Stream.fromChannel(channelFromQueue(queue))), Stream.unwrapScoped); | ||
}), Effect.ensuring(mailbox.end), Effect.forkScoped)), Effect.map(([_, mailbox]) => Mailbox.toStream(mailbox)), Stream.unwrapScoped); | ||
}; | ||
@@ -133,0 +123,0 @@ /** |
{ | ||
"name": "@effect/rpc", | ||
"version": "0.38.1", | ||
"version": "0.39.0", | ||
"description": "Functional programming in TypeScript", | ||
@@ -13,5 +13,5 @@ "license": "MIT", | ||
"peerDependencies": { | ||
"@effect/platform": "^0.64.1", | ||
"@effect/schema": "^0.72.4", | ||
"effect": "^3.7.3" | ||
"@effect/platform": "^0.65.0", | ||
"@effect/schema": "^0.73.0", | ||
"effect": "^3.8.0" | ||
}, | ||
@@ -18,0 +18,0 @@ "publishConfig": { |
@@ -14,5 +14,5 @@ /** | ||
import { dual, pipe } from "effect/Function" | ||
import * as Mailbox from "effect/Mailbox" | ||
import { type Pipeable, pipeArguments } from "effect/Pipeable" | ||
import * as Predicate from "effect/Predicate" | ||
import * as Queue from "effect/Queue" | ||
import * as Stream from "effect/Stream" | ||
@@ -176,17 +176,2 @@ import { StreamRequestTypeId, withRequestTag } from "./internal/rpc.js" | ||
const EOF = Symbol.for("@effect/rpc/Router/EOF") | ||
const channelFromQueue = <A>(queue: Queue.Queue<A | typeof EOF>) => { | ||
const loop: Channel.Channel<Chunk.Chunk<A>> = Channel.flatMap( | ||
Queue.takeBetween(queue, 1, Number.MAX_SAFE_INTEGER), | ||
(chunk) => { | ||
if (Chunk.unsafeLast(chunk) === EOF) { | ||
return Channel.write(Chunk.dropRight(chunk as Chunk.Chunk<A>, 1)) | ||
} | ||
return Channel.zipRight(Channel.write(chunk as Chunk.Chunk<A>), loop) | ||
} | ||
) | ||
return loop | ||
} | ||
const emptyExit = Schema.encodeSync(Schema.Exit({ | ||
@@ -224,4 +209,4 @@ failure: Schema.Never, | ||
decode(u), | ||
Effect.zip(Queue.bounded<RpcRouter.Response | typeof EOF>(4)), | ||
Effect.tap(([requests, queue]) => | ||
Effect.zip(Mailbox.make<RpcRouter.Response>(4)), | ||
Effect.tap(([requests, mailbox]) => | ||
pipe( | ||
@@ -237,7 +222,7 @@ Effect.forEach(requests, (req, index) => { | ||
Effect.matchCauseEffect({ | ||
onSuccess: (response) => Queue.offer(queue, [index, response]), | ||
onSuccess: (response) => mailbox.offer([index, response]), | ||
onFailure: (cause) => | ||
Effect.flatMap( | ||
encode(Exit.failCause(cause)), | ||
(response) => Queue.offer(queue, [index, response]) | ||
(response) => mailbox.offer([index, response]) | ||
) | ||
@@ -266,3 +251,3 @@ }), | ||
encode(Chunk.map(chunk, Exit.succeed)), | ||
(response) => Queue.offer(queue, [index, response]) | ||
(response) => mailbox.offer([index, response]) | ||
) | ||
@@ -272,7 +257,7 @@ ), | ||
Effect.matchCauseEffect({ | ||
onSuccess: () => Queue.offer(queue, [index, [emptyExit]]), | ||
onSuccess: () => mailbox.offer([index, [emptyExit]]), | ||
onFailure: (cause) => | ||
Effect.flatMap( | ||
encode(Chunk.of(Exit.failCause(cause))), | ||
(response) => Queue.offer(queue, [index, response]) | ||
(response) => mailbox.offer([index, response]) | ||
) | ||
@@ -294,7 +279,7 @@ }), | ||
}, { concurrency: "unbounded", discard: true }), | ||
Effect.ensuring(Queue.offer(queue, EOF)), | ||
Effect.ensuring(mailbox.end), | ||
Effect.forkScoped | ||
) | ||
), | ||
Effect.map(([_, queue]) => Stream.fromChannel(channelFromQueue(queue))), | ||
Effect.map(([_, mailbox]) => Mailbox.toStream(mailbox)), | ||
Stream.unwrapScoped | ||
@@ -301,0 +286,0 @@ ) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
198171
2701