Research
Security News
Quasar RAT Disguised as an npm Package for Detecting Vulnerabilities in Ethereum Smart Contracts
Socket researchers uncover a malicious npm package posing as a tool for detecting vulnerabilities in Etherium smart contracts.
A lightweight streaming operations library for JS that provides a flexible pipeline-based approach to data processing. StreamOps leverages generators and async generators to create efficient data processing pipelines with built-in support for parallel pro
A lightweight streaming operations library for JS that provides a flexible pipeline-based approach to data processing. StreamOps leverages generators and async generators to create efficient data processing pipelines with built-in support for parallel processing, error handling, and state management.
npm install streamops
const createStreamOps = require('streamops'); const stream = createStreamOps(); // Array-style pipeline const pipeline = [ function* () { yield 1; yield 2; yield 3; }, stream.map(x => x * 2), stream.filter(x => x > 4) ]; // Process the stream for await (const item of stream(pipeline)) { console.log(item); // Outputs: 4, 6 }
const result = stream(function* () { yield 1; yield 2; yield 3; }) .map(x => x * 2) .filter(x => x > 4); for await (const item of result) { console.log(item); // Outputs: 4, 6 }
const pipeline = [ // Fetch and yield data async function* () { const response = await fetch('https://api.example.com/users'); const users = await response.json(); for (const user of users) { yield user; } }, // Transform data stream.map(user => ({ id: user.id, name: user.name, isActive: user.status === 'active' })), // Filter active users stream.filter(user => user.isActive), // Process in batches stream.batch(10) ]; for await (const userBatch of stream(pipeline)) { await processUserBatch(userBatch); }
const stream = createStreamOps({ timeout: 30000, // Overall pipeline timeout logLevel: 'info', // 'error' | 'warn' | 'info' | 'debug' yieldTimeout: 20000, // Max time between yields downstreamTimeout: 30000 // Max time without downstream consumption });
yieldTimeoutBehavior
: Controls timeout handling
'warn'
: Log warning and continue (default)'yield-null'
: Yield null value and continue'cancel'
: Cancel pipeline'block'
: Stop yielding from timed-out stepconst pipeline = [ riskyOperation, stream.catchError(error => { console.error('Operation failed:', error); // Handle error appropriately }), nextStep ];
const pipeline = [ longRunningOperation, stream.timeout(5000), // Fails if step takes > 5s stream.catchError(error => { if (error.name === 'TimeoutError') { // Handle timeout } }) ];
const { END_SIGNAL } = require('streamops'); const pipeline = [ sourceStream, stream.withEndSignal(function* (input) { if (input === END_SIGNAL) { yield* cleanup(); return; } yield processInput(input); }) ];
The accrue
operator collects all items before continuing:
const pipeline = [ source, stream.accrue(), // Collect all items stream.map(items => processItems(items)) ];
const pipeline = [ source, [ // Parallel branches [ // Nested parallel stream.map(x => x * 2), stream.map(x => x + 1) ], stream.filter(x => x > 10) ] ];
Results from parallel branches are merged in order.
Maintain state via 'this' context:
const pipeline = [ source, function* (input) { this.count = (this.count || 0) + 1; yield `${this.count}: ${input}`; } ];
map(fn)
: Transform each item using the provided function
stream.map(x => x * 2)
filter(predicate)
: Only allow items that match the predicate
stream.filter(x => x > 5)
reduce(reducer, initialValue)
: Accumulate values, yielding intermediate results
stream.reduce((sum, x) => sum + x, 0)
flatMap(fn)
: Map each item to multiple items
stream.flatMap(x => [x, x * 2])
take(n)
: Limit stream to first n items
stream.take(5) // Only first 5 items
skip(n)
: Skip first n items
stream.skip(2) // Skip first 2 items
batch(size, options)
: Group items into arrays of specified size
stream.batch(3, { yieldIncomplete: true })
Options:
yieldIncomplete
: Whether to yield incomplete batches (default: true)distinct(equalityFn)
: Remove duplicates using optional equality function
stream.distinct((a, b) => a.id === b.id)
mergeAggregate(options)
: Merge objects into arrays by key
stream.mergeAggregate({ removeDuplicates: true, alwaysArray: true })
waitUntil(condition)
: Buffer items until condition is met
// Wait for specific fields stream.waitUntil(['price', 'volume']) // Or custom condition stream.waitUntil(buffer => buffer.length >= 3)
bufferBetween(startToken, endToken, mapFn)
: Capture content between tokens
stream.bufferBetween('', '', content => parse(content))
catchError(handler)
: Handle errors in the pipeline
stream.catchError(err => console.error(err))
timeout(ms)
: Fail if processing takes too long
stream.timeout(5000) // 5 second timeout
tap(fn)
: Execute side effects without modifying stream
stream.tap(x => console.log('Saw:', x))
accrue()
: Collect all items before proceeding
stream.accrue()
dam()
: Alias for accrue()
withEndSignal(fn)
: Mark a function/generator to receive end signals
stream.withEndSignal(function* (input) { if (input === END_SIGNAL) { // Handle end of stream yield* cleanup(); return; } yield process(input); })
StreamOps also provides a simplified interface for creating pipelines:
const { simple } = require('streamops'); // Create pipeline with injected operators const stream = simple( ({map, filter}) => [ [1, 2, 3, 4], map(x => x * 2), filter(x => x > 5) ] ); for await (const item of stream) { console.log(item); // Outputs: 6, 8 }
The simple interface automatically injects operators and handles pipeline creation.
Set logLevel in configuration:
const stream = createStreamOps({ logLevel: 'debug' // See all pipeline operations });
Use tap operator for debugging:
stream.tap(x => console.log('Value:', x))
Memory Leaks
Timeouts
Backpressure
MIT
FAQs
A lightweight streaming operations library for JS that provides a flexible pipeline-based approach to data processing. StreamOps leverages generators and async generators to create efficient data processing pipelines with built-in support for parallel pro
The npm package streamops receives a total of 9 weekly downloads. As such, streamops popularity was classified as not popular.
We found that streamops demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 0 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
Socket researchers uncover a malicious npm package posing as a tool for detecting vulnerabilities in Etherium smart contracts.
Security News
Research
A supply chain attack on Rspack's npm packages injected cryptomining malware, potentially impacting thousands of developers.
Research
Security News
Socket researchers discovered a malware campaign on npm delivering the Skuld infostealer via typosquatted packages, exposing sensitive data.