writable-consumable-stream
Advanced tools
Comparing version 1.1.1 to 2.0.0
@@ -8,3 +8,3 @@ class Consumer { | ||
this.timeout = timeout; | ||
this._isIterating = false; | ||
this.isAlive = true; | ||
this.stream.setConsumer(this.id, this); | ||
@@ -24,3 +24,3 @@ } | ||
resetBackpressure() { | ||
_resetBackpressure() { | ||
this._backpressure = 0; | ||
@@ -58,9 +58,5 @@ } | ||
} | ||
if (this._isIterating) { | ||
this._killPacket = {value, done: true}; | ||
this.applyBackpressure(this._killPacket); | ||
} else { | ||
this.stream.removeConsumer(this.id); | ||
this.resetBackpressure(); | ||
} | ||
this._killPacket = {value, done: true}; | ||
this._destroy(); | ||
if (this._resolve) { | ||
@@ -72,2 +68,8 @@ this._resolve(); | ||
_destroy() { | ||
this.isAlive = false; | ||
this._resetBackpressure(); | ||
this.stream.removeConsumer(this.id); | ||
} | ||
async _waitForNextItem(timeout) { | ||
@@ -95,3 +97,2 @@ return new Promise((resolve, reject) => { | ||
async next() { | ||
this._isIterating = true; | ||
this.stream.setConsumer(this.id, this); | ||
@@ -104,4 +105,3 @@ | ||
} catch (error) { | ||
this._isIterating = false; | ||
this.stream.removeConsumer(this.id); | ||
this._destroy(); | ||
throw error; | ||
@@ -111,5 +111,3 @@ } | ||
if (this._killPacket) { | ||
this._isIterating = false; | ||
this.stream.removeConsumer(this.id); | ||
this.resetBackpressure(); | ||
this._destroy(); | ||
let killPacket = this._killPacket; | ||
@@ -129,4 +127,3 @@ delete this._killPacket; | ||
if (this.currentNode.data.done) { | ||
this._isIterating = false; | ||
this.stream.removeConsumer(this.id); | ||
this._destroy(); | ||
} | ||
@@ -140,7 +137,9 @@ | ||
delete this.currentNode; | ||
this._isIterating = false; | ||
this.stream.removeConsumer(this.id); | ||
this.resetBackpressure(); | ||
this._destroy(); | ||
return {}; | ||
} | ||
[Symbol.asyncIterator]() { | ||
return this; | ||
} | ||
} | ||
@@ -147,0 +146,0 @@ |
{ | ||
"name": "writable-consumable-stream", | ||
"version": "1.1.1", | ||
"version": "2.0.0", | ||
"description": "An async stream which can be iterated over using a for-await-of loop.", | ||
@@ -30,4 +30,4 @@ "main": "index.js", | ||
"dependencies": { | ||
"consumable-stream": "^1.0.0" | ||
"consumable-stream": "^2.0.0" | ||
} | ||
} |
@@ -42,3 +42,3 @@ # writable-consumable-stream | ||
// Works in older environments. | ||
let asyncIterator = stream.createConsumer(); | ||
let asyncIterator = asyncIterable.createConsumer(); | ||
while (true) { | ||
@@ -71,3 +71,3 @@ let packet = await asyncIterator.next(); | ||
} | ||
}; | ||
} | ||
@@ -74,0 +74,0 @@ async function consumeAsyncIterable(asyncIterable) { |
@@ -271,3 +271,3 @@ const WritableConsumableStream = require('../index'); | ||
let receivedPackets = []; | ||
let consumable = stream.createConsumable(20); | ||
let consumable = stream.createConsumer(20); | ||
let error; | ||
@@ -306,3 +306,3 @@ try { | ||
let receivedPackets = []; | ||
let consumable = stream.createConsumable(20); | ||
let consumable = stream.createConsumer(20); | ||
let error; | ||
@@ -340,3 +340,3 @@ try { | ||
let receivedPackets = []; | ||
let consumable = stream.createConsumable(20); | ||
let consumable = stream.createConsumer(20); | ||
let error; | ||
@@ -382,3 +382,3 @@ try { | ||
let receivedPackets = []; | ||
let consumable = stream.createConsumable(); | ||
let consumable = stream.createConsumer(); | ||
@@ -458,3 +458,3 @@ while (true) { | ||
assert.equal(backpressureBeforeKill, 10); | ||
assert.equal(backpressureAfterKill, 11); | ||
assert.equal(backpressureAfterKill, 0); | ||
assert.equal(backpressureAfterConsume, 0); | ||
@@ -554,3 +554,3 @@ assert.equal(receivedPackets.length, 0); | ||
it('should be able to start reading from a killed stream immediately', async () => { | ||
it('should set consumer.isAlive to false if stream is killed', async () => { // TODO 22 | ||
(async () => { | ||
@@ -565,3 +565,5 @@ await wait(10); | ||
let consumer = stream.createConsumer(); | ||
assert.equal(consumer.isAlive, true); | ||
stream.kill(); | ||
assert.equal(consumer.isAlive, false); | ||
@@ -574,3 +576,3 @@ let receivedPackets = []; | ||
} | ||
assert.equal(receivedPackets.length, 10); | ||
assert.equal(receivedPackets.length, 0); | ||
@@ -924,5 +926,2 @@ assert.equal(Object.keys(stream._consumers).length, 0); // Check internal cleanup. | ||
// console.log(111, stream.getConsumerStatsList()); | ||
// console.log(111, consumer.getBackpressure()); | ||
assert.equal(stream.getConsumerStatsList().length, 0); | ||
@@ -1314,3 +1313,3 @@ assert.equal(consumer.getBackpressure(), 0); | ||
assert.equal(backpressureBeforeKill, 10); | ||
assert.equal(backpressureAfterKill, 11); | ||
assert.equal(backpressureAfterKill, 10); // consumerB was still running. | ||
assert.equal(backpressureAfterConsume, 0); | ||
@@ -1317,0 +1316,0 @@ assert.equal(receivedPacketsA.length, 1); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
55374
1467
+ Addedconsumable-stream@2.0.0(transitive)
- Removedconsumable-stream@1.0.0(transitive)
Updatedconsumable-stream@^2.0.0