@libp2p/interface
Advanced tools
Comparing version 0.1.6-9c67c5b3d to 0.1.6-adea7bbbf
@@ -1,2 +0,2 @@ | ||
import type { AbortOptions } from '../index.js'; | ||
import type { AbortOptions, Logger } from '../index.js'; | ||
import type { PeerId } from '../peer-id/index.js'; | ||
@@ -147,2 +147,6 @@ import type { Multiaddr } from '@multiformats/multiaddr'; | ||
writeStatus: WriteStatus; | ||
/** | ||
* The stream logger | ||
*/ | ||
log: Logger; | ||
} | ||
@@ -230,2 +234,6 @@ export interface NewStreamOptions extends AbortOptions { | ||
abort(err: Error): void; | ||
/** | ||
* The connection logger | ||
*/ | ||
log: Logger; | ||
} | ||
@@ -280,3 +288,7 @@ export declare const symbol: unique symbol; | ||
timeline: MultiaddrConnectionTimeline; | ||
/** | ||
* The multiaddr connection logger | ||
*/ | ||
log: Logger; | ||
} | ||
//# sourceMappingURL=index.d.ts.map |
@@ -1,150 +0,3 @@ | ||
import { Uint8ArrayList } from 'uint8arraylist'; | ||
import type { Direction, ReadStatus, Stream, StreamStatus, StreamTimeline, WriteStatus } from '../connection/index.js'; | ||
import type { AbortOptions } from '../index.js'; | ||
import type { Source } from 'it-stream-types'; | ||
interface Logger { | ||
(formatter: any, ...args: any[]): void; | ||
error(formatter: any, ...args: any[]): void; | ||
trace(formatter: any, ...args: any[]): void; | ||
enabled: boolean; | ||
} | ||
export interface AbstractStreamInit { | ||
/** | ||
* A unique identifier for this stream | ||
*/ | ||
id: string; | ||
/** | ||
* The stream direction | ||
*/ | ||
direction: Direction; | ||
/** | ||
* A Logger implementation used to log stream-specific information | ||
*/ | ||
log: Logger; | ||
/** | ||
* User specific stream metadata | ||
*/ | ||
metadata?: Record<string, unknown>; | ||
/** | ||
* Invoked when the stream ends | ||
*/ | ||
onEnd?(err?: Error | undefined): void; | ||
/** | ||
* Invoked when the readable end of the stream is closed | ||
*/ | ||
onCloseRead?(): void; | ||
/** | ||
* Invoked when the writable end of the stream is closed | ||
*/ | ||
onCloseWrite?(): void; | ||
/** | ||
* Invoked when the the stream has been reset by the remote | ||
*/ | ||
onReset?(): void; | ||
/** | ||
* Invoked when the the stream has errored | ||
*/ | ||
onAbort?(err: Error): void; | ||
/** | ||
* How long to wait in ms for stream data to be written to the underlying | ||
* connection when closing the writable end of the stream. (default: 500) | ||
*/ | ||
closeTimeout?: number; | ||
/** | ||
* After the stream sink has closed, a limit on how long it takes to send | ||
* a close-write message to the remote peer. | ||
*/ | ||
sendCloseWriteTimeout?: number; | ||
} | ||
export declare abstract class AbstractStream implements Stream { | ||
id: string; | ||
direction: Direction; | ||
timeline: StreamTimeline; | ||
protocol?: string; | ||
metadata: Record<string, unknown>; | ||
source: AsyncGenerator<Uint8ArrayList, void, unknown>; | ||
status: StreamStatus; | ||
readStatus: ReadStatus; | ||
writeStatus: WriteStatus; | ||
private readonly sinkController; | ||
private readonly sinkEnd; | ||
private endErr; | ||
private readonly streamSource; | ||
private readonly onEnd?; | ||
private readonly onCloseRead?; | ||
private readonly onCloseWrite?; | ||
private readonly onReset?; | ||
private readonly onAbort?; | ||
private readonly sendCloseWriteTimeout; | ||
protected readonly log: Logger; | ||
constructor(init: AbstractStreamInit); | ||
sink(source: Source<Uint8ArrayList | Uint8Array>): Promise<void>; | ||
protected onSourceEnd(err?: Error): void; | ||
protected onSinkEnd(err?: Error): void; | ||
close(options?: AbortOptions): Promise<void>; | ||
closeRead(options?: AbortOptions): Promise<void>; | ||
closeWrite(options?: AbortOptions): Promise<void>; | ||
/** | ||
* Close immediately for reading and writing and send a reset message (local | ||
* error) | ||
*/ | ||
abort(err: Error): void; | ||
/** | ||
* Receive a reset message - close immediately for reading and writing (remote | ||
* error) | ||
*/ | ||
reset(): void; | ||
_closeSinkAndSource(err?: Error): void; | ||
_closeSink(err?: Error): void; | ||
_closeSource(err?: Error): void; | ||
/** | ||
* The remote closed for writing so we should expect to receive no more | ||
* messages | ||
*/ | ||
remoteCloseWrite(): void; | ||
/** | ||
* The remote closed for reading so we should not send any more | ||
* messages | ||
*/ | ||
remoteCloseRead(): void; | ||
/** | ||
* The underlying muxer has closed, no more messages can be sent or will | ||
* be received, close immediately to free up resources | ||
*/ | ||
destroy(): void; | ||
/** | ||
* When an extending class reads data from it's implementation-specific source, | ||
* call this method to allow the stream consumer to read the data. | ||
*/ | ||
sourcePush(data: Uint8ArrayList): void; | ||
/** | ||
* Returns the amount of unread data - can be used to prevent large amounts of | ||
* data building up when the stream consumer is too slow. | ||
*/ | ||
sourceReadableLength(): number; | ||
/** | ||
* Send a message to the remote muxer informing them a new stream is being | ||
* opened | ||
*/ | ||
abstract sendNewStream(options?: AbortOptions): void | Promise<void>; | ||
/** | ||
* Send a data message to the remote muxer | ||
*/ | ||
abstract sendData(buf: Uint8ArrayList, options?: AbortOptions): void | Promise<void>; | ||
/** | ||
* Send a reset message to the remote muxer | ||
*/ | ||
abstract sendReset(options?: AbortOptions): void | Promise<void>; | ||
/** | ||
* Send a message to the remote muxer, informing them no more data messages | ||
* will be sent by this end of the stream | ||
*/ | ||
abstract sendCloseWrite(options?: AbortOptions): void | Promise<void>; | ||
/** | ||
* Send a message to the remote muxer, informing them no more data messages | ||
* will be read by this end of the stream | ||
*/ | ||
abstract sendCloseRead(options?: AbortOptions): void | Promise<void>; | ||
} | ||
export {}; | ||
export type AbstractStreamInit = any; | ||
export declare const AbstractStream: any; | ||
//# sourceMappingURL=stream.d.ts.map |
@@ -1,323 +0,9 @@ | ||
import { abortableSource } from 'abortable-iterator'; | ||
import { pushable } from 'it-pushable'; | ||
import defer, {} from 'p-defer'; | ||
import { raceSignal } from 'race-signal'; | ||
import { Uint8ArrayList } from 'uint8arraylist'; | ||
import { CodeError } from '../errors.js'; | ||
const ERR_STREAM_RESET = 'ERR_STREAM_RESET'; | ||
const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE'; | ||
const DEFAULT_SEND_CLOSE_WRITE_TIMEOUT = 5000; | ||
function isPromise(res) { | ||
return res != null && typeof res.then === 'function'; | ||
} | ||
export class AbstractStream { | ||
id; | ||
direction; | ||
timeline; | ||
protocol; | ||
metadata; | ||
source; | ||
status; | ||
readStatus; | ||
writeStatus; | ||
sinkController; | ||
sinkEnd; | ||
endErr; | ||
streamSource; | ||
onEnd; | ||
onCloseRead; | ||
onCloseWrite; | ||
onReset; | ||
onAbort; | ||
sendCloseWriteTimeout; | ||
log; | ||
constructor(init) { | ||
this.sinkController = new AbortController(); | ||
this.sinkEnd = defer(); | ||
this.log = init.log; | ||
// stream status | ||
this.status = 'open'; | ||
this.readStatus = 'ready'; | ||
this.writeStatus = 'ready'; | ||
this.id = init.id; | ||
this.metadata = init.metadata ?? {}; | ||
this.direction = init.direction; | ||
this.timeline = { | ||
open: Date.now() | ||
}; | ||
this.sendCloseWriteTimeout = init.sendCloseWriteTimeout ?? DEFAULT_SEND_CLOSE_WRITE_TIMEOUT; | ||
this.onEnd = init.onEnd; | ||
this.onCloseRead = init?.onCloseRead; | ||
this.onCloseWrite = init?.onCloseWrite; | ||
this.onReset = init?.onReset; | ||
this.onAbort = init?.onAbort; | ||
this.source = this.streamSource = pushable({ | ||
onEnd: (err) => { | ||
if (err != null) { | ||
this.log.trace('source ended with error', err); | ||
} | ||
else { | ||
this.log.trace('source ended'); | ||
} | ||
this.onSourceEnd(err); | ||
} | ||
}); | ||
// necessary because the libp2p upgrader wraps the sink function | ||
this.sink = this.sink.bind(this); | ||
} | ||
async sink(source) { | ||
if (this.writeStatus !== 'ready') { | ||
throw new CodeError(`writable end state is "${this.writeStatus}" not "ready"`, ERR_SINK_INVALID_STATE); | ||
} | ||
try { | ||
this.writeStatus = 'writing'; | ||
const options = { | ||
signal: this.sinkController.signal | ||
}; | ||
if (this.direction === 'outbound') { // If initiator, open a new stream | ||
const res = this.sendNewStream(options); | ||
if (isPromise(res)) { | ||
await res; | ||
} | ||
} | ||
source = abortableSource(source, this.sinkController.signal, { | ||
returnOnAbort: true | ||
}); | ||
this.log.trace('sink reading from source'); | ||
for await (let data of source) { | ||
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data; | ||
const res = this.sendData(data, options); | ||
if (isPromise(res)) { // eslint-disable-line max-depth | ||
await res; | ||
} | ||
} | ||
this.log.trace('sink finished reading from source, write status is "%s"', this.writeStatus); | ||
if (this.writeStatus === 'writing') { | ||
this.writeStatus = 'closing'; | ||
this.log.trace('send close write to remote'); | ||
await this.sendCloseWrite({ | ||
signal: AbortSignal.timeout(this.sendCloseWriteTimeout) | ||
}); | ||
this.writeStatus = 'closed'; | ||
} | ||
this.onSinkEnd(); | ||
} | ||
catch (err) { | ||
this.log.trace('sink ended with error, calling abort with error', err); | ||
this.abort(err); | ||
throw err; | ||
} | ||
finally { | ||
this.log.trace('resolve sink end'); | ||
this.sinkEnd.resolve(); | ||
} | ||
} | ||
onSourceEnd(err) { | ||
if (this.timeline.closeRead != null) { | ||
return; | ||
} | ||
this.timeline.closeRead = Date.now(); | ||
this.readStatus = 'closed'; | ||
if (err != null && this.endErr == null) { | ||
this.endErr = err; | ||
} | ||
this.onCloseRead?.(); | ||
if (this.timeline.closeWrite != null) { | ||
this.log.trace('source and sink ended'); | ||
this.timeline.close = Date.now(); | ||
if (this.status !== 'aborted' && this.status !== 'reset') { | ||
this.status = 'closed'; | ||
} | ||
if (this.onEnd != null) { | ||
this.onEnd(this.endErr); | ||
} | ||
} | ||
else { | ||
this.log.trace('source ended, waiting for sink to end'); | ||
} | ||
} | ||
onSinkEnd(err) { | ||
if (this.timeline.closeWrite != null) { | ||
return; | ||
} | ||
this.timeline.closeWrite = Date.now(); | ||
this.writeStatus = 'closed'; | ||
if (err != null && this.endErr == null) { | ||
this.endErr = err; | ||
} | ||
this.onCloseWrite?.(); | ||
if (this.timeline.closeRead != null) { | ||
this.log.trace('sink and source ended'); | ||
this.timeline.close = Date.now(); | ||
if (this.status !== 'aborted' && this.status !== 'reset') { | ||
this.status = 'closed'; | ||
} | ||
if (this.onEnd != null) { | ||
this.onEnd(this.endErr); | ||
} | ||
} | ||
else { | ||
this.log.trace('sink ended, waiting for source to end'); | ||
} | ||
} | ||
// Close for both Reading and Writing | ||
async close(options) { | ||
this.log.trace('closing gracefully'); | ||
this.status = 'closing'; | ||
await Promise.all([ | ||
this.closeRead(options), | ||
this.closeWrite(options) | ||
]); | ||
this.status = 'closed'; | ||
this.log.trace('closed gracefully'); | ||
} | ||
async closeRead(options = {}) { | ||
if (this.readStatus === 'closing' || this.readStatus === 'closed') { | ||
return; | ||
} | ||
this.log.trace('closing readable end of stream with starting read status "%s"', this.readStatus); | ||
const readStatus = this.readStatus; | ||
this.readStatus = 'closing'; | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) { | ||
this.log.trace('send close read to remote'); | ||
await this.sendCloseRead(options); | ||
} | ||
if (readStatus === 'ready') { | ||
this.log.trace('ending internal source queue'); | ||
this.streamSource.end(); | ||
} | ||
this.log.trace('closed readable end of stream'); | ||
} | ||
async closeWrite(options = {}) { | ||
if (this.writeStatus === 'closing' || this.writeStatus === 'closed') { | ||
return; | ||
} | ||
this.log.trace('closing writable end of stream with starting write status "%s"', this.writeStatus); | ||
if (this.writeStatus === 'ready') { | ||
this.log.trace('sink was never sunk, sink an empty array'); | ||
await raceSignal(this.sink([]), options.signal); | ||
} | ||
if (this.writeStatus === 'writing') { | ||
// stop reading from the source passed to `.sink` in the microtask queue | ||
// - this lets any data queued by the user in the current tick get read | ||
// before we exit | ||
await new Promise((resolve, reject) => { | ||
queueMicrotask(() => { | ||
this.log.trace('aborting source passed to .sink'); | ||
this.sinkController.abort(); | ||
raceSignal(this.sinkEnd.promise, options.signal) | ||
.then(resolve, reject); | ||
}); | ||
}); | ||
} | ||
this.writeStatus = 'closed'; | ||
this.log.trace('closed writable end of stream'); | ||
} | ||
/** | ||
* Close immediately for reading and writing and send a reset message (local | ||
* error) | ||
*/ | ||
abort(err) { | ||
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') { | ||
return; | ||
} | ||
this.log('abort with error', err); | ||
// try to send a reset message | ||
this.log('try to send reset to remote'); | ||
const res = this.sendReset(); | ||
if (isPromise(res)) { | ||
res.catch((err) => { | ||
this.log.error('error sending reset message', err); | ||
}); | ||
} | ||
this.status = 'aborted'; | ||
this.timeline.abort = Date.now(); | ||
this._closeSinkAndSource(err); | ||
this.onAbort?.(err); | ||
} | ||
/** | ||
* Receive a reset message - close immediately for reading and writing (remote | ||
* error) | ||
*/ | ||
reset() { | ||
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') { | ||
return; | ||
} | ||
const err = new CodeError('stream reset', ERR_STREAM_RESET); | ||
this.status = 'reset'; | ||
this.timeline.reset = Date.now(); | ||
this._closeSinkAndSource(err); | ||
this.onReset?.(); | ||
} | ||
_closeSinkAndSource(err) { | ||
this._closeSink(err); | ||
this._closeSource(err); | ||
} | ||
_closeSink(err) { | ||
// if the sink function is running, cause it to end | ||
if (this.writeStatus === 'writing') { | ||
this.log.trace('end sink source'); | ||
this.sinkController.abort(); | ||
} | ||
this.onSinkEnd(err); | ||
} | ||
_closeSource(err) { | ||
// if the source is not ending, end it | ||
if (this.readStatus !== 'closing' && this.readStatus !== 'closed') { | ||
this.log.trace('ending source with %d bytes to be read by consumer', this.streamSource.readableLength); | ||
this.readStatus = 'closing'; | ||
this.streamSource.end(err); | ||
} | ||
} | ||
/** | ||
* The remote closed for writing so we should expect to receive no more | ||
* messages | ||
*/ | ||
remoteCloseWrite() { | ||
if (this.readStatus === 'closing' || this.readStatus === 'closed') { | ||
this.log('received remote close write but local source is already closed'); | ||
return; | ||
} | ||
this.log.trace('remote close write'); | ||
this._closeSource(); | ||
} | ||
/** | ||
* The remote closed for reading so we should not send any more | ||
* messages | ||
*/ | ||
remoteCloseRead() { | ||
if (this.writeStatus === 'closing' || this.writeStatus === 'closed') { | ||
this.log('received remote close read but local sink is already closed'); | ||
return; | ||
} | ||
this.log.trace('remote close read'); | ||
this._closeSink(); | ||
} | ||
/** | ||
* The underlying muxer has closed, no more messages can be sent or will | ||
* be received, close immediately to free up resources | ||
*/ | ||
destroy() { | ||
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') { | ||
this.log('received destroy but we are already closed'); | ||
return; | ||
} | ||
this.log.trace('stream destroyed'); | ||
this._closeSinkAndSource(); | ||
} | ||
/** | ||
* When an extending class reads data from it's implementation-specific source, | ||
* call this method to allow the stream consumer to read the data. | ||
*/ | ||
sourcePush(data) { | ||
this.streamSource.push(data); | ||
} | ||
/** | ||
* Returns the amount of unread data - can be used to prevent large amounts of | ||
* data building up when the stream consumer is too slow. | ||
*/ | ||
sourceReadableLength() { | ||
return this.streamSource.readableLength; | ||
} | ||
} | ||
// TODO: remove this file and the dep-check override in package.json for | ||
// @libp2p/utils when yamux is updated. | ||
// This is a hack to defeat TypeScript trying to inspect the types for | ||
// @libp2p/utils/abstract-stream which depends on this module - making the | ||
// import path only resolvable at runtime breaks the transpile time circular | ||
// dependency | ||
const s = await import('@libp2p/utils' + '/abstract-stream'); | ||
export const AbstractStream = s.AbstractStream; | ||
//# sourceMappingURL=stream.js.map |
{ | ||
"name": "@libp2p/interface", | ||
"version": "0.1.6-9c67c5b3d", | ||
"version": "0.1.6-adea7bbbf", | ||
"description": "The interface implemented by a libp2p node", | ||
@@ -150,3 +150,3 @@ "license": "Apache-2.0 OR MIT", | ||
"lint": "aegir lint", | ||
"dep-check": "aegir dep-check", | ||
"dep-check": "aegir dep-check -i @libp2p/utils", | ||
"build": "aegir build", | ||
@@ -163,8 +163,5 @@ "test": "aegir test", | ||
"@multiformats/multiaddr": "^12.1.10", | ||
"abortable-iterator": "^5.0.1", | ||
"it-pushable": "^3.2.1", | ||
"it-stream-types": "^2.0.1", | ||
"multiformats": "^12.1.3", | ||
"p-defer": "^4.0.0", | ||
"race-signal": "^1.0.0", | ||
"uint8arraylist": "^2.4.3" | ||
@@ -175,5 +172,2 @@ }, | ||
"aegir": "^41.0.2", | ||
"delay": "^6.0.0", | ||
"it-all": "^3.0.3", | ||
"it-drain": "^3.0.3", | ||
"sinon": "^17.0.0", | ||
@@ -180,0 +174,0 @@ "sinon-ts": "^2.0.0" |
@@ -1,2 +0,2 @@ | ||
import type { AbortOptions } from '../index.js' | ||
import type { AbortOptions, Logger } from '../index.js' | ||
import type { PeerId } from '../peer-id/index.js' | ||
@@ -172,2 +172,7 @@ import type { Multiaddr } from '@multiformats/multiaddr' | ||
writeStatus: WriteStatus | ||
/** | ||
* The stream logger | ||
*/ | ||
log: Logger | ||
} | ||
@@ -272,2 +277,7 @@ | ||
abort(err: Error): void | ||
/** | ||
* The connection logger | ||
*/ | ||
log: Logger | ||
} | ||
@@ -334,2 +344,7 @@ | ||
timeline: MultiaddrConnectionTimeline | ||
/** | ||
* The multiaddr connection logger | ||
*/ | ||
log: Logger | ||
} |
@@ -1,496 +0,11 @@ | ||
import { abortableSource } from 'abortable-iterator' | ||
import { type Pushable, pushable } from 'it-pushable' | ||
import defer, { type DeferredPromise } from 'p-defer' | ||
import { raceSignal } from 'race-signal' | ||
import { Uint8ArrayList } from 'uint8arraylist' | ||
import { CodeError } from '../errors.js' | ||
import type { Direction, ReadStatus, Stream, StreamStatus, StreamTimeline, WriteStatus } from '../connection/index.js' | ||
import type { AbortOptions } from '../index.js' | ||
import type { Source } from 'it-stream-types' | ||
// TODO: remove this file and the dep-check override in package.json for | ||
// @libp2p/utils when yamux is updated. | ||
// This is a hack to defeat TypeScript trying to inspect the types for | ||
// @libp2p/utils/abstract-stream which depends on this module - making the | ||
// import path only resolvable at runtime breaks the transpile time circular | ||
// dependency | ||
const s = await import('@libp2p/utils' + '/abstract-stream') | ||
// copied from @libp2p/logger to break a circular dependency | ||
interface Logger { | ||
(formatter: any, ...args: any[]): void | ||
error(formatter: any, ...args: any[]): void | ||
trace(formatter: any, ...args: any[]): void | ||
enabled: boolean | ||
} | ||
export type AbstractStreamInit = any | ||
const ERR_STREAM_RESET = 'ERR_STREAM_RESET' | ||
const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE' | ||
const DEFAULT_SEND_CLOSE_WRITE_TIMEOUT = 5000 | ||
export interface AbstractStreamInit { | ||
/** | ||
* A unique identifier for this stream | ||
*/ | ||
id: string | ||
/** | ||
* The stream direction | ||
*/ | ||
direction: Direction | ||
/** | ||
* A Logger implementation used to log stream-specific information | ||
*/ | ||
log: Logger | ||
/** | ||
* User specific stream metadata | ||
*/ | ||
metadata?: Record<string, unknown> | ||
/** | ||
* Invoked when the stream ends | ||
*/ | ||
onEnd?(err?: Error | undefined): void | ||
/** | ||
* Invoked when the readable end of the stream is closed | ||
*/ | ||
onCloseRead?(): void | ||
/** | ||
* Invoked when the writable end of the stream is closed | ||
*/ | ||
onCloseWrite?(): void | ||
/** | ||
* Invoked when the the stream has been reset by the remote | ||
*/ | ||
onReset?(): void | ||
/** | ||
* Invoked when the the stream has errored | ||
*/ | ||
onAbort?(err: Error): void | ||
/** | ||
* How long to wait in ms for stream data to be written to the underlying | ||
* connection when closing the writable end of the stream. (default: 500) | ||
*/ | ||
closeTimeout?: number | ||
/** | ||
* After the stream sink has closed, a limit on how long it takes to send | ||
* a close-write message to the remote peer. | ||
*/ | ||
sendCloseWriteTimeout?: number | ||
} | ||
function isPromise (res?: any): res is Promise<void> { | ||
return res != null && typeof res.then === 'function' | ||
} | ||
export abstract class AbstractStream implements Stream { | ||
public id: string | ||
public direction: Direction | ||
public timeline: StreamTimeline | ||
public protocol?: string | ||
public metadata: Record<string, unknown> | ||
public source: AsyncGenerator<Uint8ArrayList, void, unknown> | ||
public status: StreamStatus | ||
public readStatus: ReadStatus | ||
public writeStatus: WriteStatus | ||
private readonly sinkController: AbortController | ||
private readonly sinkEnd: DeferredPromise<void> | ||
private endErr: Error | undefined | ||
private readonly streamSource: Pushable<Uint8ArrayList> | ||
private readonly onEnd?: (err?: Error | undefined) => void | ||
private readonly onCloseRead?: () => void | ||
private readonly onCloseWrite?: () => void | ||
private readonly onReset?: () => void | ||
private readonly onAbort?: (err: Error) => void | ||
private readonly sendCloseWriteTimeout: number | ||
protected readonly log: Logger | ||
constructor (init: AbstractStreamInit) { | ||
this.sinkController = new AbortController() | ||
this.sinkEnd = defer() | ||
this.log = init.log | ||
// stream status | ||
this.status = 'open' | ||
this.readStatus = 'ready' | ||
this.writeStatus = 'ready' | ||
this.id = init.id | ||
this.metadata = init.metadata ?? {} | ||
this.direction = init.direction | ||
this.timeline = { | ||
open: Date.now() | ||
} | ||
this.sendCloseWriteTimeout = init.sendCloseWriteTimeout ?? DEFAULT_SEND_CLOSE_WRITE_TIMEOUT | ||
this.onEnd = init.onEnd | ||
this.onCloseRead = init?.onCloseRead | ||
this.onCloseWrite = init?.onCloseWrite | ||
this.onReset = init?.onReset | ||
this.onAbort = init?.onAbort | ||
this.source = this.streamSource = pushable<Uint8ArrayList>({ | ||
onEnd: (err) => { | ||
if (err != null) { | ||
this.log.trace('source ended with error', err) | ||
} else { | ||
this.log.trace('source ended') | ||
} | ||
this.onSourceEnd(err) | ||
} | ||
}) | ||
// necessary because the libp2p upgrader wraps the sink function | ||
this.sink = this.sink.bind(this) | ||
} | ||
async sink (source: Source<Uint8ArrayList | Uint8Array>): Promise<void> { | ||
if (this.writeStatus !== 'ready') { | ||
throw new CodeError(`writable end state is "${this.writeStatus}" not "ready"`, ERR_SINK_INVALID_STATE) | ||
} | ||
try { | ||
this.writeStatus = 'writing' | ||
const options: AbortOptions = { | ||
signal: this.sinkController.signal | ||
} | ||
if (this.direction === 'outbound') { // If initiator, open a new stream | ||
const res = this.sendNewStream(options) | ||
if (isPromise(res)) { | ||
await res | ||
} | ||
} | ||
source = abortableSource(source, this.sinkController.signal, { | ||
returnOnAbort: true | ||
}) | ||
this.log.trace('sink reading from source') | ||
for await (let data of source) { | ||
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data | ||
const res = this.sendData(data, options) | ||
if (isPromise(res)) { // eslint-disable-line max-depth | ||
await res | ||
} | ||
} | ||
this.log.trace('sink finished reading from source, write status is "%s"', this.writeStatus) | ||
if (this.writeStatus === 'writing') { | ||
this.writeStatus = 'closing' | ||
this.log.trace('send close write to remote') | ||
await this.sendCloseWrite({ | ||
signal: AbortSignal.timeout(this.sendCloseWriteTimeout) | ||
}) | ||
this.writeStatus = 'closed' | ||
} | ||
this.onSinkEnd() | ||
} catch (err: any) { | ||
this.log.trace('sink ended with error, calling abort with error', err) | ||
this.abort(err) | ||
throw err | ||
} finally { | ||
this.log.trace('resolve sink end') | ||
this.sinkEnd.resolve() | ||
} | ||
} | ||
protected onSourceEnd (err?: Error): void { | ||
if (this.timeline.closeRead != null) { | ||
return | ||
} | ||
this.timeline.closeRead = Date.now() | ||
this.readStatus = 'closed' | ||
if (err != null && this.endErr == null) { | ||
this.endErr = err | ||
} | ||
this.onCloseRead?.() | ||
if (this.timeline.closeWrite != null) { | ||
this.log.trace('source and sink ended') | ||
this.timeline.close = Date.now() | ||
if (this.status !== 'aborted' && this.status !== 'reset') { | ||
this.status = 'closed' | ||
} | ||
if (this.onEnd != null) { | ||
this.onEnd(this.endErr) | ||
} | ||
} else { | ||
this.log.trace('source ended, waiting for sink to end') | ||
} | ||
} | ||
protected onSinkEnd (err?: Error): void { | ||
if (this.timeline.closeWrite != null) { | ||
return | ||
} | ||
this.timeline.closeWrite = Date.now() | ||
this.writeStatus = 'closed' | ||
if (err != null && this.endErr == null) { | ||
this.endErr = err | ||
} | ||
this.onCloseWrite?.() | ||
if (this.timeline.closeRead != null) { | ||
this.log.trace('sink and source ended') | ||
this.timeline.close = Date.now() | ||
if (this.status !== 'aborted' && this.status !== 'reset') { | ||
this.status = 'closed' | ||
} | ||
if (this.onEnd != null) { | ||
this.onEnd(this.endErr) | ||
} | ||
} else { | ||
this.log.trace('sink ended, waiting for source to end') | ||
} | ||
} | ||
// Close for both Reading and Writing | ||
async close (options?: AbortOptions): Promise<void> { | ||
this.log.trace('closing gracefully') | ||
this.status = 'closing' | ||
await Promise.all([ | ||
this.closeRead(options), | ||
this.closeWrite(options) | ||
]) | ||
this.status = 'closed' | ||
this.log.trace('closed gracefully') | ||
} | ||
async closeRead (options: AbortOptions = {}): Promise<void> { | ||
if (this.readStatus === 'closing' || this.readStatus === 'closed') { | ||
return | ||
} | ||
this.log.trace('closing readable end of stream with starting read status "%s"', this.readStatus) | ||
const readStatus = this.readStatus | ||
this.readStatus = 'closing' | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) { | ||
this.log.trace('send close read to remote') | ||
await this.sendCloseRead(options) | ||
} | ||
if (readStatus === 'ready') { | ||
this.log.trace('ending internal source queue') | ||
this.streamSource.end() | ||
} | ||
this.log.trace('closed readable end of stream') | ||
} | ||
async closeWrite (options: AbortOptions = {}): Promise<void> { | ||
if (this.writeStatus === 'closing' || this.writeStatus === 'closed') { | ||
return | ||
} | ||
this.log.trace('closing writable end of stream with starting write status "%s"', this.writeStatus) | ||
if (this.writeStatus === 'ready') { | ||
this.log.trace('sink was never sunk, sink an empty array') | ||
await raceSignal(this.sink([]), options.signal) | ||
} | ||
if (this.writeStatus === 'writing') { | ||
// stop reading from the source passed to `.sink` in the microtask queue | ||
// - this lets any data queued by the user in the current tick get read | ||
// before we exit | ||
await new Promise((resolve, reject) => { | ||
queueMicrotask(() => { | ||
this.log.trace('aborting source passed to .sink') | ||
this.sinkController.abort() | ||
raceSignal(this.sinkEnd.promise, options.signal) | ||
.then(resolve, reject) | ||
}) | ||
}) | ||
} | ||
this.writeStatus = 'closed' | ||
this.log.trace('closed writable end of stream') | ||
} | ||
/** | ||
* Close immediately for reading and writing and send a reset message (local | ||
* error) | ||
*/ | ||
abort (err: Error): void { | ||
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') { | ||
return | ||
} | ||
this.log('abort with error', err) | ||
// try to send a reset message | ||
this.log('try to send reset to remote') | ||
const res = this.sendReset() | ||
if (isPromise(res)) { | ||
res.catch((err) => { | ||
this.log.error('error sending reset message', err) | ||
}) | ||
} | ||
this.status = 'aborted' | ||
this.timeline.abort = Date.now() | ||
this._closeSinkAndSource(err) | ||
this.onAbort?.(err) | ||
} | ||
/** | ||
* Receive a reset message - close immediately for reading and writing (remote | ||
* error) | ||
*/ | ||
reset (): void { | ||
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') { | ||
return | ||
} | ||
const err = new CodeError('stream reset', ERR_STREAM_RESET) | ||
this.status = 'reset' | ||
this.timeline.reset = Date.now() | ||
this._closeSinkAndSource(err) | ||
this.onReset?.() | ||
} | ||
_closeSinkAndSource (err?: Error): void { | ||
this._closeSink(err) | ||
this._closeSource(err) | ||
} | ||
_closeSink (err?: Error): void { | ||
// if the sink function is running, cause it to end | ||
if (this.writeStatus === 'writing') { | ||
this.log.trace('end sink source') | ||
this.sinkController.abort() | ||
} | ||
this.onSinkEnd(err) | ||
} | ||
_closeSource (err?: Error): void { | ||
// if the source is not ending, end it | ||
if (this.readStatus !== 'closing' && this.readStatus !== 'closed') { | ||
this.log.trace('ending source with %d bytes to be read by consumer', this.streamSource.readableLength) | ||
this.readStatus = 'closing' | ||
this.streamSource.end(err) | ||
} | ||
} | ||
/** | ||
* The remote closed for writing so we should expect to receive no more | ||
* messages | ||
*/ | ||
remoteCloseWrite (): void { | ||
if (this.readStatus === 'closing' || this.readStatus === 'closed') { | ||
this.log('received remote close write but local source is already closed') | ||
return | ||
} | ||
this.log.trace('remote close write') | ||
this._closeSource() | ||
} | ||
/** | ||
* The remote closed for reading so we should not send any more | ||
* messages | ||
*/ | ||
remoteCloseRead (): void { | ||
if (this.writeStatus === 'closing' || this.writeStatus === 'closed') { | ||
this.log('received remote close read but local sink is already closed') | ||
return | ||
} | ||
this.log.trace('remote close read') | ||
this._closeSink() | ||
} | ||
/** | ||
* The underlying muxer has closed, no more messages can be sent or will | ||
* be received, close immediately to free up resources | ||
*/ | ||
destroy (): void { | ||
if (this.status === 'closed' || this.status === 'aborted' || this.status === 'reset') { | ||
this.log('received destroy but we are already closed') | ||
return | ||
} | ||
this.log.trace('stream destroyed') | ||
this._closeSinkAndSource() | ||
} | ||
/** | ||
* When an extending class reads data from it's implementation-specific source, | ||
* call this method to allow the stream consumer to read the data. | ||
*/ | ||
sourcePush (data: Uint8ArrayList): void { | ||
this.streamSource.push(data) | ||
} | ||
/** | ||
* Returns the amount of unread data - can be used to prevent large amounts of | ||
* data building up when the stream consumer is too slow. | ||
*/ | ||
sourceReadableLength (): number { | ||
return this.streamSource.readableLength | ||
} | ||
/** | ||
* Send a message to the remote muxer informing them a new stream is being | ||
* opened | ||
*/ | ||
abstract sendNewStream (options?: AbortOptions): void | Promise<void> | ||
/** | ||
* Send a data message to the remote muxer | ||
*/ | ||
abstract sendData (buf: Uint8ArrayList, options?: AbortOptions): void | Promise<void> | ||
/** | ||
* Send a reset message to the remote muxer | ||
*/ | ||
abstract sendReset (options?: AbortOptions): void | Promise<void> | ||
/** | ||
* Send a message to the remote muxer, informing them no more data messages | ||
* will be sent by this end of the stream | ||
*/ | ||
abstract sendCloseWrite (options?: AbortOptions): void | Promise<void> | ||
/** | ||
* Send a message to the remote muxer, informing them no more data messages | ||
* will be read by this end of the stream | ||
*/ | ||
abstract sendCloseRead (options?: AbortOptions): void | Promise<void> | ||
} | ||
export const AbstractStream = s.AbstractStream |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
5
4
210432
5130
- Removedabortable-iterator@^5.0.1
- Removedp-defer@^4.0.0
- Removedrace-signal@^1.0.0
- Removedabortable-iterator@5.1.0(transitive)
- Removedget-iterator@2.0.1(transitive)
- Removedrace-signal@1.1.0(transitive)