StreamOps

A lightweight streaming operations library for JS that provides a flexible pipeline-based approach to data processing. StreamOps leverages generators and async generators to create efficient data processing pipelines with built-in support for parallel processing, error handling, and state management.

Installation

npm install streamops

Key Features

  • Pipeline-based streaming operations: Build complex data processing pipelines with ease
  • Async/sync generator support: Seamlessly mix sync and async operations
  • Parallel processing: Process data concurrently with parallel branches
  • State management: Share state across pipeline steps
  • Configurable error handling: Robust error handling with timeouts
  • Rich operator set: Comprehensive set of built-in operators

Getting Started

Basic Pipeline

const createStreamOps = require('streamops');
const stream = createStreamOps();

// Array-style pipeline
const pipeline = [
  function* () {
    yield 1;
    yield 2;
    yield 3;
  },
  stream.map(x => x * 2),
  stream.filter(x => x > 4)
];

// Process the stream
for await (const item of stream(pipeline)) {
  console.log(item); // Outputs: 6
}

Chaining Style

const result = stream(function* () {
  yield 1;
  yield 2;
  yield 3;
})
  .map(x => x * 2)
  .filter(x => x > 4);

for await (const item of result) {
  console.log(item); // Outputs: 6
}

Real-World Example: Processing API Data

const pipeline = [
  // Fetch and yield data
  async function* () {
    const response = await fetch('https://api.example.com/users');
    const users = await response.json();
    for (const user of users) {
      yield user;
    }
  },
  // Transform data
  stream.map(user => ({
    id: user.id,
    name: user.name,
    isActive: user.status === 'active'
  })),
  // Filter active users
  stream.filter(user => user.isActive),
  // Process in batches
  stream.batch(10)
];

for await (const userBatch of stream(pipeline)) {
  await processUserBatch(userBatch);
}

Configuration

const stream = createStreamOps({
  timeout: 30000,        // Overall pipeline timeout
  logLevel: 'info',      // 'error' | 'warn' | 'info' | 'debug'
  yieldTimeout: 20000,   // Max time between yields
  downstreamTimeout: 30000  // Max time without downstream consumption
});

Timeout Behaviors

  • yieldTimeoutBehavior: Controls how the pipeline responds when a step exceeds yieldTimeout
    • 'warn': Log warning and continue (default)
    • 'yield-null': Yield null value and continue
    • 'cancel': Cancel pipeline
    • 'block': Stop yielding from timed-out step
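
A minimal sketch of selecting a behavior, assuming yieldTimeoutBehavior is passed to createStreamOps alongside the timeout options above (slowSource is a hypothetical step that may stall between yields):

const stream = createStreamOps({
  yieldTimeout: 20000,
  yieldTimeoutBehavior: 'yield-null'  // yield null and keep the pipeline running
});

const pipeline = [
  slowSource,
  stream.map(x => x ?? 'timed out')   // downstream sees null after a timed-out yield
];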

Error Handling

Using catchError Operator

const pipeline = [
  riskyOperation,
  stream.catchError(error => {
    console.error('Operation failed:', error);
    // Handle error appropriately
  }),
  nextStep
];

Timeout Protection

const pipeline = [
  longRunningOperation,
  stream.timeout(5000),  // Fails if step takes > 5s
  stream.catchError(error => {
    if (error.name === 'TimeoutError') {
      // Handle timeout
    }
  })
];

Stream Control

End of Stream Handling

const { END_SIGNAL } = require('streamops');

const pipeline = [
  sourceStream,
  stream.withEndSignal(function* (input) {
    if (input === END_SIGNAL) {
      yield* cleanup();
      return;
    }
    yield processInput(input);
  })
];

Flow Control with Accrue

The accrue operator collects all items before continuing:

const pipeline = [
  source,
  stream.accrue(),  // Collect all items
  stream.map(items => processItems(items))
];

Advanced Features

Parallel Processing

const pipeline = [
  source,
  [  // Parallel branches
    [  // Nested parallel
      stream.map(x => x * 2),
      stream.map(x => x + 1)
    ],
    stream.filter(x => x > 10)
  ]
];

Results from parallel branches are merged in order.
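
A small sketch of that merging, using tap to label which branch produced each item (the source values and branch labels are illustrative):

const pipeline = [
  function* () {
    yield 5;
    yield 10;
  },
  [  // two parallel branches over the same source
    stream.map(x => ({ branch: 'double', value: x * 2 })),
    stream.map(x => ({ branch: 'increment', value: x + 1 }))
  ],
  stream.tap(item => console.log(item.branch, item.value))
];

for await (const item of stream(pipeline)) {
  // items from both branches arrive on a single merged stream
}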

State Management

Maintain state via 'this' context:

const pipeline = [
  source,
  function* (input) {
    this.count = (this.count || 0) + 1;
    yield `${this.count}: ${input}`;
  }
];
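
A sketch extending this across multiple steps, assuming the same 'this' context is shared by every step as described under Key Features:

const pipeline = [
  function* () {
    yield 10;
    yield 20;
    yield 30;
  },
  function* (input) {
    this.total = (this.total || 0) + input;  // accumulate in shared state
    yield input;
  },
  function* (input) {
    yield `${input} (running total: ${this.total})`;  // read the same state downstream
  }
];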

API Documentation

Built-in Operators

Basic Operators
  • map(fn): Transform each item using the provided function

    stream.map(x => x * 2)
    
  • filter(predicate): Only allow items that match the predicate

    stream.filter(x => x > 5)
    
  • reduce(reducer, initialValue): Accumulate values, yielding intermediate results

    stream.reduce((sum, x) => sum + x, 0)
    
  • flatMap(fn): Map each item to multiple items

    stream.flatMap(x => [x, x * 2])
    
Control Operators
  • take(n): Limit stream to first n items

    stream.take(5)  // Only first 5 items
    
  • skip(n): Skip first n items

    stream.skip(2)  // Skip first 2 items
    
  • batch(size, options): Group items into arrays of specified size

    stream.batch(3, { yieldIncomplete: true })
    

    Options:

    • yieldIncomplete: Whether to yield incomplete batches (default: true)
  • distinct(equalityFn): Remove duplicates using optional equality function

    stream.distinct((a, b) => a.id === b.id)
    
Advanced Operators
  • mergeAggregate(options): Merge objects into arrays by key

    stream.mergeAggregate({ 
      removeDuplicates: true,
      alwaysArray: true 
    })
    
  • waitUntil(condition): Buffer items until condition is met

    // Wait for specific fields
    stream.waitUntil(['price', 'volume'])
    // Or custom condition
    stream.waitUntil(buffer => buffer.length >= 3)
    
  • bufferBetween(startToken, endToken, mapFn): Capture content between tokens

    stream.bufferBetween('<start>', '<end>', content => parse(content))  // start/end tokens shown are illustrative
    
Error Handling
  • catchError(handler): Handle errors in the pipeline

    stream.catchError(err => console.error(err))
    
  • timeout(ms): Fail if processing takes too long

    stream.timeout(5000)  // 5 second timeout
    
Utility Operators
  • tap(fn): Execute side effects without modifying stream

    stream.tap(x => console.log('Saw:', x))
    
  • accrue(): Collect all items before proceeding

    stream.accrue()
    
  • dam(): Alias for accrue()

Stream Control Operators
  • withEndSignal(fn): Mark a function/generator to receive end signals
    stream.withEndSignal(function* (input) {
      if (input === END_SIGNAL) {
        // Handle end of stream
        yield* cleanup();
        return;
      }
      yield process(input);
    })
    

Simple Interface

StreamOps also provides a simplified interface for creating pipelines:

const { simple } = require('streamops');

// Create pipeline with injected operators
const stream = simple(
  ({map, filter}) => [
    [1, 2, 3, 4],
    map(x => x * 2),
    filter(x => x > 5)
  ]
);

for await (const item of stream) {
  console.log(item);  // Outputs: 6, 8
}

The simple interface automatically injects operators and handles pipeline creation.

Debugging

Logging

Set logLevel in configuration:

const stream = createStreamOps({
  logLevel: 'debug'  // See all pipeline operations
});

Use tap operator for debugging:

stream.tap(x => console.log('Value:', x))

Common Issues

  1. Memory Leaks

    • Use batch operator for large streams
    • Consider accrue carefully
  2. Timeouts

    • Adjust timeout configuration
    • Use appropriate yieldTimeoutBehavior
  3. Backpressure

    • Monitor downstreamTimeout warnings
    • Use batch operator to control flow
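
A minimal sketch combining these mitigations, using the configuration options and operators shown earlier (largeSource and saveBatch are hypothetical placeholders):

const stream = createStreamOps({
  yieldTimeout: 20000,
  downstreamTimeout: 30000,
  logLevel: 'warn'
});

const pipeline = [
  largeSource,                 // e.g. a generator over a large dataset
  stream.batch(100),           // group items so downstream work happens in chunks
  stream.timeout(10000),       // fail if processing a batch takes too long
  stream.catchError(err => console.error('Batch failed:', err))
];

for await (const batch of stream(pipeline)) {
  await saveBatch(batch);
}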

License

MIT
