@replit/river
Advanced tools
Comparing version 0.7.2 to 0.8.0
import http from 'http'; | ||
import { assert, bench, describe } from 'vitest'; | ||
import { createWebSocketServer, createWsTransports, onServerReady, } from '../testUtils'; | ||
import { createWebSocketServer, createWsTransports, onServerReady, } from '../util/testHelpers'; | ||
import largePayload from './largePayload.json'; | ||
import { TestServiceConstructor } from './fixtures'; | ||
import { TestServiceConstructor } from './fixtures/services'; | ||
import { createServer } from '../router/server'; | ||
@@ -60,6 +60,6 @@ import { createClient } from '../router/client'; | ||
bench('rpc (wait for response)', async () => { | ||
const result = await client.test.add({ n: 1 }); | ||
const result = await client.test.add.rpc({ n: 1 }); | ||
assert(result.ok); | ||
}, { time: BENCH_DURATION }); | ||
const [input, output] = await client.test.echo(); | ||
const [input, output] = await client.test.echo.stream(); | ||
bench('stream (wait for response)', async () => { | ||
@@ -88,5 +88,5 @@ input.push({ msg: 'abc', ignore: false }); | ||
bench('rpc (wait for response)', async () => { | ||
const result = await client.b.f35({ a: 1 }); | ||
const result = await client.b.f35.rpc({ a: 1 }); | ||
assert(result.ok); | ||
}, { time: BENCH_DURATION }); | ||
}); |
import { afterAll, assert, describe, expect, test } from 'vitest'; | ||
import { createWebSocketServer, createWsTransports, onServerReady, } from '../testUtils'; | ||
import { createLocalWebSocketClient, createWebSocketServer, createWsTransports, iterNext, onServerReady, } from '../util/testHelpers'; | ||
import { createServer } from '../router/server'; | ||
import { createClient } from '../router/client'; | ||
import http from 'http'; | ||
import { BinaryFileServiceConstructor, DIV_BY_ZERO, FallibleServiceConstructor, OrderingServiceConstructor, STREAM_ERROR, TestServiceConstructor, } from './fixtures'; | ||
import { BinaryFileServiceConstructor, DIV_BY_ZERO, FallibleServiceConstructor, OrderingServiceConstructor, STREAM_ERROR, SubscribableServiceConstructor, TestServiceConstructor, } from './fixtures/services'; | ||
import { UNCAUGHT_ERROR } from '../router/result'; | ||
import { codecs } from '../codec/codec.test'; | ||
import { WebSocketClientTransport } from '../transport/impls/ws/client'; | ||
import { WebSocketServerTransport } from '../transport/impls/ws/server'; | ||
describe.each(codecs)('client <-> server integration test ($name codec)', async ({ codec }) => { | ||
@@ -25,3 +27,3 @@ const httpServer = http.createServer(); | ||
const client = createClient(clientTransport); | ||
const result = await client.test.add({ n: 3 }); | ||
const result = await client.test.add.rpc({ n: 3 }); | ||
assert(result.ok); | ||
@@ -35,6 +37,6 @@ expect(result.payload).toStrictEqual({ result: 3 }); | ||
const client = createClient(clientTransport); | ||
const result = await client.test.divide({ a: 10, b: 2 }); | ||
const result = await client.test.divide.rpc({ a: 10, b: 2 }); | ||
assert(result.ok); | ||
expect(result.payload).toStrictEqual({ result: 5 }); | ||
const result2 = await client.test.divide({ a: 10, b: 0 }); | ||
const result2 = await client.test.divide.rpc({ a: 10, b: 0 }); | ||
assert(!result2.ok); | ||
@@ -54,3 +56,3 @@ expect(result2.payload).toStrictEqual({ | ||
const client = createClient(clientTransport); | ||
const result = await client.test.getFile({ file: 'test.py' }); | ||
const result = await client.test.getFile.rpc({ file: 'test.py' }); | ||
assert(result.ok); | ||
@@ -65,3 +67,3 @@ assert(result.payload.contents instanceof Uint8Array); | ||
const client = createClient(clientTransport); | ||
const [input, output, close] = await client.test.echo(); | ||
const [input, output, close] = await client.test.echo.stream(); | ||
input.push({ msg: 'abc', ignore: false }); | ||
@@ -71,6 +73,6 @@ input.push({ msg: 'def', ignore: true }); | ||
input.end(); | ||
const result1 = await output.next().then((res) => res.value); | ||
const result1 = await iterNext(output); | ||
assert(result1.ok); | ||
expect(result1.payload).toStrictEqual({ response: 'abc' }); | ||
const result2 = await output.next().then((res) => res.value); | ||
const result2 = await iterNext(output); | ||
assert(result2.ok); | ||
@@ -85,13 +87,13 @@ expect(result2.payload).toStrictEqual({ response: 'ghi' }); | ||
const client = createClient(clientTransport); | ||
const [input, output, close] = await client.test.echo(); | ||
const [input, output, close] = await client.test.echo.stream(); | ||
input.push({ msg: 'abc', throwResult: false, throwError: false }); | ||
const result1 = await output.next().then((res) => res.value); | ||
const result1 = await iterNext(output); | ||
assert(result1 && result1.ok); | ||
expect(result1.payload).toStrictEqual({ response: 'abc' }); | ||
input.push({ msg: 'def', throwResult: true, throwError: false }); | ||
const result2 = await output.next().then((res) => res.value); | ||
const result2 = await iterNext(output); | ||
assert(result2 && !result2.ok); | ||
expect(result2.payload.code).toStrictEqual(STREAM_ERROR); | ||
input.push({ msg: 'ghi', throwResult: false, throwError: true }); | ||
const result3 = await output.next().then((res) => res.value); | ||
const result3 = await iterNext(output); | ||
assert(result3 && !result3.ok); | ||
@@ -104,2 +106,38 @@ expect(result3.payload).toStrictEqual({ | ||
}); | ||
test('subscription', async () => { | ||
const options = { codec }; | ||
const serverTransport = new WebSocketServerTransport(webSocketServer, 'SERVER', options); | ||
const client1Transport = new WebSocketClientTransport(() => createLocalWebSocketClient(port), 'client1', 'SERVER', options); | ||
const client2Transport = new WebSocketClientTransport(() => createLocalWebSocketClient(port), 'client2', 'SERVER', options); | ||
const serviceDefs = { test: SubscribableServiceConstructor() }; | ||
const server = await createServer(serverTransport, serviceDefs); | ||
const client1 = createClient(client1Transport); | ||
const client2 = createClient(client2Transport); | ||
const [subscription1, close1] = await client1.test.value.subscribe({}); | ||
let result = await iterNext(subscription1); | ||
assert(result.ok); | ||
expect(result.payload).toStrictEqual({ result: 0 }); | ||
const [subscription2, close2] = await client2.test.value.subscribe({}); | ||
result = await iterNext(subscription2); | ||
assert(result.ok); | ||
expect(result.payload).toStrictEqual({ result: 0 }); | ||
const add1 = await client1.test.add.rpc({ n: 1 }); | ||
assert(add1.ok); | ||
result = await iterNext(subscription1); | ||
assert(result.ok); | ||
expect(result.payload).toStrictEqual({ result: 1 }); | ||
result = await iterNext(subscription2); | ||
assert(result.ok); | ||
expect(result.payload).toStrictEqual({ result: 1 }); | ||
const add2 = await client2.test.add.rpc({ n: 3 }); | ||
assert(add2.ok); | ||
result = await iterNext(subscription1); | ||
assert(result.ok); | ||
expect(result.payload).toStrictEqual({ result: 4 }); | ||
result = await iterNext(subscription2); | ||
assert(result.ok); | ||
expect(result.payload).toStrictEqual({ result: 4 }); | ||
close1(); | ||
close2(); | ||
}); | ||
test('message order is preserved in the face of disconnects', async () => { | ||
@@ -119,7 +157,7 @@ const [clientTransport, serverTransport] = getTransports(); | ||
} | ||
await client.test.add({ | ||
await client.test.add.rpc({ | ||
n: i, | ||
}); | ||
} | ||
const res = await client.test.getAll({}); | ||
const res = await client.test.getAll.rpc({}); | ||
assert(res.ok); | ||
@@ -136,3 +174,3 @@ return expect(res.payload.msgs).toStrictEqual(expected); | ||
for (let i = 0; i < CONCURRENCY; i++) { | ||
promises.push(client.test.add({ n: i })); | ||
promises.push(client.test.add.rpc({ n: i })); | ||
} | ||
@@ -152,3 +190,3 @@ for (let i = 0; i < CONCURRENCY; i++) { | ||
for (let i = 0; i < CONCURRENCY; i++) { | ||
const streamHandle = await client.test.echo(); | ||
const streamHandle = await client.test.echo.stream(); | ||
const input = streamHandle[0]; | ||
@@ -161,6 +199,6 @@ input.push({ msg: `${i}-1`, ignore: false }); | ||
const output = openStreams[i][1]; | ||
const result1 = await output.next().then((res) => res.value); | ||
const result1 = await iterNext(output); | ||
assert(result1.ok); | ||
expect(result1.payload).toStrictEqual({ response: `${i}-1` }); | ||
const result2 = await output.next().then((res) => res.value); | ||
const result2 = await iterNext(output); | ||
assert(result2.ok); | ||
@@ -167,0 +205,0 @@ expect(result2.payload).toStrictEqual({ response: `${i}-2` }); |
@@ -1,5 +0,6 @@ | ||
import { asClientRpc, asClientStream } from '../testUtils'; | ||
import { asClientRpc, asClientStream, asClientSubscription, iterNext, } from '../util/testHelpers'; | ||
import { assert, describe, expect, test } from 'vitest'; | ||
import { DIV_BY_ZERO, FallibleServiceConstructor, STREAM_ERROR, TestServiceConstructor, } from './fixtures'; | ||
import { DIV_BY_ZERO, FallibleServiceConstructor, STREAM_ERROR, SubscribableServiceConstructor, TestServiceConstructor, } from './fixtures/services'; | ||
import { UNCAUGHT_ERROR } from '../router/result'; | ||
import { Observable } from './fixtures/observable'; | ||
describe('server-side test', () => { | ||
@@ -42,6 +43,6 @@ const service = TestServiceConstructor(); | ||
input.end(); | ||
const result1 = await output.next().then((res) => res.value); | ||
const result1 = await iterNext(output); | ||
assert(result1 && result1.ok); | ||
expect(result1.payload).toStrictEqual({ response: 'abc' }); | ||
const result2 = await output.next().then((res) => res.value); | ||
const result2 = await iterNext(output); | ||
assert(result2 && result2.ok); | ||
@@ -55,11 +56,11 @@ expect(result2.payload).toStrictEqual({ response: 'ghi' }); | ||
input.push({ msg: 'abc', throwResult: false, throwError: false }); | ||
const result1 = await output.next().then((res) => res.value); | ||
const result1 = await iterNext(output); | ||
assert(result1 && result1.ok); | ||
expect(result1.payload).toStrictEqual({ response: 'abc' }); | ||
input.push({ msg: 'def', throwResult: true, throwError: false }); | ||
const result2 = await output.next().then((res) => res.value); | ||
const result2 = await iterNext(output); | ||
assert(result2 && !result2.ok); | ||
expect(result2.payload.code).toStrictEqual(STREAM_ERROR); | ||
input.push({ msg: 'ghi', throwResult: false, throwError: true }); | ||
const result3 = await output.next().then((res) => res.value); | ||
const result3 = await iterNext(output); | ||
assert(result3 && !result3.ok); | ||
@@ -73,2 +74,18 @@ expect(result3.payload).toStrictEqual({ | ||
}); | ||
test('subscriptions', async () => { | ||
const service = SubscribableServiceConstructor(); | ||
const state = { count: new Observable(0) }; | ||
const add = asClientRpc(state, service.procedures.add); | ||
const subscribe = asClientSubscription(state, service.procedures.value); | ||
const stream = await subscribe({}); | ||
const streamResult1 = await iterNext(stream); | ||
assert(streamResult1 && streamResult1.ok); | ||
expect(streamResult1.payload).toStrictEqual({ result: 0 }); | ||
const result = await add({ n: 3 }); | ||
assert(result.ok); | ||
expect(result.payload).toStrictEqual({ result: 3 }); | ||
const streamResult2 = await iterNext(stream); | ||
assert(streamResult2 && streamResult1.ok); | ||
expect(streamResult2.payload).toStrictEqual({ result: 3 }); | ||
}); | ||
}); |
import { expect, describe, test } from 'vitest'; | ||
import { serializeService } from '../router/builder'; | ||
import { BinaryFileServiceConstructor, FallibleServiceConstructor, TestServiceConstructor, } from './fixtures'; | ||
import { BinaryFileServiceConstructor, FallibleServiceConstructor, TestServiceConstructor, } from './fixtures/services'; | ||
describe('serialize service to jsonschema', () => { | ||
@@ -5,0 +5,0 @@ test('serialize basic service', () => { |
@@ -9,3 +9,3 @@ import { TObject, Static, TUnion } from '@sinclair/typebox'; | ||
*/ | ||
export type ValidProcType = 'stream' | 'rpc'; | ||
export type ValidProcType = 'rpc' | 'stream' | 'subscription'; | ||
/** | ||
@@ -69,3 +69,3 @@ * A generic procedure listing where the keys are the names of the procedures | ||
* @template State - The TypeBox schema of the state object. | ||
* @template Ty - The type of the procedure, either 'rpc' or 'stream'. | ||
* @template Ty - The type of the procedure. | ||
* @template I - The TypeBox schema of the input object. | ||
@@ -80,3 +80,3 @@ * @template O - The TypeBox schema of the output object. | ||
type: Ty; | ||
} : { | ||
} : Ty extends 'stream' ? { | ||
input: I; | ||
@@ -87,3 +87,9 @@ output: O; | ||
type: Ty; | ||
}; | ||
} : Ty extends 'subscription' ? { | ||
input: I; | ||
output: O; | ||
errors: E; | ||
handler: (context: ServiceContextWithState<State>, input: TransportMessage<Static<I>>, output: Pushable<TransportMessage<Result<Static<O>, Static<E>>>>) => Promise<void>; | ||
type: Ty; | ||
} : never; | ||
export type AnyProcedure = Procedure<object, ValidProcType, TObject, TObject, RiverError>; | ||
@@ -90,0 +96,0 @@ /** |
@@ -15,7 +15,16 @@ import { Connection, Transport } from '../transport/transport'; | ||
type ServiceClient<Router extends AnyService> = { | ||
[ProcName in keyof Router['procedures']]: ProcType<Router, ProcName> extends 'rpc' ? (input: Static<ProcInput<Router, ProcName>>) => Promise<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>> : () => Promise<[ | ||
Pushable<Static<ProcInput<Router, ProcName>>>, | ||
AsyncIter<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>>, | ||
() => void | ||
]>; | ||
[ProcName in keyof Router['procedures']]: ProcType<Router, ProcName> extends 'rpc' ? { | ||
rpc: (input: Static<ProcInput<Router, ProcName>>) => Promise<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>>; | ||
} : ProcType<Router, ProcName> extends 'stream' ? { | ||
stream: () => Promise<[ | ||
Pushable<Static<ProcInput<Router, ProcName>>>, | ||
AsyncIter<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>>, | ||
() => void | ||
]>; | ||
} : ProcType<Router, ProcName> extends 'subscription' ? { | ||
subscribe: (input: Static<ProcInput<Router, ProcName>>) => Promise<[ | ||
AsyncIter<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>>, | ||
() => void | ||
]>; | ||
} : never; | ||
}; | ||
@@ -22,0 +31,0 @@ /** |
@@ -38,3 +38,6 @@ import { pushable } from 'it-pushable'; | ||
export const createClient = (transport, serverId = 'SERVER') => _createRecursiveProxy(async (opts) => { | ||
const [serviceName, procName] = [...opts.path]; | ||
const [serviceName, procName, procType] = [...opts.path]; | ||
if (!(serviceName && procName && procType)) { | ||
throw new Error('invalid river call, ensure the service and procedure you are calling exists'); | ||
} | ||
const [input] = opts.args; | ||
@@ -47,4 +50,3 @@ const streamId = nanoid(); | ||
} | ||
if (input === undefined) { | ||
// stream case (stream methods are called with zero arguments) | ||
if (procType === 'stream') { | ||
const inputStream = pushable({ objectMode: true }); | ||
@@ -78,4 +80,3 @@ const outputStream = pushable({ objectMode: true }); | ||
} | ||
else { | ||
// rpc case | ||
else if (procType === 'rpc') { | ||
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, input); | ||
@@ -88,2 +89,23 @@ // rpc is a stream open + close | ||
} | ||
else if (procType === 'subscribe') { | ||
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, input); | ||
m.controlFlags |= 2 /* ControlFlags.StreamOpenBit */; | ||
transport.send(m); | ||
// transport -> output | ||
const outputStream = pushable({ objectMode: true }); | ||
const listener = (msg) => { | ||
if (belongsToSameStream(msg)) { | ||
outputStream.push(msg.payload); | ||
} | ||
}; | ||
transport.addMessageListener(listener); | ||
const closeHandler = () => { | ||
outputStream.end(); | ||
transport.removeMessageListener(listener); | ||
}; | ||
return [outputStream, closeHandler]; | ||
} | ||
else { | ||
throw new Error(`invalid river call, unknown procedure type ${procType}`); | ||
} | ||
}, []); |
@@ -47,3 +47,4 @@ import { pushable } from 'it-pushable'; | ||
const procedure = service.procedures[msg.procedureName]; | ||
if (isStreamOpen(msg.controlFlags)) { | ||
const streamIdx = `${msg.serviceName}.${msg.procedureName}:${msg.streamId}`; | ||
if (isStreamOpen(msg.controlFlags) && !streamMap.has(streamIdx)) { | ||
const incoming = pushable({ objectMode: true }); | ||
@@ -75,14 +76,30 @@ const outgoing = pushable({ objectMode: true }); | ||
openPromises.push((async () => { | ||
for await (const inputMessage of incoming) { | ||
try { | ||
const outputMessage = await procedure.handler(serviceContext, inputMessage); | ||
outgoing.push(outputMessage); | ||
} | ||
catch (err) { | ||
errorHandler(err); | ||
} | ||
const inputMessage = await incoming.next(); | ||
if (inputMessage.done) { | ||
return; | ||
} | ||
try { | ||
const outputMessage = await procedure.handler(serviceContext, inputMessage.value); | ||
outgoing.push(outputMessage); | ||
} | ||
catch (err) { | ||
errorHandler(err); | ||
} | ||
})()); | ||
} | ||
streamMap.set(`${msg.serviceName}.${msg.procedureName}:${msg.streamId}`, { | ||
else if (procedure.type === 'subscription') { | ||
openPromises.push((async () => { | ||
const inputMessage = await incoming.next(); | ||
if (inputMessage.done) { | ||
return; | ||
} | ||
try { | ||
await procedure.handler(serviceContext, inputMessage.value, outgoing); | ||
} | ||
catch (err) { | ||
errorHandler(err); | ||
} | ||
})()); | ||
} | ||
streamMap.set(streamIdx, { | ||
incoming, | ||
@@ -93,3 +110,3 @@ outgoing, | ||
} | ||
const procStream = streamMap.get(`${msg.serviceName}.${msg.procedureName}:${msg.streamId}`); | ||
const procStream = streamMap.get(streamIdx); | ||
if (!procStream) { | ||
@@ -96,0 +113,0 @@ log?.warn(`${transport.clientId} -- couldn't find a matching procedure stream for ${msg.serviceName}.${msg.procedureName}:${msg.streamId}`); |
@@ -5,3 +5,3 @@ import { describe, test, expect } from 'vitest'; | ||
import { waitForMessage } from '../..'; | ||
import { payloadToTransportMessage } from '../../../testUtils'; | ||
import { payloadToTransportMessage } from '../../../util/testHelpers'; | ||
describe('sending and receiving across node streams works', () => { | ||
@@ -8,0 +8,0 @@ test('basic send/receive', async () => { |
import http from 'http'; | ||
import { describe, test, expect, afterAll } from 'vitest'; | ||
import { createWebSocketServer, createWsTransports, createDummyTransportMessage, onServerReady, createLocalWebSocketClient, } from '../../../testUtils'; | ||
import { createWebSocketServer, createWsTransports, createDummyTransportMessage, onServerReady, createLocalWebSocketClient, } from '../../../util/testHelpers'; | ||
import { msg, waitForMessage } from '../..'; | ||
@@ -5,0 +5,0 @@ import { WebSocketServerTransport } from './server'; |
@@ -5,3 +5,3 @@ { | ||
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", | ||
"version": "0.7.2", | ||
"version": "0.8.0", | ||
"type": "module", | ||
@@ -12,3 +12,3 @@ "exports": { | ||
"./codec": "./dist/codec/index.js", | ||
"./test-util": "./dist/testUtils.js", | ||
"./test-util": "./dist/util/testHelpers.js", | ||
"./transport": "./dist/transport/index.js", | ||
@@ -15,0 +15,0 @@ "./transport/ws/client": "./dist/transport/impls/ws/client.js", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
229056
97
4879