Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@replit/river

Package Overview
Dependencies
Maintainers
30
Versions
162
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@replit/river - npm Package Compare versions

Comparing version 0.7.2 to 0.8.0

dist/__tests__/fixtures/observable.d.ts

10

dist/__tests__/bandwidth.bench.js
import http from 'http';
import { assert, bench, describe } from 'vitest';
import { createWebSocketServer, createWsTransports, onServerReady, } from '../testUtils';
import { createWebSocketServer, createWsTransports, onServerReady, } from '../util/testHelpers';
import largePayload from './largePayload.json';
import { TestServiceConstructor } from './fixtures';
import { TestServiceConstructor } from './fixtures/services';
import { createServer } from '../router/server';

@@ -60,6 +60,6 @@ import { createClient } from '../router/client';

bench('rpc (wait for response)', async () => {
const result = await client.test.add({ n: 1 });
const result = await client.test.add.rpc({ n: 1 });
assert(result.ok);
}, { time: BENCH_DURATION });
const [input, output] = await client.test.echo();
const [input, output] = await client.test.echo.stream();
bench('stream (wait for response)', async () => {

@@ -88,5 +88,5 @@ input.push({ msg: 'abc', ignore: false });

bench('rpc (wait for response)', async () => {
const result = await client.b.f35({ a: 1 });
const result = await client.b.f35.rpc({ a: 1 });
assert(result.ok);
}, { time: BENCH_DURATION });
});
import { afterAll, assert, describe, expect, test } from 'vitest';
import { createWebSocketServer, createWsTransports, onServerReady, } from '../testUtils';
import { createLocalWebSocketClient, createWebSocketServer, createWsTransports, iterNext, onServerReady, } from '../util/testHelpers';
import { createServer } from '../router/server';
import { createClient } from '../router/client';
import http from 'http';
import { BinaryFileServiceConstructor, DIV_BY_ZERO, FallibleServiceConstructor, OrderingServiceConstructor, STREAM_ERROR, TestServiceConstructor, } from './fixtures';
import { BinaryFileServiceConstructor, DIV_BY_ZERO, FallibleServiceConstructor, OrderingServiceConstructor, STREAM_ERROR, SubscribableServiceConstructor, TestServiceConstructor, } from './fixtures/services';
import { UNCAUGHT_ERROR } from '../router/result';
import { codecs } from '../codec/codec.test';
import { WebSocketClientTransport } from '../transport/impls/ws/client';
import { WebSocketServerTransport } from '../transport/impls/ws/server';
describe.each(codecs)('client <-> server integration test ($name codec)', async ({ codec }) => {

@@ -25,3 +27,3 @@ const httpServer = http.createServer();

const client = createClient(clientTransport);
const result = await client.test.add({ n: 3 });
const result = await client.test.add.rpc({ n: 3 });
assert(result.ok);

@@ -35,6 +37,6 @@ expect(result.payload).toStrictEqual({ result: 3 });

const client = createClient(clientTransport);
const result = await client.test.divide({ a: 10, b: 2 });
const result = await client.test.divide.rpc({ a: 10, b: 2 });
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 5 });
const result2 = await client.test.divide({ a: 10, b: 0 });
const result2 = await client.test.divide.rpc({ a: 10, b: 0 });
assert(!result2.ok);

@@ -54,3 +56,3 @@ expect(result2.payload).toStrictEqual({

const client = createClient(clientTransport);
const result = await client.test.getFile({ file: 'test.py' });
const result = await client.test.getFile.rpc({ file: 'test.py' });
assert(result.ok);

@@ -65,3 +67,3 @@ assert(result.payload.contents instanceof Uint8Array);

const client = createClient(clientTransport);
const [input, output, close] = await client.test.echo();
const [input, output, close] = await client.test.echo.stream();
input.push({ msg: 'abc', ignore: false });

@@ -71,6 +73,6 @@ input.push({ msg: 'def', ignore: true });

input.end();
const result1 = await output.next().then((res) => res.value);
const result1 = await iterNext(output);
assert(result1.ok);
expect(result1.payload).toStrictEqual({ response: 'abc' });
const result2 = await output.next().then((res) => res.value);
const result2 = await iterNext(output);
assert(result2.ok);

@@ -85,13 +87,13 @@ expect(result2.payload).toStrictEqual({ response: 'ghi' });

const client = createClient(clientTransport);
const [input, output, close] = await client.test.echo();
const [input, output, close] = await client.test.echo.stream();
input.push({ msg: 'abc', throwResult: false, throwError: false });
const result1 = await output.next().then((res) => res.value);
const result1 = await iterNext(output);
assert(result1 && result1.ok);
expect(result1.payload).toStrictEqual({ response: 'abc' });
input.push({ msg: 'def', throwResult: true, throwError: false });
const result2 = await output.next().then((res) => res.value);
const result2 = await iterNext(output);
assert(result2 && !result2.ok);
expect(result2.payload.code).toStrictEqual(STREAM_ERROR);
input.push({ msg: 'ghi', throwResult: false, throwError: true });
const result3 = await output.next().then((res) => res.value);
const result3 = await iterNext(output);
assert(result3 && !result3.ok);

@@ -104,2 +106,38 @@ expect(result3.payload).toStrictEqual({

});
test('subscription', async () => {
const options = { codec };
const serverTransport = new WebSocketServerTransport(webSocketServer, 'SERVER', options);
const client1Transport = new WebSocketClientTransport(() => createLocalWebSocketClient(port), 'client1', 'SERVER', options);
const client2Transport = new WebSocketClientTransport(() => createLocalWebSocketClient(port), 'client2', 'SERVER', options);
const serviceDefs = { test: SubscribableServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client1 = createClient(client1Transport);
const client2 = createClient(client2Transport);
const [subscription1, close1] = await client1.test.value.subscribe({});
let result = await iterNext(subscription1);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 0 });
const [subscription2, close2] = await client2.test.value.subscribe({});
result = await iterNext(subscription2);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 0 });
const add1 = await client1.test.add.rpc({ n: 1 });
assert(add1.ok);
result = await iterNext(subscription1);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 1 });
result = await iterNext(subscription2);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 1 });
const add2 = await client2.test.add.rpc({ n: 3 });
assert(add2.ok);
result = await iterNext(subscription1);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 4 });
result = await iterNext(subscription2);
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 4 });
close1();
close2();
});
test('message order is preserved in the face of disconnects', async () => {

@@ -119,7 +157,7 @@ const [clientTransport, serverTransport] = getTransports();

}
await client.test.add({
await client.test.add.rpc({
n: i,
});
}
const res = await client.test.getAll({});
const res = await client.test.getAll.rpc({});
assert(res.ok);

@@ -136,3 +174,3 @@ return expect(res.payload.msgs).toStrictEqual(expected);

for (let i = 0; i < CONCURRENCY; i++) {
promises.push(client.test.add({ n: i }));
promises.push(client.test.add.rpc({ n: i }));
}

@@ -152,3 +190,3 @@ for (let i = 0; i < CONCURRENCY; i++) {

for (let i = 0; i < CONCURRENCY; i++) {
const streamHandle = await client.test.echo();
const streamHandle = await client.test.echo.stream();
const input = streamHandle[0];

@@ -161,6 +199,6 @@ input.push({ msg: `${i}-1`, ignore: false });

const output = openStreams[i][1];
const result1 = await output.next().then((res) => res.value);
const result1 = await iterNext(output);
assert(result1.ok);
expect(result1.payload).toStrictEqual({ response: `${i}-1` });
const result2 = await output.next().then((res) => res.value);
const result2 = await iterNext(output);
assert(result2.ok);

@@ -167,0 +205,0 @@ expect(result2.payload).toStrictEqual({ response: `${i}-2` });

@@ -1,5 +0,6 @@

import { asClientRpc, asClientStream } from '../testUtils';
import { asClientRpc, asClientStream, asClientSubscription, iterNext, } from '../util/testHelpers';
import { assert, describe, expect, test } from 'vitest';
import { DIV_BY_ZERO, FallibleServiceConstructor, STREAM_ERROR, TestServiceConstructor, } from './fixtures';
import { DIV_BY_ZERO, FallibleServiceConstructor, STREAM_ERROR, SubscribableServiceConstructor, TestServiceConstructor, } from './fixtures/services';
import { UNCAUGHT_ERROR } from '../router/result';
import { Observable } from './fixtures/observable';
describe('server-side test', () => {

@@ -42,6 +43,6 @@ const service = TestServiceConstructor();

input.end();
const result1 = await output.next().then((res) => res.value);
const result1 = await iterNext(output);
assert(result1 && result1.ok);
expect(result1.payload).toStrictEqual({ response: 'abc' });
const result2 = await output.next().then((res) => res.value);
const result2 = await iterNext(output);
assert(result2 && result2.ok);

@@ -55,11 +56,11 @@ expect(result2.payload).toStrictEqual({ response: 'ghi' });

input.push({ msg: 'abc', throwResult: false, throwError: false });
const result1 = await output.next().then((res) => res.value);
const result1 = await iterNext(output);
assert(result1 && result1.ok);
expect(result1.payload).toStrictEqual({ response: 'abc' });
input.push({ msg: 'def', throwResult: true, throwError: false });
const result2 = await output.next().then((res) => res.value);
const result2 = await iterNext(output);
assert(result2 && !result2.ok);
expect(result2.payload.code).toStrictEqual(STREAM_ERROR);
input.push({ msg: 'ghi', throwResult: false, throwError: true });
const result3 = await output.next().then((res) => res.value);
const result3 = await iterNext(output);
assert(result3 && !result3.ok);

@@ -73,2 +74,18 @@ expect(result3.payload).toStrictEqual({

});
test('subscriptions', async () => {
const service = SubscribableServiceConstructor();
const state = { count: new Observable(0) };
const add = asClientRpc(state, service.procedures.add);
const subscribe = asClientSubscription(state, service.procedures.value);
const stream = await subscribe({});
const streamResult1 = await iterNext(stream);
assert(streamResult1 && streamResult1.ok);
expect(streamResult1.payload).toStrictEqual({ result: 0 });
const result = await add({ n: 3 });
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 3 });
const streamResult2 = await iterNext(stream);
assert(streamResult2 && streamResult1.ok);
expect(streamResult2.payload).toStrictEqual({ result: 3 });
});
});
import { expect, describe, test } from 'vitest';
import { serializeService } from '../router/builder';
import { BinaryFileServiceConstructor, FallibleServiceConstructor, TestServiceConstructor, } from './fixtures';
import { BinaryFileServiceConstructor, FallibleServiceConstructor, TestServiceConstructor, } from './fixtures/services';
describe('serialize service to jsonschema', () => {

@@ -5,0 +5,0 @@ test('serialize basic service', () => {

@@ -9,3 +9,3 @@ import { TObject, Static, TUnion } from '@sinclair/typebox';

*/
export type ValidProcType = 'stream' | 'rpc';
export type ValidProcType = 'rpc' | 'stream' | 'subscription';
/**

@@ -69,3 +69,3 @@ * A generic procedure listing where the keys are the names of the procedures

* @template State - The TypeBox schema of the state object.
* @template Ty - The type of the procedure, either 'rpc' or 'stream'.
* @template Ty - The type of the procedure.
* @template I - The TypeBox schema of the input object.

@@ -80,3 +80,3 @@ * @template O - The TypeBox schema of the output object.

type: Ty;
} : {
} : Ty extends 'stream' ? {
input: I;

@@ -87,3 +87,9 @@ output: O;

type: Ty;
};
} : Ty extends 'subscription' ? {
input: I;
output: O;
errors: E;
handler: (context: ServiceContextWithState<State>, input: TransportMessage<Static<I>>, output: Pushable<TransportMessage<Result<Static<O>, Static<E>>>>) => Promise<void>;
type: Ty;
} : never;
export type AnyProcedure = Procedure<object, ValidProcType, TObject, TObject, RiverError>;

@@ -90,0 +96,0 @@ /**

@@ -15,7 +15,16 @@ import { Connection, Transport } from '../transport/transport';

type ServiceClient<Router extends AnyService> = {
[ProcName in keyof Router['procedures']]: ProcType<Router, ProcName> extends 'rpc' ? (input: Static<ProcInput<Router, ProcName>>) => Promise<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>> : () => Promise<[
Pushable<Static<ProcInput<Router, ProcName>>>,
AsyncIter<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>>,
() => void
]>;
[ProcName in keyof Router['procedures']]: ProcType<Router, ProcName> extends 'rpc' ? {
rpc: (input: Static<ProcInput<Router, ProcName>>) => Promise<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>>;
} : ProcType<Router, ProcName> extends 'stream' ? {
stream: () => Promise<[
Pushable<Static<ProcInput<Router, ProcName>>>,
AsyncIter<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>>,
() => void
]>;
} : ProcType<Router, ProcName> extends 'subscription' ? {
subscribe: (input: Static<ProcInput<Router, ProcName>>) => Promise<[
AsyncIter<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>>,
() => void
]>;
} : never;
};

@@ -22,0 +31,0 @@ /**

@@ -38,3 +38,6 @@ import { pushable } from 'it-pushable';

export const createClient = (transport, serverId = 'SERVER') => _createRecursiveProxy(async (opts) => {
const [serviceName, procName] = [...opts.path];
const [serviceName, procName, procType] = [...opts.path];
if (!(serviceName && procName && procType)) {
throw new Error('invalid river call, ensure the service and procedure you are calling exists');
}
const [input] = opts.args;

@@ -47,4 +50,3 @@ const streamId = nanoid();

}
if (input === undefined) {
// stream case (stream methods are called with zero arguments)
if (procType === 'stream') {
const inputStream = pushable({ objectMode: true });

@@ -78,4 +80,3 @@ const outputStream = pushable({ objectMode: true });

}
else {
// rpc case
else if (procType === 'rpc') {
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, input);

@@ -88,2 +89,23 @@ // rpc is a stream open + close

}
else if (procType === 'subscribe') {
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, input);
m.controlFlags |= 2 /* ControlFlags.StreamOpenBit */;
transport.send(m);
// transport -> output
const outputStream = pushable({ objectMode: true });
const listener = (msg) => {
if (belongsToSameStream(msg)) {
outputStream.push(msg.payload);
}
};
transport.addMessageListener(listener);
const closeHandler = () => {
outputStream.end();
transport.removeMessageListener(listener);
};
return [outputStream, closeHandler];
}
else {
throw new Error(`invalid river call, unknown procedure type ${procType}`);
}
}, []);

@@ -47,3 +47,4 @@ import { pushable } from 'it-pushable';

const procedure = service.procedures[msg.procedureName];
if (isStreamOpen(msg.controlFlags)) {
const streamIdx = `${msg.serviceName}.${msg.procedureName}:${msg.streamId}`;
if (isStreamOpen(msg.controlFlags) && !streamMap.has(streamIdx)) {
const incoming = pushable({ objectMode: true });

@@ -75,14 +76,30 @@ const outgoing = pushable({ objectMode: true });

openPromises.push((async () => {
for await (const inputMessage of incoming) {
try {
const outputMessage = await procedure.handler(serviceContext, inputMessage);
outgoing.push(outputMessage);
}
catch (err) {
errorHandler(err);
}
const inputMessage = await incoming.next();
if (inputMessage.done) {
return;
}
try {
const outputMessage = await procedure.handler(serviceContext, inputMessage.value);
outgoing.push(outputMessage);
}
catch (err) {
errorHandler(err);
}
})());
}
streamMap.set(`${msg.serviceName}.${msg.procedureName}:${msg.streamId}`, {
else if (procedure.type === 'subscription') {
openPromises.push((async () => {
const inputMessage = await incoming.next();
if (inputMessage.done) {
return;
}
try {
await procedure.handler(serviceContext, inputMessage.value, outgoing);
}
catch (err) {
errorHandler(err);
}
})());
}
streamMap.set(streamIdx, {
incoming,

@@ -93,3 +110,3 @@ outgoing,

}
const procStream = streamMap.get(`${msg.serviceName}.${msg.procedureName}:${msg.streamId}`);
const procStream = streamMap.get(streamIdx);
if (!procStream) {

@@ -96,0 +113,0 @@ log?.warn(`${transport.clientId} -- couldn't find a matching procedure stream for ${msg.serviceName}.${msg.procedureName}:${msg.streamId}`);

@@ -5,3 +5,3 @@ import { describe, test, expect } from 'vitest';

import { waitForMessage } from '../..';
import { payloadToTransportMessage } from '../../../testUtils';
import { payloadToTransportMessage } from '../../../util/testHelpers';
describe('sending and receiving across node streams works', () => {

@@ -8,0 +8,0 @@ test('basic send/receive', async () => {

import http from 'http';
import { describe, test, expect, afterAll } from 'vitest';
import { createWebSocketServer, createWsTransports, createDummyTransportMessage, onServerReady, createLocalWebSocketClient, } from '../../../testUtils';
import { createWebSocketServer, createWsTransports, createDummyTransportMessage, onServerReady, createLocalWebSocketClient, } from '../../../util/testHelpers';
import { msg, waitForMessage } from '../..';

@@ -5,0 +5,0 @@ import { WebSocketServerTransport } from './server';

@@ -5,3 +5,3 @@ {

"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
"version": "0.7.2",
"version": "0.8.0",
"type": "module",

@@ -12,3 +12,3 @@ "exports": {

"./codec": "./dist/codec/index.js",
"./test-util": "./dist/testUtils.js",
"./test-util": "./dist/util/testHelpers.js",
"./transport": "./dist/transport/index.js",

@@ -15,0 +15,0 @@ "./transport/ws/client": "./dist/transport/impls/ws/client.js",

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc