Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@replit/river

Package Overview
Dependencies
Maintainers
30
Versions
162
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@replit/river - npm Package Compare versions

Comparing version 0.8.1 to 0.9.0

dist/transport/events.d.ts

65

dist/__tests__/e2e.test.js

@@ -6,3 +6,3 @@ import { afterAll, assert, describe, expect, test } from 'vitest';

import http from 'http';
import { BinaryFileServiceConstructor, DIV_BY_ZERO, FallibleServiceConstructor, OrderingServiceConstructor, STREAM_ERROR, SubscribableServiceConstructor, TestServiceConstructor, } from './fixtures/services';
import { BinaryFileServiceConstructor, DIV_BY_ZERO, FallibleServiceConstructor, OrderingServiceConstructor, STREAM_ERROR, SubscribableServiceConstructor, UploadableServiceConstructor, TestServiceConstructor, } from './fixtures/services';
import { UNCAUGHT_ERROR } from '../router/result';

@@ -104,2 +104,27 @@ import { codecs } from '../codec/codec.test';

});
test('stream with init message', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { test: TestServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient(clientTransport);
const [input, output, close] = await client.test.echoWithPrefix.stream({
prefix: 'test',
});
input.push({ msg: 'abc', ignore: false });
input.push({ msg: 'def', ignore: true });
input.push({ msg: 'ghi', ignore: false });
input.end();
const result1 = await iterNext(output);
assert(result1.ok);
expect(result1.payload).toStrictEqual({ response: 'test abc' });
const result2 = await iterNext(output);
assert(result2.ok);
expect(result2.payload).toStrictEqual({ response: 'test ghi' });
close();
await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
server,
});
});
test('fallible stream', async () => {

@@ -174,2 +199,40 @@ const [clientTransport, serverTransport] = getTransports();

});
test('upload', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { uploadable: UploadableServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient(clientTransport);
const [addStream, addResult] = await client.uploadable.addMultiple.upload();
addStream.push({ n: 1 });
addStream.push({ n: 2 });
addStream.end();
const result = await addResult;
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 3 });
await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
server,
});
});
test('upload with init message', async () => {
const [clientTransport, serverTransport] = getTransports();
const serviceDefs = { uploadable: UploadableServiceConstructor() };
const server = await createServer(serverTransport, serviceDefs);
const client = createClient(clientTransport);
const [addStream, addResult] = await client.uploadable.addMultipleWithPrefix.upload({
prefix: 'test',
});
addStream.push({ n: 1 });
addStream.push({ n: 2 });
addStream.end();
const result = await addResult;
assert(result.ok);
expect(result.payload).toStrictEqual({ result: 'test 3' });
await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
server,
});
});
test('message order is preserved in the face of disconnects', async () => {

@@ -176,0 +239,0 @@ const [clientTransport, serverTransport] = getTransports();

2

dist/__tests__/fixtures/cleanup.js

@@ -9,3 +9,3 @@ import { expect, vi } from 'vitest';

expect(t.connections, `transport ${t.clientId} should not have open connections after the test`).toStrictEqual(new Map());
expect(t.messageHandlers, `transport ${t.clientId} should not have open message handlers after the test`).toStrictEqual(new Set());
expect(t.eventDispatcher.numberOfListeners('message'), `transport ${t.clientId} should not have open message handlers after the test`).equal(0);
}

@@ -12,0 +12,0 @@ export async function waitUntil(valueGetter, expected, message) {

@@ -55,2 +55,29 @@ import { Observable } from './observable';

};
} & {
echoWithPrefix: {
init: import("@sinclair/typebox").TObject<{
prefix: import("@sinclair/typebox").TString;
}>;
input: import("@sinclair/typebox").TObject<{
msg: import("@sinclair/typebox").TString;
ignore: import("@sinclair/typebox").TBoolean;
end: import("@sinclair/typebox").TOptional<import("@sinclair/typebox").TBoolean>;
}>;
output: import("@sinclair/typebox").TObject<{
response: import("@sinclair/typebox").TString;
}>;
errors: import("@sinclair/typebox").TNever;
handler: (context: import("../../router").ServiceContextWithState<{
count: number;
}>, init: import("../../transport/message").TransportMessage<{
prefix: string;
}>, input: AsyncIterable<import("../../transport/message").TransportMessage<{
end?: boolean | undefined;
msg: string;
ignore: boolean;
}>>, output: import("it-pushable").Pushable<import("../../transport/message").TransportMessage<import("../../router/result").Result<{
response: string;
}, never>>, void, unknown>) => Promise<void>;
type: "stream";
};
};

@@ -220,2 +247,44 @@ };

};
export declare const UploadableServiceConstructor: () => {
name: "uploadable";
state: {};
procedures: {
addMultiple: {
input: import("@sinclair/typebox").TObject<{
n: import("@sinclair/typebox").TNumber;
}>;
output: import("@sinclair/typebox").TObject<{
result: import("@sinclair/typebox").TNumber;
}>;
errors: import("@sinclair/typebox").TNever;
handler: (context: import("../../router").ServiceContextWithState<{}>, input: AsyncIterable<import("../../transport/message").TransportMessage<{
n: number;
}>>) => Promise<import("../../transport/message").TransportMessage<import("../../router/result").Result<{
result: number;
}, never>>>;
type: "upload";
};
} & {
addMultipleWithPrefix: {
init: import("@sinclair/typebox").TObject<{
prefix: import("@sinclair/typebox").TString;
}>;
input: import("@sinclair/typebox").TObject<{
n: import("@sinclair/typebox").TNumber;
}>;
output: import("@sinclair/typebox").TObject<{
result: import("@sinclair/typebox").TString;
}>;
errors: import("@sinclair/typebox").TNever;
handler: (context: import("../../router").ServiceContextWithState<{}>, init: import("../../transport/message").TransportMessage<{
prefix: string;
}>, input: AsyncIterable<import("../../transport/message").TransportMessage<{
n: number;
}>>) => Promise<import("../../transport/message").TransportMessage<import("../../router/result").Result<{
result: string;
}, never>>>;
type: "upload";
};
};
};
//# sourceMappingURL=services.d.ts.map

@@ -44,2 +44,17 @@ import { Type } from '@sinclair/typebox';

})
.defineProcedure('echoWithPrefix', {
type: 'stream',
init: Type.Object({ prefix: Type.String() }),
input: EchoRequest,
output: EchoResponse,
errors: Type.Never(),
async handler(_ctx, init, msgStream, returnStream) {
for await (const msg of msgStream) {
const req = msg.payload;
if (!req.ignore) {
returnStream.push(reply(msg, Ok({ response: `${init.payload.prefix} ${req.msg}` })));
}
}
},
})
.finalize();

@@ -175,1 +190,37 @@ export const OrderingServiceConstructor = () => ServiceBuilder.create('test')

.finalize();
export const UploadableServiceConstructor = () => ServiceBuilder.create('uploadable')
.initialState({})
.defineProcedure('addMultiple', {
type: 'upload',
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ result: Type.Number() }),
errors: Type.Never(),
async handler(_ctx, msgStream) {
let result = 0;
let lastMsg;
for await (const msg of msgStream) {
const { n } = msg.payload;
result += n;
lastMsg = msg;
}
return reply(lastMsg, Ok({ result: result }));
},
})
.defineProcedure('addMultipleWithPrefix', {
type: 'upload',
init: Type.Object({ prefix: Type.String() }),
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ result: Type.String() }),
errors: Type.Never(),
async handler(_ctx, init, msgStream) {
let result = 0;
let lastMsg;
for await (const msg of msgStream) {
const { n } = msg.payload;
result += n;
lastMsg = msg;
}
return reply(lastMsg, Ok({ result: init.payload.prefix + ' ' + result }));
},
})
.finalize();

@@ -1,4 +0,4 @@

import { asClientRpc, asClientStream, asClientSubscription, iterNext, } from '../util/testHelpers';
import { asClientRpc, asClientStream, asClientStreamWithInitialization, asClientSubscription, asClientUpload, asClientUploadWithInitialization, iterNext, } from '../util/testHelpers';
import { assert, describe, expect, test } from 'vitest';
import { DIV_BY_ZERO, FallibleServiceConstructor, STREAM_ERROR, SubscribableServiceConstructor, TestServiceConstructor, } from './fixtures/services';
import { DIV_BY_ZERO, FallibleServiceConstructor, STREAM_ERROR, SubscribableServiceConstructor, UploadableServiceConstructor, TestServiceConstructor, } from './fixtures/services';
import { UNCAUGHT_ERROR } from '../router/result';

@@ -51,2 +51,16 @@ import { Observable } from './fixtures/observable';

});
test('stream with initialization', async () => {
const [input, output] = asClientStreamWithInitialization(initialState, service.procedures.echoWithPrefix, { prefix: 'test' });
input.push({ msg: 'abc', ignore: false });
input.push({ msg: 'def', ignore: true });
input.push({ msg: 'ghi', ignore: false });
input.end();
const result1 = await iterNext(output);
assert(result1 && result1.ok);
expect(result1.payload).toStrictEqual({ response: 'test abc' });
const result2 = await iterNext(output);
assert(result2 && result2.ok);
expect(result2.payload).toStrictEqual({ response: 'test ghi' });
expect(output.readableLength).toBe(0);
});
test('fallible stream', async () => {

@@ -89,2 +103,21 @@ const service = FallibleServiceConstructor();

});
test('uploads', async () => {
const service = UploadableServiceConstructor();
const [input, result] = asClientUpload({}, service.procedures.addMultiple);
input.push({ n: 1 });
input.push({ n: 2 });
input.end();
expect(await result).toStrictEqual({ ok: true, payload: { result: 3 } });
});
test('uploads with initialization', async () => {
const service = UploadableServiceConstructor();
const [input, result] = asClientUploadWithInitialization({}, service.procedures.addMultipleWithPrefix, { prefix: 'test' });
input.push({ n: 1 });
input.push({ n: 2 });
input.end();
expect(await result).toStrictEqual({
ok: true,
payload: { result: 'test 3' },
});
});
});

@@ -55,4 +55,4 @@ import { afterAll, assert, describe, expect, test } from 'vitest';

const client = createClient(clientTransport);
let serverListeners = serverTransport.messageHandlers.size;
let clientListeners = clientTransport.messageHandlers.size;
let serverListeners = serverTransport.eventDispatcher.numberOfListeners('message');
let clientListeners = clientTransport.eventDispatcher.numberOfListeners('message');
// start procedure

@@ -62,4 +62,4 @@ await client.test.add.rpc({ n: 3 });

// number of message handlers shouldn't increase after rpc
expect(serverTransport.messageHandlers.size).toEqual(serverListeners);
expect(clientTransport.messageHandlers.size).toEqual(clientListeners);
expect(serverTransport.eventDispatcher.numberOfListeners('message')).toEqual(serverListeners);
expect(clientTransport.eventDispatcher.numberOfListeners('message')).toEqual(clientListeners);
// check number of connections

@@ -78,4 +78,4 @@ expect(serverTransport.connections.size).toEqual(1);

const client = createClient(clientTransport);
let serverListeners = serverTransport.messageHandlers.size;
let clientListeners = clientTransport.messageHandlers.size;
let serverListeners = serverTransport.eventDispatcher.numberOfListeners('message');
let clientListeners = clientTransport.eventDispatcher.numberOfListeners('message');
// start procedure

@@ -99,4 +99,4 @@ const [input, output, close] = await client.test.echo.stream();

// number of message handlers shouldn't increase after stream ends
expect(serverTransport.messageHandlers.size).toEqual(serverListeners);
expect(clientTransport.messageHandlers.size).toEqual(clientListeners);
expect(serverTransport.eventDispatcher.numberOfListeners('message')).toEqual(serverListeners);
expect(clientTransport.eventDispatcher.numberOfListeners('message')).toEqual(clientListeners);
// check number of connections

@@ -115,4 +115,4 @@ expect(serverTransport.connections.size).toEqual(1);

const client = createClient(clientTransport);
let serverListeners = serverTransport.messageHandlers.size;
let clientListeners = clientTransport.messageHandlers.size;
let serverListeners = serverTransport.eventDispatcher.numberOfListeners('message');
let clientListeners = clientTransport.eventDispatcher.numberOfListeners('message');
// start procedure

@@ -131,4 +131,4 @@ const [subscription, close] = await client.test.value.subscribe({});

// number of message handlers shouldn't increase after stream ends
expect(serverTransport.messageHandlers.size).toEqual(serverListeners);
expect(clientTransport.messageHandlers.size).toEqual(clientListeners);
expect(serverTransport.eventDispatcher.numberOfListeners('message')).toEqual(serverListeners);
expect(clientTransport.eventDispatcher.numberOfListeners('message')).toEqual(clientListeners);
// check number of connections

@@ -135,0 +135,0 @@ expect(serverTransport.connections.size).toEqual(1);

@@ -49,2 +49,41 @@ import { expect, describe, test } from 'vitest';

},
echoWithPrefix: {
errors: {
not: {},
},
init: {
properties: {
prefix: {
type: 'string',
},
},
required: ['prefix'],
type: 'object',
},
input: {
properties: {
end: {
type: 'boolean',
},
ignore: {
type: 'boolean',
},
msg: {
type: 'string',
},
},
required: ['msg', 'ignore'],
type: 'object',
},
output: {
properties: {
response: {
type: 'string',
},
},
required: ['response'],
type: 'object',
},
type: 'stream',
},
},

@@ -51,0 +90,0 @@ });

@@ -10,4 +10,7 @@ import { describe, expect, test } from 'vitest';

import { Ok } from '../router/result';
const input = Type.Object({ a: Type.Number() });
const output = Type.Object({ b: Type.Number() });
const input = Type.Union([
Type.Object({ a: Type.Number() }),
Type.Object({ c: Type.String() }),
]);
const output = Type.Object({ b: Type.Union([Type.Number(), Type.String()]) });
const errors = Type.Union([

@@ -29,3 +32,8 @@ Type.Object({

async handler(_state, msg) {
return reply(msg, Ok({ b: msg.payload.a }));
if ('c' in msg.payload) {
return reply(msg, Ok({ b: msg.payload.c }));
}
else {
return reply(msg, Ok({ b: msg.payload.a }));
}
},

@@ -112,2 +120,4 @@ };

const client = createClient(new MockTransport('client'));
expect(client.d.f48.rpc({ a: 0 })).toBeTruthy();
expect(client.a.f2.rpc({ c: 'abc' })).toBeTruthy();
expect(server).toBeTruthy();

@@ -114,0 +124,0 @@ expect(client).toBeTruthy();

@@ -7,5 +7,7 @@ import { TObject, Static, TUnion } from '@sinclair/typebox';

/**
* The valid {@link Procedure} types.
* The valid {@link Procedure} types. The `stream` and `upload` types can optionally have a
* different type for the very first initialization message. The suffixless types correspond to
* gRPC's four combinations of stream / non-stream in each direction.
*/
export type ValidProcType = 'rpc' | 'stream' | 'subscription';
export type ValidProcType = 'rpc' | 'upload' | 'subscription' | 'stream';
/**

@@ -43,2 +45,16 @@ * A generic procedure listing where the keys are the names of the procedures

/**
* Helper to get whether the type definition for the procedure contains an init type.
* @template S - The service.
* @template ProcName - The name of the procedure.
*/
export type ProcHasInit<S extends AnyService, ProcName extends keyof S['procedures']> = S['procedures'][ProcName] extends {
init: any;
} ? true : false;
/**
* Helper to get the type definition for the procedure init type of a service.
* @template S - The service.
* @template ProcName - The name of the procedure.
*/
export type ProcInit<S extends AnyService, ProcName extends keyof S['procedures']> = S['procedures'][ProcName]['init'];
/**
* Helper to get the type definition for the procedure input of a service.

@@ -67,2 +83,3 @@ * @template S - The service.

export type ProcType<S extends AnyService, ProcName extends keyof S['procedures']> = S['procedures'][ProcName]['type'];
export type PayloadType = TObject | TUnion<TObject[]>;
/**

@@ -74,4 +91,5 @@ * Defines a Procedure type that can be either an RPC or a stream procedure.

* @template O - The TypeBox schema of the output object.
* @template Init - The TypeBox schema of the input initialization object.
*/
export type Procedure<State extends object | unknown, Ty extends ValidProcType, I extends TObject, O extends TObject, E extends RiverError> = Ty extends 'rpc' ? {
export type Procedure<State extends object | unknown, Ty extends ValidProcType, I extends PayloadType, O extends PayloadType, E extends RiverError, Init extends PayloadType | null = null> = Ty extends 'rpc' ? Init extends null ? {
input: I;

@@ -82,16 +100,36 @@ output: O;

type: Ty;
} : Ty extends 'stream' ? {
} : never : Ty extends 'upload' ? Init extends PayloadType ? {
init: Init;
input: I;
output: O;
errors: E;
handler: (context: ServiceContextWithState<State>, input: AsyncIterable<TransportMessage<Static<I>>>, output: Pushable<TransportMessage<Result<Static<O>, Static<E>>>>) => Promise<void>;
handler: (context: ServiceContextWithState<State>, init: TransportMessage<Static<Init>>, input: AsyncIterable<TransportMessage<Static<I>>>) => Promise<TransportMessage<Result<Static<O>, Static<E>>>>;
type: Ty;
} : Ty extends 'subscription' ? {
} : {
input: I;
output: O;
errors: E;
handler: (context: ServiceContextWithState<State>, input: AsyncIterable<TransportMessage<Static<I>>>) => Promise<TransportMessage<Result<Static<O>, Static<E>>>>;
type: Ty;
} : Ty extends 'subscription' ? Init extends null ? {
input: I;
output: O;
errors: E;
handler: (context: ServiceContextWithState<State>, input: TransportMessage<Static<I>>, output: Pushable<TransportMessage<Result<Static<O>, Static<E>>>>) => Promise<void>;
type: Ty;
} : never : Ty extends 'stream' ? Init extends PayloadType ? {
init: Init;
input: I;
output: O;
errors: E;
handler: (context: ServiceContextWithState<State>, init: TransportMessage<Static<Init>>, input: AsyncIterable<TransportMessage<Static<I>>>, output: Pushable<TransportMessage<Result<Static<O>, Static<E>>>>) => Promise<void>;
type: Ty;
} : {
input: I;
output: O;
errors: E;
handler: (context: ServiceContextWithState<State>, input: AsyncIterable<TransportMessage<Static<I>>>, output: Pushable<TransportMessage<Result<Static<O>, Static<E>>>>) => Promise<void>;
type: Ty;
} : never;
export type AnyProcedure = Procedure<object, ValidProcType, TObject, TObject, RiverError>;
export type AnyProcedure = Procedure<object, ValidProcType, PayloadType, PayloadType, RiverError, PayloadType | null>;
/**

@@ -124,10 +162,10 @@ * A builder class for creating River Services.

* @param {ProcName} procName The name of the procedure.
* @param {Procedure<T['state'], Ty, I, O>} procDef The definition of the procedure.
* @returns {ServiceBuilder<{ name: T['name']; state: T['state']; procedures: T['procedures'] & { [k in ProcName]: Procedure<T['state'], Ty, I, O>; }; }>} A new ServiceBuilder instance with the updated schema.
* @param {Procedure<T['state'], Ty, I, O, E, Init>} procDef The definition of the procedure.
* @returns {ServiceBuilder<{ name: T['name']; state: T['state']; procedures: T['procedures'] & { [k in ProcName]: Procedure<T['state'], Ty, I, O, E, Init>; }; }>} A new ServiceBuilder instance with the updated schema.
*/
defineProcedure<ProcName extends string, Ty extends ValidProcType, I extends TObject, O extends TObject, E extends RiverError>(procName: ProcName, procDef: Procedure<T['state'], Ty, I, O, E>): ServiceBuilder<{
defineProcedure<ProcName extends string, Ty extends ValidProcType, I extends PayloadType, O extends PayloadType, E extends RiverError, Init extends PayloadType | null = null>(procName: ProcName, procDef: Procedure<T['state'], Ty, I, O, E, Init>): ServiceBuilder<{
name: T['name'];
state: T['state'];
procedures: T['procedures'] & {
[k in ProcName]: Procedure<T['state'], Ty, I, O, E>;
[k in ProcName]: Procedure<T['state'], Ty, I, O, E, Init>;
};

@@ -134,0 +172,0 @@ }>;

@@ -18,2 +18,8 @@ import { Type } from '@sinclair/typebox';

type: procDef.type,
// Only add the `init` field if the type declares it.
...('init' in procDef
? {
init: Type.Strict(procDef.init),
}
: {}),
},

@@ -55,4 +61,4 @@ ])),

* @param {ProcName} procName The name of the procedure.
* @param {Procedure<T['state'], Ty, I, O>} procDef The definition of the procedure.
* @returns {ServiceBuilder<{ name: T['name']; state: T['state']; procedures: T['procedures'] & { [k in ProcName]: Procedure<T['state'], Ty, I, O>; }; }>} A new ServiceBuilder instance with the updated schema.
* @param {Procedure<T['state'], Ty, I, O, E, Init>} procDef The definition of the procedure.
* @returns {ServiceBuilder<{ name: T['name']; state: T['state']; procedures: T['procedures'] & { [k in ProcName]: Procedure<T['state'], Ty, I, O, E, Init>; }; }>} A new ServiceBuilder instance with the updated schema.
*/

@@ -59,0 +65,0 @@ defineProcedure(procName, procDef) {

import { Connection, Transport } from '../transport/transport';
import { AnyService, ProcErrors, ProcInput, ProcOutput, ProcType } from './builder';
import { AnyService, ProcErrors, ProcHasInit, ProcInit, ProcInput, ProcOutput, ProcType } from './builder';
import type { Pushable } from 'it-pushable';

@@ -17,3 +17,19 @@ import { Server } from './server';

rpc: (input: Static<ProcInput<Router, ProcName>>) => Promise<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>>;
} : ProcType<Router, ProcName> extends 'stream' ? {
} : ProcType<Router, ProcName> extends 'upload' ? ProcHasInit<Router, ProcName> extends true ? {
upload: (init: Static<ProcInit<Router, ProcName>>) => Promise<[
Pushable<Static<ProcInput<Router, ProcName>>>,
Promise<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>>
]>;
} : {
upload: () => Promise<[
Pushable<Static<ProcInput<Router, ProcName>>>,
Promise<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>>
]>;
} : ProcType<Router, ProcName> extends 'stream' ? ProcHasInit<Router, ProcName> extends true ? {
stream: (init: Static<ProcInit<Router, ProcName>>) => Promise<[
Pushable<Static<ProcInput<Router, ProcName>>>,
AsyncIter<Result<Static<ProcOutput<Router, ProcName>>, Static<ProcErrors<Router, ProcName>>>>,
() => void
]>;
} : {
stream: () => Promise<[

@@ -20,0 +36,0 @@ Pushable<Static<ProcInput<Router, ProcName>>>,

@@ -53,4 +53,11 @@ import { pushable } from 'it-pushable';

let firstMessage = true;
if (input) {
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, input);
// first message needs the open bit.
m.controlFlags = 2 /* ControlFlags.StreamOpenBit */;
transport.send(m);
firstMessage = false;
}
// input -> transport
// this gets cleaned up on i.end() which is called by closeHandler
// this gets cleaned up on inputStream.end() which is called by closeHandler
(async () => {

@@ -75,3 +82,3 @@ for await (const rawIn of inputStream) {

};
transport.addMessageListener(listener);
transport.addEventListener('message', listener);
const closeHandler = () => {

@@ -81,3 +88,3 @@ inputStream.end();

transport.send(closeStream(transport.clientId, serverId, serviceName, procName, streamId));
transport.removeMessageListener(listener);
transport.removeEventListener('message', listener);
};

@@ -108,10 +115,35 @@ return [inputStream, outputStream, closeHandler];

};
transport.addMessageListener(listener);
transport.addEventListener('message', listener);
const closeHandler = () => {
outputStream.end();
transport.send(closeStream(transport.clientId, serverId, serviceName, procName, streamId));
transport.removeMessageListener(listener);
transport.removeEventListener('message', listener);
};
return [outputStream, closeHandler];
}
else if (procType === 'upload') {
const inputStream = pushable({ objectMode: true });
let firstMessage = true;
if (input) {
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, input);
// first message needs the open bit.
m.controlFlags = 2 /* ControlFlags.StreamOpenBit */;
transport.send(m);
firstMessage = false;
}
// input -> transport
// this gets cleaned up on inputStream.end(), which the caller should call.
(async () => {
for await (const rawIn of inputStream) {
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, rawIn);
if (firstMessage) {
m.controlFlags |= 2 /* ControlFlags.StreamOpenBit */;
firstMessage = false;
}
transport.send(m);
}
transport.send(closeStream(transport.clientId, serverId, serviceName, procName, streamId));
})();
return [inputStream, waitForMessage(transport, belongsToSameStream)];
}
else {

@@ -118,0 +150,0 @@ throw new Error(`invalid river call, unknown procedure type ${procType}`);

export { serializeService, ServiceBuilder } from './builder';
export type { ValidProcType, ProcListing, Service, ProcHandler, ProcInput, ProcOutput, ProcType, Procedure, } from './builder';
export type { ValidProcType, ProcListing, Service, ProcHandler, ProcInput, ProcOutput, ProcType, Procedure, PayloadType, } from './builder';
export { createClient } from './client';

@@ -4,0 +4,0 @@ export type { ServerClient } from './client';

@@ -1,4 +0,4 @@

import { Static, TObject } from '@sinclair/typebox';
import { Static } from '@sinclair/typebox';
import { Connection, Transport } from '../transport/transport';
import { AnyService } from './builder';
import { AnyService, PayloadType } from './builder';
import type { Pushable } from 'it-pushable';

@@ -19,3 +19,3 @@ import { TransportMessage } from '../transport/message';

incoming: Pushable<TransportMessage>;
outgoing: Pushable<TransportMessage<Result<Static<TObject>, Static<RiverError>>>>;
outgoing: Pushable<TransportMessage<Result<Static<PayloadType>, Static<RiverError>>>>;
promises: {

@@ -22,0 +22,0 @@ outputHandler: Promise<unknown>;

@@ -86,5 +86,18 @@ import { pushable } from 'it-pushable';

if (procedure.type === 'stream') {
inputHandler = procedure
.handler(serviceContext, incoming, outgoing)
.catch(errorHandler);
if ('init' in procedure) {
inputHandler = (async () => {
const initMessage = await incoming.next();
if (initMessage.done) {
return;
}
return procedure
.handler(serviceContext, initMessage.value, incoming, outgoing)
.catch(errorHandler);
})();
}
else {
inputHandler = procedure
.handler(serviceContext, incoming, outgoing)
.catch(errorHandler);
}
}

@@ -120,2 +133,30 @@ else if (procedure.type === 'rpc') {

}
else if (procedure.type === 'upload') {
if ('init' in procedure) {
inputHandler = (async () => {
const initMessage = await incoming.next();
if (initMessage.done) {
return;
}
try {
const outputMessage = await procedure.handler(serviceContext, initMessage.value, incoming);
outgoing.push(outputMessage);
}
catch (err) {
errorHandler(err);
}
})();
}
else {
inputHandler = (async () => {
try {
const outputMessage = await procedure.handler(serviceContext, incoming);
outgoing.push(outputMessage);
}
catch (err) {
errorHandler(err);
}
})();
}
}
else {

@@ -138,3 +179,4 @@ // procedure is inferred to be never here as this is not a valid procedure type

}
if (Value.Check(procedure.input, message.payload)) {
if (Value.Check(procedure.input, message.payload) ||
('init' in procedure && Value.Check(procedure.init, message.payload))) {
procStream.incoming.push(message);

@@ -149,3 +191,3 @@ }

};
transport.addMessageListener(handler);
transport.addEventListener('message', handler);
return {

@@ -155,3 +197,3 @@ services,

async close() {
transport.removeMessageListener(handler);
transport.removeEventListener('message', handler);
for (const streamIdx of streamMap.keys()) {

@@ -158,0 +200,0 @@ await cleanupStream(streamIdx);

@@ -15,3 +15,3 @@ // re-export

resolve(msg.payload);
t.removeMessageListener(onMessage);
t.removeEventListener('message', onMessage);
}

@@ -22,4 +22,4 @@ else if (rejectMismatch) {

}
t.addMessageListener(onMessage);
t.addEventListener('message', onMessage);
});
}
import { Codec } from '../codec/types';
import { MessageId, OpaqueTransportMessage, TransportClientId } from './message';
import { EventDispatcher, EventHandler, EventTypes } from './events';
/**

@@ -67,6 +68,2 @@ * A 1:1 connection between two transports. Once this is created,

/**
* The set of message handlers registered with this transport.
*/
messageHandlers: Set<(msg: OpaqueTransportMessage) => void>;
/**
* An array of message IDs that are waiting to be sent over the WebSocket connection.

@@ -85,2 +82,6 @@ * This builds up if the WebSocket is down for a period of time.

/**
* The event dispatcher for handling events of type EventTypes.
*/
eventDispatcher: EventDispatcher<EventTypes>;
/**
* Creates a new Transport instance.

@@ -131,11 +132,13 @@ * @param codec The codec used to encode and decode messages.

/**
* Adds a message listener to this transport.
* Adds a listener to this transport.
* @param the type of event to listen for
* @param handler The message handler to add.
*/
addMessageListener(handler: (msg: OpaqueTransportMessage) => void): void;
addEventListener<K extends EventTypes, T extends EventHandler<K>>(type: K, handler: T): void;
/**
* Removes a message listener from this transport.
* Removes a listener from this transport.
* @param the type of event to unlisten on
* @param handler The message handler to remove.
*/
removeMessageListener(handler: (msg: OpaqueTransportMessage) => void): void;
removeEventListener<K extends EventTypes, T extends EventHandler<K>>(type: K, handler: T): void;
/**

@@ -142,0 +145,0 @@ * Sends a message over this transport, delegating to the appropriate connection to actually

import { Value } from '@sinclair/typebox/value';
import { OpaqueTransportMessageSchema, TransportAckSchema, isAck, reply, } from './message';
import { log } from '../logging';
import { EventDispatcher } from './events';
/**

@@ -68,6 +69,2 @@ * A 1:1 connection between two transports. Once this is created,

/**
* The set of message handlers registered with this transport.
*/
messageHandlers;
/**
* An array of message IDs that are waiting to be sent over the WebSocket connection.

@@ -86,2 +83,6 @@ * This builds up if the WebSocket is down for a period of time.

/**
* The event dispatcher for handling events of type EventTypes.
*/
eventDispatcher;
/**
* Creates a new Transport instance.

@@ -92,3 +93,3 @@ * @param codec The codec used to encode and decode messages.

constructor(codec, clientId) {
this.messageHandlers = new Set();
this.eventDispatcher = new EventDispatcher();
this.sendBuffer = new Map();

@@ -108,2 +109,6 @@ this.sendQueue = new Map();

this.connections.set(conn.connectedTo, conn);
this.eventDispatcher.dispatchEvent('connectionStatus', {
status: 'connect',
conn,
});
// send outstanding

@@ -132,2 +137,6 @@ const outstanding = this.sendQueue.get(conn.connectedTo);

this.connections.delete(conn.connectedTo);
this.eventDispatcher.dispatchEvent('connectionStatus', {
status: 'disconnect',
conn,
});
}

@@ -183,5 +192,3 @@ /**

}
for (const handler of this.messageHandlers) {
handler(msg);
}
this.eventDispatcher.dispatchEvent('message', msg);
if (!isAck(msg.controlFlags)) {

@@ -196,14 +203,16 @@ const ackMsg = reply(msg, { ack: msg.id });

/**
* Adds a message listener to this transport.
* Adds a listener to this transport.
* @param the type of event to listen for
* @param handler The message handler to add.
*/
addMessageListener(handler) {
this.messageHandlers.add(handler);
addEventListener(type, handler) {
this.eventDispatcher.addEventListener(type, handler);
}
/**
* Removes a message listener from this transport.
* Removes a listener from this transport.
* @param the type of event to unlisten on
* @param handler The message handler to remove.
*/
removeMessageListener(handler) {
this.messageHandlers.delete(handler);
removeEventListener(type, handler) {
this.eventDispatcher.removeEventListener(type, handler);
}

@@ -210,0 +219,0 @@ /**

@@ -6,3 +6,3 @@ /// <reference types="node" />

import { WebSocketClientTransport } from '../transport/impls/ws/client';
import { Static, TObject } from '@sinclair/typebox';
import { Static } from '@sinclair/typebox';
import { Procedure, ServiceContext } from '../router';

@@ -14,2 +14,3 @@ import { OpaqueTransportMessage, TransportClientId, TransportMessage } from '../transport';

import { WebSocketServerTransport } from '../transport/impls/ws/server';
import { PayloadType } from '../router/builder';
/**

@@ -51,7 +52,7 @@ * Creates a WebSocket server instance using the provided HTTP server.

* @param {State} state - The state object.
* @param {Procedure<State, 'rpc', I, O>} proc - The RPC procedure to invoke.
* @param {Procedure<State, 'rpc', I, O, E, null>} proc - The RPC procedure to invoke.
* @param {Omit<ServiceContext, 'state'>} [extendedContext] - Optional extended context.
* @returns A function that can be used to invoke the RPC procedure.
*/
export declare function asClientRpc<State extends object | unknown, I extends TObject, O extends TObject, E extends RiverError>(state: State, proc: Procedure<State, 'rpc', I, O, E>, extendedContext?: Omit<ServiceContext, 'state'>): (msg: Static<I>) => Promise<Result<Static<O>, Static<E> | Static<typeof RiverUncaughtSchema>>>;
export declare function asClientRpc<State extends object | unknown, I extends PayloadType, O extends PayloadType, E extends RiverError>(state: State, proc: Procedure<State, 'rpc', I, O, E, null>, extendedContext?: Omit<ServiceContext, 'state'>): (msg: Static<I>) => Promise<Result<Static<O>, Static<E> | Static<typeof RiverUncaughtSchema>>>;
/**

@@ -65,7 +66,7 @@ * Transforms a stream procedure definition into a pair of input and output streams.

* @param {State} state - The state object.
* @param {Procedure<State, 'stream', I, O>} proc - The procedure to handle the stream.
* @param {Procedure<State, 'stream', I, O, E, null>} proc - The procedure to handle the stream.
* @param {Omit<ServiceContext, 'state'>} [extendedContext] - The extended context object.
* @returns Pair of input and output streams.
*/
export declare function asClientStream<State extends object | unknown, I extends TObject, O extends TObject, E extends RiverError>(state: State, proc: Procedure<State, 'stream', I, O, E>, extendedContext?: Omit<ServiceContext, 'state'>): [
export declare function asClientStream<State extends object | unknown, I extends PayloadType, O extends PayloadType, E extends RiverError>(state: State, proc: Procedure<State, 'stream', I, O, E, null>, extendedContext?: Omit<ServiceContext, 'state'>): [
Pushable<Static<I>>,

@@ -75,2 +76,18 @@ Pushable<Result<Static<O>, Static<E> | Static<typeof RiverUncaughtSchema>>>

/**
* Transforms a stream procedure definition into a pair of input and output streams.
* Input messages can be pushed into the input stream.
* This should only be used for testing.
* @template State - The type of the state object.
* @template I - The type of the input object.
* @template O - The type of the output object.
* @param {State} state - The state object.
* @param {Procedure<State, 'stream', I, O, E, null>} proc - The procedure to handle the stream.
* @param {Omit<ServiceContext, 'state'>} [extendedContext] - The extended context object.
* @returns Pair of input and output streams.
*/
export declare function asClientStreamWithInitialization<State extends object | unknown, I extends PayloadType, O extends PayloadType, E extends RiverError, Init extends PayloadType>(state: State, proc: Procedure<State, 'stream', I, O, E, Init>, init: Static<PayloadType>, extendedContext?: Omit<ServiceContext, 'state'>): [
Pushable<Static<I>>,
Pushable<Result<Static<O>, Static<E> | Static<typeof RiverUncaughtSchema>>>
];
/**
* Transforms a subscription procedure definition into a procedure that returns an output stream.

@@ -83,8 +100,42 @@ * Input messages can be pushed into the input stream.

* @param {State} state - The state object.
* @param {Procedure<State, 'stream', I, O>} proc - The procedure to handle the stream.
* @param {Procedure<State, 'stream', I, O, E, null>} proc - The procedure to handle the stream.
* @param {Omit<ServiceContext, 'state'>} [extendedContext] - The extended context object.
* @returns A function that when passed a message, returns the output stream.
*/
export declare function asClientSubscription<State extends object | unknown, I extends TObject, O extends TObject, E extends RiverError>(state: State, proc: Procedure<State, 'subscription', I, O, E>, extendedContext?: Omit<ServiceContext, 'state'>): (msg: Static<I>) => Promise<Pushable<Result<Static<O>, Static<E> | Static<typeof RiverUncaughtSchema>>>>;
export declare function asClientSubscription<State extends object | unknown, I extends PayloadType, O extends PayloadType, E extends RiverError>(state: State, proc: Procedure<State, 'subscription', I, O, E, null>, extendedContext?: Omit<ServiceContext, 'state'>): (msg: Static<I>) => Promise<Pushable<Result<Static<O>, Static<E> | Static<typeof RiverUncaughtSchema>>>>;
/**
* Transforms an upload procedure definition into a procedure that returns an input stream.
* Input messages can be pushed into the input stream.
* This should only be used for testing.
* @template State - The type of the state object.
* @template I - The type of the input object.
* @template O - The type of the output object.
* @param {State} state - The state object.
* @param {Procedure<State, 'upload', I, O, E, null>} proc - The procedure to handle the stream.
* @param {Omit<ServiceContext, 'state'>} [extendedContext] - The extended context object.
* @returns A function that when passed a message, returns the output stream.
*/
export declare function asClientUpload<State extends object | unknown, I extends PayloadType, O extends PayloadType, E extends RiverError>(state: State, proc: Procedure<State, 'upload', I, O, E, null>, extendedContext?: Omit<ServiceContext, 'state'>): [
Pushable<Static<I>>,
Promise<Result<Static<O>, Static<E> | Static<typeof RiverUncaughtSchema>>>
];
/**
* Transforms an upload with initialization procedure definition into a procedure that returns an
* input stream.
* Input messages can be pushed into the input stream.
* This should only be used for testing.
* @template State - The type of the state object.
* @template Init - The type of the init object.
* @template I - The type of the input object.
* @template O - The type of the output object.
* @param {State} state - The state object.
* @param {Procedure<State, 'upload', I, O, E, Init>} proc - The procedure to handle the stream.
* @param {Omit<ServiceContext, 'state'>} [extendedContext] - The extended context object.
* @returns A function that when passed a message, returns the output stream.
*/
export declare function asClientUploadWithInitialization<State extends object | unknown, I extends PayloadType, O extends PayloadType, E extends RiverError, Init extends PayloadType>(state: State, proc: Procedure<State, 'upload', I, O, E, Init>, init: Static<Init>, extendedContext?: Omit<ServiceContext, 'state'>): [
Pushable<Static<I>>,
Promise<Result<Static<O>, Static<E> | Static<typeof RiverUncaughtSchema>>>
];
/**
* Converts a payload object to a transport message with reasonable defaults.

@@ -91,0 +142,0 @@ * This should only be used for testing.

@@ -68,3 +68,3 @@ import WebSocket from 'isomorphic-ws';

* @param {State} state - The state object.
* @param {Procedure<State, 'rpc', I, O>} proc - The RPC procedure to invoke.
* @param {Procedure<State, 'rpc', I, O, E, null>} proc - The RPC procedure to invoke.
* @param {Omit<ServiceContext, 'state'>} [extendedContext] - Optional extended context.

@@ -93,3 +93,3 @@ * @returns A function that can be used to invoke the RPC procedure.

* @param {State} state - The state object.
* @param {Procedure<State, 'stream', I, O>} proc - The procedure to handle the stream.
* @param {Procedure<State, 'stream', I, O, E, null>} proc - The procedure to handle the stream.
* @param {Omit<ServiceContext, 'state'>} [extendedContext] - The extended context object.

@@ -139,2 +139,54 @@ * @returns Pair of input and output streams.

/**
* Transforms a stream procedure definition into a pair of input and output streams.
* Input messages can be pushed into the input stream.
* This should only be used for testing.
* @template State - The type of the state object.
* @template I - The type of the input object.
* @template O - The type of the output object.
* @param {State} state - The state object.
* @param {Procedure<State, 'stream', I, O, E, null>} proc - The procedure to handle the stream.
* @param {Omit<ServiceContext, 'state'>} [extendedContext] - The extended context object.
* @returns Pair of input and output streams.
*/
export function asClientStreamWithInitialization(state, proc, init, extendedContext) {
const rawInput = pushable({ objectMode: true });
const rawOutput = pushable({
objectMode: true,
});
const transportInput = pushable({
objectMode: true,
});
const transportOutput = pushable({
objectMode: true,
});
// wrapping in transport
(async () => {
for await (const rawIn of rawInput) {
transportInput.push(payloadToTransportMessage(rawIn));
}
transportInput.end();
})();
// unwrap from transport
(async () => {
for await (const transportRes of transportOutput) {
rawOutput.push(transportRes.payload);
}
})();
// handle
(async () => {
try {
await proc.handler({ ...extendedContext, state }, payloadToTransportMessage(init), transportInput, transportOutput);
}
catch (err) {
const errorMsg = err instanceof Error ? err.message : `[coerced to error] ${err}`;
transportOutput.push(reply(payloadToTransportMessage({}), Err({
code: UNCAUGHT_ERROR,
message: errorMsg,
})));
}
transportOutput.end();
})();
return [rawInput, rawOutput];
}
/**
* Transforms a subscription procedure definition into a procedure that returns an output stream.

@@ -147,3 +199,3 @@ * Input messages can be pushed into the input stream.

* @param {State} state - The state object.
* @param {Procedure<State, 'stream', I, O>} proc - The procedure to handle the stream.
* @param {Procedure<State, 'stream', I, O, E, null>} proc - The procedure to handle the stream.
* @param {Omit<ServiceContext, 'state'>} [extendedContext] - The extended context object.

@@ -179,2 +231,80 @@ * @returns A function that when passed a message, returns the output stream.

/**
* Transforms an upload procedure definition into a procedure that returns an input stream.
* Input messages can be pushed into the input stream.
* This should only be used for testing.
* @template State - The type of the state object.
* @template I - The type of the input object.
* @template O - The type of the output object.
* @param {State} state - The state object.
* @param {Procedure<State, 'upload', I, O, E, null>} proc - The procedure to handle the stream.
* @param {Omit<ServiceContext, 'state'>} [extendedContext] - The extended context object.
* @returns A function that when passed a message, returns the output stream.
*/
export function asClientUpload(state, proc, extendedContext) {
const rawInput = pushable({ objectMode: true });
const transportInput = pushable({
objectMode: true,
});
// wrapping in transport
(async () => {
for await (const rawIn of rawInput) {
transportInput.push(payloadToTransportMessage(rawIn));
}
transportInput.end();
})();
return [
rawInput,
proc
.handler({ ...extendedContext, state }, transportInput)
.then((res) => res.payload)
.catch((err) => {
const errorMsg = err instanceof Error ? err.message : `[coerced to error] ${err}`;
return Err({
code: UNCAUGHT_ERROR,
message: errorMsg,
});
}),
];
}
/**
* Transforms an upload with initialization procedure definition into a procedure that returns an
* input stream.
* Input messages can be pushed into the input stream.
* This should only be used for testing.
* @template State - The type of the state object.
* @template Init - The type of the init object.
* @template I - The type of the input object.
* @template O - The type of the output object.
* @param {State} state - The state object.
* @param {Procedure<State, 'upload', I, O, E, Init>} proc - The procedure to handle the stream.
* @param {Omit<ServiceContext, 'state'>} [extendedContext] - The extended context object.
* @returns A function that when passed a message, returns the output stream.
*/
export function asClientUploadWithInitialization(state, proc, init, extendedContext) {
const rawInput = pushable({ objectMode: true });
const transportInput = pushable({
objectMode: true,
});
// wrapping in transport
(async () => {
for await (const rawIn of rawInput) {
transportInput.push(payloadToTransportMessage(rawIn));
}
transportInput.end();
})();
return [
rawInput,
proc
.handler({ ...extendedContext, state }, payloadToTransportMessage(init), transportInput)
.then((res) => res.payload)
.catch((err) => {
const errorMsg = err instanceof Error ? err.message : `[coerced to error] ${err}`;
return Err({
code: UNCAUGHT_ERROR,
message: errorMsg,
});
}),
];
}
/**
* Converts a payload object to a transport message with reasonable defaults.

@@ -181,0 +311,0 @@ * This should only be used for testing.

@@ -5,3 +5,3 @@ {

"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
"version": "0.8.1",
"version": "0.9.0",
"type": "module",

@@ -37,12 +37,2 @@ "exports": {

},
"scripts": {
"check": "tsc --noEmit && npx prettier . --check",
"format": "npx prettier . --write",
"build": "rm -rf ./dist && tsc",
"prepack": "npm run build",
"release": "npm publish --access public",
"test:ui": "echo \"remember to go to /__vitest__ in the webview\" && vitest --ui --api.host 0.0.0.0 --api.port 3000",
"test": "vitest --test-timeout=500",
"bench": "vitest bench"
},
"engines": {

@@ -57,3 +47,12 @@ "node": ">=16"

"author": "Jacky Zhao",
"license": "MIT"
}
"license": "MIT",
"scripts": {
"check": "tsc --noEmit && npx prettier . --check",
"format": "npx prettier . --write",
"build": "rm -rf ./dist && tsc",
"release": "npm publish --access public",
"test:ui": "echo \"remember to go to /__vitest__ in the webview\" && vitest --ui --api.host 0.0.0.0 --api.port 3000",
"test": "vitest --test-timeout=500",
"bench": "vitest bench"
}
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc