streaming-iterables
Advanced tools
Comparing version 4.1.1 to 4.1.2
@@ -649,2 +649,26 @@ /*! ***************************************************************************** | ||
const values = new Map(); | ||
let lastError = null; | ||
let errCb = null; | ||
let valueCb = null; | ||
const notifyError = err => { | ||
lastError = err; | ||
if (errCb) { | ||
errCb(err); | ||
} | ||
}; | ||
const notifyDone = value => { | ||
if (valueCb) { | ||
valueCb(value); | ||
} | ||
}; | ||
const waitForQueue = () => new Promise((resolve, reject) => { | ||
if (lastError) { | ||
reject(lastError); | ||
} | ||
if (values.size > 0) { | ||
return resolve(); | ||
} | ||
valueCb = resolve; | ||
errCb = reject; | ||
}); | ||
const queueNext = input => { | ||
@@ -658,2 +682,3 @@ const nextVal = Promise.resolve(input.next()).then(async ({ done, value }) => { | ||
concurrentWork.add(nextVal); | ||
nextVal.then(notifyDone, notifyError); | ||
}; | ||
@@ -664,6 +689,9 @@ for (const input of inputs) { | ||
while (true) { | ||
if (concurrentWork.size === 0) { | ||
// We technically don't have to check `values.size` as the for loop should have emptied it | ||
// However I haven't yet found specs verifying that behavior, only tests | ||
// the guard in waitForQueue() checking for values is in place for the same reason | ||
if (concurrentWork.size === 0 && values.size === 0) { | ||
return yield __await(void 0); | ||
} | ||
yield __await(Promise.race(concurrentWork)); | ||
yield __await(waitForQueue()); | ||
for (const [input, value] of values) { | ||
@@ -939,17 +967,30 @@ values.delete(input); | ||
function once(event, stream) { | ||
return new Promise(resolve => { | ||
stream.once(event, resolve); | ||
}); | ||
} | ||
async function _writeToStream(stream, iterable) { | ||
var e_1, _a; | ||
let errorListener; | ||
let error; | ||
const errorPromise = new Promise((resolve, reject) => { | ||
errorListener = err => { | ||
error = err; | ||
reject(err); | ||
}; | ||
stream.once('error', errorListener); | ||
let lastError = null; | ||
let errCb = null; | ||
let drainCb = null; | ||
const notifyError = err => { | ||
lastError = err; | ||
if (errCb) { | ||
errCb(err); | ||
} | ||
}; | ||
const notifyDrain = () => { | ||
if (drainCb) { | ||
drainCb(); | ||
} | ||
}; | ||
const cleanup = () => { | ||
stream.removeListener('error', notifyError); | ||
stream.removeListener('drain', notifyDrain); | ||
}; | ||
stream.once('error', notifyError); | ||
const waitForDrain = () => new Promise((resolve, reject) => { | ||
if (lastError) { | ||
return reject(lastError); | ||
} | ||
stream.once('drain', notifyDrain); | ||
drainCb = resolve; | ||
errCb = reject; | ||
}); | ||
@@ -960,6 +1001,6 @@ try { | ||
if (stream.write(value) === false) { | ||
await Promise.race([errorPromise, once('drain', stream)]); | ||
await waitForDrain(); | ||
} | ||
if (error) { | ||
return errorPromise; | ||
if (lastError) { | ||
break; | ||
} | ||
@@ -975,5 +1016,5 @@ } | ||
} | ||
stream.removeListener('error', errorListener); | ||
if (error) { | ||
return errorPromise; | ||
cleanup(); | ||
if (lastError) { | ||
throw lastError; | ||
} | ||
@@ -980,0 +1021,0 @@ } |
@@ -655,2 +655,26 @@ (function (global, factory) { | ||
const values = new Map(); | ||
let lastError = null; | ||
let errCb = null; | ||
let valueCb = null; | ||
const notifyError = err => { | ||
lastError = err; | ||
if (errCb) { | ||
errCb(err); | ||
} | ||
}; | ||
const notifyDone = value => { | ||
if (valueCb) { | ||
valueCb(value); | ||
} | ||
}; | ||
const waitForQueue = () => new Promise((resolve, reject) => { | ||
if (lastError) { | ||
reject(lastError); | ||
} | ||
if (values.size > 0) { | ||
return resolve(); | ||
} | ||
valueCb = resolve; | ||
errCb = reject; | ||
}); | ||
const queueNext = input => { | ||
@@ -664,2 +688,3 @@ const nextVal = Promise.resolve(input.next()).then(async ({ done, value }) => { | ||
concurrentWork.add(nextVal); | ||
nextVal.then(notifyDone, notifyError); | ||
}; | ||
@@ -670,6 +695,9 @@ for (const input of inputs) { | ||
while (true) { | ||
if (concurrentWork.size === 0) { | ||
// We technically don't have to check `values.size` as the for loop should have emptied it | ||
// However I haven't yet found specs verifying that behavior, only tests | ||
// the guard in waitForQueue() checking for values is in place for the same reason | ||
if (concurrentWork.size === 0 && values.size === 0) { | ||
return yield __await(void 0); | ||
} | ||
yield __await(Promise.race(concurrentWork)); | ||
yield __await(waitForQueue()); | ||
for (const [input, value] of values) { | ||
@@ -945,17 +973,30 @@ values.delete(input); | ||
function once(event, stream) { | ||
return new Promise(resolve => { | ||
stream.once(event, resolve); | ||
}); | ||
} | ||
async function _writeToStream(stream, iterable) { | ||
var e_1, _a; | ||
let errorListener; | ||
let error; | ||
const errorPromise = new Promise((resolve, reject) => { | ||
errorListener = err => { | ||
error = err; | ||
reject(err); | ||
}; | ||
stream.once('error', errorListener); | ||
let lastError = null; | ||
let errCb = null; | ||
let drainCb = null; | ||
const notifyError = err => { | ||
lastError = err; | ||
if (errCb) { | ||
errCb(err); | ||
} | ||
}; | ||
const notifyDrain = () => { | ||
if (drainCb) { | ||
drainCb(); | ||
} | ||
}; | ||
const cleanup = () => { | ||
stream.removeListener('error', notifyError); | ||
stream.removeListener('drain', notifyDrain); | ||
}; | ||
stream.once('error', notifyError); | ||
const waitForDrain = () => new Promise((resolve, reject) => { | ||
if (lastError) { | ||
return reject(lastError); | ||
} | ||
stream.once('drain', notifyDrain); | ||
drainCb = resolve; | ||
errCb = reject; | ||
}); | ||
@@ -966,6 +1007,6 @@ try { | ||
if (stream.write(value) === false) { | ||
await Promise.race([errorPromise, once('drain', stream)]); | ||
await waitForDrain(); | ||
} | ||
if (error) { | ||
return errorPromise; | ||
if (lastError) { | ||
break; | ||
} | ||
@@ -981,5 +1022,5 @@ } | ||
} | ||
stream.removeListener('error', errorListener); | ||
if (error) { | ||
return errorPromise; | ||
cleanup(); | ||
if (lastError) { | ||
throw lastError; | ||
} | ||
@@ -986,0 +1027,0 @@ } |
{ | ||
"name": "streaming-iterables", | ||
"version": "4.1.1", | ||
"version": "4.1.2", | ||
"description": "A collection of utilities for async iterables. Designed to replace your streams.", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
107603
9
2181