You're Invited:Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26.RSVP
Socket
Book a DemoSign in
Socket

@libp2p/mplex

Package Overview
Dependencies
Maintainers
4
Versions
855
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@libp2p/mplex - npm Package Compare versions

Comparing version
1.0.1
to
1.0.2
+63
dist/src/mplex.d.ts
import { Pushable } from 'it-pushable';
import { Message } from './message-types.js';
import type { Components } from '@libp2p/interfaces/components';
import type { Sink } from 'it-stream-types';
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer';
import type { Stream } from '@libp2p/interfaces/connection';
export interface MplexStream extends Stream {
source: Pushable<Uint8Array>;
}
export interface MplexInit extends StreamMuxerInit {
maxMsgSize?: number;
}
export declare class MplexStreamMuxer implements StreamMuxer {
protocol: string;
sink: Sink<Uint8Array>;
source: AsyncIterable<Uint8Array>;
private _streamId;
private readonly _streams;
private readonly _init;
private readonly _source;
constructor(components: Components, init?: MplexInit);
init(components: Components): void;
/**
* Returns a Map of streams and their ids
*/
get streams(): Stream[];
/**
* Initiate a new stream with the given name. If no name is
* provided, the id of the stream will be used.
*/
newStream(name?: string): Stream;
/**
* Called whenever an inbound stream is created
*/
_newReceiverStream(options: {
id: number;
name: string;
}): MplexStream;
_newStream(options: {
id: number;
name: string;
type: 'initiator' | 'receiver';
registry: Map<number, MplexStream>;
}): MplexStream;
/**
* Creates a sink with an abortable source. Incoming messages will
* also have their size restricted. All messages will be varint decoded.
*/
_createSink(): Sink<Uint8Array, Promise<void>>;
/**
* Creates a source that restricts outgoing message sizes
* and varint encodes them
*/
_createSource(): AsyncGenerator<Uint8Array, void, undefined> & {
push: (value: Message) => import("it-pushable").PushableV<Message>;
end: (err?: Error | undefined) => import("it-pushable").PushableV<Message>;
return: () => {
done: boolean;
};
};
_handleIncoming(message: Message): void;
}
//# sourceMappingURL=mplex.d.ts.map
{"version":3,"file":"mplex.d.ts","sourceRoot":"","sources":["../../src/mplex.ts"],"names":[],"mappings":"AACA,OAAO,EAAE,QAAQ,EAAa,MAAM,aAAa,CAAA;AAKjD,OAAO,EAAkC,OAAO,EAAE,MAAM,oBAAoB,CAAA;AAK5E,OAAO,KAAK,EAAE,UAAU,EAAE,MAAM,+BAA+B,CAAA;AAC/D,OAAO,KAAK,EAAE,IAAI,EAAE,MAAM,iBAAiB,CAAA;AAC3C,OAAO,KAAK,EAAE,WAAW,EAAE,eAAe,EAAE,MAAM,iCAAiC,CAAA;AACnF,OAAO,KAAK,EAAE,MAAM,EAAE,MAAM,+BAA+B,CAAA;AAqB3D,MAAM,WAAW,WAAY,SAAQ,MAAM;IACzC,MAAM,EAAE,QAAQ,CAAC,UAAU,CAAC,CAAA;CAC7B;AAED,MAAM,WAAW,SAAU,SAAQ,eAAe;IAChD,UAAU,CAAC,EAAE,MAAM,CAAA;CACpB;AAED,qBAAa,gBAAiB,YAAW,WAAW;IAC3C,QAAQ,SAAiB;IAEzB,IAAI,EAAE,IAAI,CAAC,UAAU,CAAC,CAAA;IACtB,MAAM,EAAE,aAAa,CAAC,UAAU,CAAC,CAAA;IAExC,OAAO,CAAC,SAAS,CAAQ;IACzB,OAAO,CAAC,QAAQ,CAAC,QAAQ,CAA+E;IACxG,OAAO,CAAC,QAAQ,CAAC,KAAK,CAAW;IACjC,OAAO,CAAC,QAAQ,CAAC,OAAO,CAA8D;gBAEzE,UAAU,EAAE,UAAU,EAAE,IAAI,CAAC,EAAE,SAAS;IA6BrD,IAAI,CAAE,UAAU,EAAE,UAAU;IAI5B;;OAEG;IACH,IAAI,OAAO,aAUV;IAED;;;OAGG;IACH,SAAS,CAAE,IAAI,CAAC,EAAE,MAAM,GAAG,MAAM;IAOjC;;OAEG;IACH,kBAAkB,CAAE,OAAO,EAAE;QAAE,EAAE,EAAE,MAAM,CAAC;QAAC,IAAI,EAAE,MAAM,CAAA;KAAE;IAMzD,UAAU,CAAE,OAAO,EAAE;QAAE,EAAE,EAAE,MAAM,CAAC;QAAC,IAAI,EAAE,MAAM,CAAC;QAAC,IAAI,EAAE,WAAW,GAAG,UAAU,CAAC;QAAC,QAAQ,EAAE,GAAG,CAAC,MAAM,EAAE,WAAW,CAAC,CAAA;KAAE;IAmCrH;;;OAGG;IACH,WAAW;IA4BX;;;OAGG;IACH,aAAa;;;;;;;IAoBb,eAAe,CAAE,OAAO,EAAE,OAAO;CA0ClC"}
import { pipe } from 'it-pipe';
import { pushableV } from 'it-pushable';
import { abortableSource } from 'abortable-iterator';
import { encode } from './encode.js';
import { decode } from './decode.js';
import { restrictSize } from './restrict-size.js';
import { MessageTypes, MessageTypeNames } from './message-types.js';
import { createStream } from './stream.js';
import { toString as uint8ArrayToString } from 'uint8arrays';
import { trackedMap } from '@libp2p/tracked-map';
import { logger } from '@libp2p/logger';
const log = logger('libp2p:mplex');
function printMessage(msg) {
const output = {
...msg,
type: `${MessageTypeNames[msg.type]} (${msg.type})`
};
if (msg.type === MessageTypes.NEW_STREAM) {
output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice());
}
if (msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) {
output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice(), 'base16');
}
return output;
}
export class MplexStreamMuxer {
constructor(components, init) {
this.protocol = '/mplex/6.7.0';
init = init ?? {};
this._streamId = 0;
this._streams = {
/**
* Stream to ids map
*/
initiators: trackedMap({ metrics: components.getMetrics(), component: 'mplex', metric: 'initiatorStreams' }),
/**
* Stream to ids map
*/
receivers: trackedMap({ metrics: components.getMetrics(), component: 'mplex', metric: 'receiverStreams' })
};
this._init = init;
/**
* An iterable sink
*/
this.sink = this._createSink();
/**
* An iterable source
*/
const source = this._createSource();
this._source = source;
this.source = source;
}
init(components) {
}
/**
* Returns a Map of streams and their ids
*/
get streams() {
// Inbound and Outbound streams may have the same ids, so we need to make those unique
const streams = [];
this._streams.initiators.forEach(stream => {
streams.push(stream);
});
this._streams.receivers.forEach(stream => {
streams.push(stream);
});
return streams;
}
/**
* Initiate a new stream with the given name. If no name is
* provided, the id of the stream will be used.
*/
newStream(name) {
const id = this._streamId++;
name = name == null ? id.toString() : name.toString();
const registry = this._streams.initiators;
return this._newStream({ id, name, type: 'initiator', registry });
}
/**
* Called whenever an inbound stream is created
*/
_newReceiverStream(options) {
const { id, name } = options;
const registry = this._streams.receivers;
return this._newStream({ id, name, type: 'receiver', registry });
}
_newStream(options) {
const { id, name, type, registry } = options;
log('new %s stream %s %s', type, id, name);
if (registry.has(id)) {
throw new Error(`${type} stream ${id} already exists!`);
}
const send = (msg) => {
if (log.enabled) {
log.trace('%s stream %s send', type, id, printMessage(msg));
}
if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) {
msg.data = msg.data instanceof Uint8Array ? msg.data : msg.data.slice();
}
this._source.push(msg);
};
const onEnd = () => {
log('%s stream %s %s ended', type, id, name);
registry.delete(id);
if (this._init.onStreamEnd != null) {
this._init.onStreamEnd(stream);
}
};
const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._init.maxMsgSize });
registry.set(id, stream);
return stream;
}
/**
* Creates a sink with an abortable source. Incoming messages will
* also have their size restricted. All messages will be varint decoded.
*/
_createSink() {
const sink = async (source) => {
if (this._init.signal != null) {
source = abortableSource(source, this._init.signal);
}
try {
await pipe(source, decode, restrictSize(this._init.maxMsgSize), async (source) => {
for await (const msg of source) {
this._handleIncoming(msg);
}
});
this._source.end();
}
catch (err) {
log('error in sink', err);
this._source.end(err); // End the source with an error
}
};
return sink;
}
/**
* Creates a source that restricts outgoing message sizes
* and varint encodes them
*/
_createSource() {
const onEnd = (err) => {
const { initiators, receivers } = this._streams;
// Abort all the things!
for (const s of initiators.values()) {
s.abort(err);
}
for (const s of receivers.values()) {
s.abort(err);
}
};
const source = pushableV({ onEnd });
return Object.assign(encode(source), {
push: source.push,
end: source.end,
return: source.return
});
}
_handleIncoming(message) {
const { id, type } = message;
if (log.enabled) {
log.trace('incoming message', printMessage(message));
}
// Create a new stream?
if (message.type === MessageTypes.NEW_STREAM) {
const stream = this._newReceiverStream({ id, name: uint8ArrayToString(message.data instanceof Uint8Array ? message.data : message.data.slice()) });
if (this._init.onIncomingStream != null) {
this._init.onIncomingStream(stream);
}
return;
}
const list = (type & 1) === 1 ? this._streams.initiators : this._streams.receivers;
const stream = list.get(id);
if (stream == null) {
return log('missing stream %s', id);
}
switch (type) {
case MessageTypes.MESSAGE_INITIATOR:
case MessageTypes.MESSAGE_RECEIVER:
stream.source.push(message.data.slice());
break;
case MessageTypes.CLOSE_INITIATOR:
case MessageTypes.CLOSE_RECEIVER:
stream.close();
break;
case MessageTypes.RESET_INITIATOR:
case MessageTypes.RESET_RECEIVER:
stream.reset();
break;
default:
log('unknown message type %s', type);
}
}
}
//# sourceMappingURL=mplex.js.map
{"version":3,"file":"mplex.js","sourceRoot":"","sources":["../../src/mplex.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,IAAI,EAAE,MAAM,SAAS,CAAA;AAC9B,OAAO,EAAY,SAAS,EAAE,MAAM,aAAa,CAAA;AACjD,OAAO,EAAE,eAAe,EAAE,MAAM,oBAAoB,CAAA;AACpD,OAAO,EAAE,MAAM,EAAE,MAAM,aAAa,CAAA;AACpC,OAAO,EAAE,MAAM,EAAE,MAAM,aAAa,CAAA;AACpC,OAAO,EAAE,YAAY,EAAE,MAAM,oBAAoB,CAAA;AACjD,OAAO,EAAE,YAAY,EAAE,gBAAgB,EAAW,MAAM,oBAAoB,CAAA;AAC5E,OAAO,EAAE,YAAY,EAAE,MAAM,aAAa,CAAA;AAC1C,OAAO,EAAE,QAAQ,IAAI,kBAAkB,EAAE,MAAM,aAAa,CAAA;AAC5D,OAAO,EAAE,UAAU,EAAE,MAAM,qBAAqB,CAAA;AAChD,OAAO,EAAE,MAAM,EAAE,MAAM,gBAAgB,CAAA;AAMvC,MAAM,GAAG,GAAG,MAAM,CAAC,cAAc,CAAC,CAAA;AAElC,SAAS,YAAY,CAAE,GAAY;IACjC,MAAM,MAAM,GAAQ;QAClB,GAAG,GAAG;QACN,IAAI,EAAE,GAAG,gBAAgB,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,GAAG,CAAC,IAAI,GAAG;KACpD,CAAA;IAED,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,UAAU,EAAE;QACxC,MAAM,CAAC,IAAI,GAAG,kBAAkB,CAAC,GAAG,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,EAAE,CAAC,CAAA;KAC/F;IAED,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,iBAAiB,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,gBAAgB,EAAE;QAC7F,MAAM,CAAC,IAAI,GAAG,kBAAkB,CAAC,GAAG,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,EAAE,EAAE,QAAQ,CAAC,CAAA;KACzG;IAED,OAAO,MAAM,CAAA;AACf,CAAC;AAUD,MAAM,OAAO,gBAAgB;IAW3B,YAAa,UAAsB,EAAE,IAAgB;QAV9C,aAAQ,GAAG,cAAc,CAAA;QAW9B,IAAI,GAAG,IAAI,IAAI,EAAE,CAAA;QAEjB,IAAI,CAAC,SAAS,GAAG,CAAC,CAAA;QAClB,IAAI,CAAC,QAAQ,GAAG;YACd;;eAEG;YACH,UAAU,EAAE,UAAU,CAAsB,EAAE,OAAO,EAAE,UAAU,CAAC,UAAU,EAAE,EAAE,SAAS,EAAE,OAAO,EAAE,MAAM,EAAE,kBAAkB,EAAE,CAAC;YACjI;;eAEG;YACH,SAAS,EAAE,UAAU,CAAsB,EAAE,OAAO,EAAE,UAAU,CAAC,UAAU,EAAE,EAAE,SAAS,EAAE,OAAO,EAAE,MAAM,EAAE,iBAAiB,EAAE,CAAC;SAChI,CAAA;QACD,IAAI,CAAC,KAAK,GAAG,IAAI,CAAA;QAEjB;;WAEG;QACH,IAAI,CAAC,IAAI,GAAG,IAAI,CAAC,WAAW,EAAE,CAAA;QAE9B;;WAEG;QACH,MAAM,MAAM,GAAG,IAAI,CAAC,aAAa,EAAE,CAAA;QACnC,IAAI,CAAC,OAAO,GAAG,MAAM,CAAA;QACrB,IAAI,CAAC,MAAM,GAAG,MAAM,CAAA;IACtB,CAAC;IAED,IAAI,CAAE,UAAsB;IAE5B,CAAC;IAED;;OAEG;IACH,IAAI,OAAO;QACT,sFAAsF;QACtF,MAAM,OAAO,GAAa,EAAE,CAAA;QAC5B,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAC,OAAO,CAAC,MAAM,CAAC,EAAE;YACxC,OAAO,CAAC,IAAI,CAAC,MAAM,CAAC,CAAA;QACtB,CAAC,CAAC,CAAA;QACF,IAAI,CAAC,QAAQ,CAAC,SAAS,CAAC,OAAO,CAAC,MAAM,CAAC,EAAE;YACvC,OAAO,CAAC,IAAI,CAAC,MAAM,CAAC,CAAA;QACtB,CAAC,CAAC,CAAA;QACF,OAAO,OAAO,CAAA;IAChB,CAAC;IAED;;;OAGG;IACH,SAAS,CAAE,IAAa;QACtB,MAAM,EAAE,GAAG,IAAI,CAAC,SAAS,EAAE,CAAA;QAC3B,IAAI,GAAG,IAAI,IAAI,IAAI,CAAC,CAAC,CAAC,EAAE,CAAC,QAAQ,EAAE,CAAC,CAAC,CAAC,IAAI,CAAC,QAAQ,EAAE,CAAA;QACrD,MAAM,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAA;QACzC,OAAO,IAAI,CAAC,UAAU,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,WAAW,EAAE,QAAQ,EAAE,CAAC,CAAA;IACnE,CAAC;IAED;;OAEG;IACH,kBAAkB,CAAE,OAAqC;QACvD,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,GAAG,OAAO,CAAA;QAC5B,MAAM,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,SAAS,CAAA;QACxC,OAAO,IAAI,CAAC,UAAU,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,UAAU,EAAE,QAAQ,EAAE,CAAC,CAAA;IAClE,CAAC;IAED,UAAU,CAAE,OAAyG;QACnH,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,QAAQ,EAAE,GAAG,OAAO,CAAA;QAE5C,GAAG,CAAC,qBAAqB,EAAE,IAAI,EAAE,EAAE,EAAE,IAAI,CAAC,CAAA;QAE1C,IAAI,QAAQ,CAAC,GAAG,CAAC,EAAE,CAAC,EAAE;YACpB,MAAM,IAAI,KAAK,CAAC,GAAG,IAAI,WAAW,EAAE,kBAAkB,CAAC,CAAA;SACxD;QAED,MAAM,IAAI,GAAG,CAAC,GAAY,EAAE,EAAE;YAC5B,IAAI,GAAG,CAAC,OAAO,EAAE;gBACf,GAAG,CAAC,KAAK,CAAC,mBAAmB,EAAE,IAAI,EAAE,EAAE,EAAE,YAAY,CAAC,GAAG,CAAC,CAAC,CAAA;aAC5D;YAED,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,UAAU,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,iBAAiB,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,gBAAgB,EAAE;gBACrI,GAAG,CAAC,IAAI,GAAG,GAAG,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,EAAE,CAAA;aACxE;YAED,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,GAAG,CAAC,CAAA;QACxB,CAAC,CAAA;QAED,MAAM,KAAK,GAAG,GAAG,EAAE;YACjB,GAAG,CAAC,uBAAuB,EAAE,IAAI,EAAE,EAAE,EAAE,IAAI,CAAC,CAAA;YAC5C,QAAQ,CAAC,MAAM,CAAC,EAAE,CAAC,CAAA;YAEnB,IAAI,IAAI,CAAC,KAAK,CAAC,WAAW,IAAI,IAAI,EAAE;gBAClC,IAAI,CAAC,KAAK,CAAC,WAAW,CAAC,MAAM,CAAC,CAAA;aAC/B;QACH,CAAC,CAAA;QAED,MAAM,MAAM,GAAG,YAAY,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAE,UAAU,EAAE,IAAI,CAAC,KAAK,CAAC,UAAU,EAAE,CAAC,CAAA;QAC/F,QAAQ,CAAC,GAAG,CAAC,EAAE,EAAE,MAAM,CAAC,CAAA;QACxB,OAAO,MAAM,CAAA;IACf,CAAC;IAED;;;OAGG;IACH,WAAW;QACT,MAAM,IAAI,GAAqB,KAAK,EAAC,MAAM,EAAC,EAAE;YAC5C,IAAI,IAAI,CAAC,KAAK,CAAC,MAAM,IAAI,IAAI,EAAE;gBAC7B,MAAM,GAAG,eAAe,CAAC,MAAM,EAAE,IAAI,CAAC,KAAK,CAAC,MAAM,CAAC,CAAA;aACpD;YAED,IAAI;gBACF,MAAM,IAAI,CACR,MAAM,EACN,MAAM,EACN,YAAY,CAAC,IAAI,CAAC,KAAK,CAAC,UAAU,CAAC,EACnC,KAAK,EAAC,MAAM,EAAC,EAAE;oBACb,IAAI,KAAK,EAAE,MAAM,GAAG,IAAI,MAAM,EAAE;wBAC9B,IAAI,CAAC,eAAe,CAAC,GAAG,CAAC,CAAA;qBAC1B;gBACH,CAAC,CACF,CAAA;gBAED,IAAI,CAAC,OAAO,CAAC,GAAG,EAAE,CAAA;aACnB;YAAC,OAAO,GAAQ,EAAE;gBACjB,GAAG,CAAC,eAAe,EAAE,GAAG,CAAC,CAAA;gBACzB,IAAI,CAAC,OAAO,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA,CAAC,+BAA+B;aACtD;QACH,CAAC,CAAA;QAED,OAAO,IAAI,CAAA;IACb,CAAC;IAED;;;OAGG;IACH,aAAa;QACX,MAAM,KAAK,GAAG,CAAC,GAAW,EAAE,EAAE;YAC5B,MAAM,EAAE,UAAU,EAAE,SAAS,EAAE,GAAG,IAAI,CAAC,QAAQ,CAAA;YAC/C,wBAAwB;YACxB,KAAK,MAAM,CAAC,IAAI,UAAU,CAAC,MAAM,EAAE,EAAE;gBACnC,CAAC,CAAC,KAAK,CAAC,GAAG,CAAC,CAAA;aACb;YACD,KAAK,MAAM,CAAC,IAAI,SAAS,CAAC,MAAM,EAAE,EAAE;gBAClC,CAAC,CAAC,KAAK,CAAC,GAAG,CAAC,CAAA;aACb;QACH,CAAC,CAAA;QACD,MAAM,MAAM,GAAG,SAAS,CAAU,EAAE,KAAK,EAAE,CAAC,CAAA;QAE5C,OAAO,MAAM,CAAC,MAAM,CAAC,MAAM,CAAC,MAAM,CAAC,EAAE;YACnC,IAAI,EAAE,MAAM,CAAC,IAAI;YACjB,GAAG,EAAE,MAAM,CAAC,GAAG;YACf,MAAM,EAAE,MAAM,CAAC,MAAM;SACtB,CAAC,CAAA;IACJ,CAAC;IAED,eAAe,CAAE,OAAgB;QAC/B,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,GAAG,OAAO,CAAA;QAE5B,IAAI,GAAG,CAAC,OAAO,EAAE;YACf,GAAG,CAAC,KAAK,CAAC,kBAAkB,EAAE,YAAY,CAAC,OAAO,CAAC,CAAC,CAAA;SACrD;QAED,uBAAuB;QACvB,IAAI,OAAO,CAAC,IAAI,KAAK,YAAY,CAAC,UAAU,EAAE;YAC5C,MAAM,MAAM,GAAG,IAAI,CAAC,kBAAkB,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,kBAAkB,CAAC,OAAO,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,OAAO,CAAC,IAAI,CAAC,CAAC,CAAC,OAAO,CAAC,IAAI,CAAC,KAAK,EAAE,CAAC,EAAE,CAAC,CAAA;YAElJ,IAAI,IAAI,CAAC,KAAK,CAAC,gBAAgB,IAAI,IAAI,EAAE;gBACvC,IAAI,CAAC,KAAK,CAAC,gBAAgB,CAAC,MAAM,CAAC,CAAA;aACpC;YAED,OAAM;SACP;QAED,MAAM,IAAI,GAAG,CAAC,IAAI,GAAG,CAAC,CAAC,KAAK,CAAC,CAAC,CAAC,CAAC,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAC,CAAC,CAAC,IAAI,CAAC,QAAQ,CAAC,SAAS,CAAA;QAClF,MAAM,MAAM,GAAG,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,CAAA;QAE3B,IAAI,MAAM,IAAI,IAAI,EAAE;YAClB,OAAO,GAAG,CAAC,mBAAmB,EAAE,EAAE,CAAC,CAAA;SACpC;QAED,QAAQ,IAAI,EAAE;YACZ,KAAK,YAAY,CAAC,iBAAiB,CAAC;YACpC,KAAK,YAAY,CAAC,gBAAgB;gBAChC,MAAM,CAAC,MAAM,CAAC,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,KAAK,EAAE,CAAC,CAAA;gBACxC,MAAK;YACP,KAAK,YAAY,CAAC,eAAe,CAAC;YAClC,KAAK,YAAY,CAAC,cAAc;gBAC9B,MAAM,CAAC,KAAK,EAAE,CAAA;gBACd,MAAK;YACP,KAAK,YAAY,CAAC,eAAe,CAAC;YAClC,KAAK,YAAY,CAAC,cAAc;gBAC9B,MAAM,CAAC,KAAK,EAAE,CAAA;gBACd,MAAK;YACP;gBACE,GAAG,CAAC,yBAAyB,EAAE,IAAI,CAAC,CAAA;SACvC;IACH,CAAC;CACF"}
import { pipe } from 'it-pipe'
import { Pushable, pushableV } from 'it-pushable'
import { abortableSource } from 'abortable-iterator'
import { encode } from './encode.js'
import { decode } from './decode.js'
import { restrictSize } from './restrict-size.js'
import { MessageTypes, MessageTypeNames, Message } from './message-types.js'
import { createStream } from './stream.js'
import { toString as uint8ArrayToString } from 'uint8arrays'
import { trackedMap } from '@libp2p/tracked-map'
import { logger } from '@libp2p/logger'
import type { Components } from '@libp2p/interfaces/components'
import type { Sink } from 'it-stream-types'
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer'
import type { Stream } from '@libp2p/interfaces/connection'
const log = logger('libp2p:mplex')
function printMessage (msg: Message) {
const output: any = {
...msg,
type: `${MessageTypeNames[msg.type]} (${msg.type})`
}
if (msg.type === MessageTypes.NEW_STREAM) {
output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice())
}
if (msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) {
output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice(), 'base16')
}
return output
}
export interface MplexStream extends Stream {
source: Pushable<Uint8Array>
}
export interface MplexInit extends StreamMuxerInit {
maxMsgSize?: number
}
export class MplexStreamMuxer implements StreamMuxer {
public protocol = '/mplex/6.7.0'
public sink: Sink<Uint8Array>
public source: AsyncIterable<Uint8Array>
private _streamId: number
private readonly _streams: { initiators: Map<number, MplexStream>, receivers: Map<number, MplexStream> }
private readonly _init: MplexInit
private readonly _source: { push: (val: Message) => void, end: (err?: Error) => void }
constructor (components: Components, init?: MplexInit) {
init = init ?? {}
this._streamId = 0
this._streams = {
/**
* Stream to ids map
*/
initiators: trackedMap<number, MplexStream>({ metrics: components.getMetrics(), component: 'mplex', metric: 'initiatorStreams' }),
/**
* Stream to ids map
*/
receivers: trackedMap<number, MplexStream>({ metrics: components.getMetrics(), component: 'mplex', metric: 'receiverStreams' })
}
this._init = init
/**
* An iterable sink
*/
this.sink = this._createSink()
/**
* An iterable source
*/
const source = this._createSource()
this._source = source
this.source = source
}
init (components: Components) {
}
/**
* Returns a Map of streams and their ids
*/
get streams () {
// Inbound and Outbound streams may have the same ids, so we need to make those unique
const streams: Stream[] = []
this._streams.initiators.forEach(stream => {
streams.push(stream)
})
this._streams.receivers.forEach(stream => {
streams.push(stream)
})
return streams
}
/**
* Initiate a new stream with the given name. If no name is
* provided, the id of the stream will be used.
*/
newStream (name?: string): Stream {
const id = this._streamId++
name = name == null ? id.toString() : name.toString()
const registry = this._streams.initiators
return this._newStream({ id, name, type: 'initiator', registry })
}
/**
* Called whenever an inbound stream is created
*/
_newReceiverStream (options: { id: number, name: string }) {
const { id, name } = options
const registry = this._streams.receivers
return this._newStream({ id, name, type: 'receiver', registry })
}
_newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map<number, MplexStream> }) {
const { id, name, type, registry } = options
log('new %s stream %s %s', type, id, name)
if (registry.has(id)) {
throw new Error(`${type} stream ${id} already exists!`)
}
const send = (msg: Message) => {
if (log.enabled) {
log.trace('%s stream %s send', type, id, printMessage(msg))
}
if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) {
msg.data = msg.data instanceof Uint8Array ? msg.data : msg.data.slice()
}
this._source.push(msg)
}
const onEnd = () => {
log('%s stream %s %s ended', type, id, name)
registry.delete(id)
if (this._init.onStreamEnd != null) {
this._init.onStreamEnd(stream)
}
}
const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._init.maxMsgSize })
registry.set(id, stream)
return stream
}
/**
* Creates a sink with an abortable source. Incoming messages will
* also have their size restricted. All messages will be varint decoded.
*/
_createSink () {
const sink: Sink<Uint8Array> = async source => {
if (this._init.signal != null) {
source = abortableSource(source, this._init.signal)
}
try {
await pipe(
source,
decode,
restrictSize(this._init.maxMsgSize),
async source => {
for await (const msg of source) {
this._handleIncoming(msg)
}
}
)
this._source.end()
} catch (err: any) {
log('error in sink', err)
this._source.end(err) // End the source with an error
}
}
return sink
}
/**
* Creates a source that restricts outgoing message sizes
* and varint encodes them
*/
_createSource () {
const onEnd = (err?: Error) => {
const { initiators, receivers } = this._streams
// Abort all the things!
for (const s of initiators.values()) {
s.abort(err)
}
for (const s of receivers.values()) {
s.abort(err)
}
}
const source = pushableV<Message>({ onEnd })
return Object.assign(encode(source), {
push: source.push,
end: source.end,
return: source.return
})
}
_handleIncoming (message: Message) {
const { id, type } = message
if (log.enabled) {
log.trace('incoming message', printMessage(message))
}
// Create a new stream?
if (message.type === MessageTypes.NEW_STREAM) {
const stream = this._newReceiverStream({ id, name: uint8ArrayToString(message.data instanceof Uint8Array ? message.data : message.data.slice()) })
if (this._init.onIncomingStream != null) {
this._init.onIncomingStream(stream)
}
return
}
const list = (type & 1) === 1 ? this._streams.initiators : this._streams.receivers
const stream = list.get(id)
if (stream == null) {
return log('missing stream %s', id)
}
switch (type) {
case MessageTypes.MESSAGE_INITIATOR:
case MessageTypes.MESSAGE_RECEIVER:
stream.source.push(message.data.slice())
break
case MessageTypes.CLOSE_INITIATOR:
case MessageTypes.CLOSE_RECEIVER:
stream.close()
break
case MessageTypes.RESET_INITIATOR:
case MessageTypes.RESET_RECEIVER:
stream.reset()
break
default:
log('unknown message type %s', type)
}
}
}
+7
-59

@@ -1,63 +0,11 @@

import { Pushable } from 'it-pushable';
import { Message } from './message-types.js';
import type { Sink } from 'it-stream-types';
import type { Muxer, MuxerOptions } from '@libp2p/interfaces/stream-muxer';
import type { Stream } from '@libp2p/interfaces/connection';
import type { ComponentMetricsTracker } from '@libp2p/interfaces/metrics';
export interface MplexStream extends Stream {
source: Pushable<Uint8Array>;
}
export interface MplexOptions extends MuxerOptions {
import type { Components } from '@libp2p/interfaces/components';
import type { StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer';
import { MplexStreamMuxer } from './mplex.js';
export interface MplexInit extends StreamMuxerInit {
maxMsgSize?: number;
metrics?: ComponentMetricsTracker;
}
export declare class Mplex implements Muxer {
static multicodec: string;
sink: Sink<Uint8Array>;
source: AsyncIterable<Uint8Array>;
private _streamId;
private readonly _streams;
private readonly _options;
private readonly _source;
constructor(options?: MplexOptions);
/**
* Returns a Map of streams and their ids
*/
get streams(): Stream[];
/**
* Initiate a new stream with the given name. If no name is
* provided, the id of the stream will be used.
*/
newStream(name?: string): Stream;
/**
* Called whenever an inbound stream is created
*/
_newReceiverStream(options: {
id: number;
name: string;
}): MplexStream;
_newStream(options: {
id: number;
name: string;
type: 'initiator' | 'receiver';
registry: Map<number, MplexStream>;
}): MplexStream;
/**
* Creates a sink with an abortable source. Incoming messages will
* also have their size restricted. All messages will be varint decoded.
*/
_createSink(): Sink<Uint8Array, Promise<void>>;
/**
* Creates a source that restricts outgoing message sizes
* and varint encodes them
*/
_createSource(): AsyncGenerator<Uint8Array, void, undefined> & {
push: (value: Message) => import("it-pushable").PushableV<Message>;
end: (err?: Error | undefined) => import("it-pushable").PushableV<Message>;
return: () => {
done: boolean;
};
};
_handleIncoming(message: Message): void;
export declare class Mplex implements StreamMuxerFactory {
protocol: string;
createStreamMuxer(components: Components, init?: MplexInit): MplexStreamMuxer;
}
//# sourceMappingURL=index.d.ts.map

@@ -1,1 +0,1 @@

{"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../../src/index.ts"],"names":[],"mappings":"AACA,OAAO,EAAE,QAAQ,EAAa,MAAM,aAAa,CAAA;AAKjD,OAAO,EAAkC,OAAO,EAAE,MAAM,oBAAoB,CAAA;AAK5E,OAAO,KAAK,EAAE,IAAI,EAAE,MAAM,iBAAiB,CAAA;AAC3C,OAAO,KAAK,EAAE,KAAK,EAAE,YAAY,EAAE,MAAM,iCAAiC,CAAA;AAC1E,OAAO,KAAK,EAAE,MAAM,EAAE,MAAM,+BAA+B,CAAA;AAC3D,OAAO,KAAK,EAAE,uBAAuB,EAAE,MAAM,4BAA4B,CAAA;AAsBzE,MAAM,WAAW,WAAY,SAAQ,MAAM;IACzC,MAAM,EAAE,QAAQ,CAAC,UAAU,CAAC,CAAA;CAC7B;AAED,MAAM,WAAW,YAAa,SAAQ,YAAY;IAChD,UAAU,CAAC,EAAE,MAAM,CAAA;IACnB,OAAO,CAAC,EAAE,uBAAuB,CAAA;CAClC;AAED,qBAAa,KAAM,YAAW,KAAK;IACjC,MAAM,CAAC,UAAU,SAAiB;IAE3B,IAAI,EAAE,IAAI,CAAC,UAAU,CAAC,CAAA;IACtB,MAAM,EAAE,aAAa,CAAC,UAAU,CAAC,CAAA;IAExC,OAAO,CAAC,SAAS,CAAQ;IACzB,OAAO,CAAC,QAAQ,CAAC,QAAQ,CAA+E;IACxG,OAAO,CAAC,QAAQ,CAAC,QAAQ,CAAc;IACvC,OAAO,CAAC,QAAQ,CAAC,OAAO,CAA8D;gBAEzE,OAAO,CAAC,EAAE,YAAY;IA6BnC;;OAEG;IACH,IAAI,OAAO,aAUV;IAED;;;OAGG;IACH,SAAS,CAAE,IAAI,CAAC,EAAE,MAAM,GAAG,MAAM;IAOjC;;OAEG;IACH,kBAAkB,CAAE,OAAO,EAAE;QAAE,EAAE,EAAE,MAAM,CAAC;QAAC,IAAI,EAAE,MAAM,CAAA;KAAE;IAMzD,UAAU,CAAE,OAAO,EAAE;QAAE,EAAE,EAAE,MAAM,CAAC;QAAC,IAAI,EAAE,MAAM,CAAC;QAAC,IAAI,EAAE,WAAW,GAAG,UAAU,CAAC;QAAC,QAAQ,EAAE,GAAG,CAAC,MAAM,EAAE,WAAW,CAAC,CAAA;KAAE;IAmCrH;;;OAGG;IACH,WAAW;IA+BX;;;OAGG;IACH,aAAa;;;;;;;IAoBb,eAAe,CAAE,OAAO,EAAE,OAAO;CA0ClC"}
{"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../../src/index.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,EAAE,UAAU,EAAE,MAAM,+BAA+B,CAAA;AAC/D,OAAO,KAAK,EAAE,kBAAkB,EAAE,eAAe,EAAE,MAAM,iCAAiC,CAAA;AAC1F,OAAO,EAAE,gBAAgB,EAAE,MAAM,YAAY,CAAA;AAE7C,MAAM,WAAW,SAAU,SAAQ,eAAe;IAChD,UAAU,CAAC,EAAE,MAAM,CAAA;CACpB;AAED,qBAAa,KAAM,YAAW,kBAAkB;IACvC,QAAQ,SAAiB;IAEhC,iBAAiB,CAAE,UAAU,EAAE,UAAU,EAAE,IAAI,CAAC,EAAE,SAAS;CAG5D"}

@@ -1,196 +0,10 @@

import { pipe } from 'it-pipe';
import { pushableV } from 'it-pushable';
import { abortableSource } from 'abortable-iterator';
import { encode } from './encode.js';
import { decode } from './decode.js';
import { restrictSize } from './restrict-size.js';
import { MessageTypes, MessageTypeNames } from './message-types.js';
import { createStream } from './stream.js';
import { toString as uint8ArrayToString } from 'uint8arrays';
import { trackedMap } from '@libp2p/tracked-map';
import { logger } from '@libp2p/logger';
import each from 'it-foreach';
const log = logger('libp2p:mplex');
function printMessage(msg) {
const output = {
...msg,
type: `${MessageTypeNames[msg.type]} (${msg.type})`
};
if (msg.type === MessageTypes.NEW_STREAM) {
output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice());
}
if (msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) {
output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice(), 'base16');
}
return output;
}
import { MplexStreamMuxer } from './mplex.js';
export class Mplex {
constructor(options) {
options = options ?? {};
this._streamId = 0;
this._streams = {
/**
* Stream to ids map
*/
initiators: trackedMap({ metrics: options.metrics, component: 'mplex', metric: 'initiatorStreams' }),
/**
* Stream to ids map
*/
receivers: trackedMap({ metrics: options.metrics, component: 'mplex', metric: 'receiverStreams' })
};
this._options = options;
/**
* An iterable sink
*/
this.sink = this._createSink();
/**
* An iterable source
*/
const source = this._createSource();
this._source = source;
this.source = source;
constructor() {
this.protocol = '/mplex/6.7.0';
}
/**
* Returns a Map of streams and their ids
*/
get streams() {
// Inbound and Outbound streams may have the same ids, so we need to make those unique
const streams = [];
this._streams.initiators.forEach(stream => {
streams.push(stream);
});
this._streams.receivers.forEach(stream => {
streams.push(stream);
});
return streams;
createStreamMuxer(components, init) {
return new MplexStreamMuxer(components, init);
}
/**
* Initiate a new stream with the given name. If no name is
* provided, the id of the stream will be used.
*/
newStream(name) {
const id = this._streamId++;
name = name == null ? id.toString() : name.toString();
const registry = this._streams.initiators;
return this._newStream({ id, name, type: 'initiator', registry });
}
/**
* Called whenever an inbound stream is created
*/
_newReceiverStream(options) {
const { id, name } = options;
const registry = this._streams.receivers;
return this._newStream({ id, name, type: 'receiver', registry });
}
_newStream(options) {
const { id, name, type, registry } = options;
log('new %s stream %s %s', type, id, name);
if (registry.has(id)) {
throw new Error(`${type} stream ${id} already exists!`);
}
const send = (msg) => {
if (log.enabled) {
log('%s stream %s send', type, id, printMessage(msg));
}
if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) {
msg.data = msg.data instanceof Uint8Array ? msg.data : msg.data.slice();
}
this._source.push(msg);
};
const onEnd = () => {
log('%s stream %s %s ended', type, id, name);
registry.delete(id);
if (this._options.onStreamEnd != null) {
this._options.onStreamEnd(stream);
}
};
const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._options.maxMsgSize });
registry.set(id, stream);
return stream;
}
/**
* Creates a sink with an abortable source. Incoming messages will
* also have their size restricted. All messages will be varint decoded.
*/
_createSink() {
const sink = async (source) => {
if (this._options.signal != null) {
source = abortableSource(source, this._options.signal);
}
try {
await pipe(source, source => each(source, (buf) => {
// console.info('incoming', uint8ArrayToString(buf, 'base64'))
}), decode, restrictSize(this._options.maxMsgSize), async (source) => {
for await (const msg of source) {
this._handleIncoming(msg);
}
});
this._source.end();
}
catch (err) {
log('error in sink', err);
this._source.end(err); // End the source with an error
}
};
return sink;
}
/**
* Creates a source that restricts outgoing message sizes
* and varint encodes them
*/
_createSource() {
const onEnd = (err) => {
const { initiators, receivers } = this._streams;
// Abort all the things!
for (const s of initiators.values()) {
s.abort(err);
}
for (const s of receivers.values()) {
s.abort(err);
}
};
const source = pushableV({ onEnd });
return Object.assign(encode(source), {
push: source.push,
end: source.end,
return: source.return
});
}
_handleIncoming(message) {
const { id, type } = message;
if (log.enabled) {
log('incoming message', printMessage(message));
}
// Create a new stream?
if (message.type === MessageTypes.NEW_STREAM) {
const stream = this._newReceiverStream({ id, name: uint8ArrayToString(message.data instanceof Uint8Array ? message.data : message.data.slice()) });
if (this._options.onIncomingStream != null) {
this._options.onIncomingStream(stream);
}
return;
}
const list = (type & 1) === 1 ? this._streams.initiators : this._streams.receivers;
const stream = list.get(id);
if (stream == null) {
return log('missing stream %s', id);
}
switch (type) {
case MessageTypes.MESSAGE_INITIATOR:
case MessageTypes.MESSAGE_RECEIVER:
stream.source.push(message.data.slice());
break;
case MessageTypes.CLOSE_INITIATOR:
case MessageTypes.CLOSE_RECEIVER:
stream.close();
break;
case MessageTypes.RESET_INITIATOR:
case MessageTypes.RESET_RECEIVER:
stream.reset();
break;
default:
log('unknown message type %s', type);
}
}
}
Mplex.multicodec = '/mplex/6.7.0';
//# sourceMappingURL=index.js.map

@@ -1,1 +0,1 @@

{"version":3,"file":"index.js","sourceRoot":"","sources":["../../src/index.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,IAAI,EAAE,MAAM,SAAS,CAAA;AAC9B,OAAO,EAAY,SAAS,EAAE,MAAM,aAAa,CAAA;AACjD,OAAO,EAAE,eAAe,EAAE,MAAM,oBAAoB,CAAA;AACpD,OAAO,EAAE,MAAM,EAAE,MAAM,aAAa,CAAA;AACpC,OAAO,EAAE,MAAM,EAAE,MAAM,aAAa,CAAA;AACpC,OAAO,EAAE,YAAY,EAAE,MAAM,oBAAoB,CAAA;AACjD,OAAO,EAAE,YAAY,EAAE,gBAAgB,EAAW,MAAM,oBAAoB,CAAA;AAC5E,OAAO,EAAE,YAAY,EAAE,MAAM,aAAa,CAAA;AAC1C,OAAO,EAAE,QAAQ,IAAI,kBAAkB,EAAE,MAAM,aAAa,CAAA;AAC5D,OAAO,EAAE,UAAU,EAAE,MAAM,qBAAqB,CAAA;AAChD,OAAO,EAAE,MAAM,EAAE,MAAM,gBAAgB,CAAA;AAKvC,OAAO,IAAI,MAAM,YAAY,CAAA;AAE7B,MAAM,GAAG,GAAG,MAAM,CAAC,cAAc,CAAC,CAAA;AAElC,SAAS,YAAY,CAAE,GAAY;IACjC,MAAM,MAAM,GAAQ;QAClB,GAAG,GAAG;QACN,IAAI,EAAE,GAAG,gBAAgB,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,GAAG,CAAC,IAAI,GAAG;KACpD,CAAA;IAED,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,UAAU,EAAE;QACxC,MAAM,CAAC,IAAI,GAAG,kBAAkB,CAAC,GAAG,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,EAAE,CAAC,CAAA;KAC/F;IAED,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,iBAAiB,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,gBAAgB,EAAE;QAC7F,MAAM,CAAC,IAAI,GAAG,kBAAkB,CAAC,GAAG,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,EAAE,EAAE,QAAQ,CAAC,CAAA;KACzG;IAED,OAAO,MAAM,CAAA;AACf,CAAC;AAWD,MAAM,OAAO,KAAK;IAWhB,YAAa,OAAsB;QACjC,OAAO,GAAG,OAAO,IAAI,EAAE,CAAA;QAEvB,IAAI,CAAC,SAAS,GAAG,CAAC,CAAA;QAClB,IAAI,CAAC,QAAQ,GAAG;YACd;;eAEG;YACH,UAAU,EAAE,UAAU,CAAsB,EAAE,OAAO,EAAE,OAAO,CAAC,OAAO,EAAE,SAAS,EAAE,OAAO,EAAE,MAAM,EAAE,kBAAkB,EAAE,CAAC;YACzH;;eAEG;YACH,SAAS,EAAE,UAAU,CAAsB,EAAE,OAAO,EAAE,OAAO,CAAC,OAAO,EAAE,SAAS,EAAE,OAAO,EAAE,MAAM,EAAE,iBAAiB,EAAE,CAAC;SACxH,CAAA;QACD,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAA;QAEvB;;WAEG;QACH,IAAI,CAAC,IAAI,GAAG,IAAI,CAAC,WAAW,EAAE,CAAA;QAE9B;;WAEG;QACH,MAAM,MAAM,GAAG,IAAI,CAAC,aAAa,EAAE,CAAA;QACnC,IAAI,CAAC,OAAO,GAAG,MAAM,CAAA;QACrB,IAAI,CAAC,MAAM,GAAG,MAAM,CAAA;IACtB,CAAC;IAED;;OAEG;IACH,IAAI,OAAO;QACT,sFAAsF;QACtF,MAAM,OAAO,GAAa,EAAE,CAAA;QAC5B,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAC,OAAO,CAAC,MAAM,CAAC,EAAE;YACxC,OAAO,CAAC,IAAI,CAAC,MAAM,CAAC,CAAA;QACtB,CAAC,CAAC,CAAA;QACF,IAAI,CAAC,QAAQ,CAAC,SAAS,CAAC,OAAO,CAAC,MAAM,CAAC,EAAE;YACvC,OAAO,CAAC,IAAI,CAAC,MAAM,CAAC,CAAA;QACtB,CAAC,CAAC,CAAA;QACF,OAAO,OAAO,CAAA;IAChB,CAAC;IAED;;;OAGG;IACH,SAAS,CAAE,IAAa;QACtB,MAAM,EAAE,GAAG,IAAI,CAAC,SAAS,EAAE,CAAA;QAC3B,IAAI,GAAG,IAAI,IAAI,IAAI,CAAC,CAAC,CAAC,EAAE,CAAC,QAAQ,EAAE,CAAC,CAAC,CAAC,IAAI,CAAC,QAAQ,EAAE,CAAA;QACrD,MAAM,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAA;QACzC,OAAO,IAAI,CAAC,UAAU,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,WAAW,EAAE,QAAQ,EAAE,CAAC,CAAA;IACnE,CAAC;IAED;;OAEG;IACH,kBAAkB,CAAE,OAAqC;QACvD,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,GAAG,OAAO,CAAA;QAC5B,MAAM,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,SAAS,CAAA;QACxC,OAAO,IAAI,CAAC,UAAU,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,UAAU,EAAE,QAAQ,EAAE,CAAC,CAAA;IAClE,CAAC;IAED,UAAU,CAAE,OAAyG;QACnH,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,QAAQ,EAAE,GAAG,OAAO,CAAA;QAE5C,GAAG,CAAC,qBAAqB,EAAE,IAAI,EAAE,EAAE,EAAE,IAAI,CAAC,CAAA;QAE1C,IAAI,QAAQ,CAAC,GAAG,CAAC,EAAE,CAAC,EAAE;YACpB,MAAM,IAAI,KAAK,CAAC,GAAG,IAAI,WAAW,EAAE,kBAAkB,CAAC,CAAA;SACxD;QAED,MAAM,IAAI,GAAG,CAAC,GAAY,EAAE,EAAE;YAC5B,IAAI,GAAG,CAAC,OAAO,EAAE;gBACf,GAAG,CAAC,mBAAmB,EAAE,IAAI,EAAE,EAAE,EAAE,YAAY,CAAC,GAAG,CAAC,CAAC,CAAA;aACtD;YAED,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,UAAU,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,iBAAiB,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,gBAAgB,EAAE;gBACrI,GAAG,CAAC,IAAI,GAAG,GAAG,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,EAAE,CAAA;aACxE;YAED,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,GAAG,CAAC,CAAA;QACxB,CAAC,CAAA;QAED,MAAM,KAAK,GAAG,GAAG,EAAE;YACjB,GAAG,CAAC,uBAAuB,EAAE,IAAI,EAAE,EAAE,EAAE,IAAI,CAAC,CAAA;YAC5C,QAAQ,CAAC,MAAM,CAAC,EAAE,CAAC,CAAA;YAEnB,IAAI,IAAI,CAAC,QAAQ,CAAC,WAAW,IAAI,IAAI,EAAE;gBACrC,IAAI,CAAC,QAAQ,CAAC,WAAW,CAAC,MAAM,CAAC,CAAA;aAClC;QACH,CAAC,CAAA;QAED,MAAM,MAAM,GAAG,YAAY,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAE,UAAU,EAAE,IAAI,CAAC,QAAQ,CAAC,UAAU,EAAE,CAAC,CAAA;QAClG,QAAQ,CAAC,GAAG,CAAC,EAAE,EAAE,MAAM,CAAC,CAAA;QACxB,OAAO,MAAM,CAAA;IACf,CAAC;IAED;;;OAGG;IACH,WAAW;QACT,MAAM,IAAI,GAAqB,KAAK,EAAC,MAAM,EAAC,EAAE;YAC5C,IAAI,IAAI,CAAC,QAAQ,CAAC,MAAM,IAAI,IAAI,EAAE;gBAChC,MAAM,GAAG,eAAe,CAAC,MAAM,EAAE,IAAI,CAAC,QAAQ,CAAC,MAAM,CAAC,CAAA;aACvD;YAED,IAAI;gBACF,MAAM,IAAI,CACR,MAAM,EACN,MAAM,CAAC,EAAE,CAAC,IAAI,CAAC,MAAM,EAAE,CAAC,GAAG,EAAE,EAAE;oBAC7B,8DAA8D;gBAChE,CAAC,CAAC,EACF,MAAM,EACN,YAAY,CAAC,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAC,EACtC,KAAK,EAAC,MAAM,EAAC,EAAE;oBACb,IAAI,KAAK,EAAE,MAAM,GAAG,IAAI,MAAM,EAAE;wBAC9B,IAAI,CAAC,eAAe,CAAC,GAAG,CAAC,CAAA;qBAC1B;gBACH,CAAC,CACF,CAAA;gBAED,IAAI,CAAC,OAAO,CAAC,GAAG,EAAE,CAAA;aACnB;YAAC,OAAO,GAAQ,EAAE;gBACjB,GAAG,CAAC,eAAe,EAAE,GAAG,CAAC,CAAA;gBACzB,IAAI,CAAC,OAAO,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA,CAAC,+BAA+B;aACtD;QACH,CAAC,CAAA;QAED,OAAO,IAAI,CAAA;IACb,CAAC;IAED;;;OAGG;IACH,aAAa;QACX,MAAM,KAAK,GAAG,CAAC,GAAW,EAAE,EAAE;YAC5B,MAAM,EAAE,UAAU,EAAE,SAAS,EAAE,GAAG,IAAI,CAAC,QAAQ,CAAA;YAC/C,wBAAwB;YACxB,KAAK,MAAM,CAAC,IAAI,UAAU,CAAC,MAAM,EAAE,EAAE;gBACnC,CAAC,CAAC,KAAK,CAAC,GAAG,CAAC,CAAA;aACb;YACD,KAAK,MAAM,CAAC,IAAI,SAAS,CAAC,MAAM,EAAE,EAAE;gBAClC,CAAC,CAAC,KAAK,CAAC,GAAG,CAAC,CAAA;aACb;QACH,CAAC,CAAA;QACD,MAAM,MAAM,GAAG,SAAS,CAAU,EAAE,KAAK,EAAE,CAAC,CAAA;QAE5C,OAAO,MAAM,CAAC,MAAM,CAAC,MAAM,CAAC,MAAM,CAAC,EAAE;YACnC,IAAI,EAAE,MAAM,CAAC,IAAI;YACjB,GAAG,EAAE,MAAM,CAAC,GAAG;YACf,MAAM,EAAE,MAAM,CAAC,MAAM;SACtB,CAAC,CAAA;IACJ,CAAC;IAED,eAAe,CAAE,OAAgB;QAC/B,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,GAAG,OAAO,CAAA;QAE5B,IAAI,GAAG,CAAC,OAAO,EAAE;YACf,GAAG,CAAC,kBAAkB,EAAE,YAAY,CAAC,OAAO,CAAC,CAAC,CAAA;SAC/C;QAED,uBAAuB;QACvB,IAAI,OAAO,CAAC,IAAI,KAAK,YAAY,CAAC,UAAU,EAAE;YAC5C,MAAM,MAAM,GAAG,IAAI,CAAC,kBAAkB,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,kBAAkB,CAAC,OAAO,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,OAAO,CAAC,IAAI,CAAC,CAAC,CAAC,OAAO,CAAC,IAAI,CAAC,KAAK,EAAE,CAAC,EAAE,CAAC,CAAA;YAElJ,IAAI,IAAI,CAAC,QAAQ,CAAC,gBAAgB,IAAI,IAAI,EAAE;gBAC1C,IAAI,CAAC,QAAQ,CAAC,gBAAgB,CAAC,MAAM,CAAC,CAAA;aACvC;YAED,OAAM;SACP;QAED,MAAM,IAAI,GAAG,CAAC,IAAI,GAAG,CAAC,CAAC,KAAK,CAAC,CAAC,CAAC,CAAC,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAC,CAAC,CAAC,IAAI,CAAC,QAAQ,CAAC,SAAS,CAAA;QAClF,MAAM,MAAM,GAAG,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,CAAA;QAE3B,IAAI,MAAM,IAAI,IAAI,EAAE;YAClB,OAAO,GAAG,CAAC,mBAAmB,EAAE,EAAE,CAAC,CAAA;SACpC;QAED,QAAQ,IAAI,EAAE;YACZ,KAAK,YAAY,CAAC,iBAAiB,CAAC;YACpC,KAAK,YAAY,CAAC,gBAAgB;gBAChC,MAAM,CAAC,MAAM,CAAC,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,KAAK,EAAE,CAAC,CAAA;gBACxC,MAAK;YACP,KAAK,YAAY,CAAC,eAAe,CAAC;YAClC,KAAK,YAAY,CAAC,cAAc;gBAC9B,MAAM,CAAC,KAAK,EAAE,CAAA;gBACd,MAAK;YACP,KAAK,YAAY,CAAC,eAAe,CAAC;YAClC,KAAK,YAAY,CAAC,cAAc;gBAC9B,MAAM,CAAC,KAAK,EAAE,CAAA;gBACd,MAAK;YACP;gBACE,GAAG,CAAC,yBAAyB,EAAE,IAAI,CAAC,CAAA;SACvC;IACH,CAAC;;AAjNM,gBAAU,GAAG,cAAc,CAAA"}
{"version":3,"file":"index.js","sourceRoot":"","sources":["../../src/index.ts"],"names":[],"mappings":"AAEA,OAAO,EAAE,gBAAgB,EAAE,MAAM,YAAY,CAAA;AAM7C,MAAM,OAAO,KAAK;IAAlB;QACS,aAAQ,GAAG,cAAc,CAAA;IAKlC,CAAC;IAHC,iBAAiB,CAAE,UAAsB,EAAE,IAAgB;QACzD,OAAO,IAAI,gBAAgB,CAAC,UAAU,EAAE,IAAI,CAAC,CAAA;IAC/C,CAAC;CACF"}
import type { Message } from './message-types.js';
import type { MplexStream } from './index.js';
import type { MplexStream } from './mplex.js';
export interface Options {

@@ -4,0 +4,0 @@ id: number;

@@ -31,3 +31,3 @@ import { abortableSource } from 'abortable-iterator';

sourceEnded = true;
log('%s stream %s source end', type, streamName, err);
log.trace('%s stream %s source end', type, streamName, err);
if (err != null && endErr == null) {

@@ -48,3 +48,3 @@ endErr = err;

sinkEnded = true;
log('%s stream %s sink end - err: %o', type, streamName, err);
log.trace('%s stream %s sink end - err: %o', type, streamName, err);
if (err != null && endErr == null) {

@@ -67,3 +67,3 @@ endErr = err;

abort: (err) => {
log('%s stream %s abort', type, streamName, err);
log.trace('%s stream %s abort', type, streamName, err);
// End the source with the passed error

@@ -118,6 +118,6 @@ stream.source.end(err);

if (err.code === ERR_MPLEX_STREAM_RESET) {
log('%s stream %s reset', type, name);
log.trace('%s stream %s reset', type, name);
}
else {
log('%s stream %s error', type, name, err);
log.trace('%s stream %s error', type, name, err);
try {

@@ -127,3 +127,3 @@ send({ id, type: Types.RESET });

catch (err) {
log('%s stream %s error sending reset', type, name, err);
log.trace('%s stream %s error sending reset', type, name, err);
}

@@ -139,3 +139,3 @@ }

catch (err) {
log('%s stream %s error sending close', type, name, err);
log.trace('%s stream %s error sending close', type, name, err);
}

@@ -142,0 +142,0 @@ onSinkEnd();

@@ -1,1 +0,1 @@

{"version":3,"file":"stream.js","sourceRoot":"","sources":["../../src/stream.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,eAAe,EAAE,MAAM,oBAAoB,CAAA;AACpD,OAAO,EAAE,QAAQ,EAAE,MAAM,aAAa,CAAA;AACtC,OAAO,OAAO,MAAM,UAAU,CAAA;AAC9B,OAAO,EAAE,YAAY,EAAE,MAAM,oBAAoB,CAAA;AACjD,OAAO,EAAE,SAAS,EAAE,MAAM,YAAY,CAAA;AACtC,OAAO,EAAE,qBAAqB,EAAE,oBAAoB,EAAE,MAAM,oBAAoB,CAAA;AAChF,OAAO,EAAE,UAAU,IAAI,oBAAoB,EAAE,MAAM,yBAAyB,CAAA;AAC5E,OAAO,EAAE,cAAc,EAAE,MAAM,gBAAgB,CAAA;AAC/C,OAAO,EAAE,MAAM,EAAE,MAAM,gBAAgB,CAAA;AAMvC,MAAM,GAAG,GAAG,MAAM,CAAC,qBAAqB,CAAC,CAAA;AAEzC,MAAM,sBAAsB,GAAG,wBAAwB,CAAA;AACvD,MAAM,sBAAsB,GAAG,wBAAwB,CAAA;AAWvD,MAAM,UAAU,YAAY,CAAE,OAAgB;IAC5C,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAE,IAAI,GAAG,WAAW,EAAE,UAAU,GAAG,YAAY,EAAE,GAAG,OAAO,CAAA;IAExF,MAAM,eAAe,GAAG,IAAI,eAAe,EAAE,CAAA;IAC7C,MAAM,eAAe,GAAG,IAAI,eAAe,EAAE,CAAA;IAC7C,MAAM,KAAK,GAAG,IAAI,KAAK,WAAW,CAAC,CAAC,CAAC,qBAAqB,CAAC,CAAC,CAAC,oBAAoB,CAAA;IACjF,MAAM,UAAU,GAAG,IAAI,KAAK,WAAW,CAAC,CAAC,CAAC,CAAC,IAAI,EAAE,EAAE,CAAC,CAAC,CAAC,CAAC,IAAI,EAAE,EAAE,CAAA;IAC/D,MAAM,UAAU,GAAG,GAAG,IAAI,IAAI,IAAI,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC,IAAI,EAAE,CAAA;IAEhD,IAAI,WAAW,GAAG,KAAK,CAAA;IACvB,IAAI,SAAS,GAAG,KAAK,CAAA;IACrB,IAAI,MAAyB,CAAA;IAE7B,MAAM,QAAQ,GAAa;QACzB,IAAI,EAAE,IAAI,CAAC,GAAG,EAAE;KACjB,CAAA;IAED,MAAM,WAAW,GAAG,CAAC,GAAW,EAAE,EAAE;QAClC,IAAI,WAAW,EAAE;YACf,OAAM;SACP;QAED,WAAW,GAAG,IAAI,CAAA;QAClB,GAAG,CAAC,yBAAyB,EAAE,IAAI,EAAE,UAAU,EAAE,GAAG,CAAC,CAAA;QAErD,IAAI,GAAG,IAAI,IAAI,IAAI,MAAM,IAAI,IAAI,EAAE;YACjC,MAAM,GAAG,GAAG,CAAA;SACb;QAED,IAAI,SAAS,EAAE;YACb,MAAM,CAAC,QAAQ,CAAC,KAAK,GAAG,IAAI,CAAC,GAAG,EAAE,CAAA;YAElC,IAAI,KAAK,IAAI,IAAI,EAAE;gBACjB,KAAK,CAAC,MAAM,CAAC,CAAA;aACd;SACF;IACH,CAAC,CAAA;IAED,MAAM,SAAS,GAAG,CAAC,GAAW,EAAE,EAAE;QAChC,IAAI,SAAS,EAAE;YACb,OAAM;SACP;QAED,SAAS,GAAG,IAAI,CAAA;QAChB,GAAG,CAAC,iCAAiC,EAAE,IAAI,EAAE,UAAU,EAAE,GAAG,CAAC,CAAA;QAE7D,IAAI,GAAG,IAAI,IAAI,IAAI,MAAM,IAAI,IAAI,EAAE;YACjC,MAAM,GAAG,GAAG,CAAA;SACb;QAED,IAAI,WAAW,EAAE;YACf,QAAQ,CAAC,KAAK,GAAG,IAAI,CAAC,GAAG,EAAE,CAAA;YAE3B,IAAI,KAAK,IAAI,IAAI,EAAE;gBACjB,KAAK,CAAC,MAAM,CAAC,CAAA;aACd;SACF;IACH,CAAC,CAAA;IAED,MAAM,MAAM,GAAG;QACb,oBAAoB;QACpB,KAAK,EAAE,GAAG,EAAE;YACV,MAAM,CAAC,MAAM,CAAC,GAAG,EAAE,CAAA;QACrB,CAAC;QACD,8CAA8C;QAC9C,KAAK,EAAE,CAAC,GAAW,EAAE,EAAE;YACrB,GAAG,CAAC,oBAAoB,EAAE,IAAI,EAAE,UAAU,EAAE,GAAG,CAAC,CAAA;YAChD,uCAAuC;YACvC,MAAM,CAAC,MAAM,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA;YACtB,eAAe,CAAC,KAAK,EAAE,CAAA;YACvB,SAAS,CAAC,GAAG,CAAC,CAAA;QAChB,CAAC;QACD,2DAA2D;QAC3D,KAAK,EAAE,GAAG,EAAE;YACV,MAAM,GAAG,GAAG,OAAO,CAAC,IAAI,KAAK,CAAC,cAAc,CAAC,EAAE,sBAAsB,CAAC,CAAA;YACtE,eAAe,CAAC,KAAK,EAAE,CAAA;YACvB,MAAM,CAAC,MAAM,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA;YACtB,SAAS,CAAC,GAAG,CAAC,CAAA;QAChB,CAAC;QACD,IAAI,EAAE,KAAK,EAAE,MAA0B,EAAE,EAAE;YACzC,MAAM,GAAG,eAAe,CAAC,MAAM,EAAE,SAAS,CAAC;gBACzC,eAAe,CAAC,MAAM;gBACtB,eAAe,CAAC,MAAM;aACvB,CAAC,CAAC,CAAA;YAEH,IAAI;gBACF,IAAI,IAAI,KAAK,WAAW,EAAE,EAAE,kCAAkC;oBAC5D,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,qBAAqB,CAAC,UAAU,EAAE,IAAI,EAAE,oBAAoB,CAAC,UAAU,CAAC,EAAE,CAAC,CAAA;iBAC7F;gBAED,MAAM,cAAc,GAAG,IAAI,cAAc,EAAE,CAAA;gBAE3C,IAAI,KAAK,EAAE,MAAM,IAAI,IAAI,MAAM,EAAE;oBAC/B,cAAc,CAAC,MAAM,CAAC,IAAI,CAAC,CAAA;oBAE3B,OAAO,cAAc,CAAC,MAAM,KAAK,CAAC,EAAE;wBAClC,IAAI,cAAc,CAAC,MAAM,IAAI,UAAU,EAAE;4BACvC,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,OAAO,EAAE,IAAI,EAAE,cAAc,CAAC,QAAQ,EAAE,EAAE,CAAC,CAAA;4BAClE,cAAc,CAAC,OAAO,CAAC,cAAc,CAAC,MAAM,CAAC,CAAA;4BAC7C,MAAK;yBACN;wBAED,MAAM,MAAM,GAAG,cAAc,CAAC,MAAM,GAAG,UAAU,CAAA;wBACjD,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,OAAO,EAAE,IAAI,EAAE,cAAc,CAAC,QAAQ,CAAC,CAAC,EAAE,MAAM,CAAC,EAAE,CAAC,CAAA;wBAC3E,cAAc,CAAC,OAAO,CAAC,MAAM,CAAC,CAAA;qBAC/B;iBACF;aACF;YAAC,OAAO,GAAQ,EAAE;gBACjB,IAAI,GAAG,CAAC,IAAI,KAAK,SAAS,IAAI,GAAG,CAAC,OAAO,KAAK,2BAA2B,EAAE;oBACzE,IAAI,eAAe,CAAC,MAAM,CAAC,OAAO,EAAE;wBAClC,GAAG,CAAC,OAAO,GAAG,cAAc,CAAA;wBAC5B,GAAG,CAAC,IAAI,GAAG,sBAAsB,CAAA;qBAClC;oBAED,IAAI,eAAe,CAAC,MAAM,CAAC,OAAO,EAAE;wBAClC,GAAG,CAAC,OAAO,GAAG,gBAAgB,CAAA;wBAC9B,GAAG,CAAC,IAAI,GAAG,sBAAsB,CAAA;qBAClC;iBACF;gBAED,sDAAsD;gBACtD,IAAI,GAAG,CAAC,IAAI,KAAK,sBAAsB,EAAE;oBACvC,GAAG,CAAC,oBAAoB,EAAE,IAAI,EAAE,IAAI,CAAC,CAAA;iBACtC;qBAAM;oBACL,GAAG,CAAC,oBAAoB,EAAE,IAAI,EAAE,IAAI,EAAE,GAAG,CAAC,CAAA;oBAC1C,IAAI;wBACF,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,KAAK,EAAE,CAAC,CAAA;qBAChC;oBAAC,OAAO,GAAG,EAAE;wBACZ,GAAG,CAAC,kCAAkC,EAAE,IAAI,EAAE,IAAI,EAAE,GAAG,CAAC,CAAA;qBACzD;iBACF;gBAED,MAAM,CAAC,MAAM,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA;gBACtB,SAAS,CAAC,GAAG,CAAC,CAAA;gBACd,OAAM;aACP;YAED,IAAI;gBACF,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,KAAK,EAAE,CAAC,CAAA;aAChC;YAAC,OAAO,GAAG,EAAE;gBACZ,GAAG,CAAC,kCAAkC,EAAE,IAAI,EAAE,IAAI,EAAE,GAAG,CAAC,CAAA;aACzD;YAED,SAAS,EAAE,CAAA;QACb,CAAC;QACD,MAAM,EAAE,QAAQ,CAAa;YAC3B,KAAK,EAAE,WAAW;SACnB,CAAC;QACF,QAAQ;QACR,EAAE,EAAE,UAAU;KACf,CAAA;IAED,OAAO,MAAM,CAAA;AACf,CAAC"}
{"version":3,"file":"stream.js","sourceRoot":"","sources":["../../src/stream.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,eAAe,EAAE,MAAM,oBAAoB,CAAA;AACpD,OAAO,EAAE,QAAQ,EAAE,MAAM,aAAa,CAAA;AACtC,OAAO,OAAO,MAAM,UAAU,CAAA;AAC9B,OAAO,EAAE,YAAY,EAAE,MAAM,oBAAoB,CAAA;AACjD,OAAO,EAAE,SAAS,EAAE,MAAM,YAAY,CAAA;AACtC,OAAO,EAAE,qBAAqB,EAAE,oBAAoB,EAAE,MAAM,oBAAoB,CAAA;AAChF,OAAO,EAAE,UAAU,IAAI,oBAAoB,EAAE,MAAM,yBAAyB,CAAA;AAC5E,OAAO,EAAE,cAAc,EAAE,MAAM,gBAAgB,CAAA;AAC/C,OAAO,EAAE,MAAM,EAAE,MAAM,gBAAgB,CAAA;AAMvC,MAAM,GAAG,GAAG,MAAM,CAAC,qBAAqB,CAAC,CAAA;AAEzC,MAAM,sBAAsB,GAAG,wBAAwB,CAAA;AACvD,MAAM,sBAAsB,GAAG,wBAAwB,CAAA;AAWvD,MAAM,UAAU,YAAY,CAAE,OAAgB;IAC5C,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAE,IAAI,GAAG,WAAW,EAAE,UAAU,GAAG,YAAY,EAAE,GAAG,OAAO,CAAA;IAExF,MAAM,eAAe,GAAG,IAAI,eAAe,EAAE,CAAA;IAC7C,MAAM,eAAe,GAAG,IAAI,eAAe,EAAE,CAAA;IAC7C,MAAM,KAAK,GAAG,IAAI,KAAK,WAAW,CAAC,CAAC,CAAC,qBAAqB,CAAC,CAAC,CAAC,oBAAoB,CAAA;IACjF,MAAM,UAAU,GAAG,IAAI,KAAK,WAAW,CAAC,CAAC,CAAC,CAAC,IAAI,EAAE,EAAE,CAAC,CAAC,CAAC,CAAC,IAAI,EAAE,EAAE,CAAA;IAC/D,MAAM,UAAU,GAAG,GAAG,IAAI,IAAI,IAAI,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC,IAAI,EAAE,CAAA;IAEhD,IAAI,WAAW,GAAG,KAAK,CAAA;IACvB,IAAI,SAAS,GAAG,KAAK,CAAA;IACrB,IAAI,MAAyB,CAAA;IAE7B,MAAM,QAAQ,GAAa;QACzB,IAAI,EAAE,IAAI,CAAC,GAAG,EAAE;KACjB,CAAA;IAED,MAAM,WAAW,GAAG,CAAC,GAAW,EAAE,EAAE;QAClC,IAAI,WAAW,EAAE;YACf,OAAM;SACP;QAED,WAAW,GAAG,IAAI,CAAA;QAClB,GAAG,CAAC,KAAK,CAAC,yBAAyB,EAAE,IAAI,EAAE,UAAU,EAAE,GAAG,CAAC,CAAA;QAE3D,IAAI,GAAG,IAAI,IAAI,IAAI,MAAM,IAAI,IAAI,EAAE;YACjC,MAAM,GAAG,GAAG,CAAA;SACb;QAED,IAAI,SAAS,EAAE;YACb,MAAM,CAAC,QAAQ,CAAC,KAAK,GAAG,IAAI,CAAC,GAAG,EAAE,CAAA;YAElC,IAAI,KAAK,IAAI,IAAI,EAAE;gBACjB,KAAK,CAAC,MAAM,CAAC,CAAA;aACd;SACF;IACH,CAAC,CAAA;IAED,MAAM,SAAS,GAAG,CAAC,GAAW,EAAE,EAAE;QAChC,IAAI,SAAS,EAAE;YACb,OAAM;SACP;QAED,SAAS,GAAG,IAAI,CAAA;QAChB,GAAG,CAAC,KAAK,CAAC,iCAAiC,EAAE,IAAI,EAAE,UAAU,EAAE,GAAG,CAAC,CAAA;QAEnE,IAAI,GAAG,IAAI,IAAI,IAAI,MAAM,IAAI,IAAI,EAAE;YACjC,MAAM,GAAG,GAAG,CAAA;SACb;QAED,IAAI,WAAW,EAAE;YACf,QAAQ,CAAC,KAAK,GAAG,IAAI,CAAC,GAAG,EAAE,CAAA;YAE3B,IAAI,KAAK,IAAI,IAAI,EAAE;gBACjB,KAAK,CAAC,MAAM,CAAC,CAAA;aACd;SACF;IACH,CAAC,CAAA;IAED,MAAM,MAAM,GAAG;QACb,oBAAoB;QACpB,KAAK,EAAE,GAAG,EAAE;YACV,MAAM,CAAC,MAAM,CAAC,GAAG,EAAE,CAAA;QACrB,CAAC;QACD,8CAA8C;QAC9C,KAAK,EAAE,CAAC,GAAW,EAAE,EAAE;YACrB,GAAG,CAAC,KAAK,CAAC,oBAAoB,EAAE,IAAI,EAAE,UAAU,EAAE,GAAG,CAAC,CAAA;YACtD,uCAAuC;YACvC,MAAM,CAAC,MAAM,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA;YACtB,eAAe,CAAC,KAAK,EAAE,CAAA;YACvB,SAAS,CAAC,GAAG,CAAC,CAAA;QAChB,CAAC;QACD,2DAA2D;QAC3D,KAAK,EAAE,GAAG,EAAE;YACV,MAAM,GAAG,GAAG,OAAO,CAAC,IAAI,KAAK,CAAC,cAAc,CAAC,EAAE,sBAAsB,CAAC,CAAA;YACtE,eAAe,CAAC,KAAK,EAAE,CAAA;YACvB,MAAM,CAAC,MAAM,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA;YACtB,SAAS,CAAC,GAAG,CAAC,CAAA;QAChB,CAAC;QACD,IAAI,EAAE,KAAK,EAAE,MAA0B,EAAE,EAAE;YACzC,MAAM,GAAG,eAAe,CAAC,MAAM,EAAE,SAAS,CAAC;gBACzC,eAAe,CAAC,MAAM;gBACtB,eAAe,CAAC,MAAM;aACvB,CAAC,CAAC,CAAA;YAEH,IAAI;gBACF,IAAI,IAAI,KAAK,WAAW,EAAE,EAAE,kCAAkC;oBAC5D,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,qBAAqB,CAAC,UAAU,EAAE,IAAI,EAAE,oBAAoB,CAAC,UAAU,CAAC,EAAE,CAAC,CAAA;iBAC7F;gBAED,MAAM,cAAc,GAAG,IAAI,cAAc,EAAE,CAAA;gBAE3C,IAAI,KAAK,EAAE,MAAM,IAAI,IAAI,MAAM,EAAE;oBAC/B,cAAc,CAAC,MAAM,CAAC,IAAI,CAAC,CAAA;oBAE3B,OAAO,cAAc,CAAC,MAAM,KAAK,CAAC,EAAE;wBAClC,IAAI,cAAc,CAAC,MAAM,IAAI,UAAU,EAAE;4BACvC,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,OAAO,EAAE,IAAI,EAAE,cAAc,CAAC,QAAQ,EAAE,EAAE,CAAC,CAAA;4BAClE,cAAc,CAAC,OAAO,CAAC,cAAc,CAAC,MAAM,CAAC,CAAA;4BAC7C,MAAK;yBACN;wBAED,MAAM,MAAM,GAAG,cAAc,CAAC,MAAM,GAAG,UAAU,CAAA;wBACjD,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,OAAO,EAAE,IAAI,EAAE,cAAc,CAAC,QAAQ,CAAC,CAAC,EAAE,MAAM,CAAC,EAAE,CAAC,CAAA;wBAC3E,cAAc,CAAC,OAAO,CAAC,MAAM,CAAC,CAAA;qBAC/B;iBACF;aACF;YAAC,OAAO,GAAQ,EAAE;gBACjB,IAAI,GAAG,CAAC,IAAI,KAAK,SAAS,IAAI,GAAG,CAAC,OAAO,KAAK,2BAA2B,EAAE;oBACzE,IAAI,eAAe,CAAC,MAAM,CAAC,OAAO,EAAE;wBAClC,GAAG,CAAC,OAAO,GAAG,cAAc,CAAA;wBAC5B,GAAG,CAAC,IAAI,GAAG,sBAAsB,CAAA;qBAClC;oBAED,IAAI,eAAe,CAAC,MAAM,CAAC,OAAO,EAAE;wBAClC,GAAG,CAAC,OAAO,GAAG,gBAAgB,CAAA;wBAC9B,GAAG,CAAC,IAAI,GAAG,sBAAsB,CAAA;qBAClC;iBACF;gBAED,sDAAsD;gBACtD,IAAI,GAAG,CAAC,IAAI,KAAK,sBAAsB,EAAE;oBACvC,GAAG,CAAC,KAAK,CAAC,oBAAoB,EAAE,IAAI,EAAE,IAAI,CAAC,CAAA;iBAC5C;qBAAM;oBACL,GAAG,CAAC,KAAK,CAAC,oBAAoB,EAAE,IAAI,EAAE,IAAI,EAAE,GAAG,CAAC,CAAA;oBAChD,IAAI;wBACF,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,KAAK,EAAE,CAAC,CAAA;qBAChC;oBAAC,OAAO,GAAG,EAAE;wBACZ,GAAG,CAAC,KAAK,CAAC,kCAAkC,EAAE,IAAI,EAAE,IAAI,EAAE,GAAG,CAAC,CAAA;qBAC/D;iBACF;gBAED,MAAM,CAAC,MAAM,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA;gBACtB,SAAS,CAAC,GAAG,CAAC,CAAA;gBACd,OAAM;aACP;YAED,IAAI;gBACF,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,KAAK,EAAE,CAAC,CAAA;aAChC;YAAC,OAAO,GAAG,EAAE;gBACZ,GAAG,CAAC,KAAK,CAAC,kCAAkC,EAAE,IAAI,EAAE,IAAI,EAAE,GAAG,CAAC,CAAA;aAC/D;YAED,SAAS,EAAE,CAAA;QACb,CAAC;QACD,MAAM,EAAE,QAAQ,CAAa;YAC3B,KAAK,EAAE,WAAW;SACnB,CAAC;QACF,QAAQ;QACR,EAAE,EAAE,UAAU;KACf,CAAA;IAED,OAAO,MAAM,CAAA;AACf,CAAC"}
{
"name": "@libp2p/mplex",
"version": "1.0.1",
"version": "1.0.2",
"description": "JavaScript implementation of https://github.com/libp2p/mplex",

@@ -130,6 +130,6 @@ "license": "Apache-2.0 OR MIT",

"lint": "aegir lint",
"dep-check": "aegir dep-check",
"dep-check": "aegir dep-check dist/src/**/*.js dist/test/**/*.js",
"build": "tsc",
"pretest": "npm run build",
"test": "aegir test -f ./dist/test",
"test": "aegir test -f \"./dist/test/**/*.js\"",
"test:chrome": "npm run test -- -t browser --cov",

@@ -144,4 +144,4 @@ "test:chrome-webworker": "npm run test -- -t webworker",

"dependencies": {
"@libp2p/logger": "^1.0.3",
"@libp2p/tracked-map": "^1.0.1",
"@libp2p/logger": "^1.1.2",
"@libp2p/tracked-map": "^1.0.4",
"abortable-iterator": "^4.0.2",

@@ -153,12 +153,12 @@ "any-signal": "^3.0.0",

"it-stream-types": "^1.0.4",
"uint8arraylist": "^1.2.0",
"uint8arraylist": "^1.4.0",
"varint": "^6.0.0"
},
"devDependencies": {
"@libp2p/interface-compliance-tests": "^1.0.7",
"@libp2p/interfaces": "^1.1.1",
"@libp2p/interface-compliance-tests": "^1.1.16",
"@libp2p/interfaces": "^1.3.14",
"@types/varint": "^6.0.0",
"aegir": "^36.1.3",
"cborg": "^1.2.1",
"iso-random-stream": "^2.0.0",
"cborg": "^1.8.1",
"iso-random-stream": "^2.0.2",
"it-all": "^1.0.6",

@@ -165,0 +165,0 @@ "it-drain": "^1.0.5",

@@ -1,257 +0,15 @@

import { pipe } from 'it-pipe'
import { Pushable, pushableV } from 'it-pushable'
import { abortableSource } from 'abortable-iterator'
import { encode } from './encode.js'
import { decode } from './decode.js'
import { restrictSize } from './restrict-size.js'
import { MessageTypes, MessageTypeNames, Message } from './message-types.js'
import { createStream } from './stream.js'
import { toString as uint8ArrayToString } from 'uint8arrays'
import { trackedMap } from '@libp2p/tracked-map'
import { logger } from '@libp2p/logger'
import type { Sink } from 'it-stream-types'
import type { Muxer, MuxerOptions } from '@libp2p/interfaces/stream-muxer'
import type { Stream } from '@libp2p/interfaces/connection'
import type { ComponentMetricsTracker } from '@libp2p/interfaces/metrics'
import each from 'it-foreach'
import type { Components } from '@libp2p/interfaces/components'
import type { StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer'
import { MplexStreamMuxer } from './mplex.js'
const log = logger('libp2p:mplex')
function printMessage (msg: Message) {
const output: any = {
...msg,
type: `${MessageTypeNames[msg.type]} (${msg.type})`
}
if (msg.type === MessageTypes.NEW_STREAM) {
output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice())
}
if (msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) {
output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice(), 'base16')
}
return output
}
export interface MplexStream extends Stream {
source: Pushable<Uint8Array>
}
export interface MplexOptions extends MuxerOptions {
export interface MplexInit extends StreamMuxerInit {
maxMsgSize?: number
metrics?: ComponentMetricsTracker
}
export class Mplex implements Muxer {
static multicodec = '/mplex/6.7.0'
export class Mplex implements StreamMuxerFactory {
public protocol = '/mplex/6.7.0'
public sink: Sink<Uint8Array>
public source: AsyncIterable<Uint8Array>
private _streamId: number
private readonly _streams: { initiators: Map<number, MplexStream>, receivers: Map<number, MplexStream> }
private readonly _options: MplexOptions
private readonly _source: { push: (val: Message) => void, end: (err?: Error) => void }
constructor (options?: MplexOptions) {
options = options ?? {}
this._streamId = 0
this._streams = {
/**
* Stream to ids map
*/
initiators: trackedMap<number, MplexStream>({ metrics: options.metrics, component: 'mplex', metric: 'initiatorStreams' }),
/**
* Stream to ids map
*/
receivers: trackedMap<number, MplexStream>({ metrics: options.metrics, component: 'mplex', metric: 'receiverStreams' })
}
this._options = options
/**
* An iterable sink
*/
this.sink = this._createSink()
/**
* An iterable source
*/
const source = this._createSource()
this._source = source
this.source = source
createStreamMuxer (components: Components, init?: MplexInit) {
return new MplexStreamMuxer(components, init)
}
/**
* Returns a Map of streams and their ids
*/
get streams () {
// Inbound and Outbound streams may have the same ids, so we need to make those unique
const streams: Stream[] = []
this._streams.initiators.forEach(stream => {
streams.push(stream)
})
this._streams.receivers.forEach(stream => {
streams.push(stream)
})
return streams
}
/**
* Initiate a new stream with the given name. If no name is
* provided, the id of the stream will be used.
*/
newStream (name?: string): Stream {
const id = this._streamId++
name = name == null ? id.toString() : name.toString()
const registry = this._streams.initiators
return this._newStream({ id, name, type: 'initiator', registry })
}
/**
* Called whenever an inbound stream is created
*/
_newReceiverStream (options: { id: number, name: string }) {
const { id, name } = options
const registry = this._streams.receivers
return this._newStream({ id, name, type: 'receiver', registry })
}
_newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map<number, MplexStream> }) {
const { id, name, type, registry } = options
log('new %s stream %s %s', type, id, name)
if (registry.has(id)) {
throw new Error(`${type} stream ${id} already exists!`)
}
const send = (msg: Message) => {
if (log.enabled) {
log('%s stream %s send', type, id, printMessage(msg))
}
if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) {
msg.data = msg.data instanceof Uint8Array ? msg.data : msg.data.slice()
}
this._source.push(msg)
}
const onEnd = () => {
log('%s stream %s %s ended', type, id, name)
registry.delete(id)
if (this._options.onStreamEnd != null) {
this._options.onStreamEnd(stream)
}
}
const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._options.maxMsgSize })
registry.set(id, stream)
return stream
}
/**
* Creates a sink with an abortable source. Incoming messages will
* also have their size restricted. All messages will be varint decoded.
*/
_createSink () {
const sink: Sink<Uint8Array> = async source => {
if (this._options.signal != null) {
source = abortableSource(source, this._options.signal)
}
try {
await pipe(
source,
source => each(source, (buf) => {
// console.info('incoming', uint8ArrayToString(buf, 'base64'))
}),
decode,
restrictSize(this._options.maxMsgSize),
async source => {
for await (const msg of source) {
this._handleIncoming(msg)
}
}
)
this._source.end()
} catch (err: any) {
log('error in sink', err)
this._source.end(err) // End the source with an error
}
}
return sink
}
/**
* Creates a source that restricts outgoing message sizes
* and varint encodes them
*/
_createSource () {
const onEnd = (err?: Error) => {
const { initiators, receivers } = this._streams
// Abort all the things!
for (const s of initiators.values()) {
s.abort(err)
}
for (const s of receivers.values()) {
s.abort(err)
}
}
const source = pushableV<Message>({ onEnd })
return Object.assign(encode(source), {
push: source.push,
end: source.end,
return: source.return
})
}
_handleIncoming (message: Message) {
const { id, type } = message
if (log.enabled) {
log('incoming message', printMessage(message))
}
// Create a new stream?
if (message.type === MessageTypes.NEW_STREAM) {
const stream = this._newReceiverStream({ id, name: uint8ArrayToString(message.data instanceof Uint8Array ? message.data : message.data.slice()) })
if (this._options.onIncomingStream != null) {
this._options.onIncomingStream(stream)
}
return
}
const list = (type & 1) === 1 ? this._streams.initiators : this._streams.receivers
const stream = list.get(id)
if (stream == null) {
return log('missing stream %s', id)
}
switch (type) {
case MessageTypes.MESSAGE_INITIATOR:
case MessageTypes.MESSAGE_RECEIVER:
stream.source.push(message.data.slice())
break
case MessageTypes.CLOSE_INITIATOR:
case MessageTypes.CLOSE_RECEIVER:
stream.close()
break
case MessageTypes.RESET_INITIATOR:
case MessageTypes.RESET_RECEIVER:
stream.reset()
break
default:
log('unknown message type %s', type)
}
}
}

@@ -13,3 +13,3 @@ import { abortableSource } from 'abortable-iterator'

import type { Source } from 'it-stream-types'
import type { MplexStream } from './index.js'
import type { MplexStream } from './mplex.js'

@@ -53,3 +53,3 @@ const log = logger('libp2p:mplex:stream')

sourceEnded = true
log('%s stream %s source end', type, streamName, err)
log.trace('%s stream %s source end', type, streamName, err)

@@ -75,3 +75,3 @@ if (err != null && endErr == null) {

sinkEnded = true
log('%s stream %s sink end - err: %o', type, streamName, err)
log.trace('%s stream %s sink end - err: %o', type, streamName, err)

@@ -98,3 +98,3 @@ if (err != null && endErr == null) {

abort: (err?: Error) => {
log('%s stream %s abort', type, streamName, err)
log.trace('%s stream %s abort', type, streamName, err)
// End the source with the passed error

@@ -155,9 +155,9 @@ stream.source.end(err)

if (err.code === ERR_MPLEX_STREAM_RESET) {
log('%s stream %s reset', type, name)
log.trace('%s stream %s reset', type, name)
} else {
log('%s stream %s error', type, name, err)
log.trace('%s stream %s error', type, name, err)
try {
send({ id, type: Types.RESET })
} catch (err) {
log('%s stream %s error sending reset', type, name, err)
log.trace('%s stream %s error sending reset', type, name, err)
}

@@ -174,3 +174,3 @@ }

} catch (err) {
log('%s stream %s error sending close', type, name, err)
log.trace('%s stream %s error sending close', type, name, err)
}

@@ -177,0 +177,0 @@