πŸš€ Big News:Socket Has Acquired Secure Annex.Learn More β†’
Socket
Book a DemoSign in
Socket

@push.rocks/smartstream

Package Overview
Dependencies
Maintainers
1
Versions
58
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@push.rocks/smartstream

A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.

latest
npmnpm
Version
3.4.2
Version published
Weekly downloads
977
146.1%
Maintainers
1
Weekly downloads
Β 
Created
Source

@push.rocks/smartstream

A TypeScript-first library for creating and manipulating Node.js and Web streams with built-in backpressure handling, async transformations, and seamless Node.js ↔ Web stream interoperability.

Issue Reporting and Security

For reporting bugs, issues, or security vulnerabilities, please visit community.foss.global/. This is the central community hub for all issue reporting. Developers who sign and comply with our contribution agreement and go through identification can also get a code.foss.global/ account to submit Pull Requests directly.

Install

pnpm install @push.rocks/smartstream

The package ships with two entry points so you can pick exactly what you need:

Entry PointImport PathEnvironment
Node.js (default)@push.rocks/smartstreamNode.js β€” full stream utilities, duplex, intake, wrappers, and Node↔Web helpers
Web@push.rocks/smartstream/webBrowser & Node.js β€” pure Web Streams API (WebDuplexStream)

Usage

All examples use ESM / TypeScript syntax.

πŸ“¦ Importing

// Node.js β€” full API
import {
  SmartDuplex,
  StreamWrapper,
  StreamIntake,
  createTransformFunction,
  createPassThrough,
  nodewebhelpers,
} from '@push.rocks/smartstream';

// Web β€” browser-safe, zero Node.js dependencies
import { WebDuplexStream } from '@push.rocks/smartstream/web';

πŸ”„ SmartDuplex β€” The Core Stream Primitive

SmartDuplex extends Node.js Duplex with first-class async support, built-in backpressure management, and a clean functional API. Instead of overriding _transform or _write manually, you pass a writeFunction that receives each chunk along with a tools object.

Basic Transform

import { SmartDuplex } from '@push.rocks/smartstream';

const upperCaser = new SmartDuplex<Buffer, Buffer>({
  writeFunction: async (chunk, tools) => {
    // Return a value to push it downstream
    return Buffer.from(chunk.toString().toUpperCase());
  },
});

readableStream.pipe(upperCaser).pipe(writableStream);

Using tools.push() for Multiple Outputs

The writeFunction can emit multiple chunks per input via tools.push():

const splitter = new SmartDuplex<string, string>({
  objectMode: true,
  writeFunction: async (chunk, tools) => {
    const words = chunk.split(' ');
    for (const word of words) {
      await tools.push(word);
    }
    // Returning nothing β€” output was already pushed
  },
});

Final Function

Run cleanup or emit final data when the writable side ends:

let runningTotal = 0;

const aggregator = new SmartDuplex<number, number>({
  objectMode: true,
  writeFunction: async (chunk, tools) => {
    runningTotal += chunk;
    // Don't emit anything per-chunk
  },
  finalFunction: async (tools) => {
    return runningTotal; // Emitted as the last chunk
  },
});

The finalFunction can also push multiple chunks via tools.push(), just like writeFunction.

Truncating a Stream Early

Call tools.truncate() inside writeFunction to signal that no more data should be read:

const limiter = new SmartDuplex<string, string>({
  objectMode: true,
  writeFunction: async (chunk, tools) => {
    if (chunk === 'STOP') {
      tools.truncate();
      return;
    }
    return chunk;
  },
});

Creating from a Buffer

const stream = SmartDuplex.fromBuffer(Buffer.from('hello world'));
stream.on('data', (chunk) => console.log(chunk.toString())); // "hello world"

Creating from a Web ReadableStream

Bridge the Web Streams API into a Node.js Duplex:

const response = await fetch('https://example.com/data');
const nodeDuplex = SmartDuplex.fromWebReadableStream(response.body);

nodeDuplex.pipe(processTransform).pipe(outputStream);

Getting Web Streams from SmartDuplex

Convert a SmartDuplex into a Web ReadableStream + WritableStream pair:

const duplex = new SmartDuplex({
  objectMode: true,
  writeFunction: async (chunk, tools) => {
    return transform(chunk);
  },
});

const { readable, writable } = await duplex.getWebStreams();

const writer = writable.getWriter();
const reader = readable.getReader();

// Read and write concurrently to avoid TransformStream backpressure
const readAll = async () => {
  const results = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    results.push(value);
  }
  return results;
};

const readPromise = readAll();
await writer.write('hello');
await writer.close();
const results = await readPromise;

Debug Mode

Pass debug: true and name to get detailed internal logs:

const stream = new SmartDuplex({
  name: 'MyStream',
  debug: true,
  writeFunction: async (chunk, tools) => chunk,
});

🧩 StreamWrapper β€” Pipeline Composition

StreamWrapper takes an array of streams, pipes them together, attaches error listeners on all of them, and returns a Promise that resolves when the pipeline finishes:

import { StreamWrapper } from '@push.rocks/smartstream';
import fs from 'fs';

const pipeline = new StreamWrapper([
  fs.createReadStream('./input.txt'),
  new SmartDuplex({
    writeFunction: async (chunk) => Buffer.from(chunk.toString().toUpperCase()),
  }),
  fs.createWriteStream('./output.txt'),
]);

await pipeline.run();
console.log('Pipeline complete!');

Error handling is automatic β€” if any stream in the array errors, the returned promise rejects:

pipeline.run()
  .then(() => console.log('Done'))
  .catch((err) => console.error('Pipeline failed:', err));

You can listen for custom events across all streams in the pipeline:

pipeline.onCustomEvent('progress', () => {
  console.log('Progress event fired');
});

You can also await streamStarted() to know when the pipeline has been wired up:

const runPromise = pipeline.run();
await pipeline.streamStarted(); // Resolves once pipes are connected
await runPromise;

πŸ“₯ StreamIntake β€” Dynamic Data Injection

StreamIntake is a Readable stream that lets you programmatically push data into a pipeline. It operates in object mode by default and provides a reactive observable (pushNextObservable) for demand-driven data production.

import { StreamIntake, SmartDuplex } from '@push.rocks/smartstream';

const intake = new StreamIntake<string>();

// Pipe through a transform
intake
  .pipe(new SmartDuplex({
    objectMode: true,
    writeFunction: async (chunk) => {
      console.log('Processing:', chunk);
      return chunk;
    },
  }))
  .on('data', (data) => console.log('Output:', data));

// Push data whenever it's ready
intake.pushData('Hello');
intake.pushData('World');
intake.signalEnd(); // Signal end-of-stream

Demand-Driven Production with Observable

pushNextObservable emits whenever the stream is ready for more data β€” perfect for throttled or event-driven producers:

const intake = new StreamIntake<number>();

let counter = 0;
intake.pushNextObservable.subscribe(() => {
  if (counter < 100) {
    intake.pushData(counter++);
  } else {
    intake.signalEnd();
  }
});

intake.pipe(consumer);

Creating from Existing Streams

Wrap a Node.js Readable or a Web ReadableStream:

// From Node.js Readable
const intake = await StreamIntake.fromStream<Buffer>(fs.createReadStream('./data.bin'));

// From Web ReadableStream
const response = await fetch('https://example.com/stream');
const intake = await StreamIntake.fromStream<Uint8Array>(response.body);

⚑ Utility Functions

createTransformFunction

Quickly create a SmartDuplex from a simple async mapping function:

import { createTransformFunction } from '@push.rocks/smartstream';

const doubler = createTransformFunction<number, number>(
  async (n) => n * 2,
  { objectMode: true }
);

intakeStream.pipe(doubler).pipe(outputStream);

createPassThrough

Create an object-mode passthrough stream (useful as an intermediary or tee point):

import { createPassThrough } from '@push.rocks/smartstream';

const passThrough = createPassThrough();
source.pipe(passThrough).pipe(destination);

🌐 WebDuplexStream β€” Pure Web Streams API

WebDuplexStream extends TransformStream and works in both browsers and Node.js. Import it from the /web subpath for zero Node.js dependencies.

import { WebDuplexStream } from '@push.rocks/smartstream/web';

const stream = new WebDuplexStream<number, number>({
  writeFunction: async (chunk, { push }) => {
    push(chunk * 2); // Push transformed data
  },
});

const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();

// Always read concurrently with writes β€” TransformStream applies backpressure
const readPromise = (async () => {
  const results = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    results.push(value);
  }
  return results;
})();

await writer.write(5);
await writer.write(10);
await writer.close();

const results = await readPromise; // [10, 20]

πŸ’‘ Tip: Because TransformStream enforces backpressure between its writable and readable sides, you must start reading before or concurrently with writes. If you await all writes first and then read, the writes will block waiting for the readable side to drain.

From a Uint8Array

const stream = WebDuplexStream.fromUInt8Array(new Uint8Array([1, 2, 3]));
const reader = stream.readable.getReader();
const { value } = await reader.read(); // Uint8Array [1, 2, 3]

Data Production with readFunction

Supply data into the stream from any async source:

const stream = new WebDuplexStream<string, string>({
  readFunction: async (tools) => {
    await tools.write('chunk 1');
    await tools.write('chunk 2');
    tools.done(); // Signal end
  },
  writeFunction: async (chunk, { push }) => {
    push(chunk.toUpperCase());
  },
});

const reader = stream.readable.getReader();
// reads "CHUNK 1", "CHUNK 2"

Final Function

Emit trailing data when the writable side is closed:

const stream = new WebDuplexStream<string, string>({
  writeFunction: async (chunk) => chunk,
  finalFunction: async (tools) => {
    tools.push('footer');
  },
});

πŸ”€ Node ↔ Web Stream Converters

The nodewebhelpers namespace provides bidirectional converters between Node.js and Web Streams with proper backpressure handling:

import { nodewebhelpers } from '@push.rocks/smartstream';
FunctionFromTo
createWebReadableStreamFromFile(path)File pathWeb ReadableStream<Uint8Array>
convertWebReadableToNodeReadable(webStream)Web ReadableStreamNode.js Readable
convertNodeReadableToWebReadable(nodeStream)Node.js ReadableWeb ReadableStream
convertWebWritableToNodeWritable(webWritable)Web WritableStreamNode.js Writable
convertNodeWritableToWebWritable(nodeWritable)Node.js WritableWeb WritableStream

Example: Serve a File as a Web ReadableStream

const webStream = nodewebhelpers.createWebReadableStreamFromFile('./video.mp4');

// Use with fetch Response, service workers, etc.
return new Response(webStream, {
  headers: { 'Content-Type': 'video/mp4' },
});

Example: Convert Between Stream Types

import fs from 'fs';
import { nodewebhelpers } from '@push.rocks/smartstream';

// Node β†’ Web
const nodeReadable = fs.createReadStream('./data.bin');
const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable);

// Web β†’ Node
const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadable);
nodeReadable2.pipe(fs.createWriteStream('./copy.bin'));

Example: Round-Trip Conversion

// Node β†’ Web β†’ Node (lossless round-trip)
const original = fs.createReadStream('./photo.jpg');
const webStream = nodewebhelpers.convertNodeReadableToWebReadable(original);
const backToNode = nodewebhelpers.convertWebReadableToNodeReadable(webStream);
backToNode.pipe(fs.createWriteStream('./photo-copy.jpg'));

πŸ—οΈ Backpressure Handling

SmartDuplex uses a BackpressuredArray internally, bounded by highWaterMark (default: 1). When the downstream consumer is slow, the stream automatically pauses the upstream producer until space is available β€” no manual bookkeeping required.

const slow = new SmartDuplex({
  name: 'SlowConsumer',
  objectMode: true,
  highWaterMark: 1,
  writeFunction: async (chunk, tools) => {
    await new Promise((resolve) => setTimeout(resolve, 200));
    return chunk;
  },
});

const fast = new SmartDuplex({
  name: 'FastProducer',
  objectMode: true,
  writeFunction: async (chunk, tools) => {
    return chunk; // Instant processing
  },
});

// Backpressure is handled automatically between fast β†’ slow
fast.pipe(slow).on('data', (d) => console.log(d));

for (let i = 0; i < 100; i++) {
  fast.write(`chunk-${i}`);
}
fast.end();

🎯 Real-World Example: Log Processing Pipeline

import fs from 'fs';
import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream';

// Read β†’ Parse β†’ Filter β†’ Write
const pipeline = new StreamWrapper([
  fs.createReadStream('./access.log'),
  new SmartDuplex({
    writeFunction: async (chunk) => {
      return chunk.toString().split('\n');
    },
  }),
  new SmartDuplex({
    objectMode: true,
    writeFunction: async (lines: string[], tools) => {
      for (const line of lines) {
        if (line.includes('ERROR')) {
          await tools.push(line + '\n');
        }
      }
    },
  }),
  fs.createWriteStream('./errors.log'),
]);

await pipeline.run();
console.log('Error extraction complete');

πŸ“‹ API Reference

SmartDuplex

MemberTypeDescription
new SmartDuplex(options?)ConstructorCreate a new duplex stream
options.writeFunction(chunk, tools) => Promise<T>Transform each chunk; return to push, or use tools.push()
options.finalFunction(tools) => Promise<T>Emit final data when writable ends
options.readFunction() => Promise<void>Supply data to the readable side
options.debugbooleanEnable internal logging
options.namestringStream name for debug logs
SmartDuplex.fromBuffer(buf)StaticCreate a readable stream from a Buffer
SmartDuplex.fromWebReadableStream(rs)StaticBridge a Web ReadableStream to Node.js Duplex
duplex.getWebStreams()MethodGet { readable, writable } Web Streams pair

WebDuplexStream

MemberTypeDescription
new WebDuplexStream(options)ConstructorCreate a new web transform stream
options.writeFunction(chunk, tools) => Promise<T>Transform each chunk; use tools.push() or return
options.finalFunction(tools) => Promise<T>Emit data on flush
options.readFunction(tools) => Promise<void>Supply data via tools.write(), signal tools.done()
WebDuplexStream.fromUInt8Array(arr)StaticCreate a stream from a Uint8Array

StreamWrapper

MemberTypeDescription
new StreamWrapper(streams[])ConstructorCreate a pipeline from an array of streams
wrapper.run()MethodPipe all streams and return a Promise
wrapper.streamStarted()MethodPromise that resolves when pipes are connected
wrapper.onCustomEvent(name, fn)MethodListen for custom events across all streams

StreamIntake

MemberTypeDescription
new StreamIntake<T>()ConstructorCreate a new intake stream (object mode)
intake.pushData(data)MethodPush data into the stream
intake.signalEnd()MethodSignal end of stream
intake.pushNextObservablePropertyObservable that emits when the stream wants more data
StreamIntake.fromStream(stream)StaticWrap a Node.js Readable or Web ReadableStream

nodewebhelpers

FunctionDescription
createWebReadableStreamFromFile(path)File β†’ Web ReadableStream (pull-based backpressure)
convertWebReadableToNodeReadable(rs)Web ReadableStream β†’ Node.js Readable
convertNodeReadableToWebReadable(ns)Node.js Readable β†’ Web ReadableStream (pull-based backpressure)
convertWebWritableToNodeWritable(ws)Web WritableStream β†’ Node.js Writable
convertNodeWritableToWebWritable(nw)Node.js Writable β†’ Web WritableStream

Utility Functions

FunctionDescription
createTransformFunction(fn, opts?)Create a SmartDuplex from an async mapping function
createPassThrough()Create an object-mode passthrough stream

This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the LICENSE file.

Please note: The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.

Trademarks

This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.

Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.

Company Information

Task Venture Capital GmbH
Registered at District Court Bremen HRB 35230 HB, Germany

For any legal inquiries or further information, please contact us via email at hello@task.vc.

By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.

Keywords

stream

FAQs

Package last updated on 30 Apr 2026

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts