Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

streaming-iterables

Package Overview
Dependencies
Maintainers
2
Versions
41
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

streaming-iterables - npm Package Compare versions

Comparing version 4.1.1 to 4.1.2

.github/workflows/test.yml

83

dist/index-esm.js

@@ -649,2 +649,26 @@ /*! *****************************************************************************

const values = new Map();
let lastError = null;
let errCb = null;
let valueCb = null;
const notifyError = err => {
lastError = err;
if (errCb) {
errCb(err);
}
};
const notifyDone = value => {
if (valueCb) {
valueCb(value);
}
};
const waitForQueue = () => new Promise((resolve, reject) => {
if (lastError) {
reject(lastError);
}
if (values.size > 0) {
return resolve();
}
valueCb = resolve;
errCb = reject;
});
const queueNext = input => {

@@ -658,2 +682,3 @@ const nextVal = Promise.resolve(input.next()).then(async ({ done, value }) => {

concurrentWork.add(nextVal);
nextVal.then(notifyDone, notifyError);
};

@@ -664,6 +689,9 @@ for (const input of inputs) {

while (true) {
if (concurrentWork.size === 0) {
// We technically don't have to check `values.size` as the for loop should have emptied it
// However I haven't yet found specs verifying that behavior, only tests
// the guard in waitForQueue() checking for values is in place for the same reason
if (concurrentWork.size === 0 && values.size === 0) {
return yield __await(void 0);
}
yield __await(Promise.race(concurrentWork));
yield __await(waitForQueue());
for (const [input, value] of values) {

@@ -939,17 +967,30 @@ values.delete(input);

function once(event, stream) {
return new Promise(resolve => {
stream.once(event, resolve);
});
}
async function _writeToStream(stream, iterable) {
var e_1, _a;
let errorListener;
let error;
const errorPromise = new Promise((resolve, reject) => {
errorListener = err => {
error = err;
reject(err);
};
stream.once('error', errorListener);
let lastError = null;
let errCb = null;
let drainCb = null;
const notifyError = err => {
lastError = err;
if (errCb) {
errCb(err);
}
};
const notifyDrain = () => {
if (drainCb) {
drainCb();
}
};
const cleanup = () => {
stream.removeListener('error', notifyError);
stream.removeListener('drain', notifyDrain);
};
stream.once('error', notifyError);
const waitForDrain = () => new Promise((resolve, reject) => {
if (lastError) {
return reject(lastError);
}
stream.once('drain', notifyDrain);
drainCb = resolve;
errCb = reject;
});

@@ -960,6 +1001,6 @@ try {

if (stream.write(value) === false) {
await Promise.race([errorPromise, once('drain', stream)]);
await waitForDrain();
}
if (error) {
return errorPromise;
if (lastError) {
break;
}

@@ -975,5 +1016,5 @@ }

}
stream.removeListener('error', errorListener);
if (error) {
return errorPromise;
cleanup();
if (lastError) {
throw lastError;
}

@@ -980,0 +1021,0 @@ }

@@ -655,2 +655,26 @@ (function (global, factory) {

const values = new Map();
let lastError = null;
let errCb = null;
let valueCb = null;
const notifyError = err => {
lastError = err;
if (errCb) {
errCb(err);
}
};
const notifyDone = value => {
if (valueCb) {
valueCb(value);
}
};
const waitForQueue = () => new Promise((resolve, reject) => {
if (lastError) {
reject(lastError);
}
if (values.size > 0) {
return resolve();
}
valueCb = resolve;
errCb = reject;
});
const queueNext = input => {

@@ -664,2 +688,3 @@ const nextVal = Promise.resolve(input.next()).then(async ({ done, value }) => {

concurrentWork.add(nextVal);
nextVal.then(notifyDone, notifyError);
};

@@ -670,6 +695,9 @@ for (const input of inputs) {

while (true) {
if (concurrentWork.size === 0) {
// We technically don't have to check `values.size` as the for loop should have emptied it
// However I haven't yet found specs verifying that behavior, only tests
// the guard in waitForQueue() checking for values is in place for the same reason
if (concurrentWork.size === 0 && values.size === 0) {
return yield __await(void 0);
}
yield __await(Promise.race(concurrentWork));
yield __await(waitForQueue());
for (const [input, value] of values) {

@@ -945,17 +973,30 @@ values.delete(input);

function once(event, stream) {
return new Promise(resolve => {
stream.once(event, resolve);
});
}
async function _writeToStream(stream, iterable) {
var e_1, _a;
let errorListener;
let error;
const errorPromise = new Promise((resolve, reject) => {
errorListener = err => {
error = err;
reject(err);
};
stream.once('error', errorListener);
let lastError = null;
let errCb = null;
let drainCb = null;
const notifyError = err => {
lastError = err;
if (errCb) {
errCb(err);
}
};
const notifyDrain = () => {
if (drainCb) {
drainCb();
}
};
const cleanup = () => {
stream.removeListener('error', notifyError);
stream.removeListener('drain', notifyDrain);
};
stream.once('error', notifyError);
const waitForDrain = () => new Promise((resolve, reject) => {
if (lastError) {
return reject(lastError);
}
stream.once('drain', notifyDrain);
drainCb = resolve;
errCb = reject;
});

@@ -966,6 +1007,6 @@ try {

if (stream.write(value) === false) {
await Promise.race([errorPromise, once('drain', stream)]);
await waitForDrain();
}
if (error) {
return errorPromise;
if (lastError) {
break;
}

@@ -981,5 +1022,5 @@ }

}
stream.removeListener('error', errorListener);
if (error) {
return errorPromise;
cleanup();
if (lastError) {
throw lastError;
}

@@ -986,0 +1027,0 @@ }

{
"name": "streaming-iterables",
"version": "4.1.1",
"version": "4.1.2",
"description": "A collection of utilities for async iterables. Designed to replace your streams.",

@@ -5,0 +5,0 @@ "main": "dist/index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc