Comparing version 0.1.4 to 0.1.5
66
index.js
@@ -30,5 +30,7 @@ const EventEmitter = require('events'); | ||
const defaultOptions = { | ||
timeout: 30000, | ||
timeout: 100000, | ||
bufferSize: 1000, | ||
logLevel: 'info' | ||
logLevel: 'info', | ||
yieldTimeout: 20000, | ||
downstreamTimeout: 30000 | ||
}; | ||
@@ -50,3 +52,13 @@ | ||
let stepIndex = 0; | ||
let lastDownstreamYield = Date.now(); | ||
let downstreamTimeoutWarningIssued = false; | ||
const checkDownstreamTimeout = setInterval(() => { | ||
const timeSinceLastYield = Date.now() - lastDownstreamYield; | ||
if (timeSinceLastYield > config.downstreamTimeout && !downstreamTimeoutWarningIssued) { | ||
logger.warn(`No data received downstream for ${config.downstreamTimeout}ms`); | ||
downstreamTimeoutWarningIssued = true; | ||
} | ||
}, Math.min(config.downstreamTimeout, 1000)); | ||
function validateStep(step) { | ||
@@ -58,17 +70,35 @@ return true; | ||
try { | ||
let lastYieldTime = Date.now(); | ||
let hasYielded = false; | ||
const checkYieldTimeout = setInterval(() => { | ||
if (Date.now() - lastYieldTime > config.yieldTimeout) { | ||
logger.warn(`Step ${stepIndex} has not yielded for ${config.yieldTimeout}ms`); | ||
} | ||
}, config.yieldTimeout); | ||
const onYield = () => { | ||
hasYielded = true; | ||
lastYieldTime = Date.now(); | ||
}; | ||
let result; | ||
if (Array.isArray(step)) { | ||
return await processParallel(step, input); | ||
result = await processParallel(step, input); | ||
} else if (isGenerator(step)) { | ||
return await processGenerator(step, input); | ||
result = await processGenerator(step, input, onYield); | ||
} else if (typeof step === 'function') { | ||
return await processFunction(step, input); | ||
result = await processFunction(step, input); | ||
} else if (isComplexIterable(step)) { | ||
return await processGenerator(async function*() { | ||
result = await processGenerator(async function*() { | ||
yield* await (step[Symbol.iterator] || step[Symbol.asyncIterator]) | ||
? step | ||
: [step] | ||
}, input); | ||
}, input, onYield); | ||
} else { | ||
return step; | ||
result = step; | ||
} | ||
clearInterval(checkYieldTimeout); | ||
return result; | ||
} catch (error) { | ||
@@ -104,3 +134,3 @@ throw new StreamOpsError('Error processing step', stepIndex, error); | ||
async function processGenerator(gen, input) { | ||
async function processGenerator(gen, input, onYield) { | ||
let results = []; | ||
@@ -111,2 +141,3 @@ for await (const item of input) { | ||
results.push(result); | ||
if (onYield) onYield(); | ||
} | ||
@@ -147,3 +178,3 @@ } | ||
if (error instanceof StreamOpsError) { | ||
throw error; // Rethrow StreamOpsErrors (including timeout errors) directly | ||
throw error; | ||
} | ||
@@ -163,3 +194,7 @@ throw new StreamOpsError(`Error in step ${stepIndex}`, stepIndex, error); | ||
logger.debug(`Buffer size exceeded. Current size: ${state.length}`); | ||
yield* state.splice(0, state.length - config.bufferSize); | ||
for (const item of state.splice(0, state.length - config.bufferSize)) { | ||
yield item; | ||
lastDownstreamYield = Date.now(); | ||
downstreamTimeoutWarningIssued = false; | ||
} | ||
} | ||
@@ -170,5 +205,9 @@ } | ||
yield state; | ||
lastDownstreamYield = Date.now(); | ||
downstreamTimeoutWarningIssued = false; | ||
} else if (typeof state[Symbol.asyncIterator] === 'function') { | ||
for await (const item of state) { | ||
yield item; | ||
lastDownstreamYield = Date.now(); | ||
downstreamTimeoutWarningIssued = false; | ||
} | ||
@@ -178,5 +217,9 @@ } else if (typeof state[Symbol.iterator] === 'function') { | ||
yield item; | ||
lastDownstreamYield = Date.now(); | ||
downstreamTimeoutWarningIssued = false; | ||
} | ||
} else { | ||
yield state; | ||
lastDownstreamYield = Date.now(); | ||
downstreamTimeoutWarningIssued = false; | ||
} | ||
@@ -189,2 +232,3 @@ | ||
} finally { | ||
clearInterval(checkDownstreamTimeout); | ||
logger.info('Streaming pipeline completed'); | ||
@@ -191,0 +235,0 @@ emitter.emit('end'); |
{ | ||
"name": "streamops", | ||
"version": "0.1.4", | ||
"version": "0.1.5", | ||
"main": "./index.js", | ||
@@ -5,0 +5,0 @@ "scripts": { |
@@ -35,6 +35,4 @@ const streaming = require('../index.js')({ | ||
function* repeatSum(sum) { | ||
// for (const sum of sums) { | ||
yield `The sum is: ${sum}`; | ||
yield `Double the sum is: ${sum * 2}`; | ||
// } | ||
yield `The sum is: ${sum}`; | ||
yield `Double the sum is: ${sum * 2}`; | ||
} | ||
@@ -41,0 +39,0 @@ ]; |
45938
13
1533