Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@libp2p/interface

Package Overview
Dependencies
Maintainers
6
Versions
512
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@libp2p/interface - npm Package Compare versions

Comparing version 0.0.1 to 0.1.0-fdd80820

dist/src/pubsub/index.d.ts

205

dist/src/connection/index.d.ts

@@ -1,2 +0,1 @@

import type * as Status from './status.js';
import type { AbortOptions } from '../index.js';

@@ -8,32 +7,20 @@ import type { PeerId } from '../peer-id/index.js';

export interface ConnectionTimeline {
/**
* When the connection was opened
*/
open: number;
/**
* When the MultiaddrConnection was upgraded to a Connection - e.g. the type
* of connection encryption and multiplexing was negotiated.
*/
upgraded?: number;
/**
* When the connection was closed.
*/
close?: number;
}
/**
* Outbound conections are opened by the local node, inbound streams are opened by the remote
* Outbound connections are opened by the local node, inbound streams are opened by the remote
*/
export type Direction = 'inbound' | 'outbound';
export interface ConnectionStat {
/**
* Outbound conections are opened by the local node, inbound streams are opened by the remote
*/
direction: Direction;
/**
* Lifecycle times for the connection
*/
timeline: ConnectionTimeline;
/**
* Once a multiplexer has been negotiated for this stream, it will be set on the stat object
*/
multiplexer?: string;
/**
* Once a connection encrypter has been negotiated for this stream, it will be set on the stat object
*/
encryption?: string;
/**
* The current status of the connection
*/
status: keyof typeof Status;
}
export interface StreamTimeline {

@@ -60,18 +47,30 @@ /**

reset?: number;
}
export interface StreamStat {
/**
* Outbound streams are opened by the local node, inbound streams are opened by the remote
* A timestamp of when the stream was aborted
*/
direction: Direction;
/**
* Lifecycle times for the stream
*/
timeline: StreamTimeline;
/**
* Once a protocol has been negotiated for this stream, it will be set on the stat object
*/
protocol?: string;
abort?: number;
}
/**
* The states a stream can be in
*/
export type StreamStatus = 'open' | 'closing' | 'closed' | 'aborted' | 'reset';
/**
* The states the readable end of a stream can be in
*
* ready - the readable end is ready for reading
* closing - the readable end is closing
* closed - the readable end has closed
*/
export type ReadStatus = 'ready' | 'closing' | 'closed';
/**
* The states the writable end of a stream can be in
*
* ready - the writable end is ready for writing
* writing - the writable end is in the process of being written to
* done - the source passed to the `.sink` function yielded all values without error
* closing - the writable end is closing
* closed - the writable end has closed
*/
export type WriteStatus = 'ready' | 'writing' | 'done' | 'closing' | 'closed';
/**
* A Stream is a data channel between two peers that

@@ -93,3 +92,3 @@ * can be written to and read from at both ends.

*/
close: () => void;
close: (options?: AbortOptions) => Promise<void>;
/**

@@ -102,3 +101,3 @@ * Closes the stream for **reading**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed.

*/
closeRead: () => void;
closeRead: (options?: AbortOptions) => Promise<void>;
/**

@@ -109,3 +108,3 @@ * Closes the stream for **writing**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed.

*/
closeWrite: () => void;
closeWrite: (options?: AbortOptions) => Promise<void>;
/**

@@ -122,10 +121,2 @@ * Closes the stream for **reading** *and* **writing**. This should be called when a *local error* has occurred.

/**
* Closes the stream *immediately* for **reading** *and* **writing**. This should be called when a *remote error* has occurred.
*
* This function is called automatically by the muxer when it receives a `RESET` message from the remote.
*
* The sink will return and the source will throw.
*/
reset: () => void;
/**
* Unique identifier for a stream. Identifiers are not unique across muxers.

@@ -135,9 +126,29 @@ */

/**
* Stats about this stream
* Outbound streams are opened by the local node, inbound streams are opened by the remote
*/
stat: StreamStat;
direction: Direction;
/**
* Lifecycle times for the stream
*/
timeline: StreamTimeline;
/**
* Once a protocol has been negotiated for this stream, it will be set on the stat object
*/
protocol?: string;
/**
* User defined stream metadata
*/
metadata: Record<string, any>;
/**
* The current status of the stream
*/
status: StreamStatus;
/**
* The current status of the readable end of the stream
*/
readStatus: ReadStatus;
/**
* The current status of the writable end of the stream
*/
writeStatus: WriteStatus;
}

@@ -151,3 +162,9 @@ export interface NewStreamOptions extends AbortOptions {

maxOutboundStreams?: number;
/**
* Opt-in to running over a transient connection - one that has time/data limits
* placed on it.
*/
runOnTransientConnection?: boolean;
}
export type ConnectionStatus = 'open' | 'closing' | 'closed';
/**

@@ -160,12 +177,70 @@ * A Connection is a high-level representation of a connection

export interface Connection {
/**
* The unique identifier for this connection
*/
id: string;
stat: ConnectionStat;
/**
* The address of the remote end of the connection
*/
remoteAddr: Multiaddr;
/**
* The id of the peer at the remote end of the connection
*/
remotePeer: PeerId;
/**
* A list of tags applied to this connection
*/
tags: string[];
/**
* A list of open streams on this connection
*/
streams: Stream[];
newStream: (multicodecs: string | string[], options?: NewStreamOptions) => Promise<Stream>;
/**
* Outbound conections are opened by the local node, inbound streams are opened by the remote
*/
direction: Direction;
/**
* Lifecycle times for the connection
*/
timeline: ConnectionTimeline;
/**
* Once a multiplexer has been negotiated for this stream, it will be set on the stat object
*/
multiplexer?: string;
/**
* Once a connection encrypter has been negotiated for this stream, it will be set on the stat object
*/
encryption?: string;
/**
* The current status of the connection
*/
status: ConnectionStatus;
/**
* A transient connection is one that is not expected to be open for very long
* or one that cannot transfer very much data, such as one being used as a
* circuit relay connection. Protocols need to explicitly opt-in to being run
* over transient connections.
*/
transient: boolean;
/**
* Create a new stream on this connection and negotiate one of the passed protocols
*/
newStream: (protocols: string | string[], options?: NewStreamOptions) => Promise<Stream>;
/**
* Add a stream to this connection
*/
addStream: (stream: Stream) => void;
/**
* Remove a stream from this connection
*/
removeStream: (id: string) => void;
close: () => Promise<void>;
/**
* Gracefully close the connection. All queued data will be written to the
* underlying transport.
*/
close: (options?: AbortOptions) => Promise<void>;
/**
* Immediately close the connection, any queued data will be discarded
*/
abort: (err: Error) => void;
}

@@ -183,4 +258,14 @@ export declare const symbol: unique symbol;

export interface MultiaddrConnectionTimeline {
/**
* When the connection was opened
*/
open: number;
/**
* When the MultiaddrConnection was upgraded to a Connection - the type of
* connection encryption and multiplexing was negotiated.
*/
upgraded?: number;
/**
* When the connection was closed.
*/
close?: number;

@@ -194,6 +279,20 @@ }

export interface MultiaddrConnection extends Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>> {
close: (err?: Error) => Promise<void>;
/**
* Gracefully close the connection. All queued data will be written to the
* underlying transport.
*/
close: (options?: AbortOptions) => Promise<void>;
/**
* Immediately close the connection, any queued data will be discarded
*/
abort: (err: Error) => void;
/**
* The address of the remote end of the connection
*/
remoteAddr: Multiaddr;
/**
* When connection lifecycle events occurred
*/
timeline: MultiaddrConnectionTimeline;
}
//# sourceMappingURL=index.d.ts.map

@@ -16,3 +16,3 @@ /**

*/
import type { Connection, Stream } from './connection/index.js';
import type { Connection, NewStreamOptions, Stream } from './connection/index.js';
import type { ContentRouting } from './content-routing/index.js';

@@ -463,6 +463,10 @@ import type { EventEmitter } from './events.js';

*/
dialProtocol: (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options?: AbortOptions) => Promise<Stream>;
dialProtocol: (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options?: NewStreamOptions) => Promise<Stream>;
/**
* Attempts to gracefully close an open connection to the given peer. If the connection is not closed in the grace period, it will be forcefully closed.
* Attempts to gracefully close an open connection to the given peer. If the
* connection is not closed in the grace period, it will be forcefully closed.
*
* An AbortSignal can optionally be passed to control when the connection is
* forcefully closed.
*
* @example

@@ -474,3 +478,3 @@ *

*/
hangUp: (peer: PeerId | Multiaddr) => Promise<void>;
hangUp: (peer: PeerId | Multiaddr, options?: AbortOptions) => Promise<void>;
/**

@@ -477,0 +481,0 @@ * Sets up [multistream-select routing](https://github.com/multiformats/multistream-select) of protocols to their application handlers. Whenever a stream is opened on one of the provided protocols, the handler will be called. `handle` must be called in order to register a handler and support for a given protocol. This also informs other peers of the protocols you support.

@@ -18,2 +18,7 @@ import type { Connection, Stream } from '../connection/index.js';

maxOutboundStreams?: number;
/**
* Opt-in to running over a transient connection - one that has time/data limits
* placed on it.
*/
runOnTransientConnection?: boolean;
}

@@ -20,0 +25,0 @@ export interface StreamHandlerRecord {

@@ -35,5 +35,9 @@ import type { Direction, Stream } from '../connection/index.js';

*/
close: (err?: Error) => void;
close: (options?: AbortOptions) => Promise<void>;
/**
* Close or abort all tracked streams and stop the muxer
*/
abort: (err: Error) => void;
}
export interface StreamMuxerInit extends AbortOptions {
export interface StreamMuxerInit {
/**

@@ -40,0 +44,0 @@ * A callback function invoked every time an incoming stream is opened

import { Uint8ArrayList } from 'uint8arraylist';
import type { Direction, Stream, StreamStat } from '../connection/index.js';
import type { Direction, ReadStatus, Stream, StreamStatus, StreamTimeline, WriteStatus } from '../connection/index.js';
import type { AbortOptions } from '../index.js';
import type { Source } from 'it-stream-types';
interface Logger {
(formatter: any, ...args: any[]): void;
error: (formatter: any, ...args: any[]) => void;
trace: (formatter: any, ...args: any[]) => void;
enabled: boolean;
}
export interface AbstractStreamInit {

@@ -14,6 +21,5 @@ /**

/**
* The maximum allowable data size, any data larger than this will be
* chunked and sent in multiple data messages
* A Logger implementation used to log stream-specific information
*/
maxDataSize: number;
log: Logger;
/**

@@ -27,28 +33,80 @@ * User specific stream metadata

onEnd?: (err?: Error | undefined) => void;
/**
* Invoked when the readable end of the stream is closed
*/
onCloseRead?: () => void;
/**
* Invoked when the writable end of the stream is closed
*/
onCloseWrite?: () => void;
/**
* Invoked when the the stream has been reset by the remote
*/
onReset?: () => void;
/**
* Invoked when the the stream has errored
*/
onAbort?: (err: Error) => void;
/**
* How long to wait in ms for stream data to be written to the underlying
* connection when closing the writable end of the stream. (default: 500)
*/
closeTimeout?: number;
}
export declare abstract class AbstractStream implements Stream {
id: string;
stat: StreamStat;
direction: Direction;
timeline: StreamTimeline;
protocol?: string;
metadata: Record<string, unknown>;
source: AsyncGenerator<Uint8ArrayList, void, unknown>;
private readonly abortController;
private readonly resetController;
private readonly closeController;
private sourceEnded;
private sinkEnded;
private sinkSunk;
status: StreamStatus;
readStatus: ReadStatus;
writeStatus: WriteStatus;
private readonly sinkController;
private readonly sinkEnd;
private endErr;
private readonly streamSource;
private readonly onEnd?;
private readonly maxDataSize;
private readonly onCloseRead?;
private readonly onCloseWrite?;
private readonly onReset?;
private readonly onAbort?;
protected readonly log: Logger;
constructor(init: AbstractStreamInit);
sink(source: Source<Uint8ArrayList | Uint8Array>): Promise<void>;
protected onSourceEnd(err?: Error): void;
protected onSinkEnd(err?: Error): void;
close(): void;
closeRead(): void;
closeWrite(): void;
close(options?: AbortOptions): Promise<void>;
closeRead(options?: AbortOptions): Promise<void>;
closeWrite(options?: AbortOptions): Promise<void>;
/**
* Close immediately for reading and writing and send a reset message (local
* error)
*/
abort(err: Error): void;
/**
* Receive a reset message - close immediately for reading and writing (remote
* error)
*/
reset(): void;
sink(source: Source<Uint8ArrayList | Uint8Array>): Promise<void>;
_closeSinkAndSource(err?: Error): void;
_closeSink(err?: Error): void;
_closeSource(err?: Error): void;
/**
* The remote closed for writing so we should expect to receive no more
* messages
*/
remoteCloseWrite(): void;
/**
* The remote closed for reading so we should not send any more
* messages
*/
remoteCloseRead(): void;
/**
* The underlying muxer has closed, no more messages can be sent or will
* be received, close immediately to free up resources
*/
destroy(): void;
/**
* When an extending class reads data from it's implementation-specific source,

@@ -67,11 +125,11 @@ * call this method to allow the stream consumer to read the data.

*/
abstract sendNewStream(): void | Promise<void>;
abstract sendNewStream(options?: AbortOptions): void | Promise<void>;
/**
* Send a data message to the remote muxer
*/
abstract sendData(buf: Uint8ArrayList): void | Promise<void>;
abstract sendData(buf: Uint8ArrayList, options?: AbortOptions): void | Promise<void>;
/**
* Send a reset message to the remote muxer
*/
abstract sendReset(): void | Promise<void>;
abstract sendReset(options?: AbortOptions): void | Promise<void>;
/**

@@ -81,3 +139,3 @@ * Send a message to the remote muxer, informing them no more data messages

*/
abstract sendCloseWrite(): void | Promise<void>;
abstract sendCloseWrite(options?: AbortOptions): void | Promise<void>;
/**

@@ -87,4 +145,5 @@ * Send a message to the remote muxer, informing them no more data messages

*/
abstract sendCloseRead(): void | Promise<void>;
abstract sendCloseRead(options?: AbortOptions): void | Promise<void>;
}
export {};
//# sourceMappingURL=stream.d.ts.map

@@ -1,15 +0,8 @@

// import { logger } from '@libp2p/logger'
import { abortableSource } from 'abortable-iterator';
import { anySignal } from 'any-signal';
import { pushable } from 'it-pushable';
import defer, {} from 'p-defer';
import { Uint8ArrayList } from 'uint8arraylist';
import { CodeError } from '../errors.js';
// const log = logger('libp2p:stream')
const log = () => { };
log.trace = () => { };
log.error = () => { };
const ERR_STREAM_RESET = 'ERR_STREAM_RESET';
const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT';
const ERR_SINK_ENDED = 'ERR_SINK_ENDED';
const ERR_DOUBLE_SINK = 'ERR_DOUBLE_SINK';
const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE';
function isPromise(res) {

@@ -20,44 +13,49 @@ return res != null && typeof res.then === 'function';

id;
stat;
direction;
timeline;
protocol;
metadata;
source;
abortController;
resetController;
closeController;
sourceEnded;
sinkEnded;
sinkSunk;
status;
readStatus;
writeStatus;
sinkController;
sinkEnd;
endErr;
streamSource;
onEnd;
maxDataSize;
onCloseRead;
onCloseWrite;
onReset;
onAbort;
log;
constructor(init) {
this.abortController = new AbortController();
this.resetController = new AbortController();
this.closeController = new AbortController();
this.sourceEnded = false;
this.sinkEnded = false;
this.sinkSunk = false;
this.sinkController = new AbortController();
this.sinkEnd = defer();
this.log = init.log;
// stream status
this.status = 'open';
this.readStatus = 'ready';
this.writeStatus = 'ready';
this.id = init.id;
this.metadata = init.metadata ?? {};
this.stat = {
direction: init.direction,
timeline: {
open: Date.now()
}
this.direction = init.direction;
this.timeline = {
open: Date.now()
};
this.maxDataSize = init.maxDataSize;
this.onEnd = init.onEnd;
this.onCloseRead = init?.onCloseRead;
this.onCloseWrite = init?.onCloseWrite;
this.onReset = init?.onReset;
this.onAbort = init?.onAbort;
this.source = this.streamSource = pushable({
onEnd: () => {
// already sent a reset message
if (this.stat.timeline.reset !== null) {
const res = this.sendCloseRead();
if (isPromise(res)) {
res.catch(err => {
log.error('error while sending close read', err);
});
}
onEnd: (err) => {
if (err != null) {
this.log.trace('source ended with error', err);
}
this.onSourceEnd();
else {
this.log.trace('source ended');
}
this.readStatus = 'closed';
this.onSourceEnd(err);
}

@@ -68,14 +66,56 @@ });

}
async sink(source) {
if (this.writeStatus !== 'ready') {
throw new CodeError(`writable end state is "${this.writeStatus}" not "ready"`, ERR_SINK_INVALID_STATE);
}
try {
this.writeStatus = 'writing';
const options = {
signal: this.sinkController.signal
};
if (this.direction === 'outbound') { // If initiator, open a new stream
const res = this.sendNewStream(options);
if (isPromise(res)) {
await res;
}
}
source = abortableSource(source, this.sinkController.signal, {
returnOnAbort: true
});
this.log.trace('sink reading from source');
for await (let data of source) {
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data;
const res = this.sendData(data, options);
if (isPromise(res)) { // eslint-disable-line max-depth
await res;
}
}
this.log.trace('sink finished reading from source');
this.writeStatus = 'done';
this.log.trace('sink calling closeWrite');
await this.closeWrite(options);
this.onSinkEnd();
}
catch (err) {
this.log.trace('sink ended with error, calling abort with error', err);
this.abort(err);
throw err;
}
finally {
this.log.trace('resolve sink end');
this.sinkEnd.resolve();
}
}
onSourceEnd(err) {
if (this.sourceEnded) {
if (this.timeline.closeRead != null) {
return;
}
this.stat.timeline.closeRead = Date.now();
this.sourceEnded = true;
log.trace('%s stream %s source end - err: %o', this.stat.direction, this.id, err);
this.timeline.closeRead = Date.now();
if (err != null && this.endErr == null) {
this.endErr = err;
}
if (this.sinkEnded) {
this.stat.timeline.close = Date.now();
this.onCloseRead?.();
if (this.timeline.closeWrite != null) {
this.log.trace('source and sink ended');
this.timeline.close = Date.now();
if (this.onEnd != null) {

@@ -85,15 +125,18 @@ this.onEnd(this.endErr);

}
else {
this.log.trace('source ended, waiting for sink to end');
}
}
onSinkEnd(err) {
if (this.sinkEnded) {
if (this.timeline.closeWrite != null) {
return;
}
this.stat.timeline.closeWrite = Date.now();
this.sinkEnded = true;
log.trace('%s stream %s sink end - err: %o', this.stat.direction, this.id, err);
this.timeline.closeWrite = Date.now();
if (err != null && this.endErr == null) {
this.endErr = err;
}
if (this.sourceEnded) {
this.stat.timeline.close = Date.now();
this.onCloseWrite?.();
if (this.timeline.closeRead != null) {
this.log.trace('sink and source ended');
this.timeline.close = Date.now();
if (this.onEnd != null) {

@@ -103,141 +146,154 @@ this.onEnd(this.endErr);

}
else {
this.log.trace('sink ended, waiting for source to end');
}
}
// Close for both Reading and Writing
close() {
log.trace('%s stream %s close', this.stat.direction, this.id);
this.closeRead();
this.closeWrite();
async close(options) {
this.log.trace('closing gracefully');
this.status = 'closing';
await Promise.all([
this.closeRead(options),
this.closeWrite(options)
]);
this.status = 'closed';
this.log.trace('closed gracefully');
}
// Close for reading
closeRead() {
log.trace('%s stream %s closeRead', this.stat.direction, this.id);
if (this.sourceEnded) {
async closeRead(options = {}) {
if (this.readStatus === 'closing' || this.readStatus === 'closed') {
return;
}
this.streamSource.end();
this.log.trace('closing readable end of stream with starting read status "%s"', this.readStatus);
const readStatus = this.readStatus;
this.readStatus = 'closing';
if (readStatus === 'ready') {
this.log.trace('ending internal source queue');
this.streamSource.end();
}
if (this.status !== 'reset' && this.status !== 'aborted') {
this.log.trace('send close read to remote');
await this.sendCloseRead(options);
}
this.log.trace('closed readable end of stream');
}
// Close for writing
closeWrite() {
log.trace('%s stream %s closeWrite', this.stat.direction, this.id);
if (this.sinkEnded) {
async closeWrite(options = {}) {
if (this.writeStatus === 'closing' || this.writeStatus === 'closed') {
return;
}
this.closeController.abort();
try {
// need to call this here as the sink method returns in the catch block
// when the close controller is aborted
const res = this.sendCloseWrite();
if (isPromise(res)) {
res.catch(err => {
log.error('error while sending close write', err);
this.log.trace('closing writable end of stream with starting write status "%s"', this.writeStatus);
const writeStatus = this.writeStatus;
if (this.writeStatus === 'ready') {
this.log.trace('sink was never sunk, sink an empty array');
await this.sink([]);
}
this.writeStatus = 'closing';
if (writeStatus === 'writing') {
// stop reading from the source passed to `.sink` in the microtask queue
// - this lets any data queued by the user in the current tick get read
// before we exit
await new Promise((resolve, reject) => {
queueMicrotask(() => {
this.log.trace('aborting source passed to .sink');
this.sinkController.abort();
this.sinkEnd.promise.then(resolve, reject);
});
}
});
}
catch (err) {
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err);
if (this.status !== 'reset' && this.status !== 'aborted') {
this.log.trace('send close write to remote');
await this.sendCloseWrite(options);
}
this.onSinkEnd();
this.writeStatus = 'closed';
this.log.trace('closed writable end of stream');
}
// Close for reading and writing (local error)
/**
* Close immediately for reading and writing and send a reset message (local
* error)
*/
abort(err) {
log.trace('%s stream %s abort', this.stat.direction, this.id, err);
// End the source with the passed error
this.streamSource.end(err);
this.abortController.abort();
this.onSinkEnd(err);
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') {
return;
}
this.log('abort with error', err);
// try to send a reset message
this.log('try to send reset to remote');
const res = this.sendReset();
if (isPromise(res)) {
res.catch((err) => {
this.log.error('error sending reset message', err);
});
}
this.status = 'aborted';
this.timeline.abort = Date.now();
this._closeSinkAndSource(err);
this.onAbort?.(err);
}
// Close immediately for reading and writing (remote error)
/**
* Receive a reset message - close immediately for reading and writing (remote
* error)
*/
reset() {
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') {
return;
}
const err = new CodeError('stream reset', ERR_STREAM_RESET);
this.resetController.abort();
this.streamSource.end(err);
this.status = 'reset';
this._closeSinkAndSource(err);
this.onReset?.();
}
_closeSinkAndSource(err) {
this._closeSink(err);
this._closeSource(err);
}
_closeSink(err) {
// if the sink function is running, cause it to end
if (this.writeStatus === 'writing') {
this.log.trace('end sink source');
this.sinkController.abort();
}
this.onSinkEnd(err);
}
async sink(source) {
if (this.sinkSunk) {
throw new CodeError('sink already called on stream', ERR_DOUBLE_SINK);
}
this.sinkSunk = true;
if (this.sinkEnded) {
throw new CodeError('stream closed for writing', ERR_SINK_ENDED);
}
const signal = anySignal([
this.abortController.signal,
this.resetController.signal,
this.closeController.signal
]);
try {
source = abortableSource(source, signal);
if (this.stat.direction === 'outbound') { // If initiator, open a new stream
const res = this.sendNewStream();
if (isPromise(res)) {
await res;
}
}
for await (let data of source) {
while (data.length > 0) {
if (data.length <= this.maxDataSize) {
const res = this.sendData(data instanceof Uint8Array ? new Uint8ArrayList(data) : data);
if (isPromise(res)) { // eslint-disable-line max-depth
await res;
}
break;
}
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data;
const res = this.sendData(data.sublist(0, this.maxDataSize));
if (isPromise(res)) {
await res;
}
data.consume(this.maxDataSize);
}
}
}
catch (err) {
if (err.type === 'aborted' && err.message === 'The operation was aborted') {
if (this.closeController.signal.aborted) {
return;
}
if (this.resetController.signal.aborted) {
err.message = 'stream reset';
err.code = ERR_STREAM_RESET;
}
if (this.abortController.signal.aborted) {
err.message = 'stream aborted';
err.code = ERR_STREAM_ABORT;
}
}
// Send no more data if this stream was remotely reset
if (err.code === ERR_STREAM_RESET) {
log.trace('%s stream %s reset', this.stat.direction, this.id);
}
else {
log.trace('%s stream %s error', this.stat.direction, this.id, err);
try {
const res = this.sendReset();
if (isPromise(res)) {
await res;
}
this.stat.timeline.reset = Date.now();
}
catch (err) {
log.trace('%s stream %s error sending reset', this.stat.direction, this.id, err);
}
}
_closeSource(err) {
// if the source is not ending, end it
if (this.readStatus !== 'closing' && this.readStatus !== 'closed') {
this.log.trace('ending source with %d bytes to be read by consumer', this.streamSource.readableLength);
this.readStatus = 'closing';
this.streamSource.end(err);
this.onSinkEnd(err);
throw err;
}
finally {
signal.clear();
}
/**
* The remote closed for writing so we should expect to receive no more
* messages
*/
remoteCloseWrite() {
if (this.readStatus === 'closing' || this.readStatus === 'closed') {
this.log('received remote close write but local source is already closed');
return;
}
try {
const res = this.sendCloseWrite();
if (isPromise(res)) {
await res;
}
this.log.trace('remote close write');
this._closeSource();
}
/**
* The remote closed for reading so we should not send any more
* messages
*/
remoteCloseRead() {
if (this.writeStatus === 'closing' || this.writeStatus === 'closed') {
this.log('received remote close read but local sink is already closed');
return;
}
catch (err) {
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err);
this.log.trace('remote close read');
this._closeSink();
}
/**
* The underlying muxer has closed, no more messages can be sent or will
* be received, close immediately to free up resources
*/
destroy() {
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') {
this.log('received destroy but we are already closed');
return;
}
this.onSinkEnd();
this.log.trace('muxer destroyed');
this._closeSinkAndSource();
}

@@ -244,0 +300,0 @@ /**

@@ -85,2 +85,7 @@ import type { Connection, MultiaddrConnection } from '../connection/index.js';

muxerFactory?: StreamMuxerFactory;
/**
* The passed MultiaddrConnection has limits place on duration and/or data
* transfer amounts so is not expected to be open for very long.
*/
transient?: boolean;
}

@@ -87,0 +92,0 @@ export interface Upgrader {

{
"name": "@libp2p/interface",
"version": "0.0.1",
"version": "0.1.0-fdd80820",
"description": "The interface implemented by a libp2p node",

@@ -59,6 +59,2 @@ "license": "Apache-2.0 OR MIT",

},
"./connection/status": {
"types": "./dist/src/connection/status.d.ts",
"import": "./dist/src/connection/status.js"
},
"./content-routing": {

@@ -116,2 +112,6 @@ "types": "./dist/src/content-routing/index.d.ts",

},
"./pubsub": {
"types": "./dist/src/pubsub/index.d.ts",
"import": "./dist/src/pubsub/index.js"
},
"./record": {

@@ -164,6 +164,6 @@ "types": "./dist/src/record/index.d.ts",

"abortable-iterator": "^5.0.1",
"any-signal": "^4.1.1",
"it-pushable": "^3.1.3",
"it-pushable": "^3.2.0",
"it-stream-types": "^2.0.1",
"multiformats": "^12.0.1",
"p-defer": "^4.0.0",
"uint8arraylist": "^2.4.3"

@@ -173,9 +173,6 @@ },

"@types/sinon": "^10.0.15",
"aegir": "^39.0.10",
"aegir": "^40.0.1",
"sinon": "^15.1.2",
"sinon-ts": "^1.0.0"
},
"typedoc": {
"entryPoint": "./src/index.ts"
}
}

@@ -1,2 +0,1 @@

import type * as Status from './status.js'
import type { AbortOptions } from '../index.js'

@@ -9,39 +8,24 @@ import type { PeerId } from '../peer-id/index.js'

export interface ConnectionTimeline {
open: number
upgraded?: number
close?: number
}
/**
* Outbound conections are opened by the local node, inbound streams are opened by the remote
*/
export type Direction = 'inbound' | 'outbound'
export interface ConnectionStat {
/**
* Outbound conections are opened by the local node, inbound streams are opened by the remote
* When the connection was opened
*/
direction: Direction
open: number
/**
* Lifecycle times for the connection
* When the MultiaddrConnection was upgraded to a Connection - e.g. the type
* of connection encryption and multiplexing was negotiated.
*/
timeline: ConnectionTimeline
upgraded?: number
/**
* Once a multiplexer has been negotiated for this stream, it will be set on the stat object
* When the connection was closed.
*/
multiplexer?: string
close?: number
}
/**
* Once a connection encrypter has been negotiated for this stream, it will be set on the stat object
*/
encryption?: string
/**
* Outbound connections are opened by the local node, inbound streams are opened by the remote
*/
export type Direction = 'inbound' | 'outbound'
/**
* The current status of the connection
*/
status: keyof typeof Status
}
export interface StreamTimeline {

@@ -72,22 +56,35 @@ /**

reset?: number
}
export interface StreamStat {
/**
* Outbound streams are opened by the local node, inbound streams are opened by the remote
* A timestamp of when the stream was aborted
*/
direction: Direction
abort?: number
}
/**
* Lifecycle times for the stream
*/
timeline: StreamTimeline
/**
* The states a stream can be in
*/
export type StreamStatus = 'open' | 'closing' | 'closed' | 'aborted' | 'reset'
/**
* Once a protocol has been negotiated for this stream, it will be set on the stat object
*/
protocol?: string
}
/**
* The states the readable end of a stream can be in
*
* ready - the readable end is ready for reading
* closing - the readable end is closing
* closed - the readable end has closed
*/
export type ReadStatus = 'ready' | 'closing' | 'closed'
/**
* The states the writable end of a stream can be in
*
* ready - the writable end is ready for writing
* writing - the writable end is in the process of being written to
* done - the source passed to the `.sink` function yielded all values without error
* closing - the writable end is closing
* closed - the writable end has closed
*/
export type WriteStatus = 'ready' | 'writing' | 'done' | 'closing' | 'closed'
/**
* A Stream is a data channel between two peers that

@@ -109,3 +106,3 @@ * can be written to and read from at both ends.

*/
close: () => void
close: (options?: AbortOptions) => Promise<void>

@@ -119,3 +116,3 @@ /**

*/
closeRead: () => void
closeRead: (options?: AbortOptions) => Promise<void>

@@ -127,3 +124,3 @@ /**

*/
closeWrite: () => void
closeWrite: (options?: AbortOptions) => Promise<void>

@@ -142,24 +139,40 @@ /**

/**
* Closes the stream *immediately* for **reading** *and* **writing**. This should be called when a *remote error* has occurred.
*
* This function is called automatically by the muxer when it receives a `RESET` message from the remote.
*
* The sink will return and the source will throw.
* Unique identifier for a stream. Identifiers are not unique across muxers.
*/
reset: () => void
id: string
/**
* Unique identifier for a stream. Identifiers are not unique across muxers.
* Outbound streams are opened by the local node, inbound streams are opened by the remote
*/
id: string
direction: Direction
/**
* Stats about this stream
* Lifecycle times for the stream
*/
stat: StreamStat
timeline: StreamTimeline
/**
* Once a protocol has been negotiated for this stream, it will be set on the stat object
*/
protocol?: string
/**
* User defined stream metadata
*/
metadata: Record<string, any>
/**
* The current status of the stream
*/
status: StreamStatus
/**
* The current status of the readable end of the stream
*/
readStatus: ReadStatus
/**
* The current status of the writable end of the stream
*/
writeStatus: WriteStatus
}

@@ -174,4 +187,12 @@

maxOutboundStreams?: number
/**
* Opt-in to running over a transient connection - one that has time/data limits
* placed on it.
*/
runOnTransientConnection?: boolean
}
export type ConnectionStatus = 'open' | 'closing' | 'closed'
/**

@@ -184,13 +205,85 @@ * A Connection is a high-level representation of a connection

export interface Connection {
/**
* The unique identifier for this connection
*/
id: string
stat: ConnectionStat
/**
* The address of the remote end of the connection
*/
remoteAddr: Multiaddr
/**
* The id of the peer at the remote end of the connection
*/
remotePeer: PeerId
/**
* A list of tags applied to this connection
*/
tags: string[]
/**
* A list of open streams on this connection
*/
streams: Stream[]
newStream: (multicodecs: string | string[], options?: NewStreamOptions) => Promise<Stream>
/**
* Outbound conections are opened by the local node, inbound streams are opened by the remote
*/
direction: Direction
/**
* Lifecycle times for the connection
*/
timeline: ConnectionTimeline
/**
* Once a multiplexer has been negotiated for this stream, it will be set on the stat object
*/
multiplexer?: string
/**
* Once a connection encrypter has been negotiated for this stream, it will be set on the stat object
*/
encryption?: string
/**
* The current status of the connection
*/
status: ConnectionStatus
/**
* A transient connection is one that is not expected to be open for very long
* or one that cannot transfer very much data, such as one being used as a
* circuit relay connection. Protocols need to explicitly opt-in to being run
* over transient connections.
*/
transient: boolean
/**
* Create a new stream on this connection and negotiate one of the passed protocols
*/
newStream: (protocols: string | string[], options?: NewStreamOptions) => Promise<Stream>
/**
* Add a stream to this connection
*/
addStream: (stream: Stream) => void
/**
* Remove a stream from this connection
*/
removeStream: (id: string) => void
close: () => Promise<void>
/**
* Gracefully close the connection. All queued data will be written to the
* underlying transport.
*/
close: (options?: AbortOptions) => Promise<void>
/**
* Immediately close the connection, any queued data will be discarded
*/
abort: (err: Error) => void
}

@@ -205,3 +298,2 @@

export interface ConnectionProtector {
/**

@@ -216,4 +308,16 @@ * Takes a given Connection and creates a private encryption stream

export interface MultiaddrConnectionTimeline {
/**
* When the connection was opened
*/
open: number
/**
* When the MultiaddrConnection was upgraded to a Connection - the type of
* connection encryption and multiplexing was negotiated.
*/
upgraded?: number
/**
* When the connection was closed.
*/
close?: number

@@ -228,5 +332,22 @@ }

export interface MultiaddrConnection extends Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>> {
close: (err?: Error) => Promise<void>
/**
* Gracefully close the connection. All queued data will be written to the
* underlying transport.
*/
close: (options?: AbortOptions) => Promise<void>
/**
* Immediately close the connection, any queued data will be discarded
*/
abort: (err: Error) => void
/**
* The address of the remote end of the connection
*/
remoteAddr: Multiaddr
/**
* When connection lifecycle events occurred
*/
timeline: MultiaddrConnectionTimeline
}

@@ -1,2 +0,1 @@

/**

@@ -3,0 +2,0 @@ * When this error is thrown it means an operation was aborted,

@@ -17,3 +17,3 @@ /**

import type { Connection, Stream } from './connection/index.js'
import type { Connection, NewStreamOptions, Stream } from './connection/index.js'
import type { ContentRouting } from './content-routing/index.js'

@@ -507,7 +507,11 @@ import type { EventEmitter } from './events.js'

*/
dialProtocol: (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options?: AbortOptions) => Promise<Stream>
dialProtocol: (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options?: NewStreamOptions) => Promise<Stream>
/**
* Attempts to gracefully close an open connection to the given peer. If the connection is not closed in the grace period, it will be forcefully closed.
* Attempts to gracefully close an open connection to the given peer. If the
* connection is not closed in the grace period, it will be forcefully closed.
*
* An AbortSignal can optionally be passed to control when the connection is
* forcefully closed.
*
* @example

@@ -519,3 +523,3 @@ *

*/
hangUp: (peer: PeerId | Multiaddr) => Promise<void>
hangUp: (peer: PeerId | Multiaddr, options?: AbortOptions) => Promise<void>

@@ -522,0 +526,0 @@ /**

@@ -1,2 +0,1 @@

export interface PublicKey {

@@ -3,0 +2,0 @@ readonly bytes: Uint8Array

@@ -1,2 +0,1 @@

export const KEEP_ALIVE = 'keep-alive'

@@ -1,2 +0,1 @@

/**

@@ -3,0 +2,0 @@ * Implemented by components that have a lifecycle

@@ -22,2 +22,8 @@ import type { Connection, Stream } from '../connection/index.js'

maxOutboundStreams?: number
/**
* Opt-in to running over a transient connection - one that has time/data limits
* placed on it.
*/
runOnTransientConnection?: boolean
}

@@ -24,0 +30,0 @@

@@ -40,6 +40,11 @@ import type { Direction, Stream } from '../connection/index.js'

*/
close: (err?: Error) => void
close: (options?: AbortOptions) => Promise<void>
/**
* Close or abort all tracked streams and stop the muxer
*/
abort: (err: Error) => void
}
export interface StreamMuxerInit extends AbortOptions {
export interface StreamMuxerInit {
/**

@@ -46,0 +51,0 @@ * A callback function invoked every time an incoming stream is opened

@@ -1,20 +0,19 @@

// import { logger } from '@libp2p/logger'
import { abortableSource } from 'abortable-iterator'
import { anySignal } from 'any-signal'
import { type Pushable, pushable } from 'it-pushable'
import defer, { type DeferredPromise } from 'p-defer'
import { Uint8ArrayList } from 'uint8arraylist'
import { CodeError } from '../errors.js'
import type { Direction, Stream, StreamStat } from '../connection/index.js'
import type { Direction, ReadStatus, Stream, StreamStatus, StreamTimeline, WriteStatus } from '../connection/index.js'
import type { AbortOptions } from '../index.js'
import type { Source } from 'it-stream-types'
// const log = logger('libp2p:stream')
interface Logger {
(formatter: any, ...args: any[]): void
error: (formatter: any, ...args: any[]) => void
trace: (formatter: any, ...args: any[]) => void
enabled: boolean
}
const log: any = () => {}
log.trace = () => {}
log.error = () => {}
const ERR_STREAM_RESET = 'ERR_STREAM_RESET'
const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT'
const ERR_SINK_ENDED = 'ERR_SINK_ENDED'
const ERR_DOUBLE_SINK = 'ERR_DOUBLE_SINK'
const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE'

@@ -33,6 +32,5 @@ export interface AbstractStreamInit {

/**
* The maximum allowable data size, any data larger than this will be
* chunked and sent in multiple data messages
* A Logger implementation used to log stream-specific information
*/
maxDataSize: number
log: Logger

@@ -48,2 +46,28 @@ /**

onEnd?: (err?: Error | undefined) => void
/**
* Invoked when the readable end of the stream is closed
*/
onCloseRead?: () => void
/**
* Invoked when the writable end of the stream is closed
*/
onCloseWrite?: () => void
/**
* Invoked when the the stream has been reset by the remote
*/
onReset?: () => void
/**
* Invoked when the the stream has errored
*/
onAbort?: (err: Error) => void
/**
* How long to wait in ms for stream data to be written to the underlying
* connection when closing the writable end of the stream. (default: 500)
*/
closeTimeout?: number
}

@@ -57,50 +81,56 @@

public id: string
public stat: StreamStat
public direction: Direction
public timeline: StreamTimeline
public protocol?: string
public metadata: Record<string, unknown>
public source: AsyncGenerator<Uint8ArrayList, void, unknown>
public status: StreamStatus
public readStatus: ReadStatus
public writeStatus: WriteStatus
private readonly abortController: AbortController
private readonly resetController: AbortController
private readonly closeController: AbortController
private sourceEnded: boolean
private sinkEnded: boolean
private sinkSunk: boolean
private readonly sinkController: AbortController
private readonly sinkEnd: DeferredPromise<void>
private endErr: Error | undefined
private readonly streamSource: Pushable<Uint8ArrayList>
private readonly onEnd?: (err?: Error | undefined) => void
private readonly maxDataSize: number
private readonly onCloseRead?: () => void
private readonly onCloseWrite?: () => void
private readonly onReset?: () => void
private readonly onAbort?: (err: Error) => void
protected readonly log: Logger
constructor (init: AbstractStreamInit) {
this.abortController = new AbortController()
this.resetController = new AbortController()
this.closeController = new AbortController()
this.sourceEnded = false
this.sinkEnded = false
this.sinkSunk = false
this.sinkController = new AbortController()
this.sinkEnd = defer()
this.log = init.log
// stream status
this.status = 'open'
this.readStatus = 'ready'
this.writeStatus = 'ready'
this.id = init.id
this.metadata = init.metadata ?? {}
this.stat = {
direction: init.direction,
timeline: {
open: Date.now()
}
this.direction = init.direction
this.timeline = {
open: Date.now()
}
this.maxDataSize = init.maxDataSize
this.onEnd = init.onEnd
this.onCloseRead = init?.onCloseRead
this.onCloseWrite = init?.onCloseWrite
this.onReset = init?.onReset
this.onAbort = init?.onAbort
this.source = this.streamSource = pushable<Uint8ArrayList>({
onEnd: () => {
// already sent a reset message
if (this.stat.timeline.reset !== null) {
const res = this.sendCloseRead()
if (isPromise(res)) {
res.catch(err => {
log.error('error while sending close read', err)
})
}
onEnd: (err) => {
if (err != null) {
this.log.trace('source ended with error', err)
} else {
this.log.trace('source ended')
}
this.onSourceEnd()
this.readStatus = 'closed'
this.onSourceEnd(err)
}

@@ -113,10 +143,61 @@ })

async sink (source: Source<Uint8ArrayList | Uint8Array>): Promise<void> {
if (this.writeStatus !== 'ready') {
throw new CodeError(`writable end state is "${this.writeStatus}" not "ready"`, ERR_SINK_INVALID_STATE)
}
try {
this.writeStatus = 'writing'
const options: AbortOptions = {
signal: this.sinkController.signal
}
if (this.direction === 'outbound') { // If initiator, open a new stream
const res = this.sendNewStream(options)
if (isPromise(res)) {
await res
}
}
source = abortableSource(source, this.sinkController.signal, {
returnOnAbort: true
})
this.log.trace('sink reading from source')
for await (let data of source) {
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data
const res = this.sendData(data, options)
if (isPromise(res)) { // eslint-disable-line max-depth
await res
}
}
this.log.trace('sink finished reading from source')
this.writeStatus = 'done'
this.log.trace('sink calling closeWrite')
await this.closeWrite(options)
this.onSinkEnd()
} catch (err: any) {
this.log.trace('sink ended with error, calling abort with error', err)
this.abort(err)
throw err
} finally {
this.log.trace('resolve sink end')
this.sinkEnd.resolve()
}
}
protected onSourceEnd (err?: Error): void {
if (this.sourceEnded) {
if (this.timeline.closeRead != null) {
return
}
this.stat.timeline.closeRead = Date.now()
this.sourceEnded = true
log.trace('%s stream %s source end - err: %o', this.stat.direction, this.id, err)
this.timeline.closeRead = Date.now()

@@ -127,8 +208,13 @@ if (err != null && this.endErr == null) {

if (this.sinkEnded) {
this.stat.timeline.close = Date.now()
this.onCloseRead?.()
if (this.timeline.closeWrite != null) {
this.log.trace('source and sink ended')
this.timeline.close = Date.now()
if (this.onEnd != null) {
this.onEnd(this.endErr)
}
} else {
this.log.trace('source ended, waiting for sink to end')
}

@@ -138,9 +224,7 @@ }

protected onSinkEnd (err?: Error): void {
if (this.sinkEnded) {
if (this.timeline.closeWrite != null) {
return
}
this.stat.timeline.closeWrite = Date.now()
this.sinkEnded = true
log.trace('%s stream %s sink end - err: %o', this.stat.direction, this.id, err)
this.timeline.closeWrite = Date.now()

@@ -151,8 +235,13 @@ if (err != null && this.endErr == null) {

if (this.sourceEnded) {
this.stat.timeline.close = Date.now()
this.onCloseWrite?.()
if (this.timeline.closeRead != null) {
this.log.trace('sink and source ended')
this.timeline.close = Date.now()
if (this.onEnd != null) {
this.onEnd(this.endErr)
}
} else {
this.log.trace('sink ended, waiting for source to end')
}

@@ -162,170 +251,190 @@ }

// Close for both Reading and Writing
close (): void {
log.trace('%s stream %s close', this.stat.direction, this.id)
async close (options?: AbortOptions): Promise<void> {
this.log.trace('closing gracefully')
this.closeRead()
this.closeWrite()
}
this.status = 'closing'
// Close for reading
closeRead (): void {
log.trace('%s stream %s closeRead', this.stat.direction, this.id)
await Promise.all([
this.closeRead(options),
this.closeWrite(options)
])
if (this.sourceEnded) {
return
}
this.status = 'closed'
this.streamSource.end()
this.log.trace('closed gracefully')
}
// Close for writing
closeWrite (): void {
log.trace('%s stream %s closeWrite', this.stat.direction, this.id)
if (this.sinkEnded) {
async closeRead (options: AbortOptions = {}): Promise<void> {
if (this.readStatus === 'closing' || this.readStatus === 'closed') {
return
}
this.closeController.abort()
this.log.trace('closing readable end of stream with starting read status "%s"', this.readStatus)
try {
// need to call this here as the sink method returns in the catch block
// when the close controller is aborted
const res = this.sendCloseWrite()
const readStatus = this.readStatus
this.readStatus = 'closing'
if (isPromise(res)) {
res.catch(err => {
log.error('error while sending close write', err)
})
}
} catch (err) {
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err)
if (readStatus === 'ready') {
this.log.trace('ending internal source queue')
this.streamSource.end()
}
this.onSinkEnd()
}
if (this.status !== 'reset' && this.status !== 'aborted') {
this.log.trace('send close read to remote')
await this.sendCloseRead(options)
}
// Close for reading and writing (local error)
abort (err: Error): void {
log.trace('%s stream %s abort', this.stat.direction, this.id, err)
// End the source with the passed error
this.streamSource.end(err)
this.abortController.abort()
this.onSinkEnd(err)
this.log.trace('closed readable end of stream')
}
// Close immediately for reading and writing (remote error)
reset (): void {
const err = new CodeError('stream reset', ERR_STREAM_RESET)
this.resetController.abort()
this.streamSource.end(err)
this.onSinkEnd(err)
}
async closeWrite (options: AbortOptions = {}): Promise<void> {
if (this.writeStatus === 'closing' || this.writeStatus === 'closed') {
return
}
async sink (source: Source<Uint8ArrayList | Uint8Array>): Promise<void> {
if (this.sinkSunk) {
throw new CodeError('sink already called on stream', ERR_DOUBLE_SINK)
this.log.trace('closing writable end of stream with starting write status "%s"', this.writeStatus)
const writeStatus = this.writeStatus
if (this.writeStatus === 'ready') {
this.log.trace('sink was never sunk, sink an empty array')
await this.sink([])
}
this.sinkSunk = true
this.writeStatus = 'closing'
if (this.sinkEnded) {
throw new CodeError('stream closed for writing', ERR_SINK_ENDED)
if (writeStatus === 'writing') {
// stop reading from the source passed to `.sink` in the microtask queue
// - this lets any data queued by the user in the current tick get read
// before we exit
await new Promise((resolve, reject) => {
queueMicrotask(() => {
this.log.trace('aborting source passed to .sink')
this.sinkController.abort()
this.sinkEnd.promise.then(resolve, reject)
})
})
}
const signal = anySignal([
this.abortController.signal,
this.resetController.signal,
this.closeController.signal
])
if (this.status !== 'reset' && this.status !== 'aborted') {
this.log.trace('send close write to remote')
await this.sendCloseWrite(options)
}
try {
source = abortableSource(source, signal)
this.writeStatus = 'closed'
if (this.stat.direction === 'outbound') { // If initiator, open a new stream
const res = this.sendNewStream()
this.log.trace('closed writable end of stream')
}
if (isPromise(res)) {
await res
}
}
/**
* Close immediately for reading and writing and send a reset message (local
* error)
*/
abort (err: Error): void {
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') {
return
}
for await (let data of source) {
while (data.length > 0) {
if (data.length <= this.maxDataSize) {
const res = this.sendData(data instanceof Uint8Array ? new Uint8ArrayList(data) : data)
this.log('abort with error', err)
if (isPromise(res)) { // eslint-disable-line max-depth
await res
}
// try to send a reset message
this.log('try to send reset to remote')
const res = this.sendReset()
break
}
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data
const res = this.sendData(data.sublist(0, this.maxDataSize))
if (isPromise(res)) {
res.catch((err) => {
this.log.error('error sending reset message', err)
})
}
if (isPromise(res)) {
await res
}
this.status = 'aborted'
this.timeline.abort = Date.now()
this._closeSinkAndSource(err)
this.onAbort?.(err)
}
data.consume(this.maxDataSize)
}
}
} catch (err: any) {
if (err.type === 'aborted' && err.message === 'The operation was aborted') {
if (this.closeController.signal.aborted) {
return
}
/**
* Receive a reset message - close immediately for reading and writing (remote
* error)
*/
reset (): void {
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') {
return
}
if (this.resetController.signal.aborted) {
err.message = 'stream reset'
err.code = ERR_STREAM_RESET
}
const err = new CodeError('stream reset', ERR_STREAM_RESET)
if (this.abortController.signal.aborted) {
err.message = 'stream aborted'
err.code = ERR_STREAM_ABORT
}
}
this.status = 'reset'
this._closeSinkAndSource(err)
this.onReset?.()
}
// Send no more data if this stream was remotely reset
if (err.code === ERR_STREAM_RESET) {
log.trace('%s stream %s reset', this.stat.direction, this.id)
} else {
log.trace('%s stream %s error', this.stat.direction, this.id, err)
try {
const res = this.sendReset()
_closeSinkAndSource (err?: Error): void {
this._closeSink(err)
this._closeSource(err)
}
if (isPromise(res)) {
await res
}
_closeSink (err?: Error): void {
// if the sink function is running, cause it to end
if (this.writeStatus === 'writing') {
this.log.trace('end sink source')
this.sinkController.abort()
}
this.stat.timeline.reset = Date.now()
} catch (err) {
log.trace('%s stream %s error sending reset', this.stat.direction, this.id, err)
}
}
this.onSinkEnd(err)
}
_closeSource (err?: Error): void {
// if the source is not ending, end it
if (this.readStatus !== 'closing' && this.readStatus !== 'closed') {
this.log.trace('ending source with %d bytes to be read by consumer', this.streamSource.readableLength)
this.readStatus = 'closing'
this.streamSource.end(err)
this.onSinkEnd(err)
}
}
throw err
} finally {
signal.clear()
/**
* The remote closed for writing so we should expect to receive no more
* messages
*/
remoteCloseWrite (): void {
if (this.readStatus === 'closing' || this.readStatus === 'closed') {
this.log('received remote close write but local source is already closed')
return
}
try {
const res = this.sendCloseWrite()
this.log.trace('remote close write')
this._closeSource()
}
if (isPromise(res)) {
await res
}
} catch (err) {
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err)
/**
* The remote closed for reading so we should not send any more
* messages
*/
remoteCloseRead (): void {
if (this.writeStatus === 'closing' || this.writeStatus === 'closed') {
this.log('received remote close read but local sink is already closed')
return
}
this.onSinkEnd()
this.log.trace('remote close read')
this._closeSink()
}
/**
* The underlying muxer has closed, no more messages can be sent or will
* be received, close immediately to free up resources
*/
destroy (): void {
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') {
this.log('received destroy but we are already closed')
return
}
this.log.trace('muxer destroyed')
this._closeSinkAndSource()
}
/**
* When an extending class reads data from it's implementation-specific source,

@@ -350,3 +459,3 @@ * call this method to allow the stream consumer to read the data.

*/
abstract sendNewStream (): void | Promise<void>
abstract sendNewStream (options?: AbortOptions): void | Promise<void>

@@ -356,3 +465,3 @@ /**

*/
abstract sendData (buf: Uint8ArrayList): void | Promise<void>
abstract sendData (buf: Uint8ArrayList, options?: AbortOptions): void | Promise<void>

@@ -362,3 +471,3 @@ /**

*/
abstract sendReset (): void | Promise<void>
abstract sendReset (options?: AbortOptions): void | Promise<void>

@@ -369,3 +478,3 @@ /**

*/
abstract sendCloseWrite (): void | Promise<void>
abstract sendCloseWrite (options?: AbortOptions): void | Promise<void>

@@ -376,3 +485,3 @@ /**

*/
abstract sendCloseRead (): void | Promise<void>
abstract sendCloseRead (options?: AbortOptions): void | Promise<void>
}

@@ -99,2 +99,8 @@ import type { Connection, MultiaddrConnection } from '../connection/index.js'

muxerFactory?: StreamMuxerFactory
/**
* The passed MultiaddrConnection has limits place on duration and/or data
* transfer amounts so is not expected to be open for very long.
*/
transient?: boolean
}

@@ -101,0 +107,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc