Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@push.rocks/smartstream

Package Overview
Dependencies
Maintainers
0
Versions
55
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@push.rocks/smartstream

A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.

  • 3.2.5
  • latest
  • npm
  • Socket score

Version published
Weekly downloads
354
increased by67.77%
Maintainers
0
Weekly downloads
 
Created
Source
# @push.rocks/smartstream
A TypeScript library to simplify the creation and manipulation of Node.js streams, providing utilities for transform, duplex, and readable/writable stream handling while managing backpressure effectively.

## Install
To install `@push.rocks/smartstream`, you can use npm or yarn as follows:

```bash
npm install @push.rocks/smartstream --save
# OR
yarn add @push.rocks/smartstream

This will add @push.rocks/smartstream to your project's dependencies.

Usage

The @push.rocks/smartstream module is designed to simplify working with Node.js streams by providing a set of utilities for creating and manipulating streams. This module makes extensive use of TypeScript for improved code quality, readability, and maintenance. ESM syntax is utilized throughout the examples.

Importing the Module

Start by importing the module into your TypeScript file:

import * as smartstream from '@push.rocks/smartstream';

For a more specific import, you may do the following:

import { SmartDuplex, StreamWrapper, StreamIntake, createTransformFunction, createPassThrough } from '@push.rocks/smartstream';

Creating Basic Transform Streams

The module provides utilities for creating transform streams. For example, to create a transform stream that modifies chunks of data, you can use the createTransformFunction utility:

import { createTransformFunction } from '@push.rocks/smartstream';

const upperCaseTransform = createTransformFunction<string, string>(async (chunk) => {
  return chunk.toUpperCase();
});

// Usage with pipe
readableStream
  .pipe(upperCaseTransform)
  .pipe(writableStream);

Handling Backpressure with SmartDuplex

SmartDuplex is a powerful part of the smartstream module designed to handle backpressure effectively. Here's an example of how to create a SmartDuplex stream that processes data and respects the consumer's pace:

import { SmartDuplex } from '@push.rocks/smartstream';

const processDataDuplex = new SmartDuplex({
  async writeFunction(chunk, { push }) {
    const processedChunk = await processChunk(chunk); // Assume this is a defined asynchronous function
    push(processedChunk);
  }
});

sourceStream.pipe(processDataDuplex).pipe(destinationStream);

Combining Multiple Streams

Smartstream facilitates easy combining of multiple streams into a single pipeline, handling errors and cleanup automatically. Here's how you can combine multiple streams:

import { StreamWrapper } from '@push.rocks/smartstream';

const combinedStream = new StreamWrapper([
  readStream,       // Source stream
  transformStream1, // Transformation
  transformStream2, // Another transformation
  writeStream       // Destination stream
]);

combinedStream.run()
  .then(() => console.log('Processing completed.'))
  .catch(err => console.error('An error occurred:', err));

Working with StreamIntake

StreamIntake allows for more dynamic control of the reading process, facilitating scenarios where data is not continuously available:

import { StreamIntake } from '@push.rocks/smartstream';

const streamIntake = new StreamIntake<string>();

// Dynamically push data into the intake
streamIntake.pushData('Hello, World!');
streamIntake.pushData('Another message');

// Signal end when no more data is to be pushed
streamIntake.signalEnd();

Real-world Scenario: Processing Large Files

Consider a scenario where you need to process a large CSV file, transform the data row-by-row, and then write the results to a database or another file. With smartstream, you could create a pipe that reads the CSV, processes each row, and handles backpressure, ensuring efficient use of resources.

import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream';
import fs from 'fs';
import csvParser from 'csv-parser';

const csvReadTransform = createTransformFunction<any, any>(async (row) => {
  // Process row
  return processedRow;
});

fs.createReadStream('path/to/largeFile.csv')
  .pipe(csvParser())
  .pipe(csvReadTransform)
  .pipe(new SmartDuplex({
    async writeFunction(chunk, { push }) {
      await writeToDatabase(chunk); // Assume this writes to a database
    }
  }))
  .on('finish', () => console.log('File processed successfully.'));

This example demonstrates reading a large CSV file, transforming each row with createTransformFunction, and using a SmartDuplex to manage the processed data flow efficiently, ensuring no data is lost due to backpressure issues.

Advanced Use Case: Backpressure Handling

Effective backpressure handling is crucial when working with streams to avoid overwhelming the downstream consumers. Here’s a comprehensive example that demonstrates handling backpressure in a pipeline with multiple SmartDuplex instances:

import { SmartDuplex } from '@push.rocks/smartstream';

// Define the first SmartDuplex, which writes data slowly to simulate backpressure
const slowProcessingStream = new SmartDuplex({
  name: 'SlowProcessor',
  objectMode: true,
  writeFunction: async (chunk, { push }) => {
    await new Promise(resolve => setTimeout(resolve, 100)); // Simulated delay
    console.log('Processed chunk:', chunk);
    push(chunk);
  }
});

// Define the second SmartDuplex as a fast processor
const fastProcessingStream = new SmartDuplex({
  name: 'FastProcessor',
  objectMode: true,
  writeFunction: async (chunk, { push }) => {
    console.log('Fast processing chunk:', chunk);
    push(chunk);
  }
});

// Create a StreamIntake to dynamically handle incoming data
const streamIntake = new StreamIntake<string>();

// Chain the streams together and handle the backpressure scenario
streamIntake
  .pipe(fastProcessingStream)
  .pipe(slowProcessingStream)
  .pipe(createPassThrough()) // Use Pass-Through to provide intermediary handling
  .on('data', data => console.log('Final output:', data))
  .on('error', error => console.error('Stream encountered an error:', error));

// Simulate data pushing with intervals to observe backpressure handling
let counter = 0;
const interval = setInterval(() => {
  if (counter >= 10) {
    streamIntake.signalEnd();
    clearInterval(interval);
  } else {
    streamIntake.pushData(`Chunk ${counter}`);
    counter++;
  }
}, 50);

In this advanced use case, a SlowProcessor and FastProcessor are created using SmartDuplex, simulating a situation where one stream is slower than another. The StreamIntake dynamically handles incoming chunks of data and the intermediary Pass-Through handles any potential interruptions.

Transform Streams in Parallel

For scenarios where you need to process data in parallel:

import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream';

const parallelTransform = createTransformFunction<any, any>(async (chunk) => {
  // Parallel Processing
  const results = await Promise.all(chunk.map(async item => await processItem(item)));
  return results;
});

const streamIntake = new StreamIntake<any[]>();

streamIntake
  .pipe(parallelTransform)
  .pipe(new SmartDuplex({
    async writeFunction(chunk, { push }) {
      console.log('Processed parallel chunk:', chunk);
      push(chunk);
    }
  }))
  .on('finish', () => console.log('Parallel processing completed.'));

// Simulate data pushing
streamIntake.pushData([1, 2, 3, 4]);
streamIntake.pushData([5, 6, 7, 8]);
streamIntake.signalEnd();

Error Handling in Stream Pipelines

Error handling is an essential part of working with streams. The StreamWrapper assists in combining multiple streams while managing errors seamlessly:

import { StreamWrapper } from '@push.rocks/smartstream';

const faultyStream = new SmartDuplex({
  async writeFunction(chunk, { push }) {
    if (chunk === 'bad data') {
      throw new Error('Faulty data encountered');
    }
    push(chunk);
  }
});

const readStream = new StreamIntake<string>();
const writeStream = new SmartDuplex({
  async writeFunction(chunk) {
    console.log('Written chunk:', chunk);
  }
});

const combinedStream = new StreamWrapper([readStream, faultyStream, writeStream]);

combinedStream.run()
  .then(() => console.log('Stream processing completed.'))
  .catch(err => console.error('Stream error:', err.message));

// Push Data
readStream.pushData('good data');
readStream.pushData('bad data');  // This will throw an error
readStream.pushData('more good data');
readStream.signalEnd();

Testing Streams

Here's an example test case using the tap testing framework to verify the integrity of the SmartDuplex from a buffer:

import { expect, tap } from '@push.rocks/tapbundle';
import { SmartDuplex } from '@push.rocks/smartstream';

tap.test('should create a SmartStream from a Buffer', async () => {
  const bufferData = Buffer.from('This is a test buffer');
  const smartStream = SmartDuplex.fromBuffer(bufferData, {});
  
  let receivedData = Buffer.alloc(0);
  
  return new Promise<void>((resolve) => {
    smartStream.on('data', (chunk: Buffer) => {
      receivedData = Buffer.concat([receivedData, chunk]);
    });

    smartStream.on('end', () => {
      expect(receivedData.toString()).toEqual(bufferData.toString());
      resolve();
    });
  });
});

tap.start();

Working with Files and Buffers

You can easily stream files and buffers with smartstream. Here’s a test illustrating reading and writing with file streams using smartfile combined with smartstream utilities:

import { tap } from '@push.rocks/tapbundle';
import * as smartfile from '@push.rocks/smartfile';
import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream';

tap.test('should handle file read and write streams', async () => {
  const readStream = smartfile.fsStream.createReadStream('./test/assets/readabletext.txt');
  const writeStream = smartfile.fsStream.createWriteStream('./test/assets/writabletext.txt');

  const transformStream = new SmartDuplex({
    async writeFunction(chunk, { push }) {
      const transformedChunk = chunk.toString().toUpperCase();
      push(transformedChunk);
    }
  });

  const streamWrapper = new StreamWrapper([readStream, transformStream, writeStream]);

  await streamWrapper.run();

  const outputContent = await smartfile.fs.promises.readFile('./test/assets/writabletext.txt', 'utf-8');
  console.log('Output Content:', outputContent);
});

tap.start();

Modular and Scoped Transformations

Creating modular and scoped transformations is straightforward with SmartDuplex:

import { SmartDuplex } from '@push.rocks/smartstream';

type DataChunk = {
  id: number;
  data: string;
};

const transformationStream1 = new SmartDuplex<DataChunk, DataChunk>({
  async writeFunction(chunk, { push }) {
    chunk.data = chunk.data.toUpperCase();
    push(chunk);
  }
})

const transformationStream2 = new SmartDuplex<DataChunk, DataChunk>({
  async writeFunction(chunk, { push }) {
    chunk.data = `${chunk.data} processed with transformation 2`;
    push(chunk);
  }
});

const initialData: DataChunk[] = [
  { id: 1, data: 'first' },
  { id: 2, data: 'second' }
];

const intakeStream = new StreamIntake<DataChunk>();

intakeStream
  .pipe(transformationStream1)
  .pipe(transformationStream2)
  .on('data', data => console.log('Transformed Data:', data));

initialData.forEach(item => intakeStream.pushData(item));
intakeStream.signalEnd();

By leveraging SmartDuplex, StreamWrapper, and StreamIntake, you can streamline and enhance your data transformation pipelines in Node.js with a clear, efficient, and backpressure-friendly approach.



## License and Legal Information

This repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository. 

**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.

### Trademarks

This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.

### Company Information

Task Venture Capital GmbH  
Registered at District court Bremen HRB 35230 HB, Germany

For any legal inquiries or if you require further information, please contact us via email at hello@task.vc.

By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.

Keywords

FAQs

Package last updated on 19 Nov 2024

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc