
@replit/river
It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!
⚠️ Not production ready. While Replit is using parts of River in production, we are still going through rapid breaking changes. The first production-ready version will be 1.x.x. ⚠️
River allows multiple clients to connect to and make remote procedure calls to a remote server as if they were local procedures.
River provides a framework for long-lived streaming Remote Procedure Calls (RPCs) in modern web applications, featuring advanced error handling and customizable retry policies to ensure seamless communication between clients and servers.
River provides a framework similar to tRPC and gRPC but with additional features, including JSON Schema support, duplex streaming, service multiplexing, and transport independence.
See PROTOCOL.md for more information on the protocol.
Before proceeding, ensure you have TypeScript 5 installed and configured appropriately:
Ensure your tsconfig.json is configured correctly:
You must verify that:
- compilerOptions.moduleResolution is set to "bundler"
- compilerOptions.strictFunctionTypes is set to true
- compilerOptions.strictNullChecks is set to true
or, preferably, that:
- compilerOptions.moduleResolution is set to "bundler"
- compilerOptions.strict is set to true
Like so:
{
  "compilerOptions": {
    "moduleResolution": "bundler",
    "strict": true
    // Other compiler options...
  }
}
If these options already exist in your tsconfig.json and don't match what is shown above, modify them. River is designed for "strict": true, but technically only strictFunctionTypes and strictNullChecks being set to true is required. Failing to set these will cause unresolvable type errors when defining services.
Install River and Dependencies:
To use River, install the required packages using npm:
npm i @replit/river @sinclair/typebox
Valid procedure types are:
- rpc, single request, single response
- upload, multiple requests, single response
- subscription, single request, multiple responses
- stream, multiple requests, multiple responses
Both the client and the server need a Transport to work.
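The four procedure types differ only in how many requests and responses flow in each direction. The following is a minimal sketch of that mapping in plain TypeScript. The names `ProcedureKind`, `Cardinality`, `cardinality`, and `kindFor` are hypothetical and are not River exports; the sketch only restates the list above as data.

```typescript
// Hypothetical names for illustration only; none of these are River exports.
type ProcedureKind = 'rpc' | 'upload' | 'subscription' | 'stream';

interface Cardinality {
  requests: 'single' | 'multiple';
  responses: 'single' | 'multiple';
}

// Restates the list above: each procedure type and its message pattern.
const cardinality: Record<ProcedureKind, Cardinality> = {
  rpc: { requests: 'single', responses: 'single' },
  upload: { requests: 'multiple', responses: 'single' },
  subscription: { requests: 'single', responses: 'multiple' },
  stream: { requests: 'multiple', responses: 'multiple' },
};

// Pick the procedure type that matches a desired message pattern.
function kindFor(
  requests: Cardinality['requests'],
  responses: Cardinality['responses'],
): ProcedureKind {
  const kinds = Object.keys(cardinality) as ProcedureKind[];
  const match = kinds.find(
    (kind) =>
      cardinality[kind].requests === requests &&
      cardinality[kind].responses === responses,
  );
  if (match === undefined) {
    throw new Error('no procedure type matches this pattern');
  }
  return match;
}
```

For example, a file upload that sends many chunks and receives one acknowledgement corresponds to `kindFor('multiple', 'single')`, which selects `upload`.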
First, we create a service using ServiceSchema:
import { ServiceSchema, Procedure, Ok } from '@replit/river';
import { Type } from '@sinclair/typebox';

export const ExampleService = ServiceSchema.define(
  // configuration
  {
    // initializer for shared state
    initializeState: () => ({ count: 0 }),
  },
  // procedures
  {
    add: Procedure.rpc({
      requestInit: Type.Object({ n: Type.Number() }),
      responseData: Type.Object({ result: Type.Number() }),
      requestErrors: Type.Never(),
      // note that a handler is unique per user RPC
      async handler({ ctx, reqInit: { n } }) {
        // access and mutate shared state
        ctx.state.count += n;
        return Ok({ result: ctx.state.count });
      },
    }),
  },
);
Then, we create the server:
import http from 'http';
import { WebSocketServer } from 'ws';
import { WebSocketServerTransport } from '@replit/river/transport/ws/server';
import { createServer } from '@replit/river';
// start websocket server on port 3000
const httpServer = http.createServer();
const port = 3000;
const wss = new WebSocketServer({ server: httpServer });
const transport = new WebSocketServerTransport(wss, 'SERVER');
export const server = createServer(transport, {
  example: ExampleService,
});
export type ServiceSurface = typeof server;
httpServer.listen(port);
In another file, create the client (this gives it a separate entrypoint):
import { WebSocketClientTransport } from '@replit/river/transport/ws/client';
import { createClient } from '@replit/river';
import { WebSocket } from 'ws';
const transport = new WebSocketClientTransport(
  async () => new WebSocket('ws://localhost:3000'),
  'my-client-id',
);
const client = createClient(
  transport,
  'SERVER', // transport id of the server in the previous step
  { eagerlyConnect: true }, // whether to eagerly connect to the server on creation (optional argument)
);
// we get full type safety on `client`
// client.<service name>.<procedure name>.<procedure type>()
// e.g.
const result = await client.example.add.rpc({ n: 3 });
if (result.ok) {
  const msg = result.payload;
  console.log(msg.result); // 0 + 3 = 3
}
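The `result.ok` check above follows a discriminated-union pattern: the `ok` flag tells TypeScript which shape `payload` has. Below is a minimal, library-free sketch of that pattern. `ResultSketch`, `okSketch`, `errSketch`, `AddErrorSketch`, and `describeResult` are hypothetical stand-ins, and the `{ code, message }` error shape is an assumption for illustration; River ships its own helpers and types, which may differ.

```typescript
// Hypothetical stand-ins for illustration; not River's actual types or helpers.
type ResultSketch<T, E> =
  | { ok: true; payload: T }
  | { ok: false; payload: E };

function okSketch<T>(payload: T): ResultSketch<T, never> {
  return { ok: true, payload };
}

function errSketch<E>(payload: E): ResultSketch<never, E> {
  return { ok: false, payload };
}

// Assumed error shape, used only by this sketch.
interface AddErrorSketch {
  code: string;
  message: string;
}

// Narrowing on `ok` gives typed access to either the success or error payload.
function describeResult(
  result: ResultSketch<{ result: number }, AddErrorSketch>,
): string {
  if (result.ok) {
    return `sum is ${result.payload.result}`;
  }
  return `failed with ${result.payload.code}: ${result.payload.message}`;
}
```

Because the caller must check `ok` before touching `payload`, a failed call cannot be mistaken for a successful one at compile time.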
To add logging, you can bind a logging function to a transport.
import { coloredStringLogger } from '@replit/river/logging';
const transport = new WebSocketClientTransport(
  async () => new WebSocket('ws://localhost:3000'),
  'my-client-id',
);
transport.bindLogger(console.log);
// or
transport.bindLogger(coloredStringLogger);
You can define your own logging functions that satisfy the LogFn type.
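As a rough illustration, a custom logger might add a prefix and forward each line to a sink of your choice. The sketch below assumes a logging function receives a message string, an optional context object, and an optional level string. That shape, and the names `LogFnSketch` and `makePrefixedLogger`, are assumptions made for this example; check the `LogFn` type exported by your installed version of River before relying on it.

```typescript
// Assumed shape of a logging function; verify against River's exported LogFn type.
type LogFnSketch = (
  msg: string,
  ctx?: Record<string, unknown>,
  level?: string,
) => void;

// Hypothetical helper: builds a logger that prefixes each line and writes it to `sink`.
function makePrefixedLogger(
  prefix: string,
  sink: (line: string) => void,
): LogFnSketch {
  return (msg, ctx, level = 'info') => {
    // Append the context as JSON only when one was provided.
    const suffix = ctx ? ` ${JSON.stringify(ctx)}` : '';
    sink(`[${prefix}] ${level.toUpperCase()} ${msg}${suffix}`);
  };
}

// Collect log lines in memory; a real sink could write to a file or a log service.
const lines: string[] = [];
const log = makePrefixedLogger('river', (line) => lines.push(line));

log('connected', { clientId: 'my-client-id' });
log('slow response', undefined, 'warn');
```

A function built this way could then be passed to `transport.bindLogger`, provided its signature matches the real `LogFn` type.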
River defines two types of reconnects:
Hard reconnects are signaled via sessionStatus events.
If your application is stateful on either the server or the client, the service consumer should wrap all the client-side setup with transport.addEventListener('sessionStatus', (evt) => ...) to do appropriate setup and teardown.
transport.addEventListener('sessionStatus', (evt) => {
  if (evt.status === 'created') {
    // do something
  } else if (evt.status === 'closing') {
    // do other things
  } else if (evt.status === 'closed') {
    // note that evt.session only has id + to
    // this is useful for doing things like creating a new session if
    // a session just got yanked
  }
});
// or, listen for specific session states
transport.addEventListener('sessionTransition', (evt) => {
  if (evt.state === SessionState.Connected) {
    // switch on various transition states
  } else if (evt.state === SessionState.NoConnection) {
    // do something
  }
});
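To make the setup-and-teardown idea concrete, here is a small self-contained sketch that keeps per-session application state in a map and clears it when the session closes. `FakeTransport` is a hypothetical stand-in that models only the event plumbing, and `activeSubscriptions` and `wireSessionLifecycle` are illustrative names; none of this is River's implementation.

```typescript
// Hypothetical stand-in types; only the 'sessionStatus' event plumbing is modeled.
type SessionStatusSketch = 'created' | 'closing' | 'closed';

interface SessionStatusEventSketch {
  status: SessionStatusSketch;
  session: { id: string; to: string };
}

type SessionListener = (evt: SessionStatusEventSketch) => void;

class FakeTransport {
  private listeners: SessionListener[] = [];

  addEventListener(_type: 'sessionStatus', listener: SessionListener): void {
    this.listeners.push(listener);
  }

  // Test helper: deliver an event to every registered listener.
  emit(evt: SessionStatusEventSketch): void {
    for (const listener of this.listeners) {
      listener(evt);
    }
  }
}

// Application state that must not outlive the session it belongs to.
const activeSubscriptions = new Map<string, string[]>();

function wireSessionLifecycle(transport: FakeTransport): void {
  transport.addEventListener('sessionStatus', (evt) => {
    if (evt.status === 'created') {
      // Setup: (re)create anything stateful for the new session.
      activeSubscriptions.set(evt.session.id, ['example.add']);
    } else if (evt.status === 'closed') {
      // Teardown: drop state tied to the session that went away.
      activeSubscriptions.delete(evt.session.id);
    }
  });
}
```

Keying the state by session id means a hard reconnect, which produces a new session, starts from a clean slate rather than reusing stale subscriptions.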
River allows you to extend the protocol-level handshake so you can add additional logic to validate incoming connections.
You can do this by passing extra options to createClient and createServer and extending the ParsedMetadata interface:
declare module '@replit/river' {
  interface ParsedMetadata {
    userId: number;
  }
}
const schema = Type.Object({ token: Type.String() });
createClient<typeof services>(new MockClientTransport('client'), 'SERVER', {
  eagerlyConnect: false,
  handshakeOptions: createClientHandshakeOptions(schema, async () => ({
    // the type of this function is
    // () => Static<typeof schema> | Promise<Static<typeof schema>>
    token: '123',
  })),
});
createServer(new MockServerTransport('SERVER'), services, {
  handshakeOptions: createServerHandshakeOptions(
    schema,
    (metadata, previousMetadata) => {
      // the type of this function is
      // (metadata: Static<typeof schema>, previousMetadata?: ParsedMetadata) =>
      //   | false | Promise<false> (if you reject it)
      //   | ParsedMetadata | Promise<ParsedMetadata> (if you allow it)
      // next time a connection happens on the same session, previousMetadata will
      // be populated with the last returned value
    },
  ),
});
You can then access the ParsedMetadata in your procedure handlers:
async handler(ctx, ...args) {
  // this contains the parsed metadata
  console.log(ctx.metadata)
}
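The server-side validation callback shown earlier is left empty. One way to fill it in, sketched here without River so it can run on its own, is to look the token up and either reject with `false` or return the parsed metadata. `HandshakeRequestSketch`, `ParsedMetadataSketch`, `knownTokens`, and `validateHandshake` are hypothetical names, and the token table is made-up data; refusing a reconnect that switches users is a design choice of this sketch, not documented River behaviour.

```typescript
// Mirrors the shape of Type.Object({ token: Type.String() }) from the example above.
interface HandshakeRequestSketch {
  token: string;
}

// Mirrors the ParsedMetadata extension from the example above.
interface ParsedMetadataSketch {
  userId: number;
}

// Hypothetical token table; a real server would consult an auth service or database.
const knownTokens = new Map<string, number>([['123', 42]]);

function validateHandshake(
  metadata: HandshakeRequestSketch,
  previousMetadata?: ParsedMetadataSketch,
): false | ParsedMetadataSketch {
  const userId = knownTokens.get(metadata.token);
  if (userId === undefined) {
    return false; // unknown token: reject the connection
  }
  // On a reconnect within the same session, refuse to switch to a different user.
  if (previousMetadata !== undefined && previousMetadata.userId !== userId) {
    return false;
  }
  return { userId }; // accepted: this value becomes the parsed metadata
}
```

A function with this shape could serve as the second argument to `createServerHandshakeOptions`, assuming the callback type described in the comments above.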
We've also provided an end-to-end testing environment using Next.js, and a simple backend connected with the WebSocket transport that you can play with on Replit.
You can find more service examples in the E2E test fixtures.
- npm i -- install dependencies
- npm run check -- lint
- npm run format -- format
- npm run test -- run tests
- npm run release -- cut a new release (should bump version in package.json first)
River uses an automated release process with Release Drafter for version management and NPM publishing.
Merge PRs to main - Release Drafter automatically:
When ready to release, create a version bump PR:
- Bump the version in package.json and package-lock.json. You can run pnpm version --no-git-tag-version <version> to bump the version.
- patch - Bug fixes, small improvements (e.g., 0.208.4 → 0.208.5)
- minor - New features, backwards compatible (e.g., 0.208.4 → 0.209.0)
- major - Breaking changes (e.g., 0.208.4 → 1.0.0)
Publish the GitHub release:
- v0.209.0)
Automation takes over:
- river package is published to NPM
Manual npm release:
- npm run release locally
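The patch, minor, and major bump rules listed above can be checked with a few lines of code. This is a minimal sketch that handles only plain MAJOR.MINOR.PATCH strings (no pre-release or build suffixes); `BumpKind` and `bumpVersion` are hypothetical names and are not part of River's release tooling.

```typescript
type BumpKind = 'patch' | 'minor' | 'major';

// Hypothetical helper: computes the next version for a plain MAJOR.MINOR.PATCH string.
function bumpVersion(version: string, kind: BumpKind): string {
  const match = /^(\d+)\.(\d+)\.(\d+)$/.exec(version);
  if (match === null) {
    throw new Error(`not a plain MAJOR.MINOR.PATCH version: ${version}`);
  }
  const major = Number(match[1]);
  const minor = Number(match[2]);
  const patch = Number(match[3]);

  switch (kind) {
    case 'major':
      // Breaking changes: reset minor and patch.
      return `${major + 1}.0.0`;
    case 'minor':
      // New backwards-compatible features: reset patch.
      return `${major}.${minor + 1}.0`;
    case 'patch':
      // Bug fixes and small improvements.
      return `${major}.${minor}.${patch + 1}`;
  }
}
```

With the examples from the list, `bumpVersion('0.208.4', 'patch')` gives `0.208.5`, `'minor'` gives `0.209.0`, and `'major'` gives `1.0.0`.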