Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

async-iterable-stream

Package Overview
Dependencies
Maintainers
1
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

async-iterable-stream - npm Package Compare versions

Comparing version 1.2.1 to 2.0.0

24

index.js

@@ -0,23 +1,23 @@

const MUST_OVERRIDE_METHOD_MESSAGE = 'Method must be overriden by subclass';
class AsyncIterableStream {
constructor(asyncIteratorFactory) {
this._asyncIteratorFactory = asyncIteratorFactory;
next() {
throw new TypeError(MUST_OVERRIDE_METHOD_MESSAGE);
}
next() {
return this._asyncIteratorFactory().next();
createAsyncIterator() {
throw new TypeError(MUST_OVERRIDE_METHOD_MESSAGE);
}
async once() {
while (true) {
let result = await this.next();
if (result.done) {
// If stream was ended, this function should never resolve.
await new Promise(() => {});
}
return result.value;
let result = await this.next();
if (result.done) {
// If stream was ended, this function should never resolve.
await new Promise(() => {});
}
return result.value;
}
[Symbol.asyncIterator]() {
return this._asyncIteratorFactory();
return this.createAsyncIterator();
}

@@ -24,0 +24,0 @@ }

{
"name": "async-iterable-stream",
"version": "1.2.1",
"version": "2.0.0",
"description": "A readable async stream which can be iterated over using a for-await-of loop.",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -22,75 +22,124 @@ const AsyncIterableStream = require('../index');

describe('AsyncIterableStream', () => {
let streamData;
let stream;
class AsyncIterableStreamSubclass extends AsyncIterableStream {
constructor(dataPromiseList) {
super();
this._dataPromiseList = dataPromiseList;
}
// The generator function passed to the AsyncIterableStream must be a pure function.
async function* createDataConsumerGenerator(dataPromiseList) {
while (dataPromiseList.length) {
let result = await dataPromiseList[dataPromiseList.length - 1];
async next() {
let value = await this._dataPromiseList[this._dataPromiseList.length - 1];
return {value, done: !this._dataPromiseList.length};
}
async *createAsyncIterator() {
while (this._dataPromiseList.length) {
let result = await this._dataPromiseList[this._dataPromiseList.length - 1];
yield result;
}
}
}
beforeEach(async () => {
streamData = [...Array(10).keys()]
.map(async (value, index) => {
await wait(10 * (index + 1));
streamData.pop();
return value;
})
.reverse();
stream = new AsyncIterableStream(() => {
return createDataConsumerGenerator(streamData);
describe('AsyncIterableStream', () => {
describe('AsyncIterableStream abstract class', () => {
let abstractStream;
beforeEach(async () => {
abstractStream = new AsyncIterableStream();
});
});
afterEach(async () => {
cancelAllPendingWaits();
});
afterEach(async () => {
cancelAllPendingWaits();
});
it('should receive packets asynchronously', async () => {
let receivedPackets = [];
for await (let packet of stream) {
receivedPackets.push(packet);
}
assert.equal(receivedPackets.length, 10);
assert.equal(receivedPackets[0], 0);
assert.equal(receivedPackets[1], 1);
assert.equal(receivedPackets[9], 9);
it('should throw error if next() is invoked directly on the abstract class', async () => {
let result;
let error;
try {
result = abstractStream.next();
} catch (err) {
error = err;
}
assert.equal(error.name, 'TypeError');
assert.equal(error.message, 'Method must be overriden by subclass');
});
it('should throw error if createAsyncIterator() is invoked directly on the abstract class', async () => {
let result;
let error;
try {
result = abstractStream.createAsyncIterator();
} catch (err) {
error = err;
}
assert.equal(error.name, 'TypeError');
assert.equal(error.message, 'Method must be overriden by subclass');
});
});
it('should receive packets asynchronously from multiple concurrent for-await-of loops', async () => {
let receivedPacketsA = [];
let receivedPacketsB = [];
describe('AsyncIterableStream subclass', () => {
let streamData;
let stream;
await Promise.all([
(async () => {
for await (let packet of stream) {
receivedPacketsA.push(packet);
}
})(),
(async () => {
for await (let packet of stream) {
receivedPacketsB.push(packet);
}
})()
]);
beforeEach(async () => {
streamData = [...Array(10).keys()]
.map(async (value, index) => {
await wait(10 * (index + 1));
streamData.pop();
return value;
})
.reverse();
assert.equal(receivedPacketsA.length, 10);
assert.equal(receivedPacketsA[0], 0);
assert.equal(receivedPacketsA[1], 1);
assert.equal(receivedPacketsA[9], 9);
});
stream = new AsyncIterableStreamSubclass(streamData);
});
it('should receive next packet asynchronously when once() method is used', async () => {
let nextPacket = await stream.once();
assert.equal(nextPacket, 0);
afterEach(async () => {
cancelAllPendingWaits();
});
nextPacket = await stream.once();
assert.equal(nextPacket, 1);
it('should receive packets asynchronously', async () => {
let receivedPackets = [];
for await (let packet of stream) {
receivedPackets.push(packet);
}
assert.equal(receivedPackets.length, 10);
assert.equal(receivedPackets[0], 0);
assert.equal(receivedPackets[1], 1);
assert.equal(receivedPackets[9], 9);
});
nextPacket = await stream.once();
assert.equal(nextPacket, 2);
it('should receive packets asynchronously from multiple concurrent for-await-of loops', async () => {
let receivedPacketsA = [];
let receivedPacketsB = [];
await Promise.all([
(async () => {
for await (let packet of stream) {
receivedPacketsA.push(packet);
}
})(),
(async () => {
for await (let packet of stream) {
receivedPacketsB.push(packet);
}
})()
]);
assert.equal(receivedPacketsA.length, 10);
assert.equal(receivedPacketsA[0], 0);
assert.equal(receivedPacketsA[1], 1);
assert.equal(receivedPacketsA[9], 9);
});
it('should receive next packet asynchronously when once() method is used', async () => {
let nextPacket = await stream.once();
assert.equal(nextPacket, 0);
nextPacket = await stream.once();
assert.equal(nextPacket, 1);
nextPacket = await stream.once();
assert.equal(nextPacket, 2);
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc