@replit/river
Advanced tools
Comparing version 0.8.0 to 0.8.1
import http from 'http'; | ||
import { assert, bench, describe } from 'vitest'; | ||
import { createWebSocketServer, createWsTransports, onServerReady, } from '../util/testHelpers'; | ||
import largePayload from './largePayload.json'; | ||
import largePayload from './fixtures/largePayload.json'; | ||
import { TestServiceConstructor } from './fixtures/services'; | ||
@@ -6,0 +6,0 @@ import { createServer } from '../router/server'; |
@@ -11,2 +11,3 @@ import { afterAll, assert, describe, expect, test } from 'vitest'; | ||
import { WebSocketServerTransport } from '../transport/impls/ws/server'; | ||
import { testFinishesCleanly } from './fixtures/cleanup'; | ||
describe.each(codecs)('client <-> server integration test ($name codec)', async ({ codec }) => { | ||
@@ -18,5 +19,3 @@ const httpServer = http.createServer(); | ||
afterAll(() => { | ||
webSocketServer.clients.forEach((socket) => { | ||
socket.close(); | ||
}); | ||
webSocketServer.close(); | ||
httpServer.close(); | ||
@@ -32,2 +31,7 @@ }); | ||
expect(result.payload).toStrictEqual({ result: 3 }); | ||
await testFinishesCleanly({ | ||
clientTransports: [clientTransport], | ||
serverTransport, | ||
server, | ||
}); | ||
}); | ||
@@ -51,2 +55,7 @@ test('fallible rpc', async () => { | ||
}); | ||
await testFinishesCleanly({ | ||
clientTransports: [clientTransport], | ||
serverTransport, | ||
server, | ||
}); | ||
}); | ||
@@ -62,2 +71,7 @@ test('rpc with binary (uint8array)', async () => { | ||
expect(new TextDecoder().decode(result.payload.contents)).toStrictEqual('contents for file test.py'); | ||
await testFinishesCleanly({ | ||
clientTransports: [clientTransport], | ||
serverTransport, | ||
server, | ||
}); | ||
}); | ||
@@ -73,2 +87,3 @@ test('stream', async () => { | ||
input.push({ msg: 'ghi', ignore: false }); | ||
input.push({ msg: 'end', ignore: false, end: true }); | ||
input.end(); | ||
@@ -81,3 +96,14 @@ const result1 = await iterNext(output); | ||
expect(result2.payload).toStrictEqual({ response: 'ghi' }); | ||
const result3 = await iterNext(output); | ||
assert(result3.ok); | ||
expect(result3.payload).toStrictEqual({ response: 'end' }); | ||
// after the server stream is ended, the client stream should be ended too | ||
const result4 = await output.next(); | ||
assert(result4.done); | ||
close(); | ||
await testFinishesCleanly({ | ||
clientTransports: [clientTransport], | ||
serverTransport, | ||
server, | ||
}); | ||
}); | ||
@@ -106,2 +132,7 @@ test('fallible stream', async () => { | ||
close(); | ||
await testFinishesCleanly({ | ||
clientTransports: [clientTransport], | ||
serverTransport, | ||
server, | ||
}); | ||
}); | ||
@@ -143,2 +174,7 @@ test('subscription', async () => { | ||
close2(); | ||
await testFinishesCleanly({ | ||
clientTransports: [client1Transport, client2Transport], | ||
serverTransport, | ||
server, | ||
}); | ||
}); | ||
@@ -165,3 +201,8 @@ test('message order is preserved in the face of disconnects', async () => { | ||
assert(res.ok); | ||
return expect(res.payload.msgs).toStrictEqual(expected); | ||
expect(res.payload.msgs).toStrictEqual(expected); | ||
await testFinishesCleanly({ | ||
clientTransports: [clientTransport], | ||
serverTransport, | ||
server, | ||
}); | ||
}); | ||
@@ -183,2 +224,7 @@ const CONCURRENCY = 10; | ||
} | ||
await testFinishesCleanly({ | ||
clientTransports: [clientTransport], | ||
serverTransport, | ||
server, | ||
}); | ||
}); | ||
@@ -212,3 +258,8 @@ test('concurrent streams', async () => { | ||
} | ||
await testFinishesCleanly({ | ||
clientTransports: [clientTransport], | ||
serverTransport, | ||
server, | ||
}); | ||
}); | ||
}); |
@@ -5,2 +5,3 @@ import { Observable } from './observable'; | ||
ignore: import("@sinclair/typebox").TBoolean; | ||
end: import("@sinclair/typebox").TOptional<import("@sinclair/typebox").TBoolean>; | ||
}>; | ||
@@ -38,2 +39,3 @@ export declare const EchoResponse: import("@sinclair/typebox").TObject<{ | ||
ignore: import("@sinclair/typebox").TBoolean; | ||
end: import("@sinclair/typebox").TOptional<import("@sinclair/typebox").TBoolean>; | ||
}>; | ||
@@ -47,2 +49,3 @@ output: import("@sinclair/typebox").TObject<{ | ||
}>, input: AsyncIterable<import("../../transport/message").TransportMessage<{ | ||
end?: boolean | undefined; | ||
msg: string; | ||
@@ -49,0 +52,0 @@ ignore: boolean; |
@@ -9,2 +9,3 @@ import { Type } from '@sinclair/typebox'; | ||
ignore: Type.Boolean(), | ||
end: Type.Optional(Type.Boolean()), | ||
}); | ||
@@ -38,2 +39,5 @@ export const EchoResponse = Type.Object({ response: Type.String() }); | ||
} | ||
if (req.end) { | ||
returnStream.end(); | ||
} | ||
} | ||
@@ -40,0 +44,0 @@ }, |
@@ -34,2 +34,3 @@ import { expect, describe, test } from 'vitest'; | ||
ignore: { type: 'boolean' }, | ||
end: { type: 'boolean' }, | ||
}, | ||
@@ -36,0 +37,0 @@ required: ['msg', 'ignore'], |
import { pushable } from 'it-pushable'; | ||
import { msg, } from '../transport/message'; | ||
import { msg, isStreamClose, closeStream, } from '../transport/message'; | ||
import { waitForMessage } from '../transport'; | ||
@@ -52,2 +52,3 @@ import { nanoid } from 'nanoid'; | ||
const outputStream = pushable({ objectMode: true }); | ||
let firstMessage = true; | ||
// input -> transport | ||
@@ -58,3 +59,6 @@ // this gets cleaned up on i.end() which is called by closeHandler | ||
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, rawIn); | ||
m.controlFlags |= 2 /* ControlFlags.StreamOpenBit */; | ||
if (firstMessage) { | ||
m.controlFlags |= 2 /* ControlFlags.StreamOpenBit */; | ||
firstMessage = false; | ||
} | ||
transport.send(m); | ||
@@ -65,3 +69,6 @@ } | ||
const listener = (msg) => { | ||
if (belongsToSameStream(msg)) { | ||
if (isStreamClose(msg.controlFlags)) { | ||
outputStream.end(); | ||
} | ||
else if (belongsToSameStream(msg)) { | ||
outputStream.push(msg.payload); | ||
@@ -74,5 +81,3 @@ } | ||
outputStream.end(); | ||
const closeMessage = msg(transport.clientId, serverId, serviceName, procName, streamId, { type: 'CLOSE' }); | ||
closeMessage.controlFlags |= 4 /* ControlFlags.StreamClosedBit */; | ||
transport.send(closeMessage); | ||
transport.send(closeStream(transport.clientId, serverId, serviceName, procName, streamId)); | ||
transport.removeMessageListener(listener); | ||
@@ -100,2 +105,5 @@ }; | ||
} | ||
if (isStreamClose(msg.controlFlags)) { | ||
outputStream.end(); | ||
} | ||
}; | ||
@@ -105,2 +113,3 @@ transport.addMessageListener(listener); | ||
outputStream.end(); | ||
transport.send(closeStream(transport.clientId, serverId, serviceName, procName, streamId)); | ||
transport.removeMessageListener(listener); | ||
@@ -107,0 +116,0 @@ }; |
@@ -0,4 +1,8 @@ | ||
import { Static, TObject } from '@sinclair/typebox'; | ||
import { Connection, Transport } from '../transport/transport'; | ||
import { AnyService } from './builder'; | ||
import type { Pushable } from 'it-pushable'; | ||
import { TransportMessage } from '../transport/message'; | ||
import { ServiceContext } from './context'; | ||
import { Result, RiverError } from './result'; | ||
/** | ||
@@ -10,4 +14,13 @@ * Represents a server with a set of services. Use {@link createServer} to create it. | ||
services: Services; | ||
streams: Map<string, ProcStream>; | ||
close(): Promise<void>; | ||
} | ||
interface ProcStream { | ||
incoming: Pushable<TransportMessage>; | ||
outgoing: Pushable<TransportMessage<Result<Static<TObject>, Static<RiverError>>>>; | ||
promises: { | ||
outputHandler: Promise<unknown>; | ||
inputHandler: Promise<unknown>; | ||
}; | ||
} | ||
/** | ||
@@ -22,2 +35,3 @@ * Creates a server instance that listens for incoming messages from a transport and routes them to the appropriate service and procedure. | ||
export declare function createServer<Services extends Record<string, AnyService>>(transport: Transport<Connection>, services: Services, extendedContext?: Omit<ServiceContext, 'state'>): Promise<Server<Services>>; | ||
export {}; | ||
//# sourceMappingURL=server.d.ts.map |
import { pushable } from 'it-pushable'; | ||
import { ControlMessagePayloadSchema, isStreamClose, isStreamOpen, reply, } from '../transport/message'; | ||
import { ControlMessagePayloadSchema, isStreamClose, isStreamOpen, reply, closeStream, } from '../transport/message'; | ||
import { log } from '../logging'; | ||
@@ -18,2 +18,12 @@ import { Value } from '@sinclair/typebox/value'; | ||
const streamMap = new Map(); | ||
async function cleanupStream(id) { | ||
const stream = streamMap.get(id); | ||
if (stream) { | ||
stream.incoming.end(); | ||
await stream.promises.inputHandler; | ||
stream.outgoing.end(); | ||
await stream.promises.outputHandler; | ||
streamMap.delete(id); | ||
} | ||
} | ||
function getContext(service) { | ||
@@ -32,34 +42,40 @@ const context = contextMap.get(service); | ||
} | ||
const handler = async (msg) => { | ||
if (msg.to !== transport.clientId) { | ||
const handler = async (message) => { | ||
if (message.to !== transport.clientId) { | ||
log?.info(`${transport.clientId} -- got msg with destination that isn't the server, ignoring`); | ||
return; | ||
} | ||
if (!(msg.serviceName in services)) { | ||
log?.warn(`${transport.clientId} -- couldn't find service ${msg.serviceName}`); | ||
if (!(message.serviceName in services)) { | ||
log?.warn(`${transport.clientId} -- couldn't find service ${message.serviceName}`); | ||
return; | ||
} | ||
const service = services[msg.serviceName]; | ||
const service = services[message.serviceName]; | ||
const serviceContext = getContext(service); | ||
if (!(msg.procedureName in service.procedures)) { | ||
log?.warn(`${transport.clientId} -- couldn't find a matching procedure for ${msg.serviceName}.${msg.procedureName}`); | ||
if (!(message.procedureName in service.procedures)) { | ||
log?.warn(`${transport.clientId} -- couldn't find a matching procedure for ${message.serviceName}.${message.procedureName}`); | ||
return; | ||
} | ||
const procedure = service.procedures[msg.procedureName]; | ||
const streamIdx = `${msg.serviceName}.${msg.procedureName}:${msg.streamId}`; | ||
if (isStreamOpen(msg.controlFlags) && !streamMap.has(streamIdx)) { | ||
const procedure = service.procedures[message.procedureName]; | ||
const streamIdx = `${message.serviceName}.${message.procedureName}:${message.streamId}`; | ||
if (isStreamOpen(message.controlFlags) && !streamMap.has(streamIdx)) { | ||
const incoming = pushable({ objectMode: true }); | ||
const outgoing = pushable({ objectMode: true }); | ||
const openPromises = [ | ||
// sending outgoing messages back to client | ||
(async () => { | ||
for await (const response of outgoing) { | ||
transport.send(response); | ||
} | ||
})(), | ||
]; | ||
const outputHandler = | ||
// sending outgoing messages back to client | ||
(async () => { | ||
for await (const response of outgoing) { | ||
transport.send(response); | ||
} | ||
// we ended, send a close bit back to the client | ||
// only subscriptions and streams have streams the | ||
// handler can close | ||
if (procedure.type === 'subscription' || | ||
procedure.type === 'stream') { | ||
transport.send(closeStream(transport.clientId, message.from, message.serviceName, message.procedureName, message.streamId)); | ||
} | ||
})(); | ||
function errorHandler(err) { | ||
const errorMsg = err instanceof Error ? err.message : `[coerced to error] ${err}`; | ||
log?.error(`${transport.clientId} -- procedure ${msg.serviceName}.${msg.procedureName}:${msg.streamId} threw an error: ${errorMsg}`); | ||
outgoing.push(reply(msg, Err({ | ||
log?.error(`${transport.clientId} -- procedure ${message.serviceName}.${message.procedureName}:${message.streamId} threw an error: ${errorMsg}`); | ||
outgoing.push(reply(message, Err({ | ||
code: UNCAUGHT_ERROR, | ||
@@ -70,9 +86,10 @@ message: errorMsg, | ||
// pump incoming message stream -> handler -> outgoing message stream | ||
let inputHandler; | ||
if (procedure.type === 'stream') { | ||
openPromises.push(procedure | ||
inputHandler = procedure | ||
.handler(serviceContext, incoming, outgoing) | ||
.catch(errorHandler)); | ||
.catch(errorHandler); | ||
} | ||
else if (procedure.type === 'rpc') { | ||
openPromises.push((async () => { | ||
inputHandler = (async () => { | ||
const inputMessage = await incoming.next(); | ||
@@ -89,6 +106,6 @@ if (inputMessage.done) { | ||
} | ||
})()); | ||
})(); | ||
} | ||
else if (procedure.type === 'subscription') { | ||
openPromises.push((async () => { | ||
inputHandler = (async () => { | ||
const inputMessage = await incoming.next(); | ||
@@ -104,8 +121,14 @@ if (inputMessage.done) { | ||
} | ||
})()); | ||
})(); | ||
} | ||
else { | ||
// procedure is inferred to be never here as this is not a valid procedure type | ||
// we cast just to log | ||
log?.warn(`${transport.clientId} -- got request for invalid procedure type ${procedure.type} at ${message.serviceName}.${message.procedureName}`); | ||
return; | ||
} | ||
streamMap.set(streamIdx, { | ||
incoming, | ||
outgoing, | ||
openPromises, | ||
promises: { inputHandler, outputHandler }, | ||
}); | ||
@@ -115,15 +138,13 @@ } | ||
if (!procStream) { | ||
log?.warn(`${transport.clientId} -- couldn't find a matching procedure stream for ${msg.serviceName}.${msg.procedureName}:${msg.streamId}`); | ||
log?.warn(`${transport.clientId} -- couldn't find a matching procedure stream for ${message.serviceName}.${message.procedureName}:${message.streamId}`); | ||
return; | ||
} | ||
if (Value.Check(procedure.input, msg.payload)) { | ||
procStream.incoming.push(msg); | ||
if (Value.Check(procedure.input, message.payload)) { | ||
procStream.incoming.push(message); | ||
} | ||
else if (!Value.Check(ControlMessagePayloadSchema, msg.payload)) { | ||
log?.error(`${transport.clientId} -- procedure ${msg.serviceName}.${msg.procedureName} received invalid payload: ${JSON.stringify(msg.payload)}`); | ||
else if (!Value.Check(ControlMessagePayloadSchema, message.payload)) { | ||
log?.error(`${transport.clientId} -- procedure ${message.serviceName}.${message.procedureName} received invalid payload: ${JSON.stringify(message.payload)}`); | ||
} | ||
if (isStreamClose(msg.controlFlags)) { | ||
procStream.incoming.end(); | ||
await Promise.all(procStream.openPromises); | ||
procStream.outgoing.end(); | ||
if (isStreamClose(message.controlFlags)) { | ||
await cleanupStream(streamIdx); | ||
} | ||
@@ -134,8 +155,7 @@ }; | ||
services, | ||
streams: streamMap, | ||
async close() { | ||
transport.removeMessageListener(handler); | ||
for (const [_, stream] of streamMap) { | ||
stream.incoming.end(); | ||
await Promise.all(stream.openPromises); | ||
stream.outgoing.end(); | ||
for (const streamIdx of streamMap.keys()) { | ||
await cleanupStream(streamIdx); | ||
} | ||
@@ -142,0 +162,0 @@ }, |
@@ -6,5 +6,2 @@ /// <reference types="node" /> | ||
export declare class StdioConnection extends Connection { | ||
/** | ||
* The writable stream to use as output. | ||
*/ | ||
output: NodeJS.WritableStream; | ||
@@ -20,3 +17,2 @@ constructor(transport: Transport<StdioConnection>, connectedTo: TransportClientId, output: NodeJS.WritableStream); | ||
* A transport implementation that uses standard input and output streams. | ||
* Can only be used 1:1, not N:1 | ||
* @extends Transport | ||
@@ -23,0 +19,0 @@ */ |
@@ -7,5 +7,2 @@ import { NaiveJsonCodec } from '../../../codec/json'; | ||
export class StdioConnection extends Connection { | ||
/** | ||
* The writable stream to use as output. | ||
*/ | ||
output; | ||
@@ -23,3 +20,2 @@ constructor(transport, connectedTo, output) { | ||
async close() { | ||
this.transport.onDisconnect(this); | ||
this.output.end(); | ||
@@ -33,3 +29,2 @@ } | ||
* A transport implementation that uses standard input and output streams. | ||
* Can only be used 1:1, not N:1 | ||
* @extends Transport | ||
@@ -36,0 +31,0 @@ */ |
@@ -6,2 +6,3 @@ import { describe, test, expect } from 'vitest'; | ||
import { payloadToTransportMessage } from '../../../util/testHelpers'; | ||
import { ensureTransportIsClean } from '../../../__tests__/fixtures/cleanup'; | ||
describe('sending and receiving across node streams works', () => { | ||
@@ -20,3 +21,7 @@ test('basic send/receive', async () => { | ||
await expect(p).resolves.toStrictEqual(msg); | ||
await clientTransport.close(); | ||
await serverTransport.close(); | ||
await ensureTransportIsClean(clientTransport); | ||
await ensureTransportIsClean(serverTransport); | ||
}); | ||
}); |
@@ -86,9 +86,9 @@ import { Transport } from '../../transport'; | ||
// otherwise try and reconnect again | ||
log?.warn(`${this.clientId} -- websocket failed, trying again in ${this.options.retryIntervalMs}ms`); | ||
this.reconnectPromises.delete(to); | ||
if (attempt >= this.options.retryAttemptsMax) { | ||
return; | ||
throw new Error(`${this.clientId} -- websocket to ${to} failed after ${attempt} attempts, giving up`); | ||
} | ||
else { | ||
// linear backoff | ||
log?.warn(`${this.clientId} -- websocket to ${to} failed, trying again in ${this.options.retryIntervalMs * attempt}ms`); | ||
setTimeout(() => this.createNewConnection(to, attempt + 1), this.options.retryIntervalMs * attempt); | ||
@@ -95,0 +95,0 @@ } |
@@ -8,3 +8,4 @@ import { Connection } from '../../transport'; | ||
ws.binaryType = 'arraybuffer'; | ||
this.ws.onmessage = (msg) => this.onMessage(msg.data); | ||
// take over the onmessage for this websocket | ||
this.ws.onmessage = (msg) => transport.onMessage(msg.data); | ||
} | ||
@@ -11,0 +12,0 @@ send(payload) { |
@@ -15,6 +15,4 @@ import { Codec } from '../../../codec'; | ||
createNewConnection(to: string): Promise<void>; | ||
destroy(): Promise<void>; | ||
close(): Promise<void>; | ||
} | ||
export {}; | ||
//# sourceMappingURL=server.d.ts.map |
@@ -22,4 +22,7 @@ import { NaiveJsonCodec } from '../../../codec'; | ||
ws.onmessage = (msg) => { | ||
// when we establish WebSocketConnection, ws.onmessage | ||
// gets overriden so this only runs on the first valid message | ||
// the websocket receives | ||
const parsedMsg = this.parseMsg(msg.data); | ||
if (parsedMsg) { | ||
if (parsedMsg && !conn) { | ||
conn = new WebSocketConnection(this, parsedMsg.from, ws); | ||
@@ -46,10 +49,2 @@ this.onConnect(conn); | ||
} | ||
async destroy() { | ||
super.destroy(); | ||
this.wss.close(); | ||
} | ||
async close() { | ||
super.close(); | ||
this.wss.close(); | ||
} | ||
} |
@@ -7,2 +7,3 @@ import http from 'http'; | ||
import { WebSocketClientTransport } from './client'; | ||
import { testFinishesCleanly } from '../../../__tests__/fixtures/cleanup'; | ||
describe('sending and receiving across websockets works', async () => { | ||
@@ -13,5 +14,3 @@ const server = http.createServer(); | ||
afterAll(() => { | ||
wss.clients.forEach((socket) => { | ||
socket.close(); | ||
}); | ||
wss.close(); | ||
server.close(); | ||
@@ -23,3 +22,7 @@ }); | ||
clientTransport.send(msg); | ||
return expect(waitForMessage(serverTransport, (recv) => recv.id === msg.id)).resolves.toStrictEqual(msg.payload); | ||
await expect(waitForMessage(serverTransport, (recv) => recv.id === msg.id)).resolves.toStrictEqual(msg.payload); | ||
await testFinishesCleanly({ | ||
clientTransports: [clientTransport], | ||
serverTransport, | ||
}); | ||
}); | ||
@@ -56,2 +59,6 @@ test('sending respects to/from fields', async () => { | ||
await expect(promises).resolves.toStrictEqual([msg1.payload, msg2.payload]); | ||
await testFinishesCleanly({ | ||
clientTransports: [client1, client2], | ||
serverTransport, | ||
}); | ||
}); | ||
@@ -64,5 +71,3 @@ }); | ||
afterAll(() => { | ||
wss.clients.forEach((socket) => { | ||
socket.close(); | ||
}); | ||
wss.close(); | ||
server.close(); | ||
@@ -81,3 +86,8 @@ }); | ||
clientTransport.send(msg2); | ||
return expect(waitForMessage(serverTransport, (recv) => recv.id === msg2.id)).resolves.toStrictEqual(msg2.payload); | ||
// by this point the client should have reconnected | ||
await expect(waitForMessage(serverTransport, (recv) => recv.id === msg2.id)).resolves.toStrictEqual(msg2.payload); | ||
await testFinishesCleanly({ | ||
clientTransports: [clientTransport], | ||
serverTransport, | ||
}); | ||
}); | ||
@@ -92,3 +102,9 @@ test('ws transport is recreated after unclean disconnect', async () => { | ||
clientTransport.send(msg2); | ||
return expect(waitForMessage(serverTransport, (recv) => recv.id === msg2.id)).resolves.toStrictEqual(msg2.payload); | ||
// by this point the client should have reconnected | ||
await expect(waitForMessage(serverTransport, (recv) => recv.id === msg2.id)).resolves.toStrictEqual(msg2.payload); | ||
// this is not expected to be clean because we destroyed the transport | ||
await testFinishesCleanly({ | ||
clientTransports: [clientTransport], | ||
serverTransport, | ||
}); | ||
}); | ||
@@ -102,4 +118,8 @@ test('ws transport is not recreated after destroy', async () => { | ||
clientTransport.destroy(); | ||
return expect(() => clientTransport.send(msg2)).toThrow(new Error('transport is destroyed, cant send')); | ||
expect(() => clientTransport.send(msg2)).toThrow(new Error('transport is destroyed, cant send')); | ||
// this is not expected to be clean because we destroyed the transport | ||
expect(clientTransport.state).toEqual('destroyed'); | ||
await clientTransport.close(); | ||
await serverTransport.close(); | ||
}); | ||
}); |
import { OpaqueTransportMessage } from './message'; | ||
import { Connection, Transport } from './transport'; | ||
export { Transport } from './transport'; | ||
import { Transport, Connection } from './transport'; | ||
export { Transport, Connection } from './transport'; | ||
export { TransportMessageSchema, OpaqueTransportMessageSchema, msg, reply, } from './message'; | ||
export type { TransportMessage, MessageId, OpaqueTransportMessage, TransportClientId, } from './message'; | ||
export type { TransportMessage, MessageId, OpaqueTransportMessage, TransportClientId, isStreamOpen, isStreamClose, } from './message'; | ||
/** | ||
@@ -7,0 +7,0 @@ * Waits for a message from the transport. |
// re-export | ||
export { Transport } from './transport'; | ||
export { Transport, Connection } from './transport'; | ||
export { TransportMessageSchema, OpaqueTransportMessageSchema, msg, reply, } from './message'; | ||
@@ -4,0 +4,0 @@ /** |
@@ -107,2 +107,12 @@ import { TSchema } from '@sinclair/typebox'; | ||
/** | ||
* Create a request to close a stream | ||
* @param from The ID of the client initiating the close. | ||
* @param to The ID of the client being closed. | ||
* @param respondTo The transport message to respond to. | ||
* @returns The close message | ||
*/ | ||
export declare function closeStream(from: TransportClientId, to: TransportClientId, service: string, proc: string, stream: string): TransportMessage<{ | ||
type: "CLOSE"; | ||
}>; | ||
/** | ||
* Checks if the given control flag (usually found in msg.controlFlag) is an ack message. | ||
@@ -109,0 +119,0 @@ * @param controlFlag - The control flag to check. |
@@ -68,2 +68,3 @@ import { Type } from '@sinclair/typebox'; | ||
...msg, | ||
controlFlags: 0, | ||
id: nanoid(), | ||
@@ -76,2 +77,16 @@ to: msg.from, | ||
/** | ||
* Create a request to close a stream | ||
* @param from The ID of the client initiating the close. | ||
* @param to The ID of the client being closed. | ||
* @param respondTo The transport message to respond to. | ||
* @returns The close message | ||
*/ | ||
export function closeStream(from, to, service, proc, stream) { | ||
const closeMessage = msg(from, to, service, proc, stream, { | ||
type: 'CLOSE', | ||
}); | ||
closeMessage.controlFlags |= 4 /* ControlFlags.StreamClosedBit */; | ||
return closeMessage; | ||
} | ||
/** | ||
* Checks if the given control flag (usually found in msg.controlFlag) is an ack message. | ||
@@ -78,0 +93,0 @@ * @param controlFlag - The control flag to check. |
import { Codec } from '../codec/types'; | ||
import { MessageId, OpaqueTransportMessage, TransportClientId } from './message'; | ||
/** | ||
* Abstract base for a connection between two nodes in a River network. | ||
* A connection is responsible for sending and receiving messages on a 1:1 | ||
* basis between nodes. | ||
* Connections can be reused across different transports. | ||
* @abstract | ||
* A 1:1 connection between two transports. Once this is created, | ||
* the {@link Connection} is expected to take over responsibility for | ||
* reading and writing messages from the underlying connection. | ||
* | ||
* 1) Messages received on the {@link Connection} are dispatched back to the {@link Transport} | ||
* via {@link Transport.onMessage}. The {@link Transport} then notifies any registered message listeners. | ||
* 2) When {@link Transport.send}(msg) is called, the transport looks up the appropriate | ||
* connection in the {@link connections} map via `msg.to` and calls {@link send}(bytes) | ||
* so the connection can send it. | ||
*/ | ||
@@ -14,12 +18,34 @@ export declare abstract class Connection { | ||
constructor(transport: Transport<Connection>, connectedTo: TransportClientId); | ||
onMessage(msg: Uint8Array): void; | ||
abstract send(msg: Uint8Array): boolean; | ||
abstract close(): Promise<void>; | ||
abstract close(): void; | ||
} | ||
export type TransportStatus = 'open' | 'closed' | 'destroyed'; | ||
/** | ||
* Abstract base for a transport layer for communication between nodes in a River network. | ||
* A transport is responsible for handling the 1:n connection logic between nodes and | ||
* delegating sending/receiving to connections. | ||
* Any River transport methods need to implement this interface. | ||
* Transports manage the lifecycle (creation/deletion) of connections. Its responsibilities include: | ||
* | ||
* 1) Constructing a new {@link Connection} on {@link TransportMessage}s from new clients. | ||
* After constructing the {@link Connection}, {@link onConnect} is called which adds it to the connection map. | ||
* 2) Delegating message listening of the connection to the newly created {@link Connection}. | ||
* From this point on, the {@link Connection} is responsible for *reading* and *writing* | ||
* messages from the connection. | ||
* 3) When a connection is closed, the {@link Transport} calls {@link onDisconnect} which closes the | ||
* connection via {@link Connection.close} and removes it from the {@link connections} map. | ||
* | ||
* ```plaintext | ||
* ▲ | ||
* incoming │ | ||
* messages │ | ||
* ▼ | ||
* ┌─────────────┐ 1:N ┌────────────┐ | ||
* │ Transport │ ◄─────► │ Connection │ | ||
* └─────────────┘ └────────────┘ | ||
* ▲ | ||
* │ | ||
* ▼ | ||
* ┌───────────┐ | ||
* │ Message │ | ||
* │ Listeners │ | ||
* └───────────┘ | ||
* ``` | ||
* @abstract | ||
@@ -26,0 +52,0 @@ */ |
@@ -5,7 +5,11 @@ import { Value } from '@sinclair/typebox/value'; | ||
/** | ||
* Abstract base for a connection between two nodes in a River network. | ||
* A connection is responsible for sending and receiving messages on a 1:1 | ||
* basis between nodes. | ||
* Connections can be reused across different transports. | ||
* @abstract | ||
* A 1:1 connection between two transports. Once this is created, | ||
* the {@link Connection} is expected to take over responsibility for | ||
* reading and writing messages from the underlying connection. | ||
* | ||
* 1) Messages received on the {@link Connection} are dispatched back to the {@link Transport} | ||
* via {@link Transport.onMessage}. The {@link Transport} then notifies any registered message listeners. | ||
* 2) When {@link Transport.send}(msg) is called, the transport looks up the appropriate | ||
* connection in the {@link connections} map via `msg.to` and calls {@link send}(bytes) | ||
* so the connection can send it. | ||
*/ | ||
@@ -19,11 +23,31 @@ export class Connection { | ||
} | ||
onMessage(msg) { | ||
return this.transport.onMessage(msg); | ||
} | ||
} | ||
/** | ||
* Abstract base for a transport layer for communication between nodes in a River network. | ||
* A transport is responsible for handling the 1:n connection logic between nodes and | ||
* delegating sending/receiving to connections. | ||
* Any River transport methods need to implement this interface. | ||
* Transports manage the lifecycle (creation/deletion) of connections. Its responsibilities include: | ||
* | ||
* 1) Constructing a new {@link Connection} on {@link TransportMessage}s from new clients. | ||
* After constructing the {@link Connection}, {@link onConnect} is called which adds it to the connection map. | ||
* 2) Delegating message listening of the connection to the newly created {@link Connection}. | ||
* From this point on, the {@link Connection} is responsible for *reading* and *writing* | ||
* messages from the connection. | ||
* 3) When a connection is closed, the {@link Transport} calls {@link onDisconnect} which closes the | ||
* connection via {@link Connection.close} and removes it from the {@link connections} map. | ||
* | ||
* ```plaintext | ||
* ▲ | ||
* incoming │ | ||
* messages │ | ||
* ▼ | ||
* ┌─────────────┐ 1:N ┌────────────┐ | ||
* │ Transport │ ◄─────► │ Connection │ | ||
* └─────────────┘ └────────────┘ | ||
* ▲ | ||
* │ | ||
* ▼ | ||
* ┌───────────┐ | ||
* │ Message │ | ||
* │ Listeners │ | ||
* └───────────┘ | ||
* ``` | ||
* @abstract | ||
@@ -96,3 +120,3 @@ */ | ||
} | ||
this.sendQueue.set(conn.connectedTo, []); | ||
this.sendQueue.delete(conn.connectedTo); | ||
} | ||
@@ -99,0 +123,0 @@ /** |
@@ -5,3 +5,3 @@ { | ||
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", | ||
"version": "0.8.0", | ||
"version": "0.8.1", | ||
"type": "module", | ||
@@ -44,3 +44,3 @@ "exports": { | ||
"test:ui": "echo \"remember to go to /__vitest__ in the webview\" && vitest --ui --api.host 0.0.0.0 --api.port 3000", | ||
"test": "vitest", | ||
"test": "vitest --test-timeout=500", | ||
"bench": "vitest bench" | ||
@@ -47,0 +47,0 @@ }, |
@@ -11,2 +11,4 @@ # river - Streaming Remote Procedure Calls | ||
To use River, you must be on least Typescript 5 with `"moduleResolution": "bundler"`. | ||
## Developing | ||
@@ -13,0 +15,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
249558
103
5251
22
4