Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

streamops

Package Overview
Dependencies
Maintainers
0
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

streamops - npm Package Compare versions

Comparing version 0.1.10 to 0.1.11

2

package.json
{
"name": "streamops",
"version": "0.1.10",
"version": "0.1.11",
"main": "./src/main.js",

@@ -5,0 +5,0 @@ "scripts": {

module.exports = function createLogger(options) {
const logLevels = ['error', 'warn', 'info', 'debug'];
const logLevels = ['error', 'warn', 'info', 'debug', 'dev'];
const logLevel = options.logLevel || 'info';
const logLevelIndex = logLevels.indexOf(logLevel);
const isDevelopment = process.env.NODE_ENV === 'development';

@@ -9,4 +10,4 @@ return logLevels.reduce((logger, level) => {

logger[level] = (...args) => {
if (levelIndex <= logLevelIndex) {
console[level](`[${new Date().toISOString()}] [${level.toUpperCase()}]`, ...args);
if (levelIndex <= logLevelIndex && (level !== 'dev' || isDevelopment)) {
(console[level] || console.log)(`[${new Date().toISOString()}] [${level.toUpperCase()}]`, ...args);
}

@@ -13,0 +14,0 @@ };

@@ -7,10 +7,18 @@ const EventEmitter = require('events');

class TimeoutCancelError extends Error {
constructor(stepIndex) {
super(`Step ${stepIndex} timed out, cancelling pipeline`);
this.name = 'TimeoutCancelError';
this.stepIndex = stepIndex;
}
}
function createStreamOps(options = {}) {
const defaultOptions = {
timeout: 100000,
bufferSize: 1000,
logLevel: 'info',
logLevel: 'error',
yieldTimeout: 20000,
downstreamTimeout: 30000
downstreamTimeout: 30000,
yieldTimeoutBehavior: 'warn'
};

@@ -21,2 +29,26 @@

function handleYieldTimeout(stepIndex, lastYieldTime) {
if (Date.now() - lastYieldTime <= config.yieldTimeout) {
return { shouldContinue: true };
}
switch (config.yieldTimeoutBehavior) {
case 'warn':
logger.warn(`Step ${stepIndex} has not yielded for ${config.yieldTimeout}ms`);
return { shouldContinue: true };
case 'yield-null':
logger.warn(`Step ${stepIndex} timed out, yielding null`);
return { shouldContinue: true, valueToYield: null };
case 'cancel':
logger.error(`Step ${stepIndex} timed out, cancelling pipeline`);
return { shouldContinue: false, cancel: true };
case 'block':
logger.warn(`Step ${stepIndex} timed out, blocking future yields`);
return { shouldContinue: false };
default:
logger.warn(`Unknown yieldTimeoutBehavior: ${config.yieldTimeoutBehavior}. Defaulting to 'warn'`);
return { shouldContinue: true };
}
}
async function* streaming(pipeline) {

@@ -52,15 +84,26 @@ if (pipeline instanceof StreamingChain) {

async function* processStep(step, [input]) {
try {
let lastYieldTime = Date.now();
let lastYieldTime = Date.now();
let timeoutOccurred = false;
let shouldCancel = false;
let timeoutValue = undefined;
const checkYieldTimeout = setInterval(() => {
if (Date.now() - lastYieldTime > config.yieldTimeout) {
logger.warn(`Step ${stepIndex} has not yielded for ${config.yieldTimeout}ms`);
}
}, config.yieldTimeout);
const checkYieldTimeout = setInterval(() => {
const { shouldContinue, valueToYield, cancel } = handleYieldTimeout(stepIndex, lastYieldTime);
if (!shouldContinue) {
clearInterval(checkYieldTimeout);
timeoutOccurred = true;
}
if (cancel) {
shouldCancel = true;
}
if (valueToYield !== undefined) {
timeoutValue = valueToYield;
}
}, Math.min(config.yieldTimeout, 1000));
const onYield = () => {
lastYieldTime = Date.now();
};
const onYield = () => {
lastYieldTime = Date.now();
};
async function* wrappedStep() {
if (Array.isArray(step)) {

@@ -81,6 +124,21 @@ yield* processParallel(step, input);

}
}
try {
for await (const item of wrappedStep()) {
if (shouldCancel) {
throw new TimeoutCancelError(stepIndex);
}
if (timeoutOccurred) {
break;
}
if (timeoutValue !== undefined) {
yield timeoutValue;
timeoutValue = undefined;
}
yield item;
onYield();
}
} finally {
clearInterval(checkYieldTimeout);
} catch (error) {
throw new StreamOpsError('Error processing step', stepIndex, error);
}

@@ -159,3 +217,3 @@ }

const step = pipeline[stepIndex];
logger.info(`Processing step ${stepIndex}`);
logger.dev(`Processing step ${stepIndex}`);
validateStep(step);

@@ -180,3 +238,3 @@

clearInterval(checkDownstreamTimeout);
logger.info('Streaming pipeline completed');
logger.dev('Streaming pipeline completed');
emitter.emit('end');

@@ -199,4 +257,4 @@ }

);
};
}
module.exports = createStreamOps;

@@ -53,3 +53,3 @@ const createStreamer = require('../src/createStreamOps');

}
}).rejects.toThrow('Error processing step: Step 1');
}).rejects.toThrow('Test error');

@@ -56,0 +56,0 @@ // expect(errorHandler).toHaveBeenCalledTimes(1);

@@ -5,8 +5,13 @@ const createStreamOps = require('../src/createStreamOps.js');

let originalConsoleWarn;
let originalConsoleError;
let mockWarn;
let mockError;
beforeEach(() => {
originalConsoleWarn = console.warn;
originalConsoleError = console.error;
mockWarn = jest.fn();
mockError = jest.fn();
console.warn = mockWarn;
console.error = mockError;
});

@@ -16,2 +21,3 @@

console.warn = originalConsoleWarn;
console.error = originalConsoleError;
});

@@ -39,9 +45,9 @@

test('Warning logged for slow yielding step', async () => {
const streaming = createStreamOps({ yieldTimeout: 50 });
const streaming = createStreamOps({ yieldTimeout: 5, logLevel: 'warn' });
const pipeline = [
async function* () {
yield 1;
await new Promise(resolve => setTimeout(resolve, 200));
await new Promise(resolve => setTimeout(resolve, 30));
yield 2;
await new Promise(resolve => setTimeout(resolve, 200));
await new Promise(resolve => setTimeout(resolve, 30));
yield 3;

@@ -57,18 +63,19 @@ }

expect(results).toEqual([1, 2, 3]);
console.log(mockWarn.mock.calls.join(','))
expect(
/Step 1 has not yielded for 50ms/
.test(
mockWarn.mock.calls.join(',')
)
).toEqual(true)
// Remember timeout executions are fuzzy depending on runtime
// it's hard to make solid determinisms
expect(mockWarn).toHaveBeenCalledWith(
expect.stringMatching(/\[.*\] \[WARN\]/),
expect.stringMatching(/Step 1 has not yielded for 5ms/)
);
expect(mockWarn.mock.calls.length).toBeGreaterThanOrEqual(4);
});
test('No warning for step that completes quickly', async () => {
const streaming = createStreamOps({ yieldTimeout: 1000 });
test('Yield-null behavior', async () => {
const streaming = createStreamOps({ yieldTimeout: 10, yieldTimeoutBehavior: 'yield-null', logLevel: 'warn' });
const pipeline = [
() => [1, 2, 3]
async function* () {
yield 1;
await new Promise(resolve => setTimeout(resolve, 50));
yield 2;
await new Promise(resolve => setTimeout(resolve, 50));
yield 3;
}
];

@@ -81,15 +88,48 @@

expect(results.flat()).toEqual([1, 2, 3]);
expect(mockWarn).not.toHaveBeenCalled();
expect(results).toContain(null);
expect(results).toEqual([1, null, 2, null, 3]);
expect(mockWarn).toHaveBeenCalledWith(
expect.stringMatching(/\[.*\] \[WARN\]/),
expect.stringMatching(/Step 1 timed out, yielding null/)
);
});
test('Warning logged for step that takes too long without yielding', async () => {
const streaming = createStreamOps({ yieldTimeout: 100 });
test('Cancel behavior', async () => {
const streaming = createStreamOps({ yieldTimeout: 10, yieldTimeoutBehavior: 'cancel', logLevel: 'error' });
const pipeline = [
async () => {
await new Promise(resolve => setTimeout(resolve, 500));
return [1, 2, 3];
async function* () {
yield 1;
await new Promise(resolve => setTimeout(resolve, 30));
yield 2;
}
];
await expect(async () => {
const results = [];
try {
for await (const item of streaming(pipeline)) {
results.push(item);
}
} catch (error) {
throw error; // Re-throw the error to be caught by the expect
}
}).rejects.toThrow('Step 1 timed out');
expect(mockError).toHaveBeenCalledWith(
expect.stringMatching(/\[.*\] \[ERROR\]/),
expect.stringMatching(/Step 1 timed out, cancelling pipeline/)
);
});
test('Block behavior', async () => {
const streaming = createStreamOps({ yieldTimeout: 10, yieldTimeoutBehavior: 'block', logLevel: 'warn' });
const pipeline = [
async function* () {
yield 1;
await new Promise(resolve => setTimeout(resolve, 50));
yield 2;
yield 3;
}
];
const results = [];

@@ -100,18 +140,16 @@ for await (const item of streaming(pipeline)) {

expect(results.flat()).toEqual([1, 2, 3]);
expect(results).toEqual([1]);
expect(mockWarn).toHaveBeenCalledWith(
expect.stringMatching(/\[.*\] \[WARN\]/),
"Step 1 has not yielded for 100ms"
expect.stringMatching(/Step 1 timed out, blocking future yields/)
);
});
test('Multiple warnings for step with multiple long pauses', async () => {
const streaming = createStreamOps({ yieldTimeout: 50 });
test('Unknown behavior defaults to warn', async () => {
const streaming = createStreamOps({ yieldTimeout: 50, yieldTimeoutBehavior: 'unknown', logLevel: 'warn' });
const pipeline = [
async function* () {
yield 1;
await new Promise(resolve => setTimeout(resolve, 150));
await new Promise(resolve => setTimeout(resolve, 200));
yield 2;
await new Promise(resolve => setTimeout(resolve, 150));
yield 3;
}

@@ -125,8 +163,11 @@ ];

expect(results).toEqual([1, 2, 3]);
expect(mockWarn.mock.calls.length).toBeGreaterThanOrEqual(2);
expect(results).toEqual([1, 2]);
expect(mockWarn).toHaveBeenCalledWith(
expect.stringMatching(/\[.*\] \[WARN\]/),
expect.stringMatching(/Unknown yieldTimeoutBehavior: unknown. Defaulting to 'warn'/)
);
});
test('No warning for complex iterable that yields quickly', async () => {
const streaming = createStreamOps({ yieldTimeout: 1000 });
test('Timeout behavior with complex iterable', async () => {
const streaming = createStreamOps({ yieldTimeout: 50, yieldTimeoutBehavior: 'yield-null', logLevel: 'warn' });
const pipeline = [

@@ -136,3 +177,5 @@ {

yield 1;
await new Promise(resolve => setTimeout(resolve, 200));
yield 2;
await new Promise(resolve => setTimeout(resolve, 200));
yield 3;

@@ -148,17 +191,16 @@ }

expect(results).toEqual([1, 2, 3]);
expect(mockWarn).not.toHaveBeenCalled();
expect(results).toContain(null);
expect(results).toEqual([1, null, 2, null, 3]);
expect(mockWarn).toHaveBeenCalledWith(
expect.stringMatching(/\[.*\] \[WARN\]/),
expect.stringMatching(/Step 1 timed out, yielding null/)
);
});
test('Warning for complex iterable with slow yields', async () => {
const streaming = createStreamOps({ yieldTimeout: 50 });
test('Timeout behavior with regular function', async () => {
const streaming = createStreamOps({ yieldTimeout: 50, yieldTimeoutBehavior: 'warn', logLevel: 'warn' });
const pipeline = [
{
[Symbol.asyncIterator]: async function* () {
yield 1;
await new Promise(resolve => setTimeout(resolve, 200));
yield 2;
await new Promise(resolve => setTimeout(resolve, 200));
yield 3;
}
async () => {
await new Promise(resolve => setTimeout(resolve, 200));
return 42;
}

@@ -172,5 +214,9 @@ ];

expect(results).toEqual([1, 2, 3]);
expect(mockWarn.mock.calls.length).toBeGreaterThanOrEqual(4);
expect(results).toEqual([42]);
expect(mockWarn).toHaveBeenCalledWith(
expect.stringMatching(/\[.*\] \[WARN\]/),
expect.stringMatching(/Step 1 has not yielded for 50ms/)
);
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc