stream-demux
Advanced tools
Comparing version 1.0.1 to 1.1.0
16
index.js
@@ -0,10 +1,12 @@ | ||
const AsyncIterableStream = require('async-iterable-stream'); | ||
const WritableAsyncIterableStream = require('writable-async-iterable-stream'); | ||
const END_SYMBOL = Symbol('end'); | ||
class StreamDemux { | ||
constructor(iterableAsyncStream) { | ||
this.stream = iterableAsyncStream; | ||
constructor() { | ||
this.mainStream = new WritableAsyncIterableStream(); | ||
} | ||
write(name, data) { | ||
this.stream.write({ | ||
this.mainStream.write({ | ||
name, | ||
@@ -16,3 +18,3 @@ data | ||
end(name) { | ||
this.stream.write({ | ||
this.mainStream.write({ | ||
name, | ||
@@ -23,3 +25,3 @@ data: END_SYMBOL | ||
async *createFilteredStream(stream, name) { | ||
async *createDemuxedStream(stream, name) { | ||
for await (let packet of stream) { | ||
@@ -36,3 +38,5 @@ if (packet.name === name) { | ||
getStream(name) { | ||
return this.createFilteredStream(this.stream, name); | ||
return new AsyncIterableStream(() => { | ||
return this.createDemuxedStream(this.mainStream, name); | ||
}); | ||
} | ||
@@ -39,0 +43,0 @@ } |
{ | ||
"name": "stream-demux", | ||
"version": "1.0.1", | ||
"version": "1.1.0", | ||
"description": "An iterable asynchronous stream demultiplexer.", | ||
@@ -28,5 +28,8 @@ "main": "index.js", | ||
"devDependencies": { | ||
"iterable-async-stream": "^1.0.7", | ||
"mocha": "^5.0.5" | ||
}, | ||
"dependencies": { | ||
"async-iterable-stream": "^1.0.1", | ||
"writable-async-iterable-stream": "^1.0.9" | ||
} | ||
} |
const assert = require('assert'); | ||
const StreamDemux = require('../index'); | ||
const IterableAsyncStream = require('iterable-async-stream'); | ||
@@ -14,11 +13,9 @@ function wait(duration) { | ||
describe('StreamDemux', () => { | ||
let stream; | ||
let demux; | ||
beforeEach(async () => { | ||
stream = new IterableAsyncStream(); | ||
demux = new StreamDemux(stream); | ||
demux = new StreamDemux(); | ||
}); | ||
it('should multiplex across a single stream and demultiplex across multiple substreams', async () => { | ||
it('should demultiplex packets over multiple substreams', async () => { | ||
(async () => { | ||
@@ -62,2 +59,39 @@ for (let i = 0; i < 10; i++) { | ||
}); | ||
it('should support iteraring over a single substream from multiple consumers at the same time', async () => { | ||
(async () => { | ||
for (let i = 0; i < 10; i++) { | ||
await wait(10); | ||
demux.write('hello', 'world' + i); | ||
} | ||
demux.end('hello'); | ||
})(); | ||
let receivedPacketsA = []; | ||
let receivedPacketsB = []; | ||
let receivedPacketsC = []; | ||
let substream = demux.getStream('hello'); | ||
await Promise.all([ | ||
(async () => { | ||
for await (let packet of substream) { | ||
receivedPacketsA.push(packet); | ||
} | ||
})(), | ||
(async () => { | ||
for await (let packet of substream) { | ||
receivedPacketsB.push(packet); | ||
} | ||
})(), | ||
(async () => { | ||
for await (let packet of substream) { | ||
receivedPacketsC.push(packet); | ||
} | ||
})() | ||
]); | ||
assert.equal(receivedPacketsA.length, 10); | ||
assert.equal(receivedPacketsB.length, 10); | ||
assert.equal(receivedPacketsC.length, 10); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
5328
1
119
2
+ Addedasync-iterable-stream@^1.0.1
+ Addedasync-iterable-stream@1.2.1(transitive)