Comparing version 0.1.12 to 0.1.13
{ | ||
"name": "streamops", | ||
"version": "0.1.12", | ||
"version": "0.1.13", | ||
"main": "./src/main.js", | ||
@@ -5,0 +5,0 @@ "scripts": { |
const EventEmitter = require('events'); | ||
const operators = require('./operators'); | ||
const {operators, Dam} = require('./operators'); | ||
const StreamOpsError = require('./StreamOpsError'); | ||
@@ -52,2 +52,20 @@ const StreamingChain = require('./StreamingChain'); | ||
function splitArrayAt(arr, predicate) { | ||
const result = arr.reduce((acc, item, index, array) => { | ||
if (predicate(item, index, array)) { | ||
if (index < array.length - 1) { | ||
acc.push([]); | ||
} | ||
} else { | ||
if (acc.length === 0) { | ||
acc.push([]); | ||
} | ||
acc[acc.length - 1].push(item); | ||
} | ||
return acc; | ||
}, []); | ||
return result; | ||
} | ||
async function* streaming(pipeline) { | ||
@@ -61,2 +79,21 @@ if (pipeline instanceof StreamingChain) { | ||
const splits = splitArrayAt(pipeline, step => step instanceof Dam); | ||
let last; | ||
if (splits?.length > 1) { | ||
for (const splitPipelinePart of splits) { | ||
const activePipelinePartStream = streaming([ | ||
...(last ? [function*() {yield last;}] : []), | ||
...splitPipelinePart | ||
]); | ||
const results = []; | ||
for await (const item of activePipelinePartStream) { | ||
results.push(item); | ||
} | ||
last = results; | ||
} | ||
yield*last; | ||
return; | ||
} | ||
if (pipeline.length === 0) { | ||
@@ -84,2 +121,3 @@ logger.warn('Empty pipeline provided'); | ||
async function* processStep(step, [input]) { | ||
let lastYieldTime = Date.now(); | ||
@@ -109,2 +147,3 @@ let timeoutOccurred = false; | ||
async function* wrappedStep() { | ||
if (Array.isArray(step)) { | ||
@@ -147,24 +186,2 @@ yield* processParallel(step, input); | ||
async function* withTimeout(promise, ms, message) { | ||
const timeoutPromise = new Promise((_, reject) => { | ||
setTimeout(() => reject(new Error(message)), ms); | ||
}); | ||
try { | ||
const result = await Promise.race([promise, timeoutPromise]); | ||
yield* (Array.isArray(result) ? result : [result]); | ||
} catch (error) { | ||
throw error; | ||
} | ||
} | ||
async function* race(generatorPromise, timeoutPromise) { | ||
try { | ||
const generator = await Promise.race([generatorPromise, timeoutPromise]); | ||
yield* generator; | ||
} catch (error) { | ||
throw error; | ||
} | ||
} | ||
function isComplexIterable(obj) { | ||
@@ -221,3 +238,7 @@ return obj != null && | ||
const acc = []; | ||
for await (const item of input) { | ||
if (acc.length) return acc; | ||
const processingGenerator = processStep(step, [item]); | ||
@@ -224,0 +245,0 @@ stepIndex++; |
@@ -1,3 +0,6 @@ | ||
const operators =module.exports = { | ||
class Dam {} | ||
module.exports.Dam = Dam; | ||
const operators = module.exports.operators = { | ||
map: function(fn) { | ||
@@ -232,3 +235,6 @@ return function* (input) { | ||
}; | ||
} | ||
}, | ||
accrue: () => new Dam, | ||
dam: () => new Dam | ||
}; |
const createStreamOps = require('./createStreamOps'); | ||
const StreamOpsError = require('./StreamOpsError'); | ||
const operators = require('./operators'); | ||
const {operators} = require('./operators'); | ||
@@ -5,0 +5,0 @@ module.exports = async function*(pipelineCreator, streamOpsConfig) { |
@@ -1,2 +0,2 @@ | ||
const operators = require('./operators'); | ||
const {operators} = require('./operators'); | ||
@@ -3,0 +3,0 @@ class StreamingChain { |
@@ -209,3 +209,3 @@ const createStreamOps = require('../src/createStreamOps'); | ||
streamOps.map(x => x * 2), | ||
function (x) { console.log('55555', x); return x + 1; }, | ||
function (x) { return x + 1; }, | ||
streamOps.filter(x => x % 2 === 1), | ||
@@ -212,0 +212,0 @@ streamOps.map(x => x * 10) |
@@ -378,3 +378,3 @@ const streaming = require('../src/createStreamOps.js')({ | ||
}, | ||
streaming.map(function(_) { console.log('this99', this); return _ + this.something; }) | ||
streaming.map(function(_) { return _ + this.something; }) | ||
]; | ||
@@ -564,3 +564,3 @@ | ||
const timeDiff = timings[i] - timings[i-1]; | ||
console.log('timeDiff', timeDiff); | ||
// console.log('timeDiff', timeDiff); | ||
expect(timeDiff).toBeGreaterThanOrEqual(900); // Allow for small timing inconsistencies | ||
@@ -567,0 +567,0 @@ expect(timeDiff).toBeLessThanOrEqual(1100); |
98031
25
3241