
@durable-streams/state
Building blocks for transmitting structured state over Durable Streams. Use these primitives for any real-time protocol: AI token streams, presence updates, collaborative editing, or database sync.
pnpm add @durable-streams/state
This package provides flexible primitives for streaming structured state. You choose how much structure you need:
Stream whatever state your protocol requires.
Stream structured JSON and query current values:
import { MaterializedState } from "@durable-streams/state"
const state = new MaterializedState()
// Apply any structured change
state.apply({
type: "token",
key: "stream-1",
value: { content: "Hello", model: "claude-3" },
headers: { operation: "insert" },
})
// Query current state
const token = state.get("token", "stream-1")
const allTokens = state.getType("token")
Add schemas and validation for structured entities:
import { createStateSchema, createStreamDB } from "@durable-streams/state"
// Define your schema
const schema = createStateSchema({
users: {
schema: userSchema, // Standard Schema validator
type: "user", // Event type field
primaryKey: "id", // Primary key field name
},
messages: {
schema: messageSchema,
type: "message",
primaryKey: "id",
},
})
// Create a stream-backed database
const db = createStreamDB({
streamOptions: {
url: "https://api.example.com/streams/my-stream",
contentType: "application/json",
},
state: schema,
})
// Load initial data
await db.preload()
// Reactive queries with useLiveQuery
import { useLiveQuery } from "@tanstack/react-db" // or solid-db, vue-db
import { eq } from "@tanstack/db"
const userQuery = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.where(({ users }) => eq(users.id, "123"))
.findOne()
)
const allUsersQuery = useLiveQuery((q) =>
q.from({ users: db.collections.users })
)
The Durable Streams State Protocol defines a standard format for state change events:
- insert, update, delete operations on entities
- snapshot-start, snapshot-end, reset signals
See STATE-PROTOCOL.md for the full specification.
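The two kinds of events can be told apart by their headers. The following is a minimal sketch, assuming the event shapes match the type definitions listed later in this README; `isControlEvent` and `describeEvent` are hypothetical helpers for illustration, not exports of the package.

```typescript
// Local copies of the event shapes (assumed to mirror the package's exported types).
type Operation = "insert" | "update" | "delete"

interface ChangeEvent<T = unknown> {
  type: string
  key: string
  value?: T
  old_value?: T
  headers: { operation: Operation; txid?: string; timestamp?: string }
}

interface ControlEvent {
  headers: { control: "snapshot-start" | "snapshot-end" | "reset"; offset?: string }
}

type StateEvent<T = unknown> = ChangeEvent<T> | ControlEvent

// Hypothetical helper: control events carry `headers.control`,
// change events carry `headers.operation`.
function isControlEvent(event: StateEvent): event is ControlEvent {
  return "control" in event.headers
}

// Hypothetical helper: produce a short label for logging.
function describeEvent(event: StateEvent): string {
  if (isControlEvent(event)) {
    return `control:${event.headers.control}`
  }
  return `${event.headers.operation}:${event.type}/${event.key}`
}
```

A consumer could use a guard like this to route control signals separately from entity changes before applying them to state.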
Simple in-memory state container for basic use cases:
import { MaterializedState } from "@durable-streams/state"
const state = new MaterializedState()
// Apply change events
state.apply({
type: "user",
key: "1",
value: { name: "Kyle" },
headers: { operation: "insert" },
})
// Query state
const user = state.get("user", "1")
const allUsers = state.getType("user")
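To make the behavior concrete, here is a rough sketch of how a container like this could fold change events into current values. `SketchState` is a hypothetical stand-in written for illustration; it assumes insert and update both store the latest value and delete removes the key, and it is not the package's actual implementation.

```typescript
type Operation = "insert" | "update" | "delete"

interface ChangeEvent<T = unknown> {
  type: string
  key: string
  value?: T
  headers: { operation: Operation }
}

// Illustrative only: a nested map from event type to key to current value.
class SketchState {
  private readonly rows = new Map<string, Map<string, unknown>>()

  apply(event: ChangeEvent): void {
    let byKey = this.rows.get(event.type)
    if (byKey === undefined) {
      byKey = new Map<string, unknown>()
      this.rows.set(event.type, byKey)
    }
    if (event.headers.operation === "delete") {
      byKey.delete(event.key)
    } else {
      // insert and update both store the latest value under the key
      byKey.set(event.key, event.value)
    }
  }

  get<T>(type: string, key: string): T | undefined {
    return this.rows.get(type)?.get(key) as T | undefined
  }

  getType(type: string): Map<string, unknown> {
    return this.rows.get(type) ?? new Map<string, unknown>()
  }
}

const sketchState = new SketchState()
sketchState.apply({ type: "user", key: "1", value: { name: "Kyle" }, headers: { operation: "insert" } })
sketchState.apply({ type: "user", key: "1", value: { name: "Kyle Mathews" }, headers: { operation: "update" } })
sketchState.apply({ type: "user", key: "2", value: { name: "Sam" }, headers: { operation: "insert" } })
sketchState.apply({ type: "user", key: "2", headers: { operation: "delete" } })
```

After these four events, only user "1" remains, holding the updated name.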
Stream-backed database with TanStack DB collections. Provides reactive queries, subscriptions, and optimistic updates.
Define your application state structure:
const schema = createStateSchema({
users: {
schema: userSchema, // Standard Schema validator
type: "user", // Event type for routing
primaryKey: "id", // Field to use as primary key
},
messages: {
schema: messageSchema,
type: "message",
primaryKey: "id",
},
})
Uses Standard Schema for validation, supporting multiple libraries:
// Zod
import { z } from "zod"
const userSchema = z.object({
id: z.string(),
name: z.string(),
email: z.string().email(),
})
// Valibot
import * as v from "valibot"
const userSchema = v.object({
id: v.string(),
name: v.string(),
email: v.pipe(v.string(), v.email()),
})
// Manual Standard Schema
const userSchema = {
"~standard": {
version: 1,
vendor: "my-app",
validate: (value) => {
// Your validation logic
if (isValid(value)) {
return { value }
}
return { issues: [{ message: "Invalid user" }] }
},
},
}
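The manual variant above leaves `isValid` undefined. A self-contained sketch could look like the following, assuming the user shape from the Zod and Valibot examples; `isUser`, `ValidationResult`, and `manualUserSchema` are names chosen here for illustration, and the email check is deliberately simplistic.

```typescript
interface User {
  id: string
  name: string
  email: string
}

// Result shape used by Standard Schema validators: a value on success, issues on failure.
type ValidationResult<T> =
  | { value: T; issues?: undefined }
  | { issues: ReadonlyArray<{ message: string }> }

// Hypothetical stand-in for the isValid() check in the manual example.
function isUser(value: unknown): value is User {
  if (typeof value !== "object" || value === null) return false
  const { id, name, email } = value as Record<string, unknown>
  return (
    typeof id === "string" &&
    typeof name === "string" &&
    typeof email === "string" &&
    email.includes("@") // simplistic check, for illustration only
  )
}

const manualUserSchema = {
  "~standard": {
    version: 1 as const,
    vendor: "my-app",
    validate: (value: unknown): ValidationResult<User> =>
      isUser(value) ? { value } : { issues: [{ message: "Invalid user" }] },
  },
}
```

Calling `manualUserSchema["~standard"].validate(input)` returns either the typed value or a list of issues, which is the contract a Standard Schema consumer relies on.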
Schema provides typed event creation helpers:
// Insert
const insertEvent = schema.users.insert({
value: { id: "1", name: "Kyle", email: "kyle@example.com" },
key: "1", // Optional, defaults to value[primaryKey]
})
// Update
const updateEvent = schema.users.update({
value: { id: "1", name: "Kyle Mathews", email: "kyle@example.com" },
oldValue: { id: "1", name: "Kyle", email: "kyle@example.com" }, // Optional
})
// Delete
const deleteEvent = schema.users.delete({
key: "1",
oldValue: { id: "1", name: "Kyle", email: "kyle@example.com" }, // Optional
})
// Custom headers
const eventWithTxId = schema.users.insert({
value: { id: "1", name: "Kyle", email: "kyle@example.com" },
headers: {
txid: crypto.randomUUID(),
timestamp: new Date().toISOString(),
},
})
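The helpers above default the event key to the primary key field of the value. As a rough sketch of that rule, assuming the change event shape from the type definitions in this README, `makeInsertHelper` below is a hypothetical factory and not the package's code.

```typescript
interface ChangeEvent<T> {
  type: string
  key: string
  value?: T
  headers: { operation: "insert" | "update" | "delete"; txid?: string; timestamp?: string }
}

// Hypothetical factory, written for illustration only.
function makeInsertHelper<T extends Record<string, unknown>>(
  type: string,
  primaryKey: keyof T & string
) {
  return (input: {
    value: T
    key?: string
    headers?: { txid?: string; timestamp?: string }
  }): ChangeEvent<T> => ({
    type,
    // Fall back to the primary key field when no explicit key is given
    key: input.key ?? String(input.value[primaryKey]),
    value: input.value,
    // Custom headers are merged in; the operation is always "insert"
    headers: { ...input.headers, operation: "insert" as const },
  })
}

type SketchUser = { id: string; name: string; email: string }

const insertUser = makeInsertHelper<SketchUser>("user", "id")
const defaultKeyEvent = insertUser({
  value: { id: "1", name: "Kyle", email: "kyle@example.com" },
})
const explicitKeyEvent = insertUser({
  value: { id: "1", name: "Kyle", email: "kyle@example.com" },
  key: "custom",
  headers: { txid: "tx-1" },
})
```

Here `defaultKeyEvent.key` is taken from `value.id`, while `explicitKeyEvent.key` keeps the key that was passed in.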
const db = createStreamDB({
streamOptions: {
url: "https://api.example.com/streams/my-stream",
contentType: "application/json",
// All DurableStream options supported
headers: { Authorization: "Bearer token" },
batching: true,
},
state: schema,
})
// The stream is created lazily when preload() is called
await db.preload()
StreamDB collections are TanStack DB collections. Use TanStack DB's query builder for filtering, sorting, aggregation, and joins. Queries are evaluated with differential dataflow, so results update incrementally as the stream changes instead of re-filtering whole arrays in JavaScript:
import { useLiveQuery } from "@tanstack/react-db" // or solid-db, vue-db
import { eq, gt, and, count } from "@tanstack/db"
// Simple collection access
const allUsersQuery = useLiveQuery((q) => q.from({ users: db.collections.users }))
// Filtering with WHERE
const activeQuery = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.where(({ users }) => eq(users.active, true))
)
// Complex conditions
const activeAdultsQuery = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.where(({ users }) => and(eq(users.active, true), gt(users.age, 18)))
)
// Sorting and limiting
const topUsersQuery = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.orderBy(({ users }) => users.lastSeen, "desc")
.limit(10)
)
// Aggregation with GROUP BY and ordering
const langStatsQuery = useLiveQuery((q) => {
const languageCounts = q
.from({ events: db.collections.events })
.groupBy(({ events }) => events.language)
.select(({ events }) => ({
language: events.language,
total: count(events.id),
}))
return q
.from({ stats: languageCounts })
.orderBy(({ stats }) => stats.total, "desc")
})
// Joins across collections
const messagesWithUsersQuery = useLiveQuery((q) =>
q
.from({ messages: db.collections.messages })
.join({ users: db.collections.users }, ({ messages, users }) =>
eq(messages.userId, users.id)
)
.select(({ messages, users }) => ({
messageId: messages.id,
text: messages.text,
userName: users.name,
}))
)
Why use the query builder?
Framework integration: see the TanStack DB docs for framework-specific guides.
// Load all data until up-to-date
await db.preload()
// Stop syncing and cleanup
db.close()
// Wait for a transaction to be confirmed
await db.utils.awaitTxId("txid-uuid", 5000) // 5 second timeout
Define actions with optimistic updates and server confirmation:
const db = createStreamDB({
streamOptions: { url: streamUrl, contentType: "application/json" },
state: schema,
actions: ({ db, stream }) => ({
addUser: {
// Optimistic update (runs immediately)
onMutate: (user) => {
db.collections.users.insert(user)
},
// Server mutation (runs async)
mutationFn: async (user) => {
const txid = crypto.randomUUID()
await stream.append(
schema.users.insert({
value: user,
headers: { txid },
})
)
// Wait for confirmation
await db.utils.awaitTxId(txid)
},
},
updateUser: {
onMutate: ({ id, updates }) => {
db.collections.users.update(id, (draft) => {
Object.assign(draft, updates)
})
},
mutationFn: async ({ id, updates }) => {
const txid = crypto.randomUUID()
const current = await db.collections.users.get(id)
await stream.append(
schema.users.update({
value: { ...current, ...updates },
oldValue: current,
headers: { txid },
})
)
await db.utils.awaitTxId(txid)
},
},
}),
})
// Call actions
await db.actions.addUser({ id: "1", name: "Kyle", email: "kyle@example.com" })
await db.actions.updateUser({ id: "1", updates: { name: "Kyle Mathews" } })
Use TanStack DB's framework adapters for reactive queries:
import { useLiveQuery } from '@tanstack/react-db'
import { eq } from '@tanstack/db'
function UserProfile({ userId }: { userId: string }) {
const userQuery = useLiveQuery((q) =>
q.from({ users: db.collections.users })
.where(({ users }) => eq(users.id, userId))
.findOne()
)
if (userQuery.isLoading) return <div>Loading...</div>
if (!userQuery.data) return <div>Not found</div>
return (
<div>
<h1>{userQuery.data.name}</h1>
<p>{userQuery.data.email}</p>
</div>
)
}
See @tanstack/react-db docs for more.
import { For } from 'solid-js'
import { useLiveQuery } from '@tanstack/solid-db'
import { eq } from '@tanstack/db'
function MessageList() {
const messagesQuery = useLiveQuery((q) =>
q.from({ messages: db.collections.messages })
.orderBy(({ messages }) => messages.timestamp, 'desc')
.limit(50)
)
return (
<For each={messagesQuery.data}>
{(message) => <MessageCard message={message} />}
</For>
)
}
See @tanstack/solid-db docs for more.
const schema = createStateSchema({
config: {
schema: configSchema,
type: "config",
primaryKey: "key",
},
})
// Set value
await stream.append(
schema.config.insert({
value: { key: "theme", value: "dark" },
})
)
// Query value reactively
const themeQuery = useLiveQuery((q) =>
q
.from({ config: db.collections.config })
.where(({ config }) => eq(config.key, "theme"))
.findOne()
)
const schema = createStateSchema({
presence: {
schema: presenceSchema,
type: "presence",
primaryKey: "userId",
},
})
// Update presence
await stream.append(
schema.presence.update({
value: {
userId: "kyle",
status: "online",
lastSeen: Date.now(),
},
})
)
// Query presence with TanStack DB
const presenceQuery = useLiveQuery((q) =>
q
.from({ presence: db.collections.presence })
.where(({ presence }) => eq(presence.status, "online"))
)
const schema = createStateSchema({
users: { schema: userSchema, type: "user", primaryKey: "id" },
messages: { schema: messageSchema, type: "message", primaryKey: "id" },
reactions: { schema: reactionSchema, type: "reaction", primaryKey: "id" },
typing: { schema: typingSchema, type: "typing", primaryKey: "userId" },
})
// Different types coexist in the same stream
await stream.append(schema.users.insert({ value: user }))
await stream.append(schema.messages.insert({ value: message }))
await stream.append(schema.reactions.insert({ value: reaction }))
StreamDB requires object values (not primitives) for the primary key pattern:
// ❌ Won't work
{ type: 'count', key: 'views', value: 42 }
// ✅ Works
{ type: 'count', key: 'views', value: { count: 42 } }
useEffect(() => {
const db = createStreamDB({ streamOptions, state: schema })
return () => db.close() // Cleanup on unmount
}, [])
Use Standard Schema to validate data at system boundaries:
const userSchema = z.object({
id: z.string().uuid(),
email: z.string().email(),
age: z.number().min(0).max(150),
})
For critical operations, always use transaction IDs to ensure confirmation:
const txid = crypto.randomUUID()
await stream.append(schema.users.insert({ value: user, headers: { txid } }))
await db.utils.awaitTxId(txid, 10000) // Wait up to 10 seconds
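The confirmation pattern boils down to remembering which transaction IDs have arrived from the stream and releasing callers that are waiting on them. The sketch below illustrates that idea under stated assumptions; `TxTracker` is hypothetical, and the package's `awaitTxId` may work differently.

```typescript
// Hypothetical tracker, not the package's implementation of awaitTxId.
class TxTracker {
  private readonly seen = new Set<string>()
  private readonly waiters = new Map<string, Array<() => void>>()

  // Call this when an event carrying `headers.txid` arrives from the stream.
  markSeen(txid: string): void {
    this.seen.add(txid)
    const callbacks = this.waiters.get(txid) ?? []
    this.waiters.delete(txid)
    for (const callback of callbacks) callback()
  }

  // Resolves once the txid has been seen, or rejects after `timeoutMs`.
  awaitTxId(txid: string, timeoutMs = 5000): Promise<void> {
    if (this.seen.has(txid)) return Promise.resolve()
    return new Promise<void>((resolve, reject) => {
      const onSeen = () => {
        clearTimeout(timer)
        resolve()
      }
      const timer = setTimeout(() => {
        // Remove only this waiter, then report the timeout
        const remaining = (this.waiters.get(txid) ?? []).filter((cb) => cb !== onSeen)
        if (remaining.length > 0) this.waiters.set(txid, remaining)
        else this.waiters.delete(txid)
        reject(new Error(`Timeout waiting for txid ${txid}`))
      }, timeoutMs)
      this.waiters.set(txid, [...(this.waiters.get(txid) ?? []), onSeen])
    })
  }

  // Number of distinct txids that still have callers waiting
  get pendingCount(): number {
    return this.waiters.size
  }
}
```

The timeout matters because an append can succeed while the confirming event never reaches this client, for example after a dropped connection.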
try {
await db.actions.addUser(user)
} catch (error) {
if (error.message.includes("Timeout")) {
// Handle timeout
} else {
// Handle other errors
}
}
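In TypeScript, the value caught in a `catch` block is typed `unknown` under strict settings, so reading `error.message` directly may not compile. A small sketch of a safer check follows; it assumes, as in the snippet above, that timeout errors contain "Timeout" in their message, and `isTimeoutError` and `describeFailure` are hypothetical helpers.

```typescript
// Assumption: timeout errors carry "Timeout" in their message, as in the check above.
function isTimeoutError(error: unknown): boolean {
  return error instanceof Error && error.message.includes("Timeout")
}

// Hypothetical helper: turn any thrown value into a message for the user or logs.
function describeFailure(error: unknown): string {
  if (isTimeoutError(error)) return "timed out waiting for confirmation"
  if (error instanceof Error) return `failed: ${error.message}`
  // Non-Error values (strings, objects) can be thrown too
  return `failed: ${String(error)}`
}
```

Inside the `catch` block, `describeFailure(error)` can then replace the direct property access.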
export type Operation = "insert" | "update" | "delete"
export interface ChangeEvent<T = unknown> {
type: string
key: string
value?: T
old_value?: T
headers: ChangeHeaders
}
export interface ChangeHeaders {
operation: Operation
txid?: string
timestamp?: string
}
export interface ControlEvent {
headers: {
control: "snapshot-start" | "snapshot-end" | "reset"
offset?: string
}
}
export type StateEvent<T = unknown> = ChangeEvent<T> | ControlEvent
// Create a state schema with typed collections and event helpers
export function createStateSchema<
T extends Record<string, CollectionDefinition>,
>(collections: T): StateSchema<T>
// Create a stream-backed database
export async function createStreamDB<
TDef extends StreamStateDefinition,
TActions extends Record<string, ActionDefinition<any>>,
>(
options: CreateStreamDBOptions<TDef, TActions>
): Promise<StreamDB<TDef> | StreamDBWithActions<TDef, TActions>>
export class MaterializedState {
apply(event: ChangeEvent): void
applyBatch(events: ChangeEvent[]): void
get<T>(type: string, key: string): T | undefined
getType(type: string): Map<string, unknown>
clear(): void
readonly typeCount: number
readonly types: string[]
}
Apache-2.0