
Security News
The Hidden Blast Radius of the Axios Compromise
The Axios compromise shows how time-dependent dependency resolution makes exposure harder to detect and contain.
@robot.com/rpc
Advanced tools
A fully type-safe, modular RPC framework powered by [NATS.IO](https://nats.io) for building distributed systems and microservices.
A fully type-safe, modular RPC framework powered by NATS.IO for building distributed systems and microservices.
npm install @robot.com/rpc
# or
pnpm add @robot.com/rpc
# or
yarn add @robot.com/rpc
# or
bun add @robot.com/rpc
import { defineProcedure } from '@robot.com/rpc/client'
import { z } from 'zod'
// Define your API procedures
export const api = {
// GET /users/:id
getUser: defineProcedure({
method: 'GET',
path: 'users.$id',
paramsSchema: z.object({ id: z.string() }),
outputSchema: z.object({
id: z.string(),
name: z.string(),
email: z.string()
})
}),
// POST /users
createUser: defineProcedure({
method: 'POST',
path: 'users',
inputSchema: z.object({
name: z.string(),
email: z.string()
}),
outputSchema: z.object({
id: z.string(),
name: z.string(),
email: z.string()
})
}),
// PUT /users/:id
updateUser: defineProcedure({
method: 'PUT',
path: 'users.$id',
paramsSchema: z.object({ id: z.string() }),
inputSchema: z.object({
name: z.string().optional(),
email: z.string().optional()
}),
outputSchema: z.object({
id: z.string(),
name: z.string(),
email: z.string()
})
})
}
import { Registry, startRpcNatsServer } from '@robot.com/rpc/server'
import { connect } from '@nats-io/transport-node'
import { jetstream } from '@nats-io/jetstream'
import { RPCError } from '@robot.com/rpc'
import { api } from './api'
// Create a registry to manage your procedures
const registry = new Registry({
initialContext: null,
middleware: async (_, req) => {
// Add authentication, logging, or other middleware logic
const authHeader = req.msg?.headers?.get('Authorization')
if (!authHeader || !authHeader.startsWith('Bearer ')) {
throw new RPCError('UNAUTHORIZED', 'Missing or invalid authorization token')
}
return {
userId: authHeader.replace('Bearer ', ''),
timestamp: new Date().toISOString()
}
}
})
// Implement your procedures
registry
.impl(api.getUser, async ({ ctx, params }) => {
// Fetch user from database
const user = await getUserFromDatabase(params.id)
if (!user) {
throw new RPCError('NOT_FOUND', 'User not found')
}
return { data: user }
})
.impl(api.createUser, async ({ ctx, input }) => {
// Create new user
const user = await createUserInDatabase(input)
return { data: user }
})
.impl(api.updateUser, async ({ ctx, params, input }) => {
// Update existing user
const user = await updateUserInDatabase(params.id, input)
if (!user) {
throw new RPCError('NOT_FOUND', 'User not found')
}
return { data: user }
})
// Setup NATS connection
const natsConnection = await connect({
servers: [process.env.NATS_URL!],
token: process.env.NATS_TOKEN
})
// Setup JetStream for reliable message delivery
const js = jetstream(natsConnection)
const manager = await js.jetstreamManager()
// Create stream for RPC requests
await manager.streams.add({
name: 'rpc_requests',
subjects: ['rpc.requests.>'],
retention: 'workqueue'
})
// Create consumer for processing requests
await manager.consumers.add('rpc_requests', {
name: 'handler',
durable_name: 'handler',
ack_policy: 'explicit'
})
// Start the RPC server
const server = await startRpcNatsServer({
nats: natsConnection,
streamName: 'rpc_requests',
consumerName: 'handler',
subjectPrefix: 'rpc.requests.',
registry
})
console.log('RPC Server started successfully')
// Graceful shutdown
process.on('SIGINT', async () => {
console.log('Shutting down RPC server...')
await server.stop()
await natsConnection.drain()
process.exit(0)
})
import { RPCClientNATS } from '@robot.com/rpc/client'
import { connect } from '@nats-io/transport-node'
import { api } from './api'
// Connect to NATS
const natsConnection = await connect({
servers: [process.env.NATS_URL!],
token: process.env.NATS_TOKEN
})
// Create RPC client
const client = new RPCClientNATS({
nats: natsConnection,
publishPrefix: 'rpc.requests.',
headers: {
Authorization: 'Bearer your-auth-token'
}
})
// Call your procedures
try {
// Get user
const user = await client.call(api.getUser, {
params: { id: '123' }
})
console.log('User:', user.name)
// Create user
const newUser = await client.call(api.createUser, {
input: {
name: 'John Doe',
email: 'john@example.com'
}
})
console.log('Created user:', newUser.id)
// Update user
const updatedUser = await client.call(api.updateUser, {
params: { id: '123' },
input: { name: 'Jane Doe' }
})
console.log('Updated user:', updatedUser.name)
} catch (error) {
if (error.code === 'NOT_FOUND') {
console.error('User not found')
} else if (error.code === 'UNAUTHORIZED') {
console.error('Authentication failed')
} else {
console.error('RPC call failed:', error.message)
}
}
// Clean up
await natsConnection.drain()
defineProcedure(options)Creates a new procedure definition with full type safety.
defineProcedure({
method: 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE',
path: string, // e.g., 'users.$id' or 'posts.$category.$id'
paramsSchema?: ZodSchema, // URL parameters validation
inputSchema?: ZodSchema, // Request body validation
outputSchema: ZodSchema // Response validation
})
Path Parameters:
$paramName for dynamic segments$method parameter maps HTTP methods to NATS subjects:
GET → getPOST → createPUT → doPATCH → updateDELETE → deleteRegistryManages procedure implementations and routing.
const registry = new Registry({
initialContext: InitialContextType,
middleware: (initialContext, request) => Promise<ContextType>
})
// Add implementations
registry.impl(procedure, handler)
// Merge registries
registry.merge(otherRegistry)
RPCClientNATSNATS-based RPC client implementation.
const client = new RPCClientNATS({
nats: NatsConnection,
publishPrefix: string,
headers?: Record<string, string>
})
// Make calls
await client.call(procedure, {
params?: ParamsType,
input?: InputType,
signal?: AbortSignal,
timeout?: number
})
The framework provides comprehensive error handling with standard HTTP status codes:
import { RPCError } from '@robot.com/rpc'
// Common error types
throw new RPCError('NOT_FOUND', 'Resource not found')
throw new RPCError('UNAUTHORIZED', 'Authentication required')
throw new RPCError('BAD_REQUEST', 'Invalid input data')
throw new RPCError('INTERNAL_SERVER_ERROR', 'Something went wrong')
// Custom error handling
try {
const result = await client.call(api.getUser, { params: { id: '123' } })
} catch (error) {
if (RPCError.isRPCError(error)) {
switch (error.code) {
case 'NOT_FOUND':
console.log('User not found')
break
case 'UNAUTHORIZED':
console.log('Please log in')
break
default:
console.log('Error:', error.message)
}
}
}
const registry = new Registry({
initialContext: { version: '1.0.0' },
middleware: async (initialContext, req) => {
// Add authentication
const token = req.msg?.headers?.get('Authorization')
const user = await validateToken(token)
// Add request metadata
return {
...initialContext,
user,
requestId: crypto.randomUUID(),
timestamp: new Date()
}
}
})
// Use context in handlers
registry.impl(api.getUser, async ({ ctx, params }) => {
console.log(`User ${ctx.user.id} requesting user ${params.id}`)
// ... handler logic
})
// Set custom timeout
const result = await client.call(api.getUser, {
params: { id: '123' },
timeout: 5000 // 5 seconds
})
// Use AbortController for cancellation
const controller = new AbortController()
setTimeout(() => controller.abort(), 3000)
try {
const result = await client.call(api.getUser, {
params: { id: '123' },
signal: controller.signal
})
} catch (error) {
if (error.message === 'Aborted') {
console.log('Request was cancelled')
}
}
// Process multiple requests concurrently
const userIds = ['1', '2', '3', '4', '5']
const userPromises = userIds.map(id =>
client.call(api.getUser, { params: { id } })
)
const users = await Promise.all(userPromises)
console.log(`Retrieved ${users.length} users`)
// Create modular registries
const userRegistry = new Registry({ /* ... */ })
.impl(api.getUser, getUserHandler)
.impl(api.createUser, createUserHandler)
const postRegistry = new Registry({ /* ... */ })
.impl(api.getPost, getPostHandler)
.impl(api.createPost, createPostHandler)
// Combine them
const mainRegistry = new Registry({ /* ... */ })
.merge(userRegistry)
.merge(postRegistry)
# NATS connection
NATS_URL=nats://localhost:4222
NATS_TOKEN=your-nats-token
# RPC configuration
RPC_STREAM_NAME=rpc_requests
RPC_SUBJECT_PREFIX=rpc.requests.
RPC_CONSUMER_NAME=handler
// Recommended stream settings for production
await manager.streams.add({
name: 'rpc_requests',
subjects: ['rpc.requests.>'],
retention: 'workqueue', // Ensures message delivery
max_msgs_per_subject: 1000, // Limit messages per subject
max_age: 3600000000000, // 1 hour TTL
storage: 'memory' // or 'file' for persistence
})
See the src/tests/ directory for comprehensive examples of:
This package is part of the Robot OSS project. Contributions are welcome!
MIT License - see LICENSE file for details.
FAQs
A fully type-safe, modular RPC framework powered by [NATS.IO](https://nats.io) for building distributed systems and microservices.
We found that @robot.com/rpc demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 2 open source maintainers collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
The Axios compromise shows how time-dependent dependency resolution makes exposure harder to detect and contain.

Research
A supply chain attack on Axios introduced a malicious dependency, plain-crypto-js@4.2.1, published minutes earlier and absent from the project’s GitHub releases.

Research
Malicious versions of the Telnyx Python SDK on PyPI delivered credential-stealing malware via a multi-stage supply chain attack.