@replit/river
Advanced tools
Comparing version 0.5.2 to 0.6.0
@@ -6,8 +6,10 @@ import { afterAll, assert, describe, expect, test } from 'vitest'; | ||
import http from 'http'; | ||
import { DIV_BY_ZERO, FallibleServiceConstructor, OrderingServiceConstructor, STREAM_ERROR, TestServiceConstructor, } from './fixtures'; | ||
import { BinaryFileServiceConstructor, DIV_BY_ZERO, FallibleServiceConstructor, OrderingServiceConstructor, STREAM_ERROR, TestServiceConstructor, } from './fixtures'; | ||
import { UNCAUGHT_ERROR } from '../router/result'; | ||
describe('client <-> server integration test', async () => { | ||
import { codecs } from '../codec/codec.test'; | ||
describe.each(codecs)('client <-> server integration test ($name codec)', async ({ codec }) => { | ||
const server = http.createServer(); | ||
const port = await onServerReady(server); | ||
const webSocketServer = await createWebSocketServer(server); | ||
const getTransports = () => createWsTransports(port, webSocketServer, codec); | ||
afterAll(() => { | ||
@@ -20,3 +22,3 @@ webSocketServer.clients.forEach((socket) => { | ||
test('rpc', async () => { | ||
const [clientTransport, serverTransport] = createWsTransports(port, webSocketServer); | ||
const [clientTransport, serverTransport] = getTransports(); | ||
const serviceDefs = { test: TestServiceConstructor() }; | ||
@@ -30,3 +32,3 @@ const server = await createServer(serverTransport, serviceDefs); | ||
test('fallible rpc', async () => { | ||
const [clientTransport, serverTransport] = createWsTransports(port, webSocketServer); | ||
const [clientTransport, serverTransport] = getTransports(); | ||
const serviceDefs = { test: FallibleServiceConstructor() }; | ||
@@ -48,4 +50,14 @@ const server = await createServer(serverTransport, serviceDefs); | ||
}); | ||
test('rpc with binary (uint8array)', async () => { | ||
const [clientTransport, serverTransport] = getTransports(); | ||
const serviceDefs = { test: BinaryFileServiceConstructor() }; | ||
const server = await createServer(serverTransport, serviceDefs); | ||
const client = createClient(clientTransport); | ||
const result = await client.test.getFile({ file: 'test.py' }); | ||
assert(result.ok); | ||
assert(result.payload.contents instanceof Uint8Array); | ||
expect(new TextDecoder().decode(result.payload.contents)).toStrictEqual('contents for file test.py'); | ||
}); | ||
test('stream', async () => { | ||
const [clientTransport, serverTransport] = createWsTransports(port, webSocketServer); | ||
const [clientTransport, serverTransport] = getTransports(); | ||
const serviceDefs = { test: TestServiceConstructor() }; | ||
@@ -68,3 +80,3 @@ const server = await createServer(serverTransport, serviceDefs); | ||
test('fallible stream', async () => { | ||
const [clientTransport, serverTransport] = createWsTransports(port, webSocketServer); | ||
const [clientTransport, serverTransport] = getTransports(); | ||
const serviceDefs = { test: FallibleServiceConstructor() }; | ||
@@ -92,3 +104,3 @@ const server = await createServer(serverTransport, serviceDefs); | ||
test('message order is preserved in the face of disconnects', async () => { | ||
const [clientTransport, serverTransport] = createWsTransports(port, webSocketServer); | ||
const [clientTransport, serverTransport] = getTransports(); | ||
const serviceDefs = { test: OrderingServiceConstructor() }; | ||
@@ -116,3 +128,3 @@ const server = await createServer(serverTransport, serviceDefs); | ||
test('concurrent rpcs', async () => { | ||
const [clientTransport, serverTransport] = createWsTransports(port, webSocketServer); | ||
const [clientTransport, serverTransport] = getTransports(); | ||
const serviceDefs = { test: OrderingServiceConstructor() }; | ||
@@ -132,3 +144,3 @@ const server = await createServer(serverTransport, serviceDefs); | ||
test('concurrent streams', async () => { | ||
const [clientTransport, serverTransport] = createWsTransports(port, webSocketServer); | ||
const [clientTransport, serverTransport] = getTransports(); | ||
const serviceDefs = { test: TestServiceConstructor() }; | ||
@@ -135,0 +147,0 @@ const server = await createServer(serverTransport, serviceDefs); |
@@ -92,2 +92,23 @@ export declare const EchoRequest: import("@sinclair/typebox").TObject<{ | ||
}; | ||
export declare const BinaryFileServiceConstructor: () => { | ||
name: "bin"; | ||
state: {}; | ||
procedures: { | ||
getFile: { | ||
input: import("@sinclair/typebox").TObject<{ | ||
file: import("@sinclair/typebox").TString; | ||
}>; | ||
output: import("@sinclair/typebox").TObject<{ | ||
contents: import("@sinclair/typebox").TUint8Array; | ||
}>; | ||
errors: import("@sinclair/typebox").TNever; | ||
handler: (context: import("../router").ServiceContextWithState<{}>, input: import("../transport/message").TransportMessage<{ | ||
file: string; | ||
}>) => Promise<import("../transport/message").TransportMessage<import("../router/result").Result<{ | ||
contents: Uint8Array; | ||
}, never>>>; | ||
type: "rpc"; | ||
}; | ||
}; | ||
}; | ||
export declare const DIV_BY_ZERO = "DIV_BY_ZERO"; | ||
@@ -94,0 +115,0 @@ export declare const STREAM_ERROR = "STREAM_ERROR"; |
@@ -65,2 +65,14 @@ import { Type } from '@sinclair/typebox'; | ||
.finalize(); | ||
export const BinaryFileServiceConstructor = () => ServiceBuilder.create('bin') | ||
.defineProcedure('getFile', { | ||
type: 'rpc', | ||
input: Type.Object({ file: Type.String() }), | ||
output: Type.Object({ contents: Type.Uint8Array() }), | ||
errors: Type.Never(), | ||
async handler(_ctx, msg) { | ||
const bytes = new TextEncoder().encode(`contents for file ${msg.payload.file}`); | ||
return reply(msg, Ok({ contents: bytes })); | ||
}, | ||
}) | ||
.finalize(); | ||
export const DIV_BY_ZERO = 'DIV_BY_ZERO'; | ||
@@ -67,0 +79,0 @@ export const STREAM_ERROR = 'STREAM_ERROR'; |
import { expect, describe, test } from 'vitest'; | ||
import { serializeService } from '../router/builder'; | ||
import { FallibleServiceConstructor, TestServiceConstructor } from './fixtures'; | ||
import { BinaryFileServiceConstructor, FallibleServiceConstructor, TestServiceConstructor, } from './fixtures'; | ||
describe('serialize service to jsonschema', () => { | ||
@@ -51,2 +51,35 @@ test('serialize basic service', () => { | ||
}); | ||
test('serialize service with binary', () => { | ||
const service = BinaryFileServiceConstructor(); | ||
expect(serializeService(service)).toStrictEqual({ | ||
name: 'bin', | ||
procedures: { | ||
getFile: { | ||
errors: { | ||
not: {}, | ||
}, | ||
input: { | ||
properties: { | ||
file: { | ||
type: 'string', | ||
}, | ||
}, | ||
required: ['file'], | ||
type: 'object', | ||
}, | ||
output: { | ||
properties: { | ||
contents: { | ||
type: 'Uint8Array', | ||
}, | ||
}, | ||
required: ['contents'], | ||
type: 'object', | ||
}, | ||
type: 'rpc', | ||
}, | ||
}, | ||
state: {}, | ||
}); | ||
}); | ||
test('serialize service with errors', () => { | ||
@@ -53,0 +86,0 @@ const service = FallibleServiceConstructor(); |
@@ -1,2 +0,5 @@ | ||
export {}; | ||
export declare const codecs: { | ||
name: string; | ||
codec: import("./types").Codec; | ||
}[]; | ||
//# sourceMappingURL=codec.test.d.ts.map |
@@ -0,11 +1,16 @@ | ||
import { BinaryCodec } from './binary'; | ||
import { NaiveJsonCodec } from './json'; | ||
import { describe, test, expect } from 'vitest'; | ||
describe('naive json codec', () => { | ||
export const codecs = [ | ||
{ name: 'naive', codec: NaiveJsonCodec }, | ||
{ name: 'binary', codec: BinaryCodec }, | ||
]; | ||
describe.each(codecs)('codec -- $name', ({ codec }) => { | ||
test('empty object', () => { | ||
const msg = {}; | ||
expect(NaiveJsonCodec.fromStringBuf(NaiveJsonCodec.toStringBuf(msg))).toStrictEqual(msg); | ||
expect(codec.fromBuffer(codec.toBuffer(msg))).toStrictEqual(msg); | ||
}); | ||
test('simple test', () => { | ||
const msg = { abc: 123, def: 'cool' }; | ||
expect(NaiveJsonCodec.fromStringBuf(NaiveJsonCodec.toStringBuf(msg))).toStrictEqual(msg); | ||
expect(codec.fromBuffer(codec.toBuffer(msg))).toStrictEqual(msg); | ||
}); | ||
@@ -21,10 +26,17 @@ test('deeply nested test', () => { | ||
}; | ||
expect(NaiveJsonCodec.fromStringBuf(NaiveJsonCodec.toStringBuf(msg))).toStrictEqual(msg); | ||
expect(codec.fromBuffer(codec.toBuffer(msg))).toStrictEqual(msg); | ||
}); | ||
test('buffer test', () => { | ||
const msg = { | ||
buff: Uint8Array.from([0, 42, 100, 255]), | ||
}; | ||
expect(codec.fromBuffer(codec.toBuffer(msg))).toStrictEqual(msg); | ||
}); | ||
test('invalid json returns null', () => { | ||
expect(NaiveJsonCodec.fromStringBuf('')).toBeNull(); | ||
expect(NaiveJsonCodec.fromStringBuf('[')).toBeNull(); | ||
expect(NaiveJsonCodec.fromStringBuf('[{}')).toBeNull(); | ||
expect(NaiveJsonCodec.fromStringBuf('{"a":1}[]')).toBeNull(); | ||
const encoder = new TextEncoder(); | ||
expect(codec.fromBuffer(encoder.encode(''))).toBeNull(); | ||
expect(codec.fromBuffer(encoder.encode('['))).toBeNull(); | ||
expect(codec.fromBuffer(encoder.encode('[{}'))).toBeNull(); | ||
expect(codec.fromBuffer(encoder.encode('{"a":1}[]'))).toBeNull(); | ||
}); | ||
}); |
@@ -0,1 +1,20 @@ | ||
const encoder = new TextEncoder(); | ||
const decoder = new TextDecoder(); | ||
// Convert Uint8Array to base64 | ||
function uint8ArrayToBase64(uint8Array) { | ||
let binary = ''; | ||
uint8Array.forEach((byte) => { | ||
binary += String.fromCharCode(byte); | ||
}); | ||
return btoa(binary); | ||
} | ||
// Convert base64 to Uint8Array | ||
function base64ToUint8Array(base64) { | ||
const binaryString = atob(base64); | ||
const uint8Array = new Uint8Array(binaryString.length); | ||
for (let i = 0; i < binaryString.length; i++) { | ||
uint8Array[i] = binaryString.charCodeAt(i); | ||
} | ||
return uint8Array; | ||
} | ||
/** | ||
@@ -6,6 +25,23 @@ * Naive JSON codec implementation using JSON.stringify and JSON.parse. | ||
export const NaiveJsonCodec = { | ||
toStringBuf: JSON.stringify, | ||
fromStringBuf: (s) => { | ||
toBuffer: (obj) => { | ||
return encoder.encode(JSON.stringify(obj, function replacer(key) { | ||
let val = this[key]; | ||
if (val instanceof Uint8Array) { | ||
return { $t: uint8ArrayToBase64(val) }; | ||
} | ||
else { | ||
return val; | ||
} | ||
})); | ||
}, | ||
fromBuffer: (buff) => { | ||
try { | ||
return JSON.parse(s); | ||
return JSON.parse(decoder.decode(buff), function reviver(_key, val) { | ||
if (val?.$t) { | ||
return base64ToUint8Array(val.$t); | ||
} | ||
else { | ||
return val; | ||
} | ||
}); | ||
} | ||
@@ -12,0 +48,0 @@ catch { |
/** | ||
* Codec interface for encoding and decoding objects to and from string buffers. | ||
* Codec interface for encoding and decoding objects to and from Uint8 buffers. | ||
* Used to prepare messages for use by the transport layer. | ||
@@ -7,14 +7,14 @@ */ | ||
/** | ||
* Encodes an object to a string buffer. | ||
* Encodes an object to a Uint8 buffer. | ||
* @param obj - The object to encode. | ||
* @returns The encoded string buffer. | ||
* @returns The encoded Uint8 buffer. | ||
*/ | ||
toStringBuf(obj: object): string; | ||
toBuffer(obj: object): Uint8Array; | ||
/** | ||
* Decodes an object from a string buffer. | ||
* @param buf - The string buffer to decode. | ||
* Decodes an object from a Uint8 buffer. | ||
* @param buf - The Uint8 buffer to decode. | ||
* @returns The decoded object, or null if decoding failed. | ||
*/ | ||
fromStringBuf(buf: string): object | null; | ||
fromBuffer(buf: Uint8Array): object | null; | ||
} | ||
//# sourceMappingURL=types.d.ts.map |
@@ -11,2 +11,3 @@ /// <reference types="node" /> | ||
import { Result, RiverError, RiverUncaughtSchema } from './router/result'; | ||
import { Codec } from './codec'; | ||
/** | ||
@@ -40,3 +41,3 @@ * Creates a WebSocket server instance using the provided HTTP server. | ||
*/ | ||
export declare function createWsTransports(port: number, wss: WebSocketServer): [WebSocketTransport, WebSocketTransport]; | ||
export declare function createWsTransports(port: number, wss: WebSocketServer, codec?: Codec): [WebSocketTransport, WebSocketTransport]; | ||
/** | ||
@@ -43,0 +44,0 @@ * Transforms an RPC procedure definition into a normal function call. |
@@ -51,7 +51,8 @@ import WebSocket from 'isomorphic-ws'; | ||
*/ | ||
export function createWsTransports(port, wss) { | ||
export function createWsTransports(port, wss, codec) { | ||
const options = codec ? { codec } : undefined; | ||
return [ | ||
new WebSocketTransport(async () => { | ||
return createLocalWebSocketClient(port); | ||
}, 'client'), | ||
}, 'client', options), | ||
new WebSocketTransport(async () => { | ||
@@ -64,3 +65,3 @@ return new Promise((resolve) => { | ||
}); | ||
}, 'SERVER'), | ||
}, 'SERVER', options), | ||
]; | ||
@@ -67,0 +68,0 @@ } |
@@ -7,2 +7,3 @@ import { NaiveJsonCodec } from '../../codec/json'; | ||
}; | ||
const newlineBuff = new TextEncoder().encode('\n'); | ||
/** | ||
@@ -35,3 +36,4 @@ * A transport implementation that uses standard input and output streams. | ||
}); | ||
rl.on('line', (msg) => this.onMessage(msg)); | ||
const encoder = new TextEncoder(); | ||
rl.on('line', (msg) => this.onMessage(encoder.encode(msg))); | ||
} | ||
@@ -45,3 +47,7 @@ /** | ||
const id = msg.id; | ||
this.output.write(this.codec.toStringBuf(msg) + '\n'); | ||
const payload = this.codec.toBuffer(msg); | ||
const out = new Uint8Array(payload.length + newlineBuff.length); | ||
out.set(payload, 0); | ||
out.set(newlineBuff, payload.length); | ||
this.output.write(out); | ||
return id; | ||
@@ -48,0 +54,0 @@ } |
@@ -9,2 +9,3 @@ /// <reference types="ws" /> | ||
codec: Codec; | ||
binaryType: 'arraybuffer' | 'blob'; | ||
} | ||
@@ -11,0 +12,0 @@ type WebSocketResult = { |
@@ -7,2 +7,3 @@ import { Transport } from '../types'; | ||
codec: NaiveJsonCodec, | ||
binaryType: 'arraybuffer', | ||
}; | ||
@@ -83,3 +84,4 @@ /** | ||
this.ws = res.ws; | ||
this.ws.onmessage = (msg) => this.onMessage(msg.data.toString()); | ||
this.ws.binaryType = 'arraybuffer'; | ||
this.ws.onmessage = (msg) => this.onMessage(msg.data); | ||
this.ws.onclose = () => { | ||
@@ -98,3 +100,3 @@ this.reconnectPromise = undefined; | ||
log?.info(`${this.clientId} -- sending ${JSON.stringify(msg)}`); | ||
this.ws.send(this.codec.toStringBuf(msg)); | ||
this.ws.send(this.codec.toBuffer(msg)); | ||
} | ||
@@ -125,3 +127,3 @@ this.sendQueue = []; | ||
log?.info(`${this.clientId} -- sending ${JSON.stringify(msg)}`); | ||
this.ws.send(this.codec.toStringBuf(msg)); | ||
this.ws.send(this.codec.toBuffer(msg)); | ||
} | ||
@@ -128,0 +130,0 @@ else { |
@@ -36,3 +36,3 @@ import { Codec } from '../codec/types'; | ||
*/ | ||
onMessage(msg: string): void; | ||
onMessage(msg: Uint8Array): void; | ||
/** | ||
@@ -39,0 +39,0 @@ * Adds a message listener to this transport. |
@@ -44,3 +44,3 @@ import { Value } from '@sinclair/typebox/value'; | ||
onMessage(msg) { | ||
const parsedMsg = this.codec.fromStringBuf(msg); | ||
const parsedMsg = this.codec.fromBuffer(msg); | ||
if (parsedMsg === null) { | ||
@@ -47,0 +47,0 @@ log?.warn(`${this.clientId} -- received malformed msg: ${msg}`); |
@@ -5,3 +5,3 @@ { | ||
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", | ||
"version": "0.5.2", | ||
"version": "0.6.0", | ||
"type": "module", | ||
@@ -21,2 +21,3 @@ "exports": { | ||
"dependencies": { | ||
"@msgpack/msgpack": "^3.0.0-beta2", | ||
"@sinclair/typebox": "^0.31.8", | ||
@@ -23,0 +24,0 @@ "isomorphic-ws": "^5.0.0", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
203412
87
4438
6
+ Added@msgpack/msgpack@3.0.0-beta2(transitive)