@replit/river


It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!

  • Version: 0.15.5
  • Registry: npm
  • Weekly downloads: 4.2K (decreased by 61.01%)
  • Maintainers: 32

River

River allows multiple clients to connect to and make remote procedure calls to a remote server as if they were local procedures.

Long-lived streaming remote procedure calls

River provides a framework for long-lived streaming Remote Procedure Calls (RPCs) in modern web applications, featuring advanced error handling and customizable retry policies to ensure seamless communication between clients and servers.

River provides a framework similar to tRPC and gRPC but with additional features:

  • JSON Schema Support + run-time schema validation
  • full-duplex streaming
  • service multiplexing
  • result types and error handling
  • snappy DX (no code generation)
  • transparent reconnect support for long-lived sessions
  • over any transport (WebSockets and Unix domain sockets out of the box)

See PROTOCOL.md for more information on the protocol.

Prerequisites

Before proceeding, ensure you have TypeScript 5 installed and configured appropriately:

  1. Ensure "moduleResolution": "bundler" in tsconfig.json:

    {
      "compilerOptions": {
        "moduleResolution": "bundler"
        // Other compiler options...
      }
    }
    

    If it exists but is set to a different value, modify it to "bundler".

  2. Install River and its dependencies using npm:

    npm i @replit/river @sinclair/typebox

  3. If you plan on using WebSocket for the underlying transport, also install:

    npm i ws isomorphic-ws

Writing services

Concepts

  • Router: a collection of services, namespaced by service name.
  • Service: a collection of procedures with a shared state.
  • Procedure: a single procedure. A procedure declares its type, an input message type, an output message type, optionally an error type, and the associated handler. Valid types are:
    • rpc whose handler has a signature of Input -> Result<Output, Error>.
    • upload whose handler has a signature of AsyncIterableIterator<Input> -> Result<Output, Error>.
    • subscription whose handler has a signature of Input -> Pushable<Result<Output, Error>>.
    • stream whose handler has a signature of AsyncIterableIterator<Input> -> Pushable<Result<Output, Error>>.
  • Transport: manages the lifecycle (creation/deletion) of connections and multiplexes reads/writes from clients. Both the client and the server must be passed an instance of a Transport subclass to work.
    • Connection: the actual raw underlying transport connection
    • Session: a higher-level abstraction that operates over the span of potentially multiple transport-level connections
  • Codec: encodes messages between clients and servers before the transport sends them across the wire.
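
To make the procedure signatures above more concrete, here is a minimal sketch of an rpc-style and an upload-style handler. It deliberately does not import from @replit/river: the Result type and the Ok and Err helpers are hypothetical local stand-ins (the `ok`/`payload` shape follows the client example later in this document; the error payload shape is an assumption), and `divide`, `sum`, and `numbers` are illustrative names only.

```typescript
// Hypothetical local types mirroring the shapes described above.
// They are NOT River's own exports; they only illustrate the signatures.
type Result<T, E> = { ok: true; payload: T } | { ok: false; payload: E };

const Ok = <T>(payload: T): Result<T, never> => ({ ok: true, payload });
const Err = <E>(payload: E): Result<never, E> => ({ ok: false, payload });

// rpc: Input -> Result<Output, Error>
async function divide(input: {
  a: number;
  b: number;
}): Promise<Result<{ q: number }, { code: string }>> {
  if (input.b === 0) return Err({ code: 'DIV_BY_ZERO' });
  return Ok({ q: input.a / input.b });
}

// upload: AsyncIterable<Input> -> Result<Output, Error>
// The handler consumes a stream of inputs and replies once at the end.
async function sum(
  inputs: AsyncIterable<{ n: number }>,
): Promise<Result<{ total: number }, never>> {
  let total = 0;
  for await (const { n } of inputs) total += n;
  return Ok({ total });
}

// Helper that turns a list of numbers into an async stream of inputs.
async function* numbers(...ns: number[]): AsyncGenerator<{ n: number }> {
  for (const n of ns) yield { n };
}
```

A caller checks `ok` before touching the payload, which is what lets TypeScript narrow the payload to either the output type or the error type. The subscription and stream types follow the same pattern, except that the handler pushes many results instead of returning one.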

A basic router

First, we create a service using the ServiceBuilder:

import { ServiceBuilder, Ok, buildServiceDefs } from '@replit/river';
import { Type } from '@sinclair/typebox';

export const ExampleServiceConstructor = () =>
  ServiceBuilder.create('example')
    // initializer for shared state
    .initialState({
      count: 0,
    })
    .defineProcedure('add', {
      type: 'rpc',
      input: Type.Object({ n: Type.Number() }),
      output: Type.Object({ result: Type.Number() }),
      errors: Type.Never(),
      // note that a handler is unique per user RPC
      async handler(ctx, { n }) {
        // access and mutate shared state
        ctx.state.count += n;
        return Ok({ result: ctx.state.count });
      },
    })
    .finalize();

// expose a listing of all the services that we have
export const serviceDefs = buildServiceDefs([ExampleServiceConstructor()]);

Then, we create the server:

import http from 'http';
import { WebSocketServer } from 'ws';
import { WebSocketServerTransport } from '@replit/river/transport/ws/server';
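import { createServer } from '@replit/river';
// assumed path: whichever file exports `serviceDefs` from the previous step
import { serviceDefs } from './services';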

// start websocket server on port 3000
const httpServer = http.createServer();
const port = 3000;
const wss = new WebSocketServer({ server: httpServer });
const transport = new WebSocketServerTransport(wss, 'SERVER');

export const server = createServer(transport, serviceDefs);
export type ServiceSurface = typeof server;

httpServer.listen(port);

Then, in a separate file (so that the client has its own entrypoint), we create the client:

import WebSocket from 'isomorphic-ws';
import { WebSocketClientTransport } from '@replit/river/transport/ws/client';
import { createClient } from '@replit/river';
import type { ServiceSurface } from './server';

const websocketUrl = `ws://localhost:3000`;
const transport = new WebSocketClientTransport(
  async () => new WebSocket(websocketUrl),
  'my-client-id',
);

const client = createClient<ServiceSurface>(
  transport,
  'SERVER', // transport id of the server in the previous step
  true, // whether to eagerly connect to the server on creation (optional argument)
);

// we get full type safety on `client`
// client.<service name>.<procedure name>.<procedure type>()
// e.g.
const result = await client.example.add.rpc({ n: 3 });
if (result.ok) {
  const msg = result.payload;
  console.log(msg.result); // 0 + 3 = 3
}

Logging

To enable logging, bind a logger function and set the log level:

import { bindLogger, setLevel } from '@replit/river/logging';

bindLogger(console.log);
setLevel('info');

Connection status

River defines two types of reconnects:

  1. Transparent reconnects: These occur when the connection is temporarily lost and reestablished without losing any messages. From the application's perspective, this process is seamless and does not disrupt ongoing operations.
  2. Hard reconnects: These occur when all server state is lost, requiring the client to reinitialize anything stateful (e.g. subscriptions).

You can listen for transparent reconnects via the connectionStatus events, but realistically, no applications should need to listen for this unless it is for debugging purposes. Hard reconnects are signaled via sessionStatus events.

If your application is stateful on either the server or the client, the service consumer should wrap all the client-side setup with transport.addEventListener('sessionStatus', (evt) => ...) to do appropriate setup and teardown.

transport.addEventListener('connectionStatus', (evt) => {
  if (evt.status === 'connect') {
    // do something
  } else if (evt.status === 'disconnect') {
    // do something else
  }
});

transport.addEventListener('sessionStatus', (evt) => {
  if (evt.status === 'connect') {
    // do something
  } else if (evt.status === 'disconnect') {
    // do something else
  }
});
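
One way to structure the setup and teardown described above is sketched below. So that the sketch runs on its own, it uses a hypothetical `FakeTransport` class instead of a real River transport; only the `sessionStatus` event name and the `connect`/`disconnect` status values are taken from this document. `withSession` is an illustrative helper, not part of River.

```typescript
// Hypothetical stand-in for a River transport's event API, used only so
// this sketch is self-contained.
type SessionStatusEvent = { status: 'connect' | 'disconnect' };
type Listener = (evt: SessionStatusEvent) => void;

class FakeTransport {
  private listeners: Listener[] = [];

  addEventListener(_type: 'sessionStatus', listener: Listener): void {
    this.listeners.push(listener);
  }

  // Simulates the transport reporting a session change.
  emit(evt: SessionStatusEvent): void {
    for (const listener of this.listeners) listener(evt);
  }
}

// Runs `setup` on every new session and calls the teardown function it
// returned once that session ends.
function withSession(transport: FakeTransport, setup: () => () => void): void {
  let teardown: (() => void) | undefined;
  transport.addEventListener('sessionStatus', (evt) => {
    if (evt.status === 'connect') {
      teardown = setup();
    } else {
      teardown?.();
      teardown = undefined;
    }
  });
}

const log: string[] = [];
const transport = new FakeTransport();

withSession(transport, () => {
  log.push('subscribed'); // e.g. re-open subscriptions here
  return () => log.push('unsubscribed'); // e.g. drop stale client state here
});

// Simulate a session, a hard reconnect, and a fresh session.
transport.emit({ status: 'connect' });
transport.emit({ status: 'disconnect' });
transport.emit({ status: 'connect' });
```

Returning the teardown function from `setup` keeps the two halves of each stateful resource next to each other, so a hard reconnect always tears down the old state before building the new one.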

Further examples

We've also provided an end-to-end testing environment using Next.js, and a simple backend connected with the WebSocket transport that you can play with on Replit.

You can find more service examples in the E2E test fixtures.

Developing

  • npm i -- install dependencies
  • npm run check -- lint
  • npm run format -- format
  • npm run test -- run tests
  • npm run publish -- cut a new release (should bump version in package.json first)

Package last updated on 11 Apr 2024
