Comparing version 0.1.10 to 0.1.11
{ | ||
"name": "streamops", | ||
"version": "0.1.10", | ||
"version": "0.1.11", | ||
"main": "./src/main.js", | ||
@@ -5,0 +5,0 @@ "scripts": { |
module.exports = function createLogger(options) { | ||
const logLevels = ['error', 'warn', 'info', 'debug']; | ||
const logLevels = ['error', 'warn', 'info', 'debug', 'dev']; | ||
const logLevel = options.logLevel || 'info'; | ||
const logLevelIndex = logLevels.indexOf(logLevel); | ||
const isDevelopment = process.env.NODE_ENV === 'development'; | ||
@@ -9,4 +10,4 @@ return logLevels.reduce((logger, level) => { | ||
logger[level] = (...args) => { | ||
if (levelIndex <= logLevelIndex) { | ||
console[level](`[${new Date().toISOString()}] [${level.toUpperCase()}]`, ...args); | ||
if (levelIndex <= logLevelIndex && (level !== 'dev' || isDevelopment)) { | ||
(console[level] || console.log)(`[${new Date().toISOString()}] [${level.toUpperCase()}]`, ...args); | ||
} | ||
@@ -13,0 +14,0 @@ }; |
@@ -7,10 +7,18 @@ const EventEmitter = require('events'); | ||
class TimeoutCancelError extends Error { | ||
constructor(stepIndex) { | ||
super(`Step ${stepIndex} timed out, cancelling pipeline`); | ||
this.name = 'TimeoutCancelError'; | ||
this.stepIndex = stepIndex; | ||
} | ||
} | ||
function createStreamOps(options = {}) { | ||
const defaultOptions = { | ||
timeout: 100000, | ||
bufferSize: 1000, | ||
logLevel: 'info', | ||
logLevel: 'error', | ||
yieldTimeout: 20000, | ||
downstreamTimeout: 30000 | ||
downstreamTimeout: 30000, | ||
yieldTimeoutBehavior: 'warn' | ||
}; | ||
@@ -21,2 +29,26 @@ | ||
function handleYieldTimeout(stepIndex, lastYieldTime) { | ||
if (Date.now() - lastYieldTime <= config.yieldTimeout) { | ||
return { shouldContinue: true }; | ||
} | ||
switch (config.yieldTimeoutBehavior) { | ||
case 'warn': | ||
logger.warn(`Step ${stepIndex} has not yielded for ${config.yieldTimeout}ms`); | ||
return { shouldContinue: true }; | ||
case 'yield-null': | ||
logger.warn(`Step ${stepIndex} timed out, yielding null`); | ||
return { shouldContinue: true, valueToYield: null }; | ||
case 'cancel': | ||
logger.error(`Step ${stepIndex} timed out, cancelling pipeline`); | ||
return { shouldContinue: false, cancel: true }; | ||
case 'block': | ||
logger.warn(`Step ${stepIndex} timed out, blocking future yields`); | ||
return { shouldContinue: false }; | ||
default: | ||
logger.warn(`Unknown yieldTimeoutBehavior: ${config.yieldTimeoutBehavior}. Defaulting to 'warn'`); | ||
return { shouldContinue: true }; | ||
} | ||
} | ||
async function* streaming(pipeline) { | ||
@@ -52,15 +84,26 @@ if (pipeline instanceof StreamingChain) { | ||
async function* processStep(step, [input]) { | ||
try { | ||
let lastYieldTime = Date.now(); | ||
let lastYieldTime = Date.now(); | ||
let timeoutOccurred = false; | ||
let shouldCancel = false; | ||
let timeoutValue = undefined; | ||
const checkYieldTimeout = setInterval(() => { | ||
if (Date.now() - lastYieldTime > config.yieldTimeout) { | ||
logger.warn(`Step ${stepIndex} has not yielded for ${config.yieldTimeout}ms`); | ||
} | ||
}, config.yieldTimeout); | ||
const checkYieldTimeout = setInterval(() => { | ||
const { shouldContinue, valueToYield, cancel } = handleYieldTimeout(stepIndex, lastYieldTime); | ||
if (!shouldContinue) { | ||
clearInterval(checkYieldTimeout); | ||
timeoutOccurred = true; | ||
} | ||
if (cancel) { | ||
shouldCancel = true; | ||
} | ||
if (valueToYield !== undefined) { | ||
timeoutValue = valueToYield; | ||
} | ||
}, Math.min(config.yieldTimeout, 1000)); | ||
const onYield = () => { | ||
lastYieldTime = Date.now(); | ||
}; | ||
const onYield = () => { | ||
lastYieldTime = Date.now(); | ||
}; | ||
async function* wrappedStep() { | ||
if (Array.isArray(step)) { | ||
@@ -81,6 +124,21 @@ yield* processParallel(step, input); | ||
} | ||
} | ||
try { | ||
for await (const item of wrappedStep()) { | ||
if (shouldCancel) { | ||
throw new TimeoutCancelError(stepIndex); | ||
} | ||
if (timeoutOccurred) { | ||
break; | ||
} | ||
if (timeoutValue !== undefined) { | ||
yield timeoutValue; | ||
timeoutValue = undefined; | ||
} | ||
yield item; | ||
onYield(); | ||
} | ||
} finally { | ||
clearInterval(checkYieldTimeout); | ||
} catch (error) { | ||
throw new StreamOpsError('Error processing step', stepIndex, error); | ||
} | ||
@@ -159,3 +217,3 @@ } | ||
const step = pipeline[stepIndex]; | ||
logger.info(`Processing step ${stepIndex}`); | ||
logger.dev(`Processing step ${stepIndex}`); | ||
validateStep(step); | ||
@@ -180,3 +238,3 @@ | ||
clearInterval(checkDownstreamTimeout); | ||
logger.info('Streaming pipeline completed'); | ||
logger.dev('Streaming pipeline completed'); | ||
emitter.emit('end'); | ||
@@ -199,4 +257,4 @@ } | ||
); | ||
}; | ||
} | ||
module.exports = createStreamOps; |
@@ -53,3 +53,3 @@ const createStreamer = require('../src/createStreamOps'); | ||
} | ||
}).rejects.toThrow('Error processing step: Step 1'); | ||
}).rejects.toThrow('Test error'); | ||
@@ -56,0 +56,0 @@ // expect(errorHandler).toHaveBeenCalledTimes(1); |
@@ -5,8 +5,13 @@ const createStreamOps = require('../src/createStreamOps.js'); | ||
let originalConsoleWarn; | ||
let originalConsoleError; | ||
let mockWarn; | ||
let mockError; | ||
beforeEach(() => { | ||
originalConsoleWarn = console.warn; | ||
originalConsoleError = console.error; | ||
mockWarn = jest.fn(); | ||
mockError = jest.fn(); | ||
console.warn = mockWarn; | ||
console.error = mockError; | ||
}); | ||
@@ -16,2 +21,3 @@ | ||
console.warn = originalConsoleWarn; | ||
console.error = originalConsoleError; | ||
}); | ||
@@ -39,9 +45,9 @@ | ||
test('Warning logged for slow yielding step', async () => { | ||
const streaming = createStreamOps({ yieldTimeout: 50 }); | ||
const streaming = createStreamOps({ yieldTimeout: 5, logLevel: 'warn' }); | ||
const pipeline = [ | ||
async function* () { | ||
yield 1; | ||
await new Promise(resolve => setTimeout(resolve, 200)); | ||
await new Promise(resolve => setTimeout(resolve, 30)); | ||
yield 2; | ||
await new Promise(resolve => setTimeout(resolve, 200)); | ||
await new Promise(resolve => setTimeout(resolve, 30)); | ||
yield 3; | ||
@@ -57,18 +63,19 @@ } | ||
expect(results).toEqual([1, 2, 3]); | ||
console.log(mockWarn.mock.calls.join(',')) | ||
expect( | ||
/Step 1 has not yielded for 50ms/ | ||
.test( | ||
mockWarn.mock.calls.join(',') | ||
) | ||
).toEqual(true) | ||
// Remember timeout executions are fuzzy depending on runtime | ||
// it's hard to make solid determinisms | ||
expect(mockWarn).toHaveBeenCalledWith( | ||
expect.stringMatching(/\[.*\] \[WARN\]/), | ||
expect.stringMatching(/Step 1 has not yielded for 5ms/) | ||
); | ||
expect(mockWarn.mock.calls.length).toBeGreaterThanOrEqual(4); | ||
}); | ||
test('No warning for step that completes quickly', async () => { | ||
const streaming = createStreamOps({ yieldTimeout: 1000 }); | ||
test('Yield-null behavior', async () => { | ||
const streaming = createStreamOps({ yieldTimeout: 10, yieldTimeoutBehavior: 'yield-null', logLevel: 'warn' }); | ||
const pipeline = [ | ||
() => [1, 2, 3] | ||
async function* () { | ||
yield 1; | ||
await new Promise(resolve => setTimeout(resolve, 50)); | ||
yield 2; | ||
await new Promise(resolve => setTimeout(resolve, 50)); | ||
yield 3; | ||
} | ||
]; | ||
@@ -81,15 +88,48 @@ | ||
expect(results.flat()).toEqual([1, 2, 3]); | ||
expect(mockWarn).not.toHaveBeenCalled(); | ||
expect(results).toContain(null); | ||
expect(results).toEqual([1, null, 2, null, 3]); | ||
expect(mockWarn).toHaveBeenCalledWith( | ||
expect.stringMatching(/\[.*\] \[WARN\]/), | ||
expect.stringMatching(/Step 1 timed out, yielding null/) | ||
); | ||
}); | ||
test('Warning logged for step that takes too long without yielding', async () => { | ||
const streaming = createStreamOps({ yieldTimeout: 100 }); | ||
test('Cancel behavior', async () => { | ||
const streaming = createStreamOps({ yieldTimeout: 10, yieldTimeoutBehavior: 'cancel', logLevel: 'error' }); | ||
const pipeline = [ | ||
async () => { | ||
await new Promise(resolve => setTimeout(resolve, 500)); | ||
return [1, 2, 3]; | ||
async function* () { | ||
yield 1; | ||
await new Promise(resolve => setTimeout(resolve, 30)); | ||
yield 2; | ||
} | ||
]; | ||
await expect(async () => { | ||
const results = []; | ||
try { | ||
for await (const item of streaming(pipeline)) { | ||
results.push(item); | ||
} | ||
} catch (error) { | ||
throw error; // Re-throw the error to be caught by the expect | ||
} | ||
}).rejects.toThrow('Step 1 timed out'); | ||
expect(mockError).toHaveBeenCalledWith( | ||
expect.stringMatching(/\[.*\] \[ERROR\]/), | ||
expect.stringMatching(/Step 1 timed out, cancelling pipeline/) | ||
); | ||
}); | ||
test('Block behavior', async () => { | ||
const streaming = createStreamOps({ yieldTimeout: 10, yieldTimeoutBehavior: 'block', logLevel: 'warn' }); | ||
const pipeline = [ | ||
async function* () { | ||
yield 1; | ||
await new Promise(resolve => setTimeout(resolve, 50)); | ||
yield 2; | ||
yield 3; | ||
} | ||
]; | ||
const results = []; | ||
@@ -100,18 +140,16 @@ for await (const item of streaming(pipeline)) { | ||
expect(results.flat()).toEqual([1, 2, 3]); | ||
expect(results).toEqual([1]); | ||
expect(mockWarn).toHaveBeenCalledWith( | ||
expect.stringMatching(/\[.*\] \[WARN\]/), | ||
"Step 1 has not yielded for 100ms" | ||
expect.stringMatching(/Step 1 timed out, blocking future yields/) | ||
); | ||
}); | ||
test('Multiple warnings for step with multiple long pauses', async () => { | ||
const streaming = createStreamOps({ yieldTimeout: 50 }); | ||
test('Unknown behavior defaults to warn', async () => { | ||
const streaming = createStreamOps({ yieldTimeout: 50, yieldTimeoutBehavior: 'unknown', logLevel: 'warn' }); | ||
const pipeline = [ | ||
async function* () { | ||
yield 1; | ||
await new Promise(resolve => setTimeout(resolve, 150)); | ||
await new Promise(resolve => setTimeout(resolve, 200)); | ||
yield 2; | ||
await new Promise(resolve => setTimeout(resolve, 150)); | ||
yield 3; | ||
} | ||
@@ -125,8 +163,11 @@ ]; | ||
expect(results).toEqual([1, 2, 3]); | ||
expect(mockWarn.mock.calls.length).toBeGreaterThanOrEqual(2); | ||
expect(results).toEqual([1, 2]); | ||
expect(mockWarn).toHaveBeenCalledWith( | ||
expect.stringMatching(/\[.*\] \[WARN\]/), | ||
expect.stringMatching(/Unknown yieldTimeoutBehavior: unknown. Defaulting to 'warn'/) | ||
); | ||
}); | ||
test('No warning for complex iterable that yields quickly', async () => { | ||
const streaming = createStreamOps({ yieldTimeout: 1000 }); | ||
test('Timeout behavior with complex iterable', async () => { | ||
const streaming = createStreamOps({ yieldTimeout: 50, yieldTimeoutBehavior: 'yield-null', logLevel: 'warn' }); | ||
const pipeline = [ | ||
@@ -136,3 +177,5 @@ { | ||
yield 1; | ||
await new Promise(resolve => setTimeout(resolve, 200)); | ||
yield 2; | ||
await new Promise(resolve => setTimeout(resolve, 200)); | ||
yield 3; | ||
@@ -148,17 +191,16 @@ } | ||
expect(results).toEqual([1, 2, 3]); | ||
expect(mockWarn).not.toHaveBeenCalled(); | ||
expect(results).toContain(null); | ||
expect(results).toEqual([1, null, 2, null, 3]); | ||
expect(mockWarn).toHaveBeenCalledWith( | ||
expect.stringMatching(/\[.*\] \[WARN\]/), | ||
expect.stringMatching(/Step 1 timed out, yielding null/) | ||
); | ||
}); | ||
test('Warning for complex iterable with slow yields', async () => { | ||
const streaming = createStreamOps({ yieldTimeout: 50 }); | ||
test('Timeout behavior with regular function', async () => { | ||
const streaming = createStreamOps({ yieldTimeout: 50, yieldTimeoutBehavior: 'warn', logLevel: 'warn' }); | ||
const pipeline = [ | ||
{ | ||
[Symbol.asyncIterator]: async function* () { | ||
yield 1; | ||
await new Promise(resolve => setTimeout(resolve, 200)); | ||
yield 2; | ||
await new Promise(resolve => setTimeout(resolve, 200)); | ||
yield 3; | ||
} | ||
async () => { | ||
await new Promise(resolve => setTimeout(resolve, 200)); | ||
return 42; | ||
} | ||
@@ -172,5 +214,9 @@ ]; | ||
expect(results).toEqual([1, 2, 3]); | ||
expect(mockWarn.mock.calls.length).toBeGreaterThanOrEqual(4); | ||
expect(results).toEqual([42]); | ||
expect(mockWarn).toHaveBeenCalledWith( | ||
expect.stringMatching(/\[.*\] \[WARN\]/), | ||
expect.stringMatching(/Step 1 has not yielded for 50ms/) | ||
); | ||
}); | ||
}); |
77860
2636