async-iterable-stream
Advanced tools
Comparing version 1.2.1 to 2.0.0
24
index.js
@@ -0,23 +1,23 @@ | ||
const MUST_OVERRIDE_METHOD_MESSAGE = 'Method must be overriden by subclass'; | ||
class AsyncIterableStream { | ||
constructor(asyncIteratorFactory) { | ||
this._asyncIteratorFactory = asyncIteratorFactory; | ||
next() { | ||
throw new TypeError(MUST_OVERRIDE_METHOD_MESSAGE); | ||
} | ||
next() { | ||
return this._asyncIteratorFactory().next(); | ||
createAsyncIterator() { | ||
throw new TypeError(MUST_OVERRIDE_METHOD_MESSAGE); | ||
} | ||
async once() { | ||
while (true) { | ||
let result = await this.next(); | ||
if (result.done) { | ||
// If stream was ended, this function should never resolve. | ||
await new Promise(() => {}); | ||
} | ||
return result.value; | ||
let result = await this.next(); | ||
if (result.done) { | ||
// If stream was ended, this function should never resolve. | ||
await new Promise(() => {}); | ||
} | ||
return result.value; | ||
} | ||
[Symbol.asyncIterator]() { | ||
return this._asyncIteratorFactory(); | ||
return this.createAsyncIterator(); | ||
} | ||
@@ -24,0 +24,0 @@ } |
{ | ||
"name": "async-iterable-stream", | ||
"version": "1.2.1", | ||
"version": "2.0.0", | ||
"description": "A readable async stream which can be iterated over using a for-await-of loop.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
163
test/test.js
@@ -22,75 +22,124 @@ const AsyncIterableStream = require('../index'); | ||
describe('AsyncIterableStream', () => { | ||
let streamData; | ||
let stream; | ||
class AsyncIterableStreamSubclass extends AsyncIterableStream { | ||
constructor(dataPromiseList) { | ||
super(); | ||
this._dataPromiseList = dataPromiseList; | ||
} | ||
// The generator function passed to the AsyncIterableStream must be a pure function. | ||
async function* createDataConsumerGenerator(dataPromiseList) { | ||
while (dataPromiseList.length) { | ||
let result = await dataPromiseList[dataPromiseList.length - 1]; | ||
async next() { | ||
let value = await this._dataPromiseList[this._dataPromiseList.length - 1]; | ||
return {value, done: !this._dataPromiseList.length}; | ||
} | ||
async *createAsyncIterator() { | ||
while (this._dataPromiseList.length) { | ||
let result = await this._dataPromiseList[this._dataPromiseList.length - 1]; | ||
yield result; | ||
} | ||
} | ||
} | ||
beforeEach(async () => { | ||
streamData = [...Array(10).keys()] | ||
.map(async (value, index) => { | ||
await wait(10 * (index + 1)); | ||
streamData.pop(); | ||
return value; | ||
}) | ||
.reverse(); | ||
stream = new AsyncIterableStream(() => { | ||
return createDataConsumerGenerator(streamData); | ||
describe('AsyncIterableStream', () => { | ||
describe('AsyncIterableStream abstract class', () => { | ||
let abstractStream; | ||
beforeEach(async () => { | ||
abstractStream = new AsyncIterableStream(); | ||
}); | ||
}); | ||
afterEach(async () => { | ||
cancelAllPendingWaits(); | ||
}); | ||
afterEach(async () => { | ||
cancelAllPendingWaits(); | ||
}); | ||
it('should receive packets asynchronously', async () => { | ||
let receivedPackets = []; | ||
for await (let packet of stream) { | ||
receivedPackets.push(packet); | ||
} | ||
assert.equal(receivedPackets.length, 10); | ||
assert.equal(receivedPackets[0], 0); | ||
assert.equal(receivedPackets[1], 1); | ||
assert.equal(receivedPackets[9], 9); | ||
it('should throw error if next() is invoked directly on the abstract class', async () => { | ||
let result; | ||
let error; | ||
try { | ||
result = abstractStream.next(); | ||
} catch (err) { | ||
error = err; | ||
} | ||
assert.equal(error.name, 'TypeError'); | ||
assert.equal(error.message, 'Method must be overriden by subclass'); | ||
}); | ||
it('should throw error if createAsyncIterator() is invoked directly on the abstract class', async () => { | ||
let result; | ||
let error; | ||
try { | ||
result = abstractStream.createAsyncIterator(); | ||
} catch (err) { | ||
error = err; | ||
} | ||
assert.equal(error.name, 'TypeError'); | ||
assert.equal(error.message, 'Method must be overriden by subclass'); | ||
}); | ||
}); | ||
it('should receive packets asynchronously from multiple concurrent for-await-of loops', async () => { | ||
let receivedPacketsA = []; | ||
let receivedPacketsB = []; | ||
describe('AsyncIterableStream subclass', () => { | ||
let streamData; | ||
let stream; | ||
await Promise.all([ | ||
(async () => { | ||
for await (let packet of stream) { | ||
receivedPacketsA.push(packet); | ||
} | ||
})(), | ||
(async () => { | ||
for await (let packet of stream) { | ||
receivedPacketsB.push(packet); | ||
} | ||
})() | ||
]); | ||
beforeEach(async () => { | ||
streamData = [...Array(10).keys()] | ||
.map(async (value, index) => { | ||
await wait(10 * (index + 1)); | ||
streamData.pop(); | ||
return value; | ||
}) | ||
.reverse(); | ||
assert.equal(receivedPacketsA.length, 10); | ||
assert.equal(receivedPacketsA[0], 0); | ||
assert.equal(receivedPacketsA[1], 1); | ||
assert.equal(receivedPacketsA[9], 9); | ||
}); | ||
stream = new AsyncIterableStreamSubclass(streamData); | ||
}); | ||
it('should receive next packet asynchronously when once() method is used', async () => { | ||
let nextPacket = await stream.once(); | ||
assert.equal(nextPacket, 0); | ||
afterEach(async () => { | ||
cancelAllPendingWaits(); | ||
}); | ||
nextPacket = await stream.once(); | ||
assert.equal(nextPacket, 1); | ||
it('should receive packets asynchronously', async () => { | ||
let receivedPackets = []; | ||
for await (let packet of stream) { | ||
receivedPackets.push(packet); | ||
} | ||
assert.equal(receivedPackets.length, 10); | ||
assert.equal(receivedPackets[0], 0); | ||
assert.equal(receivedPackets[1], 1); | ||
assert.equal(receivedPackets[9], 9); | ||
}); | ||
nextPacket = await stream.once(); | ||
assert.equal(nextPacket, 2); | ||
it('should receive packets asynchronously from multiple concurrent for-await-of loops', async () => { | ||
let receivedPacketsA = []; | ||
let receivedPacketsB = []; | ||
await Promise.all([ | ||
(async () => { | ||
for await (let packet of stream) { | ||
receivedPacketsA.push(packet); | ||
} | ||
})(), | ||
(async () => { | ||
for await (let packet of stream) { | ||
receivedPacketsB.push(packet); | ||
} | ||
})() | ||
]); | ||
assert.equal(receivedPacketsA.length, 10); | ||
assert.equal(receivedPacketsA[0], 0); | ||
assert.equal(receivedPacketsA[1], 1); | ||
assert.equal(receivedPacketsA[9], 9); | ||
}); | ||
it('should receive next packet asynchronously when once() method is used', async () => { | ||
let nextPacket = await stream.once(); | ||
assert.equal(nextPacket, 0); | ||
nextPacket = await stream.once(); | ||
assert.equal(nextPacket, 1); | ||
nextPacket = await stream.once(); | ||
assert.equal(nextPacket, 2); | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
7933
142