@libp2p/interface
Advanced tools
Comparing version 0.0.1-8b0e6bef to 0.0.1-b1024c6c
@@ -1,2 +0,1 @@ | ||
import type * as Status from './status.js'; | ||
import type { AbortOptions } from '../index.js'; | ||
@@ -8,32 +7,20 @@ import type { PeerId } from '../peer-id/index.js'; | ||
export interface ConnectionTimeline { | ||
/** | ||
* When the connection was opened | ||
*/ | ||
open: number; | ||
/** | ||
* When the MultiaddrConnection was upgraded to a Connection - e.g. the type | ||
* of connection encryption and multiplexing was negotiated. | ||
*/ | ||
upgraded?: number; | ||
/** | ||
* When the connection was closed. | ||
*/ | ||
close?: number; | ||
} | ||
/** | ||
* Outbound conections are opened by the local node, inbound streams are opened by the remote | ||
* Outbound connections are opened by the local node, inbound streams are opened by the remote | ||
*/ | ||
export type Direction = 'inbound' | 'outbound'; | ||
export interface ConnectionStat { | ||
/** | ||
* Outbound conections are opened by the local node, inbound streams are opened by the remote | ||
*/ | ||
direction: Direction; | ||
/** | ||
* Lifecycle times for the connection | ||
*/ | ||
timeline: ConnectionTimeline; | ||
/** | ||
* Once a multiplexer has been negotiated for this stream, it will be set on the stat object | ||
*/ | ||
multiplexer?: string; | ||
/** | ||
* Once a connection encrypter has been negotiated for this stream, it will be set on the stat object | ||
*/ | ||
encryption?: string; | ||
/** | ||
* The current status of the connection | ||
*/ | ||
status: keyof typeof Status; | ||
} | ||
export interface StreamTimeline { | ||
@@ -60,18 +47,30 @@ /** | ||
reset?: number; | ||
} | ||
export interface StreamStat { | ||
/** | ||
* Outbound streams are opened by the local node, inbound streams are opened by the remote | ||
* A timestamp of when the stream was aborted | ||
*/ | ||
direction: Direction; | ||
/** | ||
* Lifecycle times for the stream | ||
*/ | ||
timeline: StreamTimeline; | ||
/** | ||
* Once a protocol has been negotiated for this stream, it will be set on the stat object | ||
*/ | ||
protocol?: string; | ||
abort?: number; | ||
} | ||
/** | ||
* The states a stream can be in | ||
*/ | ||
export type StreamStatus = 'open' | 'closing' | 'closed' | 'aborted' | 'reset'; | ||
/** | ||
* The states the readable end of a stream can be in | ||
* | ||
* ready - the readable end is ready for reading | ||
* closing - the readable end is closing | ||
* closed - the readable end has closed | ||
*/ | ||
export type ReadStatus = 'ready' | 'closing' | 'closed'; | ||
/** | ||
* The states the writable end of a stream can be in | ||
* | ||
* ready - the writable end is ready for writing | ||
* writing - the writable end is in the process of being written to | ||
* done - the source passed to the `.sink` function yielded all values without error | ||
* closing - the writable end is closing | ||
* closed - the writable end has closed | ||
*/ | ||
export type WriteStatus = 'ready' | 'writing' | 'done' | 'closing' | 'closed'; | ||
/** | ||
* A Stream is a data channel between two peers that | ||
@@ -93,3 +92,3 @@ * can be written to and read from at both ends. | ||
*/ | ||
close: () => void; | ||
close: (options?: AbortOptions) => Promise<void>; | ||
/** | ||
@@ -102,3 +101,3 @@ * Closes the stream for **reading**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed. | ||
*/ | ||
closeRead: () => void; | ||
closeRead: (options?: AbortOptions) => Promise<void>; | ||
/** | ||
@@ -109,3 +108,3 @@ * Closes the stream for **writing**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed. | ||
*/ | ||
closeWrite: () => void; | ||
closeWrite: (options?: AbortOptions) => Promise<void>; | ||
/** | ||
@@ -122,10 +121,2 @@ * Closes the stream for **reading** *and* **writing**. This should be called when a *local error* has occurred. | ||
/** | ||
* Closes the stream *immediately* for **reading** *and* **writing**. This should be called when a *remote error* has occurred. | ||
* | ||
* This function is called automatically by the muxer when it receives a `RESET` message from the remote. | ||
* | ||
* The sink will return and the source will throw. | ||
*/ | ||
reset: () => void; | ||
/** | ||
* Unique identifier for a stream. Identifiers are not unique across muxers. | ||
@@ -135,9 +126,29 @@ */ | ||
/** | ||
* Stats about this stream | ||
* Outbound streams are opened by the local node, inbound streams are opened by the remote | ||
*/ | ||
stat: StreamStat; | ||
direction: Direction; | ||
/** | ||
* Lifecycle times for the stream | ||
*/ | ||
timeline: StreamTimeline; | ||
/** | ||
* Once a protocol has been negotiated for this stream, it will be set on the stat object | ||
*/ | ||
protocol?: string; | ||
/** | ||
* User defined stream metadata | ||
*/ | ||
metadata: Record<string, any>; | ||
/** | ||
* The current status of the stream | ||
*/ | ||
status: StreamStatus; | ||
/** | ||
* The current status of the readable end of the stream | ||
*/ | ||
readStatus: ReadStatus; | ||
/** | ||
* The current status of the writable end of the stream | ||
*/ | ||
writeStatus: WriteStatus; | ||
} | ||
@@ -152,2 +163,3 @@ export interface NewStreamOptions extends AbortOptions { | ||
} | ||
export type ConnectionStatus = 'open' | 'closing' | 'closed'; | ||
/** | ||
@@ -160,12 +172,63 @@ * A Connection is a high-level representation of a connection | ||
export interface Connection { | ||
/** | ||
* The unique identifier for this connection | ||
*/ | ||
id: string; | ||
stat: ConnectionStat; | ||
/** | ||
* The address of the remote end of the connection | ||
*/ | ||
remoteAddr: Multiaddr; | ||
/** | ||
* The id of the peer at the remote end of the connection | ||
*/ | ||
remotePeer: PeerId; | ||
/** | ||
* A list of tags applied to this connection | ||
*/ | ||
tags: string[]; | ||
/** | ||
* A list of open streams on this connection | ||
*/ | ||
streams: Stream[]; | ||
newStream: (multicodecs: string | string[], options?: NewStreamOptions) => Promise<Stream>; | ||
/** | ||
* Outbound conections are opened by the local node, inbound streams are opened by the remote | ||
*/ | ||
direction: Direction; | ||
/** | ||
* Lifecycle times for the connection | ||
*/ | ||
timeline: ConnectionTimeline; | ||
/** | ||
* Once a multiplexer has been negotiated for this stream, it will be set on the stat object | ||
*/ | ||
multiplexer?: string; | ||
/** | ||
* Once a connection encrypter has been negotiated for this stream, it will be set on the stat object | ||
*/ | ||
encryption?: string; | ||
/** | ||
* The current status of the connection | ||
*/ | ||
status: ConnectionStatus; | ||
/** | ||
* Create a new stream on this connection and negotiate one of the passed protocols | ||
*/ | ||
newStream: (protocols: string | string[], options?: NewStreamOptions) => Promise<Stream>; | ||
/** | ||
* Add a stream to this connection | ||
*/ | ||
addStream: (stream: Stream) => void; | ||
/** | ||
* Remove a stream from this connection | ||
*/ | ||
removeStream: (id: string) => void; | ||
close: () => Promise<void>; | ||
/** | ||
* Gracefully close the connection. All queued data will be written to the | ||
* underlying transport. | ||
*/ | ||
close: (options?: AbortOptions) => Promise<void>; | ||
/** | ||
* Immediately close the connection, any queued data will be discarded | ||
*/ | ||
abort: (err: Error) => void; | ||
} | ||
@@ -183,4 +246,14 @@ export declare const symbol: unique symbol; | ||
export interface MultiaddrConnectionTimeline { | ||
/** | ||
* When the connection was opened | ||
*/ | ||
open: number; | ||
/** | ||
* When the MultiaddrConnection was upgraded to a Connection - the type of | ||
* connection encryption and multiplexing was negotiated. | ||
*/ | ||
upgraded?: number; | ||
/** | ||
* When the connection was closed. | ||
*/ | ||
close?: number; | ||
@@ -194,6 +267,20 @@ } | ||
export interface MultiaddrConnection extends Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>> { | ||
close: (err?: Error) => Promise<void>; | ||
/** | ||
* Gracefully close the connection. All queued data will be written to the | ||
* underlying transport. | ||
*/ | ||
close: (options?: AbortOptions) => Promise<void>; | ||
/** | ||
* Immediately close the connection, any queued data will be discarded | ||
*/ | ||
abort: (err: Error) => void; | ||
/** | ||
* The address of the remote end of the connection | ||
*/ | ||
remoteAddr: Multiaddr; | ||
/** | ||
* When connection lifecycle events occurred | ||
*/ | ||
timeline: MultiaddrConnectionTimeline; | ||
} | ||
//# sourceMappingURL=index.d.ts.map |
@@ -464,4 +464,8 @@ /** | ||
/** | ||
* Attempts to gracefully close an open connection to the given peer. If the connection is not closed in the grace period, it will be forcefully closed. | ||
* Attempts to gracefully close an open connection to the given peer. If the | ||
* connection is not closed in the grace period, it will be forcefully closed. | ||
* | ||
* An AbortSignal can optionally be passed to control when the connection is | ||
* forcefully closed. | ||
* | ||
* @example | ||
@@ -473,3 +477,3 @@ * | ||
*/ | ||
hangUp: (peer: PeerId | Multiaddr) => Promise<void>; | ||
hangUp: (peer: PeerId | Multiaddr, options?: AbortOptions) => Promise<void>; | ||
/** | ||
@@ -476,0 +480,0 @@ * Sets up [multistream-select routing](https://github.com/multiformats/multistream-select) of protocols to their application handlers. Whenever a stream is opened on one of the provided protocols, the handler will be called. `handle` must be called in order to register a handler and support for a given protocol. This also informs other peers of the protocols you support. |
@@ -35,5 +35,9 @@ import type { Direction, Stream } from '../connection/index.js'; | ||
*/ | ||
close: (err?: Error) => void; | ||
close: (options?: AbortOptions) => Promise<void>; | ||
/** | ||
* Close or abort all tracked streams and stop the muxer | ||
*/ | ||
abort: (err: Error) => void; | ||
} | ||
export interface StreamMuxerInit extends AbortOptions { | ||
export interface StreamMuxerInit { | ||
/** | ||
@@ -40,0 +44,0 @@ * A callback function invoked every time an incoming stream is opened |
import { Uint8ArrayList } from 'uint8arraylist'; | ||
import type { Direction, Stream, StreamStat } from '../connection/index.js'; | ||
import type { Direction, ReadStatus, Stream, StreamStatus, StreamTimeline, WriteStatus } from '../connection/index.js'; | ||
import type { AbortOptions } from '../index.js'; | ||
import type { Source } from 'it-stream-types'; | ||
interface Logger { | ||
(formatter: any, ...args: any[]): void; | ||
error: (formatter: any, ...args: any[]) => void; | ||
trace: (formatter: any, ...args: any[]) => void; | ||
enabled: boolean; | ||
} | ||
export interface AbstractStreamInit { | ||
@@ -14,6 +21,5 @@ /** | ||
/** | ||
* The maximum allowable data size, any data larger than this will be | ||
* chunked and sent in multiple data messages | ||
* A Logger implementation used to log stream-specific information | ||
*/ | ||
maxDataSize: number; | ||
log: Logger; | ||
/** | ||
@@ -27,28 +33,80 @@ * User specific stream metadata | ||
onEnd?: (err?: Error | undefined) => void; | ||
/** | ||
* Invoked when the readable end of the stream is closed | ||
*/ | ||
onCloseRead?: () => void; | ||
/** | ||
* Invoked when the writable end of the stream is closed | ||
*/ | ||
onCloseWrite?: () => void; | ||
/** | ||
* Invoked when the the stream has been reset by the remote | ||
*/ | ||
onReset?: () => void; | ||
/** | ||
* Invoked when the the stream has errored | ||
*/ | ||
onAbort?: (err: Error) => void; | ||
/** | ||
* How long to wait in ms for stream data to be written to the underlying | ||
* connection when closing the writable end of the stream. (default: 500) | ||
*/ | ||
closeTimeout?: number; | ||
} | ||
export declare abstract class AbstractStream implements Stream { | ||
id: string; | ||
stat: StreamStat; | ||
direction: Direction; | ||
timeline: StreamTimeline; | ||
protocol?: string; | ||
metadata: Record<string, unknown>; | ||
source: AsyncGenerator<Uint8ArrayList, void, unknown>; | ||
private readonly abortController; | ||
private readonly resetController; | ||
private readonly closeController; | ||
private sourceEnded; | ||
private sinkEnded; | ||
private sinkSunk; | ||
status: StreamStatus; | ||
readStatus: ReadStatus; | ||
writeStatus: WriteStatus; | ||
private readonly sinkController; | ||
private readonly sinkEnd; | ||
private endErr; | ||
private readonly streamSource; | ||
private readonly onEnd?; | ||
private readonly maxDataSize; | ||
private readonly onCloseRead?; | ||
private readonly onCloseWrite?; | ||
private readonly onReset?; | ||
private readonly onAbort?; | ||
protected readonly log: Logger; | ||
constructor(init: AbstractStreamInit); | ||
sink(source: Source<Uint8ArrayList | Uint8Array>): Promise<void>; | ||
protected onSourceEnd(err?: Error): void; | ||
protected onSinkEnd(err?: Error): void; | ||
close(): void; | ||
closeRead(): void; | ||
closeWrite(): void; | ||
close(options?: AbortOptions): Promise<void>; | ||
closeRead(options?: AbortOptions): Promise<void>; | ||
closeWrite(options?: AbortOptions): Promise<void>; | ||
/** | ||
* Close immediately for reading and writing and send a reset message (local | ||
* error) | ||
*/ | ||
abort(err: Error): void; | ||
/** | ||
* Receive a reset message - close immediately for reading and writing (remote | ||
* error) | ||
*/ | ||
reset(): void; | ||
sink(source: Source<Uint8ArrayList | Uint8Array>): Promise<void>; | ||
_closeSinkAndSource(err?: Error): void; | ||
_closeSink(err?: Error): void; | ||
_closeSource(err?: Error): void; | ||
/** | ||
* The remote closed for writing so we should expect to receive no more | ||
* messages | ||
*/ | ||
remoteCloseWrite(): void; | ||
/** | ||
* The remote closed for reading so we should not send any more | ||
* messages | ||
*/ | ||
remoteCloseRead(): void; | ||
/** | ||
* The underlying muxer has closed, no more messages can be sent or will | ||
* be received, close immediately to free up resources | ||
*/ | ||
destroy(): void; | ||
/** | ||
* When an extending class reads data from it's implementation-specific source, | ||
@@ -67,11 +125,11 @@ * call this method to allow the stream consumer to read the data. | ||
*/ | ||
abstract sendNewStream(): void | Promise<void>; | ||
abstract sendNewStream(options?: AbortOptions): void | Promise<void>; | ||
/** | ||
* Send a data message to the remote muxer | ||
*/ | ||
abstract sendData(buf: Uint8ArrayList): void | Promise<void>; | ||
abstract sendData(buf: Uint8ArrayList, options?: AbortOptions): void | Promise<void>; | ||
/** | ||
* Send a reset message to the remote muxer | ||
*/ | ||
abstract sendReset(): void | Promise<void>; | ||
abstract sendReset(options?: AbortOptions): void | Promise<void>; | ||
/** | ||
@@ -81,3 +139,3 @@ * Send a message to the remote muxer, informing them no more data messages | ||
*/ | ||
abstract sendCloseWrite(): void | Promise<void>; | ||
abstract sendCloseWrite(options?: AbortOptions): void | Promise<void>; | ||
/** | ||
@@ -87,4 +145,5 @@ * Send a message to the remote muxer, informing them no more data messages | ||
*/ | ||
abstract sendCloseRead(): void | Promise<void>; | ||
abstract sendCloseRead(options?: AbortOptions): void | Promise<void>; | ||
} | ||
export {}; | ||
//# sourceMappingURL=stream.d.ts.map |
@@ -1,15 +0,8 @@ | ||
// import { logger } from '@libp2p/logger' | ||
import { abortableSource } from 'abortable-iterator'; | ||
import { anySignal } from 'any-signal'; | ||
import { pushable } from 'it-pushable'; | ||
import defer from 'p-defer'; | ||
import { Uint8ArrayList } from 'uint8arraylist'; | ||
import { CodeError } from '../errors.js'; | ||
// const log = logger('libp2p:stream') | ||
const log = () => { }; | ||
log.trace = () => { }; | ||
log.error = () => { }; | ||
const ERR_STREAM_RESET = 'ERR_STREAM_RESET'; | ||
const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT'; | ||
const ERR_SINK_ENDED = 'ERR_SINK_ENDED'; | ||
const ERR_DOUBLE_SINK = 'ERR_DOUBLE_SINK'; | ||
const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE'; | ||
function isPromise(res) { | ||
@@ -20,44 +13,49 @@ return res != null && typeof res.then === 'function'; | ||
id; | ||
stat; | ||
direction; | ||
timeline; | ||
protocol; | ||
metadata; | ||
source; | ||
abortController; | ||
resetController; | ||
closeController; | ||
sourceEnded; | ||
sinkEnded; | ||
sinkSunk; | ||
status; | ||
readStatus; | ||
writeStatus; | ||
sinkController; | ||
sinkEnd; | ||
endErr; | ||
streamSource; | ||
onEnd; | ||
maxDataSize; | ||
onCloseRead; | ||
onCloseWrite; | ||
onReset; | ||
onAbort; | ||
log; | ||
constructor(init) { | ||
this.abortController = new AbortController(); | ||
this.resetController = new AbortController(); | ||
this.closeController = new AbortController(); | ||
this.sourceEnded = false; | ||
this.sinkEnded = false; | ||
this.sinkSunk = false; | ||
this.sinkController = new AbortController(); | ||
this.sinkEnd = defer(); | ||
this.log = init.log; | ||
// stream status | ||
this.status = 'open'; | ||
this.readStatus = 'ready'; | ||
this.writeStatus = 'ready'; | ||
this.id = init.id; | ||
this.metadata = init.metadata ?? {}; | ||
this.stat = { | ||
direction: init.direction, | ||
timeline: { | ||
open: Date.now() | ||
} | ||
this.direction = init.direction; | ||
this.timeline = { | ||
open: Date.now() | ||
}; | ||
this.maxDataSize = init.maxDataSize; | ||
this.onEnd = init.onEnd; | ||
this.onCloseRead = init?.onCloseRead; | ||
this.onCloseWrite = init?.onCloseWrite; | ||
this.onReset = init?.onReset; | ||
this.onAbort = init?.onAbort; | ||
this.source = this.streamSource = pushable({ | ||
onEnd: () => { | ||
// already sent a reset message | ||
if (this.stat.timeline.reset !== null) { | ||
const res = this.sendCloseRead(); | ||
if (isPromise(res)) { | ||
res.catch(err => { | ||
log.error('error while sending close read', err); | ||
}); | ||
} | ||
onEnd: (err) => { | ||
if (err != null) { | ||
this.log.trace('source ended with error', err); | ||
} | ||
this.onSourceEnd(); | ||
else { | ||
this.log.trace('source ended'); | ||
} | ||
this.readStatus = 'closed'; | ||
this.onSourceEnd(err); | ||
} | ||
@@ -68,14 +66,56 @@ }); | ||
} | ||
async sink(source) { | ||
if (this.writeStatus !== 'ready') { | ||
throw new CodeError(`writable end state is "${this.writeStatus}" not "ready"`, ERR_SINK_INVALID_STATE); | ||
} | ||
try { | ||
this.writeStatus = 'writing'; | ||
const options = { | ||
signal: this.sinkController.signal | ||
}; | ||
if (this.direction === 'outbound') { // If initiator, open a new stream | ||
const res = this.sendNewStream(options); | ||
if (isPromise(res)) { | ||
await res; | ||
} | ||
} | ||
source = abortableSource(source, this.sinkController.signal, { | ||
returnOnAbort: true | ||
}); | ||
this.log.trace('sink reading from source'); | ||
for await (let data of source) { | ||
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data; | ||
const res = this.sendData(data, options); | ||
if (isPromise(res)) { // eslint-disable-line max-depth | ||
await res; | ||
} | ||
} | ||
this.log.trace('sink finished reading from source'); | ||
this.writeStatus = 'done'; | ||
this.log.trace('sink calling closeWrite'); | ||
await this.closeWrite(options); | ||
this.onSinkEnd(); | ||
} | ||
catch (err) { | ||
this.log.trace('sink ended with error, calling abort with error', err); | ||
this.abort(err); | ||
throw err; | ||
} | ||
finally { | ||
this.log.trace('resolve sink end'); | ||
this.sinkEnd.resolve(); | ||
} | ||
} | ||
onSourceEnd(err) { | ||
if (this.sourceEnded) { | ||
if (this.timeline.closeRead != null) { | ||
return; | ||
} | ||
this.stat.timeline.closeRead = Date.now(); | ||
this.sourceEnded = true; | ||
log.trace('%s stream %s source end - err: %o', this.stat.direction, this.id, err); | ||
this.timeline.closeRead = Date.now(); | ||
if (err != null && this.endErr == null) { | ||
this.endErr = err; | ||
} | ||
if (this.sinkEnded) { | ||
this.stat.timeline.close = Date.now(); | ||
this.onCloseRead?.(); | ||
if (this.timeline.closeWrite != null) { | ||
this.log.trace('source and sink ended'); | ||
this.timeline.close = Date.now(); | ||
if (this.onEnd != null) { | ||
@@ -85,15 +125,18 @@ this.onEnd(this.endErr); | ||
} | ||
else { | ||
this.log.trace('source ended, waiting for sink to end'); | ||
} | ||
} | ||
onSinkEnd(err) { | ||
if (this.sinkEnded) { | ||
if (this.timeline.closeWrite != null) { | ||
return; | ||
} | ||
this.stat.timeline.closeWrite = Date.now(); | ||
this.sinkEnded = true; | ||
log.trace('%s stream %s sink end - err: %o', this.stat.direction, this.id, err); | ||
this.timeline.closeWrite = Date.now(); | ||
if (err != null && this.endErr == null) { | ||
this.endErr = err; | ||
} | ||
if (this.sourceEnded) { | ||
this.stat.timeline.close = Date.now(); | ||
this.onCloseWrite?.(); | ||
if (this.timeline.closeRead != null) { | ||
this.log.trace('sink and source ended'); | ||
this.timeline.close = Date.now(); | ||
if (this.onEnd != null) { | ||
@@ -103,141 +146,154 @@ this.onEnd(this.endErr); | ||
} | ||
else { | ||
this.log.trace('sink ended, waiting for source to end'); | ||
} | ||
} | ||
// Close for both Reading and Writing | ||
close() { | ||
log.trace('%s stream %s close', this.stat.direction, this.id); | ||
this.closeRead(); | ||
this.closeWrite(); | ||
async close(options) { | ||
this.log.trace('closing gracefully'); | ||
this.status = 'closing'; | ||
await Promise.all([ | ||
this.closeRead(options), | ||
this.closeWrite(options) | ||
]); | ||
this.status = 'closed'; | ||
this.log.trace('closed gracefully'); | ||
} | ||
// Close for reading | ||
closeRead() { | ||
log.trace('%s stream %s closeRead', this.stat.direction, this.id); | ||
if (this.sourceEnded) { | ||
async closeRead(options = {}) { | ||
if (this.readStatus === 'closing' || this.readStatus === 'closed') { | ||
return; | ||
} | ||
this.streamSource.end(); | ||
this.log.trace('closing readable end of stream with starting read status "%s"', this.readStatus); | ||
const readStatus = this.readStatus; | ||
this.readStatus = 'closing'; | ||
if (readStatus === 'ready') { | ||
this.log.trace('ending internal source queue'); | ||
this.streamSource.end(); | ||
} | ||
if (this.status !== 'reset' && this.status !== 'aborted') { | ||
this.log.trace('send close read to remote'); | ||
await this.sendCloseRead(options); | ||
} | ||
this.log.trace('closed readable end of stream'); | ||
} | ||
// Close for writing | ||
closeWrite() { | ||
log.trace('%s stream %s closeWrite', this.stat.direction, this.id); | ||
if (this.sinkEnded) { | ||
async closeWrite(options = {}) { | ||
if (this.writeStatus === 'closing' || this.writeStatus === 'closed') { | ||
return; | ||
} | ||
this.closeController.abort(); | ||
try { | ||
// need to call this here as the sink method returns in the catch block | ||
// when the close controller is aborted | ||
const res = this.sendCloseWrite(); | ||
if (isPromise(res)) { | ||
res.catch(err => { | ||
log.error('error while sending close write', err); | ||
this.log.trace('closing writable end of stream with starting write status "%s"', this.writeStatus); | ||
const writeStatus = this.writeStatus; | ||
if (this.writeStatus === 'ready') { | ||
this.log.trace('sink was never sunk, sink an empty array'); | ||
await this.sink([]); | ||
} | ||
this.writeStatus = 'closing'; | ||
if (writeStatus === 'writing') { | ||
// stop reading from the source passed to `.sink` in the microtask queue | ||
// - this lets any data queued by the user in the current tick get read | ||
// before we exit | ||
await new Promise((resolve, reject) => { | ||
queueMicrotask(() => { | ||
this.log.trace('aborting source passed to .sink'); | ||
this.sinkController.abort(); | ||
this.sinkEnd.promise.then(resolve, reject); | ||
}); | ||
} | ||
}); | ||
} | ||
catch (err) { | ||
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err); | ||
if (this.status !== 'reset' && this.status !== 'aborted') { | ||
this.log.trace('send close write to remote'); | ||
await this.sendCloseWrite(options); | ||
} | ||
this.onSinkEnd(); | ||
this.writeStatus = 'closed'; | ||
this.log.trace('closed writable end of stream'); | ||
} | ||
// Close for reading and writing (local error) | ||
/** | ||
* Close immediately for reading and writing and send a reset message (local | ||
* error) | ||
*/ | ||
abort(err) { | ||
log.trace('%s stream %s abort', this.stat.direction, this.id, err); | ||
// End the source with the passed error | ||
this.streamSource.end(err); | ||
this.abortController.abort(); | ||
this.onSinkEnd(err); | ||
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') { | ||
return; | ||
} | ||
this.log('abort with error', err); | ||
// try to send a reset message | ||
this.log('try to send reset to remote'); | ||
const res = this.sendReset(); | ||
if (isPromise(res)) { | ||
res.catch((err) => { | ||
this.log.error('error sending reset message', err); | ||
}); | ||
} | ||
this.status = 'aborted'; | ||
this.timeline.abort = Date.now(); | ||
this._closeSinkAndSource(err); | ||
this.onAbort?.(err); | ||
} | ||
// Close immediately for reading and writing (remote error) | ||
/** | ||
* Receive a reset message - close immediately for reading and writing (remote | ||
* error) | ||
*/ | ||
reset() { | ||
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') { | ||
return; | ||
} | ||
const err = new CodeError('stream reset', ERR_STREAM_RESET); | ||
this.resetController.abort(); | ||
this.streamSource.end(err); | ||
this.status = 'reset'; | ||
this._closeSinkAndSource(err); | ||
this.onReset?.(); | ||
} | ||
_closeSinkAndSource(err) { | ||
this._closeSink(err); | ||
this._closeSource(err); | ||
} | ||
_closeSink(err) { | ||
// if the sink function is running, cause it to end | ||
if (this.writeStatus === 'writing') { | ||
this.log.trace('end sink source'); | ||
this.sinkController.abort(); | ||
} | ||
this.onSinkEnd(err); | ||
} | ||
async sink(source) { | ||
if (this.sinkSunk) { | ||
throw new CodeError('sink already called on stream', ERR_DOUBLE_SINK); | ||
} | ||
this.sinkSunk = true; | ||
if (this.sinkEnded) { | ||
throw new CodeError('stream closed for writing', ERR_SINK_ENDED); | ||
} | ||
const signal = anySignal([ | ||
this.abortController.signal, | ||
this.resetController.signal, | ||
this.closeController.signal | ||
]); | ||
try { | ||
source = abortableSource(source, signal); | ||
if (this.stat.direction === 'outbound') { // If initiator, open a new stream | ||
const res = this.sendNewStream(); | ||
if (isPromise(res)) { | ||
await res; | ||
} | ||
} | ||
for await (let data of source) { | ||
while (data.length > 0) { | ||
if (data.length <= this.maxDataSize) { | ||
const res = this.sendData(data instanceof Uint8Array ? new Uint8ArrayList(data) : data); | ||
if (isPromise(res)) { // eslint-disable-line max-depth | ||
await res; | ||
} | ||
break; | ||
} | ||
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data; | ||
const res = this.sendData(data.sublist(0, this.maxDataSize)); | ||
if (isPromise(res)) { | ||
await res; | ||
} | ||
data.consume(this.maxDataSize); | ||
} | ||
} | ||
} | ||
catch (err) { | ||
if (err.type === 'aborted' && err.message === 'The operation was aborted') { | ||
if (this.closeController.signal.aborted) { | ||
return; | ||
} | ||
if (this.resetController.signal.aborted) { | ||
err.message = 'stream reset'; | ||
err.code = ERR_STREAM_RESET; | ||
} | ||
if (this.abortController.signal.aborted) { | ||
err.message = 'stream aborted'; | ||
err.code = ERR_STREAM_ABORT; | ||
} | ||
} | ||
// Send no more data if this stream was remotely reset | ||
if (err.code === ERR_STREAM_RESET) { | ||
log.trace('%s stream %s reset', this.stat.direction, this.id); | ||
} | ||
else { | ||
log.trace('%s stream %s error', this.stat.direction, this.id, err); | ||
try { | ||
const res = this.sendReset(); | ||
if (isPromise(res)) { | ||
await res; | ||
} | ||
this.stat.timeline.reset = Date.now(); | ||
} | ||
catch (err) { | ||
log.trace('%s stream %s error sending reset', this.stat.direction, this.id, err); | ||
} | ||
} | ||
_closeSource(err) { | ||
// if the source is not ending, end it | ||
if (this.readStatus !== 'closing' && this.readStatus !== 'closed') { | ||
this.log.trace('ending source with %d bytes to be read by consumer', this.streamSource.readableLength); | ||
this.readStatus = 'closing'; | ||
this.streamSource.end(err); | ||
this.onSinkEnd(err); | ||
throw err; | ||
} | ||
finally { | ||
signal.clear(); | ||
} | ||
/** | ||
* The remote closed for writing so we should expect to receive no more | ||
* messages | ||
*/ | ||
remoteCloseWrite() { | ||
if (this.readStatus === 'closing' || this.readStatus === 'closed') { | ||
this.log('received remote close write but local source is already closed'); | ||
return; | ||
} | ||
try { | ||
const res = this.sendCloseWrite(); | ||
if (isPromise(res)) { | ||
await res; | ||
} | ||
this.log.trace('remote close write'); | ||
this._closeSource(); | ||
} | ||
/** | ||
* The remote closed for reading so we should not send any more | ||
* messages | ||
*/ | ||
remoteCloseRead() { | ||
if (this.writeStatus === 'closing' || this.writeStatus === 'closed') { | ||
this.log('received remote close read but local sink is already closed'); | ||
return; | ||
} | ||
catch (err) { | ||
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err); | ||
this.log.trace('remote close read'); | ||
this._closeSink(); | ||
} | ||
/** | ||
* The underlying muxer has closed, no more messages can be sent or will | ||
* be received, close immediately to free up resources | ||
*/ | ||
destroy() { | ||
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') { | ||
this.log('received destroy but we are already closed'); | ||
return; | ||
} | ||
this.onSinkEnd(); | ||
this.log.trace('muxer destroyed'); | ||
this._closeSinkAndSource(); | ||
} | ||
@@ -244,0 +300,0 @@ /** |
{ | ||
"name": "@libp2p/interface", | ||
"version": "0.0.1-8b0e6bef", | ||
"version": "0.0.1-b1024c6c", | ||
"description": "The interface implemented by a libp2p node", | ||
@@ -59,6 +59,2 @@ "license": "Apache-2.0 OR MIT", | ||
}, | ||
"./connection/status": { | ||
"types": "./dist/src/connection/status.d.ts", | ||
"import": "./dist/src/connection/status.js" | ||
}, | ||
"./content-routing": { | ||
@@ -116,2 +112,6 @@ "types": "./dist/src/content-routing/index.d.ts", | ||
}, | ||
"./pubsub": { | ||
"types": "./dist/src/pubsub/index.d.ts", | ||
"import": "./dist/src/pubsub/index.js" | ||
}, | ||
"./record": { | ||
@@ -164,6 +164,6 @@ "types": "./dist/src/record/index.d.ts", | ||
"abortable-iterator": "^5.0.1", | ||
"any-signal": "^4.1.1", | ||
"it-pushable": "^3.1.3", | ||
"it-pushable": "^3.2.0", | ||
"it-stream-types": "^2.0.1", | ||
"multiformats": "^12.0.1", | ||
"p-defer": "^4.0.0", | ||
"uint8arraylist": "^2.4.3" | ||
@@ -173,3 +173,3 @@ }, | ||
"@types/sinon": "^10.0.15", | ||
"aegir": "^39.0.10", | ||
"aegir": "^39.0.13", | ||
"sinon": "^15.1.2", | ||
@@ -176,0 +176,0 @@ "sinon-ts": "^1.0.0" |
@@ -1,2 +0,1 @@ | ||
import type * as Status from './status.js' | ||
import type { AbortOptions } from '../index.js' | ||
@@ -9,39 +8,24 @@ import type { PeerId } from '../peer-id/index.js' | ||
export interface ConnectionTimeline { | ||
open: number | ||
upgraded?: number | ||
close?: number | ||
} | ||
/** | ||
* Outbound conections are opened by the local node, inbound streams are opened by the remote | ||
*/ | ||
export type Direction = 'inbound' | 'outbound' | ||
export interface ConnectionStat { | ||
/** | ||
* Outbound conections are opened by the local node, inbound streams are opened by the remote | ||
* When the connection was opened | ||
*/ | ||
direction: Direction | ||
open: number | ||
/** | ||
* Lifecycle times for the connection | ||
* When the MultiaddrConnection was upgraded to a Connection - e.g. the type | ||
* of connection encryption and multiplexing was negotiated. | ||
*/ | ||
timeline: ConnectionTimeline | ||
upgraded?: number | ||
/** | ||
* Once a multiplexer has been negotiated for this stream, it will be set on the stat object | ||
* When the connection was closed. | ||
*/ | ||
multiplexer?: string | ||
close?: number | ||
} | ||
/** | ||
* Once a connection encrypter has been negotiated for this stream, it will be set on the stat object | ||
*/ | ||
encryption?: string | ||
/** | ||
* Outbound connections are opened by the local node, inbound streams are opened by the remote | ||
*/ | ||
export type Direction = 'inbound' | 'outbound' | ||
/** | ||
* The current status of the connection | ||
*/ | ||
status: keyof typeof Status | ||
} | ||
export interface StreamTimeline { | ||
@@ -72,22 +56,35 @@ /** | ||
reset?: number | ||
} | ||
export interface StreamStat { | ||
/** | ||
* Outbound streams are opened by the local node, inbound streams are opened by the remote | ||
* A timestamp of when the stream was aborted | ||
*/ | ||
direction: Direction | ||
abort?: number | ||
} | ||
/** | ||
* Lifecycle times for the stream | ||
*/ | ||
timeline: StreamTimeline | ||
/** | ||
* The states a stream can be in | ||
*/ | ||
export type StreamStatus = 'open' | 'closing' | 'closed' | 'aborted' | 'reset' | ||
/** | ||
* Once a protocol has been negotiated for this stream, it will be set on the stat object | ||
*/ | ||
protocol?: string | ||
} | ||
/** | ||
* The states the readable end of a stream can be in | ||
* | ||
* ready - the readable end is ready for reading | ||
* closing - the readable end is closing | ||
* closed - the readable end has closed | ||
*/ | ||
export type ReadStatus = 'ready' | 'closing' | 'closed' | ||
/** | ||
* The states the writable end of a stream can be in | ||
* | ||
* ready - the writable end is ready for writing | ||
* writing - the writable end is in the process of being written to | ||
* done - the source passed to the `.sink` function yielded all values without error | ||
* closing - the writable end is closing | ||
* closed - the writable end has closed | ||
*/ | ||
export type WriteStatus = 'ready' | 'writing' | 'done' | 'closing' | 'closed' | ||
/** | ||
* A Stream is a data channel between two peers that | ||
@@ -109,3 +106,3 @@ * can be written to and read from at both ends. | ||
*/ | ||
close: () => void | ||
close: (options?: AbortOptions) => Promise<void> | ||
@@ -119,3 +116,3 @@ /** | ||
*/ | ||
closeRead: () => void | ||
closeRead: (options?: AbortOptions) => Promise<void> | ||
@@ -127,3 +124,3 @@ /** | ||
*/ | ||
closeWrite: () => void | ||
closeWrite: (options?: AbortOptions) => Promise<void> | ||
@@ -142,24 +139,40 @@ /** | ||
/** | ||
* Closes the stream *immediately* for **reading** *and* **writing**. This should be called when a *remote error* has occurred. | ||
* | ||
* This function is called automatically by the muxer when it receives a `RESET` message from the remote. | ||
* | ||
* The sink will return and the source will throw. | ||
* Unique identifier for a stream. Identifiers are not unique across muxers. | ||
*/ | ||
reset: () => void | ||
id: string | ||
/** | ||
* Unique identifier for a stream. Identifiers are not unique across muxers. | ||
* Outbound streams are opened by the local node, inbound streams are opened by the remote | ||
*/ | ||
id: string | ||
direction: Direction | ||
/** | ||
* Stats about this stream | ||
* Lifecycle times for the stream | ||
*/ | ||
stat: StreamStat | ||
timeline: StreamTimeline | ||
/** | ||
* Once a protocol has been negotiated for this stream, it will be set on the stat object | ||
*/ | ||
protocol?: string | ||
/** | ||
* User defined stream metadata | ||
*/ | ||
metadata: Record<string, any> | ||
/** | ||
* The current status of the stream | ||
*/ | ||
status: StreamStatus | ||
/** | ||
* The current status of the readable end of the stream | ||
*/ | ||
readStatus: ReadStatus | ||
/** | ||
* The current status of the writable end of the stream | ||
*/ | ||
writeStatus: WriteStatus | ||
} | ||
@@ -176,2 +189,4 @@ | ||
export type ConnectionStatus = 'open' | 'closing' | 'closed' | ||
/** | ||
@@ -184,13 +199,77 @@ * A Connection is a high-level representation of a connection | ||
export interface Connection { | ||
/** | ||
* The unique identifier for this connection | ||
*/ | ||
id: string | ||
stat: ConnectionStat | ||
/** | ||
* The address of the remote end of the connection | ||
*/ | ||
remoteAddr: Multiaddr | ||
/** | ||
* The id of the peer at the remote end of the connection | ||
*/ | ||
remotePeer: PeerId | ||
/** | ||
* A list of tags applied to this connection | ||
*/ | ||
tags: string[] | ||
/** | ||
* A list of open streams on this connection | ||
*/ | ||
streams: Stream[] | ||
newStream: (multicodecs: string | string[], options?: NewStreamOptions) => Promise<Stream> | ||
/** | ||
* Outbound conections are opened by the local node, inbound streams are opened by the remote | ||
*/ | ||
direction: Direction | ||
/** | ||
* Lifecycle times for the connection | ||
*/ | ||
timeline: ConnectionTimeline | ||
/** | ||
* Once a multiplexer has been negotiated for this stream, it will be set on the stat object | ||
*/ | ||
multiplexer?: string | ||
/** | ||
* Once a connection encrypter has been negotiated for this stream, it will be set on the stat object | ||
*/ | ||
encryption?: string | ||
/** | ||
* The current status of the connection | ||
*/ | ||
status: ConnectionStatus | ||
/** | ||
* Create a new stream on this connection and negotiate one of the passed protocols | ||
*/ | ||
newStream: (protocols: string | string[], options?: NewStreamOptions) => Promise<Stream> | ||
/** | ||
* Add a stream to this connection | ||
*/ | ||
addStream: (stream: Stream) => void | ||
/** | ||
* Remove a stream from this connection | ||
*/ | ||
removeStream: (id: string) => void | ||
close: () => Promise<void> | ||
/** | ||
* Gracefully close the connection. All queued data will be written to the | ||
* underlying transport. | ||
*/ | ||
close: (options?: AbortOptions) => Promise<void> | ||
/** | ||
* Immediately close the connection, any queued data will be discarded | ||
*/ | ||
abort: (err: Error) => void | ||
} | ||
@@ -205,3 +284,2 @@ | ||
export interface ConnectionProtector { | ||
/** | ||
@@ -216,4 +294,16 @@ * Takes a given Connection and creates a private encryption stream | ||
export interface MultiaddrConnectionTimeline { | ||
/** | ||
* When the connection was opened | ||
*/ | ||
open: number | ||
/** | ||
* When the MultiaddrConnection was upgraded to a Connection - the type of | ||
* connection encryption and multiplexing was negotiated. | ||
*/ | ||
upgraded?: number | ||
/** | ||
* When the connection was closed. | ||
*/ | ||
close?: number | ||
@@ -228,5 +318,22 @@ } | ||
export interface MultiaddrConnection extends Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>> { | ||
close: (err?: Error) => Promise<void> | ||
/** | ||
* Gracefully close the connection. All queued data will be written to the | ||
* underlying transport. | ||
*/ | ||
close: (options?: AbortOptions) => Promise<void> | ||
/** | ||
* Immediately close the connection, any queued data will be discarded | ||
*/ | ||
abort: (err: Error) => void | ||
/** | ||
* The address of the remote end of the connection | ||
*/ | ||
remoteAddr: Multiaddr | ||
/** | ||
* When connection lifecycle events occurred | ||
*/ | ||
timeline: MultiaddrConnectionTimeline | ||
} |
@@ -509,4 +509,8 @@ /** | ||
/** | ||
* Attempts to gracefully close an open connection to the given peer. If the connection is not closed in the grace period, it will be forcefully closed. | ||
* Attempts to gracefully close an open connection to the given peer. If the | ||
* connection is not closed in the grace period, it will be forcefully closed. | ||
* | ||
* An AbortSignal can optionally be passed to control when the connection is | ||
* forcefully closed. | ||
* | ||
* @example | ||
@@ -518,3 +522,3 @@ * | ||
*/ | ||
hangUp: (peer: PeerId | Multiaddr) => Promise<void> | ||
hangUp: (peer: PeerId | Multiaddr, options?: AbortOptions) => Promise<void> | ||
@@ -521,0 +525,0 @@ /** |
@@ -40,6 +40,11 @@ import type { Direction, Stream } from '../connection/index.js' | ||
*/ | ||
close: (err?: Error) => void | ||
close: (options?: AbortOptions) => Promise<void> | ||
/** | ||
* Close or abort all tracked streams and stop the muxer | ||
*/ | ||
abort: (err: Error) => void | ||
} | ||
export interface StreamMuxerInit extends AbortOptions { | ||
export interface StreamMuxerInit { | ||
/** | ||
@@ -46,0 +51,0 @@ * A callback function invoked every time an incoming stream is opened |
@@ -1,20 +0,19 @@ | ||
// import { logger } from '@libp2p/logger' | ||
import { abortableSource } from 'abortable-iterator' | ||
import { anySignal } from 'any-signal' | ||
import { type Pushable, pushable } from 'it-pushable' | ||
import defer, { type DeferredPromise } from 'p-defer' | ||
import { Uint8ArrayList } from 'uint8arraylist' | ||
import { CodeError } from '../errors.js' | ||
import type { Direction, Stream, StreamStat } from '../connection/index.js' | ||
import type { Direction, ReadStatus, Stream, StreamStatus, StreamTimeline, WriteStatus } from '../connection/index.js' | ||
import type { AbortOptions } from '../index.js' | ||
import type { Source } from 'it-stream-types' | ||
// const log = logger('libp2p:stream') | ||
interface Logger { | ||
(formatter: any, ...args: any[]): void | ||
error: (formatter: any, ...args: any[]) => void | ||
trace: (formatter: any, ...args: any[]) => void | ||
enabled: boolean | ||
} | ||
const log: any = () => {} | ||
log.trace = () => {} | ||
log.error = () => {} | ||
const ERR_STREAM_RESET = 'ERR_STREAM_RESET' | ||
const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT' | ||
const ERR_SINK_ENDED = 'ERR_SINK_ENDED' | ||
const ERR_DOUBLE_SINK = 'ERR_DOUBLE_SINK' | ||
const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE' | ||
@@ -33,6 +32,5 @@ export interface AbstractStreamInit { | ||
/** | ||
* The maximum allowable data size, any data larger than this will be | ||
* chunked and sent in multiple data messages | ||
* A Logger implementation used to log stream-specific information | ||
*/ | ||
maxDataSize: number | ||
log: Logger | ||
@@ -48,2 +46,28 @@ /** | ||
onEnd?: (err?: Error | undefined) => void | ||
/** | ||
* Invoked when the readable end of the stream is closed | ||
*/ | ||
onCloseRead?: () => void | ||
/** | ||
* Invoked when the writable end of the stream is closed | ||
*/ | ||
onCloseWrite?: () => void | ||
/** | ||
* Invoked when the the stream has been reset by the remote | ||
*/ | ||
onReset?: () => void | ||
/** | ||
* Invoked when the the stream has errored | ||
*/ | ||
onAbort?: (err: Error) => void | ||
/** | ||
* How long to wait in ms for stream data to be written to the underlying | ||
* connection when closing the writable end of the stream. (default: 500) | ||
*/ | ||
closeTimeout?: number | ||
} | ||
@@ -57,50 +81,56 @@ | ||
public id: string | ||
public stat: StreamStat | ||
public direction: Direction | ||
public timeline: StreamTimeline | ||
public protocol?: string | ||
public metadata: Record<string, unknown> | ||
public source: AsyncGenerator<Uint8ArrayList, void, unknown> | ||
public status: StreamStatus | ||
public readStatus: ReadStatus | ||
public writeStatus: WriteStatus | ||
private readonly abortController: AbortController | ||
private readonly resetController: AbortController | ||
private readonly closeController: AbortController | ||
private sourceEnded: boolean | ||
private sinkEnded: boolean | ||
private sinkSunk: boolean | ||
private readonly sinkController: AbortController | ||
private readonly sinkEnd: DeferredPromise<void> | ||
private endErr: Error | undefined | ||
private readonly streamSource: Pushable<Uint8ArrayList> | ||
private readonly onEnd?: (err?: Error | undefined) => void | ||
private readonly maxDataSize: number | ||
private readonly onCloseRead?: () => void | ||
private readonly onCloseWrite?: () => void | ||
private readonly onReset?: () => void | ||
private readonly onAbort?: (err: Error) => void | ||
protected readonly log: Logger | ||
constructor (init: AbstractStreamInit) { | ||
this.abortController = new AbortController() | ||
this.resetController = new AbortController() | ||
this.closeController = new AbortController() | ||
this.sourceEnded = false | ||
this.sinkEnded = false | ||
this.sinkSunk = false | ||
this.sinkController = new AbortController() | ||
this.sinkEnd = defer() | ||
this.log = init.log | ||
// stream status | ||
this.status = 'open' | ||
this.readStatus = 'ready' | ||
this.writeStatus = 'ready' | ||
this.id = init.id | ||
this.metadata = init.metadata ?? {} | ||
this.stat = { | ||
direction: init.direction, | ||
timeline: { | ||
open: Date.now() | ||
} | ||
this.direction = init.direction | ||
this.timeline = { | ||
open: Date.now() | ||
} | ||
this.maxDataSize = init.maxDataSize | ||
this.onEnd = init.onEnd | ||
this.onCloseRead = init?.onCloseRead | ||
this.onCloseWrite = init?.onCloseWrite | ||
this.onReset = init?.onReset | ||
this.onAbort = init?.onAbort | ||
this.source = this.streamSource = pushable<Uint8ArrayList>({ | ||
onEnd: () => { | ||
// already sent a reset message | ||
if (this.stat.timeline.reset !== null) { | ||
const res = this.sendCloseRead() | ||
if (isPromise(res)) { | ||
res.catch(err => { | ||
log.error('error while sending close read', err) | ||
}) | ||
} | ||
onEnd: (err) => { | ||
if (err != null) { | ||
this.log.trace('source ended with error', err) | ||
} else { | ||
this.log.trace('source ended') | ||
} | ||
this.onSourceEnd() | ||
this.readStatus = 'closed' | ||
this.onSourceEnd(err) | ||
} | ||
@@ -113,10 +143,61 @@ }) | ||
async sink (source: Source<Uint8ArrayList | Uint8Array>): Promise<void> { | ||
if (this.writeStatus !== 'ready') { | ||
throw new CodeError(`writable end state is "${this.writeStatus}" not "ready"`, ERR_SINK_INVALID_STATE) | ||
} | ||
try { | ||
this.writeStatus = 'writing' | ||
const options: AbortOptions = { | ||
signal: this.sinkController.signal | ||
} | ||
if (this.direction === 'outbound') { // If initiator, open a new stream | ||
const res = this.sendNewStream(options) | ||
if (isPromise(res)) { | ||
await res | ||
} | ||
} | ||
source = abortableSource(source, this.sinkController.signal, { | ||
returnOnAbort: true | ||
}) | ||
this.log.trace('sink reading from source') | ||
for await (let data of source) { | ||
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data | ||
const res = this.sendData(data, options) | ||
if (isPromise(res)) { // eslint-disable-line max-depth | ||
await res | ||
} | ||
} | ||
this.log.trace('sink finished reading from source') | ||
this.writeStatus = 'done' | ||
this.log.trace('sink calling closeWrite') | ||
await this.closeWrite(options) | ||
this.onSinkEnd() | ||
} catch (err: any) { | ||
this.log.trace('sink ended with error, calling abort with error', err) | ||
this.abort(err) | ||
throw err | ||
} finally { | ||
this.log.trace('resolve sink end') | ||
this.sinkEnd.resolve() | ||
} | ||
} | ||
protected onSourceEnd (err?: Error): void { | ||
if (this.sourceEnded) { | ||
if (this.timeline.closeRead != null) { | ||
return | ||
} | ||
this.stat.timeline.closeRead = Date.now() | ||
this.sourceEnded = true | ||
log.trace('%s stream %s source end - err: %o', this.stat.direction, this.id, err) | ||
this.timeline.closeRead = Date.now() | ||
@@ -127,8 +208,13 @@ if (err != null && this.endErr == null) { | ||
if (this.sinkEnded) { | ||
this.stat.timeline.close = Date.now() | ||
this.onCloseRead?.() | ||
if (this.timeline.closeWrite != null) { | ||
this.log.trace('source and sink ended') | ||
this.timeline.close = Date.now() | ||
if (this.onEnd != null) { | ||
this.onEnd(this.endErr) | ||
} | ||
} else { | ||
this.log.trace('source ended, waiting for sink to end') | ||
} | ||
@@ -138,9 +224,7 @@ } | ||
protected onSinkEnd (err?: Error): void { | ||
if (this.sinkEnded) { | ||
if (this.timeline.closeWrite != null) { | ||
return | ||
} | ||
this.stat.timeline.closeWrite = Date.now() | ||
this.sinkEnded = true | ||
log.trace('%s stream %s sink end - err: %o', this.stat.direction, this.id, err) | ||
this.timeline.closeWrite = Date.now() | ||
@@ -151,8 +235,13 @@ if (err != null && this.endErr == null) { | ||
if (this.sourceEnded) { | ||
this.stat.timeline.close = Date.now() | ||
this.onCloseWrite?.() | ||
if (this.timeline.closeRead != null) { | ||
this.log.trace('sink and source ended') | ||
this.timeline.close = Date.now() | ||
if (this.onEnd != null) { | ||
this.onEnd(this.endErr) | ||
} | ||
} else { | ||
this.log.trace('sink ended, waiting for source to end') | ||
} | ||
@@ -162,170 +251,190 @@ } | ||
// Close for both Reading and Writing | ||
close (): void { | ||
log.trace('%s stream %s close', this.stat.direction, this.id) | ||
async close (options?: AbortOptions): Promise<void> { | ||
this.log.trace('closing gracefully') | ||
this.closeRead() | ||
this.closeWrite() | ||
} | ||
this.status = 'closing' | ||
// Close for reading | ||
closeRead (): void { | ||
log.trace('%s stream %s closeRead', this.stat.direction, this.id) | ||
await Promise.all([ | ||
this.closeRead(options), | ||
this.closeWrite(options) | ||
]) | ||
if (this.sourceEnded) { | ||
return | ||
} | ||
this.status = 'closed' | ||
this.streamSource.end() | ||
this.log.trace('closed gracefully') | ||
} | ||
// Close for writing | ||
closeWrite (): void { | ||
log.trace('%s stream %s closeWrite', this.stat.direction, this.id) | ||
if (this.sinkEnded) { | ||
async closeRead (options: AbortOptions = {}): Promise<void> { | ||
if (this.readStatus === 'closing' || this.readStatus === 'closed') { | ||
return | ||
} | ||
this.closeController.abort() | ||
this.log.trace('closing readable end of stream with starting read status "%s"', this.readStatus) | ||
try { | ||
// need to call this here as the sink method returns in the catch block | ||
// when the close controller is aborted | ||
const res = this.sendCloseWrite() | ||
const readStatus = this.readStatus | ||
this.readStatus = 'closing' | ||
if (isPromise(res)) { | ||
res.catch(err => { | ||
log.error('error while sending close write', err) | ||
}) | ||
} | ||
} catch (err) { | ||
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err) | ||
if (readStatus === 'ready') { | ||
this.log.trace('ending internal source queue') | ||
this.streamSource.end() | ||
} | ||
this.onSinkEnd() | ||
} | ||
if (this.status !== 'reset' && this.status !== 'aborted') { | ||
this.log.trace('send close read to remote') | ||
await this.sendCloseRead(options) | ||
} | ||
// Close for reading and writing (local error) | ||
abort (err: Error): void { | ||
log.trace('%s stream %s abort', this.stat.direction, this.id, err) | ||
// End the source with the passed error | ||
this.streamSource.end(err) | ||
this.abortController.abort() | ||
this.onSinkEnd(err) | ||
this.log.trace('closed readable end of stream') | ||
} | ||
// Close immediately for reading and writing (remote error) | ||
reset (): void { | ||
const err = new CodeError('stream reset', ERR_STREAM_RESET) | ||
this.resetController.abort() | ||
this.streamSource.end(err) | ||
this.onSinkEnd(err) | ||
} | ||
async closeWrite (options: AbortOptions = {}): Promise<void> { | ||
if (this.writeStatus === 'closing' || this.writeStatus === 'closed') { | ||
return | ||
} | ||
async sink (source: Source<Uint8ArrayList | Uint8Array>): Promise<void> { | ||
if (this.sinkSunk) { | ||
throw new CodeError('sink already called on stream', ERR_DOUBLE_SINK) | ||
this.log.trace('closing writable end of stream with starting write status "%s"', this.writeStatus) | ||
const writeStatus = this.writeStatus | ||
if (this.writeStatus === 'ready') { | ||
this.log.trace('sink was never sunk, sink an empty array') | ||
await this.sink([]) | ||
} | ||
this.sinkSunk = true | ||
this.writeStatus = 'closing' | ||
if (this.sinkEnded) { | ||
throw new CodeError('stream closed for writing', ERR_SINK_ENDED) | ||
if (writeStatus === 'writing') { | ||
// stop reading from the source passed to `.sink` in the microtask queue | ||
// - this lets any data queued by the user in the current tick get read | ||
// before we exit | ||
await new Promise((resolve, reject) => { | ||
queueMicrotask(() => { | ||
this.log.trace('aborting source passed to .sink') | ||
this.sinkController.abort() | ||
this.sinkEnd.promise.then(resolve, reject) | ||
}) | ||
}) | ||
} | ||
const signal = anySignal([ | ||
this.abortController.signal, | ||
this.resetController.signal, | ||
this.closeController.signal | ||
]) | ||
if (this.status !== 'reset' && this.status !== 'aborted') { | ||
this.log.trace('send close write to remote') | ||
await this.sendCloseWrite(options) | ||
} | ||
try { | ||
source = abortableSource(source, signal) | ||
this.writeStatus = 'closed' | ||
if (this.stat.direction === 'outbound') { // If initiator, open a new stream | ||
const res = this.sendNewStream() | ||
this.log.trace('closed writable end of stream') | ||
} | ||
if (isPromise(res)) { | ||
await res | ||
} | ||
} | ||
/** | ||
* Close immediately for reading and writing and send a reset message (local | ||
* error) | ||
*/ | ||
abort (err: Error): void { | ||
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') { | ||
return | ||
} | ||
for await (let data of source) { | ||
while (data.length > 0) { | ||
if (data.length <= this.maxDataSize) { | ||
const res = this.sendData(data instanceof Uint8Array ? new Uint8ArrayList(data) : data) | ||
this.log('abort with error', err) | ||
if (isPromise(res)) { // eslint-disable-line max-depth | ||
await res | ||
} | ||
// try to send a reset message | ||
this.log('try to send reset to remote') | ||
const res = this.sendReset() | ||
break | ||
} | ||
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data | ||
const res = this.sendData(data.sublist(0, this.maxDataSize)) | ||
if (isPromise(res)) { | ||
res.catch((err) => { | ||
this.log.error('error sending reset message', err) | ||
}) | ||
} | ||
if (isPromise(res)) { | ||
await res | ||
} | ||
this.status = 'aborted' | ||
this.timeline.abort = Date.now() | ||
this._closeSinkAndSource(err) | ||
this.onAbort?.(err) | ||
} | ||
data.consume(this.maxDataSize) | ||
} | ||
} | ||
} catch (err: any) { | ||
if (err.type === 'aborted' && err.message === 'The operation was aborted') { | ||
if (this.closeController.signal.aborted) { | ||
return | ||
} | ||
/** | ||
* Receive a reset message - close immediately for reading and writing (remote | ||
* error) | ||
*/ | ||
reset (): void { | ||
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') { | ||
return | ||
} | ||
if (this.resetController.signal.aborted) { | ||
err.message = 'stream reset' | ||
err.code = ERR_STREAM_RESET | ||
} | ||
const err = new CodeError('stream reset', ERR_STREAM_RESET) | ||
if (this.abortController.signal.aborted) { | ||
err.message = 'stream aborted' | ||
err.code = ERR_STREAM_ABORT | ||
} | ||
} | ||
this.status = 'reset' | ||
this._closeSinkAndSource(err) | ||
this.onReset?.() | ||
} | ||
// Send no more data if this stream was remotely reset | ||
if (err.code === ERR_STREAM_RESET) { | ||
log.trace('%s stream %s reset', this.stat.direction, this.id) | ||
} else { | ||
log.trace('%s stream %s error', this.stat.direction, this.id, err) | ||
try { | ||
const res = this.sendReset() | ||
_closeSinkAndSource (err?: Error): void { | ||
this._closeSink(err) | ||
this._closeSource(err) | ||
} | ||
if (isPromise(res)) { | ||
await res | ||
} | ||
_closeSink (err?: Error): void { | ||
// if the sink function is running, cause it to end | ||
if (this.writeStatus === 'writing') { | ||
this.log.trace('end sink source') | ||
this.sinkController.abort() | ||
} | ||
this.stat.timeline.reset = Date.now() | ||
} catch (err) { | ||
log.trace('%s stream %s error sending reset', this.stat.direction, this.id, err) | ||
} | ||
} | ||
this.onSinkEnd(err) | ||
} | ||
_closeSource (err?: Error): void { | ||
// if the source is not ending, end it | ||
if (this.readStatus !== 'closing' && this.readStatus !== 'closed') { | ||
this.log.trace('ending source with %d bytes to be read by consumer', this.streamSource.readableLength) | ||
this.readStatus = 'closing' | ||
this.streamSource.end(err) | ||
this.onSinkEnd(err) | ||
} | ||
} | ||
throw err | ||
} finally { | ||
signal.clear() | ||
/** | ||
* The remote closed for writing so we should expect to receive no more | ||
* messages | ||
*/ | ||
remoteCloseWrite (): void { | ||
if (this.readStatus === 'closing' || this.readStatus === 'closed') { | ||
this.log('received remote close write but local source is already closed') | ||
return | ||
} | ||
try { | ||
const res = this.sendCloseWrite() | ||
this.log.trace('remote close write') | ||
this._closeSource() | ||
} | ||
if (isPromise(res)) { | ||
await res | ||
} | ||
} catch (err) { | ||
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err) | ||
/** | ||
* The remote closed for reading so we should not send any more | ||
* messages | ||
*/ | ||
remoteCloseRead (): void { | ||
if (this.writeStatus === 'closing' || this.writeStatus === 'closed') { | ||
this.log('received remote close read but local sink is already closed') | ||
return | ||
} | ||
this.onSinkEnd() | ||
this.log.trace('remote close read') | ||
this._closeSink() | ||
} | ||
/** | ||
* The underlying muxer has closed, no more messages can be sent or will | ||
* be received, close immediately to free up resources | ||
*/ | ||
destroy (): void { | ||
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') { | ||
this.log('received destroy but we are already closed') | ||
return | ||
} | ||
this.log.trace('muxer destroyed') | ||
this._closeSinkAndSource() | ||
} | ||
/** | ||
* When an extending class reads data from it's implementation-specific source, | ||
@@ -350,3 +459,3 @@ * call this method to allow the stream consumer to read the data. | ||
*/ | ||
abstract sendNewStream (): void | Promise<void> | ||
abstract sendNewStream (options?: AbortOptions): void | Promise<void> | ||
@@ -356,3 +465,3 @@ /** | ||
*/ | ||
abstract sendData (buf: Uint8ArrayList): void | Promise<void> | ||
abstract sendData (buf: Uint8ArrayList, options?: AbortOptions): void | Promise<void> | ||
@@ -362,3 +471,3 @@ /** | ||
*/ | ||
abstract sendReset (): void | Promise<void> | ||
abstract sendReset (options?: AbortOptions): void | Promise<void> | ||
@@ -369,3 +478,3 @@ /** | ||
*/ | ||
abstract sendCloseWrite (): void | Promise<void> | ||
abstract sendCloseWrite (options?: AbortOptions): void | Promise<void> | ||
@@ -376,3 +485,3 @@ /** | ||
*/ | ||
abstract sendCloseRead (): void | Promise<void> | ||
abstract sendCloseRead (options?: AbortOptions): void | Promise<void> | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
252176
6063
52
+ Addedp-defer@^4.0.0
- Removedany-signal@^4.1.1
- Removedany-signal@4.1.1(transitive)
Updatedit-pushable@^3.2.0