async-iterable-stream
Advanced tools
+12
-12
@@ -0,23 +1,23 @@ | ||
| const MUST_OVERRIDE_METHOD_MESSAGE = 'Method must be overriden by subclass'; | ||
| class AsyncIterableStream { | ||
| constructor(asyncIteratorFactory) { | ||
| this._asyncIteratorFactory = asyncIteratorFactory; | ||
| next() { | ||
| throw new TypeError(MUST_OVERRIDE_METHOD_MESSAGE); | ||
| } | ||
| next() { | ||
| return this._asyncIteratorFactory().next(); | ||
| createAsyncIterator() { | ||
| throw new TypeError(MUST_OVERRIDE_METHOD_MESSAGE); | ||
| } | ||
| async once() { | ||
| while (true) { | ||
| let result = await this.next(); | ||
| if (result.done) { | ||
| // If stream was ended, this function should never resolve. | ||
| await new Promise(() => {}); | ||
| } | ||
| return result.value; | ||
| let result = await this.next(); | ||
| if (result.done) { | ||
| // If stream was ended, this function should never resolve. | ||
| await new Promise(() => {}); | ||
| } | ||
| return result.value; | ||
| } | ||
| [Symbol.asyncIterator]() { | ||
| return this._asyncIteratorFactory(); | ||
| return this.createAsyncIterator(); | ||
| } | ||
@@ -24,0 +24,0 @@ } |
+1
-1
| { | ||
| "name": "async-iterable-stream", | ||
| "version": "1.2.1", | ||
| "version": "2.0.0", | ||
| "description": "A readable async stream which can be iterated over using a for-await-of loop.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
+106
-57
@@ -22,75 +22,124 @@ const AsyncIterableStream = require('../index'); | ||
| describe('AsyncIterableStream', () => { | ||
| let streamData; | ||
| let stream; | ||
| class AsyncIterableStreamSubclass extends AsyncIterableStream { | ||
| constructor(dataPromiseList) { | ||
| super(); | ||
| this._dataPromiseList = dataPromiseList; | ||
| } | ||
| // The generator function passed to the AsyncIterableStream must be a pure function. | ||
| async function* createDataConsumerGenerator(dataPromiseList) { | ||
| while (dataPromiseList.length) { | ||
| let result = await dataPromiseList[dataPromiseList.length - 1]; | ||
| async next() { | ||
| let value = await this._dataPromiseList[this._dataPromiseList.length - 1]; | ||
| return {value, done: !this._dataPromiseList.length}; | ||
| } | ||
| async *createAsyncIterator() { | ||
| while (this._dataPromiseList.length) { | ||
| let result = await this._dataPromiseList[this._dataPromiseList.length - 1]; | ||
| yield result; | ||
| } | ||
| } | ||
| } | ||
| beforeEach(async () => { | ||
| streamData = [...Array(10).keys()] | ||
| .map(async (value, index) => { | ||
| await wait(10 * (index + 1)); | ||
| streamData.pop(); | ||
| return value; | ||
| }) | ||
| .reverse(); | ||
| stream = new AsyncIterableStream(() => { | ||
| return createDataConsumerGenerator(streamData); | ||
| describe('AsyncIterableStream', () => { | ||
| describe('AsyncIterableStream abstract class', () => { | ||
| let abstractStream; | ||
| beforeEach(async () => { | ||
| abstractStream = new AsyncIterableStream(); | ||
| }); | ||
| }); | ||
| afterEach(async () => { | ||
| cancelAllPendingWaits(); | ||
| }); | ||
| afterEach(async () => { | ||
| cancelAllPendingWaits(); | ||
| }); | ||
| it('should receive packets asynchronously', async () => { | ||
| let receivedPackets = []; | ||
| for await (let packet of stream) { | ||
| receivedPackets.push(packet); | ||
| } | ||
| assert.equal(receivedPackets.length, 10); | ||
| assert.equal(receivedPackets[0], 0); | ||
| assert.equal(receivedPackets[1], 1); | ||
| assert.equal(receivedPackets[9], 9); | ||
| it('should throw error if next() is invoked directly on the abstract class', async () => { | ||
| let result; | ||
| let error; | ||
| try { | ||
| result = abstractStream.next(); | ||
| } catch (err) { | ||
| error = err; | ||
| } | ||
| assert.equal(error.name, 'TypeError'); | ||
| assert.equal(error.message, 'Method must be overriden by subclass'); | ||
| }); | ||
| it('should throw error if createAsyncIterator() is invoked directly on the abstract class', async () => { | ||
| let result; | ||
| let error; | ||
| try { | ||
| result = abstractStream.createAsyncIterator(); | ||
| } catch (err) { | ||
| error = err; | ||
| } | ||
| assert.equal(error.name, 'TypeError'); | ||
| assert.equal(error.message, 'Method must be overriden by subclass'); | ||
| }); | ||
| }); | ||
| it('should receive packets asynchronously from multiple concurrent for-await-of loops', async () => { | ||
| let receivedPacketsA = []; | ||
| let receivedPacketsB = []; | ||
| describe('AsyncIterableStream subclass', () => { | ||
| let streamData; | ||
| let stream; | ||
| await Promise.all([ | ||
| (async () => { | ||
| for await (let packet of stream) { | ||
| receivedPacketsA.push(packet); | ||
| } | ||
| })(), | ||
| (async () => { | ||
| for await (let packet of stream) { | ||
| receivedPacketsB.push(packet); | ||
| } | ||
| })() | ||
| ]); | ||
| beforeEach(async () => { | ||
| streamData = [...Array(10).keys()] | ||
| .map(async (value, index) => { | ||
| await wait(10 * (index + 1)); | ||
| streamData.pop(); | ||
| return value; | ||
| }) | ||
| .reverse(); | ||
| assert.equal(receivedPacketsA.length, 10); | ||
| assert.equal(receivedPacketsA[0], 0); | ||
| assert.equal(receivedPacketsA[1], 1); | ||
| assert.equal(receivedPacketsA[9], 9); | ||
| }); | ||
| stream = new AsyncIterableStreamSubclass(streamData); | ||
| }); | ||
| it('should receive next packet asynchronously when once() method is used', async () => { | ||
| let nextPacket = await stream.once(); | ||
| assert.equal(nextPacket, 0); | ||
| afterEach(async () => { | ||
| cancelAllPendingWaits(); | ||
| }); | ||
| nextPacket = await stream.once(); | ||
| assert.equal(nextPacket, 1); | ||
| it('should receive packets asynchronously', async () => { | ||
| let receivedPackets = []; | ||
| for await (let packet of stream) { | ||
| receivedPackets.push(packet); | ||
| } | ||
| assert.equal(receivedPackets.length, 10); | ||
| assert.equal(receivedPackets[0], 0); | ||
| assert.equal(receivedPackets[1], 1); | ||
| assert.equal(receivedPackets[9], 9); | ||
| }); | ||
| nextPacket = await stream.once(); | ||
| assert.equal(nextPacket, 2); | ||
| it('should receive packets asynchronously from multiple concurrent for-await-of loops', async () => { | ||
| let receivedPacketsA = []; | ||
| let receivedPacketsB = []; | ||
| await Promise.all([ | ||
| (async () => { | ||
| for await (let packet of stream) { | ||
| receivedPacketsA.push(packet); | ||
| } | ||
| })(), | ||
| (async () => { | ||
| for await (let packet of stream) { | ||
| receivedPacketsB.push(packet); | ||
| } | ||
| })() | ||
| ]); | ||
| assert.equal(receivedPacketsA.length, 10); | ||
| assert.equal(receivedPacketsA[0], 0); | ||
| assert.equal(receivedPacketsA[1], 1); | ||
| assert.equal(receivedPacketsA[9], 9); | ||
| }); | ||
| it('should receive next packet asynchronously when once() method is used', async () => { | ||
| let nextPacket = await stream.once(); | ||
| assert.equal(nextPacket, 0); | ||
| nextPacket = await stream.once(); | ||
| assert.equal(nextPacket, 1); | ||
| nextPacket = await stream.once(); | ||
| assert.equal(nextPacket, 2); | ||
| }); | ||
| }); | ||
| }); |
7933
20.82%142
37.86%