Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

writable-consumable-stream

Package Overview
Dependencies
Maintainers
1
Versions
12
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

writable-consumable-stream - npm Package Compare versions

Comparing version 1.1.1 to 2.0.0

39

consumer.js

@@ -8,3 +8,3 @@ class Consumer {

this.timeout = timeout;
this._isIterating = false;
this.isAlive = true;
this.stream.setConsumer(this.id, this);

@@ -24,3 +24,3 @@ }

resetBackpressure() {
_resetBackpressure() {
this._backpressure = 0;

@@ -58,9 +58,5 @@ }

}
if (this._isIterating) {
this._killPacket = {value, done: true};
this.applyBackpressure(this._killPacket);
} else {
this.stream.removeConsumer(this.id);
this.resetBackpressure();
}
this._killPacket = {value, done: true};
this._destroy();
if (this._resolve) {

@@ -72,2 +68,8 @@ this._resolve();

_destroy() {
this.isAlive = false;
this._resetBackpressure();
this.stream.removeConsumer(this.id);
}
async _waitForNextItem(timeout) {

@@ -95,3 +97,2 @@ return new Promise((resolve, reject) => {

async next() {
this._isIterating = true;
this.stream.setConsumer(this.id, this);

@@ -104,4 +105,3 @@

} catch (error) {
this._isIterating = false;
this.stream.removeConsumer(this.id);
this._destroy();
throw error;

@@ -111,5 +111,3 @@ }

if (this._killPacket) {
this._isIterating = false;
this.stream.removeConsumer(this.id);
this.resetBackpressure();
this._destroy();
let killPacket = this._killPacket;

@@ -129,4 +127,3 @@ delete this._killPacket;

if (this.currentNode.data.done) {
this._isIterating = false;
this.stream.removeConsumer(this.id);
this._destroy();
}

@@ -140,7 +137,9 @@

delete this.currentNode;
this._isIterating = false;
this.stream.removeConsumer(this.id);
this.resetBackpressure();
this._destroy();
return {};
}
[Symbol.asyncIterator]() {
return this;
}
}

@@ -147,0 +146,0 @@

{
"name": "writable-consumable-stream",
"version": "1.1.1",
"version": "2.0.0",
"description": "An async stream which can be iterated over using a for-await-of loop.",

@@ -30,4 +30,4 @@ "main": "index.js",

"dependencies": {
"consumable-stream": "^1.0.0"
"consumable-stream": "^2.0.0"
}
}

@@ -42,3 +42,3 @@ # writable-consumable-stream

// Works in older environments.
let asyncIterator = stream.createConsumer();
let asyncIterator = asyncIterable.createConsumer();
while (true) {

@@ -71,3 +71,3 @@ let packet = await asyncIterator.next();

}
};
}

@@ -74,0 +74,0 @@ async function consumeAsyncIterable(asyncIterable) {

@@ -271,3 +271,3 @@ const WritableConsumableStream = require('../index');

let receivedPackets = [];
let consumable = stream.createConsumable(20);
let consumable = stream.createConsumer(20);
let error;

@@ -306,3 +306,3 @@ try {

let receivedPackets = [];
let consumable = stream.createConsumable(20);
let consumable = stream.createConsumer(20);
let error;

@@ -340,3 +340,3 @@ try {

let receivedPackets = [];
let consumable = stream.createConsumable(20);
let consumable = stream.createConsumer(20);
let error;

@@ -382,3 +382,3 @@ try {

let receivedPackets = [];
let consumable = stream.createConsumable();
let consumable = stream.createConsumer();

@@ -458,3 +458,3 @@ while (true) {

assert.equal(backpressureBeforeKill, 10);
assert.equal(backpressureAfterKill, 11);
assert.equal(backpressureAfterKill, 0);
assert.equal(backpressureAfterConsume, 0);

@@ -554,3 +554,3 @@ assert.equal(receivedPackets.length, 0);

it('should be able to start reading from a killed stream immediately', async () => {
it('should set consumer.isAlive to false if stream is killed', async () => { // TODO 22
(async () => {

@@ -565,3 +565,5 @@ await wait(10);

let consumer = stream.createConsumer();
assert.equal(consumer.isAlive, true);
stream.kill();
assert.equal(consumer.isAlive, false);

@@ -574,3 +576,3 @@ let receivedPackets = [];

}
assert.equal(receivedPackets.length, 10);
assert.equal(receivedPackets.length, 0);

@@ -924,5 +926,2 @@ assert.equal(Object.keys(stream._consumers).length, 0); // Check internal cleanup.

// console.log(111, stream.getConsumerStatsList());
// console.log(111, consumer.getBackpressure());
assert.equal(stream.getConsumerStatsList().length, 0);

@@ -1314,3 +1313,3 @@ assert.equal(consumer.getBackpressure(), 0);

assert.equal(backpressureBeforeKill, 10);
assert.equal(backpressureAfterKill, 11);
assert.equal(backpressureAfterKill, 10); // consumerB was still running.
assert.equal(backpressureAfterConsume, 0);

@@ -1317,0 +1316,0 @@ assert.equal(receivedPacketsA.length, 1);

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc