New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

rxprotoplex

Package Overview
Dependencies
Maintainers
0
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rxprotoplex - npm Package Compare versions

Comparing version 1.1.2 to 1.2.0

113

lib/asPlex.js

@@ -1,16 +0,12 @@

import Protoplex from "@zacharygriffee/protoplex";
import { share, tap, catchError } from "rxjs";
import { close$ as createClose$ } from "./close$.js";
// asPlex.js
import {Subject, throwError} from 'rxjs';
import Protoplex from '@zacharygriffee/protoplex';
import {share, tap, catchError} from 'rxjs/operators';
/**
* Wraps a given stream in a Protoplex instance if it is not already multiplexed.
* Wraps a stream into a Protoplex instance and attaches a close$ observable.
*
* @function
* @param {Object} stream - The stream to be wrapped or returned as is if already multiplexed.
* @param {Object} config - Configuration options for creating a Protoplex instance.
* @returns {Object} - The original stream if already multiplexed, otherwise a new Protoplex instance.
*
* @example
* const plexStream = asPlex(stream, { someConfig: true });
* console.log(plexStream); // Outputs either the original multiplexed stream or a new Protoplex instance.
* @param {Duplex} stream - The stream to be wrapped and managed.
* @param {Object} [config={}] - Optional configuration for Protoplex.
* @returns {Protoplex} - The wrapped Plex instance with a close$ observable.
*/

@@ -20,5 +16,7 @@ export const asPlex = (stream, config) => {

const errorHandler = (err) => {
// console.error(`Stream emitted error: ${err.message}`);
if (stream && typeof stream.destroy === 'function') {
stream.destroy(); // Explicitly destroy the stream on error to prevent further events
if (
stream &&
typeof stream.destroy === 'function' &&
!(stream.destroying || stream.destroyed)) {
stream.destroy(err); // Explicitly destroy the stream on error to prevent further events
}

@@ -31,6 +29,6 @@ };

tap({
next: () => {}, // Handle successful closures
complete: () => {
// Explicitly destroy the stream when closing
if (plex.mux.stream && typeof plex.mux.stream.destroy === 'function') {
if (plex.mux.stream &&
typeof plex.mux.stream.destroy === 'function' &&
!(stream.destroying || stream.destroyed)) {
plex.mux.stream.destroy();

@@ -40,11 +38,8 @@ }

error: (err) => {
// console.error(`Stream error: ${err.message}`);
if (plex.mux.stream && typeof plex.mux.stream.destroy === 'function') {
plex.mux.stream.destroy();
if (plex.mux.stream &&
typeof plex.mux.stream.destroy === 'function' &&
!(stream.destroying || stream.destroyed)) {
plex.mux.stream.destroy(err);
}
}
}),
catchError((err) => {
// console.error(`Caught error in close$: ${err.message}`);
return []; // Return an empty observable to suppress further emissions
})

@@ -74,1 +69,69 @@ );

};
/**
* Creates a close$ Subject for a Plex instance that can be used to manually close the stream or emit errors.
*
* @param {Object} plex - The Plex instance to manage.
* @returns {Subject} - A Subject that manages closure and errors.
*/
export const createClose$ = (plex) => {
if (plex.close$) return plex.close$;
const subject = new Subject();
const stream = plex.mux.stream;
// Attach event listeners to the stream
const errorHandler = (error) => {
if (!subject.closed) {
subject.error(error);
}
};
const closeHandler = () => {
if (!subject.closed) {
subject.complete();
}
};
plex.mux.stream.on('error', errorHandler);
plex.mux.stream.on('close', closeHandler);
// Override subject's next and error methods to destroy the stream accordingly
const originalNext = subject.next.bind(subject);
const originalError = subject.error.bind(subject);
subject.next = (value) => {
if (!subject.closed) {
if (value instanceof Error) {
if (!(stream.destroying || stream.destroyed)) plex.mux.stream.destroy(value);
originalError(value); // Emit error to subscribers
} else {
if (!(stream.destroying || stream.destroyed)) plex.mux.stream.destroy(value);
originalNext(value); // Emit next to indicate normal closure
subject.complete();
}
}
};
subject.error = (err) => {
if (!subject.closed) {
if (!(stream.destroying || stream.destroyed)) plex.mux.stream.destroy(err);
originalError(err);
}
};
// Cleanup event listeners when the Subject is unsubscribed or closed
const cleanup = () => {
plex.mux.stream.off('error', errorHandler);
plex.mux.stream.off('close', closeHandler);
};
subject.subscribe({
complete: cleanup,
error: cleanup,
});
plex.close$ = subject;
return plex.close$;
};

@@ -1,68 +0,70 @@

import {fromEvent, ReplaySubject, take} from "rxjs";
import {destroy} from "./destroy.js";
// close$.js
import { Subject } from 'rxjs';
/**
* Returns an observable that emits a single event when the "close" event is triggered on the plex instance.
* Creates a close$ Subject for a Plex instance that can be used to manually close the stream or emit errors.
*
* @param {Object} plex - The plex instance to listen for the "close" event.
* @returns {Observable} - An observable that emits once when the "close" event occurs and then completes.
* @param {Object} plex - The Plex instance to manage.
* @returns {Subject} - A Subject that manages closure and errors.
*/
export const close$ = (plex) => {
if (plex.close$) return plex.close$;
const _plex = plex;
const subject = new ReplaySubject(1);
let errorHandled = false; // Track if an error has been handled
// Listen for close events emitted by Plex and notify the subject
fromEvent(_plex.mux.stream, "close").pipe(take(1)).subscribe(() => {
subject.next();
subject.complete(); // Complete when the stream is closed
});
const subject = new Subject();
// Listen for errors emitted by Plex and handle them appropriately
fromEvent(_plex.mux.stream, 'error').pipe(take(1)).subscribe(error => {
if (!errorHandled) {
errorHandled = true;
subject.error(error); // Propagate the error to the RxJS stream
// Attach event listeners to the stream
const errorHandler = (error) => {
if (!subject.closed) {
subject.error(error);
}
});
};
// Trigger closing the Plex and optionally pass an error through the subject
subject.pipe(take(1)).subscribe({
error: (error) => {
// Empty error handler to prevent unhandled errors
},
next: (value) => {
if (value && typeof value === 'object' && 'message' in value) {
if (!errorHandled) {
errorHandled = true; // Set flag to indicate error has been handled
const closeHandler = () => {
if (!subject.closed) {
subject.next(); // Emit next to indicate normal closure
subject.complete();
}
};
// Safely destroy the stream with error handling
if (!_plex.mux.stream.destroyed) {
_plex.mux.stream.destroy(value);
}
}
subject.error(value); // Propagate error to any subscribers
plex.mux.stream.on('error', errorHandler);
plex.mux.stream.on('close', closeHandler);
// Override subject's next and error methods to destroy the stream accordingly
const originalNext = subject.next.bind(subject);
const originalError = subject.error.bind(subject);
subject.next = (value) => {
if (!subject.closed) {
if (value instanceof Error) {
plex.mux.stream.destroy(value);
originalError(value); // Emit error to subscribers
} else {
// Safely destroy the stream normally
if (!_plex.mux.stream.destroyed) {
destroy(plex, value); // Use the destroy function to handle closure normally
}
plex.mux.stream.destroy();
originalNext(value); // Emit next to indicate normal closure
subject.complete();
}
}
});
};
return subject.pipe(take(1));
};
subject.error = (err) => {
if (!subject.closed) {
plex.mux.stream.destroy(err);
originalError(err);
}
};
// Cleanup event listeners when the Subject is unsubscribed or closed
const cleanup = () => {
plex.mux.stream.off('error', errorHandler);
plex.mux.stream.off('close', closeHandler);
};
subject.subscribe({
complete: cleanup,
error: cleanup,
});
/**
* @deprecated Use `close$` instead.
*
* Returns an observable that emits a single event when the "close" event is triggered on the plex instance.
*
* @param {Object} plex - The plex instance to listen for the "close" event.
* @returns {Observable} - An observable that emits once when the "close" event occurs and then completes.
*/
export const close = close$;
plex.close$ = subject;
return plex.close$;
};

@@ -30,3 +30,2 @@ import {plexIdConfigOrganizeArguments} from "./plexIdConfigOrganizeArguments.js";

connection.on('connect', () => handler(connection));
return connection;

@@ -39,11 +38,4 @@ },

).pipe(
takeUntil(_plex.close$), // Clean up on close$
tap(stream => {
// Check if the value looks like an error object, and propagate it if true
if (stream && typeof stream === 'object' && 'message' in stream) {
throw stream; // Propagate the error
}
}),
catchError(err => new Error(`Connection error: ${err.message}`))
takeUntil(_plex.close$)
);
});
import {plexIdConfigOrganizeArguments} from "./plexIdConfigOrganizeArguments.js";
import {fromEvent, take, takeUntil} from "rxjs";
import {asPlex} from "./asPlex.js";
import {ofChannel} from "./ofChannel.js";
import {consumePlexStream} from "./consumePlexStream.js";
import {connection$} from "./connection$.js";

@@ -30,9 +28,3 @@ /**

export const connectionAndRead$ = plexIdConfigOrganizeArguments((plex, id, config = {}) => {
const connection$ = fromEvent(asPlex(plex), "connection");
const close$ = fromEvent(plex, "close").pipe(take(1));
return ((id || config?.protocol) ?
connection$.pipe(ofChannel({ id, protocol: config.protocol }), takeUntil(close$)) :
connection$.pipe(takeUntil(close$))
).pipe(consumePlexStream);
return connection$(plex, id, config).pipe(consumePlexStream);
});

@@ -39,0 +31,0 @@

import {from, map, mergeMap} from "rxjs";
import {catchError} from "rxjs/operators";

@@ -24,4 +25,7 @@ /**

data => ({data, stream, id: stream.id, protocol: stream.protocol})
)
),
catchError(e => {
throw e
})
)
);

@@ -1,2 +0,2 @@

import duplexThrough from "duplex-through";
import {duplexThrough} from "duplex-through-with-error-handling";
import NSS from "not-secret-stream";

@@ -3,0 +3,0 @@ import {asPlex} from "./asPlex.js";

@@ -7,7 +7,12 @@ import {asPlex} from "./asPlex.js";

* @param {Object} plex - The plex instance containing the multiplexed stream.
* @param {Error} [e] - Optional error object to emit when destroying the stream.
* @param {Error} [error] - Optional error object to emit when destroying the stream.
* @returns {void}
*/
export const destroy = (plex, e) => {
return asPlex(plex).mux.stream.destroy(e);
export const destroy = (plex, error) => {
const stream = plex.mux.stream;
if (!stream.destroyed) {
if (error) stream.emit("error", error);
else stream.emit("close");
stream.destroy(error);
}
};

@@ -1,6 +0,5 @@

import { plexIdConfigOrganizeArguments } from "./plexIdConfigOrganizeArguments.js";
import { fromEvent, merge, takeUntil, tap, finalize, map } from "rxjs";
import { ofChannel } from "./ofChannel.js";
import { asPlex } from "./asPlex.js";
import { close$ } from "./close$.js";
import {plexIdConfigOrganizeArguments} from "./plexIdConfigOrganizeArguments.js";
import {finalize, fromEvent, takeUntil, tap} from "rxjs";
import {ofChannel} from "./ofChannel.js";
import {asPlex} from "./asPlex.js";

@@ -35,3 +34,4 @@ /**

// Create connection$ Observable
const connection$ = fromEvent(_plex, "connection").pipe(
// Merge connection$ and error$ into a single Observable
return fromEvent(_plex, "connection").pipe(
ofChannel(listenArgs), // Filter events based on channel ID and protocol

@@ -43,27 +43,2 @@ takeUntil(close$), // Clean up when close$ emits

);
// Create error$ Observable for errors from the stream
const error$ = fromEvent(_plex.mux.stream, 'error').pipe(
takeUntil(close$), // Clean up when close$ emits
);
// Merge connection$ and error$ into a single Observable
return merge(
connection$.pipe(map(connection => ({ type: 'connection', connection }))),
error$.pipe(map(error => ({ type: 'error', error })))
).pipe(
takeUntil(close$), // Complete on close$
tap(event => {
// Handle error events
if (event.type === 'error') {
throw event.error; // Propagate error to any subscribers
}
}),
map(event => {
// Return the connection if it's a connection event
if (event.type === 'connection') {
return event.connection;
}
})
);
});

@@ -1,2 +0,2 @@

import { timeout } from "rxjs";
import {tap, timeout} from "rxjs";

@@ -3,0 +3,0 @@ /**

{
"name": "rxprotoplex",
"version": "1.1.2",
"version": "1.2.0",
"description": "A utility library for working with Plex-based connections and streams with RxJS operators.",

@@ -14,3 +14,3 @@ "main": "index.js",

"compact-encoding": "^2.15.0",
"duplex-through": "^1.0.2",
"duplex-through-with-error-handling": "^1.0.11",
"not-secret-stream": "^1.0.0",

@@ -20,3 +20,4 @@ "rxjs": "^7.8.1"

"devDependencies": {
"brittle": "^3.7.0"
"brittle": "^3.7.0",
"streamx": "^2.20.2"
},

@@ -23,0 +24,0 @@ "keywords": [

@@ -1,2 +0,2 @@

import {test, solo} from "brittle";
import {test, solo, skip} from "brittle";
import {

@@ -12,3 +12,3 @@ listenAndConnectionAndRead$,

tapSend,
connection$, close$, connectionAndRead$, connect$
connection$, close$, connectionAndRead$, connect$, asPlex, destroy
} from "./index.js";

@@ -18,3 +18,4 @@ import b4a from "b4a";

import {withTimeout} from "./lib/withTimeout.js";
import {Duplex} from "streamx";
import {catchError} from "rxjs/operators";
// Utility function for delayed closure

@@ -27,2 +28,54 @@ const delayedClose = (closure$, delay, value) => {

test("destroy function with error", async (t) => {
t.plan(1);
// Create a Plex instance using Duplex (or your specific implementation)
const plex = asPlex(new Duplex());
// Subscribe to the close$ observable
const subscription = close$(plex).subscribe({
next() {
t.fail('Should not receive next when destroy is called with error');
},
error(e) {
t.is(e.message, "fun", 'Error message should match "fun"');
},
complete() {
t.fail('Should not complete when destroy is called with error');
},
});
// Call destroy with an error to trigger the 'error' event
destroy(plex, new Error("fun"));
// Cleanup after test to prevent memory leaks
subscription.unsubscribe();
});
test("destroy function without error", async (t) => {
t.plan(1);
// Create a Plex instance using Duplex (or your specific implementation)
const plex = asPlex(new Duplex());
// Subscribe to the close$ observable
const subscription = close$(plex).subscribe({
next() {
t.fail('Should not receive next when destroy is called with error');
},
error(e) {
t.fail('Error should not occur');
},
complete() {
t.pass('Should not complete when destroy is called with error');
},
});
// Call destroy with an error to trigger the 'error' event
destroy(plex);
// Cleanup after test to prevent memory leaks
subscription.unsubscribe();
});
test("test connection$", async t => {

@@ -180,27 +233,4 @@ t.plan(1);

test("connection$ closes properly on close$", async t => {
t.plan(2);
const [p1, p2] = createPlexPair();
const conn$ = listenAndConnection$(p1, "channelId");
const closure$ = close$(p1);
conn$.subscribe({
next: connection => {
t.ok(connection, "Connection established successfully");
},
error: (err) => {
t.is(err.message, "Manual closure", "Error should propagate correctly to connection$ subscribers");
}
});
connect$(p2, "channelId").subscribe(stream => {
stream.end(b4a.from("test message"));
});
setTimeout(() => {
closure$.next(new Error("Manual closure"));
});
});
test("should establish multiple connections and exchange messages", async t => {

@@ -231,4 +261,5 @@ t.plan(2); // Expect two successful assertions

t.plan(1); // Expect one assertion
const [p1] = createPlexPair();
const [p1, p2] = createPlexPair();
// Wrap the connect$ observable with a timeout

@@ -246,27 +277,33 @@ const timeoutDuration = 1000; // Set timeout to 1000 ms for test

test("should handle multiple close$ events gracefully", async t => {
t.plan(1);
const [p1] = createPlexPair();
test("connection$ closes properly on close$", async t => {
t.plan(3);
const [p1, p2] = createPlexPair();
const conn$ = listenAndConnection$(p1, "channelId");
const closure$ = close$(p1);
let closeCount = 0;
conn$.subscribe({
next: connection => {
t.ok(connection, "Connection established successfully");
},
error: (err) => {
t.is(err.message, "Manual closure", "Error should propagate correctly to connection$ subscribers");
}
});
closure$.subscribe({
complete: () => {
closeCount++;
if (closeCount > 1) {
t.fail("Close event should only complete once");
} else {
t.pass("Close event handled gracefully");
}
connect$(p2, "channelId").subscribe({
error(e) {
t.ok(e.message, "Manual closure");
},
error: () => t.fail("No error should be thrown during multiple close events")
next: stream => {
stream.end(b4a.from("test message"));
}
});
// Emit multiple close events
closure$.next(new Error("Manual closure"));
delayedClose(closure$, 100, new Error("Another closure"));
setTimeout(() => {
closure$.next(new Error("Manual closure"));
});
});
test("should gracefully shutdown on close$ while messages are exchanged", async t => {

@@ -294,4 +331,6 @@ t.plan(2);

// Establish connection and send messages
connect$(p2, "channelId").subscribe(stream => {
stream.write(b4a.from("test message"));
connect$(p2, "channelId").subscribe({
next: stream => {
stream.write(b4a.from("test message"));
}
});

@@ -298,0 +337,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc