@libp2p/interface
Advanced tools
Comparing version 0.1.2-7b2ddc17 to 0.1.2-7d8b1551
@@ -128,3 +128,3 @@ import type { AbortOptions } from '../index.js'; | ||
/** | ||
* Once a protocol has been negotiated for this stream, it will be set on the stat object | ||
* The protocol negotiated for this stream | ||
*/ | ||
@@ -199,7 +199,7 @@ protocol?: string; | ||
/** | ||
* Once a multiplexer has been negotiated for this stream, it will be set on the stat object | ||
* The multiplexer negotiated for this connection | ||
*/ | ||
multiplexer?: string; | ||
/** | ||
* Once a connection encrypter has been negotiated for this stream, it will be set on the stat object | ||
* The encryption protocol negotiated for this connection | ||
*/ | ||
@@ -206,0 +206,0 @@ encryption?: string; |
@@ -53,2 +53,7 @@ import { Uint8ArrayList } from 'uint8arraylist'; | ||
closeTimeout?: number; | ||
/** | ||
* After the stream sink has closed, a limit on how long it takes to send | ||
* a close-write message to the remote peer. | ||
*/ | ||
sendCloseWriteTimeout?: number; | ||
} | ||
@@ -74,2 +79,3 @@ export declare abstract class AbstractStream implements Stream { | ||
private readonly onAbort?; | ||
private readonly sendCloseWriteTimeout; | ||
protected readonly log: Logger; | ||
@@ -76,0 +82,0 @@ constructor(init: AbstractStreamInit); |
import { abortableSource } from 'abortable-iterator'; | ||
import { pushable } from 'it-pushable'; | ||
import defer, {} from 'p-defer'; | ||
import { raceSignal } from 'race-signal'; | ||
import { Uint8ArrayList } from 'uint8arraylist'; | ||
@@ -8,2 +9,3 @@ import { CodeError } from '../errors.js'; | ||
const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE'; | ||
const DEFAULT_SEND_CLOSE_WRITE_TIMEOUT = 5000; | ||
function isPromise(res) { | ||
@@ -31,2 +33,3 @@ return res != null && typeof res.then === 'function'; | ||
onAbort; | ||
sendCloseWriteTimeout; | ||
log; | ||
@@ -47,2 +50,3 @@ constructor(init) { | ||
}; | ||
this.sendCloseWriteTimeout = init.sendCloseWriteTimeout ?? DEFAULT_SEND_CLOSE_WRITE_TIMEOUT; | ||
this.onEnd = init.onEnd; | ||
@@ -61,3 +65,2 @@ this.onCloseRead = init?.onCloseRead; | ||
} | ||
this.readStatus = 'closed'; | ||
this.onSourceEnd(err); | ||
@@ -95,6 +98,11 @@ } | ||
} | ||
this.log.trace('sink finished reading from source'); | ||
this.writeStatus = 'done'; | ||
this.log.trace('sink calling closeWrite'); | ||
await this.closeWrite(options); | ||
this.log.trace('sink finished reading from source, write status is "%s"', this.writeStatus); | ||
if (this.writeStatus === 'writing') { | ||
this.writeStatus = 'closing'; | ||
this.log.trace('send close write to remote'); | ||
await this.sendCloseWrite({ | ||
signal: AbortSignal.timeout(this.sendCloseWriteTimeout) | ||
}); | ||
this.writeStatus = 'closed'; | ||
} | ||
this.onSinkEnd(); | ||
@@ -117,2 +125,3 @@ } | ||
this.timeline.closeRead = Date.now(); | ||
this.readStatus = 'closed'; | ||
if (err != null && this.endErr == null) { | ||
@@ -125,2 +134,5 @@ this.endErr = err; | ||
this.timeline.close = Date.now(); | ||
if (this.status !== 'aborted' && this.status !== 'reset') { | ||
this.status = 'closed'; | ||
} | ||
if (this.onEnd != null) { | ||
@@ -139,2 +151,3 @@ this.onEnd(this.endErr); | ||
this.timeline.closeWrite = Date.now(); | ||
this.writeStatus = 'closed'; | ||
if (err != null && this.endErr == null) { | ||
@@ -147,2 +160,5 @@ this.endErr = err; | ||
this.timeline.close = Date.now(); | ||
if (this.status !== 'aborted' && this.status !== 'reset') { | ||
this.status = 'closed'; | ||
} | ||
if (this.onEnd != null) { | ||
@@ -174,2 +190,6 @@ this.onEnd(this.endErr); | ||
this.readStatus = 'closing'; | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) { | ||
this.log.trace('send close read to remote'); | ||
await this.sendCloseRead(options); | ||
} | ||
if (readStatus === 'ready') { | ||
@@ -179,6 +199,2 @@ this.log.trace('ending internal source queue'); | ||
} | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) { | ||
this.log.trace('send close read to remote'); | ||
await this.sendCloseRead(options); | ||
} | ||
this.log.trace('closed readable end of stream'); | ||
@@ -191,9 +207,7 @@ } | ||
this.log.trace('closing writable end of stream with starting write status "%s"', this.writeStatus); | ||
const writeStatus = this.writeStatus; | ||
if (this.writeStatus === 'ready') { | ||
this.log.trace('sink was never sunk, sink an empty array'); | ||
await this.sink([]); | ||
await raceSignal(this.sink([]), options.signal); | ||
} | ||
this.writeStatus = 'closing'; | ||
if (writeStatus === 'writing') { | ||
if (this.writeStatus === 'writing') { | ||
// stop reading from the source passed to `.sink` in the microtask queue | ||
@@ -206,10 +220,7 @@ // - this lets any data queued by the user in the current tick get read | ||
this.sinkController.abort(); | ||
this.sinkEnd.promise.then(resolve, reject); | ||
raceSignal(this.sinkEnd.promise, options.signal) | ||
.then(resolve, reject); | ||
}); | ||
}); | ||
} | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeWrite == null) { | ||
this.log.trace('send close write to remote'); | ||
await this.sendCloseWrite(options); | ||
} | ||
this.writeStatus = 'closed'; | ||
@@ -250,2 +261,3 @@ this.log.trace('closed writable end of stream'); | ||
this.status = 'reset'; | ||
this.timeline.reset = Date.now(); | ||
this._closeSinkAndSource(err); | ||
@@ -307,3 +319,3 @@ this.onReset?.(); | ||
} | ||
this.log.trace('muxer destroyed'); | ||
this.log.trace('stream destroyed'); | ||
this._closeSinkAndSource(); | ||
@@ -310,0 +322,0 @@ } |
{ | ||
"name": "@libp2p/interface", | ||
"version": "0.1.2-7b2ddc17", | ||
"version": "0.1.2-7d8b1551", | ||
"description": "The interface implemented by a libp2p node", | ||
@@ -143,2 +143,3 @@ "license": "Apache-2.0 OR MIT", | ||
"parserOptions": { | ||
"project": true, | ||
"sourceType": "module" | ||
@@ -167,2 +168,3 @@ } | ||
"p-defer": "^4.0.0", | ||
"race-signal": "^1.0.0", | ||
"uint8arraylist": "^2.4.3" | ||
@@ -173,5 +175,8 @@ }, | ||
"aegir": "^40.0.8", | ||
"sinon": "^15.1.2", | ||
"delay": "^6.0.0", | ||
"it-all": "^3.0.3", | ||
"it-drain": "^3.0.3", | ||
"sinon": "^16.0.0", | ||
"sinon-ts": "^1.0.0" | ||
} | ||
} |
@@ -149,3 +149,3 @@ import type { AbortOptions } from '../index.js' | ||
/** | ||
* Once a protocol has been negotiated for this stream, it will be set on the stat object | ||
* The protocol negotiated for this stream | ||
*/ | ||
@@ -235,3 +235,3 @@ protocol?: string | ||
/** | ||
* Once a multiplexer has been negotiated for this stream, it will be set on the stat object | ||
* The multiplexer negotiated for this connection | ||
*/ | ||
@@ -241,3 +241,3 @@ multiplexer?: string | ||
/** | ||
* Once a connection encrypter has been negotiated for this stream, it will be set on the stat object | ||
* The encryption protocol negotiated for this connection | ||
*/ | ||
@@ -244,0 +244,0 @@ encryption?: string |
import { abortableSource } from 'abortable-iterator' | ||
import { type Pushable, pushable } from 'it-pushable' | ||
import defer, { type DeferredPromise } from 'p-defer' | ||
import { raceSignal } from 'race-signal' | ||
import { Uint8ArrayList } from 'uint8arraylist' | ||
@@ -10,2 +11,3 @@ import { CodeError } from '../errors.js' | ||
// copied from @libp2p/logger to break a circular dependency | ||
interface Logger { | ||
@@ -20,2 +22,3 @@ (formatter: any, ...args: any[]): void | ||
const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE' | ||
const DEFAULT_SEND_CLOSE_WRITE_TIMEOUT = 5000 | ||
@@ -73,2 +76,8 @@ export interface AbstractStreamInit { | ||
closeTimeout?: number | ||
/** | ||
* After the stream sink has closed, a limit on how long it takes to send | ||
* a close-write message to the remote peer. | ||
*/ | ||
sendCloseWriteTimeout?: number | ||
} | ||
@@ -100,2 +109,3 @@ | ||
private readonly onAbort?: (err: Error) => void | ||
private readonly sendCloseWriteTimeout: number | ||
@@ -120,2 +130,3 @@ protected readonly log: Logger | ||
} | ||
this.sendCloseWriteTimeout = init.sendCloseWriteTimeout ?? DEFAULT_SEND_CLOSE_WRITE_TIMEOUT | ||
@@ -136,3 +147,2 @@ this.onEnd = init.onEnd | ||
this.readStatus = 'closed' | ||
this.onSourceEnd(err) | ||
@@ -182,7 +192,15 @@ } | ||
this.log.trace('sink finished reading from source') | ||
this.writeStatus = 'done' | ||
this.log.trace('sink finished reading from source, write status is "%s"', this.writeStatus) | ||
this.log.trace('sink calling closeWrite') | ||
await this.closeWrite(options) | ||
if (this.writeStatus === 'writing') { | ||
this.writeStatus = 'closing' | ||
this.log.trace('send close write to remote') | ||
await this.sendCloseWrite({ | ||
signal: AbortSignal.timeout(this.sendCloseWriteTimeout) | ||
}) | ||
this.writeStatus = 'closed' | ||
} | ||
this.onSinkEnd() | ||
@@ -206,2 +224,3 @@ } catch (err: any) { | ||
this.timeline.closeRead = Date.now() | ||
this.readStatus = 'closed' | ||
@@ -218,2 +237,6 @@ if (err != null && this.endErr == null) { | ||
if (this.status !== 'aborted' && this.status !== 'reset') { | ||
this.status = 'closed' | ||
} | ||
if (this.onEnd != null) { | ||
@@ -233,2 +256,3 @@ this.onEnd(this.endErr) | ||
this.timeline.closeWrite = Date.now() | ||
this.writeStatus = 'closed' | ||
@@ -245,2 +269,6 @@ if (err != null && this.endErr == null) { | ||
if (this.status !== 'aborted' && this.status !== 'reset') { | ||
this.status = 'closed' | ||
} | ||
if (this.onEnd != null) { | ||
@@ -280,2 +308,7 @@ this.onEnd(this.endErr) | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) { | ||
this.log.trace('send close read to remote') | ||
await this.sendCloseRead(options) | ||
} | ||
if (readStatus === 'ready') { | ||
@@ -286,7 +319,2 @@ this.log.trace('ending internal source queue') | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) { | ||
this.log.trace('send close read to remote') | ||
await this.sendCloseRead(options) | ||
} | ||
this.log.trace('closed readable end of stream') | ||
@@ -302,12 +330,9 @@ } | ||
const writeStatus = this.writeStatus | ||
if (this.writeStatus === 'ready') { | ||
this.log.trace('sink was never sunk, sink an empty array') | ||
await this.sink([]) | ||
await raceSignal(this.sink([]), options.signal) | ||
} | ||
this.writeStatus = 'closing' | ||
if (writeStatus === 'writing') { | ||
if (this.writeStatus === 'writing') { | ||
// stop reading from the source passed to `.sink` in the microtask queue | ||
@@ -320,3 +345,4 @@ // - this lets any data queued by the user in the current tick get read | ||
this.sinkController.abort() | ||
this.sinkEnd.promise.then(resolve, reject) | ||
raceSignal(this.sinkEnd.promise, options.signal) | ||
.then(resolve, reject) | ||
}) | ||
@@ -326,7 +352,2 @@ }) | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeWrite == null) { | ||
this.log.trace('send close write to remote') | ||
await this.sendCloseWrite(options) | ||
} | ||
this.writeStatus = 'closed' | ||
@@ -376,2 +397,3 @@ | ||
this.status = 'reset' | ||
this.timeline.reset = Date.now() | ||
this._closeSinkAndSource(err) | ||
@@ -443,3 +465,3 @@ this.onReset?.() | ||
this.log.trace('muxer destroyed') | ||
this.log.trace('stream destroyed') | ||
@@ -446,0 +468,0 @@ this._closeSinkAndSource() |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
255637
6127
8
7
+ Addedrace-signal@^1.0.0
+ Addedrace-signal@1.1.0(transitive)