@durable-streams/client
TypeScript client for the Durable Streams protocol.
Installation
npm install @durable-streams/client
Overview
The Durable Streams client provides three main APIs:
stream() function - A fetch-like read-only API for consuming streams
DurableStream class - A handle for read/write operations on a stream
IdempotentProducer class - High-throughput producer with exactly-once write semantics (recommended for writes)
Key Features
- Exactly-Once Writes: IdempotentProducer provides Kafka-style exactly-once semantics with automatic deduplication
- Automatic Batching: Multiple writes are automatically batched together for high throughput
- Pipelining: Up to 5 concurrent batches in flight by default for maximum throughput
- Streaming Reads: stream() and DurableStream.stream() provide rich consumption options (promises, ReadableStreams, subscribers)
- Resumable: Offset-based reads let you resume from any point
- Real-time: Long-poll and SSE modes for live tailing with catch-up from any offset
Usage
Read-only: Using stream() (fetch-like API)
The stream() function provides a simple, fetch-like interface for reading from streams:
import { stream } from "@durable-streams/client"
const res = await stream<{ message: string }>({
url: "https://streams.example.com/my-account/chat/room-1",
headers: {
Authorization: `Bearer ${process.env.DS_TOKEN!}`,
},
offset: savedOffset,
live: "auto",
})
const items = await res.json()
console.log("All items:", items)
res.subscribeJson(async (batch) => {
  for (const item of batch.items) {
    console.log("item:", item)
  }
  // Persist the offset after the batch is processed so a later session can resume from here
  saveOffset(batch.offset)
})
StreamResponse consumption methods
The StreamResponse object returned by stream() offers multiple ways to consume data:
// Promise helpers: accumulate until the stream is up-to-date, then resolve
const bytes = await res.body()
const items = await res.json()
const text = await res.text()
// ReadableStreams (also async-iterable)
const byteStream = res.bodyStream()
const jsonStream = res.jsonStream()
const textStream = res.textStream()
// Subscribers: backpressure-aware callbacks; each returns an unsubscribe function
const unsubscribe = res.subscribeJson(async (batch) => {
  await processBatch(batch.items)
})
const unsubscribe2 = res.subscribeBytes(async (chunk) => {
  await processBytes(chunk.data)
})
const unsubscribe3 = res.subscribeText(async (chunk) => {
  await processText(chunk.text)
})
High-Throughput Writes: Using IdempotentProducer (Recommended)
For reliable, high-throughput writes with exactly-once semantics, use IdempotentProducer:
import { DurableStream, IdempotentProducer } from "@durable-streams/client"
const stream = await DurableStream.create({
url: "https://streams.example.com/events",
contentType: "application/json",
})
const producer = new IdempotentProducer(stream, "event-processor-1", {
autoClaim: true,
onError: (err) => console.error("Batch failed:", err),
})
for (const event of events) {
producer.append(event)
}
await producer.flush()
await producer.close()
append() is fire-and-forget: it returns as soon as the message is queued in the current batch, so the loop above never blocks on network round trips. flush() waits for the pending batch and all in-flight batches to complete.
Why use IdempotentProducer?
- Exactly-once delivery: Server deduplicates using the (producerId, epoch, seq) tuple
- Automatic batching: Multiple writes batched into single HTTP requests
- Pipelining: Multiple batches in flight concurrently
- Zombie fencing: Stale producers are rejected, preventing split-brain scenarios
- Network resilience: Safe to retry on network errors (server deduplicates)
Read/Write: Using DurableStream
For simple write operations or when you need a persistent handle:
import { DurableStream } from "@durable-streams/client"
const handle = await DurableStream.create({
url: "https://streams.example.com/my-account/chat/room-1",
headers: {
Authorization: `Bearer ${process.env.DS_TOKEN!}`,
},
contentType: "application/json",
ttlSeconds: 3600,
})
await handle.append(JSON.stringify({ type: "message", text: "Hello" }), {
seq: "writer-1-000001",
})
const res = await handle.stream<{ type: string; text: string }>()
res.subscribeJson(async (batch) => {
for (const item of batch.items) {
console.log("message:", item.text)
}
})
Read from "now" (skip existing data)
const handle = await DurableStream.connect({
url,
headers: { Authorization: `Bearer ${token}` },
})
const { offset } = await handle.head()
const res = await handle.stream({ offset })
res.subscribeBytes(async (chunk) => {
console.log("new data:", new TextDecoder().decode(chunk.data))
})
Read catch-up only (no live updates)
const res = await stream({
url: "https://streams.example.com/my-stream",
live: false,
})
const text = await res.text()
console.log("All existing data:", text)
API
stream(options): Promise<StreamResponse>
Creates a fetch-like streaming session:
const res = await stream<TJson>({
url: string | URL,
headers?: HeadersRecord,
params?: ParamsRecord,
signal?: AbortSignal,
fetch?: typeof fetch,
backoffOptions?: BackoffOptions,
offset?: Offset,
live?: LiveMode,
json?: boolean,
onError?: StreamErrorHandler,
})
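A sketch of how several of these options compose; getToken() and savedOffset are hypothetical placeholders, backoffOptions is omitted because its shape isn't shown here, and signal is assumed to behave like a standard AbortSignal:
const controller = new AbortController()
const res = await stream<{ id: number }>({
  url: "https://streams.example.com/my-stream",
  headers: { Authorization: async () => `Bearer ${await getToken()}` },
  params: { tenant: "acme" },
  offset: savedOffset,       // resume from a previously saved offset
  live: "long-poll",
  signal: controller.signal, // cancels the session when the controller aborts
})
// Later: stop the session
controller.abort()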
DurableStream
class DurableStream {
readonly url: string
readonly contentType?: string
constructor(opts: DurableStreamConstructorOptions)
static create(opts: CreateOptions): Promise<DurableStream>
static connect(opts: DurableStreamOptions): Promise<DurableStream>
static head(opts: DurableStreamOptions): Promise<HeadResult>
static delete(opts: DurableStreamOptions): Promise<void>
head(opts?: { signal?: AbortSignal }): Promise<HeadResult>
create(opts?: CreateOptions): Promise<this>
delete(opts?: { signal?: AbortSignal }): Promise<void>
append(
body: BodyInit | Uint8Array | string,
opts?: AppendOptions
): Promise<void>
appendStream(
source: AsyncIterable<Uint8Array | string>,
opts?: AppendOptions
): Promise<void>
stream<TJson>(opts?: StreamOptions): Promise<StreamResponse<TJson>>
}
Live Modes
const res = await stream({ url, live: false })        // catch-up only: read existing data, no live updates
const res = await stream({ url, live: "long-poll" })  // live tailing via long polling
const res = await stream({ url, live: "sse" })        // live tailing via Server-Sent Events
Headers and Params
Headers and params support both static values and functions (sync or async) for dynamic values like authentication tokens.
// Static values
{
  headers: {
    Authorization: "Bearer my-token",
    "X-Custom-Header": "value",
  }
}
// Sync functions for dynamic values
{
  headers: {
    Authorization: () => `Bearer ${getCurrentToken()}`,
    "X-Tenant-Id": () => getCurrentTenant(),
  }
}
// Async functions, e.g. refreshing a token
{
  headers: {
    Authorization: async () => {
      const token = await refreshToken()
      return `Bearer ${token}`
    },
  }
}
// Static and dynamic values can be mixed
{
  headers: {
    "X-Static": "always-the-same",
    Authorization: async () => `Bearer ${await getToken()}`,
  }
}
// params accept the same static values and functions
{
  params: {
    tenant: "static-tenant",
    region: () => getCurrentRegion(),
    token: async () => await getSessionToken(),
  }
}
Error Handling
import { stream, FetchError, DurableStreamError } from "@durable-streams/client"
const res = await stream({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
onError: async (error) => {
if (error instanceof FetchError) {
if (error.status === 401) {
const newToken = await refreshAuthToken()
return { headers: { Authorization: `Bearer ${newToken}` } }
}
}
if (error instanceof DurableStreamError) {
console.error(`Stream error: ${error.code}`)
}
return {}
},
})
StreamResponse Methods
The StreamResponse object provides multiple ways to consume stream data. All methods respect the live mode setting.
Promise Helpers
These methods accumulate data until the stream is up-to-date, then resolve.
body(): Promise<Uint8Array>
Accumulates all bytes until up-to-date.
const res = await stream({ url, live: false })
const bytes = await res.body()
console.log("Total bytes:", bytes.length)
const text = new TextDecoder().decode(bytes)
json(): Promise<Array<TJson>>
Accumulates all JSON items until up-to-date. Only works with JSON content.
const res = await stream<{ id: number; name: string }>({
url,
live: false,
})
const items = await res.json()
for (const item of items) {
console.log(`User ${item.id}: ${item.name}`)
}
text(): Promise<string>
Accumulates all text until up-to-date.
const res = await stream({ url, live: false })
const text = await res.text()
console.log("Full content:", text)
ReadableStreams
Web Streams API for piping to other streams or using with streaming APIs. ReadableStreams can be consumed using either getReader() or for await...of syntax.
Safari/iOS Compatibility: The client ensures all returned streams are async-iterable by defining [Symbol.asyncIterator] on stream instances when missing. This allows for await...of consumption without requiring a global polyfill, while preserving instanceof ReadableStream behavior.
Derived streams: Streams created via .pipeThrough() or similar transformations are NOT automatically patched. Use the exported asAsyncIterableReadableStream() helper:
import { asAsyncIterableReadableStream } from "@durable-streams/client"
const derived = res.bodyStream().pipeThrough(myTransform)
const iterable = asAsyncIterableReadableStream(derived)
for await (const chunk of iterable) { ... }
bodyStream(): ReadableStream<Uint8Array> & AsyncIterable<Uint8Array>
Raw bytes as a ReadableStream.
Using getReader():
const res = await stream({ url, live: false })
const readable = res.bodyStream()
const reader = readable.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
console.log("Received:", value.length, "bytes")
}
Using for await...of:
const res = await stream({ url, live: false })
for await (const chunk of res.bodyStream()) {
console.log("Received:", chunk.length, "bytes")
}
Piping to a file (Node.js):
import fs from "node:fs"
import { Readable } from "node:stream"
import { pipeline } from "node:stream/promises"
const res = await stream({ url, live: false })
await pipeline(
Readable.fromWeb(res.bodyStream()),
fs.createWriteStream("output.bin")
)
jsonStream(): ReadableStream<TJson> & AsyncIterable<TJson>
Individual JSON items as a ReadableStream.
Using getReader():
const res = await stream<{ id: number }>({ url, live: false })
const readable = res.jsonStream()
const reader = readable.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
console.log("Item:", value)
}
Using for await...of:
const res = await stream<{ id: number; name: string }>({ url, live: false })
for await (const item of res.jsonStream()) {
console.log(`User ${item.id}: ${item.name}`)
}
textStream(): ReadableStream<string> & AsyncIterable<string>
Text chunks as a ReadableStream.
Using getReader():
const res = await stream({ url, live: false })
const readable = res.textStream()
const reader = readable.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
console.log("Text chunk:", value)
}
Using for await...of:
const res = await stream({ url, live: false })
for await (const text of res.textStream()) {
console.log("Text chunk:", text)
}
Using with Response API:
const res = await stream({ url, live: false })
// A Response body must yield bytes, so encode the text chunks first
const textResponse = new Response(
  res.textStream().pipeThrough(new TextEncoderStream())
)
const fullText = await textResponse.text()
Subscribers
Subscribers provide callback-based consumption with backpressure. The next chunk isn't fetched until your callback's promise resolves. Returns an unsubscribe function.
subscribeJson(callback): () => void
Subscribe to JSON batches with metadata. Provides backpressure-aware consumption.
const res = await stream<{ event: string }>({ url, live: "auto" })
const unsubscribe = res.subscribeJson(async (batch) => {
for (const item of batch.items) {
await processEvent(item)
}
await saveCheckpoint(batch.offset)
})
setTimeout(() => {
unsubscribe()
}, 60000)
subscribeBytes(callback): () => void
Subscribe to byte chunks with metadata.
const res = await stream({ url, live: "auto" })
const unsubscribe = res.subscribeBytes(async (chunk) => {
console.log("Received bytes:", chunk.data.length)
console.log("Offset:", chunk.offset)
console.log("Up to date:", chunk.upToDate)
await writeToFile(chunk.data)
await saveCheckpoint(chunk.offset)
})
subscribeText(callback): () => void
Subscribe to text chunks with metadata.
const res = await stream({ url, live: "auto" })
const unsubscribe = res.subscribeText(async (chunk) => {
console.log("Text:", chunk.text)
console.log("Offset:", chunk.offset)
await appendToLog(chunk.text)
})
Lifecycle
cancel(reason?: unknown): void
Cancel the stream session. Aborts any pending requests.
const res = await stream({ url, live: "auto" })
res.subscribeBytes(async (chunk) => {
console.log("Chunk:", chunk)
})
setTimeout(() => {
res.cancel("Timeout")
}, 10000)
closed: Promise<void>
Promise that resolves when the session is complete or cancelled.
const res = await stream({ url, live: false })
const consumer = res.text() // start consuming without awaiting
await res.closed            // resolves once the session is complete or cancelled
console.log("Stream fully consumed")
State Properties
const res = await stream({ url })
res.url          // stream URL
res.contentType  // content type reported by the server
res.live         // live mode in effect for this session
res.startOffset  // offset the session started reading from
res.offset       // latest offset consumed so far
res.cursor       // server-provided cursor, if any
res.upToDate     // true once the reader has caught up with the stream
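A small sketch of how these properties might be used to report progress while consuming, assuming offset and upToDate track the session as chunks arrive:
const res = await stream({ url, live: "auto" })
res.subscribeBytes(async (chunk) => {
  console.log(`read ${chunk.data.length} bytes; session offset is now ${res.offset}`)
  if (res.upToDate) {
    console.log("caught up with the stream")
  }
})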
DurableStream Methods
Static Methods
DurableStream.create(opts): Promise<DurableStream>
Create a new stream on the server.
const handle = await DurableStream.create({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
contentType: "application/json",
ttlSeconds: 3600,
})
await handle.append('{"hello": "world"}')
DurableStream.connect(opts): Promise<DurableStream>
Connect to an existing stream (validates it exists via HEAD).
const handle = await DurableStream.connect({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
})
console.log("Content-Type:", handle.contentType)
DurableStream.head(opts): Promise<HeadResult>
Get stream metadata without creating a handle.
const metadata = await DurableStream.head({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
})
console.log("Offset:", metadata.offset)
console.log("Content-Type:", metadata.contentType)
DurableStream.delete(opts): Promise<void>
Delete a stream without creating a handle.
await DurableStream.delete({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
})
Instance Methods
head(opts?): Promise<HeadResult>
Get metadata for this stream.
const handle = new DurableStream({
url,
headers: { Authorization: `Bearer ${token}` },
})
const metadata = await handle.head()
console.log("Current offset:", metadata.offset)
create(opts?): Promise<this>
Create this stream on the server.
const handle = new DurableStream({
url,
headers: { Authorization: `Bearer ${token}` },
})
await handle.create({
contentType: "text/plain",
ttlSeconds: 7200,
})
delete(opts?): Promise<void>
Delete this stream.
const handle = new DurableStream({
url,
headers: { Authorization: `Bearer ${token}` },
})
await handle.delete()
append(body, opts?): Promise<void>
Append data to the stream. By default, automatic batching is enabled: multiple append() calls made while a POST is in-flight will be batched together into a single request. This significantly improves throughput for high-frequency writes.
const handle = await DurableStream.connect({
url,
headers: { Authorization: `Bearer ${token}` },
})
await handle.append("Hello, world!")
await handle.append("Message 1", { seq: "writer-1-001" })
await handle.append("Message 2", { seq: "writer-1-002" })
await handle.append({ event: "click", x: 100, y: 200 })
await Promise.all([
handle.append({ event: "msg1" }),
handle.append({ event: "msg2" }),
handle.append({ event: "msg3" }),
])
Batching behavior (illustrated in the sketch below):
- JSON mode (contentType: "application/json"): Multiple values are sent as a single JSON array [val1, val2, ...]
- Byte mode: Binary data is concatenated
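As a rough illustration of JSON-mode batching, reusing the Promise.all example above; the exact request boundaries depend on timing, so treat this as a sketch rather than a guaranteed wire format:
// With contentType: "application/json" and batching enabled (the default):
await Promise.all([
  handle.append({ event: "msg1" }), // starts a POST immediately
  handle.append({ event: "msg2" }), // queued while that POST is in flight
  handle.append({ event: "msg3" }), // queued as well
])
// The queued values are then sent together in one request, e.g. as
//   [{"event":"msg2"},{"event":"msg3"}]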
Disabling batching:
If you need to ensure each append is sent immediately (e.g., for precise timing or debugging):
const handle = new DurableStream({
url,
batching: false,
})
appendStream(source, opts?): Promise<void>
Append streaming data from an async iterable or ReadableStream. This method supports piping from any source.
const handle = await DurableStream.connect({
url,
headers: { Authorization: `Bearer ${token}` },
})
async function* generateData() {
for (let i = 0; i < 100; i++) {
yield `Line ${i}\n`
}
}
await handle.appendStream(generateData())
const readable = new ReadableStream({
start(controller) {
controller.enqueue("chunk 1")
controller.enqueue("chunk 2")
controller.close()
},
})
await handle.appendStream(readable)
const response = await fetch("https://example.com/data")
await handle.appendStream(response.body!)
writable(opts?): WritableStream<Uint8Array | string>
Create a WritableStream that can receive piped data. Useful for stream composition:
const handle = await DurableStream.connect({
  url,
  headers: { Authorization: `Bearer ${token}` },
})
await someReadableStream.pipeTo(handle.writable())
const readable = inputStream.pipeThrough(new TextEncoderStream())
await readable.pipeTo(handle.writable())
stream(opts?): Promise<StreamResponse>
Start a read session (same as standalone stream() function).
const handle = await DurableStream.connect({
url,
headers: { Authorization: `Bearer ${token}` },
})
const res = await handle.stream<{ message: string }>({
offset: savedOffset,
live: "auto",
})
res.subscribeJson(async (batch) => {
for (const item of batch.items) {
console.log(item.message)
}
})
IdempotentProducer
The IdempotentProducer class provides Kafka-style exactly-once write semantics with automatic batching and pipelining.
Constructor
new IdempotentProducer(stream: DurableStream, producerId: string, opts?: IdempotentProducerOptions)
Parameters:
stream - The DurableStream to write to
producerId - Stable identifier for this producer (e.g., "order-service-1")
opts - Optional configuration
Options:
interface IdempotentProducerOptions {
  epoch?: number          // explicit starting epoch
  autoClaim?: boolean
  maxBatchBytes?: number  // maximum size of a single batch
  lingerMs?: number       // how long a batch may linger waiting for more messages
  maxInFlight?: number    // maximum concurrent batches (5 by default)
  signal?: AbortSignal    // abort signal for all producer requests
  fetch?: typeof fetch    // custom fetch implementation
  onError?: (error: Error) => void // called when a batch fails (append() is fire-and-forget)
}
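A hypothetical tuning sketch, assuming the options behave as their names suggest; the maxInFlight default comes from the Key Features section, and the other values are illustrative rather than recommendations:
const producer = new IdempotentProducer(stream, "order-service-1", {
  autoClaim: true,
  maxBatchBytes: 512 * 1024, // cap each batch at ~512 KiB
  lingerMs: 10,              // wait briefly so more messages can join a batch
  maxInFlight: 5,            // up to 5 concurrent batches (the documented default)
  onError: (err) => console.error("batch failed:", err),
})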
Methods
append(body): void
Append data to the stream (fire-and-forget). For JSON streams, you can pass objects directly.
Returns immediately after adding to the internal batch. Errors are reported via onError callback.
producer.append({ event: "click", x: 100 })
producer.append("message data")
producer.append(new Uint8Array([1, 2, 3]))
await producer.flush()
flush(): Promise<void>
Send any pending batch immediately and wait for all in-flight batches to complete.
await producer.flush()
close(): Promise<void>
Flush pending messages and close the producer. Further append() calls will throw.
await producer.close()
restart(): Promise<void>
Increment epoch and reset sequence. Call this when restarting the producer to establish a new session.
await producer.restart()
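A sketch of reusing a long-lived producer instance across logical sessions; calling restart() before a new run establishes a fresh (epoch, seq) session so earlier writes cannot be confused with new ones:
// Begin a new logical session on the same producer instance
await producer.restart() // increments the epoch and resets the sequence
producer.append({ event: "session-started" })
await producer.flush()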
Properties
epoch: number - Current epoch for this producer
nextSeq: number - Next sequence number to be assigned
pendingCount: number - Messages in the current pending batch
inFlightCount: number - Batches currently in flight
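These properties can be polled for lightweight monitoring; a minimal sketch:
const timer = setInterval(() => {
  console.log(
    `epoch=${producer.epoch} nextSeq=${producer.nextSeq}`,
    `pending=${producer.pendingCount} inFlight=${producer.inFlightCount}`
  )
}, 5000)
// clearInterval(timer) when the producer is closed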
Error Handling
Errors are delivered via the onError callback since append() is fire-and-forget:
import {
IdempotentProducer,
StaleEpochError,
SequenceGapError,
} from "@durable-streams/client"
const producer = new IdempotentProducer(stream, "my-producer", {
onError: (error) => {
if (error instanceof StaleEpochError) {
console.log(`Fenced by epoch ${error.currentEpoch}`)
} else if (error instanceof SequenceGapError) {
console.log(`Expected seq ${error.expectedSeq}, got ${error.receivedSeq}`)
}
},
})
producer.append("data")
await producer.flush()
Types
Key types exported from the package:
Offset - Opaque string for stream position
StreamResponse - Response object from stream()
ByteChunk - { data: Uint8Array, offset: Offset, upToDate: boolean, cursor?: string }
JsonBatch<T> - { items: T[], offset: Offset, upToDate: boolean, cursor?: string }
TextChunk - { text: string, offset: Offset, upToDate: boolean, cursor?: string }
HeadResult - Metadata from HEAD requests
IdempotentProducer - Exactly-once producer class
StaleEpochError - Thrown when producer epoch is stale (zombie fencing)
SequenceGapError - Thrown when sequence numbers are out of order
DurableStreamError - Protocol-level errors with codes
FetchError - Transport/network errors
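A sketch of how these types fit together in a typed, checkpointing consumer; OrderEvent, the orders URL, and saveCheckpoint are hypothetical:
import { stream, type JsonBatch, type Offset } from "@durable-streams/client"
interface OrderEvent {
  orderId: string
  status: string
}
// Hypothetical checkpoint store: persist the opaque offset string somewhere durable
async function saveCheckpoint(offset: Offset): Promise<void> {
  /* ... */
}
const res = await stream<OrderEvent>({
  url: "https://streams.example.com/orders",
  live: "auto",
})
res.subscribeJson(async (batch: JsonBatch<OrderEvent>) => {
  for (const event of batch.items) {
    console.log(event.orderId, event.status)
  }
  await saveCheckpoint(batch.offset)
})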
License
Apache-2.0