Comparing version 0.1.6 to 0.1.7
131
index.js
@@ -53,4 +53,3 @@ const EventEmitter = require('events'); | ||
let state = context.state = [undefined]; | ||
let stepIndex = 0; | ||
let stepIndex = 1; | ||
let lastDownstreamYield = Date.now(); | ||
@@ -71,3 +70,3 @@ let downstreamTimeoutWarningIssued = false; | ||
async function* processStep(step, input) { | ||
async function* processStep(step, [input]) { | ||
try { | ||
@@ -86,26 +85,18 @@ let lastYieldTime = Date.now(); | ||
const processingPromise = (async function*() { | ||
if (Array.isArray(step)) { | ||
yield* processParallel(step, input); | ||
} else if (isGenerator(step)) { | ||
yield* processGenerator(step, input, onYield); | ||
} else if (typeof step === 'function') { | ||
yield* await withTimeout(processFunction(step, input), config.timeout, `Step ${stepIndex} timed out`); | ||
} else if (isComplexIterable(step)) { | ||
yield* processGenerator(async function*() { | ||
yield* await (step[Symbol.iterator] || step[Symbol.asyncIterator]) | ||
? step | ||
: [step] | ||
}, input, onYield); | ||
} else { | ||
yield step; | ||
} | ||
})(); | ||
if (Array.isArray(step)) { | ||
yield* processParallel(step, input); | ||
} else if (isGenerator(step)) { | ||
yield* processGenerator(step, input, onYield); | ||
} else if (typeof step === 'function') { | ||
yield* processFunction(step, [input]); | ||
} else if (isComplexIterable(step)) { | ||
yield* processGenerator(async function*() { | ||
yield* await (step[Symbol.iterator] || step[Symbol.asyncIterator]) | ||
? step | ||
: [step] | ||
}, input, onYield); | ||
} else { | ||
yield step; | ||
} | ||
const timeoutPromise = new Promise((_, reject) => { | ||
setTimeout(() => reject(new Error(`Step ${stepIndex} timed out`)), config.timeout); | ||
}); | ||
yield* race(processingPromise, timeoutPromise); | ||
clearInterval(checkYieldTimeout); | ||
@@ -152,31 +143,8 @@ } catch (error) { | ||
async function* processParallel(steps, input) { | ||
const inputArray = []; | ||
for await (const item of input) { | ||
inputArray.push(item); | ||
} | ||
const processors = inputArray.map(async (item) => { | ||
const results = []; | ||
for (const step of steps) { | ||
if (Array.isArray(step)) { | ||
for await (const result of processParallel(step, [item])) { | ||
results.push(result); | ||
} | ||
} else { | ||
for await (const result of processStep(step, [item])) { | ||
results.push(result); | ||
} | ||
} | ||
for (const step of steps) { | ||
if (Array.isArray(step)) { | ||
yield* processParallel(step, input); | ||
} else { | ||
yield* processStep(step, [input]); | ||
} | ||
return results; | ||
}); | ||
const iterator = processors[Symbol.iterator](); | ||
let result = iterator.next(); | ||
while (!result.done) { | ||
const processor = await result.value; | ||
for (const item of processor) { | ||
yield item; | ||
} | ||
result = iterator.next(); | ||
} | ||
@@ -186,12 +154,10 @@ } | ||
async function* processGenerator(gen, input, onYield) { | ||
for await (const item of input) { | ||
const generator = gen.call(context, item); | ||
for await (const result of generator) { | ||
yield result; | ||
if (onYield) onYield(); | ||
} | ||
const generator = gen.call(context, input); | ||
for await (const result of generator) { | ||
yield result; | ||
if (onYield) onYield(); | ||
} | ||
} | ||
async function processFunction(fn, input) { | ||
async function* processFunction(fn, input) { | ||
const inputArray = []; | ||
@@ -202,3 +168,3 @@ for await (const item of input) { | ||
const result = await fn.call(context, inputArray); | ||
return Array.isArray(result) ? result : [result]; | ||
yield* (Array.isArray(result) ? result : [result]); | ||
} | ||
@@ -212,34 +178,23 @@ | ||
try { | ||
for (const step of pipeline) { | ||
async function* processPipeline(input, stepIndex = 0) { | ||
if (stepIndex >= pipeline.length) { | ||
yield* input; | ||
return; | ||
} | ||
const step = pipeline[stepIndex]; | ||
logger.info(`Processing step ${stepIndex}`); | ||
validateStep(step); | ||
const processingGenerator = processStep(step, state); | ||
try { | ||
state = []; // Reset state for collecting items from this step | ||
for await (const item of processingGenerator) { | ||
if (stepIndex === pipeline.length - 1) { | ||
// If it's the last step, yield the item to the consumer | ||
yield item; | ||
} else { | ||
// Otherwise, collect the item for the next step | ||
state.push(item); | ||
} | ||
lastDownstreamYield = Date.now(); | ||
downstreamTimeoutWarningIssued = false; | ||
if (emitter.listenerCount('data') > 0) { | ||
emitter.emit('data', item); | ||
} | ||
for await (const item of input) { | ||
const processingGenerator = processStep(step, [item]); | ||
stepIndex++; | ||
for await (const result of processingGenerator) { | ||
yield* processPipeline([result], stepIndex); | ||
} | ||
} catch (error) { | ||
if (error instanceof StreamOpsError) { | ||
throw error; | ||
} | ||
throw new StreamOpsError(`Error in step ${stepIndex}`, stepIndex, error); | ||
} | ||
} | ||
stepIndex++; | ||
} | ||
yield* processPipeline([undefined]); | ||
@@ -246,0 +201,0 @@ } catch (error) { |
{ | ||
"name": "streamops", | ||
"version": "0.1.6", | ||
"version": "0.1.7", | ||
"main": "./index.js", | ||
@@ -5,0 +5,0 @@ "scripts": { |
@@ -7,3 +7,3 @@ const streaming = require('../index.js')({ | ||
describe('streaming abstraction', () => { | ||
xdescribe('streaming abstraction', () => { | ||
@@ -31,2 +31,3 @@ test('Multiple damming approach with alternating generators and functions', async () => { | ||
function sumNumbers(numbers) { | ||
console.log('NUMBERS>>>>>', numbers) | ||
return [numbers.reduce((sum, n) => sum + n, 0)]; | ||
@@ -33,0 +34,0 @@ }, |
@@ -204,3 +204,3 @@ const createStreamOps = require('../index.js'); | ||
expect(results).toEqual([2, 4, 6]); | ||
expect(mockWarn).toHaveBeenCalledTimes(2); | ||
expect(mockWarn).toHaveBeenCalledTimes(1); | ||
}); | ||
@@ -207,0 +207,0 @@ |
@@ -7,4 +7,123 @@ const streaming = require('../index.js')({ | ||
describe('Inner streeeeems', () => { | ||
describe('Inner Streams Basics', () => { | ||
test('Simple streaming test', async () => { | ||
const log = []; | ||
const pipeline = [ | ||
function*() { | ||
log.push('Step 1: Yielding 1'); | ||
yield 1; | ||
log.push('Step 1: Yielding 2'); | ||
yield 2; | ||
log.push('Step 1: Yielding 3'); | ||
yield 3; | ||
}, | ||
function*(num) { | ||
log.push(`Step 2: Received ${num}`); | ||
yield num * 2; | ||
}, | ||
function*(num) { | ||
log.push(`Step 3: Received ${num}`); | ||
yield num + 1; | ||
} | ||
]; | ||
const results = []; | ||
for await (const item of streaming(pipeline)) { | ||
results.push(item); | ||
log.push(`Main: Received ${item}`); | ||
} | ||
console.log(log); | ||
console.log(results); | ||
expect(log).toEqual([ | ||
'Step 1: Yielding 1', | ||
'Step 2: Received 1', | ||
'Step 3: Received 2', | ||
'Main: Received 3', | ||
'Step 1: Yielding 2', | ||
'Step 2: Received 2', | ||
'Step 3: Received 4', | ||
'Main: Received 5', | ||
'Step 1: Yielding 3', | ||
'Step 2: Received 3', | ||
'Step 3: Received 6', | ||
'Main: Received 7' | ||
]); | ||
expect(results).toEqual([3, 5, 7]); | ||
}); | ||
test('it works', async () => { | ||
const log = []; | ||
const stream = streaming([ | ||
function*() { | ||
log.push('yielding 1'); | ||
yield 1; | ||
log.push('yielding 2'); | ||
yield 2; | ||
log.push('yielding 3'); | ||
yield 3; | ||
}, | ||
function*(n) { | ||
log.push('received ' + n); | ||
yield n; | ||
} | ||
]); | ||
const results = []; | ||
for await (const x of stream) {} | ||
expect(log).toEqual([ | ||
"yielding 1", | ||
"received 1", | ||
"yielding 2", | ||
"received 2", | ||
"yielding 3", | ||
"received 3" | ||
]); | ||
}); | ||
test('x', async() => { | ||
let nextGreeting = ['bonjour', 'hola', 'nihao', 'heya']; | ||
const stream = streaming([ | ||
streaming([ | ||
function*() { | ||
yield 'bob'; | ||
yield 'sam'; | ||
}, | ||
function*(name) { | ||
yield 'hello ' + name; | ||
yield nextGreeting.shift() + ' ' + name; | ||
}, | ||
function*(greeting) { | ||
yield greeting; | ||
yield greeting.toUpperCase(); | ||
} | ||
]), | ||
function*(msg) { | ||
yield 'Message: ' + msg; | ||
} | ||
]); | ||
const results = []; | ||
for await (const x of stream) { | ||
results.push(x); | ||
} | ||
expect(results).toEqual([ | ||
'Message: hello bob', | ||
'Message: HELLO BOB', | ||
'Message: bonjour bob', | ||
'Message: BONJOUR BOB', | ||
'Message: hello sam', | ||
'Message: HELLO SAM', | ||
'Message: hola sam', | ||
'Message: HOLA SAM' | ||
]); | ||
}); | ||
test('Nested streaming pipelines (agnostic approach)', async () => { | ||
@@ -18,3 +137,3 @@ const results = []; | ||
streaming([ | ||
function*() { | ||
function*(x) { | ||
yield 'HELLO'; | ||
@@ -30,3 +149,59 @@ yield 'HI'; | ||
test('Inner stream is absorbed as it yields', async () => { | ||
let flag = false; | ||
const results = []; | ||
let next; | ||
const stream = streaming([ | ||
streaming([ | ||
function*() { | ||
yield 1; | ||
yield 2; | ||
yield 3; | ||
}, | ||
]), | ||
function*(num) { | ||
yield num + 1; | ||
} | ||
]); | ||
for await (const item of stream) { | ||
results.push(item); | ||
} | ||
expect(results).toEqual([2, 3, 4]); | ||
}); | ||
return; | ||
test('Nested streams and inter-step interactions', async () => { | ||
let flag = false; | ||
const results = []; | ||
const pipeline = [ | ||
streaming([ | ||
function*() { | ||
yield 1; | ||
if (flag) { | ||
yield 'Flag was set'; | ||
} | ||
yield 2; | ||
yield 3; | ||
} | ||
]), | ||
function*(num) { | ||
if (num === 1) { | ||
flag = true; | ||
} | ||
yield num; | ||
if (num === 2) { | ||
yield 'Extra item after 2'; | ||
} | ||
} | ||
]; | ||
for await (const item of streaming(pipeline)) { | ||
results.push(item); | ||
} | ||
expect(results).toEqual([1, 'Flag was set', 2, 'Extra item after 2', 3]); | ||
}); | ||
}); |
@@ -20,6 +20,50 @@ const createStreamer = require('../index'); | ||
// test('handles and logs errors properly', async () => { | ||
// const errorMessage = 'Test error in step'; | ||
test('handles and logs errors properly', async () => { | ||
const errorMessage = 'Test error in step'; | ||
const streamingInstance = createStreamer({ | ||
timeout: 1000, | ||
logLevel: 'error' | ||
}); | ||
const pipeline = [ | ||
function* () { | ||
yield 1; | ||
yield 2; | ||
}, | ||
function () { | ||
throw new Error(errorMessage); | ||
}, | ||
function (x) { | ||
return x * 2; | ||
} | ||
]; | ||
const errorHandler = jest.fn(); | ||
const dataHandler = jest.fn(); | ||
const endHandler = jest.fn(); | ||
const emitter = new EventEmitter(); | ||
emitter.on('error', errorHandler); | ||
emitter.on('data', dataHandler); | ||
emitter.on('end', endHandler); | ||
await expect(async () => { | ||
for await (const item of streamingInstance(pipeline)) { | ||
emitter.emit('data', item); | ||
} | ||
}).rejects.toThrow('Error processing step: Step 1'); | ||
// expect(errorHandler).toHaveBeenCalledTimes(1); | ||
// expect(dataHandler).toHaveBeenCalledTimes(0); | ||
// expect(endHandler).toHaveBeenCalledTimes(1); | ||
// expect(consoleOutput.length).toBe(1); | ||
// expect(consoleOutput[0]).toContain('[ERROR] Error in streaming pipeline:'); | ||
// expect(consoleOutput[0]).toContain(errorMessage); | ||
// expect(consoleOutput[0]).toContain('step 1'); | ||
}); | ||
// test('handles timeouts properly', async () => { | ||
// const streamingInstance = createStreamer({ | ||
// timeout: 1000, | ||
// timeout: 100, | ||
// logLevel: 'error' | ||
@@ -33,62 +77,18 @@ // }); | ||
// }, | ||
// function () { | ||
// throw new Error(errorMessage); | ||
// }, | ||
// function (x) { | ||
// return x * 2; | ||
// async function() { | ||
// await new Promise(resolve => setTimeout(resolve, 1000)); // This will timeout | ||
// return 42; | ||
// } | ||
// ]; | ||
// const errorHandler = jest.fn(); | ||
// const dataHandler = jest.fn(); | ||
// const endHandler = jest.fn(); | ||
// const emitter = new EventEmitter(); | ||
// emitter.on('error', errorHandler); | ||
// emitter.on('data', dataHandler); | ||
// emitter.on('end', endHandler); | ||
// await expect(async () => { | ||
// for await (const item of streamingInstance(pipeline)) { | ||
// emitter.emit('data', item); | ||
// // consume the stream | ||
// } | ||
// }).rejects.toThrow('Error in step 1'); | ||
// }).rejects.toThrow('Step 1 timed out'); | ||
// expect(errorHandler).toHaveBeenCalledTimes(1); | ||
// expect(dataHandler).toHaveBeenCalledTimes(0); | ||
// expect(endHandler).toHaveBeenCalledTimes(1); | ||
// expect(consoleOutput.length).toBe(1); | ||
// expect(consoleOutput[0]).toContain('[ERROR] Error in streaming pipeline:'); | ||
// expect(consoleOutput[0]).toContain(errorMessage); | ||
// expect(consoleOutput[0]).toContain('step 1'); | ||
// expect(consoleOutput[0]).toContain('Step 1 timed out'); | ||
// }); | ||
test('handles timeouts properly', async () => { | ||
const streamingInstance = createStreamer({ | ||
timeout: 100, | ||
logLevel: 'error' | ||
}); | ||
const pipeline = [ | ||
function* () { | ||
yield 1; | ||
yield 2; | ||
}, | ||
async function() { | ||
await new Promise(resolve => setTimeout(resolve, 1000)); // This will timeout | ||
return 42; | ||
} | ||
]; | ||
await expect(async () => { | ||
for await (const item of streamingInstance(pipeline)) { | ||
// consume the stream | ||
} | ||
}).rejects.toThrow('Step 1 timed out'); | ||
expect(consoleOutput.length).toBe(1); | ||
expect(consoleOutput[0]).toContain('[ERROR] Error in streaming pipeline:'); | ||
expect(consoleOutput[0]).toContain('Step 1 timed out'); | ||
}); | ||
}); |
@@ -80,11 +80,9 @@ const streaming = require('../index.js')({ | ||
yield 'hello'; | ||
yield 'xyz'; | ||
yield 'HELLO'; | ||
}, | ||
(chunks) => { | ||
return chunks.join('_'); | ||
} | ||
]) | ||
]), | ||
function*(x) { | ||
yield x + ' you'; | ||
} | ||
]).next()); | ||
expect(res.value).toEqual('hello_xyz_HELLO'); | ||
expect(res.value).toEqual('hello you'); | ||
}); | ||
@@ -91,0 +89,0 @@ |
@@ -9,56 +9,36 @@ const streaming = require('../index.js')({ | ||
test('Returns latest', async () => { | ||
const res = await (streaming([ | ||
9, | ||
10, | ||
11 | ||
]).next()); | ||
expect(res.value).toEqual(11); | ||
}); | ||
// test('Returns latest', async () => { | ||
// const res = await (streaming([ | ||
// 9, | ||
// 10, | ||
// 11 | ||
// ]).next()); | ||
// expect(res.value).toEqual(11); | ||
// }); | ||
test('Happens in order', async () => { | ||
// test('Happens in order', async () => { | ||
let happened = [] | ||
// let happened = [] | ||
const res = await (streaming([ | ||
() => happened.push(1), | ||
() => happened.push(2), | ||
() => happened.push(3), | ||
]).next()); | ||
expect(happened).toEqual([1, 2, 3]); | ||
}); | ||
// const res = await (streaming([ | ||
// () => happened.push(1), | ||
// () => happened.push(2), | ||
// () => happened.push(3), | ||
// ]).next()); | ||
// expect(happened).toEqual([1, 2, 3]); | ||
// }); | ||
test('More Data flowing', async () => { | ||
const res = await (streaming([ | ||
[1, 2, 3], | ||
([a,b,c]) => { | ||
return {a,b,c} | ||
} | ||
]).next()); | ||
expect(res.value).toEqual({ | ||
a: 1, | ||
b: 2, | ||
c: 3 | ||
}); | ||
}); | ||
// test('More Data flowing', async () => { | ||
// const results = []; | ||
// for await (const x of streaming([ | ||
// [1, 2, 3], | ||
// (number) => { | ||
// return number; | ||
// } | ||
// ])) { | ||
// results.push(x); | ||
// } | ||
// expect(results).toEqual([1,2,3]); | ||
// }); | ||
test('More Advanced Data flowing w/ fns', async () => { | ||
const res = await (streaming([ | ||
() => [[1, 2, 3]], // arrays returned items yielded individually | ||
// therefore a return of [[_THING_]] (double nested array) | ||
// will be equiv to a yield of [_THING_] | ||
[ | ||
([nums]) => nums.join(','), | ||
([nums]) => nums.join('%') | ||
], | ||
([x, y]) => { | ||
return {x, y} | ||
} | ||
]).next()); | ||
expect(res.value).toEqual({ | ||
"x": "1,2,3", | ||
"y": "1%2%3" | ||
}); | ||
}); | ||
test('Topic and BadThings pipeline without race conditions', async () => { | ||
@@ -77,7 +57,2 @@ const pipeline = [ | ||
yield { topic, badThings }; | ||
}, | ||
function (result) { | ||
if (!this.results) this.results = []; | ||
this.results.push(result); | ||
return this.results; | ||
} | ||
@@ -91,3 +66,3 @@ ]; | ||
expect(results[results.length - 1]).toEqual([ | ||
expect(results).toEqual([ | ||
{ | ||
@@ -94,0 +69,0 @@ topic: 'Quantum Mechanics', |
@@ -43,3 +43,3 @@ const streaming = require('../index.js')({ | ||
test('pipeline with aggregation', async () => { | ||
test('pipeline with aggregation / meh reducer', async () => { | ||
const pipeline = [ | ||
@@ -57,4 +57,5 @@ function*() { | ||
} | ||
// Check if aggregation works correctly | ||
expect(results).toEqual([6]); | ||
expect(results.reduce((sum, num) => sum + num, 0)).toEqual(6); | ||
}); | ||
@@ -272,14 +273,2 @@ | ||
test('Reducer Functions', async () => { | ||
const pipeline = [ | ||
function*() { yield 1; yield 2; yield 3; }, | ||
(numbers) => numbers.reduce((sum, n) => sum + n, 0) | ||
]; | ||
const results = []; | ||
for await (const item of streaming(pipeline)) { | ||
results.push(item); | ||
} | ||
expect(results).toEqual([6]); | ||
}); | ||
test('Simple text streaming', async () => { | ||
@@ -371,4 +360,2 @@ const pipeline = [ | ||
const generatedTopics = {}; | ||
const pipeline = [ | ||
@@ -388,31 +375,30 @@ function* topicGenerator() { | ||
} | ||
], | ||
function resultAggregator(things) { | ||
for (const item of things.flat()) { | ||
const topic = item.topic; | ||
] | ||
]; | ||
if (!generatedTopics[topic]) { | ||
generatedTopics[topic] = { | ||
topic: topic, | ||
bad_things: [], | ||
good_things: [] | ||
}; | ||
} | ||
const results = []; | ||
const generatedTopics = {}; | ||
for await (const item of streaming(pipeline)) { | ||
// results.push(x); | ||
if (item.bad_thing) { | ||
generatedTopics[topic].bad_things.push(item.bad_thing); | ||
} | ||
const topic = item.topic; | ||
if (item.good_thing) { | ||
generatedTopics[topic].good_things.push(item.good_thing); | ||
} | ||
} | ||
if (!generatedTopics[topic]) { | ||
generatedTopics[topic] = { | ||
topic: topic, | ||
bad_things: [], | ||
good_things: [] | ||
}; | ||
} | ||
return generatedTopics; | ||
if (item.bad_thing) { | ||
generatedTopics[topic].bad_things.push(item.bad_thing); | ||
} | ||
if (item.good_thing) { | ||
generatedTopics[topic].good_things.push(item.good_thing); | ||
} | ||
]; | ||
} | ||
expect((await streaming(pipeline).next()).value).toEqual({ | ||
expect(generatedTopics).toEqual({ | ||
"Evolutionary Biology": { | ||
@@ -419,0 +405,0 @@ "bad_things": [ |
@@ -56,3 +56,3 @@ const createStreamOps = require('../index.js'); | ||
expect( | ||
/Step 0 has not yielded for 50ms/ | ||
/Step 1 has not yielded for 50ms/ | ||
.test( | ||
@@ -99,3 +99,3 @@ mockWarn.mock.calls.join(',') | ||
expect.stringMatching(/\[.*\] \[WARN\]/), | ||
"Step 0 has not yielded for 100ms" | ||
"Step 1 has not yielded for 100ms" | ||
); | ||
@@ -102,0 +102,0 @@ }); |
51005
1721