@replit/river
Advanced tools
Comparing version 0.9.1 to 0.9.2
@@ -82,11 +82,13 @@ import { afterAll, assert, describe, expect, test } from 'vitest'; | ||
input.push({ msg: '2', ignore: false, end: true }); | ||
input.end(); | ||
const result1 = await iterNext(output); | ||
assert(result1.ok); | ||
expect(result1.payload).toStrictEqual({ response: '1' }); | ||
// ensure we only have one stream despite pushing multiple messages. | ||
await waitUntil(() => server.streams.size, 1); | ||
input.end(); | ||
// ensure we no longer have any streams since the input was closed. | ||
await waitUntil(() => server.streams.size, 0); | ||
const result2 = await iterNext(output); | ||
assert(result2.ok); | ||
expect(result2.payload).toStrictEqual({ response: '2' }); | ||
// ensure we exactly have one stream even after we send multiple messages | ||
expect(server.streams.size).toEqual(1); | ||
const result3 = await output.next(); | ||
@@ -93,0 +95,0 @@ assert(result3.done); |
@@ -45,5 +45,3 @@ import { pushable } from 'it-pushable'; | ||
function belongsToSameStream(msg) { | ||
return (msg.serviceName === serviceName && | ||
msg.procedureName === procName && | ||
msg.streamId === streamId); | ||
return msg.streamId === streamId; | ||
} | ||
@@ -55,3 +53,3 @@ if (procType === 'stream') { | ||
if (input) { | ||
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, input); | ||
const m = msg(transport.clientId, serverId, streamId, input, serviceName, procName); | ||
// first message needs the open bit. | ||
@@ -66,4 +64,6 @@ m.controlFlags = 2 /* ControlFlags.StreamOpenBit */; | ||
for await (const rawIn of inputStream) { | ||
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, rawIn); | ||
const m = msg(transport.clientId, serverId, streamId, rawIn); | ||
if (firstMessage) { | ||
m.serviceName = serviceName; | ||
m.procedureName = procName; | ||
m.controlFlags |= 2 /* ControlFlags.StreamOpenBit */; | ||
@@ -74,9 +74,14 @@ firstMessage = false; | ||
} | ||
transport.send(closeStream(transport.clientId, serverId, streamId)); | ||
})(); | ||
// transport -> output | ||
const listener = (msg) => { | ||
if (!belongsToSameStream(msg)) { | ||
return; | ||
} | ||
if (isStreamClose(msg.controlFlags)) { | ||
outputStream.end(); | ||
transport.removeEventListener('message', listener); | ||
} | ||
else if (belongsToSameStream(msg)) { | ||
else { | ||
outputStream.push(msg.payload); | ||
@@ -89,3 +94,3 @@ } | ||
outputStream.end(); | ||
transport.send(closeStream(transport.clientId, serverId, serviceName, procName, streamId)); | ||
transport.send(closeStream(transport.clientId, serverId, streamId)); | ||
transport.removeEventListener('message', listener); | ||
@@ -96,3 +101,3 @@ }; | ||
else if (procType === 'rpc') { | ||
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, input); | ||
const m = msg(transport.clientId, serverId, streamId, input, serviceName, procName); | ||
// rpc is a stream open + close | ||
@@ -105,3 +110,3 @@ m.controlFlags |= | ||
else if (procType === 'subscribe') { | ||
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, input); | ||
const m = msg(transport.clientId, serverId, streamId, input, serviceName, procName); | ||
m.controlFlags |= 2 /* ControlFlags.StreamOpenBit */; | ||
@@ -112,8 +117,12 @@ transport.send(m); | ||
const listener = (msg) => { | ||
if (belongsToSameStream(msg)) { | ||
outputStream.push(msg.payload); | ||
if (!belongsToSameStream(msg)) { | ||
return; | ||
} | ||
if (isStreamClose(msg.controlFlags)) { | ||
outputStream.end(); | ||
transport.removeEventListener('message', listener); | ||
} | ||
else { | ||
outputStream.push(msg.payload); | ||
} | ||
}; | ||
@@ -123,3 +132,3 @@ transport.addEventListener('message', listener); | ||
outputStream.end(); | ||
transport.send(closeStream(transport.clientId, serverId, serviceName, procName, streamId)); | ||
transport.send(closeStream(transport.clientId, serverId, streamId)); | ||
transport.removeEventListener('message', listener); | ||
@@ -133,3 +142,3 @@ }; | ||
if (input) { | ||
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, input); | ||
const m = msg(transport.clientId, serverId, streamId, input, serviceName, procName); | ||
// first message needs the open bit. | ||
@@ -144,5 +153,7 @@ m.controlFlags = 2 /* ControlFlags.StreamOpenBit */; | ||
for await (const rawIn of inputStream) { | ||
const m = msg(transport.clientId, serverId, serviceName, procName, streamId, rawIn); | ||
const m = msg(transport.clientId, serverId, streamId, rawIn); | ||
if (firstMessage) { | ||
m.controlFlags |= 2 /* ControlFlags.StreamOpenBit */; | ||
m.serviceName = serviceName; | ||
m.procedureName = procName; | ||
firstMessage = false; | ||
@@ -152,3 +163,3 @@ } | ||
} | ||
transport.send(closeStream(transport.clientId, serverId, serviceName, procName, streamId)); | ||
transport.send(closeStream(transport.clientId, serverId, streamId)); | ||
})(); | ||
@@ -155,0 +166,0 @@ return [inputStream, waitForMessage(transport, belongsToSameStream)]; |
import { Static } from '@sinclair/typebox'; | ||
import { Connection, Transport } from '../transport/transport'; | ||
import { AnyService, PayloadType } from './builder'; | ||
import { AnyProcedure, AnyService, PayloadType } from './builder'; | ||
import type { Pushable } from 'it-pushable'; | ||
@@ -18,2 +18,6 @@ import { TransportMessage } from '../transport/message'; | ||
interface ProcStream { | ||
id: string; | ||
serviceName: string; | ||
procedureName: string; | ||
procedure: AnyProcedure; | ||
incoming: Pushable<TransportMessage>; | ||
@@ -20,0 +24,0 @@ outgoing: Pushable<TransportMessage<Result<Static<PayloadType>, Static<RiverError>>>>; |
@@ -20,9 +20,10 @@ import { pushable } from 'it-pushable'; | ||
const stream = streamMap.get(id); | ||
if (stream) { | ||
stream.incoming.end(); | ||
await stream.promises.inputHandler; | ||
stream.outgoing.end(); | ||
await stream.promises.outputHandler; | ||
streamMap.delete(id); | ||
if (!stream) { | ||
return; | ||
} | ||
stream.incoming.end(); | ||
await stream.promises.inputHandler; | ||
stream.outgoing.end(); | ||
await stream.promises.outputHandler; | ||
streamMap.delete(id); | ||
} | ||
@@ -47,3 +48,22 @@ function getContext(service) { | ||
} | ||
if (!(message.serviceName in services)) { | ||
const streamIdx = message.streamId; | ||
const procStream = streamMap.get(streamIdx); | ||
if (procStream) { | ||
// If the stream is a continuation, we do not admit the init messages. | ||
if (Value.Check(procStream.procedure.input, message.payload)) { | ||
procStream.incoming.push(message); | ||
} | ||
else if (!Value.Check(ControlMessagePayloadSchema, message.payload)) { | ||
log?.error(`${transport.clientId} -- procedure ${procStream.serviceName}.${procStream.procedureName} received invalid payload: ${JSON.stringify(message.payload)}`); | ||
} | ||
if (isStreamClose(message.controlFlags)) { | ||
await cleanupStream(streamIdx); | ||
} | ||
return; | ||
} | ||
if (!isStreamOpen(message.controlFlags)) { | ||
log?.warn(`${transport.clientId} -- couldn't find a matching procedure stream for ${message.serviceName}.${message.procedureName}:${message.streamId}`); | ||
return; | ||
} | ||
if (!message.serviceName || !(message.serviceName in services)) { | ||
log?.warn(`${transport.clientId} -- couldn't find service ${message.serviceName}`); | ||
@@ -54,3 +74,4 @@ return; | ||
const serviceContext = getContext(service); | ||
if (!(message.procedureName in service.procedures)) { | ||
if (!message.procedureName || | ||
!(message.procedureName in service.procedures)) { | ||
log?.warn(`${transport.clientId} -- couldn't find a matching procedure for ${message.serviceName}.${message.procedureName}`); | ||
@@ -60,56 +81,84 @@ return; | ||
const procedure = service.procedures[message.procedureName]; | ||
const streamIdx = `${message.serviceName}.${message.procedureName}:${message.streamId}`; | ||
if (isStreamOpen(message.controlFlags) && !streamMap.has(streamIdx)) { | ||
const incoming = pushable({ objectMode: true }); | ||
const outgoing = pushable({ objectMode: true }); | ||
const outputHandler = | ||
// sending outgoing messages back to client | ||
(async () => { | ||
for await (const response of outgoing) { | ||
transport.send(response); | ||
const procHasInitMessage = 'init' in procedure; | ||
const incoming = pushable({ objectMode: true }); | ||
const outgoing = pushable({ objectMode: true }); | ||
const outputHandler = | ||
// sending outgoing messages back to client | ||
(async () => { | ||
for await (const response of outgoing) { | ||
transport.send(response); | ||
} | ||
// we ended, send a close bit back to the client | ||
// only subscriptions and streams have streams the | ||
// handler can close | ||
if (procedure.type === 'subscription' || procedure.type === 'stream') { | ||
transport.send(closeStream(transport.clientId, message.from, message.streamId)); | ||
} | ||
})(); | ||
function errorHandler(err) { | ||
const errorMsg = err instanceof Error ? err.message : `[coerced to error] ${err}`; | ||
log?.error(`${transport.clientId} -- procedure ${message.serviceName}.${message.procedureName}:${message.streamId} threw an error: ${errorMsg}`); | ||
outgoing.push(reply(message, Err({ | ||
code: UNCAUGHT_ERROR, | ||
message: errorMsg, | ||
}))); | ||
} | ||
// pump incoming message stream -> handler -> outgoing message stream | ||
let inputHandler; | ||
if (procedure.type === 'stream') { | ||
if (procHasInitMessage) { | ||
inputHandler = (async () => { | ||
const initMessage = await incoming.next(); | ||
if (initMessage.done) { | ||
return; | ||
} | ||
return procedure | ||
.handler(serviceContext, initMessage.value, incoming, outgoing) | ||
.catch(errorHandler); | ||
})(); | ||
} | ||
else { | ||
inputHandler = procedure | ||
.handler(serviceContext, incoming, outgoing) | ||
.catch(errorHandler); | ||
} | ||
} | ||
else if (procedure.type === 'rpc') { | ||
inputHandler = (async () => { | ||
const inputMessage = await incoming.next(); | ||
if (inputMessage.done) { | ||
return; | ||
} | ||
// we ended, send a close bit back to the client | ||
// only subscriptions and streams have streams the | ||
// handler can close | ||
if (procedure.type === 'subscription' || | ||
procedure.type === 'stream') { | ||
transport.send(closeStream(transport.clientId, message.from, message.serviceName, message.procedureName, message.streamId)); | ||
try { | ||
const outputMessage = await procedure.handler(serviceContext, inputMessage.value); | ||
outgoing.push(outputMessage); | ||
} | ||
catch (err) { | ||
errorHandler(err); | ||
} | ||
})(); | ||
function errorHandler(err) { | ||
const errorMsg = err instanceof Error ? err.message : `[coerced to error] ${err}`; | ||
log?.error(`${transport.clientId} -- procedure ${message.serviceName}.${message.procedureName}:${message.streamId} threw an error: ${errorMsg}`); | ||
outgoing.push(reply(message, Err({ | ||
code: UNCAUGHT_ERROR, | ||
message: errorMsg, | ||
}))); | ||
} | ||
// pump incoming message stream -> handler -> outgoing message stream | ||
let inputHandler; | ||
if (procedure.type === 'stream') { | ||
if ('init' in procedure) { | ||
inputHandler = (async () => { | ||
const initMessage = await incoming.next(); | ||
if (initMessage.done) { | ||
return; | ||
} | ||
return procedure | ||
.handler(serviceContext, initMessage.value, incoming, outgoing) | ||
.catch(errorHandler); | ||
})(); | ||
} | ||
else if (procedure.type === 'subscription') { | ||
inputHandler = (async () => { | ||
const inputMessage = await incoming.next(); | ||
if (inputMessage.done) { | ||
return; | ||
} | ||
else { | ||
inputHandler = procedure | ||
.handler(serviceContext, incoming, outgoing) | ||
.catch(errorHandler); | ||
try { | ||
await procedure.handler(serviceContext, inputMessage.value, outgoing); | ||
} | ||
} | ||
else if (procedure.type === 'rpc') { | ||
catch (err) { | ||
errorHandler(err); | ||
} | ||
})(); | ||
} | ||
else if (procedure.type === 'upload') { | ||
if (procHasInitMessage) { | ||
inputHandler = (async () => { | ||
const inputMessage = await incoming.next(); | ||
if (inputMessage.done) { | ||
const initMessage = await incoming.next(); | ||
if (initMessage.done) { | ||
return; | ||
} | ||
try { | ||
const outputMessage = await procedure.handler(serviceContext, inputMessage.value); | ||
const outputMessage = await procedure.handler(serviceContext, initMessage.value, incoming); | ||
outgoing.push(outputMessage); | ||
@@ -122,10 +171,7 @@ } | ||
} | ||
else if (procedure.type === 'subscription') { | ||
else { | ||
inputHandler = (async () => { | ||
const inputMessage = await incoming.next(); | ||
if (inputMessage.done) { | ||
return; | ||
} | ||
try { | ||
await procedure.handler(serviceContext, inputMessage.value, outgoing); | ||
const outputMessage = await procedure.handler(serviceContext, incoming); | ||
outgoing.push(outputMessage); | ||
} | ||
@@ -137,50 +183,22 @@ catch (err) { | ||
} | ||
else if (procedure.type === 'upload') { | ||
if ('init' in procedure) { | ||
inputHandler = (async () => { | ||
const initMessage = await incoming.next(); | ||
if (initMessage.done) { | ||
return; | ||
} | ||
try { | ||
const outputMessage = await procedure.handler(serviceContext, initMessage.value, incoming); | ||
outgoing.push(outputMessage); | ||
} | ||
catch (err) { | ||
errorHandler(err); | ||
} | ||
})(); | ||
} | ||
else { | ||
inputHandler = (async () => { | ||
try { | ||
const outputMessage = await procedure.handler(serviceContext, incoming); | ||
outgoing.push(outputMessage); | ||
} | ||
catch (err) { | ||
errorHandler(err); | ||
} | ||
})(); | ||
} | ||
} | ||
else { | ||
// procedure is inferred to be never here as this is not a valid procedure type | ||
// we cast just to log | ||
log?.warn(`${transport.clientId} -- got request for invalid procedure type ${procedure.type} at ${message.serviceName}.${message.procedureName}`); | ||
return; | ||
} | ||
streamMap.set(streamIdx, { | ||
incoming, | ||
outgoing, | ||
promises: { inputHandler, outputHandler }, | ||
}); | ||
} | ||
const procStream = streamMap.get(streamIdx); | ||
if (!procStream) { | ||
log?.warn(`${transport.clientId} -- couldn't find a matching procedure stream for ${message.serviceName}.${message.procedureName}:${message.streamId}`); | ||
else { | ||
// procedure is inferred to be never here as this is not a valid procedure type | ||
// we cast just to log | ||
log?.warn(`${transport.clientId} -- got request for invalid procedure type ${procedure.type} at ${message.serviceName}.${message.procedureName}`); | ||
return; | ||
} | ||
if (Value.Check(procedure.input, message.payload) || | ||
('init' in procedure && Value.Check(procedure.init, message.payload))) { | ||
procStream.incoming.push(message); | ||
streamMap.set(streamIdx, { | ||
id: message.streamId, | ||
incoming, | ||
outgoing, | ||
serviceName: message.serviceName, | ||
procedureName: message.procedureName, | ||
procedure, | ||
promises: { inputHandler, outputHandler }, | ||
}); | ||
// This is the first message, so we parse is as the initialization message, if supplied. | ||
if ((!procHasInitMessage && Value.Check(procedure.input, message.payload)) || | ||
(procHasInitMessage && Value.Check(procedure.init, message.payload))) { | ||
incoming.push(message); | ||
} | ||
@@ -187,0 +205,0 @@ else if (!Value.Check(ControlMessagePayloadSchema, message.payload)) { |
@@ -28,5 +28,5 @@ import http from 'http'; | ||
const makeDummyMessage = (from, to, message) => { | ||
return msg(from, to, 'service', 'proc', 'stream', { | ||
return msg(from, to, 'stream', { | ||
msg: message, | ||
}); | ||
}, 'service', 'proc'); | ||
}; | ||
@@ -33,0 +33,0 @@ const clientId1 = 'client1'; |
@@ -24,4 +24,4 @@ import { TSchema } from '@sinclair/typebox'; | ||
to: import("@sinclair/typebox").TString; | ||
serviceName: import("@sinclair/typebox").TString; | ||
procedureName: import("@sinclair/typebox").TString; | ||
serviceName: import("@sinclair/typebox").TOptional<import("@sinclair/typebox").TUnion<[import("@sinclair/typebox").TString, import("@sinclair/typebox").TNull]>>; | ||
procedureName: import("@sinclair/typebox").TOptional<import("@sinclair/typebox").TUnion<[import("@sinclair/typebox").TString, import("@sinclair/typebox").TNull]>>; | ||
streamId: import("@sinclair/typebox").TString; | ||
@@ -40,4 +40,4 @@ controlFlags: import("@sinclair/typebox").TInteger; | ||
to: import("@sinclair/typebox").TString; | ||
serviceName: import("@sinclair/typebox").TString; | ||
procedureName: import("@sinclair/typebox").TString; | ||
serviceName: import("@sinclair/typebox").TOptional<import("@sinclair/typebox").TUnion<[import("@sinclair/typebox").TString, import("@sinclair/typebox").TNull]>>; | ||
procedureName: import("@sinclair/typebox").TOptional<import("@sinclair/typebox").TUnion<[import("@sinclair/typebox").TString, import("@sinclair/typebox").TNull]>>; | ||
streamId: import("@sinclair/typebox").TString; | ||
@@ -61,4 +61,4 @@ controlFlags: import("@sinclair/typebox").TInteger; | ||
to: import("@sinclair/typebox").TString; | ||
serviceName: import("@sinclair/typebox").TString; | ||
procedureName: import("@sinclair/typebox").TString; | ||
serviceName: import("@sinclair/typebox").TOptional<import("@sinclair/typebox").TUnion<[import("@sinclair/typebox").TString, import("@sinclair/typebox").TNull]>>; | ||
procedureName: import("@sinclair/typebox").TOptional<import("@sinclair/typebox").TUnion<[import("@sinclair/typebox").TString, import("@sinclair/typebox").TNull]>>; | ||
streamId: import("@sinclair/typebox").TString; | ||
@@ -71,2 +71,11 @@ controlFlags: import("@sinclair/typebox").TInteger; | ||
* we can't statically infer generics from generic Typebox schemas so we have to define it again here. | ||
* | ||
* TypeScript can't enforce types when a bitmask is involved, so these are the semantics of | ||
* `controlFlags`: | ||
* * If `controlFlags & StreamOpenBit == StreamOpenBit`, `streamId` must be set to a unique value | ||
* (suggestion: use `nanoid`). | ||
* * `serviceName` and `procedureName` must be set only when `controlFlags & StreamOpenBit == | ||
* StreamOpenBit`. | ||
* * If `controlFlags & StreamClosedBit` is set and the kind is `stream` or `subscription`, | ||
* `payload` can be a control message. | ||
* @template Payload The type of the payload. | ||
@@ -78,4 +87,4 @@ */ | ||
to: string; | ||
serviceName: string; | ||
procedureName: string; | ||
serviceName?: string; | ||
procedureName?: string; | ||
streamId: string; | ||
@@ -103,3 +112,3 @@ controlFlags: number; | ||
*/ | ||
export declare function msg<Payload extends object>(from: string, to: string, service: string, proc: string, stream: string, payload: Payload): TransportMessage<Payload>; | ||
export declare function msg<Payload extends object>(from: string, to: string, streamId: string, payload: Payload, serviceName?: string, procedureName?: string): TransportMessage<Payload>; | ||
/** | ||
@@ -119,3 +128,3 @@ * Creates a new transport message as a response to the given message. | ||
*/ | ||
export declare function closeStream(from: TransportClientId, to: TransportClientId, service: string, proc: string, stream: string): TransportMessage<{ | ||
export declare function closeStream(from: TransportClientId, to: TransportClientId, stream: string): TransportMessage<{ | ||
type: "CLOSE"; | ||
@@ -122,0 +131,0 @@ }>; |
@@ -13,4 +13,4 @@ import { Type } from '@sinclair/typebox'; | ||
to: Type.String(), | ||
serviceName: Type.String(), | ||
procedureName: Type.String(), | ||
serviceName: Type.Optional(Type.Union([Type.String(), Type.Null()])), | ||
procedureName: Type.Optional(Type.Union([Type.String(), Type.Null()])), | ||
streamId: Type.String(), | ||
@@ -48,3 +48,3 @@ controlFlags: Type.Integer(), | ||
*/ | ||
export function msg(from, to, service, proc, stream, payload) { | ||
export function msg(from, to, streamId, payload, serviceName, procedureName) { | ||
return { | ||
@@ -54,5 +54,5 @@ id: nanoid(), | ||
from, | ||
serviceName: service, | ||
procedureName: proc, | ||
streamId: stream, | ||
serviceName, | ||
procedureName, | ||
streamId, | ||
controlFlags: 0, | ||
@@ -70,5 +70,5 @@ payload, | ||
return { | ||
...msg, | ||
id: nanoid(), | ||
streamId: msg.streamId, | ||
controlFlags: 0, | ||
id: nanoid(), | ||
to: msg.from, | ||
@@ -86,4 +86,4 @@ from: msg.to, | ||
*/ | ||
export function closeStream(from, to, service, proc, stream) { | ||
const closeMessage = msg(from, to, service, proc, stream, { | ||
export function closeStream(from, to, stream) { | ||
const closeMessage = msg(from, to, stream, { | ||
type: 'CLOSE', | ||
@@ -90,0 +90,0 @@ }); |
@@ -5,3 +5,3 @@ import { isAck, isStreamClose, isStreamOpen, msg, reply, } from './message'; | ||
test('ack', () => { | ||
const m = msg('a', 'b', 'svc', 'proc', 'stream', { test: 1 }); | ||
const m = msg('a', 'b', 'stream', { test: 1 }, 'svc', 'proc'); | ||
m.controlFlags |= 1 /* ControlFlags.AckBit */; | ||
@@ -14,3 +14,3 @@ expect(m).toHaveProperty('controlFlags'); | ||
test('streamOpen', () => { | ||
const m = msg('a', 'b', 'svc', 'proc', 'stream', { test: 1 }); | ||
const m = msg('a', 'b', 'stream', { test: 1 }, 'svc', 'proc'); | ||
m.controlFlags |= 2 /* ControlFlags.StreamOpenBit */; | ||
@@ -23,3 +23,3 @@ expect(m).toHaveProperty('controlFlags'); | ||
test('streamClose', () => { | ||
const m = msg('a', 'b', 'svc', 'proc', 'stream', { test: 1 }); | ||
const m = msg('a', 'b', 'stream', { test: 1 }, 'svc', 'proc'); | ||
m.controlFlags |= 4 /* ControlFlags.StreamClosedBit */; | ||
@@ -32,3 +32,3 @@ expect(m).toHaveProperty('controlFlags'); | ||
test('reply', () => { | ||
const m = msg('a', 'b', 'svc', 'proc', 'stream', { test: 1 }); | ||
const m = msg('a', 'b', 'stream', { test: 1 }, 'svc', 'proc'); | ||
const payload = { cool: 2 }; | ||
@@ -42,3 +42,3 @@ const resp = reply(m, payload); | ||
test('default message has no control flags set', () => { | ||
const m = msg('a', 'b', 'svc', 'proc', 'stream', { test: 1 }); | ||
const m = msg('a', 'b', 'stream', { test: 1 }, 'svc', 'proc'); | ||
expect(isAck(m.controlFlags)).toBe(false); | ||
@@ -49,3 +49,3 @@ expect(isStreamOpen(m.controlFlags)).toBe(false); | ||
test('combining control flags works', () => { | ||
const m = msg('a', 'b', 'svc', 'proc', 'stream', { test: 1 }); | ||
const m = msg('a', 'b', 'stream', { test: 1 }, 'svc', 'proc'); | ||
m.controlFlags |= 2 /* ControlFlags.StreamOpenBit */; | ||
@@ -52,0 +52,0 @@ expect(isStreamOpen(m.controlFlags)).toBe(true); |
@@ -158,6 +158,13 @@ import { Value } from '@sinclair/typebox/value'; | ||
if (Value.Check(OpaqueTransportMessageSchema, parsedMsg)) { | ||
return parsedMsg; | ||
// JSON can't express the difference between `undefined` and `null`, so we need to patch that. | ||
return { | ||
...parsedMsg, | ||
serviceName: parsedMsg.serviceName === null ? undefined : parsedMsg.serviceName, | ||
procedureName: parsedMsg.procedureName === null | ||
? undefined | ||
: parsedMsg.procedureName, | ||
}; | ||
} | ||
else { | ||
log?.warn(`${this.clientId} -- received invalid msg: ${JSON.stringify(msg)}`); | ||
log?.warn(`${this.clientId} -- received invalid msg: ${JSON.stringify(parsedMsg)}`); | ||
return null; | ||
@@ -164,0 +171,0 @@ } |
@@ -119,2 +119,3 @@ import WebSocket from 'isomorphic-ws'; | ||
} | ||
rawOutput.end(); | ||
})(); | ||
@@ -172,2 +173,3 @@ // handle | ||
} | ||
rawOutput.end(); | ||
})(); | ||
@@ -214,2 +216,3 @@ // handle | ||
} | ||
rawOutput.end(); | ||
})(); | ||
@@ -315,3 +318,3 @@ return async (msg) => { | ||
export function payloadToTransportMessage(payload, streamId, from = 'client', to = 'SERVER') { | ||
return msg(from, to, 'service', 'procedure', streamId ?? 'stream', payload); | ||
return msg(from, to, streamId ?? 'stream', payload, 'service', 'procedure'); | ||
} | ||
@@ -318,0 +321,0 @@ /** |
@@ -5,3 +5,3 @@ { | ||
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", | ||
"version": "0.9.1", | ||
"version": "0.9.2", | ||
"type": "module", | ||
@@ -8,0 +8,0 @@ "exports": { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
300196
6144