stream-demux
Advanced tools
Comparing version
16
index.js
@@ -0,10 +1,12 @@ | ||
const AsyncIterableStream = require('async-iterable-stream'); | ||
const WritableAsyncIterableStream = require('writable-async-iterable-stream'); | ||
const END_SYMBOL = Symbol('end'); | ||
class StreamDemux { | ||
constructor(iterableAsyncStream) { | ||
this.stream = iterableAsyncStream; | ||
constructor() { | ||
this.mainStream = new WritableAsyncIterableStream(); | ||
} | ||
write(name, data) { | ||
this.stream.write({ | ||
this.mainStream.write({ | ||
name, | ||
@@ -16,3 +18,3 @@ data | ||
end(name) { | ||
this.stream.write({ | ||
this.mainStream.write({ | ||
name, | ||
@@ -23,3 +25,3 @@ data: END_SYMBOL | ||
async *createFilteredStream(stream, name) { | ||
async *createDemuxedStream(stream, name) { | ||
for await (let packet of stream) { | ||
@@ -36,3 +38,5 @@ if (packet.name === name) { | ||
getStream(name) { | ||
return this.createFilteredStream(this.stream, name); | ||
return new AsyncIterableStream(() => { | ||
return this.createDemuxedStream(this.mainStream, name); | ||
}); | ||
} | ||
@@ -39,0 +43,0 @@ } |
{ | ||
"name": "stream-demux", | ||
"version": "1.0.1", | ||
"version": "1.1.0", | ||
"description": "An iterable asynchronous stream demultiplexer.", | ||
@@ -28,5 +28,8 @@ "main": "index.js", | ||
"devDependencies": { | ||
"iterable-async-stream": "^1.0.7", | ||
"mocha": "^5.0.5" | ||
}, | ||
"dependencies": { | ||
"async-iterable-stream": "^1.0.1", | ||
"writable-async-iterable-stream": "^1.0.9" | ||
} | ||
} |
const assert = require('assert'); | ||
const StreamDemux = require('../index'); | ||
const IterableAsyncStream = require('iterable-async-stream'); | ||
@@ -14,11 +13,9 @@ function wait(duration) { | ||
describe('StreamDemux', () => { | ||
let stream; | ||
let demux; | ||
beforeEach(async () => { | ||
stream = new IterableAsyncStream(); | ||
demux = new StreamDemux(stream); | ||
demux = new StreamDemux(); | ||
}); | ||
it('should multiplex across a single stream and demultiplex across multiple substreams', async () => { | ||
it('should demultiplex packets over multiple substreams', async () => { | ||
(async () => { | ||
@@ -62,2 +59,39 @@ for (let i = 0; i < 10; i++) { | ||
}); | ||
it('should support iteraring over a single substream from multiple consumers at the same time', async () => { | ||
(async () => { | ||
for (let i = 0; i < 10; i++) { | ||
await wait(10); | ||
demux.write('hello', 'world' + i); | ||
} | ||
demux.end('hello'); | ||
})(); | ||
let receivedPacketsA = []; | ||
let receivedPacketsB = []; | ||
let receivedPacketsC = []; | ||
let substream = demux.getStream('hello'); | ||
await Promise.all([ | ||
(async () => { | ||
for await (let packet of substream) { | ||
receivedPacketsA.push(packet); | ||
} | ||
})(), | ||
(async () => { | ||
for await (let packet of substream) { | ||
receivedPacketsB.push(packet); | ||
} | ||
})(), | ||
(async () => { | ||
for await (let packet of substream) { | ||
receivedPacketsC.push(packet); | ||
} | ||
})() | ||
]); | ||
assert.equal(receivedPacketsA.length, 10); | ||
assert.equal(receivedPacketsB.length, 10); | ||
assert.equal(receivedPacketsC.length, 10); | ||
}); | ||
}); |
5328
26.38%1
-50%119
40%2
Infinity%+ Added
+ Added