Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

stream-demux

Package Overview
Dependencies
Maintainers
1
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

stream-demux - npm Package Compare versions

Comparing version 1.0.1 to 1.1.0

16

index.js

@@ -0,10 +1,12 @@

const AsyncIterableStream = require('async-iterable-stream');
const WritableAsyncIterableStream = require('writable-async-iterable-stream');
const END_SYMBOL = Symbol('end');
class StreamDemux {
constructor(iterableAsyncStream) {
this.stream = iterableAsyncStream;
constructor() {
this.mainStream = new WritableAsyncIterableStream();
}
write(name, data) {
this.stream.write({
this.mainStream.write({
name,

@@ -16,3 +18,3 @@ data

end(name) {
this.stream.write({
this.mainStream.write({
name,

@@ -23,3 +25,3 @@ data: END_SYMBOL

async *createFilteredStream(stream, name) {
async *createDemuxedStream(stream, name) {
for await (let packet of stream) {

@@ -36,3 +38,5 @@ if (packet.name === name) {

getStream(name) {
return this.createFilteredStream(this.stream, name);
return new AsyncIterableStream(() => {
return this.createDemuxedStream(this.mainStream, name);
});
}

@@ -39,0 +43,0 @@ }

{
"name": "stream-demux",
"version": "1.0.1",
"version": "1.1.0",
"description": "An iterable asynchronous stream demultiplexer.",

@@ -28,5 +28,8 @@ "main": "index.js",

"devDependencies": {
"iterable-async-stream": "^1.0.7",
"mocha": "^5.0.5"
},
"dependencies": {
"async-iterable-stream": "^1.0.1",
"writable-async-iterable-stream": "^1.0.9"
}
}
const assert = require('assert');
const StreamDemux = require('../index');
const IterableAsyncStream = require('iterable-async-stream');

@@ -14,11 +13,9 @@ function wait(duration) {

describe('StreamDemux', () => {
let stream;
let demux;
beforeEach(async () => {
stream = new IterableAsyncStream();
demux = new StreamDemux(stream);
demux = new StreamDemux();
});
it('should multiplex across a single stream and demultiplex across multiple substreams', async () => {
it('should demultiplex packets over multiple substreams', async () => {
(async () => {

@@ -62,2 +59,39 @@ for (let i = 0; i < 10; i++) {

});
it('should support iteraring over a single substream from multiple consumers at the same time', async () => {
(async () => {
for (let i = 0; i < 10; i++) {
await wait(10);
demux.write('hello', 'world' + i);
}
demux.end('hello');
})();
let receivedPacketsA = [];
let receivedPacketsB = [];
let receivedPacketsC = [];
let substream = demux.getStream('hello');
await Promise.all([
(async () => {
for await (let packet of substream) {
receivedPacketsA.push(packet);
}
})(),
(async () => {
for await (let packet of substream) {
receivedPacketsB.push(packet);
}
})(),
(async () => {
for await (let packet of substream) {
receivedPacketsC.push(packet);
}
})()
]);
assert.equal(receivedPacketsA.length, 10);
assert.equal(receivedPacketsB.length, 10);
assert.equal(receivedPacketsC.length, 10);
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc