Big News: Socket raises $60M Series C at a $1B valuation to secure software supply chains for AI-driven development.Announcement
Sign In

stream-fork

Package Overview
Dependencies
Maintainers
1
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

stream-fork

Toolkit of 1→N stream combinators: broadcast, route, filter — Node Streams and Web Streams, with proper backpressure handling.

latest
Source
npmnpm
Version
2.0.0
Version published
Weekly downloads
2.3K
67.3%
Maintainers
1
Weekly downloads
 
Created
Source

stream-fork NPM version

A toolkit of 1→N stream combinators — sinks that distribute every chunk to N downstream sinks under different dispatch shapes, with proper backpressure handling. Three primitives cover the three useful control-flow shapes:

  • fork — broadcast: every chunk to every live output, slowest gates.
  • route — single-target dispatch: per-chunk picker selects one output.
  • filter — subset broadcast: per-output predicates decide who receives.

Available in two flavors that share the same options surface: Node Streams (the default stream-fork entry) and Web Streams (stream-fork/web). Zero runtime dependencies. Part of the stream-chain / stream-json family.

Installation

npm i stream-fork

Node 22+ required (or any host with a WritableStream global for the Web variant — modern browsers, Deno 2+, Bun 1+).

Quick start (Node Streams)

import fork from 'stream-fork';
import fs from 'node:fs';
import zlib from 'node:zlib';

const gzip = zlib.createGzip();
gzip.pipe(fs.createWriteStream('log.txt.gz'));

// push every chunk to both the gzip chain and stdout
dataSource.pipe(fork([gzip, process.stdout]));

Quick start (Web Streams)

import fork from 'stream-fork/web';

// dataSource is a ReadableStream, sinkA/B are WritableStreams
await dataSource.pipeTo(fork([sinkA, sinkB]));

The Web fork is a backpressure-preserving generalization of ReadableStream.tee() to N outputs — unlike tee, it does not buffer per branch (a slow branch slows upstream rather than ballooning a queue).

API

The Node and Web primitives share the same options surface. Replace Writable[] with WritableStream[] in the signatures below for the Web flavor.

fork(outputs[, options])

Broadcast sink. Every chunk goes to every live output; Promise.all over the per-output writes gates upstream backpressure to the slowest downstream.

  • outputs — array of downstream sinks.
  • options — Writable options (Node) or {queuingStrategy} (Web). Default {objectMode: true} on Node.
    • options.ignoreErrors — when truthy, downstream errors are silently dropped and the failing stream is removed from outputs.
import fork from 'stream-fork';
source.pipe(fork([sinkA, sinkB, sinkC]));

route(outputs, options)

Per-chunk single-target dispatch.

  • outputs — non-empty array of downstream sinks.
  • options.pick(chunk[, encoding]) => number | undefined — required picker. Returns the index of the output to forward to, or any non-index value to drop the chunk.
  • Plus any inner-stream options and ignoreErrors.
import route from 'stream-fork/route.js';

source.pipe(
  route([evenSink, oddSink], {
    pick: chunk => (chunk % 2 === 0 ? 0 : 1)
  })
);

filter(outputs, options)

Per-chunk subset broadcast.

  • outputs — non-empty array of downstream sinks.
  • options.predicates — array of predicates, one per output (same length).
  • Plus any inner-stream options and ignoreErrors.
import filter from 'stream-fork/filter.js';

source.pipe(
  filter([auditSink, errorSink, allSink], {
    predicates: [log => log.audit, log => log.level === 'error', () => true]
  })
);

Picker helpers

Shared between the Node and Web trees — pure functions, no runtime imports.

import pickRoundRobin from 'stream-fork/utils/pick-round-robin.js';
import pickByHash from 'stream-fork/utils/pick-by-hash.js';
import pickByKey from 'stream-fork/utils/pick-by-key.js';
import pickFirstMatch from 'stream-fork/utils/pick-first-match.js';
  • pickRoundRobin(count) — cycles 0..count-1. Load-balance across N workers.
  • pickByHash(keyFn, count) — stable hash(key) % count sharding.
  • pickByKey(keyFn, table) — explicit key → index map (object or Map).
  • pickFirstMatch(predicates) — first matching predicate's index; append () => true for catch-all.

Example: round-robin load balance (Web variant).

import route from 'stream-fork/web/route.js';
import pickRoundRobin from 'stream-fork/utils/pick-round-robin.js';

await source.pipeTo(route([worker1, worker2, worker3], {pick: pickRoundRobin(3)}));

For detailed usage docs see the wiki.

Release History

  • 2.0.0 ESM, new API: fork(...), no new, route, filter.
  • 1.0.5 technical release.
  • 1.0.4 bugfix: forward errors correctly, thx dbubovych.
  • 1.0.3 technical release to support Node 14.
  • 1.0.2 workaround for Node 6: use 'finish' event instead of _final().
  • 1.0.1 improved documentation.
  • 1.0.0 the initial release.

The full release notes are in the wiki: Release notes.

Keywords

stream

FAQs

Package last updated on 22 May 2026

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts