@replit/river
Advanced tools
Comparing version 0.1.7 to 0.1.8
@@ -21,5 +21,5 @@ export declare const EchoRequest: import("@sinclair/typebox").TObject<{ | ||
}>; | ||
handler: (state: { | ||
handler: (context: import("..").ServiceContextWithState<{ | ||
count: number; | ||
}, input: import("../transport/message").TransportMessage<{ | ||
}>, input: import("../transport/message").TransportMessage<{ | ||
n: number; | ||
@@ -40,5 +40,5 @@ }>) => Promise<import("../transport/message").TransportMessage<{ | ||
}>; | ||
handler: (state: { | ||
handler: (context: import("..").ServiceContextWithState<{ | ||
count: number; | ||
}, input: AsyncIterable<import("../transport/message").TransportMessage<{ | ||
}>, input: AsyncIterable<import("../transport/message").TransportMessage<{ | ||
msg: string; | ||
@@ -45,0 +45,0 @@ ignore: boolean; |
@@ -23,6 +23,6 @@ import http from 'http'; | ||
output: Type.Object({ result: Type.Number() }), | ||
async handler(state, msg) { | ||
async handler(ctx, msg) { | ||
const { n } = msg.payload; | ||
state.count += n; | ||
return reply(msg, { result: state.count }); | ||
ctx.state.count += n; | ||
return reply(msg, { result: ctx.state.count }); | ||
}, | ||
@@ -34,3 +34,3 @@ }) | ||
output: EchoResponse, | ||
async handler(_state, msgStream, returnStream) { | ||
async handler(_ctx, msgStream, returnStream) { | ||
for await (const msg of msgStream) { | ||
@@ -37,0 +37,0 @@ const req = msg.payload; |
@@ -8,2 +8,3 @@ export { serializeService, ServiceBuilder } from './router/builder'; | ||
export type { Server } from './router/server'; | ||
export type { ServiceContext, ServiceContextWithState } from './router/context'; | ||
export { Transport } from './transport/types'; | ||
@@ -10,0 +11,0 @@ export { TransportMessageSchema, OpaqueTransportMessageSchema, TransportAckSchema, msg, payloadToTransportMessage, ack, reply, } from './transport/message'; |
import { TObject, Static } from '@sinclair/typebox'; | ||
import type { Pushable } from 'it-pushable'; | ||
import { TransportMessage } from '../transport/message'; | ||
import { ServiceContextWithState } from './context'; | ||
export type ValidProcType = 'stream' | 'rpc'; | ||
@@ -20,3 +21,3 @@ export type ProcListing = Record<string, Procedure<object, ValidProcType, TObject, TObject>>; | ||
output: O; | ||
handler: (state: State, input: TransportMessage<Static<I>>) => Promise<TransportMessage<Static<O>>>; | ||
handler: (context: ServiceContextWithState<State>, input: TransportMessage<Static<I>>) => Promise<TransportMessage<Static<O>>>; | ||
type: Ty; | ||
@@ -26,3 +27,3 @@ } : { | ||
output: O; | ||
handler: (state: State, input: AsyncIterable<TransportMessage<Static<I>>>, output: Pushable<TransportMessage<Static<O>>>) => Promise<void>; | ||
handler: (context: ServiceContextWithState<State>, input: AsyncIterable<TransportMessage<Static<I>>>, output: Pushable<TransportMessage<Static<O>>>) => Promise<void>; | ||
type: Ty; | ||
@@ -29,0 +30,0 @@ }; |
import { Transport } from '../transport/types'; | ||
import { AnyService } from './builder'; | ||
import { ServiceContext } from './context'; | ||
export interface Server<Services> { | ||
@@ -7,2 +8,2 @@ services: Services; | ||
} | ||
export declare function createServer<Services extends Record<string, AnyService>>(transport: Transport, services: Services): Promise<Server<Services>>; | ||
export declare function createServer<Services extends Record<string, AnyService>>(transport: Transport, services: Services, extendedContext?: Omit<ServiceContext, 'state'>): Promise<Server<Services>>; |
import { Value } from '@sinclair/typebox/value'; | ||
import { pushable } from 'it-pushable'; | ||
export async function createServer(transport, services) { | ||
// create streams for every stream procedure | ||
export async function createServer(transport, services, extendedContext) { | ||
const contextMap = new Map(); | ||
const streamMap = new Map(); | ||
function getContext(service) { | ||
const context = contextMap.get(service); | ||
if (!context) { | ||
throw new Error(`No context found for ${service.name}`); | ||
} | ||
return context; | ||
} | ||
for (const [serviceName, service] of Object.entries(services)) { | ||
// populate the context map | ||
contextMap.set(service, { ...extendedContext, state: service.state }); | ||
// create streams for every stream procedure | ||
for (const [procedureName, proc] of Object.entries(service.procedures)) { | ||
@@ -17,3 +27,3 @@ const procedure = proc; | ||
// processing the actual procedure | ||
procedure.handler(service.state, incoming, outgoing), | ||
procedure.handler(getContext(service), incoming, outgoing), | ||
// sending outgoing messages back to client | ||
@@ -43,4 +53,3 @@ (async () => { | ||
Value.Check(procedure.input, inputMessage.payload)) { | ||
// synchronous rpc | ||
const response = await procedure.handler(service.state, inputMessage); | ||
const response = await procedure.handler(getContext(service), inputMessage); | ||
transport.send(response); | ||
@@ -47,0 +56,0 @@ return; |
import { Static, TObject } from '@sinclair/typebox'; | ||
import { Procedure } from './builder'; | ||
import type { Pushable } from 'it-pushable'; | ||
export declare function asClientRpc<State extends object | unknown, I extends TObject, O extends TObject>(state: State, proc: Procedure<State, 'rpc', I, O>): (msg: Static<I>) => Promise<Static<O>>; | ||
export declare function asClientStream<State extends object | unknown, I extends TObject, O extends TObject>(state: State, proc: Procedure<State, 'stream', I, O>): [Pushable<Static<I>>, Pushable<Static<O>>]; | ||
import { ServiceContext } from './context'; | ||
export declare function asClientRpc<State extends object | unknown, I extends TObject, O extends TObject>(state: State, proc: Procedure<State, 'rpc', I, O>, extendedContext?: Omit<ServiceContext, 'state'>): (msg: Static<I>) => Promise<Static<O>>; | ||
export declare function asClientStream<State extends object | unknown, I extends TObject, O extends TObject>(state: State, proc: Procedure<State, 'stream', I, O>, extendedContext?: Omit<ServiceContext, 'state'>): [Pushable<Static<I>>, Pushable<Static<O>>]; |
import { payloadToTransportMessage, } from '../transport/message'; | ||
import { pushable } from 'it-pushable'; | ||
export function asClientRpc(state, proc) { | ||
export function asClientRpc(state, proc, extendedContext) { | ||
return (msg) => proc | ||
.handler(state, payloadToTransportMessage(msg)) | ||
.handler({ ...extendedContext, state }, payloadToTransportMessage(msg)) | ||
.then((res) => res.payload); | ||
} | ||
export function asClientStream(state, proc) { | ||
export function asClientStream(state, proc, extendedContext) { | ||
const i = pushable({ objectMode: true }); | ||
@@ -28,3 +28,3 @@ const o = pushable({ objectMode: true }); | ||
(async () => { | ||
await proc.handler(state, ri, ro); | ||
await proc.handler({ ...extendedContext, state }, ri, ro); | ||
ro.end(); | ||
@@ -31,0 +31,0 @@ })(); |
{ | ||
"name": "@replit/river", | ||
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", | ||
"version": "0.1.7", | ||
"version": "0.1.8", | ||
"type": "module", | ||
@@ -6,0 +6,0 @@ "main": "dist/index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
46072
45
1151