stream-demux
An asynchronous iterable stream demultiplexer.
Lets you write data to multiple async iterable streams from a central place without keeping any references to those streams.
The StreamDemux class returns streams of class DemuxedAsyncIterableStream (base class AsyncIterableStream).
See https://github.com/SocketCluster/async-iterable-stream
This library is currently fewer than 70 lines of code in total (including whitespace) and has two dependencies:
- AsyncIterableStream: < 40 LOC
- WritableAsyncIterableStream: < 90 LOC
This library uses a queue which is implemented as a singly-linked list; this allows each loop to consume at its own pace without missing any events (supports nested await statements). An 'event' in the queue can be garbage-collected as soon as the slowest consumer moves its pointer past it.
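The linked-list idea described above can be illustrated with a minimal sketch. This is not the library's actual implementation: the LinkedEventLog class and its write, createConsumer, and read names are hypothetical, and the sketch is synchronous to keep it short. The log itself only holds the tail node, so older nodes stay reachable only through the pointers held by consumers that have not read them yet.

```javascript
// Hypothetical sketch; not the actual stream-demux implementation.
class LinkedEventLog {
  constructor() {
    // Sentinel node; `tail` always points at the most recently written node.
    this.tail = { value: undefined, next: null };
  }

  // Append an event by linking a new node after the current tail.
  write(value) {
    const node = { value, next: null };
    this.tail.next = node;
    this.tail = node;
  }

  // A consumer starts at the current tail and only sees later events.
  createConsumer() {
    let current = this.tail;
    return {
      // Return the next unread event, or undefined if the consumer is caught up.
      read() {
        if (current.next === null) return undefined;
        current = current.next;
        return current.value;
      }
    };
  }
}

const log = new LinkedEventLog();
const fast = log.createConsumer();
const slow = log.createConsumer();

log.write('a');
log.write('b');

// The fast consumer drains what is available right away.
const fastSeen = [fast.read(), fast.read()];

log.write('c');

// The slow consumer starts reading later but still sees every event in order.
const slowSeen = [slow.read(), slow.read(), slow.read()];

console.log(fastSeen, slowSeen);
```

In this sketch, once both consumers have moved past the node holding 'a', nothing references that node any more, so it becomes eligible for garbage collection.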
Installation
npm install stream-demux
Usage
Consuming using async loops
const StreamDemux = require('stream-demux');

let demux = new StreamDemux();

// Consume the 'abc' stream with a for-await-of loop.
(async () => {
  let substream = demux.stream('abc');
  for await (let packet of substream) {
    console.log('ABC:', packet);
  }
})();

// Consume the 'def' stream with a for-await-of loop.
(async () => {
  let substream = demux.stream('def');
  for await (let packet of substream) {
    console.log('DEF:', packet);
  }
})();

// Consume the 'def' stream again, this time by driving the async iterator manually.
(async () => {
  let asyncIterator = demux.stream('def').createAsyncIterator();
  while (true) {
    let packet = await asyncIterator.next();
    if (packet.done) break;
    console.log('DEF (while loop):', packet.value);
  }
})();

// Write ten packets to each stream, then close both streams.
(async () => {
  for (let i = 0; i < 10; i++) {
    await wait(10);
    demux.write('abc', 'message-abc-' + i);
    demux.write('def', 'message-def-' + i);
  }
  demux.close('abc');
  demux.close('def');
})();

// Resolve after the given duration in milliseconds.
function wait(duration) {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, duration);
  });
}
Consuming using the once method
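// Wait for the next packet written to the 'abc' stream.
(async () => {
  let packet = await demux.stream('abc').once();
  console.log('Packet:', packet);
})();

// Same as above, but with a timeout argument (10000 here);
// the catch block handles the case where no packet arrives in time.
(async () => {
  try {
    let packet = await demux.stream('abc').once(10000);
    console.log('Packet:', packet);
  } catch (err) {
    console.log('Error:', err);
  }
})();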
Goal
The goal of this module is to efficiently distribute data to a large number of named asynchronous streams while facilitating functional programming patterns which decrease the probability of memory leaks.
Each stream returned by this module is responsible for picking up its own data from a shared source stream. This means that the stream-demux module doesn't hold any references to the streams it produces via its stream() method. Because streams don't need to be destroyed or cleaned up explicitly, this reduces the likelihood of programming mistakes that would lead to memory leaks.
The downside to making each stream responsible for consuming its own data is that having a lot of concurrent streams can have a negative impact on performance (especially if there are a lot of idle streams). A goal of stream-demux is to keep that overhead to a minimum.
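The trade-off described in this section can be sketched roughly as follows. This is an illustration under stated assumptions, not the module's real code: sharedSource, pickStream, collect, and demo are hypothetical names, and the shared source is simulated by a generator that replays the same tagged packets for each consumer. Each consumer filters the shared packets by its own stream name, and a counter records how many packets it had to inspect.

```javascript
// Hypothetical sketch; sharedSource, pickStream, collect and demo are
// illustrative names and are not part of the stream-demux API.

// Stand-in for the shared source: each call replays the same tagged packets.
async function* sharedSource() {
  yield { stream: 'abc', data: 'message-abc-0' };
  yield { stream: 'def', data: 'message-def-0' };
  yield { stream: 'abc', data: 'message-abc-1' };
}

// A consumer inspects every packet and keeps only those tagged with its name.
// `stats.inspected` counts how many packets it had to look at.
async function* pickStream(source, name, stats) {
  for await (const packet of source) {
    stats.inspected++;
    if (packet.stream === name) yield packet.data;
  }
}

// Drain an async iterable into an array.
async function collect(iterable) {
  const items = [];
  for await (const item of iterable) items.push(item);
  return items;
}

async function demo() {
  const abcStats = { inspected: 0 };
  const idleStats = { inspected: 0 };
  // One consumer that receives data, and one idle consumer that never matches.
  const abc = await collect(pickStream(sharedSource(), 'abc', abcStats));
  const idle = await collect(pickStream(sharedSource(), 'xyz', idleStats));
  return { abc, idle, abcStats, idleStats };
}

demo().then((result) => console.log(result));
```

In this sketch the idle 'xyz' consumer yields nothing but still inspects all three packets, which is the kind of per-stream overhead the paragraph above refers to. No central registry of consumers exists, so dropping a consumer requires no cleanup call.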