
stream-fork
Toolkit of 1→N stream combinators: broadcast, route, filter — Node Streams and Web Streams, with proper backpressure handling.
A toolkit of 1→N stream combinators — sinks that distribute every chunk to N downstream sinks under different dispatch shapes, with proper backpressure handling. Three primitives cover the three useful control-flow shapes:
fork: broadcast. Every chunk goes to every live output; the slowest output gates the rest.
route: single-target dispatch. A per-chunk picker selects one output.
filter: subset broadcast. Per-output predicates decide who receives each chunk.
Available in two flavors that share the same options surface: Node Streams (the default stream-fork entry) and Web Streams (stream-fork/web). Zero runtime dependencies. Part of the stream-chain / stream-json family.
npm i stream-fork
Node 22+ required (or any host with a WritableStream global for the Web variant — modern browsers, Deno 2+, Bun 1+).
import fork from 'stream-fork';
import fs from 'node:fs';
import zlib from 'node:zlib';
const gzip = zlib.createGzip();
gzip.pipe(fs.createWriteStream('log.txt.gz'));
// push every chunk to both the gzip chain and stdout
dataSource.pipe(fork([gzip, process.stdout]));
import fork from 'stream-fork/web';
// dataSource is a ReadableStream, sinkA/B are WritableStreams
await dataSource.pipeTo(fork([sinkA, sinkB]));
The Web fork is a backpressure-preserving generalization of ReadableStream.tee() to N outputs — unlike tee, it does not buffer per branch (a slow branch slows upstream rather than ballooning a queue).
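To make the contrast with tee() concrete, here is a minimal sketch of a backpressure-preserving broadcast built only on the standard WritableStream and ReadableStream globals. It is not stream-fork's implementation: broadcastSketch, sinkA, sinkB, and the 10 ms delay are hypothetical names and values chosen for illustration.

```javascript
// Hypothetical helper, NOT stream-fork's code: a WritableStream that forwards
// each chunk to every output and waits for all of them before accepting more.
const broadcastSketch = outputs => {
  const writers = outputs.map(output => output.getWriter());
  return new WritableStream({
    // Resolves only once every output has accepted the chunk, so the slowest
    // output sets the pace for upstream instead of a per-branch queue growing.
    write: chunk => Promise.all(writers.map(writer => writer.write(chunk))),
    close: () => Promise.all(writers.map(writer => writer.close())),
    abort: reason => Promise.all(writers.map(writer => writer.abort(reason)))
  });
};

// Two collecting sinks; the second one is artificially slow.
const seenA = [];
const seenB = [];
const sinkA = new WritableStream({
  write(chunk) {
    seenA.push(chunk);
  }
});
const sinkB = new WritableStream({
  write: chunk =>
    new Promise(resolve =>
      setTimeout(() => {
        seenB.push(chunk);
        resolve();
      }, 10)
    )
});

// A small source that emits three chunks and closes.
const source = new ReadableStream({
  start(controller) {
    [1, 2, 3].forEach(n => controller.enqueue(n));
    controller.close();
  }
});

// Resolves after both sinks have received every chunk and been closed.
const done = source.pipeTo(broadcastSketch([sinkA, sinkB]));
```

Because each write waits on Promise.all, the fast sink never runs ahead of the slow one by more than the stream's own queue, which is the property the paragraph above attributes to the Web fork.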
The Node and Web primitives share the same options surface. Replace Writable[] with WritableStream[] in the signatures below for the Web flavor.
fork(outputs[, options])
Broadcast sink. Every chunk goes to every live output; Promise.all over the per-output writes gates upstream backpressure to the slowest downstream.
outputs: array of downstream sinks.
options: Writable options (Node) or {queuingStrategy} (Web). Default {objectMode: true} on Node.
options.ignoreErrors: when truthy, downstream errors are silently dropped and the failing stream is removed from outputs.
import fork from 'stream-fork';
source.pipe(fork([sinkA, sinkB, sinkC]));
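The ignoreErrors behavior described above can be modeled without real streams. The sketch below is an assumption-laden illustration, not the library's code: writeToAll and makeSink are hypothetical helpers, sinks are plain objects with an async write(chunk), and Promise.allSettled stands in for the library's Promise.all so that failures can be inspected per output.

```javascript
// Hypothetical model: write one chunk to every output and return the outputs
// that are still alive afterwards.
const writeToAll = async (outputs, chunk, ignoreErrors = false) => {
  const settled = await Promise.allSettled(outputs.map(output => output.write(chunk)));
  const survivors = [];
  for (let i = 0; i < outputs.length; ++i) {
    if (settled[i].status === 'fulfilled') {
      survivors.push(outputs[i]);
    } else if (!ignoreErrors) {
      throw settled[i].reason; // without ignoreErrors the first failure propagates
    }
    // with ignoreErrors the failing output is silently left out of survivors
  }
  return survivors;
};

// Hypothetical sink factory: collects chunks, or always fails when fail is true.
const makeSink = (name, fail = false) => ({
  name,
  chunks: [],
  async write(chunk) {
    if (fail) throw new Error(name + ' failed');
    this.chunks.push(chunk);
  }
});

const good = makeSink('good');
const bad = makeSink('bad', true);

// With ignoreErrors set, the failing sink is dropped and the good one remains.
const result = writeToAll([good, bad], 'x', true);
```

Returning the surviving outputs keeps the model pure; a real sink would more likely mutate its internal output list instead.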
route(outputs, options)
Per-chunk single-target dispatch.
outputs: non-empty array of downstream sinks.
options.pick(chunk[, encoding]) => number | undefined: required picker. Returns the index of the output to forward to, or any non-index value to drop the chunk.
ignoreErrors (see fork).
import route from 'stream-fork/route.js';
source.pipe(
route([evenSink, oddSink], {
pick: chunk => (chunk % 2 === 0 ? 0 : 1)
})
);
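The picker contract (return an index to forward, anything else to drop) can be illustrated with plain arrays standing in for sinks. This is only a sketch: dispatchOne is a hypothetical helper, and treating out-of-range integers as "drop" is an assumption about what counts as a non-index value.

```javascript
// Hypothetical helper mirroring the picker contract; arrays stand in for sinks.
const dispatchOne = (outputs, pick, chunk) => {
  const index = pick(chunk);
  // Assumption: only an integer within [0, outputs.length) counts as an index.
  const valid = Number.isInteger(index) && index >= 0 && index < outputs.length;
  if (valid) outputs[index].push(chunk); // forward to exactly one output
  return valid; // false means the chunk was dropped
};

const evens = [];
const odds = [];

// Numbers go to the even or odd output; anything else yields undefined and is dropped.
const pick = chunk => (typeof chunk !== 'number' ? undefined : chunk % 2 === 0 ? 0 : 1);

for (const chunk of [1, 2, 'skip me', 3, 4]) {
  dispatchOne([evens, odds], pick, chunk);
}
// evens now holds 2 and 4, odds holds 1 and 3; 'skip me' reached neither.
```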
filter(outputs, options)
Per-chunk subset broadcast.
outputs: non-empty array of downstream sinks.
options.predicates: array of predicates, one per output (same length).
ignoreErrors (see fork).
import filter from 'stream-fork/filter.js';
source.pipe(
filter([auditSink, errorSink, allSink], {
predicates: [log => log.audit, log => log.level === 'error', () => true]
})
);
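The subset-broadcast rule can be traced on sample data with arrays standing in for sinks. The helper dispatchSubset and the sample log records are hypothetical; only the three predicates come from the example above.

```javascript
// Hypothetical helper: deliver a chunk to every output whose predicate accepts it.
const dispatchSubset = (outputs, predicates, chunk) => {
  if (outputs.length !== predicates.length) {
    throw new TypeError('expected one predicate per output');
  }
  let delivered = 0;
  outputs.forEach((output, i) => {
    if (predicates[i](chunk)) {
      output.push(chunk);
      ++delivered;
    }
  });
  return delivered; // number of outputs that received the chunk
};

const audit = [];
const errors = [];
const all = [];
const predicates = [log => log.audit, log => log.level === 'error', () => true];

// Made-up log records for illustration.
const logs = [
  {id: 1, audit: true, level: 'info'},
  {id: 2, audit: false, level: 'error'},
  {id: 3, audit: true, level: 'error'}
];

for (const log of logs) dispatchSubset([audit, errors, all], predicates, log);
// audit receives ids 1 and 3, errors receives ids 2 and 3, all receives every record.
```

Unlike route, one chunk may land in several outputs or in none, depending on how many predicates accept it.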
The picker helpers are shared between the Node and Web trees: pure functions, no runtime imports.
import pickRoundRobin from 'stream-fork/utils/pick-round-robin.js';
import pickByHash from 'stream-fork/utils/pick-by-hash.js';
import pickByKey from 'stream-fork/utils/pick-by-key.js';
import pickFirstMatch from 'stream-fork/utils/pick-first-match.js';
pickRoundRobin(count): cycles 0..count-1. Load-balance across N workers.
pickByHash(keyFn, count): stable hash(key) % count sharding.
pickByKey(keyFn, table): explicit key → index map (object or Map).
pickFirstMatch(predicates): first matching predicate's index; append () => true for catch-all.
Example: round-robin load balance (Web variant).
import route from 'stream-fork/web/route.js';
import pickRoundRobin from 'stream-fork/utils/pick-round-robin.js';
await source.pipeTo(route([worker1, worker2, worker3], {pick: pickRoundRobin(3)}));
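To show what such pickers might look like as pure functions, here is a sketch of four stand-ins. The names ending in Sketch are hypothetical, the 31-multiplier string hash is an arbitrary choice, and none of this is taken from the package's source; the real helpers may differ in hashing and edge cases.

```javascript
// Cycles 0..count-1, one step per call.
const roundRobinSketch = count => {
  let next = 0;
  return () => {
    const index = next;
    next = (next + 1) % count;
    return index;
  };
};

// Stable sharding: the same key always maps to the same index.
// Assumption: keys are stringified and hashed with a simple 32-bit rolling hash.
const byHashSketch = (keyFn, count) => chunk => {
  const key = String(keyFn(chunk));
  let hash = 0;
  for (let i = 0; i < key.length; ++i) {
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0;
  }
  return hash % count;
};

// Explicit key → index lookup in a plain object or a Map; unknown keys give undefined.
const byKeySketch = (keyFn, table) => chunk => {
  const key = keyFn(chunk);
  return table instanceof Map ? table.get(key) : table[key];
};

// Index of the first predicate that accepts the chunk, or undefined if none does.
const firstMatchSketch = predicates => chunk => {
  const index = predicates.findIndex(predicate => predicate(chunk));
  return index < 0 ? undefined : index;
};

const roundRobin = roundRobinSketch(3);
const order = [roundRobin(), roundRobin(), roundRobin(), roundRobin()]; // 0, 1, 2, 0

const byUser = byHashSketch(event => event.user, 4);
const byLevel = byKeySketch(event => event.level, {info: 0, error: 1});
const firstMatch = firstMatchSketch([n => n < 0, n => n === 0, () => true]);
```

Each sketch returns a function usable as a pick option; an undefined result falls under the "non-index value" rule, so the chunk would be dropped.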
For detailed usage docs see the wiki.
fork(...), no new, route, filter.
'finish' event instead of _final().
The full release notes are in the wiki: Release notes.