Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

streamops

Package Overview
Dependencies
Maintainers
0
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

streamops - npm Package Compare versions

Comparing version 0.1.5 to 0.1.6

185

index.js

@@ -5,3 +5,7 @@ const EventEmitter = require('events');

constructor(message, step, originalError = null) {
super(message);
super(
message
+ (step ? ': Step ' + step : '')
+ (originalError ? ': Original Error: ' + originalError : '')
);
this.name = 'StreamOpsError';

@@ -67,6 +71,5 @@ this.step = step;

async function processStep(step, input) {
async function* processStep(step, input) {
try {
let lastYieldTime = Date.now();
let hasYielded = false;

@@ -80,25 +83,30 @@ const checkYieldTimeout = setInterval(() => {

const onYield = () => {
hasYielded = true;
lastYieldTime = Date.now();
};
let result;
if (Array.isArray(step)) {
result = await processParallel(step, input);
} else if (isGenerator(step)) {
result = await processGenerator(step, input, onYield);
} else if (typeof step === 'function') {
result = await processFunction(step, input);
} else if (isComplexIterable(step)) {
result = await processGenerator(async function*() {
yield* await (step[Symbol.iterator] || step[Symbol.asyncIterator])
? step
: [step]
}, input, onYield);
} else {
result = step;
}
const processingPromise = (async function*() {
if (Array.isArray(step)) {
yield* processParallel(step, input);
} else if (isGenerator(step)) {
yield* processGenerator(step, input, onYield);
} else if (typeof step === 'function') {
yield* await withTimeout(processFunction(step, input), config.timeout, `Step ${stepIndex} timed out`);
} else if (isComplexIterable(step)) {
yield* processGenerator(async function*() {
yield* await (step[Symbol.iterator] || step[Symbol.asyncIterator])
? step
: [step]
}, input, onYield);
} else {
yield step;
}
})();
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error(`Step ${stepIndex} timed out`)), config.timeout);
});
yield* race(processingPromise, timeoutPromise);
clearInterval(checkYieldTimeout);
return result;
} catch (error) {

@@ -109,2 +117,24 @@ throw new StreamOpsError('Error processing step', stepIndex, error);

async function* withTimeout(promise, ms, message) {
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error(message)), ms);
});
try {
const result = await Promise.race([promise, timeoutPromise]);
yield* (Array.isArray(result) ? result : [result]);
} catch (error) {
throw error;
}
}
async function* race(generatorPromise, timeoutPromise) {
try {
const generator = await Promise.race([generatorPromise, timeoutPromise]);
yield* generator;
} catch (error) {
throw error;
}
}
function isComplexIterable(obj) {

@@ -122,35 +152,52 @@ return obj != null &&

async function processParallel(steps, input) {
return await Promise.all(input.map(async item => {
const results = await Promise.all(steps.map(async step => {
async function* processParallel(steps, input) {
const inputArray = [];
for await (const item of input) {
inputArray.push(item);
}
const processors = inputArray.map(async (item) => {
const results = [];
for (const step of steps) {
if (Array.isArray(step)) {
return await processParallel(step, [item]);
for await (const result of processParallel(step, [item])) {
results.push(result);
}
} else {
return await processStep(step, [item]);
for await (const result of processStep(step, [item])) {
results.push(result);
}
}
}));
return results.flat();
}));
}
return results;
});
const iterator = processors[Symbol.iterator]();
let result = iterator.next();
while (!result.done) {
const processor = await result.value;
for (const item of processor) {
yield item;
}
result = iterator.next();
}
}
async function processGenerator(gen, input, onYield) {
let results = [];
async function* processGenerator(gen, input, onYield) {
for await (const item of input) {
const generator = gen.call(context, item);
for await (const result of generator) {
results.push(result);
yield result;
if (onYield) onYield();
}
}
return results;
}
async function processFunction(fn, input) {
if (input.length === 1) {
const result = await fn.call(context, input[0]);
return [result];
} else {
const result = await fn.call(context, input);
return Array.isArray(result) ? result : [result];
const inputArray = [];
for await (const item of input) {
inputArray.push(item);
}
const result = await fn.call(context, inputArray);
return Array.isArray(result) ? result : [result];
}

@@ -168,10 +215,21 @@

const processingPromise = processStep(step, state);
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new StreamOpsError(`Step ${stepIndex} timed out`, stepIndex)), config.timeout)
);
const processingGenerator = processStep(step, state);
try {
state = await Promise.race([processingPromise, timeoutPromise]);
state = []; // Reset state for collecting items from this step
for await (const item of processingGenerator) {
if (stepIndex === pipeline.length - 1) {
// If it's the last step, yield the item to the consumer
yield item;
} else {
// Otherwise, collect the item for the next step
state.push(item);
}
lastDownstreamYield = Date.now();
downstreamTimeoutWarningIssued = false;
if (emitter.listenerCount('data') > 0) {
emitter.emit('data', item);
}
}
} catch (error) {

@@ -185,40 +243,3 @@ if (error instanceof StreamOpsError) {

stepIndex++;
if (emitter.listenerCount('data') > 0) {
for (const item of state) {
emitter.emit('data', item);
}
}
if (state.length > config.bufferSize) {
logger.debug(`Buffer size exceeded. Current size: ${state.length}`);
for (const item of state.splice(0, state.length - config.bufferSize)) {
yield item;
lastDownstreamYield = Date.now();
downstreamTimeoutWarningIssued = false;
}
}
}
if (typeof state == 'string') {
yield state;
lastDownstreamYield = Date.now();
downstreamTimeoutWarningIssued = false;
} else if (typeof state[Symbol.asyncIterator] === 'function') {
for await (const item of state) {
yield item;
lastDownstreamYield = Date.now();
downstreamTimeoutWarningIssued = false;
}
} else if (typeof state[Symbol.iterator] === 'function') {
for (const item of state) {
yield item;
lastDownstreamYield = Date.now();
downstreamTimeoutWarningIssued = false;
}
} else {
yield state;
lastDownstreamYield = Date.now();
downstreamTimeoutWarningIssued = false;
}

@@ -225,0 +246,0 @@ } catch (error) {

{
"name": "streamops",
"version": "0.1.5",
"version": "0.1.6",
"main": "./index.js",

@@ -5,0 +5,0 @@ "scripts": {

@@ -76,4 +76,4 @@ const createStreamer = require('../index');

},
async function () {
await new Promise(resolve => setTimeout(resolve, 200)); // This will timeout
async function() {
await new Promise(resolve => setTimeout(resolve, 1000)); // This will timeout
return 42;

@@ -80,0 +80,0 @@ }

@@ -9,3 +9,3 @@ const streaming = require('../index.js')({

test('nested parallel processing', async () => {
test('nested parallel processing with damming fns', async () => {
const pipeline = [

@@ -15,6 +15,6 @@ function*() { yield 1; yield 2; },

[
(x) => x * 2,
(x) => x * 3
([x]) => x * 2,
([x]) => x * 3
],
(x) => x + 1
([x]) => x + 100
]

@@ -26,11 +26,29 @@ ];

}
expect(results).toEqual([[[2, 3], 2], [[4, 6], 3]]);
expect(results).toEqual([2, 3, 101, 4, 6, 102]);
});
test('nested parallel processing with generators', async () => {
const pipeline = [
function*() { yield 1; yield 2; },
[
[
function*(x) { yield x * 2 },
function*(x) { yield x * 3 }
],
function*(x) { yield x + 100 }
]
];
const results = [];
for await (const item of streaming(pipeline)) {
results.push(item);
}
expect(results).toEqual([2, 3, 101, 4, 6, 102]);
});
test('Parallel Processing', async () => {
const pipeline = [
() => 3,
() => [3],
[
(x) => x * 2,
(x) => x + 1
([x]) => x * 2,
([x]) => x + 1
]

@@ -42,3 +60,3 @@ ];

}
expect(results).toEqual([[6, 4]]);
expect(results).toEqual([6, 4]);
});

@@ -50,4 +68,4 @@

[
[(x) => x * 2, (x) => x * 3],
(x) => x + 1
[([x]) => x * 2, ([x]) => x * 3],
([x]) => x + 1
]

@@ -59,3 +77,3 @@ ];

}
expect(results).toEqual([[[4, 6], 3]]);
expect(results).toEqual([4, 6, 3]);
});

@@ -82,3 +100,3 @@

['apple'],
function*([thing]) {
function*(thing) {
yield thing === 'apple';

@@ -85,0 +103,0 @@ }

@@ -35,6 +35,6 @@ const streaming = require('../index.js')({

const pipeline = [
() => 3,
() => [3],
[
(x) => x * 2,
(x) => x + 1
([x]) => x * 2,
([x]) => x + 1
]

@@ -46,3 +46,3 @@ ];

}
expect(results).toEqual([[6, 4]]);
expect(results).toEqual([6, 4]);
});

@@ -57,8 +57,8 @@

[
function*(num) { yield num * 2; },
function*(num) { yield num * 3; }
function*(num) { yield {double: num * 2}; },
function*(num) { yield {triple: num * 3}; }
],
function*(result) { // Note: singular 'result', not 'results'
const [double, triple] = result;
yield { double, triple };
(res) => {
console.log('res', res);
return res;
}

@@ -71,9 +71,11 @@ ];

expect(results).toEqual([
{ double: 2, triple: 3 },
{ double: 4, triple: 6 }
{ double: 2 },
{ triple: 3 },
{ double: 4 },
{ triple: 6 }
// { double: 2, triple: 3 },
// { double: 4, triple: 6 }
]);
});
// return;
test('generator after parallel step receives items individually', async () => {

@@ -102,11 +104,11 @@ const receivedItems = [];

expect(receivedItems).toEqual([
[2, 3],
[4, 6]
2,3,4,6
]);
expect(results).toEqual([
[2, 3],
[4, 6]
2,3,4,6
]);
});
return;
test('Parallel API calls using yielded Promise.all', async () => {

@@ -113,0 +115,0 @@ const pipeline = [

@@ -44,8 +44,10 @@ const streaming = require('../index.js')({

test('More Advanced Data flowing', async () => {
test('More Advanced Data flowing w/ fns', async () => {
const res = await (streaming([
() => [1, 2, 3],
() => [[1, 2, 3]], // arrays returned items yielded individually
// therefore a return of [[_THING_]] (double nested array)
// will be equiv to a yield of [_THING_]
[
(nums) => nums.join(','),
(nums) => nums.join('%')
([nums]) => nums.join(','),
([nums]) => nums.join('%')
],

@@ -52,0 +54,0 @@ ([x, y]) => {

@@ -60,2 +60,51 @@ const streaming = require('../index.js')({

test('Real-time streaming with short delays', async () => {
jest.useRealTimers(); // Use real timers for this test
const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms));
const pipeline = [
async function*() {
for (let i = 1; i <= 10; i++) {
yield `Item ${i}\n`;
await delay(50); // 100ms delay
}
}
];
const results = [];
const timings = [];
const logs = [];
const customLog = (message) => {
const timestamp = new Date().toISOString();
logs.push({ timestamp, message });
};
const stream = streaming(pipeline);
for await (const item of stream) {
customLog(`xmllm yielding item`);
results.push(item);
timings.push(Date.now());
}
expect(results).toEqual(['Item 1\n', 'Item 2\n', 'Item 3\n', 'Item 4\n', 'Item 5\n', 'Item 6\n', 'Item 7\n', 'Item 8\n', 'Item 9\n', 'Item 10\n']);
// Check if items were received approximately 100ms apart
for (let i = 1; i < timings.length; i++) {
const timeDiff = timings[i] - timings[i-1];
expect(timeDiff).toBeGreaterThanOrEqual(40); // Allow for small timing inconsistencies
expect(timeDiff).toBeLessThanOrEqual(60);
}
// Check if customLog was called with approximately 100ms intervals
expect(logs.length).toBe(10);
for (let i = 1; i < logs.length; i++) {
const timeDiff = new Date(logs[i].timestamp) - new Date(logs[i-1].timestamp);
expect(timeDiff).toBeGreaterThanOrEqual(40);
expect(timeDiff).toBeLessThanOrEqual(60);
}
});
test('pipeline with async operations', async () => {

@@ -196,4 +245,6 @@ const pipeline = [

() => ({count: 1}),
({count}) => ({count: count + 1}),
({count}) => `Final count: ${count}`
([{count}]) => {
return {count: count + 1};
},
([{count}]) => `Final count: ${count}`
];

@@ -207,3 +258,3 @@ const results = [];

test('Async Operations', async () => {
test('Async Operations [functions]', async () => {
const fetchData = async () => 'data';

@@ -213,3 +264,3 @@ const processData = async (data) => data.toUpperCase();

async () => await fetchData(),
async (data) => await processData(data)
async ([data]) => await processData(data)
];

@@ -272,10 +323,8 @@ const results = [];

],
function*(messages) {
for (const {message} of messages) {
yield {
message,
name: 'Michael'
}
}
function*({message}) {
yield {
message,
name: 'Michael'
}
}
];

@@ -464,2 +513,43 @@

test('Delayed streaming with timing verification', async () => {
jest.useFakeTimers();
const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms));
const pipeline = [
async function*() {
for (let i = 1; i <= 5; i++) {
yield `Item ${i}`;
await delay(1000); // 1 second delay
}
}
];
const results = [];
const timings = [];
const streamPromise = (async () => {
for await (const item of streaming(pipeline)) {
results.push(item);
timings.push(Date.now());
// jest.advanceTimersByTime(1000); // Advance time by 1 second
}
})();
await jest.runAllTimersAsync();
await streamPromise;
expect(results).toEqual(['Item 1', 'Item 2', 'Item 3', 'Item 4', 'Item 5']);
// Check if items were received approximately 1 second apart
for (let i = 1; i < timings.length; i++) {
const timeDiff = timings[i] - timings[i-1];
console.log('timeDiff', timeDiff);
expect(timeDiff).toBeGreaterThanOrEqual(900); // Allow for small timing inconsistencies
expect(timeDiff).toBeLessThanOrEqual(1100);
}
jest.useRealTimers();
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc