Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

streamops

Package Overview
Dependencies
Maintainers
0
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

streamops - npm Package Compare versions

Comparing version 0.1.4 to 0.1.5

tests/streaming.downstreamTimeouts.test.js

66

index.js

@@ -30,5 +30,7 @@ const EventEmitter = require('events');

const defaultOptions = {
timeout: 30000,
timeout: 100000,
bufferSize: 1000,
logLevel: 'info'
logLevel: 'info',
yieldTimeout: 20000,
downstreamTimeout: 30000
};

@@ -50,3 +52,13 @@

let stepIndex = 0;
let lastDownstreamYield = Date.now();
let downstreamTimeoutWarningIssued = false;
const checkDownstreamTimeout = setInterval(() => {
const timeSinceLastYield = Date.now() - lastDownstreamYield;
if (timeSinceLastYield > config.downstreamTimeout && !downstreamTimeoutWarningIssued) {
logger.warn(`No data received downstream for ${config.downstreamTimeout}ms`);
downstreamTimeoutWarningIssued = true;
}
}, Math.min(config.downstreamTimeout, 1000));
function validateStep(step) {

@@ -58,17 +70,35 @@ return true;

try {
let lastYieldTime = Date.now();
let hasYielded = false;
const checkYieldTimeout = setInterval(() => {
if (Date.now() - lastYieldTime > config.yieldTimeout) {
logger.warn(`Step ${stepIndex} has not yielded for ${config.yieldTimeout}ms`);
}
}, config.yieldTimeout);
const onYield = () => {
hasYielded = true;
lastYieldTime = Date.now();
};
let result;
if (Array.isArray(step)) {
return await processParallel(step, input);
result = await processParallel(step, input);
} else if (isGenerator(step)) {
return await processGenerator(step, input);
result = await processGenerator(step, input, onYield);
} else if (typeof step === 'function') {
return await processFunction(step, input);
result = await processFunction(step, input);
} else if (isComplexIterable(step)) {
return await processGenerator(async function*() {
result = await processGenerator(async function*() {
yield* await (step[Symbol.iterator] || step[Symbol.asyncIterator])
? step
: [step]
}, input);
}, input, onYield);
} else {
return step;
result = step;
}
clearInterval(checkYieldTimeout);
return result;
} catch (error) {

@@ -104,3 +134,3 @@ throw new StreamOpsError('Error processing step', stepIndex, error);

async function processGenerator(gen, input) {
async function processGenerator(gen, input, onYield) {
let results = [];

@@ -111,2 +141,3 @@ for await (const item of input) {

results.push(result);
if (onYield) onYield();
}

@@ -147,3 +178,3 @@ }

if (error instanceof StreamOpsError) {
throw error; // Rethrow StreamOpsErrors (including timeout errors) directly
throw error;
}

@@ -163,3 +194,7 @@ throw new StreamOpsError(`Error in step ${stepIndex}`, stepIndex, error);

logger.debug(`Buffer size exceeded. Current size: ${state.length}`);
yield* state.splice(0, state.length - config.bufferSize);
for (const item of state.splice(0, state.length - config.bufferSize)) {
yield item;
lastDownstreamYield = Date.now();
downstreamTimeoutWarningIssued = false;
}
}

@@ -170,5 +205,9 @@ }

yield state;
lastDownstreamYield = Date.now();
downstreamTimeoutWarningIssued = false;
} else if (typeof state[Symbol.asyncIterator] === 'function') {
for await (const item of state) {
yield item;
lastDownstreamYield = Date.now();
downstreamTimeoutWarningIssued = false;
}

@@ -178,5 +217,9 @@ } else if (typeof state[Symbol.iterator] === 'function') {

yield item;
lastDownstreamYield = Date.now();
downstreamTimeoutWarningIssued = false;
}
} else {
yield state;
lastDownstreamYield = Date.now();
downstreamTimeoutWarningIssued = false;
}

@@ -189,2 +232,3 @@

} finally {
clearInterval(checkDownstreamTimeout);
logger.info('Streaming pipeline completed');

@@ -191,0 +235,0 @@ emitter.emit('end');

2

package.json
{
"name": "streamops",
"version": "0.1.4",
"version": "0.1.5",
"main": "./index.js",

@@ -5,0 +5,0 @@ "scripts": {

@@ -35,6 +35,4 @@ const streaming = require('../index.js')({

function* repeatSum(sum) {
// for (const sum of sums) {
yield `The sum is: ${sum}`;
yield `Double the sum is: ${sum * 2}`;
// }
yield `The sum is: ${sum}`;
yield `Double the sum is: ${sum * 2}`;
}

@@ -41,0 +39,0 @@ ];

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc