Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

streamops

Package Overview
Dependencies
Maintainers
0
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

streamops - npm Package Compare versions

Comparing version 0.1.12 to 0.1.13

tests/streaming.operators.accrue.test.js

2

package.json
{
"name": "streamops",
"version": "0.1.12",
"version": "0.1.13",
"main": "./src/main.js",

@@ -5,0 +5,0 @@ "scripts": {

const EventEmitter = require('events');
const operators = require('./operators');
const {operators, Dam} = require('./operators');
const StreamOpsError = require('./StreamOpsError');

@@ -52,2 +52,20 @@ const StreamingChain = require('./StreamingChain');

function splitArrayAt(arr, predicate) {
const result = arr.reduce((acc, item, index, array) => {
if (predicate(item, index, array)) {
if (index < array.length - 1) {
acc.push([]);
}
} else {
if (acc.length === 0) {
acc.push([]);
}
acc[acc.length - 1].push(item);
}
return acc;
}, []);
return result;
}
async function* streaming(pipeline) {

@@ -61,2 +79,21 @@ if (pipeline instanceof StreamingChain) {

const splits = splitArrayAt(pipeline, step => step instanceof Dam);
let last;
if (splits?.length > 1) {
for (const splitPipelinePart of splits) {
const activePipelinePartStream = streaming([
...(last ? [function*() {yield last;}] : []),
...splitPipelinePart
]);
const results = [];
for await (const item of activePipelinePartStream) {
results.push(item);
}
last = results;
}
yield*last;
return;
}
if (pipeline.length === 0) {

@@ -84,2 +121,3 @@ logger.warn('Empty pipeline provided');

async function* processStep(step, [input]) {
let lastYieldTime = Date.now();

@@ -109,2 +147,3 @@ let timeoutOccurred = false;

async function* wrappedStep() {
if (Array.isArray(step)) {

@@ -147,24 +186,2 @@ yield* processParallel(step, input);

async function* withTimeout(promise, ms, message) {
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error(message)), ms);
});
try {
const result = await Promise.race([promise, timeoutPromise]);
yield* (Array.isArray(result) ? result : [result]);
} catch (error) {
throw error;
}
}
async function* race(generatorPromise, timeoutPromise) {
try {
const generator = await Promise.race([generatorPromise, timeoutPromise]);
yield* generator;
} catch (error) {
throw error;
}
}
function isComplexIterable(obj) {

@@ -221,3 +238,7 @@ return obj != null &&

const acc = [];
for await (const item of input) {
if (acc.length) return acc;
const processingGenerator = processStep(step, [item]);

@@ -224,0 +245,0 @@ stepIndex++;

@@ -1,3 +0,6 @@

const operators =module.exports = {
class Dam {}
module.exports.Dam = Dam;
const operators = module.exports.operators = {
map: function(fn) {

@@ -232,3 +235,6 @@ return function* (input) {

};
}
},
accrue: () => new Dam,
dam: () => new Dam
};
const createStreamOps = require('./createStreamOps');
const StreamOpsError = require('./StreamOpsError');
const operators = require('./operators');
const {operators} = require('./operators');

@@ -5,0 +5,0 @@ module.exports = async function*(pipelineCreator, streamOpsConfig) {

@@ -1,2 +0,2 @@

const operators = require('./operators');
const {operators} = require('./operators');

@@ -3,0 +3,0 @@ class StreamingChain {

@@ -209,3 +209,3 @@ const createStreamOps = require('../src/createStreamOps');

streamOps.map(x => x * 2),
function (x) { console.log('55555', x); return x + 1; },
function (x) { return x + 1; },
streamOps.filter(x => x % 2 === 1),

@@ -212,0 +212,0 @@ streamOps.map(x => x * 10)

@@ -378,3 +378,3 @@ const streaming = require('../src/createStreamOps.js')({

},
streaming.map(function(_) { console.log('this99', this); return _ + this.something; })
streaming.map(function(_) { return _ + this.something; })
];

@@ -564,3 +564,3 @@

const timeDiff = timings[i] - timings[i-1];
console.log('timeDiff', timeDiff);
// console.log('timeDiff', timeDiff);
expect(timeDiff).toBeGreaterThanOrEqual(900); // Allow for small timing inconsistencies

@@ -567,0 +567,0 @@ expect(timeDiff).toBeLessThanOrEqual(1100);

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc