@libp2p/mplex
Advanced tools
| import { Pushable } from 'it-pushable'; | ||
| import { Message } from './message-types.js'; | ||
| import type { Components } from '@libp2p/interfaces/components'; | ||
| import type { Sink } from 'it-stream-types'; | ||
| import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer'; | ||
| import type { Stream } from '@libp2p/interfaces/connection'; | ||
| export interface MplexStream extends Stream { | ||
| source: Pushable<Uint8Array>; | ||
| } | ||
| export interface MplexInit extends StreamMuxerInit { | ||
| maxMsgSize?: number; | ||
| } | ||
| export declare class MplexStreamMuxer implements StreamMuxer { | ||
| protocol: string; | ||
| sink: Sink<Uint8Array>; | ||
| source: AsyncIterable<Uint8Array>; | ||
| private _streamId; | ||
| private readonly _streams; | ||
| private readonly _init; | ||
| private readonly _source; | ||
| constructor(components: Components, init?: MplexInit); | ||
| init(components: Components): void; | ||
| /** | ||
| * Returns a Map of streams and their ids | ||
| */ | ||
| get streams(): Stream[]; | ||
| /** | ||
| * Initiate a new stream with the given name. If no name is | ||
| * provided, the id of the stream will be used. | ||
| */ | ||
| newStream(name?: string): Stream; | ||
| /** | ||
| * Called whenever an inbound stream is created | ||
| */ | ||
| _newReceiverStream(options: { | ||
| id: number; | ||
| name: string; | ||
| }): MplexStream; | ||
| _newStream(options: { | ||
| id: number; | ||
| name: string; | ||
| type: 'initiator' | 'receiver'; | ||
| registry: Map<number, MplexStream>; | ||
| }): MplexStream; | ||
| /** | ||
| * Creates a sink with an abortable source. Incoming messages will | ||
| * also have their size restricted. All messages will be varint decoded. | ||
| */ | ||
| _createSink(): Sink<Uint8Array, Promise<void>>; | ||
| /** | ||
| * Creates a source that restricts outgoing message sizes | ||
| * and varint encodes them | ||
| */ | ||
| _createSource(): AsyncGenerator<Uint8Array, void, undefined> & { | ||
| push: (value: Message) => import("it-pushable").PushableV<Message>; | ||
| end: (err?: Error | undefined) => import("it-pushable").PushableV<Message>; | ||
| return: () => { | ||
| done: boolean; | ||
| }; | ||
| }; | ||
| _handleIncoming(message: Message): void; | ||
| } | ||
| //# sourceMappingURL=mplex.d.ts.map |
| {"version":3,"file":"mplex.d.ts","sourceRoot":"","sources":["../../src/mplex.ts"],"names":[],"mappings":"AACA,OAAO,EAAE,QAAQ,EAAa,MAAM,aAAa,CAAA;AAKjD,OAAO,EAAkC,OAAO,EAAE,MAAM,oBAAoB,CAAA;AAK5E,OAAO,KAAK,EAAE,UAAU,EAAE,MAAM,+BAA+B,CAAA;AAC/D,OAAO,KAAK,EAAE,IAAI,EAAE,MAAM,iBAAiB,CAAA;AAC3C,OAAO,KAAK,EAAE,WAAW,EAAE,eAAe,EAAE,MAAM,iCAAiC,CAAA;AACnF,OAAO,KAAK,EAAE,MAAM,EAAE,MAAM,+BAA+B,CAAA;AAqB3D,MAAM,WAAW,WAAY,SAAQ,MAAM;IACzC,MAAM,EAAE,QAAQ,CAAC,UAAU,CAAC,CAAA;CAC7B;AAED,MAAM,WAAW,SAAU,SAAQ,eAAe;IAChD,UAAU,CAAC,EAAE,MAAM,CAAA;CACpB;AAED,qBAAa,gBAAiB,YAAW,WAAW;IAC3C,QAAQ,SAAiB;IAEzB,IAAI,EAAE,IAAI,CAAC,UAAU,CAAC,CAAA;IACtB,MAAM,EAAE,aAAa,CAAC,UAAU,CAAC,CAAA;IAExC,OAAO,CAAC,SAAS,CAAQ;IACzB,OAAO,CAAC,QAAQ,CAAC,QAAQ,CAA+E;IACxG,OAAO,CAAC,QAAQ,CAAC,KAAK,CAAW;IACjC,OAAO,CAAC,QAAQ,CAAC,OAAO,CAA8D;gBAEzE,UAAU,EAAE,UAAU,EAAE,IAAI,CAAC,EAAE,SAAS;IA6BrD,IAAI,CAAE,UAAU,EAAE,UAAU;IAI5B;;OAEG;IACH,IAAI,OAAO,aAUV;IAED;;;OAGG;IACH,SAAS,CAAE,IAAI,CAAC,EAAE,MAAM,GAAG,MAAM;IAOjC;;OAEG;IACH,kBAAkB,CAAE,OAAO,EAAE;QAAE,EAAE,EAAE,MAAM,CAAC;QAAC,IAAI,EAAE,MAAM,CAAA;KAAE;IAMzD,UAAU,CAAE,OAAO,EAAE;QAAE,EAAE,EAAE,MAAM,CAAC;QAAC,IAAI,EAAE,MAAM,CAAC;QAAC,IAAI,EAAE,WAAW,GAAG,UAAU,CAAC;QAAC,QAAQ,EAAE,GAAG,CAAC,MAAM,EAAE,WAAW,CAAC,CAAA;KAAE;IAmCrH;;;OAGG;IACH,WAAW;IA4BX;;;OAGG;IACH,aAAa;;;;;;;IAoBb,eAAe,CAAE,OAAO,EAAE,OAAO;CA0ClC"} |
| import { pipe } from 'it-pipe'; | ||
| import { pushableV } from 'it-pushable'; | ||
| import { abortableSource } from 'abortable-iterator'; | ||
| import { encode } from './encode.js'; | ||
| import { decode } from './decode.js'; | ||
| import { restrictSize } from './restrict-size.js'; | ||
| import { MessageTypes, MessageTypeNames } from './message-types.js'; | ||
| import { createStream } from './stream.js'; | ||
| import { toString as uint8ArrayToString } from 'uint8arrays'; | ||
| import { trackedMap } from '@libp2p/tracked-map'; | ||
| import { logger } from '@libp2p/logger'; | ||
| const log = logger('libp2p:mplex'); | ||
| function printMessage(msg) { | ||
| const output = { | ||
| ...msg, | ||
| type: `${MessageTypeNames[msg.type]} (${msg.type})` | ||
| }; | ||
| if (msg.type === MessageTypes.NEW_STREAM) { | ||
| output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice()); | ||
| } | ||
| if (msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { | ||
| output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice(), 'base16'); | ||
| } | ||
| return output; | ||
| } | ||
| export class MplexStreamMuxer { | ||
| constructor(components, init) { | ||
| this.protocol = '/mplex/6.7.0'; | ||
| init = init ?? {}; | ||
| this._streamId = 0; | ||
| this._streams = { | ||
| /** | ||
| * Stream to ids map | ||
| */ | ||
| initiators: trackedMap({ metrics: components.getMetrics(), component: 'mplex', metric: 'initiatorStreams' }), | ||
| /** | ||
| * Stream to ids map | ||
| */ | ||
| receivers: trackedMap({ metrics: components.getMetrics(), component: 'mplex', metric: 'receiverStreams' }) | ||
| }; | ||
| this._init = init; | ||
| /** | ||
| * An iterable sink | ||
| */ | ||
| this.sink = this._createSink(); | ||
| /** | ||
| * An iterable source | ||
| */ | ||
| const source = this._createSource(); | ||
| this._source = source; | ||
| this.source = source; | ||
| } | ||
| init(components) { | ||
| } | ||
| /** | ||
| * Returns a Map of streams and their ids | ||
| */ | ||
| get streams() { | ||
| // Inbound and Outbound streams may have the same ids, so we need to make those unique | ||
| const streams = []; | ||
| this._streams.initiators.forEach(stream => { | ||
| streams.push(stream); | ||
| }); | ||
| this._streams.receivers.forEach(stream => { | ||
| streams.push(stream); | ||
| }); | ||
| return streams; | ||
| } | ||
| /** | ||
| * Initiate a new stream with the given name. If no name is | ||
| * provided, the id of the stream will be used. | ||
| */ | ||
| newStream(name) { | ||
| const id = this._streamId++; | ||
| name = name == null ? id.toString() : name.toString(); | ||
| const registry = this._streams.initiators; | ||
| return this._newStream({ id, name, type: 'initiator', registry }); | ||
| } | ||
| /** | ||
| * Called whenever an inbound stream is created | ||
| */ | ||
| _newReceiverStream(options) { | ||
| const { id, name } = options; | ||
| const registry = this._streams.receivers; | ||
| return this._newStream({ id, name, type: 'receiver', registry }); | ||
| } | ||
| _newStream(options) { | ||
| const { id, name, type, registry } = options; | ||
| log('new %s stream %s %s', type, id, name); | ||
| if (registry.has(id)) { | ||
| throw new Error(`${type} stream ${id} already exists!`); | ||
| } | ||
| const send = (msg) => { | ||
| if (log.enabled) { | ||
| log.trace('%s stream %s send', type, id, printMessage(msg)); | ||
| } | ||
| if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { | ||
| msg.data = msg.data instanceof Uint8Array ? msg.data : msg.data.slice(); | ||
| } | ||
| this._source.push(msg); | ||
| }; | ||
| const onEnd = () => { | ||
| log('%s stream %s %s ended', type, id, name); | ||
| registry.delete(id); | ||
| if (this._init.onStreamEnd != null) { | ||
| this._init.onStreamEnd(stream); | ||
| } | ||
| }; | ||
| const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._init.maxMsgSize }); | ||
| registry.set(id, stream); | ||
| return stream; | ||
| } | ||
| /** | ||
| * Creates a sink with an abortable source. Incoming messages will | ||
| * also have their size restricted. All messages will be varint decoded. | ||
| */ | ||
| _createSink() { | ||
| const sink = async (source) => { | ||
| if (this._init.signal != null) { | ||
| source = abortableSource(source, this._init.signal); | ||
| } | ||
| try { | ||
| await pipe(source, decode, restrictSize(this._init.maxMsgSize), async (source) => { | ||
| for await (const msg of source) { | ||
| this._handleIncoming(msg); | ||
| } | ||
| }); | ||
| this._source.end(); | ||
| } | ||
| catch (err) { | ||
| log('error in sink', err); | ||
| this._source.end(err); // End the source with an error | ||
| } | ||
| }; | ||
| return sink; | ||
| } | ||
| /** | ||
| * Creates a source that restricts outgoing message sizes | ||
| * and varint encodes them | ||
| */ | ||
| _createSource() { | ||
| const onEnd = (err) => { | ||
| const { initiators, receivers } = this._streams; | ||
| // Abort all the things! | ||
| for (const s of initiators.values()) { | ||
| s.abort(err); | ||
| } | ||
| for (const s of receivers.values()) { | ||
| s.abort(err); | ||
| } | ||
| }; | ||
| const source = pushableV({ onEnd }); | ||
| return Object.assign(encode(source), { | ||
| push: source.push, | ||
| end: source.end, | ||
| return: source.return | ||
| }); | ||
| } | ||
| _handleIncoming(message) { | ||
| const { id, type } = message; | ||
| if (log.enabled) { | ||
| log.trace('incoming message', printMessage(message)); | ||
| } | ||
| // Create a new stream? | ||
| if (message.type === MessageTypes.NEW_STREAM) { | ||
| const stream = this._newReceiverStream({ id, name: uint8ArrayToString(message.data instanceof Uint8Array ? message.data : message.data.slice()) }); | ||
| if (this._init.onIncomingStream != null) { | ||
| this._init.onIncomingStream(stream); | ||
| } | ||
| return; | ||
| } | ||
| const list = (type & 1) === 1 ? this._streams.initiators : this._streams.receivers; | ||
| const stream = list.get(id); | ||
| if (stream == null) { | ||
| return log('missing stream %s', id); | ||
| } | ||
| switch (type) { | ||
| case MessageTypes.MESSAGE_INITIATOR: | ||
| case MessageTypes.MESSAGE_RECEIVER: | ||
| stream.source.push(message.data.slice()); | ||
| break; | ||
| case MessageTypes.CLOSE_INITIATOR: | ||
| case MessageTypes.CLOSE_RECEIVER: | ||
| stream.close(); | ||
| break; | ||
| case MessageTypes.RESET_INITIATOR: | ||
| case MessageTypes.RESET_RECEIVER: | ||
| stream.reset(); | ||
| break; | ||
| default: | ||
| log('unknown message type %s', type); | ||
| } | ||
| } | ||
| } | ||
| //# sourceMappingURL=mplex.js.map |
| {"version":3,"file":"mplex.js","sourceRoot":"","sources":["../../src/mplex.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,IAAI,EAAE,MAAM,SAAS,CAAA;AAC9B,OAAO,EAAY,SAAS,EAAE,MAAM,aAAa,CAAA;AACjD,OAAO,EAAE,eAAe,EAAE,MAAM,oBAAoB,CAAA;AACpD,OAAO,EAAE,MAAM,EAAE,MAAM,aAAa,CAAA;AACpC,OAAO,EAAE,MAAM,EAAE,MAAM,aAAa,CAAA;AACpC,OAAO,EAAE,YAAY,EAAE,MAAM,oBAAoB,CAAA;AACjD,OAAO,EAAE,YAAY,EAAE,gBAAgB,EAAW,MAAM,oBAAoB,CAAA;AAC5E,OAAO,EAAE,YAAY,EAAE,MAAM,aAAa,CAAA;AAC1C,OAAO,EAAE,QAAQ,IAAI,kBAAkB,EAAE,MAAM,aAAa,CAAA;AAC5D,OAAO,EAAE,UAAU,EAAE,MAAM,qBAAqB,CAAA;AAChD,OAAO,EAAE,MAAM,EAAE,MAAM,gBAAgB,CAAA;AAMvC,MAAM,GAAG,GAAG,MAAM,CAAC,cAAc,CAAC,CAAA;AAElC,SAAS,YAAY,CAAE,GAAY;IACjC,MAAM,MAAM,GAAQ;QAClB,GAAG,GAAG;QACN,IAAI,EAAE,GAAG,gBAAgB,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,GAAG,CAAC,IAAI,GAAG;KACpD,CAAA;IAED,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,UAAU,EAAE;QACxC,MAAM,CAAC,IAAI,GAAG,kBAAkB,CAAC,GAAG,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,EAAE,CAAC,CAAA;KAC/F;IAED,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,iBAAiB,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,gBAAgB,EAAE;QAC7F,MAAM,CAAC,IAAI,GAAG,kBAAkB,CAAC,GAAG,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,EAAE,EAAE,QAAQ,CAAC,CAAA;KACzG;IAED,OAAO,MAAM,CAAA;AACf,CAAC;AAUD,MAAM,OAAO,gBAAgB;IAW3B,YAAa,UAAsB,EAAE,IAAgB;QAV9C,aAAQ,GAAG,cAAc,CAAA;QAW9B,IAAI,GAAG,IAAI,IAAI,EAAE,CAAA;QAEjB,IAAI,CAAC,SAAS,GAAG,CAAC,CAAA;QAClB,IAAI,CAAC,QAAQ,GAAG;YACd;;eAEG;YACH,UAAU,EAAE,UAAU,CAAsB,EAAE,OAAO,EAAE,UAAU,CAAC,UAAU,EAAE,EAAE,SAAS,EAAE,OAAO,EAAE,MAAM,EAAE,kBAAkB,EAAE,CAAC;YACjI;;eAEG;YACH,SAAS,EAAE,UAAU,CAAsB,EAAE,OAAO,EAAE,UAAU,CAAC,UAAU,EAAE,EAAE,SAAS,EAAE,OAAO,EAAE,MAAM,EAAE,iBAAiB,EAAE,CAAC;SAChI,CAAA;QACD,IAAI,CAAC,KAAK,GAAG,IAAI,CAAA;QAEjB;;WAEG;QACH,IAAI,CAAC,IAAI,GAAG,IAAI,CAAC,WAAW,EAAE,CAAA;QAE9B;;WAEG;QACH,MAAM,MAAM,GAAG,IAAI,CAAC,aAAa,EAAE,CAAA;QACnC,IAAI,CAAC,OAAO,GAAG,MAAM,CAAA;QACrB,IAAI,CAAC,MAAM,GAAG,MAAM,CAAA;IACtB,CAAC;IAED,IAAI,CAAE,UAAsB;IAE5B,CAAC;IAED;;OAEG;IACH,IAAI,OAAO;QACT,sFAAsF;QACtF,MAAM,OAAO,GAAa,EAAE,CAAA;QAC5B,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAC,OAAO,CAAC,MAAM,CAAC,EAAE;YACxC,OAAO,CAAC,IAAI,CAAC,MAAM,CAAC,CAAA;QACtB,CAAC,CAAC,CAAA;QACF,IAAI,CAAC,QAAQ,CAAC,SAAS,CAAC,OAAO,CAAC,MAAM,CAAC,EAAE;YACvC,OAAO,CAAC,IAAI,CAAC,MAAM,CAAC,CAAA;QACtB,CAAC,CAAC,CAAA;QACF,OAAO,OAAO,CAAA;IAChB,CAAC;IAED;;;OAGG;IACH,SAAS,CAAE,IAAa;QACtB,MAAM,EAAE,GAAG,IAAI,CAAC,SAAS,EAAE,CAAA;QAC3B,IAAI,GAAG,IAAI,IAAI,IAAI,CAAC,CAAC,CAAC,EAAE,CAAC,QAAQ,EAAE,CAAC,CAAC,CAAC,IAAI,CAAC,QAAQ,EAAE,CAAA;QACrD,MAAM,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAA;QACzC,OAAO,IAAI,CAAC,UAAU,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,WAAW,EAAE,QAAQ,EAAE,CAAC,CAAA;IACnE,CAAC;IAED;;OAEG;IACH,kBAAkB,CAAE,OAAqC;QACvD,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,GAAG,OAAO,CAAA;QAC5B,MAAM,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,SAAS,CAAA;QACxC,OAAO,IAAI,CAAC,UAAU,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,UAAU,EAAE,QAAQ,EAAE,CAAC,CAAA;IAClE,CAAC;IAED,UAAU,CAAE,OAAyG;QACnH,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,QAAQ,EAAE,GAAG,OAAO,CAAA;QAE5C,GAAG,CAAC,qBAAqB,EAAE,IAAI,EAAE,EAAE,EAAE,IAAI,CAAC,CAAA;QAE1C,IAAI,QAAQ,CAAC,GAAG,CAAC,EAAE,CAAC,EAAE;YACpB,MAAM,IAAI,KAAK,CAAC,GAAG,IAAI,WAAW,EAAE,kBAAkB,CAAC,CAAA;SACxD;QAED,MAAM,IAAI,GAAG,CAAC,GAAY,EAAE,EAAE;YAC5B,IAAI,GAAG,CAAC,OAAO,EAAE;gBACf,GAAG,CAAC,KAAK,CAAC,mBAAmB,EAAE,IAAI,EAAE,EAAE,EAAE,YAAY,CAAC,GAAG,CAAC,CAAC,CAAA;aAC5D;YAED,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,UAAU,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,iBAAiB,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,gBAAgB,EAAE;gBACrI,GAAG,CAAC,IAAI,GAAG,GAAG,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,EAAE,CAAA;aACxE;YAED,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,GAAG,CAAC,CAAA;QACxB,CAAC,CAAA;QAED,MAAM,KAAK,GAAG,GAAG,EAAE;YACjB,GAAG,CAAC,uBAAuB,EAAE,IAAI,EAAE,EAAE,EAAE,IAAI,CAAC,CAAA;YAC5C,QAAQ,CAAC,MAAM,CAAC,EAAE,CAAC,CAAA;YAEnB,IAAI,IAAI,CAAC,KAAK,CAAC,WAAW,IAAI,IAAI,EAAE;gBAClC,IAAI,CAAC,KAAK,CAAC,WAAW,CAAC,MAAM,CAAC,CAAA;aAC/B;QACH,CAAC,CAAA;QAED,MAAM,MAAM,GAAG,YAAY,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAE,UAAU,EAAE,IAAI,CAAC,KAAK,CAAC,UAAU,EAAE,CAAC,CAAA;QAC/F,QAAQ,CAAC,GAAG,CAAC,EAAE,EAAE,MAAM,CAAC,CAAA;QACxB,OAAO,MAAM,CAAA;IACf,CAAC;IAED;;;OAGG;IACH,WAAW;QACT,MAAM,IAAI,GAAqB,KAAK,EAAC,MAAM,EAAC,EAAE;YAC5C,IAAI,IAAI,CAAC,KAAK,CAAC,MAAM,IAAI,IAAI,EAAE;gBAC7B,MAAM,GAAG,eAAe,CAAC,MAAM,EAAE,IAAI,CAAC,KAAK,CAAC,MAAM,CAAC,CAAA;aACpD;YAED,IAAI;gBACF,MAAM,IAAI,CACR,MAAM,EACN,MAAM,EACN,YAAY,CAAC,IAAI,CAAC,KAAK,CAAC,UAAU,CAAC,EACnC,KAAK,EAAC,MAAM,EAAC,EAAE;oBACb,IAAI,KAAK,EAAE,MAAM,GAAG,IAAI,MAAM,EAAE;wBAC9B,IAAI,CAAC,eAAe,CAAC,GAAG,CAAC,CAAA;qBAC1B;gBACH,CAAC,CACF,CAAA;gBAED,IAAI,CAAC,OAAO,CAAC,GAAG,EAAE,CAAA;aACnB;YAAC,OAAO,GAAQ,EAAE;gBACjB,GAAG,CAAC,eAAe,EAAE,GAAG,CAAC,CAAA;gBACzB,IAAI,CAAC,OAAO,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA,CAAC,+BAA+B;aACtD;QACH,CAAC,CAAA;QAED,OAAO,IAAI,CAAA;IACb,CAAC;IAED;;;OAGG;IACH,aAAa;QACX,MAAM,KAAK,GAAG,CAAC,GAAW,EAAE,EAAE;YAC5B,MAAM,EAAE,UAAU,EAAE,SAAS,EAAE,GAAG,IAAI,CAAC,QAAQ,CAAA;YAC/C,wBAAwB;YACxB,KAAK,MAAM,CAAC,IAAI,UAAU,CAAC,MAAM,EAAE,EAAE;gBACnC,CAAC,CAAC,KAAK,CAAC,GAAG,CAAC,CAAA;aACb;YACD,KAAK,MAAM,CAAC,IAAI,SAAS,CAAC,MAAM,EAAE,EAAE;gBAClC,CAAC,CAAC,KAAK,CAAC,GAAG,CAAC,CAAA;aACb;QACH,CAAC,CAAA;QACD,MAAM,MAAM,GAAG,SAAS,CAAU,EAAE,KAAK,EAAE,CAAC,CAAA;QAE5C,OAAO,MAAM,CAAC,MAAM,CAAC,MAAM,CAAC,MAAM,CAAC,EAAE;YACnC,IAAI,EAAE,MAAM,CAAC,IAAI;YACjB,GAAG,EAAE,MAAM,CAAC,GAAG;YACf,MAAM,EAAE,MAAM,CAAC,MAAM;SACtB,CAAC,CAAA;IACJ,CAAC;IAED,eAAe,CAAE,OAAgB;QAC/B,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,GAAG,OAAO,CAAA;QAE5B,IAAI,GAAG,CAAC,OAAO,EAAE;YACf,GAAG,CAAC,KAAK,CAAC,kBAAkB,EAAE,YAAY,CAAC,OAAO,CAAC,CAAC,CAAA;SACrD;QAED,uBAAuB;QACvB,IAAI,OAAO,CAAC,IAAI,KAAK,YAAY,CAAC,UAAU,EAAE;YAC5C,MAAM,MAAM,GAAG,IAAI,CAAC,kBAAkB,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,kBAAkB,CAAC,OAAO,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,OAAO,CAAC,IAAI,CAAC,CAAC,CAAC,OAAO,CAAC,IAAI,CAAC,KAAK,EAAE,CAAC,EAAE,CAAC,CAAA;YAElJ,IAAI,IAAI,CAAC,KAAK,CAAC,gBAAgB,IAAI,IAAI,EAAE;gBACvC,IAAI,CAAC,KAAK,CAAC,gBAAgB,CAAC,MAAM,CAAC,CAAA;aACpC;YAED,OAAM;SACP;QAED,MAAM,IAAI,GAAG,CAAC,IAAI,GAAG,CAAC,CAAC,KAAK,CAAC,CAAC,CAAC,CAAC,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAC,CAAC,CAAC,IAAI,CAAC,QAAQ,CAAC,SAAS,CAAA;QAClF,MAAM,MAAM,GAAG,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,CAAA;QAE3B,IAAI,MAAM,IAAI,IAAI,EAAE;YAClB,OAAO,GAAG,CAAC,mBAAmB,EAAE,EAAE,CAAC,CAAA;SACpC;QAED,QAAQ,IAAI,EAAE;YACZ,KAAK,YAAY,CAAC,iBAAiB,CAAC;YACpC,KAAK,YAAY,CAAC,gBAAgB;gBAChC,MAAM,CAAC,MAAM,CAAC,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,KAAK,EAAE,CAAC,CAAA;gBACxC,MAAK;YACP,KAAK,YAAY,CAAC,eAAe,CAAC;YAClC,KAAK,YAAY,CAAC,cAAc;gBAC9B,MAAM,CAAC,KAAK,EAAE,CAAA;gBACd,MAAK;YACP,KAAK,YAAY,CAAC,eAAe,CAAC;YAClC,KAAK,YAAY,CAAC,cAAc;gBAC9B,MAAM,CAAC,KAAK,EAAE,CAAA;gBACd,MAAK;YACP;gBACE,GAAG,CAAC,yBAAyB,EAAE,IAAI,CAAC,CAAA;SACvC;IACH,CAAC;CACF"} |
+256
| import { pipe } from 'it-pipe' | ||
| import { Pushable, pushableV } from 'it-pushable' | ||
| import { abortableSource } from 'abortable-iterator' | ||
| import { encode } from './encode.js' | ||
| import { decode } from './decode.js' | ||
| import { restrictSize } from './restrict-size.js' | ||
| import { MessageTypes, MessageTypeNames, Message } from './message-types.js' | ||
| import { createStream } from './stream.js' | ||
| import { toString as uint8ArrayToString } from 'uint8arrays' | ||
| import { trackedMap } from '@libp2p/tracked-map' | ||
| import { logger } from '@libp2p/logger' | ||
| import type { Components } from '@libp2p/interfaces/components' | ||
| import type { Sink } from 'it-stream-types' | ||
| import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer' | ||
| import type { Stream } from '@libp2p/interfaces/connection' | ||
| const log = logger('libp2p:mplex') | ||
| function printMessage (msg: Message) { | ||
| const output: any = { | ||
| ...msg, | ||
| type: `${MessageTypeNames[msg.type]} (${msg.type})` | ||
| } | ||
| if (msg.type === MessageTypes.NEW_STREAM) { | ||
| output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice()) | ||
| } | ||
| if (msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { | ||
| output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice(), 'base16') | ||
| } | ||
| return output | ||
| } | ||
| export interface MplexStream extends Stream { | ||
| source: Pushable<Uint8Array> | ||
| } | ||
| export interface MplexInit extends StreamMuxerInit { | ||
| maxMsgSize?: number | ||
| } | ||
| export class MplexStreamMuxer implements StreamMuxer { | ||
| public protocol = '/mplex/6.7.0' | ||
| public sink: Sink<Uint8Array> | ||
| public source: AsyncIterable<Uint8Array> | ||
| private _streamId: number | ||
| private readonly _streams: { initiators: Map<number, MplexStream>, receivers: Map<number, MplexStream> } | ||
| private readonly _init: MplexInit | ||
| private readonly _source: { push: (val: Message) => void, end: (err?: Error) => void } | ||
| constructor (components: Components, init?: MplexInit) { | ||
| init = init ?? {} | ||
| this._streamId = 0 | ||
| this._streams = { | ||
| /** | ||
| * Stream to ids map | ||
| */ | ||
| initiators: trackedMap<number, MplexStream>({ metrics: components.getMetrics(), component: 'mplex', metric: 'initiatorStreams' }), | ||
| /** | ||
| * Stream to ids map | ||
| */ | ||
| receivers: trackedMap<number, MplexStream>({ metrics: components.getMetrics(), component: 'mplex', metric: 'receiverStreams' }) | ||
| } | ||
| this._init = init | ||
| /** | ||
| * An iterable sink | ||
| */ | ||
| this.sink = this._createSink() | ||
| /** | ||
| * An iterable source | ||
| */ | ||
| const source = this._createSource() | ||
| this._source = source | ||
| this.source = source | ||
| } | ||
| init (components: Components) { | ||
| } | ||
| /** | ||
| * Returns a Map of streams and their ids | ||
| */ | ||
| get streams () { | ||
| // Inbound and Outbound streams may have the same ids, so we need to make those unique | ||
| const streams: Stream[] = [] | ||
| this._streams.initiators.forEach(stream => { | ||
| streams.push(stream) | ||
| }) | ||
| this._streams.receivers.forEach(stream => { | ||
| streams.push(stream) | ||
| }) | ||
| return streams | ||
| } | ||
| /** | ||
| * Initiate a new stream with the given name. If no name is | ||
| * provided, the id of the stream will be used. | ||
| */ | ||
| newStream (name?: string): Stream { | ||
| const id = this._streamId++ | ||
| name = name == null ? id.toString() : name.toString() | ||
| const registry = this._streams.initiators | ||
| return this._newStream({ id, name, type: 'initiator', registry }) | ||
| } | ||
| /** | ||
| * Called whenever an inbound stream is created | ||
| */ | ||
| _newReceiverStream (options: { id: number, name: string }) { | ||
| const { id, name } = options | ||
| const registry = this._streams.receivers | ||
| return this._newStream({ id, name, type: 'receiver', registry }) | ||
| } | ||
| _newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map<number, MplexStream> }) { | ||
| const { id, name, type, registry } = options | ||
| log('new %s stream %s %s', type, id, name) | ||
| if (registry.has(id)) { | ||
| throw new Error(`${type} stream ${id} already exists!`) | ||
| } | ||
| const send = (msg: Message) => { | ||
| if (log.enabled) { | ||
| log.trace('%s stream %s send', type, id, printMessage(msg)) | ||
| } | ||
| if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { | ||
| msg.data = msg.data instanceof Uint8Array ? msg.data : msg.data.slice() | ||
| } | ||
| this._source.push(msg) | ||
| } | ||
| const onEnd = () => { | ||
| log('%s stream %s %s ended', type, id, name) | ||
| registry.delete(id) | ||
| if (this._init.onStreamEnd != null) { | ||
| this._init.onStreamEnd(stream) | ||
| } | ||
| } | ||
| const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._init.maxMsgSize }) | ||
| registry.set(id, stream) | ||
| return stream | ||
| } | ||
| /** | ||
| * Creates a sink with an abortable source. Incoming messages will | ||
| * also have their size restricted. All messages will be varint decoded. | ||
| */ | ||
| _createSink () { | ||
| const sink: Sink<Uint8Array> = async source => { | ||
| if (this._init.signal != null) { | ||
| source = abortableSource(source, this._init.signal) | ||
| } | ||
| try { | ||
| await pipe( | ||
| source, | ||
| decode, | ||
| restrictSize(this._init.maxMsgSize), | ||
| async source => { | ||
| for await (const msg of source) { | ||
| this._handleIncoming(msg) | ||
| } | ||
| } | ||
| ) | ||
| this._source.end() | ||
| } catch (err: any) { | ||
| log('error in sink', err) | ||
| this._source.end(err) // End the source with an error | ||
| } | ||
| } | ||
| return sink | ||
| } | ||
| /** | ||
| * Creates a source that restricts outgoing message sizes | ||
| * and varint encodes them | ||
| */ | ||
| _createSource () { | ||
| const onEnd = (err?: Error) => { | ||
| const { initiators, receivers } = this._streams | ||
| // Abort all the things! | ||
| for (const s of initiators.values()) { | ||
| s.abort(err) | ||
| } | ||
| for (const s of receivers.values()) { | ||
| s.abort(err) | ||
| } | ||
| } | ||
| const source = pushableV<Message>({ onEnd }) | ||
| return Object.assign(encode(source), { | ||
| push: source.push, | ||
| end: source.end, | ||
| return: source.return | ||
| }) | ||
| } | ||
| _handleIncoming (message: Message) { | ||
| const { id, type } = message | ||
| if (log.enabled) { | ||
| log.trace('incoming message', printMessage(message)) | ||
| } | ||
| // Create a new stream? | ||
| if (message.type === MessageTypes.NEW_STREAM) { | ||
| const stream = this._newReceiverStream({ id, name: uint8ArrayToString(message.data instanceof Uint8Array ? message.data : message.data.slice()) }) | ||
| if (this._init.onIncomingStream != null) { | ||
| this._init.onIncomingStream(stream) | ||
| } | ||
| return | ||
| } | ||
| const list = (type & 1) === 1 ? this._streams.initiators : this._streams.receivers | ||
| const stream = list.get(id) | ||
| if (stream == null) { | ||
| return log('missing stream %s', id) | ||
| } | ||
| switch (type) { | ||
| case MessageTypes.MESSAGE_INITIATOR: | ||
| case MessageTypes.MESSAGE_RECEIVER: | ||
| stream.source.push(message.data.slice()) | ||
| break | ||
| case MessageTypes.CLOSE_INITIATOR: | ||
| case MessageTypes.CLOSE_RECEIVER: | ||
| stream.close() | ||
| break | ||
| case MessageTypes.RESET_INITIATOR: | ||
| case MessageTypes.RESET_RECEIVER: | ||
| stream.reset() | ||
| break | ||
| default: | ||
| log('unknown message type %s', type) | ||
| } | ||
| } | ||
| } |
+7
-59
@@ -1,63 +0,11 @@ | ||
| import { Pushable } from 'it-pushable'; | ||
| import { Message } from './message-types.js'; | ||
| import type { Sink } from 'it-stream-types'; | ||
| import type { Muxer, MuxerOptions } from '@libp2p/interfaces/stream-muxer'; | ||
| import type { Stream } from '@libp2p/interfaces/connection'; | ||
| import type { ComponentMetricsTracker } from '@libp2p/interfaces/metrics'; | ||
| export interface MplexStream extends Stream { | ||
| source: Pushable<Uint8Array>; | ||
| } | ||
| export interface MplexOptions extends MuxerOptions { | ||
| import type { Components } from '@libp2p/interfaces/components'; | ||
| import type { StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer'; | ||
| import { MplexStreamMuxer } from './mplex.js'; | ||
| export interface MplexInit extends StreamMuxerInit { | ||
| maxMsgSize?: number; | ||
| metrics?: ComponentMetricsTracker; | ||
| } | ||
| export declare class Mplex implements Muxer { | ||
| static multicodec: string; | ||
| sink: Sink<Uint8Array>; | ||
| source: AsyncIterable<Uint8Array>; | ||
| private _streamId; | ||
| private readonly _streams; | ||
| private readonly _options; | ||
| private readonly _source; | ||
| constructor(options?: MplexOptions); | ||
| /** | ||
| * Returns a Map of streams and their ids | ||
| */ | ||
| get streams(): Stream[]; | ||
| /** | ||
| * Initiate a new stream with the given name. If no name is | ||
| * provided, the id of the stream will be used. | ||
| */ | ||
| newStream(name?: string): Stream; | ||
| /** | ||
| * Called whenever an inbound stream is created | ||
| */ | ||
| _newReceiverStream(options: { | ||
| id: number; | ||
| name: string; | ||
| }): MplexStream; | ||
| _newStream(options: { | ||
| id: number; | ||
| name: string; | ||
| type: 'initiator' | 'receiver'; | ||
| registry: Map<number, MplexStream>; | ||
| }): MplexStream; | ||
| /** | ||
| * Creates a sink with an abortable source. Incoming messages will | ||
| * also have their size restricted. All messages will be varint decoded. | ||
| */ | ||
| _createSink(): Sink<Uint8Array, Promise<void>>; | ||
| /** | ||
| * Creates a source that restricts outgoing message sizes | ||
| * and varint encodes them | ||
| */ | ||
| _createSource(): AsyncGenerator<Uint8Array, void, undefined> & { | ||
| push: (value: Message) => import("it-pushable").PushableV<Message>; | ||
| end: (err?: Error | undefined) => import("it-pushable").PushableV<Message>; | ||
| return: () => { | ||
| done: boolean; | ||
| }; | ||
| }; | ||
| _handleIncoming(message: Message): void; | ||
| export declare class Mplex implements StreamMuxerFactory { | ||
| protocol: string; | ||
| createStreamMuxer(components: Components, init?: MplexInit): MplexStreamMuxer; | ||
| } | ||
| //# sourceMappingURL=index.d.ts.map |
@@ -1,1 +0,1 @@ | ||
| {"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../../src/index.ts"],"names":[],"mappings":"AACA,OAAO,EAAE,QAAQ,EAAa,MAAM,aAAa,CAAA;AAKjD,OAAO,EAAkC,OAAO,EAAE,MAAM,oBAAoB,CAAA;AAK5E,OAAO,KAAK,EAAE,IAAI,EAAE,MAAM,iBAAiB,CAAA;AAC3C,OAAO,KAAK,EAAE,KAAK,EAAE,YAAY,EAAE,MAAM,iCAAiC,CAAA;AAC1E,OAAO,KAAK,EAAE,MAAM,EAAE,MAAM,+BAA+B,CAAA;AAC3D,OAAO,KAAK,EAAE,uBAAuB,EAAE,MAAM,4BAA4B,CAAA;AAsBzE,MAAM,WAAW,WAAY,SAAQ,MAAM;IACzC,MAAM,EAAE,QAAQ,CAAC,UAAU,CAAC,CAAA;CAC7B;AAED,MAAM,WAAW,YAAa,SAAQ,YAAY;IAChD,UAAU,CAAC,EAAE,MAAM,CAAA;IACnB,OAAO,CAAC,EAAE,uBAAuB,CAAA;CAClC;AAED,qBAAa,KAAM,YAAW,KAAK;IACjC,MAAM,CAAC,UAAU,SAAiB;IAE3B,IAAI,EAAE,IAAI,CAAC,UAAU,CAAC,CAAA;IACtB,MAAM,EAAE,aAAa,CAAC,UAAU,CAAC,CAAA;IAExC,OAAO,CAAC,SAAS,CAAQ;IACzB,OAAO,CAAC,QAAQ,CAAC,QAAQ,CAA+E;IACxG,OAAO,CAAC,QAAQ,CAAC,QAAQ,CAAc;IACvC,OAAO,CAAC,QAAQ,CAAC,OAAO,CAA8D;gBAEzE,OAAO,CAAC,EAAE,YAAY;IA6BnC;;OAEG;IACH,IAAI,OAAO,aAUV;IAED;;;OAGG;IACH,SAAS,CAAE,IAAI,CAAC,EAAE,MAAM,GAAG,MAAM;IAOjC;;OAEG;IACH,kBAAkB,CAAE,OAAO,EAAE;QAAE,EAAE,EAAE,MAAM,CAAC;QAAC,IAAI,EAAE,MAAM,CAAA;KAAE;IAMzD,UAAU,CAAE,OAAO,EAAE;QAAE,EAAE,EAAE,MAAM,CAAC;QAAC,IAAI,EAAE,MAAM,CAAC;QAAC,IAAI,EAAE,WAAW,GAAG,UAAU,CAAC;QAAC,QAAQ,EAAE,GAAG,CAAC,MAAM,EAAE,WAAW,CAAC,CAAA;KAAE;IAmCrH;;;OAGG;IACH,WAAW;IA+BX;;;OAGG;IACH,aAAa;;;;;;;IAoBb,eAAe,CAAE,OAAO,EAAE,OAAO;CA0ClC"} | ||
| {"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../../src/index.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,EAAE,UAAU,EAAE,MAAM,+BAA+B,CAAA;AAC/D,OAAO,KAAK,EAAE,kBAAkB,EAAE,eAAe,EAAE,MAAM,iCAAiC,CAAA;AAC1F,OAAO,EAAE,gBAAgB,EAAE,MAAM,YAAY,CAAA;AAE7C,MAAM,WAAW,SAAU,SAAQ,eAAe;IAChD,UAAU,CAAC,EAAE,MAAM,CAAA;CACpB;AAED,qBAAa,KAAM,YAAW,kBAAkB;IACvC,QAAQ,SAAiB;IAEhC,iBAAiB,CAAE,UAAU,EAAE,UAAU,EAAE,IAAI,CAAC,EAAE,SAAS;CAG5D"} |
+5
-191
@@ -1,196 +0,10 @@ | ||
| import { pipe } from 'it-pipe'; | ||
| import { pushableV } from 'it-pushable'; | ||
| import { abortableSource } from 'abortable-iterator'; | ||
| import { encode } from './encode.js'; | ||
| import { decode } from './decode.js'; | ||
| import { restrictSize } from './restrict-size.js'; | ||
| import { MessageTypes, MessageTypeNames } from './message-types.js'; | ||
| import { createStream } from './stream.js'; | ||
| import { toString as uint8ArrayToString } from 'uint8arrays'; | ||
| import { trackedMap } from '@libp2p/tracked-map'; | ||
| import { logger } from '@libp2p/logger'; | ||
| import each from 'it-foreach'; | ||
| const log = logger('libp2p:mplex'); | ||
| function printMessage(msg) { | ||
| const output = { | ||
| ...msg, | ||
| type: `${MessageTypeNames[msg.type]} (${msg.type})` | ||
| }; | ||
| if (msg.type === MessageTypes.NEW_STREAM) { | ||
| output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice()); | ||
| } | ||
| if (msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { | ||
| output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice(), 'base16'); | ||
| } | ||
| return output; | ||
| } | ||
| import { MplexStreamMuxer } from './mplex.js'; | ||
| export class Mplex { | ||
| constructor(options) { | ||
| options = options ?? {}; | ||
| this._streamId = 0; | ||
| this._streams = { | ||
| /** | ||
| * Stream to ids map | ||
| */ | ||
| initiators: trackedMap({ metrics: options.metrics, component: 'mplex', metric: 'initiatorStreams' }), | ||
| /** | ||
| * Stream to ids map | ||
| */ | ||
| receivers: trackedMap({ metrics: options.metrics, component: 'mplex', metric: 'receiverStreams' }) | ||
| }; | ||
| this._options = options; | ||
| /** | ||
| * An iterable sink | ||
| */ | ||
| this.sink = this._createSink(); | ||
| /** | ||
| * An iterable source | ||
| */ | ||
| const source = this._createSource(); | ||
| this._source = source; | ||
| this.source = source; | ||
| constructor() { | ||
| this.protocol = '/mplex/6.7.0'; | ||
| } | ||
| /** | ||
| * Returns a Map of streams and their ids | ||
| */ | ||
| get streams() { | ||
| // Inbound and Outbound streams may have the same ids, so we need to make those unique | ||
| const streams = []; | ||
| this._streams.initiators.forEach(stream => { | ||
| streams.push(stream); | ||
| }); | ||
| this._streams.receivers.forEach(stream => { | ||
| streams.push(stream); | ||
| }); | ||
| return streams; | ||
| createStreamMuxer(components, init) { | ||
| return new MplexStreamMuxer(components, init); | ||
| } | ||
| /** | ||
| * Initiate a new stream with the given name. If no name is | ||
| * provided, the id of the stream will be used. | ||
| */ | ||
| newStream(name) { | ||
| const id = this._streamId++; | ||
| name = name == null ? id.toString() : name.toString(); | ||
| const registry = this._streams.initiators; | ||
| return this._newStream({ id, name, type: 'initiator', registry }); | ||
| } | ||
| /** | ||
| * Called whenever an inbound stream is created | ||
| */ | ||
| _newReceiverStream(options) { | ||
| const { id, name } = options; | ||
| const registry = this._streams.receivers; | ||
| return this._newStream({ id, name, type: 'receiver', registry }); | ||
| } | ||
| _newStream(options) { | ||
| const { id, name, type, registry } = options; | ||
| log('new %s stream %s %s', type, id, name); | ||
| if (registry.has(id)) { | ||
| throw new Error(`${type} stream ${id} already exists!`); | ||
| } | ||
| const send = (msg) => { | ||
| if (log.enabled) { | ||
| log('%s stream %s send', type, id, printMessage(msg)); | ||
| } | ||
| if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { | ||
| msg.data = msg.data instanceof Uint8Array ? msg.data : msg.data.slice(); | ||
| } | ||
| this._source.push(msg); | ||
| }; | ||
| const onEnd = () => { | ||
| log('%s stream %s %s ended', type, id, name); | ||
| registry.delete(id); | ||
| if (this._options.onStreamEnd != null) { | ||
| this._options.onStreamEnd(stream); | ||
| } | ||
| }; | ||
| const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._options.maxMsgSize }); | ||
| registry.set(id, stream); | ||
| return stream; | ||
| } | ||
| /** | ||
| * Creates a sink with an abortable source. Incoming messages will | ||
| * also have their size restricted. All messages will be varint decoded. | ||
| */ | ||
| _createSink() { | ||
| const sink = async (source) => { | ||
| if (this._options.signal != null) { | ||
| source = abortableSource(source, this._options.signal); | ||
| } | ||
| try { | ||
| await pipe(source, source => each(source, (buf) => { | ||
| // console.info('incoming', uint8ArrayToString(buf, 'base64')) | ||
| }), decode, restrictSize(this._options.maxMsgSize), async (source) => { | ||
| for await (const msg of source) { | ||
| this._handleIncoming(msg); | ||
| } | ||
| }); | ||
| this._source.end(); | ||
| } | ||
| catch (err) { | ||
| log('error in sink', err); | ||
| this._source.end(err); // End the source with an error | ||
| } | ||
| }; | ||
| return sink; | ||
| } | ||
| /** | ||
| * Creates a source that restricts outgoing message sizes | ||
| * and varint encodes them | ||
| */ | ||
| _createSource() { | ||
| const onEnd = (err) => { | ||
| const { initiators, receivers } = this._streams; | ||
| // Abort all the things! | ||
| for (const s of initiators.values()) { | ||
| s.abort(err); | ||
| } | ||
| for (const s of receivers.values()) { | ||
| s.abort(err); | ||
| } | ||
| }; | ||
| const source = pushableV({ onEnd }); | ||
| return Object.assign(encode(source), { | ||
| push: source.push, | ||
| end: source.end, | ||
| return: source.return | ||
| }); | ||
| } | ||
| _handleIncoming(message) { | ||
| const { id, type } = message; | ||
| if (log.enabled) { | ||
| log('incoming message', printMessage(message)); | ||
| } | ||
| // Create a new stream? | ||
| if (message.type === MessageTypes.NEW_STREAM) { | ||
| const stream = this._newReceiverStream({ id, name: uint8ArrayToString(message.data instanceof Uint8Array ? message.data : message.data.slice()) }); | ||
| if (this._options.onIncomingStream != null) { | ||
| this._options.onIncomingStream(stream); | ||
| } | ||
| return; | ||
| } | ||
| const list = (type & 1) === 1 ? this._streams.initiators : this._streams.receivers; | ||
| const stream = list.get(id); | ||
| if (stream == null) { | ||
| return log('missing stream %s', id); | ||
| } | ||
| switch (type) { | ||
| case MessageTypes.MESSAGE_INITIATOR: | ||
| case MessageTypes.MESSAGE_RECEIVER: | ||
| stream.source.push(message.data.slice()); | ||
| break; | ||
| case MessageTypes.CLOSE_INITIATOR: | ||
| case MessageTypes.CLOSE_RECEIVER: | ||
| stream.close(); | ||
| break; | ||
| case MessageTypes.RESET_INITIATOR: | ||
| case MessageTypes.RESET_RECEIVER: | ||
| stream.reset(); | ||
| break; | ||
| default: | ||
| log('unknown message type %s', type); | ||
| } | ||
| } | ||
| } | ||
| Mplex.multicodec = '/mplex/6.7.0'; | ||
| //# sourceMappingURL=index.js.map |
@@ -1,1 +0,1 @@ | ||
| {"version":3,"file":"index.js","sourceRoot":"","sources":["../../src/index.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,IAAI,EAAE,MAAM,SAAS,CAAA;AAC9B,OAAO,EAAY,SAAS,EAAE,MAAM,aAAa,CAAA;AACjD,OAAO,EAAE,eAAe,EAAE,MAAM,oBAAoB,CAAA;AACpD,OAAO,EAAE,MAAM,EAAE,MAAM,aAAa,CAAA;AACpC,OAAO,EAAE,MAAM,EAAE,MAAM,aAAa,CAAA;AACpC,OAAO,EAAE,YAAY,EAAE,MAAM,oBAAoB,CAAA;AACjD,OAAO,EAAE,YAAY,EAAE,gBAAgB,EAAW,MAAM,oBAAoB,CAAA;AAC5E,OAAO,EAAE,YAAY,EAAE,MAAM,aAAa,CAAA;AAC1C,OAAO,EAAE,QAAQ,IAAI,kBAAkB,EAAE,MAAM,aAAa,CAAA;AAC5D,OAAO,EAAE,UAAU,EAAE,MAAM,qBAAqB,CAAA;AAChD,OAAO,EAAE,MAAM,EAAE,MAAM,gBAAgB,CAAA;AAKvC,OAAO,IAAI,MAAM,YAAY,CAAA;AAE7B,MAAM,GAAG,GAAG,MAAM,CAAC,cAAc,CAAC,CAAA;AAElC,SAAS,YAAY,CAAE,GAAY;IACjC,MAAM,MAAM,GAAQ;QAClB,GAAG,GAAG;QACN,IAAI,EAAE,GAAG,gBAAgB,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,GAAG,CAAC,IAAI,GAAG;KACpD,CAAA;IAED,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,UAAU,EAAE;QACxC,MAAM,CAAC,IAAI,GAAG,kBAAkB,CAAC,GAAG,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,EAAE,CAAC,CAAA;KAC/F;IAED,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,iBAAiB,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,gBAAgB,EAAE;QAC7F,MAAM,CAAC,IAAI,GAAG,kBAAkB,CAAC,GAAG,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,EAAE,EAAE,QAAQ,CAAC,CAAA;KACzG;IAED,OAAO,MAAM,CAAA;AACf,CAAC;AAWD,MAAM,OAAO,KAAK;IAWhB,YAAa,OAAsB;QACjC,OAAO,GAAG,OAAO,IAAI,EAAE,CAAA;QAEvB,IAAI,CAAC,SAAS,GAAG,CAAC,CAAA;QAClB,IAAI,CAAC,QAAQ,GAAG;YACd;;eAEG;YACH,UAAU,EAAE,UAAU,CAAsB,EAAE,OAAO,EAAE,OAAO,CAAC,OAAO,EAAE,SAAS,EAAE,OAAO,EAAE,MAAM,EAAE,kBAAkB,EAAE,CAAC;YACzH;;eAEG;YACH,SAAS,EAAE,UAAU,CAAsB,EAAE,OAAO,EAAE,OAAO,CAAC,OAAO,EAAE,SAAS,EAAE,OAAO,EAAE,MAAM,EAAE,iBAAiB,EAAE,CAAC;SACxH,CAAA;QACD,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAA;QAEvB;;WAEG;QACH,IAAI,CAAC,IAAI,GAAG,IAAI,CAAC,WAAW,EAAE,CAAA;QAE9B;;WAEG;QACH,MAAM,MAAM,GAAG,IAAI,CAAC,aAAa,EAAE,CAAA;QACnC,IAAI,CAAC,OAAO,GAAG,MAAM,CAAA;QACrB,IAAI,CAAC,MAAM,GAAG,MAAM,CAAA;IACtB,CAAC;IAED;;OAEG;IACH,IAAI,OAAO;QACT,sFAAsF;QACtF,MAAM,OAAO,GAAa,EAAE,CAAA;QAC5B,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAC,OAAO,CAAC,MAAM,CAAC,EAAE;YACxC,OAAO,CAAC,IAAI,CAAC,MAAM,CAAC,CAAA;QACtB,CAAC,CAAC,CAAA;QACF,IAAI,CAAC,QAAQ,CAAC,SAAS,CAAC,OAAO,CAAC,MAAM,CAAC,EAAE;YACvC,OAAO,CAAC,IAAI,CAAC,MAAM,CAAC,CAAA;QACtB,CAAC,CAAC,CAAA;QACF,OAAO,OAAO,CAAA;IAChB,CAAC;IAED;;;OAGG;IACH,SAAS,CAAE,IAAa;QACtB,MAAM,EAAE,GAAG,IAAI,CAAC,SAAS,EAAE,CAAA;QAC3B,IAAI,GAAG,IAAI,IAAI,IAAI,CAAC,CAAC,CAAC,EAAE,CAAC,QAAQ,EAAE,CAAC,CAAC,CAAC,IAAI,CAAC,QAAQ,EAAE,CAAA;QACrD,MAAM,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAA;QACzC,OAAO,IAAI,CAAC,UAAU,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,WAAW,EAAE,QAAQ,EAAE,CAAC,CAAA;IACnE,CAAC;IAED;;OAEG;IACH,kBAAkB,CAAE,OAAqC;QACvD,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,GAAG,OAAO,CAAA;QAC5B,MAAM,QAAQ,GAAG,IAAI,CAAC,QAAQ,CAAC,SAAS,CAAA;QACxC,OAAO,IAAI,CAAC,UAAU,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,UAAU,EAAE,QAAQ,EAAE,CAAC,CAAA;IAClE,CAAC;IAED,UAAU,CAAE,OAAyG;QACnH,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,QAAQ,EAAE,GAAG,OAAO,CAAA;QAE5C,GAAG,CAAC,qBAAqB,EAAE,IAAI,EAAE,EAAE,EAAE,IAAI,CAAC,CAAA;QAE1C,IAAI,QAAQ,CAAC,GAAG,CAAC,EAAE,CAAC,EAAE;YACpB,MAAM,IAAI,KAAK,CAAC,GAAG,IAAI,WAAW,EAAE,kBAAkB,CAAC,CAAA;SACxD;QAED,MAAM,IAAI,GAAG,CAAC,GAAY,EAAE,EAAE;YAC5B,IAAI,GAAG,CAAC,OAAO,EAAE;gBACf,GAAG,CAAC,mBAAmB,EAAE,IAAI,EAAE,EAAE,EAAE,YAAY,CAAC,GAAG,CAAC,CAAC,CAAA;aACtD;YAED,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,UAAU,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,iBAAiB,IAAI,GAAG,CAAC,IAAI,KAAK,YAAY,CAAC,gBAAgB,EAAE;gBACrI,GAAG,CAAC,IAAI,GAAG,GAAG,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC,CAAC,GAAG,CAAC,IAAI,CAAC,KAAK,EAAE,CAAA;aACxE;YAED,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,GAAG,CAAC,CAAA;QACxB,CAAC,CAAA;QAED,MAAM,KAAK,GAAG,GAAG,EAAE;YACjB,GAAG,CAAC,uBAAuB,EAAE,IAAI,EAAE,EAAE,EAAE,IAAI,CAAC,CAAA;YAC5C,QAAQ,CAAC,MAAM,CAAC,EAAE,CAAC,CAAA;YAEnB,IAAI,IAAI,CAAC,QAAQ,CAAC,WAAW,IAAI,IAAI,EAAE;gBACrC,IAAI,CAAC,QAAQ,CAAC,WAAW,CAAC,MAAM,CAAC,CAAA;aAClC;QACH,CAAC,CAAA;QAED,MAAM,MAAM,GAAG,YAAY,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAE,UAAU,EAAE,IAAI,CAAC,QAAQ,CAAC,UAAU,EAAE,CAAC,CAAA;QAClG,QAAQ,CAAC,GAAG,CAAC,EAAE,EAAE,MAAM,CAAC,CAAA;QACxB,OAAO,MAAM,CAAA;IACf,CAAC;IAED;;;OAGG;IACH,WAAW;QACT,MAAM,IAAI,GAAqB,KAAK,EAAC,MAAM,EAAC,EAAE;YAC5C,IAAI,IAAI,CAAC,QAAQ,CAAC,MAAM,IAAI,IAAI,EAAE;gBAChC,MAAM,GAAG,eAAe,CAAC,MAAM,EAAE,IAAI,CAAC,QAAQ,CAAC,MAAM,CAAC,CAAA;aACvD;YAED,IAAI;gBACF,MAAM,IAAI,CACR,MAAM,EACN,MAAM,CAAC,EAAE,CAAC,IAAI,CAAC,MAAM,EAAE,CAAC,GAAG,EAAE,EAAE;oBAC7B,8DAA8D;gBAChE,CAAC,CAAC,EACF,MAAM,EACN,YAAY,CAAC,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAC,EACtC,KAAK,EAAC,MAAM,EAAC,EAAE;oBACb,IAAI,KAAK,EAAE,MAAM,GAAG,IAAI,MAAM,EAAE;wBAC9B,IAAI,CAAC,eAAe,CAAC,GAAG,CAAC,CAAA;qBAC1B;gBACH,CAAC,CACF,CAAA;gBAED,IAAI,CAAC,OAAO,CAAC,GAAG,EAAE,CAAA;aACnB;YAAC,OAAO,GAAQ,EAAE;gBACjB,GAAG,CAAC,eAAe,EAAE,GAAG,CAAC,CAAA;gBACzB,IAAI,CAAC,OAAO,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA,CAAC,+BAA+B;aACtD;QACH,CAAC,CAAA;QAED,OAAO,IAAI,CAAA;IACb,CAAC;IAED;;;OAGG;IACH,aAAa;QACX,MAAM,KAAK,GAAG,CAAC,GAAW,EAAE,EAAE;YAC5B,MAAM,EAAE,UAAU,EAAE,SAAS,EAAE,GAAG,IAAI,CAAC,QAAQ,CAAA;YAC/C,wBAAwB;YACxB,KAAK,MAAM,CAAC,IAAI,UAAU,CAAC,MAAM,EAAE,EAAE;gBACnC,CAAC,CAAC,KAAK,CAAC,GAAG,CAAC,CAAA;aACb;YACD,KAAK,MAAM,CAAC,IAAI,SAAS,CAAC,MAAM,EAAE,EAAE;gBAClC,CAAC,CAAC,KAAK,CAAC,GAAG,CAAC,CAAA;aACb;QACH,CAAC,CAAA;QACD,MAAM,MAAM,GAAG,SAAS,CAAU,EAAE,KAAK,EAAE,CAAC,CAAA;QAE5C,OAAO,MAAM,CAAC,MAAM,CAAC,MAAM,CAAC,MAAM,CAAC,EAAE;YACnC,IAAI,EAAE,MAAM,CAAC,IAAI;YACjB,GAAG,EAAE,MAAM,CAAC,GAAG;YACf,MAAM,EAAE,MAAM,CAAC,MAAM;SACtB,CAAC,CAAA;IACJ,CAAC;IAED,eAAe,CAAE,OAAgB;QAC/B,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,GAAG,OAAO,CAAA;QAE5B,IAAI,GAAG,CAAC,OAAO,EAAE;YACf,GAAG,CAAC,kBAAkB,EAAE,YAAY,CAAC,OAAO,CAAC,CAAC,CAAA;SAC/C;QAED,uBAAuB;QACvB,IAAI,OAAO,CAAC,IAAI,KAAK,YAAY,CAAC,UAAU,EAAE;YAC5C,MAAM,MAAM,GAAG,IAAI,CAAC,kBAAkB,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,kBAAkB,CAAC,OAAO,CAAC,IAAI,YAAY,UAAU,CAAC,CAAC,CAAC,OAAO,CAAC,IAAI,CAAC,CAAC,CAAC,OAAO,CAAC,IAAI,CAAC,KAAK,EAAE,CAAC,EAAE,CAAC,CAAA;YAElJ,IAAI,IAAI,CAAC,QAAQ,CAAC,gBAAgB,IAAI,IAAI,EAAE;gBAC1C,IAAI,CAAC,QAAQ,CAAC,gBAAgB,CAAC,MAAM,CAAC,CAAA;aACvC;YAED,OAAM;SACP;QAED,MAAM,IAAI,GAAG,CAAC,IAAI,GAAG,CAAC,CAAC,KAAK,CAAC,CAAC,CAAC,CAAC,IAAI,CAAC,QAAQ,CAAC,UAAU,CAAC,CAAC,CAAC,IAAI,CAAC,QAAQ,CAAC,SAAS,CAAA;QAClF,MAAM,MAAM,GAAG,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,CAAA;QAE3B,IAAI,MAAM,IAAI,IAAI,EAAE;YAClB,OAAO,GAAG,CAAC,mBAAmB,EAAE,EAAE,CAAC,CAAA;SACpC;QAED,QAAQ,IAAI,EAAE;YACZ,KAAK,YAAY,CAAC,iBAAiB,CAAC;YACpC,KAAK,YAAY,CAAC,gBAAgB;gBAChC,MAAM,CAAC,MAAM,CAAC,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,KAAK,EAAE,CAAC,CAAA;gBACxC,MAAK;YACP,KAAK,YAAY,CAAC,eAAe,CAAC;YAClC,KAAK,YAAY,CAAC,cAAc;gBAC9B,MAAM,CAAC,KAAK,EAAE,CAAA;gBACd,MAAK;YACP,KAAK,YAAY,CAAC,eAAe,CAAC;YAClC,KAAK,YAAY,CAAC,cAAc;gBAC9B,MAAM,CAAC,KAAK,EAAE,CAAA;gBACd,MAAK;YACP;gBACE,GAAG,CAAC,yBAAyB,EAAE,IAAI,CAAC,CAAA;SACvC;IACH,CAAC;;AAjNM,gBAAU,GAAG,cAAc,CAAA"} | ||
| {"version":3,"file":"index.js","sourceRoot":"","sources":["../../src/index.ts"],"names":[],"mappings":"AAEA,OAAO,EAAE,gBAAgB,EAAE,MAAM,YAAY,CAAA;AAM7C,MAAM,OAAO,KAAK;IAAlB;QACS,aAAQ,GAAG,cAAc,CAAA;IAKlC,CAAC;IAHC,iBAAiB,CAAE,UAAsB,EAAE,IAAgB;QACzD,OAAO,IAAI,gBAAgB,CAAC,UAAU,EAAE,IAAI,CAAC,CAAA;IAC/C,CAAC;CACF"} |
| import type { Message } from './message-types.js'; | ||
| import type { MplexStream } from './index.js'; | ||
| import type { MplexStream } from './mplex.js'; | ||
| export interface Options { | ||
@@ -4,0 +4,0 @@ id: number; |
@@ -31,3 +31,3 @@ import { abortableSource } from 'abortable-iterator'; | ||
| sourceEnded = true; | ||
| log('%s stream %s source end', type, streamName, err); | ||
| log.trace('%s stream %s source end', type, streamName, err); | ||
| if (err != null && endErr == null) { | ||
@@ -48,3 +48,3 @@ endErr = err; | ||
| sinkEnded = true; | ||
| log('%s stream %s sink end - err: %o', type, streamName, err); | ||
| log.trace('%s stream %s sink end - err: %o', type, streamName, err); | ||
| if (err != null && endErr == null) { | ||
@@ -67,3 +67,3 @@ endErr = err; | ||
| abort: (err) => { | ||
| log('%s stream %s abort', type, streamName, err); | ||
| log.trace('%s stream %s abort', type, streamName, err); | ||
| // End the source with the passed error | ||
@@ -118,6 +118,6 @@ stream.source.end(err); | ||
| if (err.code === ERR_MPLEX_STREAM_RESET) { | ||
| log('%s stream %s reset', type, name); | ||
| log.trace('%s stream %s reset', type, name); | ||
| } | ||
| else { | ||
| log('%s stream %s error', type, name, err); | ||
| log.trace('%s stream %s error', type, name, err); | ||
| try { | ||
@@ -127,3 +127,3 @@ send({ id, type: Types.RESET }); | ||
| catch (err) { | ||
| log('%s stream %s error sending reset', type, name, err); | ||
| log.trace('%s stream %s error sending reset', type, name, err); | ||
| } | ||
@@ -139,3 +139,3 @@ } | ||
| catch (err) { | ||
| log('%s stream %s error sending close', type, name, err); | ||
| log.trace('%s stream %s error sending close', type, name, err); | ||
| } | ||
@@ -142,0 +142,0 @@ onSinkEnd(); |
@@ -1,1 +0,1 @@ | ||
| {"version":3,"file":"stream.js","sourceRoot":"","sources":["../../src/stream.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,eAAe,EAAE,MAAM,oBAAoB,CAAA;AACpD,OAAO,EAAE,QAAQ,EAAE,MAAM,aAAa,CAAA;AACtC,OAAO,OAAO,MAAM,UAAU,CAAA;AAC9B,OAAO,EAAE,YAAY,EAAE,MAAM,oBAAoB,CAAA;AACjD,OAAO,EAAE,SAAS,EAAE,MAAM,YAAY,CAAA;AACtC,OAAO,EAAE,qBAAqB,EAAE,oBAAoB,EAAE,MAAM,oBAAoB,CAAA;AAChF,OAAO,EAAE,UAAU,IAAI,oBAAoB,EAAE,MAAM,yBAAyB,CAAA;AAC5E,OAAO,EAAE,cAAc,EAAE,MAAM,gBAAgB,CAAA;AAC/C,OAAO,EAAE,MAAM,EAAE,MAAM,gBAAgB,CAAA;AAMvC,MAAM,GAAG,GAAG,MAAM,CAAC,qBAAqB,CAAC,CAAA;AAEzC,MAAM,sBAAsB,GAAG,wBAAwB,CAAA;AACvD,MAAM,sBAAsB,GAAG,wBAAwB,CAAA;AAWvD,MAAM,UAAU,YAAY,CAAE,OAAgB;IAC5C,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAE,IAAI,GAAG,WAAW,EAAE,UAAU,GAAG,YAAY,EAAE,GAAG,OAAO,CAAA;IAExF,MAAM,eAAe,GAAG,IAAI,eAAe,EAAE,CAAA;IAC7C,MAAM,eAAe,GAAG,IAAI,eAAe,EAAE,CAAA;IAC7C,MAAM,KAAK,GAAG,IAAI,KAAK,WAAW,CAAC,CAAC,CAAC,qBAAqB,CAAC,CAAC,CAAC,oBAAoB,CAAA;IACjF,MAAM,UAAU,GAAG,IAAI,KAAK,WAAW,CAAC,CAAC,CAAC,CAAC,IAAI,EAAE,EAAE,CAAC,CAAC,CAAC,CAAC,IAAI,EAAE,EAAE,CAAA;IAC/D,MAAM,UAAU,GAAG,GAAG,IAAI,IAAI,IAAI,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC,IAAI,EAAE,CAAA;IAEhD,IAAI,WAAW,GAAG,KAAK,CAAA;IACvB,IAAI,SAAS,GAAG,KAAK,CAAA;IACrB,IAAI,MAAyB,CAAA;IAE7B,MAAM,QAAQ,GAAa;QACzB,IAAI,EAAE,IAAI,CAAC,GAAG,EAAE;KACjB,CAAA;IAED,MAAM,WAAW,GAAG,CAAC,GAAW,EAAE,EAAE;QAClC,IAAI,WAAW,EAAE;YACf,OAAM;SACP;QAED,WAAW,GAAG,IAAI,CAAA;QAClB,GAAG,CAAC,yBAAyB,EAAE,IAAI,EAAE,UAAU,EAAE,GAAG,CAAC,CAAA;QAErD,IAAI,GAAG,IAAI,IAAI,IAAI,MAAM,IAAI,IAAI,EAAE;YACjC,MAAM,GAAG,GAAG,CAAA;SACb;QAED,IAAI,SAAS,EAAE;YACb,MAAM,CAAC,QAAQ,CAAC,KAAK,GAAG,IAAI,CAAC,GAAG,EAAE,CAAA;YAElC,IAAI,KAAK,IAAI,IAAI,EAAE;gBACjB,KAAK,CAAC,MAAM,CAAC,CAAA;aACd;SACF;IACH,CAAC,CAAA;IAED,MAAM,SAAS,GAAG,CAAC,GAAW,EAAE,EAAE;QAChC,IAAI,SAAS,EAAE;YACb,OAAM;SACP;QAED,SAAS,GAAG,IAAI,CAAA;QAChB,GAAG,CAAC,iCAAiC,EAAE,IAAI,EAAE,UAAU,EAAE,GAAG,CAAC,CAAA;QAE7D,IAAI,GAAG,IAAI,IAAI,IAAI,MAAM,IAAI,IAAI,EAAE;YACjC,MAAM,GAAG,GAAG,CAAA;SACb;QAED,IAAI,WAAW,EAAE;YACf,QAAQ,CAAC,KAAK,GAAG,IAAI,CAAC,GAAG,EAAE,CAAA;YAE3B,IAAI,KAAK,IAAI,IAAI,EAAE;gBACjB,KAAK,CAAC,MAAM,CAAC,CAAA;aACd;SACF;IACH,CAAC,CAAA;IAED,MAAM,MAAM,GAAG;QACb,oBAAoB;QACpB,KAAK,EAAE,GAAG,EAAE;YACV,MAAM,CAAC,MAAM,CAAC,GAAG,EAAE,CAAA;QACrB,CAAC;QACD,8CAA8C;QAC9C,KAAK,EAAE,CAAC,GAAW,EAAE,EAAE;YACrB,GAAG,CAAC,oBAAoB,EAAE,IAAI,EAAE,UAAU,EAAE,GAAG,CAAC,CAAA;YAChD,uCAAuC;YACvC,MAAM,CAAC,MAAM,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA;YACtB,eAAe,CAAC,KAAK,EAAE,CAAA;YACvB,SAAS,CAAC,GAAG,CAAC,CAAA;QAChB,CAAC;QACD,2DAA2D;QAC3D,KAAK,EAAE,GAAG,EAAE;YACV,MAAM,GAAG,GAAG,OAAO,CAAC,IAAI,KAAK,CAAC,cAAc,CAAC,EAAE,sBAAsB,CAAC,CAAA;YACtE,eAAe,CAAC,KAAK,EAAE,CAAA;YACvB,MAAM,CAAC,MAAM,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA;YACtB,SAAS,CAAC,GAAG,CAAC,CAAA;QAChB,CAAC;QACD,IAAI,EAAE,KAAK,EAAE,MAA0B,EAAE,EAAE;YACzC,MAAM,GAAG,eAAe,CAAC,MAAM,EAAE,SAAS,CAAC;gBACzC,eAAe,CAAC,MAAM;gBACtB,eAAe,CAAC,MAAM;aACvB,CAAC,CAAC,CAAA;YAEH,IAAI;gBACF,IAAI,IAAI,KAAK,WAAW,EAAE,EAAE,kCAAkC;oBAC5D,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,qBAAqB,CAAC,UAAU,EAAE,IAAI,EAAE,oBAAoB,CAAC,UAAU,CAAC,EAAE,CAAC,CAAA;iBAC7F;gBAED,MAAM,cAAc,GAAG,IAAI,cAAc,EAAE,CAAA;gBAE3C,IAAI,KAAK,EAAE,MAAM,IAAI,IAAI,MAAM,EAAE;oBAC/B,cAAc,CAAC,MAAM,CAAC,IAAI,CAAC,CAAA;oBAE3B,OAAO,cAAc,CAAC,MAAM,KAAK,CAAC,EAAE;wBAClC,IAAI,cAAc,CAAC,MAAM,IAAI,UAAU,EAAE;4BACvC,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,OAAO,EAAE,IAAI,EAAE,cAAc,CAAC,QAAQ,EAAE,EAAE,CAAC,CAAA;4BAClE,cAAc,CAAC,OAAO,CAAC,cAAc,CAAC,MAAM,CAAC,CAAA;4BAC7C,MAAK;yBACN;wBAED,MAAM,MAAM,GAAG,cAAc,CAAC,MAAM,GAAG,UAAU,CAAA;wBACjD,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,OAAO,EAAE,IAAI,EAAE,cAAc,CAAC,QAAQ,CAAC,CAAC,EAAE,MAAM,CAAC,EAAE,CAAC,CAAA;wBAC3E,cAAc,CAAC,OAAO,CAAC,MAAM,CAAC,CAAA;qBAC/B;iBACF;aACF;YAAC,OAAO,GAAQ,EAAE;gBACjB,IAAI,GAAG,CAAC,IAAI,KAAK,SAAS,IAAI,GAAG,CAAC,OAAO,KAAK,2BAA2B,EAAE;oBACzE,IAAI,eAAe,CAAC,MAAM,CAAC,OAAO,EAAE;wBAClC,GAAG,CAAC,OAAO,GAAG,cAAc,CAAA;wBAC5B,GAAG,CAAC,IAAI,GAAG,sBAAsB,CAAA;qBAClC;oBAED,IAAI,eAAe,CAAC,MAAM,CAAC,OAAO,EAAE;wBAClC,GAAG,CAAC,OAAO,GAAG,gBAAgB,CAAA;wBAC9B,GAAG,CAAC,IAAI,GAAG,sBAAsB,CAAA;qBAClC;iBACF;gBAED,sDAAsD;gBACtD,IAAI,GAAG,CAAC,IAAI,KAAK,sBAAsB,EAAE;oBACvC,GAAG,CAAC,oBAAoB,EAAE,IAAI,EAAE,IAAI,CAAC,CAAA;iBACtC;qBAAM;oBACL,GAAG,CAAC,oBAAoB,EAAE,IAAI,EAAE,IAAI,EAAE,GAAG,CAAC,CAAA;oBAC1C,IAAI;wBACF,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,KAAK,EAAE,CAAC,CAAA;qBAChC;oBAAC,OAAO,GAAG,EAAE;wBACZ,GAAG,CAAC,kCAAkC,EAAE,IAAI,EAAE,IAAI,EAAE,GAAG,CAAC,CAAA;qBACzD;iBACF;gBAED,MAAM,CAAC,MAAM,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA;gBACtB,SAAS,CAAC,GAAG,CAAC,CAAA;gBACd,OAAM;aACP;YAED,IAAI;gBACF,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,KAAK,EAAE,CAAC,CAAA;aAChC;YAAC,OAAO,GAAG,EAAE;gBACZ,GAAG,CAAC,kCAAkC,EAAE,IAAI,EAAE,IAAI,EAAE,GAAG,CAAC,CAAA;aACzD;YAED,SAAS,EAAE,CAAA;QACb,CAAC;QACD,MAAM,EAAE,QAAQ,CAAa;YAC3B,KAAK,EAAE,WAAW;SACnB,CAAC;QACF,QAAQ;QACR,EAAE,EAAE,UAAU;KACf,CAAA;IAED,OAAO,MAAM,CAAA;AACf,CAAC"} | ||
| {"version":3,"file":"stream.js","sourceRoot":"","sources":["../../src/stream.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,eAAe,EAAE,MAAM,oBAAoB,CAAA;AACpD,OAAO,EAAE,QAAQ,EAAE,MAAM,aAAa,CAAA;AACtC,OAAO,OAAO,MAAM,UAAU,CAAA;AAC9B,OAAO,EAAE,YAAY,EAAE,MAAM,oBAAoB,CAAA;AACjD,OAAO,EAAE,SAAS,EAAE,MAAM,YAAY,CAAA;AACtC,OAAO,EAAE,qBAAqB,EAAE,oBAAoB,EAAE,MAAM,oBAAoB,CAAA;AAChF,OAAO,EAAE,UAAU,IAAI,oBAAoB,EAAE,MAAM,yBAAyB,CAAA;AAC5E,OAAO,EAAE,cAAc,EAAE,MAAM,gBAAgB,CAAA;AAC/C,OAAO,EAAE,MAAM,EAAE,MAAM,gBAAgB,CAAA;AAMvC,MAAM,GAAG,GAAG,MAAM,CAAC,qBAAqB,CAAC,CAAA;AAEzC,MAAM,sBAAsB,GAAG,wBAAwB,CAAA;AACvD,MAAM,sBAAsB,GAAG,wBAAwB,CAAA;AAWvD,MAAM,UAAU,YAAY,CAAE,OAAgB;IAC5C,MAAM,EAAE,EAAE,EAAE,IAAI,EAAE,IAAI,EAAE,KAAK,EAAE,IAAI,GAAG,WAAW,EAAE,UAAU,GAAG,YAAY,EAAE,GAAG,OAAO,CAAA;IAExF,MAAM,eAAe,GAAG,IAAI,eAAe,EAAE,CAAA;IAC7C,MAAM,eAAe,GAAG,IAAI,eAAe,EAAE,CAAA;IAC7C,MAAM,KAAK,GAAG,IAAI,KAAK,WAAW,CAAC,CAAC,CAAC,qBAAqB,CAAC,CAAC,CAAC,oBAAoB,CAAA;IACjF,MAAM,UAAU,GAAG,IAAI,KAAK,WAAW,CAAC,CAAC,CAAC,CAAC,IAAI,EAAE,EAAE,CAAC,CAAC,CAAC,CAAC,IAAI,EAAE,EAAE,CAAA;IAC/D,MAAM,UAAU,GAAG,GAAG,IAAI,IAAI,IAAI,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC,IAAI,EAAE,CAAA;IAEhD,IAAI,WAAW,GAAG,KAAK,CAAA;IACvB,IAAI,SAAS,GAAG,KAAK,CAAA;IACrB,IAAI,MAAyB,CAAA;IAE7B,MAAM,QAAQ,GAAa;QACzB,IAAI,EAAE,IAAI,CAAC,GAAG,EAAE;KACjB,CAAA;IAED,MAAM,WAAW,GAAG,CAAC,GAAW,EAAE,EAAE;QAClC,IAAI,WAAW,EAAE;YACf,OAAM;SACP;QAED,WAAW,GAAG,IAAI,CAAA;QAClB,GAAG,CAAC,KAAK,CAAC,yBAAyB,EAAE,IAAI,EAAE,UAAU,EAAE,GAAG,CAAC,CAAA;QAE3D,IAAI,GAAG,IAAI,IAAI,IAAI,MAAM,IAAI,IAAI,EAAE;YACjC,MAAM,GAAG,GAAG,CAAA;SACb;QAED,IAAI,SAAS,EAAE;YACb,MAAM,CAAC,QAAQ,CAAC,KAAK,GAAG,IAAI,CAAC,GAAG,EAAE,CAAA;YAElC,IAAI,KAAK,IAAI,IAAI,EAAE;gBACjB,KAAK,CAAC,MAAM,CAAC,CAAA;aACd;SACF;IACH,CAAC,CAAA;IAED,MAAM,SAAS,GAAG,CAAC,GAAW,EAAE,EAAE;QAChC,IAAI,SAAS,EAAE;YACb,OAAM;SACP;QAED,SAAS,GAAG,IAAI,CAAA;QAChB,GAAG,CAAC,KAAK,CAAC,iCAAiC,EAAE,IAAI,EAAE,UAAU,EAAE,GAAG,CAAC,CAAA;QAEnE,IAAI,GAAG,IAAI,IAAI,IAAI,MAAM,IAAI,IAAI,EAAE;YACjC,MAAM,GAAG,GAAG,CAAA;SACb;QAED,IAAI,WAAW,EAAE;YACf,QAAQ,CAAC,KAAK,GAAG,IAAI,CAAC,GAAG,EAAE,CAAA;YAE3B,IAAI,KAAK,IAAI,IAAI,EAAE;gBACjB,KAAK,CAAC,MAAM,CAAC,CAAA;aACd;SACF;IACH,CAAC,CAAA;IAED,MAAM,MAAM,GAAG;QACb,oBAAoB;QACpB,KAAK,EAAE,GAAG,EAAE;YACV,MAAM,CAAC,MAAM,CAAC,GAAG,EAAE,CAAA;QACrB,CAAC;QACD,8CAA8C;QAC9C,KAAK,EAAE,CAAC,GAAW,EAAE,EAAE;YACrB,GAAG,CAAC,KAAK,CAAC,oBAAoB,EAAE,IAAI,EAAE,UAAU,EAAE,GAAG,CAAC,CAAA;YACtD,uCAAuC;YACvC,MAAM,CAAC,MAAM,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA;YACtB,eAAe,CAAC,KAAK,EAAE,CAAA;YACvB,SAAS,CAAC,GAAG,CAAC,CAAA;QAChB,CAAC;QACD,2DAA2D;QAC3D,KAAK,EAAE,GAAG,EAAE;YACV,MAAM,GAAG,GAAG,OAAO,CAAC,IAAI,KAAK,CAAC,cAAc,CAAC,EAAE,sBAAsB,CAAC,CAAA;YACtE,eAAe,CAAC,KAAK,EAAE,CAAA;YACvB,MAAM,CAAC,MAAM,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA;YACtB,SAAS,CAAC,GAAG,CAAC,CAAA;QAChB,CAAC;QACD,IAAI,EAAE,KAAK,EAAE,MAA0B,EAAE,EAAE;YACzC,MAAM,GAAG,eAAe,CAAC,MAAM,EAAE,SAAS,CAAC;gBACzC,eAAe,CAAC,MAAM;gBACtB,eAAe,CAAC,MAAM;aACvB,CAAC,CAAC,CAAA;YAEH,IAAI;gBACF,IAAI,IAAI,KAAK,WAAW,EAAE,EAAE,kCAAkC;oBAC5D,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,qBAAqB,CAAC,UAAU,EAAE,IAAI,EAAE,oBAAoB,CAAC,UAAU,CAAC,EAAE,CAAC,CAAA;iBAC7F;gBAED,MAAM,cAAc,GAAG,IAAI,cAAc,EAAE,CAAA;gBAE3C,IAAI,KAAK,EAAE,MAAM,IAAI,IAAI,MAAM,EAAE;oBAC/B,cAAc,CAAC,MAAM,CAAC,IAAI,CAAC,CAAA;oBAE3B,OAAO,cAAc,CAAC,MAAM,KAAK,CAAC,EAAE;wBAClC,IAAI,cAAc,CAAC,MAAM,IAAI,UAAU,EAAE;4BACvC,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,OAAO,EAAE,IAAI,EAAE,cAAc,CAAC,QAAQ,EAAE,EAAE,CAAC,CAAA;4BAClE,cAAc,CAAC,OAAO,CAAC,cAAc,CAAC,MAAM,CAAC,CAAA;4BAC7C,MAAK;yBACN;wBAED,MAAM,MAAM,GAAG,cAAc,CAAC,MAAM,GAAG,UAAU,CAAA;wBACjD,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,OAAO,EAAE,IAAI,EAAE,cAAc,CAAC,QAAQ,CAAC,CAAC,EAAE,MAAM,CAAC,EAAE,CAAC,CAAA;wBAC3E,cAAc,CAAC,OAAO,CAAC,MAAM,CAAC,CAAA;qBAC/B;iBACF;aACF;YAAC,OAAO,GAAQ,EAAE;gBACjB,IAAI,GAAG,CAAC,IAAI,KAAK,SAAS,IAAI,GAAG,CAAC,OAAO,KAAK,2BAA2B,EAAE;oBACzE,IAAI,eAAe,CAAC,MAAM,CAAC,OAAO,EAAE;wBAClC,GAAG,CAAC,OAAO,GAAG,cAAc,CAAA;wBAC5B,GAAG,CAAC,IAAI,GAAG,sBAAsB,CAAA;qBAClC;oBAED,IAAI,eAAe,CAAC,MAAM,CAAC,OAAO,EAAE;wBAClC,GAAG,CAAC,OAAO,GAAG,gBAAgB,CAAA;wBAC9B,GAAG,CAAC,IAAI,GAAG,sBAAsB,CAAA;qBAClC;iBACF;gBAED,sDAAsD;gBACtD,IAAI,GAAG,CAAC,IAAI,KAAK,sBAAsB,EAAE;oBACvC,GAAG,CAAC,KAAK,CAAC,oBAAoB,EAAE,IAAI,EAAE,IAAI,CAAC,CAAA;iBAC5C;qBAAM;oBACL,GAAG,CAAC,KAAK,CAAC,oBAAoB,EAAE,IAAI,EAAE,IAAI,EAAE,GAAG,CAAC,CAAA;oBAChD,IAAI;wBACF,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,KAAK,EAAE,CAAC,CAAA;qBAChC;oBAAC,OAAO,GAAG,EAAE;wBACZ,GAAG,CAAC,KAAK,CAAC,kCAAkC,EAAE,IAAI,EAAE,IAAI,EAAE,GAAG,CAAC,CAAA;qBAC/D;iBACF;gBAED,MAAM,CAAC,MAAM,CAAC,GAAG,CAAC,GAAG,CAAC,CAAA;gBACtB,SAAS,CAAC,GAAG,CAAC,CAAA;gBACd,OAAM;aACP;YAED,IAAI;gBACF,IAAI,CAAC,EAAE,EAAE,EAAE,IAAI,EAAE,KAAK,CAAC,KAAK,EAAE,CAAC,CAAA;aAChC;YAAC,OAAO,GAAG,EAAE;gBACZ,GAAG,CAAC,KAAK,CAAC,kCAAkC,EAAE,IAAI,EAAE,IAAI,EAAE,GAAG,CAAC,CAAA;aAC/D;YAED,SAAS,EAAE,CAAA;QACb,CAAC;QACD,MAAM,EAAE,QAAQ,CAAa;YAC3B,KAAK,EAAE,WAAW;SACnB,CAAC;QACF,QAAQ;QACR,EAAE,EAAE,UAAU;KACf,CAAA;IAED,OAAO,MAAM,CAAA;AACf,CAAC"} |
+10
-10
| { | ||
| "name": "@libp2p/mplex", | ||
| "version": "1.0.1", | ||
| "version": "1.0.2", | ||
| "description": "JavaScript implementation of https://github.com/libp2p/mplex", | ||
@@ -130,6 +130,6 @@ "license": "Apache-2.0 OR MIT", | ||
| "lint": "aegir lint", | ||
| "dep-check": "aegir dep-check", | ||
| "dep-check": "aegir dep-check dist/src/**/*.js dist/test/**/*.js", | ||
| "build": "tsc", | ||
| "pretest": "npm run build", | ||
| "test": "aegir test -f ./dist/test", | ||
| "test": "aegir test -f \"./dist/test/**/*.js\"", | ||
| "test:chrome": "npm run test -- -t browser --cov", | ||
@@ -144,4 +144,4 @@ "test:chrome-webworker": "npm run test -- -t webworker", | ||
| "dependencies": { | ||
| "@libp2p/logger": "^1.0.3", | ||
| "@libp2p/tracked-map": "^1.0.1", | ||
| "@libp2p/logger": "^1.1.2", | ||
| "@libp2p/tracked-map": "^1.0.4", | ||
| "abortable-iterator": "^4.0.2", | ||
@@ -153,12 +153,12 @@ "any-signal": "^3.0.0", | ||
| "it-stream-types": "^1.0.4", | ||
| "uint8arraylist": "^1.2.0", | ||
| "uint8arraylist": "^1.4.0", | ||
| "varint": "^6.0.0" | ||
| }, | ||
| "devDependencies": { | ||
| "@libp2p/interface-compliance-tests": "^1.0.7", | ||
| "@libp2p/interfaces": "^1.1.1", | ||
| "@libp2p/interface-compliance-tests": "^1.1.16", | ||
| "@libp2p/interfaces": "^1.3.14", | ||
| "@types/varint": "^6.0.0", | ||
| "aegir": "^36.1.3", | ||
| "cborg": "^1.2.1", | ||
| "iso-random-stream": "^2.0.0", | ||
| "cborg": "^1.8.1", | ||
| "iso-random-stream": "^2.0.2", | ||
| "it-all": "^1.0.6", | ||
@@ -165,0 +165,0 @@ "it-drain": "^1.0.5", |
+8
-250
@@ -1,257 +0,15 @@ | ||
| import { pipe } from 'it-pipe' | ||
| import { Pushable, pushableV } from 'it-pushable' | ||
| import { abortableSource } from 'abortable-iterator' | ||
| import { encode } from './encode.js' | ||
| import { decode } from './decode.js' | ||
| import { restrictSize } from './restrict-size.js' | ||
| import { MessageTypes, MessageTypeNames, Message } from './message-types.js' | ||
| import { createStream } from './stream.js' | ||
| import { toString as uint8ArrayToString } from 'uint8arrays' | ||
| import { trackedMap } from '@libp2p/tracked-map' | ||
| import { logger } from '@libp2p/logger' | ||
| import type { Sink } from 'it-stream-types' | ||
| import type { Muxer, MuxerOptions } from '@libp2p/interfaces/stream-muxer' | ||
| import type { Stream } from '@libp2p/interfaces/connection' | ||
| import type { ComponentMetricsTracker } from '@libp2p/interfaces/metrics' | ||
| import each from 'it-foreach' | ||
| import type { Components } from '@libp2p/interfaces/components' | ||
| import type { StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer' | ||
| import { MplexStreamMuxer } from './mplex.js' | ||
| const log = logger('libp2p:mplex') | ||
| function printMessage (msg: Message) { | ||
| const output: any = { | ||
| ...msg, | ||
| type: `${MessageTypeNames[msg.type]} (${msg.type})` | ||
| } | ||
| if (msg.type === MessageTypes.NEW_STREAM) { | ||
| output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice()) | ||
| } | ||
| if (msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { | ||
| output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice(), 'base16') | ||
| } | ||
| return output | ||
| } | ||
| export interface MplexStream extends Stream { | ||
| source: Pushable<Uint8Array> | ||
| } | ||
| export interface MplexOptions extends MuxerOptions { | ||
| export interface MplexInit extends StreamMuxerInit { | ||
| maxMsgSize?: number | ||
| metrics?: ComponentMetricsTracker | ||
| } | ||
| export class Mplex implements Muxer { | ||
| static multicodec = '/mplex/6.7.0' | ||
| export class Mplex implements StreamMuxerFactory { | ||
| public protocol = '/mplex/6.7.0' | ||
| public sink: Sink<Uint8Array> | ||
| public source: AsyncIterable<Uint8Array> | ||
| private _streamId: number | ||
| private readonly _streams: { initiators: Map<number, MplexStream>, receivers: Map<number, MplexStream> } | ||
| private readonly _options: MplexOptions | ||
| private readonly _source: { push: (val: Message) => void, end: (err?: Error) => void } | ||
| constructor (options?: MplexOptions) { | ||
| options = options ?? {} | ||
| this._streamId = 0 | ||
| this._streams = { | ||
| /** | ||
| * Stream to ids map | ||
| */ | ||
| initiators: trackedMap<number, MplexStream>({ metrics: options.metrics, component: 'mplex', metric: 'initiatorStreams' }), | ||
| /** | ||
| * Stream to ids map | ||
| */ | ||
| receivers: trackedMap<number, MplexStream>({ metrics: options.metrics, component: 'mplex', metric: 'receiverStreams' }) | ||
| } | ||
| this._options = options | ||
| /** | ||
| * An iterable sink | ||
| */ | ||
| this.sink = this._createSink() | ||
| /** | ||
| * An iterable source | ||
| */ | ||
| const source = this._createSource() | ||
| this._source = source | ||
| this.source = source | ||
| createStreamMuxer (components: Components, init?: MplexInit) { | ||
| return new MplexStreamMuxer(components, init) | ||
| } | ||
| /** | ||
| * Returns a Map of streams and their ids | ||
| */ | ||
| get streams () { | ||
| // Inbound and Outbound streams may have the same ids, so we need to make those unique | ||
| const streams: Stream[] = [] | ||
| this._streams.initiators.forEach(stream => { | ||
| streams.push(stream) | ||
| }) | ||
| this._streams.receivers.forEach(stream => { | ||
| streams.push(stream) | ||
| }) | ||
| return streams | ||
| } | ||
| /** | ||
| * Initiate a new stream with the given name. If no name is | ||
| * provided, the id of the stream will be used. | ||
| */ | ||
| newStream (name?: string): Stream { | ||
| const id = this._streamId++ | ||
| name = name == null ? id.toString() : name.toString() | ||
| const registry = this._streams.initiators | ||
| return this._newStream({ id, name, type: 'initiator', registry }) | ||
| } | ||
| /** | ||
| * Called whenever an inbound stream is created | ||
| */ | ||
| _newReceiverStream (options: { id: number, name: string }) { | ||
| const { id, name } = options | ||
| const registry = this._streams.receivers | ||
| return this._newStream({ id, name, type: 'receiver', registry }) | ||
| } | ||
| _newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map<number, MplexStream> }) { | ||
| const { id, name, type, registry } = options | ||
| log('new %s stream %s %s', type, id, name) | ||
| if (registry.has(id)) { | ||
| throw new Error(`${type} stream ${id} already exists!`) | ||
| } | ||
| const send = (msg: Message) => { | ||
| if (log.enabled) { | ||
| log('%s stream %s send', type, id, printMessage(msg)) | ||
| } | ||
| if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { | ||
| msg.data = msg.data instanceof Uint8Array ? msg.data : msg.data.slice() | ||
| } | ||
| this._source.push(msg) | ||
| } | ||
| const onEnd = () => { | ||
| log('%s stream %s %s ended', type, id, name) | ||
| registry.delete(id) | ||
| if (this._options.onStreamEnd != null) { | ||
| this._options.onStreamEnd(stream) | ||
| } | ||
| } | ||
| const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._options.maxMsgSize }) | ||
| registry.set(id, stream) | ||
| return stream | ||
| } | ||
| /** | ||
| * Creates a sink with an abortable source. Incoming messages will | ||
| * also have their size restricted. All messages will be varint decoded. | ||
| */ | ||
| _createSink () { | ||
| const sink: Sink<Uint8Array> = async source => { | ||
| if (this._options.signal != null) { | ||
| source = abortableSource(source, this._options.signal) | ||
| } | ||
| try { | ||
| await pipe( | ||
| source, | ||
| source => each(source, (buf) => { | ||
| // console.info('incoming', uint8ArrayToString(buf, 'base64')) | ||
| }), | ||
| decode, | ||
| restrictSize(this._options.maxMsgSize), | ||
| async source => { | ||
| for await (const msg of source) { | ||
| this._handleIncoming(msg) | ||
| } | ||
| } | ||
| ) | ||
| this._source.end() | ||
| } catch (err: any) { | ||
| log('error in sink', err) | ||
| this._source.end(err) // End the source with an error | ||
| } | ||
| } | ||
| return sink | ||
| } | ||
| /** | ||
| * Creates a source that restricts outgoing message sizes | ||
| * and varint encodes them | ||
| */ | ||
| _createSource () { | ||
| const onEnd = (err?: Error) => { | ||
| const { initiators, receivers } = this._streams | ||
| // Abort all the things! | ||
| for (const s of initiators.values()) { | ||
| s.abort(err) | ||
| } | ||
| for (const s of receivers.values()) { | ||
| s.abort(err) | ||
| } | ||
| } | ||
| const source = pushableV<Message>({ onEnd }) | ||
| return Object.assign(encode(source), { | ||
| push: source.push, | ||
| end: source.end, | ||
| return: source.return | ||
| }) | ||
| } | ||
| _handleIncoming (message: Message) { | ||
| const { id, type } = message | ||
| if (log.enabled) { | ||
| log('incoming message', printMessage(message)) | ||
| } | ||
| // Create a new stream? | ||
| if (message.type === MessageTypes.NEW_STREAM) { | ||
| const stream = this._newReceiverStream({ id, name: uint8ArrayToString(message.data instanceof Uint8Array ? message.data : message.data.slice()) }) | ||
| if (this._options.onIncomingStream != null) { | ||
| this._options.onIncomingStream(stream) | ||
| } | ||
| return | ||
| } | ||
| const list = (type & 1) === 1 ? this._streams.initiators : this._streams.receivers | ||
| const stream = list.get(id) | ||
| if (stream == null) { | ||
| return log('missing stream %s', id) | ||
| } | ||
| switch (type) { | ||
| case MessageTypes.MESSAGE_INITIATOR: | ||
| case MessageTypes.MESSAGE_RECEIVER: | ||
| stream.source.push(message.data.slice()) | ||
| break | ||
| case MessageTypes.CLOSE_INITIATOR: | ||
| case MessageTypes.CLOSE_RECEIVER: | ||
| stream.close() | ||
| break | ||
| case MessageTypes.RESET_INITIATOR: | ||
| case MessageTypes.RESET_RECEIVER: | ||
| stream.reset() | ||
| break | ||
| default: | ||
| log('unknown message type %s', type) | ||
| } | ||
| } | ||
| } |
+8
-8
@@ -13,3 +13,3 @@ import { abortableSource } from 'abortable-iterator' | ||
| import type { Source } from 'it-stream-types' | ||
| import type { MplexStream } from './index.js' | ||
| import type { MplexStream } from './mplex.js' | ||
@@ -53,3 +53,3 @@ const log = logger('libp2p:mplex:stream') | ||
| sourceEnded = true | ||
| log('%s stream %s source end', type, streamName, err) | ||
| log.trace('%s stream %s source end', type, streamName, err) | ||
@@ -75,3 +75,3 @@ if (err != null && endErr == null) { | ||
| sinkEnded = true | ||
| log('%s stream %s sink end - err: %o', type, streamName, err) | ||
| log.trace('%s stream %s sink end - err: %o', type, streamName, err) | ||
@@ -98,3 +98,3 @@ if (err != null && endErr == null) { | ||
| abort: (err?: Error) => { | ||
| log('%s stream %s abort', type, streamName, err) | ||
| log.trace('%s stream %s abort', type, streamName, err) | ||
| // End the source with the passed error | ||
@@ -155,9 +155,9 @@ stream.source.end(err) | ||
| if (err.code === ERR_MPLEX_STREAM_RESET) { | ||
| log('%s stream %s reset', type, name) | ||
| log.trace('%s stream %s reset', type, name) | ||
| } else { | ||
| log('%s stream %s error', type, name, err) | ||
| log.trace('%s stream %s error', type, name, err) | ||
| try { | ||
| send({ id, type: Types.RESET }) | ||
| } catch (err) { | ||
| log('%s stream %s error sending reset', type, name, err) | ||
| log.trace('%s stream %s error sending reset', type, name, err) | ||
| } | ||
@@ -174,3 +174,3 @@ } | ||
| } catch (err) { | ||
| log('%s stream %s error sending close', type, name, err) | ||
| log.trace('%s stream %s error sending close', type, name, err) | ||
| } | ||
@@ -177,0 +177,0 @@ |
Long strings
Supply chain riskContains long string literals, which may be a sign of obfuscated or packed code.
Found 1 instance in 1 package
Long strings
Supply chain riskContains long string literals, which may be a sign of obfuscated or packed code.
Found 1 instance in 1 package
82598
2.37%38
15.15%1365
2.02%Updated
Updated
Updated