Security News
Research
Supply Chain Attack on Rspack npm Packages Injects Cryptojacking Malware
A supply chain attack on Rspack's npm packages injected cryptomining malware, potentially impacting thousands of developers.
@replit/river
Advanced tools
It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!
⚠️ Not production ready, while Replit is using parts of River in production, we are still going through rapid breaking changes. First production ready version will be 1.x.x
⚠️
River allows multiple clients to connect to and make remote procedure calls to a remote server as if they were local procedures.
River provides a framework for long-lived streaming Remote Procedure Calls (RPCs) in modern web applications, featuring advanced error handling and customizable retry policies to ensure seamless communication between clients and servers.
River provides a framework similar to tRPC and gRPC but with additional features:
See PROTOCOL.md for more information on the protocol.
Before proceeding, ensure you have TypeScript 5 installed and configured appropriately:
Ensure your tsconfig.json
is configured correctly:
You must verify that:
compilerOptions.moduleResolution
is set to "bundler"
compilerOptions.strictFunctionTypes
is set to true
compilerOptions.strictNullChecks
is set to true
or, preferably, that:
compilerOptions.moduleResolution
is set to "bundler"
compilerOptions.strict
is set to true
Like so:
{
"compilerOptions": {
"moduleResolution": "bundler",
"strict": true
// Other compiler options...
}
}
If these options already exist in your tsconfig.json
and don't match what is shown above, modify them. River is designed for "strict": true
, but technically only strictFunctionTypes
and strictNullChecks
being set to true
is required. Failing to set these will cause unresolvable type errors when defining services.
Install River and Dependencies:
To use River, install the required packages using npm:
npm i @replit/river @sinclair/typebox
rpc
whose handler has a signature of Input -> Result<Output, Error>
.upload
whose handler has a signature of AsyncIterableIterator<Input> -> Result<Output, Error>
.subscription
whose handler has a signature of Input -> Pushable<Result<Output, Error>>
.stream
whose handler has a signature of AsyncIterableIterator<Input> -> Pushable<Result<Output, Error>>
.Transport
to work.
First, we create a service using ServiceSchema
:
import { ServiceSchema, Procedure, Ok } from '@replit/river';
import { Type } from '@sinclair/typebox';
export const ExampleService = ServiceSchema.define(
// configuration
{
// initializer for shared state
initializeState: () => ({ count: 0 }),
},
// procedures
{
add: Procedure.rpc({
input: Type.Object({ n: Type.Number() }),
output: Type.Object({ result: Type.Number() }),
errors: Type.Never(),
// note that a handler is unique per user RPC
async handler(ctx, { n }) {
// access and mutate shared state
ctx.state.count += n;
return Ok({ result: ctx.state.count });
},
}),
},
);
Then, we create the server:
import http from 'http';
import { WebSocketServer } from 'ws';
import { WebSocketServerTransport } from '@replit/river/transport/ws/server';
import { createServer } from '@replit/river';
// start websocket server on port 3000
const httpServer = http.createServer();
const port = 3000;
const wss = new WebSocketServer({ server: httpServer });
const transport = new WebSocketServerTransport(wss, 'SERVER');
export const server = createServer(transport, {
example: ExampleService,
});
export type ServiceSurface = typeof server;
httpServer.listen(port);
In another file for the client (to create a separate entrypoint),
import { WebSocketClientTransport } from '@replit/river/transport/ws/client';
import { createClient } from '@replit/river';
import { WebSocket } from 'ws';
const transport = new WebSocketClientTransport(
async () => new WebSocket('ws://localhost:3000'),
'my-client-id',
);
const client = createClient(
transport,
'SERVER', // transport id of the server in the previous step
{ eagerlyConnect: true }, // whether to eagerly connect to the server on creation (optional argument)
);
// we get full type safety on `client`
// client.<service name>.<procedure name>.<procedure type>()
// e.g.
const result = await client.example.add.rpc({ n: 3 });
if (result.ok) {
const msg = result.payload;
console.log(msg.result); // 0 + 3 = 3
}
To add logging, you can bind a logging function to a transport.
import { coloredStringLogger } from '@replit/river/logging';
const transport = new WebSocketClientTransport(
async () => new WebSocket('ws://localhost:3000'),
'my-client-id',
);
transport.bindLogger(console.log);
// or
transport.bindLogger(coloredStringLogger);
You can define your own logging functions that satisfy the LogFn
type.
River defines two types of reconnects:
Hard reconnects are signaled via sessionStatus
events.
If your application is stateful on either the server or the client, the service consumer should wrap all the client-side setup with transport.addEventListener('sessionStatus', (evt) => ...)
to do appropriate setup and teardown.
transport.addEventListener('sessionStatus', (evt) => {
if (evt.status === 'connect') {
// do something
} else if (evt.status === 'disconnect') {
// do something else
}
});
// or, listen for specific session states
transport.addEventListener('sessionTransition', (evt) => {
if (evt.state === SessionState.Connected) {
// switch on various transition states
} else if (evt.state === SessionState.NoConnection) {
// do something
}
});
River allows you to extend the protocol-level handshake so you can add additional logic to validate incoming connections.
You can do this by passing extra options to createClient
and createServer
and extending the ParsedMetadata
interface:
declare module '@replit/river' {
interface ParsedMetadata {
userId: number;
}
}
const schema = Type.Object({ token: Type.String() });
createClient<typeof services>(new MockClientTransport('client'), 'SERVER', {
eagerlyConnect: false,
handshakeOptions: createClientHandshakeOptions(schema, async () => ({
// the type of this function is
// () => Static<typeof schema> | Promise<Static<typeof schema>>
token: '123',
})),
});
createServer(new MockServerTransport('SERVER'), services, {
handshakeOptions: createServerHandshakeOptions(
schema,
(metadata, previousMetadata) => {
// the type of this function is
// (metadata: Static<typeof<schema>, previousMetadata?: ParsedMetadata) =>
// | false | Promise<false> (if you reject it)
// | ParsedMetadata | Promise<ParsedMetadata> (if you allow it)
// next time a connection happens on the same session, previousMetadata will
// be populated with the last returned value
},
),
});
You can then access the ParsedMetadata
in your procedure handlers:
async handler(ctx, ...args) {
// this contains the parsed metadata
console.log(ctx.metadata)
}
We've also provided an end-to-end testing environment using Next.js
, and a simple backend connected with the WebSocket transport that you can play with on Replit.
You can find more service examples in the E2E test fixtures
npm i
-- install dependenciesnpm run check
-- lintnpm run format
-- formatnpm run test
-- run testsnpm run publish
-- cut a new release (should bump version in package.json first)FAQs
It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!
The npm package @replit/river receives a total of 10,149 weekly downloads. As such, @replit/river popularity was classified as popular.
We found that @replit/river demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 25 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Research
A supply chain attack on Rspack's npm packages injected cryptomining malware, potentially impacting thousands of developers.
Research
Security News
Socket researchers discovered a malware campaign on npm delivering the Skuld infostealer via typosquatted packages, exposing sensitive data.
Security News
Sonar’s acquisition of Tidelift highlights a growing industry shift toward sustainable open source funding, addressing maintainer burnout and critical software dependencies.