rxprotoplex
Advanced tools
Comparing version 1.1.2 to 1.2.0
@@ -1,16 +0,12 @@ | ||
import Protoplex from "@zacharygriffee/protoplex"; | ||
import { share, tap, catchError } from "rxjs"; | ||
import { close$ as createClose$ } from "./close$.js"; | ||
// asPlex.js | ||
import {Subject, throwError} from 'rxjs'; | ||
import Protoplex from '@zacharygriffee/protoplex'; | ||
import {share, tap, catchError} from 'rxjs/operators'; | ||
/** | ||
* Wraps a given stream in a Protoplex instance if it is not already multiplexed. | ||
* Wraps a stream into a Protoplex instance and attaches a close$ observable. | ||
* | ||
* @function | ||
* @param {Object} stream - The stream to be wrapped or returned as is if already multiplexed. | ||
* @param {Object} config - Configuration options for creating a Protoplex instance. | ||
* @returns {Object} - The original stream if already multiplexed, otherwise a new Protoplex instance. | ||
* | ||
* @example | ||
* const plexStream = asPlex(stream, { someConfig: true }); | ||
* console.log(plexStream); // Outputs either the original multiplexed stream or a new Protoplex instance. | ||
* @param {Duplex} stream - The stream to be wrapped and managed. | ||
* @param {Object} [config={}] - Optional configuration for Protoplex. | ||
* @returns {Protoplex} - The wrapped Plex instance with a close$ observable. | ||
*/ | ||
@@ -20,5 +16,7 @@ export const asPlex = (stream, config) => { | ||
const errorHandler = (err) => { | ||
// console.error(`Stream emitted error: ${err.message}`); | ||
if (stream && typeof stream.destroy === 'function') { | ||
stream.destroy(); // Explicitly destroy the stream on error to prevent further events | ||
if ( | ||
stream && | ||
typeof stream.destroy === 'function' && | ||
!(stream.destroying || stream.destroyed)) { | ||
stream.destroy(err); // Explicitly destroy the stream on error to prevent further events | ||
} | ||
@@ -31,6 +29,6 @@ }; | ||
tap({ | ||
next: () => {}, // Handle successful closures | ||
complete: () => { | ||
// Explicitly destroy the stream when closing | ||
if (plex.mux.stream && typeof plex.mux.stream.destroy === 'function') { | ||
if (plex.mux.stream && | ||
typeof plex.mux.stream.destroy === 'function' && | ||
!(stream.destroying || stream.destroyed)) { | ||
plex.mux.stream.destroy(); | ||
@@ -40,11 +38,8 @@ } | ||
error: (err) => { | ||
// console.error(`Stream error: ${err.message}`); | ||
if (plex.mux.stream && typeof plex.mux.stream.destroy === 'function') { | ||
plex.mux.stream.destroy(); | ||
if (plex.mux.stream && | ||
typeof plex.mux.stream.destroy === 'function' && | ||
!(stream.destroying || stream.destroyed)) { | ||
plex.mux.stream.destroy(err); | ||
} | ||
} | ||
}), | ||
catchError((err) => { | ||
// console.error(`Caught error in close$: ${err.message}`); | ||
return []; // Return an empty observable to suppress further emissions | ||
}) | ||
@@ -74,1 +69,69 @@ ); | ||
}; | ||
/** | ||
* Creates a close$ Subject for a Plex instance that can be used to manually close the stream or emit errors. | ||
* | ||
* @param {Object} plex - The Plex instance to manage. | ||
* @returns {Subject} - A Subject that manages closure and errors. | ||
*/ | ||
export const createClose$ = (plex) => { | ||
if (plex.close$) return plex.close$; | ||
const subject = new Subject(); | ||
const stream = plex.mux.stream; | ||
// Attach event listeners to the stream | ||
const errorHandler = (error) => { | ||
if (!subject.closed) { | ||
subject.error(error); | ||
} | ||
}; | ||
const closeHandler = () => { | ||
if (!subject.closed) { | ||
subject.complete(); | ||
} | ||
}; | ||
plex.mux.stream.on('error', errorHandler); | ||
plex.mux.stream.on('close', closeHandler); | ||
// Override subject's next and error methods to destroy the stream accordingly | ||
const originalNext = subject.next.bind(subject); | ||
const originalError = subject.error.bind(subject); | ||
subject.next = (value) => { | ||
if (!subject.closed) { | ||
if (value instanceof Error) { | ||
if (!(stream.destroying || stream.destroyed)) plex.mux.stream.destroy(value); | ||
originalError(value); // Emit error to subscribers | ||
} else { | ||
if (!(stream.destroying || stream.destroyed)) plex.mux.stream.destroy(value); | ||
originalNext(value); // Emit next to indicate normal closure | ||
subject.complete(); | ||
} | ||
} | ||
}; | ||
subject.error = (err) => { | ||
if (!subject.closed) { | ||
if (!(stream.destroying || stream.destroyed)) plex.mux.stream.destroy(err); | ||
originalError(err); | ||
} | ||
}; | ||
// Cleanup event listeners when the Subject is unsubscribed or closed | ||
const cleanup = () => { | ||
plex.mux.stream.off('error', errorHandler); | ||
plex.mux.stream.off('close', closeHandler); | ||
}; | ||
subject.subscribe({ | ||
complete: cleanup, | ||
error: cleanup, | ||
}); | ||
plex.close$ = subject; | ||
return plex.close$; | ||
}; |
@@ -1,68 +0,70 @@ | ||
import {fromEvent, ReplaySubject, take} from "rxjs"; | ||
import {destroy} from "./destroy.js"; | ||
// close$.js | ||
import { Subject } from 'rxjs'; | ||
/** | ||
* Returns an observable that emits a single event when the "close" event is triggered on the plex instance. | ||
* Creates a close$ Subject for a Plex instance that can be used to manually close the stream or emit errors. | ||
* | ||
* @param {Object} plex - The plex instance to listen for the "close" event. | ||
* @returns {Observable} - An observable that emits once when the "close" event occurs and then completes. | ||
* @param {Object} plex - The Plex instance to manage. | ||
* @returns {Subject} - A Subject that manages closure and errors. | ||
*/ | ||
export const close$ = (plex) => { | ||
if (plex.close$) return plex.close$; | ||
const _plex = plex; | ||
const subject = new ReplaySubject(1); | ||
let errorHandled = false; // Track if an error has been handled | ||
// Listen for close events emitted by Plex and notify the subject | ||
fromEvent(_plex.mux.stream, "close").pipe(take(1)).subscribe(() => { | ||
subject.next(); | ||
subject.complete(); // Complete when the stream is closed | ||
}); | ||
const subject = new Subject(); | ||
// Listen for errors emitted by Plex and handle them appropriately | ||
fromEvent(_plex.mux.stream, 'error').pipe(take(1)).subscribe(error => { | ||
if (!errorHandled) { | ||
errorHandled = true; | ||
subject.error(error); // Propagate the error to the RxJS stream | ||
// Attach event listeners to the stream | ||
const errorHandler = (error) => { | ||
if (!subject.closed) { | ||
subject.error(error); | ||
} | ||
}); | ||
}; | ||
// Trigger closing the Plex and optionally pass an error through the subject | ||
subject.pipe(take(1)).subscribe({ | ||
error: (error) => { | ||
// Empty error handler to prevent unhandled errors | ||
}, | ||
next: (value) => { | ||
if (value && typeof value === 'object' && 'message' in value) { | ||
if (!errorHandled) { | ||
errorHandled = true; // Set flag to indicate error has been handled | ||
const closeHandler = () => { | ||
if (!subject.closed) { | ||
subject.next(); // Emit next to indicate normal closure | ||
subject.complete(); | ||
} | ||
}; | ||
// Safely destroy the stream with error handling | ||
if (!_plex.mux.stream.destroyed) { | ||
_plex.mux.stream.destroy(value); | ||
} | ||
} | ||
subject.error(value); // Propagate error to any subscribers | ||
plex.mux.stream.on('error', errorHandler); | ||
plex.mux.stream.on('close', closeHandler); | ||
// Override subject's next and error methods to destroy the stream accordingly | ||
const originalNext = subject.next.bind(subject); | ||
const originalError = subject.error.bind(subject); | ||
subject.next = (value) => { | ||
if (!subject.closed) { | ||
if (value instanceof Error) { | ||
plex.mux.stream.destroy(value); | ||
originalError(value); // Emit error to subscribers | ||
} else { | ||
// Safely destroy the stream normally | ||
if (!_plex.mux.stream.destroyed) { | ||
destroy(plex, value); // Use the destroy function to handle closure normally | ||
} | ||
plex.mux.stream.destroy(); | ||
originalNext(value); // Emit next to indicate normal closure | ||
subject.complete(); | ||
} | ||
} | ||
}); | ||
}; | ||
return subject.pipe(take(1)); | ||
}; | ||
subject.error = (err) => { | ||
if (!subject.closed) { | ||
plex.mux.stream.destroy(err); | ||
originalError(err); | ||
} | ||
}; | ||
// Cleanup event listeners when the Subject is unsubscribed or closed | ||
const cleanup = () => { | ||
plex.mux.stream.off('error', errorHandler); | ||
plex.mux.stream.off('close', closeHandler); | ||
}; | ||
subject.subscribe({ | ||
complete: cleanup, | ||
error: cleanup, | ||
}); | ||
/** | ||
* @deprecated Use `close$` instead. | ||
* | ||
* Returns an observable that emits a single event when the "close" event is triggered on the plex instance. | ||
* | ||
* @param {Object} plex - The plex instance to listen for the "close" event. | ||
* @returns {Observable} - An observable that emits once when the "close" event occurs and then completes. | ||
*/ | ||
export const close = close$; | ||
plex.close$ = subject; | ||
return plex.close$; | ||
}; |
@@ -30,3 +30,2 @@ import {plexIdConfigOrganizeArguments} from "./plexIdConfigOrganizeArguments.js"; | ||
connection.on('connect', () => handler(connection)); | ||
return connection; | ||
@@ -39,11 +38,4 @@ }, | ||
).pipe( | ||
takeUntil(_plex.close$), // Clean up on close$ | ||
tap(stream => { | ||
// Check if the value looks like an error object, and propagate it if true | ||
if (stream && typeof stream === 'object' && 'message' in stream) { | ||
throw stream; // Propagate the error | ||
} | ||
}), | ||
catchError(err => new Error(`Connection error: ${err.message}`)) | ||
takeUntil(_plex.close$) | ||
); | ||
}); |
import {plexIdConfigOrganizeArguments} from "./plexIdConfigOrganizeArguments.js"; | ||
import {fromEvent, take, takeUntil} from "rxjs"; | ||
import {asPlex} from "./asPlex.js"; | ||
import {ofChannel} from "./ofChannel.js"; | ||
import {consumePlexStream} from "./consumePlexStream.js"; | ||
import {connection$} from "./connection$.js"; | ||
@@ -30,9 +28,3 @@ /** | ||
export const connectionAndRead$ = plexIdConfigOrganizeArguments((plex, id, config = {}) => { | ||
const connection$ = fromEvent(asPlex(plex), "connection"); | ||
const close$ = fromEvent(plex, "close").pipe(take(1)); | ||
return ((id || config?.protocol) ? | ||
connection$.pipe(ofChannel({ id, protocol: config.protocol }), takeUntil(close$)) : | ||
connection$.pipe(takeUntil(close$)) | ||
).pipe(consumePlexStream); | ||
return connection$(plex, id, config).pipe(consumePlexStream); | ||
}); | ||
@@ -39,0 +31,0 @@ |
import {from, map, mergeMap} from "rxjs"; | ||
import {catchError} from "rxjs/operators"; | ||
@@ -24,4 +25,7 @@ /** | ||
data => ({data, stream, id: stream.id, protocol: stream.protocol}) | ||
) | ||
), | ||
catchError(e => { | ||
throw e | ||
}) | ||
) | ||
); |
@@ -1,2 +0,2 @@ | ||
import duplexThrough from "duplex-through"; | ||
import {duplexThrough} from "duplex-through-with-error-handling"; | ||
import NSS from "not-secret-stream"; | ||
@@ -3,0 +3,0 @@ import {asPlex} from "./asPlex.js"; |
@@ -7,7 +7,12 @@ import {asPlex} from "./asPlex.js"; | ||
* @param {Object} plex - The plex instance containing the multiplexed stream. | ||
* @param {Error} [e] - Optional error object to emit when destroying the stream. | ||
* @param {Error} [error] - Optional error object to emit when destroying the stream. | ||
* @returns {void} | ||
*/ | ||
export const destroy = (plex, e) => { | ||
return asPlex(plex).mux.stream.destroy(e); | ||
export const destroy = (plex, error) => { | ||
const stream = plex.mux.stream; | ||
if (!stream.destroyed) { | ||
if (error) stream.emit("error", error); | ||
else stream.emit("close"); | ||
stream.destroy(error); | ||
} | ||
}; |
@@ -1,6 +0,5 @@ | ||
import { plexIdConfigOrganizeArguments } from "./plexIdConfigOrganizeArguments.js"; | ||
import { fromEvent, merge, takeUntil, tap, finalize, map } from "rxjs"; | ||
import { ofChannel } from "./ofChannel.js"; | ||
import { asPlex } from "./asPlex.js"; | ||
import { close$ } from "./close$.js"; | ||
import {plexIdConfigOrganizeArguments} from "./plexIdConfigOrganizeArguments.js"; | ||
import {finalize, fromEvent, takeUntil, tap} from "rxjs"; | ||
import {ofChannel} from "./ofChannel.js"; | ||
import {asPlex} from "./asPlex.js"; | ||
@@ -35,3 +34,4 @@ /** | ||
// Create connection$ Observable | ||
const connection$ = fromEvent(_plex, "connection").pipe( | ||
// Merge connection$ and error$ into a single Observable | ||
return fromEvent(_plex, "connection").pipe( | ||
ofChannel(listenArgs), // Filter events based on channel ID and protocol | ||
@@ -43,27 +43,2 @@ takeUntil(close$), // Clean up when close$ emits | ||
); | ||
// Create error$ Observable for errors from the stream | ||
const error$ = fromEvent(_plex.mux.stream, 'error').pipe( | ||
takeUntil(close$), // Clean up when close$ emits | ||
); | ||
// Merge connection$ and error$ into a single Observable | ||
return merge( | ||
connection$.pipe(map(connection => ({ type: 'connection', connection }))), | ||
error$.pipe(map(error => ({ type: 'error', error }))) | ||
).pipe( | ||
takeUntil(close$), // Complete on close$ | ||
tap(event => { | ||
// Handle error events | ||
if (event.type === 'error') { | ||
throw event.error; // Propagate error to any subscribers | ||
} | ||
}), | ||
map(event => { | ||
// Return the connection if it's a connection event | ||
if (event.type === 'connection') { | ||
return event.connection; | ||
} | ||
}) | ||
); | ||
}); |
@@ -1,2 +0,2 @@ | ||
import { timeout } from "rxjs"; | ||
import {tap, timeout} from "rxjs"; | ||
@@ -3,0 +3,0 @@ /** |
{ | ||
"name": "rxprotoplex", | ||
"version": "1.1.2", | ||
"version": "1.2.0", | ||
"description": "A utility library for working with Plex-based connections and streams with RxJS operators.", | ||
@@ -14,3 +14,3 @@ "main": "index.js", | ||
"compact-encoding": "^2.15.0", | ||
"duplex-through": "^1.0.2", | ||
"duplex-through-with-error-handling": "^1.0.11", | ||
"not-secret-stream": "^1.0.0", | ||
@@ -20,3 +20,4 @@ "rxjs": "^7.8.1" | ||
"devDependencies": { | ||
"brittle": "^3.7.0" | ||
"brittle": "^3.7.0", | ||
"streamx": "^2.20.2" | ||
}, | ||
@@ -23,0 +24,0 @@ "keywords": [ |
129
test.js
@@ -1,2 +0,2 @@ | ||
import {test, solo} from "brittle"; | ||
import {test, solo, skip} from "brittle"; | ||
import { | ||
@@ -12,3 +12,3 @@ listenAndConnectionAndRead$, | ||
tapSend, | ||
connection$, close$, connectionAndRead$, connect$ | ||
connection$, close$, connectionAndRead$, connect$, asPlex, destroy | ||
} from "./index.js"; | ||
@@ -18,3 +18,4 @@ import b4a from "b4a"; | ||
import {withTimeout} from "./lib/withTimeout.js"; | ||
import {Duplex} from "streamx"; | ||
import {catchError} from "rxjs/operators"; | ||
// Utility function for delayed closure | ||
@@ -27,2 +28,54 @@ const delayedClose = (closure$, delay, value) => { | ||
test("destroy function with error", async (t) => { | ||
t.plan(1); | ||
// Create a Plex instance using Duplex (or your specific implementation) | ||
const plex = asPlex(new Duplex()); | ||
// Subscribe to the close$ observable | ||
const subscription = close$(plex).subscribe({ | ||
next() { | ||
t.fail('Should not receive next when destroy is called with error'); | ||
}, | ||
error(e) { | ||
t.is(e.message, "fun", 'Error message should match "fun"'); | ||
}, | ||
complete() { | ||
t.fail('Should not complete when destroy is called with error'); | ||
}, | ||
}); | ||
// Call destroy with an error to trigger the 'error' event | ||
destroy(plex, new Error("fun")); | ||
// Cleanup after test to prevent memory leaks | ||
subscription.unsubscribe(); | ||
}); | ||
test("destroy function without error", async (t) => { | ||
t.plan(1); | ||
// Create a Plex instance using Duplex (or your specific implementation) | ||
const plex = asPlex(new Duplex()); | ||
// Subscribe to the close$ observable | ||
const subscription = close$(plex).subscribe({ | ||
next() { | ||
t.fail('Should not receive next when destroy is called with error'); | ||
}, | ||
error(e) { | ||
t.fail('Error should not occur'); | ||
}, | ||
complete() { | ||
t.pass('Should not complete when destroy is called with error'); | ||
}, | ||
}); | ||
// Call destroy with an error to trigger the 'error' event | ||
destroy(plex); | ||
// Cleanup after test to prevent memory leaks | ||
subscription.unsubscribe(); | ||
}); | ||
test("test connection$", async t => { | ||
@@ -180,27 +233,4 @@ t.plan(1); | ||
test("connection$ closes properly on close$", async t => { | ||
t.plan(2); | ||
const [p1, p2] = createPlexPair(); | ||
const conn$ = listenAndConnection$(p1, "channelId"); | ||
const closure$ = close$(p1); | ||
conn$.subscribe({ | ||
next: connection => { | ||
t.ok(connection, "Connection established successfully"); | ||
}, | ||
error: (err) => { | ||
t.is(err.message, "Manual closure", "Error should propagate correctly to connection$ subscribers"); | ||
} | ||
}); | ||
connect$(p2, "channelId").subscribe(stream => { | ||
stream.end(b4a.from("test message")); | ||
}); | ||
setTimeout(() => { | ||
closure$.next(new Error("Manual closure")); | ||
}); | ||
}); | ||
test("should establish multiple connections and exchange messages", async t => { | ||
@@ -231,4 +261,5 @@ t.plan(2); // Expect two successful assertions | ||
t.plan(1); // Expect one assertion | ||
const [p1] = createPlexPair(); | ||
const [p1, p2] = createPlexPair(); | ||
// Wrap the connect$ observable with a timeout | ||
@@ -246,27 +277,33 @@ const timeoutDuration = 1000; // Set timeout to 1000 ms for test | ||
test("should handle multiple close$ events gracefully", async t => { | ||
t.plan(1); | ||
const [p1] = createPlexPair(); | ||
test("connection$ closes properly on close$", async t => { | ||
t.plan(3); | ||
const [p1, p2] = createPlexPair(); | ||
const conn$ = listenAndConnection$(p1, "channelId"); | ||
const closure$ = close$(p1); | ||
let closeCount = 0; | ||
conn$.subscribe({ | ||
next: connection => { | ||
t.ok(connection, "Connection established successfully"); | ||
}, | ||
error: (err) => { | ||
t.is(err.message, "Manual closure", "Error should propagate correctly to connection$ subscribers"); | ||
} | ||
}); | ||
closure$.subscribe({ | ||
complete: () => { | ||
closeCount++; | ||
if (closeCount > 1) { | ||
t.fail("Close event should only complete once"); | ||
} else { | ||
t.pass("Close event handled gracefully"); | ||
} | ||
connect$(p2, "channelId").subscribe({ | ||
error(e) { | ||
t.ok(e.message, "Manual closure"); | ||
}, | ||
error: () => t.fail("No error should be thrown during multiple close events") | ||
next: stream => { | ||
stream.end(b4a.from("test message")); | ||
} | ||
}); | ||
// Emit multiple close events | ||
closure$.next(new Error("Manual closure")); | ||
delayedClose(closure$, 100, new Error("Another closure")); | ||
setTimeout(() => { | ||
closure$.next(new Error("Manual closure")); | ||
}); | ||
}); | ||
test("should gracefully shutdown on close$ while messages are exchanged", async t => { | ||
@@ -294,4 +331,6 @@ t.plan(2); | ||
// Establish connection and send messages | ||
connect$(p2, "channelId").subscribe(stream => { | ||
stream.write(b4a.from("test message")); | ||
connect$(p2, "channelId").subscribe({ | ||
next: stream => { | ||
stream.write(b4a.from("test message")); | ||
} | ||
}); | ||
@@ -298,0 +337,0 @@ |
50564
1070
2
+ Addedduplex-through-with-error-handling@1.0.11(transitive)
- Removedduplex-through@^1.0.2