@durable-streams/state
Building blocks for transmitting structured state over Durable Streams. Use these primitives for any real-time protocol: AI token streams, presence updates, collaborative editing, or database sync.
Installation
pnpm add @durable-streams/state
Overview
This package provides flexible primitives for streaming structured state. You choose how much structure you need:
- Simple state updates: Stream JSON payloads and track current values
- Typed collections: Add schemas and primary keys for structured entities
- Reactive queries: Build on TanStack DB for subscriptions and optimistic updates
Stream whatever state your protocol requires.
Quick Start
Simple State
Stream structured JSON and query current values:
import { MaterializedState } from "@durable-streams/state"
const state = new MaterializedState()
state.apply({
type: "token",
key: "stream-1",
value: { content: "Hello", model: "claude-3" },
headers: { operation: "insert" },
})
const token = state.get("token", "stream-1")
const allTokens = state.getType("token")
Typed Collections
Add schemas and validation for structured entities:
import { createStateSchema, createStreamDB } from "@durable-streams/state"
const schema = createStateSchema({
users: {
schema: userSchema,
type: "user",
primaryKey: "id",
},
messages: {
schema: messageSchema,
type: "message",
primaryKey: "id",
},
})
const db = createStreamDB({
streamOptions: {
url: "https://api.example.com/streams/my-stream",
contentType: "application/json",
},
state: schema,
})
await db.preload()
import { useLiveQuery } from "@tanstack/react-db"
import { eq } from "@tanstack/db"
const userQuery = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.where(({ users }) => eq(users.id, "123"))
.findOne()
)
const allUsersQuery = useLiveQuery((q) =>
q.from({ users: db.collections.users })
)
Core Concepts
State Protocol
The Durable Streams State Protocol defines a standard format for state change events:
- Change Events:
insert, update, delete operations on entities
- Control Events:
snapshot-start, snapshot-end, reset signals
- Entity Types: Discriminator field that routes events to collections
- Primary Keys: Unique identifiers extracted from entity values
See STATE-PROTOCOL.md for the full specification.
MaterializedState
Simple in-memory state container for basic use cases:
import { MaterializedState } from "@durable-streams/state"
const state = new MaterializedState()
state.apply({
type: "user",
key: "1",
value: { name: "Kyle" },
headers: { operation: "insert" },
})
const user = state.get("user", "1")
const allUsers = state.getType("user")
StreamDB
Stream-backed database with TanStack DB collections. Provides reactive queries, subscriptions, and optimistic updates.
Schema Definition
createStateSchema()
Define your application state structure:
const schema = createStateSchema({
users: {
schema: userSchema,
type: "user",
primaryKey: "id",
},
messages: {
schema: messageSchema,
type: "message",
primaryKey: "id",
},
})
Standard Schema Support
Uses Standard Schema for validation, supporting multiple libraries:
import { z } from "zod"
const userSchema = z.object({
id: z.string(),
name: z.string(),
email: z.string().email(),
})
import * as v from "valibot"
const userSchema = v.object({
id: v.string(),
name: v.string(),
email: v.pipe(v.string(), v.email()),
})
const userSchema = {
"~standard": {
version: 1,
vendor: "my-app",
validate: (value) => {
if (isValid(value)) {
return { value }
}
return { issues: [{ message: "Invalid user" }] }
},
},
}
Event Helpers
Schema provides typed event creation helpers:
const insertEvent = schema.users.insert({
value: { id: "1", name: "Kyle", email: "kyle@example.com" },
key: "1",
})
const updateEvent = schema.users.update({
value: { id: "1", name: "Kyle Mathews", email: "kyle@example.com" },
oldValue: { id: "1", name: "Kyle", email: "kyle@example.com" },
})
const deleteEvent = schema.users.delete({
key: "1",
oldValue: { id: "1", name: "Kyle", email: "kyle@example.com" },
})
const eventWithTxId = schema.users.insert({
value: { id: "1", name: "Kyle" },
headers: {
txid: crypto.randomUUID(),
timestamp: new Date().toISOString(),
},
})
StreamDB
Creating a Database
const db = createStreamDB({
streamOptions: {
url: "https://api.example.com/streams/my-stream",
contentType: "application/json",
headers: { Authorization: "Bearer token" },
batching: true,
},
state: schema,
})
await db.preload()
Reactive Queries with TanStack DB
StreamDB collections are TanStack DB collections. Use TanStack DB's query builder for filtering, sorting, aggregation, and joins with differential dataflow - dramatically faster than JavaScript filtering:
import { useLiveQuery } from "@tanstack/[framework]-db"
import { eq, gt, and, count } from "@tanstack/db"
const query = useLiveQuery((q) => q.from({ users: db.collections.users }))
const activeQuery = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.where(({ users }) => eq(users.active, true))
)
const query = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.where(({ users }) => and(eq(users.active, true), gt(users.age, 18)))
)
const topUsersQuery = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.orderBy(({ users }) => users.lastSeen, "desc")
.limit(10)
)
const langStatsQuery = useLiveQuery((q) => {
const languageCounts = q
.from({ events: db.collections.events })
.groupBy(({ events }) => events.language)
.select(({ events }) => ({
language: events.language,
total: count(events.id),
}))
return q
.from({ stats: languageCounts })
.orderBy(({ stats }) => stats.total, "desc")
})
const query = useLiveQuery((q) =>
q
.from({ messages: db.collections.messages })
.join({ users: db.collections.users }, ({ messages, users }) =>
eq(messages.userId, users.id)
)
.select(({ messages, users }) => ({
messageId: messages.id,
text: messages.text,
userName: users.name,
}))
)
Why use the query builder?
- Differential dataflow: Incremental updates only recompute affected results
- Dramatically faster: Push filtering/sorting into the DB engine vs JavaScript
- Reactive: Queries automatically update when data changes
- Type-safe: Full TypeScript support with autocomplete
Framework integration: See TanStack DB docs for framework-specific guides:
Lifecycle Methods
await db.preload()
db.close()
await db.utils.awaitTxId("txid-uuid", 5000)
Optimistic Actions
Define actions with optimistic updates and server confirmation:
const db = createStreamDB({
streamOptions: { url: streamUrl, contentType: "application/json" },
state: schema,
actions: ({ db, stream }) => ({
addUser: {
onMutate: (user) => {
db.collections.users.insert(user)
},
mutationFn: async (user) => {
const txid = crypto.randomUUID()
await stream.append(
schema.users.insert({
value: user,
headers: { txid },
})
)
await db.utils.awaitTxId(txid)
},
},
updateUser: {
onMutate: ({ id, updates }) => {
db.collections.users.update(id, (draft) => {
Object.assign(draft, updates)
})
},
mutationFn: async ({ id, updates }) => {
const txid = crypto.randomUUID()
const current = await db.collections.users.get(id)
await stream.append(
schema.users.update({
value: { ...current, ...updates },
oldValue: current,
headers: { txid },
})
)
await db.utils.awaitTxId(txid)
},
},
}),
})
await db.actions.addUser({ id: "1", name: "Kyle", email: "kyle@example.com" })
await db.actions.updateUser({ id: "1", updates: { name: "Kyle Mathews" } })
Framework Integration
Use TanStack DB's framework adapters for reactive queries:
React
import { useLiveQuery } from '@tanstack/react-db'
import { eq } from '@tanstack/db'
function UserProfile({ userId }: { userId: string }) {
const userQuery = useLiveQuery((q) =>
q.from({ users: db.collections.users })
.where(({ users }) => eq(users.id, userId))
.findOne()
)
if (userQuery.isLoading()) return <div>Loading...</div>
if (!userQuery.data) return <div>Not found</div>
return (
<div>
<h1>{userQuery.data.name}</h1>
<p>{userQuery.data.email}</p>
</div>
)
}
See @tanstack/react-db docs for more.
Solid.js
import { useLiveQuery } from '@tanstack/solid-db'
import { eq } from '@tanstack/db'
function MessageList() {
const messagesQuery = useLiveQuery((q) =>
q.from({ messages: db.collections.messages })
.orderBy(({ messages }) => messages.timestamp, 'desc')
.limit(50)
)
return (
<For each={messagesQuery.data}>
{(message) => <MessageCard message={message} />}
</For>
)
}
See @tanstack/solid-db docs for more.
Common Patterns
Key/Value Store
const schema = createStateSchema({
config: {
schema: configSchema,
type: "config",
primaryKey: "key",
},
})
await stream.append(
schema.config.insert({
value: { key: "theme", value: "dark" },
})
)
const themeQuery = useLiveQuery((q) =>
q
.from({ config: db.collections.config })
.where(({ config }) => eq(config.key, "theme"))
.findOne()
)
Presence Tracking
const schema = createStateSchema({
presence: {
schema: presenceSchema,
type: "presence",
primaryKey: "userId",
},
})
await stream.append(
schema.presence.update({
value: {
userId: "kyle",
status: "online",
lastSeen: Date.now(),
},
})
)
const presenceQuery = useLiveQuery((q) =>
q
.from({ presence: db.collections.presence })
.where(({ presence }) => eq(presence.status, "online"))
)
Multi-Type Chat Room
const schema = createStateSchema({
users: { schema: userSchema, type: "user", primaryKey: "id" },
messages: { schema: messageSchema, type: "message", primaryKey: "id" },
reactions: { schema: reactionSchema, type: "reaction", primaryKey: "id" },
typing: { schema: typingSchema, type: "typing", primaryKey: "userId" },
})
await stream.append(schema.users.insert({ value: user }))
await stream.append(schema.messages.insert({ value: message }))
await stream.append(schema.reactions.insert({ value: reaction }))
Best Practices
1. Use Object Values
StreamDB requires object values (not primitives) for the primary key pattern:
{ type: 'count', key: 'views', value: 42 }
{ type: 'count', key: 'views', value: { count: 42 } }
2. Always Call close()
useEffect(() => {
const db = createStreamDB({ streamOptions, state: schema })
return () => db.close()
}, [])
3. Validate at Boundaries
Use Standard Schema to validate data at system boundaries:
const userSchema = z.object({
id: z.string().uuid(),
email: z.string().email(),
age: z.number().min(0).max(150),
})
4. Use Transaction IDs
For critical operations, always use transaction IDs to ensure confirmation:
const txid = crypto.randomUUID()
await stream.append(schema.users.insert({ value: user, headers: { txid } }))
await db.utils.awaitTxId(txid, 10000)
5. Handle Errors Gracefully
try {
await db.actions.addUser(user)
} catch (error) {
if (error.message.includes("Timeout")) {
} else {
}
}
API Reference
Types
export type Operation = "insert" | "update" | "delete"
export interface ChangeEvent<T = unknown> {
type: string
key: string
value?: T
old_value?: T
headers: ChangeHeaders
}
export interface ChangeHeaders {
operation: Operation
txid?: string
timestamp?: string
}
export interface ControlEvent {
headers: {
control: "snapshot-start" | "snapshot-end" | "reset"
offset?: string
}
}
export type StateEvent<T = unknown> = ChangeEvent<T> | ControlEvent
Functions
export function createStateSchema<
T extends Record<string, CollectionDefinition>,
>(collections: T): StateSchema<T>
export async function createStreamDB<
TDef extends StreamStateDefinition,
TActions extends Record<string, ActionDefinition<any>>,
>(
options: CreateStreamDBOptions<TDef, TActions>
): Promise<StreamDB<TDef> | StreamDBWithActions<TDef, TActions>>
Classes
export class MaterializedState {
apply(event: ChangeEvent): void
applyBatch(events: ChangeEvent[]): void
get<T>(type: string, key: string): T | undefined
getType(type: string): Map<string, unknown>
clear(): void
readonly typeCount: number
readonly types: string[]
}
License
Apache-2.0
Learn More