
Security News
CVE Volume Surges Past 48,000 in 2025 as WordPress Plugin Ecosystem Drives Growth
CVE disclosures hit a record 48,185 in 2025, driven largely by vulnerabilities in third-party WordPress plugins.
@durable-streams/client
Advanced tools
TypeScript client for the Durable Streams protocol.
npm install @durable-streams/client
The Durable Streams client provides three main APIs:
stream() function - A fetch-like read-only API for consuming streamsDurableStream class - A handle for read/write operations on a streamIdempotentProducer class - High-throughput producer with exactly-once write semantics (recommended for writes)IdempotentProducer provides Kafka-style exactly-once semantics with automatic deduplicationstream() and DurableStream.stream() provide rich consumption options (promises, ReadableStreams, subscribers)stream() (fetch-like API)The stream() function provides a simple, fetch-like interface for reading from streams:
import { stream } from "@durable-streams/client"
// Connect and get a StreamResponse
const res = await stream<{ message: string }>({
url: "https://streams.example.com/my-account/chat/room-1",
headers: {
Authorization: `Bearer ${process.env.DS_TOKEN!}`,
},
offset: savedOffset, // optional: resume from offset
live: "auto", // default: behavior driven by consumption method
})
// Accumulate all JSON items until up-to-date
const items = await res.json()
console.log("All items:", items)
// Or stream live with a subscriber
res.subscribeJson(async (batch) => {
for (const item of batch.items) {
console.log("item:", item)
saveOffset(batch.offset) // persist for resumption
}
})
The StreamResponse object returned by stream() offers multiple ways to consume data:
// Promise helpers (accumulate until first upToDate)
const bytes = await res.body() // Uint8Array
const items = await res.json() // Array<TJson>
const text = await res.text() // string
// ReadableStreams
const byteStream = res.bodyStream() // ReadableStream<Uint8Array>
const jsonStream = res.jsonStream() // ReadableStream<TJson>
const textStream = res.textStream() // ReadableStream<string>
// Subscribers (with backpressure)
const unsubscribe = res.subscribeJson(async (batch) => {
await processBatch(batch.items)
})
const unsubscribe2 = res.subscribeBytes(async (chunk) => {
await processBytes(chunk.data)
})
const unsubscribe3 = res.subscribeText(async (chunk) => {
await processText(chunk.text)
})
IdempotentProducer (Recommended)For reliable, high-throughput writes with exactly-once semantics, use IdempotentProducer:
import { DurableStream, IdempotentProducer } from "@durable-streams/client"
const stream = await DurableStream.create({
url: "https://streams.example.com/events",
contentType: "application/json",
})
const producer = new IdempotentProducer(stream, "event-processor-1", {
autoClaim: true,
onError: (err) => console.error("Batch failed:", err), // Errors reported here
})
// Fire-and-forget - don't await, errors go to onError callback
for (const event of events) {
producer.append(event) // Objects serialized automatically for JSON streams
}
// IMPORTANT: Always flush before shutdown to ensure delivery
await producer.flush()
await producer.close()
For high-throughput scenarios, append() is fire-and-forget (returns immediately):
// Fire-and-forget - errors reported via onError callback
for (const event of events) {
producer.append(event) // Returns void, adds to batch
}
// Always flush before shutdown to ensure delivery
await producer.flush()
Why use IdempotentProducer?
(producerId, epoch, seq) tupleDurableStreamFor simple write operations or when you need a persistent handle:
import { DurableStream } from "@durable-streams/client"
// Create a new stream
const handle = await DurableStream.create({
url: "https://streams.example.com/my-account/chat/room-1",
headers: {
Authorization: `Bearer ${process.env.DS_TOKEN!}`,
},
contentType: "application/json",
ttlSeconds: 3600,
})
// Append data (simple API without exactly-once guarantees)
await handle.append(JSON.stringify({ type: "message", text: "Hello" }), {
seq: "writer-1-000001",
})
// Read using the new stream() API
const res = await handle.stream<{ type: string; text: string }>()
res.subscribeJson(async (batch) => {
for (const item of batch.items) {
console.log("message:", item.text)
}
})
// HEAD gives you the current tail offset if the server exposes it
const handle = await DurableStream.connect({
url,
headers: { Authorization: `Bearer ${token}` },
})
const { offset } = await handle.head()
// Read only new data from that point on
const res = await handle.stream({ offset })
res.subscribeBytes(async (chunk) => {
console.log("new data:", new TextDecoder().decode(chunk.data))
})
// Read existing data only, stop when up-to-date
const res = await stream({
url: "https://streams.example.com/my-stream",
live: false,
})
const text = await res.text()
console.log("All existing data:", text)
stream(options): Promise<StreamResponse>Creates a fetch-like streaming session:
const res = await stream<TJson>({
url: string | URL, // Stream URL
headers?: HeadersRecord, // Headers (static or function-based)
params?: ParamsRecord, // Query params (static or function-based)
signal?: AbortSignal, // Cancellation
fetch?: typeof fetch, // Custom fetch implementation
backoffOptions?: BackoffOptions,// Retry backoff configuration
offset?: Offset, // Starting offset (default: start of stream)
live?: LiveMode, // Live mode (default: "auto")
json?: boolean, // Force JSON mode
onError?: StreamErrorHandler, // Error handler
})
DurableStreamclass DurableStream {
readonly url: string
readonly contentType?: string
constructor(opts: DurableStreamConstructorOptions)
// Static methods
static create(opts: CreateOptions): Promise<DurableStream>
static connect(opts: DurableStreamOptions): Promise<DurableStream>
static head(opts: DurableStreamOptions): Promise<HeadResult>
static delete(opts: DurableStreamOptions): Promise<void>
// Instance methods
head(opts?: { signal?: AbortSignal }): Promise<HeadResult>
create(opts?: CreateOptions): Promise<this>
delete(opts?: { signal?: AbortSignal }): Promise<void>
append(
body: BodyInit | Uint8Array | string,
opts?: AppendOptions
): Promise<void>
appendStream(
source: AsyncIterable<Uint8Array | string>,
opts?: AppendOptions
): Promise<void>
// Fetch-like read API
stream<TJson>(opts?: StreamOptions): Promise<StreamResponse<TJson>>
}
// "auto" (default): behavior driven by consumption method
// - Promise helpers (body/json/text): stop after upToDate
// - Streams/subscribers: continue with long-poll
// false: catch-up only, stop at first upToDate
const res = await stream({ url, live: false })
// "long-poll": explicit long-poll mode for live updates
const res = await stream({ url, live: "long-poll" })
// "sse": explicit SSE mode for live updates
const res = await stream({ url, live: "sse" })
Headers and params support both static values and functions (sync or async) for dynamic values like authentication tokens.
// Static headers
{
headers: {
Authorization: "Bearer my-token",
"X-Custom-Header": "value",
}
}
// Function-based headers (sync)
{
headers: {
Authorization: () => `Bearer ${getCurrentToken()}`,
"X-Tenant-Id": () => getCurrentTenant(),
}
}
// Async function headers (for refreshing tokens)
{
headers: {
Authorization: async () => {
const token = await refreshToken()
return `Bearer ${token}`
}
}
}
// Mix static and function headers
{
headers: {
"X-Static": "always-the-same",
Authorization: async () => `Bearer ${await getToken()}`,
}
}
// Query params work the same way
{
params: {
tenant: "static-tenant",
region: () => getCurrentRegion(),
token: async () => await getSessionToken(),
}
}
import { stream, FetchError, DurableStreamError } from "@durable-streams/client"
const res = await stream({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
onError: async (error) => {
if (error instanceof FetchError) {
if (error.status === 401) {
const newToken = await refreshAuthToken()
return { headers: { Authorization: `Bearer ${newToken}` } }
}
}
if (error instanceof DurableStreamError) {
console.error(`Stream error: ${error.code}`)
}
return {} // Retry with same params
},
})
The StreamResponse object provides multiple ways to consume stream data. All methods respect the live mode setting.
These methods accumulate data until the stream is up-to-date, then resolve.
body(): Promise<Uint8Array>Accumulates all bytes until up-to-date.
const res = await stream({ url, live: false })
const bytes = await res.body()
console.log("Total bytes:", bytes.length)
// Process as needed
const text = new TextDecoder().decode(bytes)
json(): Promise<Array<TJson>>Accumulates all JSON items until up-to-date. Only works with JSON content.
const res = await stream<{ id: number; name: string }>({
url,
live: false,
})
const items = await res.json()
for (const item of items) {
console.log(`User ${item.id}: ${item.name}`)
}
text(): Promise<string>Accumulates all text until up-to-date.
const res = await stream({ url, live: false })
const text = await res.text()
console.log("Full content:", text)
Web Streams API for piping to other streams or using with streaming APIs. ReadableStreams can be consumed using either getReader() or for await...of syntax.
Safari/iOS Compatibility: The client ensures all returned streams are async-iterable by defining
[Symbol.asyncIterator]on stream instances when missing. This allowsfor await...ofconsumption without requiring a global polyfill, while preservinginstanceof ReadableStreambehavior.Derived streams: Streams created via
.pipeThrough()or similar transformations are NOT automatically patched. Use the exportedasAsyncIterableReadableStream()helper:import { asAsyncIterableReadableStream } from "@durable-streams/client" const derived = res.bodyStream().pipeThrough(myTransform) const iterable = asAsyncIterableReadableStream(derived) for await (const chunk of iterable) { ... }
bodyStream(): ReadableStream<Uint8Array> & AsyncIterable<Uint8Array>Raw bytes as a ReadableStream.
Using getReader():
const res = await stream({ url, live: false })
const readable = res.bodyStream()
const reader = readable.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
console.log("Received:", value.length, "bytes")
}
Using for await...of:
const res = await stream({ url, live: false })
for await (const chunk of res.bodyStream()) {
console.log("Received:", chunk.length, "bytes")
}
Piping to a file (Node.js):
import { Readable } from "node:stream"
import { pipeline } from "node:stream/promises"
const res = await stream({ url, live: false })
await pipeline(
Readable.fromWeb(res.bodyStream()),
fs.createWriteStream("output.bin")
)
jsonStream(): ReadableStream<TJson> & AsyncIterable<TJson>Individual JSON items as a ReadableStream.
Using getReader():
const res = await stream<{ id: number }>({ url, live: false })
const readable = res.jsonStream()
const reader = readable.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
console.log("Item:", value)
}
Using for await...of:
const res = await stream<{ id: number; name: string }>({ url, live: false })
for await (const item of res.jsonStream()) {
console.log(`User ${item.id}: ${item.name}`)
}
textStream(): ReadableStream<string> & AsyncIterable<string>Text chunks as a ReadableStream.
Using getReader():
const res = await stream({ url, live: false })
const readable = res.textStream()
const reader = readable.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
console.log("Text chunk:", value)
}
Using for await...of:
const res = await stream({ url, live: false })
for await (const text of res.textStream()) {
console.log("Text chunk:", text)
}
Using with Response API:
const res = await stream({ url, live: false })
const textResponse = new Response(res.textStream())
const fullText = await textResponse.text()
Subscribers provide callback-based consumption with backpressure. The next chunk isn't fetched until your callback's promise resolves. Returns an unsubscribe function.
subscribeJson(callback): () => voidSubscribe to JSON batches with metadata. Provides backpressure-aware consumption.
const res = await stream<{ event: string }>({ url, live: "auto" })
const unsubscribe = res.subscribeJson(async (batch) => {
// Process items - next batch waits until this resolves
for (const item of batch.items) {
await processEvent(item)
}
await saveCheckpoint(batch.offset)
})
// Later: stop receiving updates
setTimeout(() => {
unsubscribe()
}, 60000)
subscribeBytes(callback): () => voidSubscribe to byte chunks with metadata.
const res = await stream({ url, live: "auto" })
const unsubscribe = res.subscribeBytes(async (chunk) => {
console.log("Received bytes:", chunk.data.length)
console.log("Offset:", chunk.offset)
console.log("Up to date:", chunk.upToDate)
await writeToFile(chunk.data)
await saveCheckpoint(chunk.offset)
})
subscribeText(callback): () => voidSubscribe to text chunks with metadata.
const res = await stream({ url, live: "auto" })
const unsubscribe = res.subscribeText(async (chunk) => {
console.log("Text:", chunk.text)
console.log("Offset:", chunk.offset)
await appendToLog(chunk.text)
})
cancel(reason?: unknown): voidCancel the stream session. Aborts any pending requests.
const res = await stream({ url, live: "auto" })
// Start consuming
res.subscribeBytes(async (chunk) => {
console.log("Chunk:", chunk)
})
// Cancel after 10 seconds
setTimeout(() => {
res.cancel("Timeout")
}, 10000)
closed: Promise<void>Promise that resolves when the session is complete or cancelled.
const res = await stream({ url, live: false })
// Start consuming in background
const consumer = res.text()
// Wait for completion
await res.closed
console.log("Stream fully consumed")
const res = await stream({ url })
res.url // The stream URL
res.contentType // Content-Type from response headers
res.live // The live mode ("auto", "long-poll", "sse", or false)
res.startOffset // The starting offset passed to stream()
res.offset // Current offset (updates as data is consumed)
res.cursor // Cursor for collapsing (if provided by server)
res.upToDate // Whether we've caught up to the stream head
DurableStream.create(opts): Promise<DurableStream>Create a new stream on the server.
const handle = await DurableStream.create({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
contentType: "application/json",
ttlSeconds: 3600, // Optional: auto-delete after 1 hour
})
await handle.append('{"hello": "world"}')
DurableStream.connect(opts): Promise<DurableStream>Connect to an existing stream (validates it exists via HEAD).
const handle = await DurableStream.connect({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
})
console.log("Content-Type:", handle.contentType)
DurableStream.head(opts): Promise<HeadResult>Get stream metadata without creating a handle.
const metadata = await DurableStream.head({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
})
console.log("Offset:", metadata.offset)
console.log("Content-Type:", metadata.contentType)
DurableStream.delete(opts): Promise<void>Delete a stream without creating a handle.
await DurableStream.delete({
url: "https://streams.example.com/my-stream",
headers: {
Authorization: "Bearer my-token",
},
})
head(opts?): Promise<HeadResult>Get metadata for this stream.
const handle = new DurableStream({
url,
headers: { Authorization: `Bearer ${token}` },
})
const metadata = await handle.head()
console.log("Current offset:", metadata.offset)
create(opts?): Promise<this>Create this stream on the server.
const handle = new DurableStream({
url,
headers: { Authorization: `Bearer ${token}` },
})
await handle.create({
contentType: "text/plain",
ttlSeconds: 7200,
})
delete(opts?): Promise<void>Delete this stream.
const handle = new DurableStream({
url,
headers: { Authorization: `Bearer ${token}` },
})
await handle.delete()
append(body, opts?): Promise<void>Append data to the stream. By default, automatic batching is enabled: multiple append() calls made while a POST is in-flight will be batched together into a single request. This significantly improves throughput for high-frequency writes.
const handle = await DurableStream.connect({
url,
headers: { Authorization: `Bearer ${token}` },
})
// Append string
await handle.append("Hello, world!")
// Append with sequence number for ordering
await handle.append("Message 1", { seq: "writer-1-001" })
await handle.append("Message 2", { seq: "writer-1-002" })
// For JSON streams, append objects directly (serialized automatically)
await handle.append({ event: "click", x: 100, y: 200 })
// Batching happens automatically - these may be sent in a single request
await Promise.all([
handle.append({ event: "msg1" }),
handle.append({ event: "msg2" }),
handle.append({ event: "msg3" }),
])
Batching behavior:
contentType: "application/json"): Multiple values are sent as a JSON array [val1, val2, ...]Disabling batching:
If you need to ensure each append is sent immediately (e.g., for precise timing or debugging):
const handle = new DurableStream({
url,
batching: false, // Disable automatic batching
})
appendStream(source, opts?): Promise<void>Append streaming data from an async iterable or ReadableStream. This method supports piping from any source.
const handle = await DurableStream.connect({
url,
headers: { Authorization: `Bearer ${token}` },
})
// From async generator
async function* generateData() {
for (let i = 0; i < 100; i++) {
yield `Line ${i}\n`
}
}
await handle.appendStream(generateData())
// From ReadableStream
const readable = new ReadableStream({
start(controller) {
controller.enqueue("chunk 1")
controller.enqueue("chunk 2")
controller.close()
},
})
await handle.appendStream(readable)
// Pipe from a fetch response body
const response = await fetch("https://example.com/data")
await handle.appendStream(response.body!)
writable(opts?): WritableStream<Uint8Array | string>Create a WritableStream that can receive piped data. Useful for stream composition:
const handle = await DurableStream.connect({ url, auth })
// Pipe from any ReadableStream
await someReadableStream.pipeTo(handle.writable())
// Pipe through a transform
const readable = inputStream.pipeThrough(new TextEncoderStream())
await readable.pipeTo(handle.writable())
stream(opts?): Promise<StreamResponse>Start a read session (same as standalone stream() function).
const handle = await DurableStream.connect({
url,
headers: { Authorization: `Bearer ${token}` },
})
const res = await handle.stream<{ message: string }>({
offset: savedOffset,
live: "auto",
})
res.subscribeJson(async (batch) => {
for (const item of batch.items) {
console.log(item.message)
}
})
The IdempotentProducer class provides Kafka-style exactly-once write semantics with automatic batching and pipelining.
new IdempotentProducer(stream: DurableStream, producerId: string, opts?: IdempotentProducerOptions)
Parameters:
stream - The DurableStream to write toproducerId - Stable identifier for this producer (e.g., "order-service-1")opts - Optional configurationOptions:
interface IdempotentProducerOptions {
epoch?: number // Starting epoch (default: 0)
autoClaim?: boolean // On 403, retry with epoch+1 (default: false)
maxBatchBytes?: number // Max bytes before sending batch (default: 1MB)
lingerMs?: number // Max time to wait for more messages (default: 5ms)
maxInFlight?: number // Concurrent batches in flight (default: 5)
signal?: AbortSignal // Cancellation signal
fetch?: typeof fetch // Custom fetch implementation
onError?: (error: Error) => void // Error callback for batch failures
}
append(body): voidAppend data to the stream (fire-and-forget). For JSON streams, you can pass objects directly.
Returns immediately after adding to the internal batch. Errors are reported via onError callback.
// For JSON streams - pass objects directly
producer.append({ event: "click", x: 100 })
// Or strings/bytes
producer.append("message data")
producer.append(new Uint8Array([1, 2, 3]))
// All appends are fire-and-forget - use flush() to wait for delivery
await producer.flush()
flush(): Promise<void>Send any pending batch immediately and wait for all in-flight batches to complete.
// Always call before shutdown
await producer.flush()
close(): Promise<void>Flush pending messages and close the producer. Further append() calls will throw.
await producer.close()
restart(): Promise<void>Increment epoch and reset sequence. Call this when restarting the producer to establish a new session.
await producer.restart()
epoch: number - Current epoch for this producernextSeq: number - Next sequence number to be assignedpendingCount: number - Messages in the current pending batchinFlightCount: number - Batches currently in flightErrors are delivered via the onError callback since append() is fire-and-forget:
import {
IdempotentProducer,
StaleEpochError,
SequenceGapError,
} from "@durable-streams/client"
const producer = new IdempotentProducer(stream, "my-producer", {
onError: (error) => {
if (error instanceof StaleEpochError) {
// Another producer has a higher epoch - this producer is "fenced"
console.log(`Fenced by epoch ${error.currentEpoch}`)
} else if (error instanceof SequenceGapError) {
// Sequence gap detected (should not happen with proper usage)
console.log(`Expected seq ${error.expectedSeq}, got ${error.receivedSeq}`)
}
},
})
producer.append("data") // Fire-and-forget, errors go to onError
await producer.flush() // Wait for all batches to complete
Key types exported from the package:
Offset - Opaque string for stream positionStreamResponse - Response object from stream()ByteChunk - { data: Uint8Array, offset: Offset, upToDate: boolean, cursor?: string }JsonBatch<T> - { items: T[], offset: Offset, upToDate: boolean, cursor?: string }TextChunk - { text: string, offset: Offset, upToDate: boolean, cursor?: string }HeadResult - Metadata from HEAD requestsIdempotentProducer - Exactly-once producer classStaleEpochError - Thrown when producer epoch is stale (zombie fencing)SequenceGapError - Thrown when sequence numbers are out of orderDurableStreamError - Protocol-level errors with codesFetchError - Transport/network errorsApache-2.0
FAQs
TypeScript client for the Durable Streams protocol
We found that @durable-streams/client demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
CVE disclosures hit a record 48,185 in 2025, driven largely by vulnerabilities in third-party WordPress plugins.

Security News
Socket CEO Feross Aboukhadijeh joins Insecure Agents to discuss CVE remediation and why supply chain attacks require a different security approach.

Security News
Tailwind Labs laid off 75% of its engineering team after revenue dropped 80%, as LLMs redirect traffic away from documentation where developers discover paid products.