Comparing version 0.1.5 to 0.1.6
index.js
@@ -5,3 +5,7 @@ const EventEmitter = require('events');
  constructor(message, step, originalError = null) {
    super(message);
    super(
      message
      + (step ? ': Step ' + step : '')
      + (originalError ? ': Original Error: ' + originalError : '')
    );
    this.name = 'StreamOpsError';
@@ -67,6 +71,5 @@ this.step = step;
async function processStep(step, input) {
async function* processStep(step, input) {
  try {
    let lastYieldTime = Date.now();
    let hasYielded = false;
@@ -80,25 +83,30 @@ const checkYieldTimeout = setInterval(() => {
    const onYield = () => {
      hasYielded = true;
      lastYieldTime = Date.now();
    };
    let result;
    if (Array.isArray(step)) {
      result = await processParallel(step, input);
    } else if (isGenerator(step)) {
      result = await processGenerator(step, input, onYield);
    } else if (typeof step === 'function') {
      result = await processFunction(step, input);
    } else if (isComplexIterable(step)) {
      result = await processGenerator(async function*() {
        yield* await (step[Symbol.iterator] || step[Symbol.asyncIterator])
          ? step
          : [step]
      }, input, onYield);
    } else {
      result = step;
    }
    const processingPromise = (async function*() {
      if (Array.isArray(step)) {
        yield* processParallel(step, input);
      } else if (isGenerator(step)) {
        yield* processGenerator(step, input, onYield);
      } else if (typeof step === 'function') {
        yield* await withTimeout(processFunction(step, input), config.timeout, `Step ${stepIndex} timed out`);
      } else if (isComplexIterable(step)) {
        yield* processGenerator(async function*() {
          yield* await (step[Symbol.iterator] || step[Symbol.asyncIterator])
            ? step
            : [step]
        }, input, onYield);
      } else {
        yield step;
      }
    })();
    const timeoutPromise = new Promise((_, reject) => {
      setTimeout(() => reject(new Error(`Step ${stepIndex} timed out`)), config.timeout);
    });
    yield* race(processingPromise, timeoutPromise);
    clearInterval(checkYieldTimeout);
    return result;
  } catch (error) {
@@ -109,2 +117,24 @@ throw new StreamOpsError('Error processing step', stepIndex, error);
async function* withTimeout(promise, ms, message) {
  const timeoutPromise = new Promise((_, reject) => {
    setTimeout(() => reject(new Error(message)), ms);
  });
  try {
    const result = await Promise.race([promise, timeoutPromise]);
    yield* (Array.isArray(result) ? result : [result]);
  } catch (error) {
    throw error;
  }
}
async function* race(generatorPromise, timeoutPromise) {
  try {
    const generator = await Promise.race([generatorPromise, timeoutPromise]);
    yield* generator;
  } catch (error) {
    throw error;
  }
}
function isComplexIterable(obj) {
@@ -122,35 +152,52 @@ return obj != null &&
async function processParallel(steps, input) {
  return await Promise.all(input.map(async item => {
    const results = await Promise.all(steps.map(async step => {
async function* processParallel(steps, input) {
  const inputArray = [];
  for await (const item of input) {
    inputArray.push(item);
  }
  const processors = inputArray.map(async (item) => {
    const results = [];
    for (const step of steps) {
      if (Array.isArray(step)) {
        return await processParallel(step, [item]);
        for await (const result of processParallel(step, [item])) {
          results.push(result);
        }
      } else {
        return await processStep(step, [item]);
        for await (const result of processStep(step, [item])) {
          results.push(result);
        }
      }
    }));
    return results.flat();
  }));
}
    return results;
  });
  const iterator = processors[Symbol.iterator]();
  let result = iterator.next();
  while (!result.done) {
    const processor = await result.value;
    for (const item of processor) {
      yield item;
    }
    result = iterator.next();
  }
}
async function processGenerator(gen, input, onYield) {
  let results = [];
async function* processGenerator(gen, input, onYield) {
  for await (const item of input) {
    const generator = gen.call(context, item);
    for await (const result of generator) {
      results.push(result);
      yield result;
      if (onYield) onYield();
    }
  }
  return results;
}
async function processFunction(fn, input) {
  if (input.length === 1) {
    const result = await fn.call(context, input[0]);
    return [result];
  } else {
    const result = await fn.call(context, input);
    return Array.isArray(result) ? result : [result];
  const inputArray = [];
  for await (const item of input) {
    inputArray.push(item);
  }
  const result = await fn.call(context, inputArray);
  return Array.isArray(result) ? result : [result];
}
@@ -168,10 +215,21 @@
        const processingPromise = processStep(step, state);
        const timeoutPromise = new Promise((_, reject) =>
          setTimeout(() => reject(new StreamOpsError(`Step ${stepIndex} timed out`, stepIndex)), config.timeout)
        );
        const processingGenerator = processStep(step, state);
        try {
          state = await Promise.race([processingPromise, timeoutPromise]);
          state = []; // Reset state for collecting items from this step
          for await (const item of processingGenerator) {
            if (stepIndex === pipeline.length - 1) {
              // If it's the last step, yield the item to the consumer
              yield item;
            } else {
              // Otherwise, collect the item for the next step
              state.push(item);
            }
            lastDownstreamYield = Date.now();
            downstreamTimeoutWarningIssued = false;
            if (emitter.listenerCount('data') > 0) {
              emitter.emit('data', item);
            }
          }
        } catch (error) {
@@ -185,40 +243,3 @@ if (error instanceof StreamOpsError) {
        stepIndex++;
        if (emitter.listenerCount('data') > 0) {
          for (const item of state) {
            emitter.emit('data', item);
          }
        }
        if (state.length > config.bufferSize) {
          logger.debug(`Buffer size exceeded. Current size: ${state.length}`);
          for (const item of state.splice(0, state.length - config.bufferSize)) {
            yield item;
            lastDownstreamYield = Date.now();
            downstreamTimeoutWarningIssued = false;
          }
        }
      }
      if (typeof state == 'string') {
        yield state;
        lastDownstreamYield = Date.now();
        downstreamTimeoutWarningIssued = false;
      } else if (typeof state[Symbol.asyncIterator] === 'function') {
        for await (const item of state) {
          yield item;
          lastDownstreamYield = Date.now();
          downstreamTimeoutWarningIssued = false;
        }
      } else if (typeof state[Symbol.iterator] === 'function') {
        for (const item of state) {
          yield item;
          lastDownstreamYield = Date.now();
          downstreamTimeoutWarningIssued = false;
        }
      } else {
        yield state;
        lastDownstreamYield = Date.now();
        downstreamTimeoutWarningIssued = false;
      }
@@ -225,0 +246,0 @@ } catch (error) {
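Taken together, the index.js changes turn every step processor (processStep, processParallel, processGenerator) into an async generator, so items flow downstream one at a time rather than being gathered into an array between steps, and each step is raced against config.timeout. A minimal consumer-side sketch of the resulting behaviour, assuming require('streamops') resolves to this index.js and that the factory option is named timeout (inferred from config.timeout; the value is illustrative):

```js
const createStreamer = require('streamops'); // package "main" points at index.js
const streaming = createStreamer({ timeout: 30000 }); // option name assumed from config.timeout

// A parallel group (nested array) now emits each branch's results as
// individual items in the output stream rather than one nested array.
const pipeline = [
  function*() { yield 1; yield 2; },
  [
    ([x]) => x * 2, // plain functions receive the step's input wrapped in an array
    ([x]) => x * 3
  ]
];

(async () => {
  for await (const item of streaming(pipeline)) {
    console.log(item); // expected, per the updated tests below: 2, 3, 4, 6
  }
})();
```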
package.json
{
  "name": "streamops",
  "version": "0.1.5",
  "version": "0.1.6",
  "main": "./index.js",
@@ -5,0 +5,0 @@ "scripts": {
@@ -76,4 +76,4 @@ const createStreamer = require('../index');
  },
  async function () {
    await new Promise(resolve => setTimeout(resolve, 200)); // This will timeout
  async function() {
    await new Promise(resolve => setTimeout(resolve, 1000)); // This will timeout
    return 42;
@@ -80,0 +80,0 @@ }
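The bumped delay in this test (200 ms to 1000 ms) pairs with the new timeout handling in index.js, where a step is raced against config.timeout and a failure surfaces as a StreamOpsError naming the step. A hedged sketch of how a consumer might observe that; the 500 ms option value and the error-handling path are assumptions, since the catch block that decides whether the error is rethrown or emitted is not fully visible in this diff:

```js
const streaming = require('streamops')({ timeout: 500 }); // option name assumed from config.timeout

const pipeline = [
  async function() {
    await new Promise(resolve => setTimeout(resolve, 1000)); // exceeds the step timeout
    return 42;
  }
];

(async () => {
  try {
    for await (const item of streaming(pipeline)) {
      console.log(item); // never reached if the step times out
    }
  } catch (err) {
    // If the failure is rethrown to the consumer, it arrives as a
    // StreamOpsError whose message references the offending step index.
    console.error(err.name, err.message);
  }
})();
```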
@@ -9,3 +9,3 @@ const streaming = require('../index.js')({
test('nested parallel processing', async () => {
test('nested parallel processing with damming fns', async () => {
  const pipeline = [
@@ -15,6 +15,6 @@ function*() { yield 1; yield 2; },
    [
      (x) => x * 2,
      (x) => x * 3
      ([x]) => x * 2,
      ([x]) => x * 3
    ],
    (x) => x + 1
    ([x]) => x + 100
  ]
@@ -26,11 +26,29 @@ ];
  }
  expect(results).toEqual([[[2, 3], 2], [[4, 6], 3]]);
  expect(results).toEqual([2, 3, 101, 4, 6, 102]);
});
test('nested parallel processing with generators', async () => {
  const pipeline = [
    function*() { yield 1; yield 2; },
    [
      [
        function*(x) { yield x * 2 },
        function*(x) { yield x * 3 }
      ],
      function*(x) { yield x + 100 }
    ]
  ];
  const results = [];
  for await (const item of streaming(pipeline)) {
    results.push(item);
  }
  expect(results).toEqual([2, 3, 101, 4, 6, 102]);
});
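The two tests above also document why the updated fixtures destructure their arguments as ([x]): plain ("damming") functions are now called once with the step's input collected into an array (see the new processFunction), while generator steps are invoked once per upstream item (see processGenerator). A small sketch contrasting the two, using only the API already shown in these tests; the factory option is an assumption as before:

```js
const streaming = require('streamops')({ timeout: 30000 });

const pipeline = [
  function*() { yield 1; yield 2; yield 3; },

  // Generator step: called once per upstream item, so it sees 1, then 2, then 3.
  function*(n) { yield n * 10; },

  // Plain function step: "dams" the stream and is called once with every
  // upstream item gathered into a single array, i.e. [10, 20, 30].
  (nums) => nums.reduce((sum, n) => sum + n, 0)
];

(async () => {
  for await (const item of streaming(pipeline)) {
    console.log(item); // expected: 60
  }
})();
```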
test('Parallel Processing', async () => {
  const pipeline = [
    () => 3,
    () => [3],
    [
      (x) => x * 2,
      (x) => x + 1
      ([x]) => x * 2,
      ([x]) => x + 1
    ]
@@ -42,3 +60,3 @@ ];
  }
  expect(results).toEqual([[6, 4]]);
  expect(results).toEqual([6, 4]);
});
@@ -50,4 +68,4 @@
    [
      [(x) => x * 2, (x) => x * 3],
      (x) => x + 1
      [([x]) => x * 2, ([x]) => x * 3],
      ([x]) => x + 1
    ]
@@ -59,3 +77,3 @@ ];
  }
  expect(results).toEqual([[[4, 6], 3]]);
  expect(results).toEqual([4, 6, 3]);
});
@@ -82,3 +100,3 @@
    ['apple'],
    function*([thing]) {
    function*(thing) {
      yield thing === 'apple';
@@ -85,0 +103,0 @@ }
@@ -35,6 +35,6 @@ const streaming = require('../index.js')({
  const pipeline = [
    () => 3,
    () => [3],
    [
      (x) => x * 2,
      (x) => x + 1
      ([x]) => x * 2,
      ([x]) => x + 1
    ]
@@ -46,3 +46,3 @@ ];
  }
  expect(results).toEqual([[6, 4]]);
  expect(results).toEqual([6, 4]);
});
@@ -57,8 +57,8 @@
    [
      function*(num) { yield num * 2; },
      function*(num) { yield num * 3; }
      function*(num) { yield {double: num * 2}; },
      function*(num) { yield {triple: num * 3}; }
    ],
    function*(result) { // Note: singular 'result', not 'results'
      const [double, triple] = result;
      yield { double, triple };
    (res) => {
      console.log('res', res);
      return res;
    }
@@ -71,9 +71,11 @@ ];
  expect(results).toEqual([
    { double: 2, triple: 3 },
    { double: 4, triple: 6 }
    { double: 2 },
    { triple: 3 },
    { double: 4 },
    { triple: 6 }
    // { double: 2, triple: 3 },
    // { double: 4, triple: 6 }
  ]);
});
// return;
test('generator after parallel step receives items individually', async () => {
@@ -102,11 +104,11 @@ const receivedItems = [];
  expect(receivedItems).toEqual([
    [2, 3],
    [4, 6]
    2, 3, 4, 6
  ]);
  expect(results).toEqual([
    [2, 3],
    [4, 6]
    2, 3, 4, 6
  ]);
});
return;
test('Parallel API calls using yielded Promise.all', async () => {
@@ -113,0 +115,0 @@ const pipeline = [
@@ -44,8 +44,10 @@ const streaming = require('../index.js')({
test('More Advanced Data flowing', async () => {
test('More Advanced Data flowing w/ fns', async () => {
  const res = await (streaming([
    () => [1, 2, 3],
    () => [[1, 2, 3]], // items of a returned array are yielded individually,
    // so a return of [[_THING_]] (a double-nested array)
    // is equivalent to a yield of [_THING_]
    [
      (nums) => nums.join(','),
      (nums) => nums.join('%')
      ([nums]) => nums.join(','),
      ([nums]) => nums.join('%')
    ],
@@ -52,0 +54,0 @@ ([x, y]) => {
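As the comment in the hunk above notes, a step that returns an array has its items fed downstream one at a time, so forwarding an actual array as a single value requires wrapping it once more. A minimal sketch of the difference; the factory options are illustrative assumptions, the behaviour follows the updated test:

```js
const streaming = require('streamops')({ timeout: 30000 });

// Returned array is spread: the consumer sees three items, 1, 2, 3.
const spread = streaming([() => [1, 2, 3]]);

// Double-nested array: the consumer sees a single item, [1, 2, 3].
const single = streaming([() => [[1, 2, 3]]]);

(async () => {
  for await (const item of spread) console.log('spread:', item);
  for await (const item of single) console.log('single:', item);
})();
```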
@@ -60,2 +60,51 @@ const streaming = require('../index.js')({
test('Real-time streaming with short delays', async () => {
  jest.useRealTimers(); // Use real timers for this test
  const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms));
  const pipeline = [
    async function*() {
      for (let i = 1; i <= 10; i++) {
        yield `Item ${i}\n`;
        await delay(50); // 50ms delay
      }
    }
  ];
  const results = [];
  const timings = [];
  const logs = [];
  const customLog = (message) => {
    const timestamp = new Date().toISOString();
    logs.push({ timestamp, message });
  };
  const stream = streaming(pipeline);
  for await (const item of stream) {
    customLog(`xmllm yielding item`);
    results.push(item);
    timings.push(Date.now());
  }
  expect(results).toEqual(['Item 1\n', 'Item 2\n', 'Item 3\n', 'Item 4\n', 'Item 5\n', 'Item 6\n', 'Item 7\n', 'Item 8\n', 'Item 9\n', 'Item 10\n']);
  // Check if items were received approximately 50ms apart
  for (let i = 1; i < timings.length; i++) {
    const timeDiff = timings[i] - timings[i-1];
    expect(timeDiff).toBeGreaterThanOrEqual(40); // Allow for small timing inconsistencies
    expect(timeDiff).toBeLessThanOrEqual(60);
  }
  // Check if customLog was called at approximately 50ms intervals
  expect(logs.length).toBe(10);
  for (let i = 1; i < logs.length; i++) {
    const timeDiff = new Date(logs[i].timestamp) - new Date(logs[i-1].timestamp);
    expect(timeDiff).toBeGreaterThanOrEqual(40);
    expect(timeDiff).toBeLessThanOrEqual(60);
  }
});
test('pipeline with async operations', async () => {
@@ -196,4 +245,6 @@ const pipeline = [
    () => ({count: 1}),
    ({count}) => ({count: count + 1}),
    ({count}) => `Final count: ${count}`
    ([{count}]) => {
      return {count: count + 1};
    },
    ([{count}]) => `Final count: ${count}`
  ];
@@ -207,3 +258,3 @@ const results = [];
test('Async Operations', async () => {
test('Async Operations [functions]', async () => {
  const fetchData = async () => 'data';
@@ -213,3 +264,3 @@ const processData = async (data) => data.toUpperCase();
    async () => await fetchData(),
    async (data) => await processData(data)
    async ([data]) => await processData(data)
  ];
@@ -272,10 +323,8 @@ const results = [];
    ],
    function*(messages) {
      for (const {message} of messages) {
        yield {
          message,
          name: 'Michael'
        }
      }
    function*({message}) {
      yield {
        message,
        name: 'Michael'
      }
    }
  ];
@@ -464,2 +513,43 @@
test('Delayed streaming with timing verification', async () => {
  jest.useFakeTimers();
  const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms));
  const pipeline = [
    async function*() {
      for (let i = 1; i <= 5; i++) {
        yield `Item ${i}`;
        await delay(1000); // 1 second delay
      }
    }
  ];
  const results = [];
  const timings = [];
  const streamPromise = (async () => {
    for await (const item of streaming(pipeline)) {
      results.push(item);
      timings.push(Date.now());
      // jest.advanceTimersByTime(1000); // Advance time by 1 second
    }
  })();
  await jest.runAllTimersAsync();
  await streamPromise;
  expect(results).toEqual(['Item 1', 'Item 2', 'Item 3', 'Item 4', 'Item 5']);
  // Check if items were received approximately 1 second apart
  for (let i = 1; i < timings.length; i++) {
    const timeDiff = timings[i] - timings[i-1];
    console.log('timeDiff', timeDiff);
    expect(timeDiff).toBeGreaterThanOrEqual(900); // Allow for small timing inconsistencies
    expect(timeDiff).toBeLessThanOrEqual(1100);
  }
  jest.useRealTimers();
});
});