@libp2p/interface
Advanced tools
Comparing version 0.1.2-c88de8e1 to 0.1.2-c97dea04
@@ -53,2 +53,7 @@ import { Uint8ArrayList } from 'uint8arraylist'; | ||
closeTimeout?: number; | ||
/** | ||
* After the stream sink has closed, a limit on how long it takes to send | ||
* a close-write message to the remote peer. | ||
*/ | ||
sendCloseWriteTimeout?: number; | ||
} | ||
@@ -74,2 +79,3 @@ export declare abstract class AbstractStream implements Stream { | ||
private readonly onAbort?; | ||
private readonly sendCloseWriteTimeout; | ||
protected readonly log: Logger; | ||
@@ -76,0 +82,0 @@ constructor(init: AbstractStreamInit); |
import { abortableSource } from 'abortable-iterator'; | ||
import { pushable } from 'it-pushable'; | ||
import defer, {} from 'p-defer'; | ||
import { raceSignal } from 'race-signal'; | ||
import { Uint8ArrayList } from 'uint8arraylist'; | ||
@@ -8,2 +9,3 @@ import { CodeError } from '../errors.js'; | ||
const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE'; | ||
const DEFAULT_SEND_CLOSE_WRITE_TIMEOUT = 5000; | ||
function isPromise(res) { | ||
@@ -31,2 +33,3 @@ return res != null && typeof res.then === 'function'; | ||
onAbort; | ||
sendCloseWriteTimeout; | ||
log; | ||
@@ -47,2 +50,3 @@ constructor(init) { | ||
}; | ||
this.sendCloseWriteTimeout = init.sendCloseWriteTimeout ?? DEFAULT_SEND_CLOSE_WRITE_TIMEOUT; | ||
this.onEnd = init.onEnd; | ||
@@ -61,3 +65,2 @@ this.onCloseRead = init?.onCloseRead; | ||
} | ||
this.readStatus = 'closed'; | ||
this.onSourceEnd(err); | ||
@@ -95,6 +98,11 @@ } | ||
} | ||
this.log.trace('sink finished reading from source'); | ||
this.writeStatus = 'done'; | ||
this.log.trace('sink calling closeWrite'); | ||
await this.closeWrite(options); | ||
this.log.trace('sink finished reading from source, write status is "%s"', this.writeStatus); | ||
if (this.writeStatus === 'writing') { | ||
this.writeStatus = 'closing'; | ||
this.log.trace('send close write to remote'); | ||
await this.sendCloseWrite({ | ||
signal: AbortSignal.timeout(this.sendCloseWriteTimeout) | ||
}); | ||
this.writeStatus = 'closed'; | ||
} | ||
this.onSinkEnd(); | ||
@@ -117,2 +125,3 @@ } | ||
this.timeline.closeRead = Date.now(); | ||
this.readStatus = 'closed'; | ||
if (err != null && this.endErr == null) { | ||
@@ -125,2 +134,5 @@ this.endErr = err; | ||
this.timeline.close = Date.now(); | ||
if (this.status !== 'aborted' && this.status !== 'reset') { | ||
this.status = 'closed'; | ||
} | ||
if (this.onEnd != null) { | ||
@@ -139,2 +151,3 @@ this.onEnd(this.endErr); | ||
this.timeline.closeWrite = Date.now(); | ||
this.writeStatus = 'closed'; | ||
if (err != null && this.endErr == null) { | ||
@@ -147,2 +160,5 @@ this.endErr = err; | ||
this.timeline.close = Date.now(); | ||
if (this.status !== 'aborted' && this.status !== 'reset') { | ||
this.status = 'closed'; | ||
} | ||
if (this.onEnd != null) { | ||
@@ -174,2 +190,6 @@ this.onEnd(this.endErr); | ||
this.readStatus = 'closing'; | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) { | ||
this.log.trace('send close read to remote'); | ||
await this.sendCloseRead(options); | ||
} | ||
if (readStatus === 'ready') { | ||
@@ -179,6 +199,2 @@ this.log.trace('ending internal source queue'); | ||
} | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) { | ||
this.log.trace('send close read to remote'); | ||
await this.sendCloseRead(options); | ||
} | ||
this.log.trace('closed readable end of stream'); | ||
@@ -191,9 +207,7 @@ } | ||
this.log.trace('closing writable end of stream with starting write status "%s"', this.writeStatus); | ||
const writeStatus = this.writeStatus; | ||
if (this.writeStatus === 'ready') { | ||
this.log.trace('sink was never sunk, sink an empty array'); | ||
await this.sink([]); | ||
await raceSignal(this.sink([]), options.signal); | ||
} | ||
this.writeStatus = 'closing'; | ||
if (writeStatus === 'writing') { | ||
if (this.writeStatus === 'writing') { | ||
// stop reading from the source passed to `.sink` in the microtask queue | ||
@@ -206,10 +220,7 @@ // - this lets any data queued by the user in the current tick get read | ||
this.sinkController.abort(); | ||
this.sinkEnd.promise.then(resolve, reject); | ||
raceSignal(this.sinkEnd.promise, options.signal) | ||
.then(resolve, reject); | ||
}); | ||
}); | ||
} | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeWrite == null) { | ||
this.log.trace('send close write to remote'); | ||
await this.sendCloseWrite(options); | ||
} | ||
this.writeStatus = 'closed'; | ||
@@ -250,2 +261,3 @@ this.log.trace('closed writable end of stream'); | ||
this.status = 'reset'; | ||
this.timeline.reset = Date.now(); | ||
this._closeSinkAndSource(err); | ||
@@ -307,3 +319,3 @@ this.onReset?.(); | ||
} | ||
this.log.trace('muxer destroyed'); | ||
this.log.trace('stream destroyed'); | ||
this._closeSinkAndSource(); | ||
@@ -310,0 +322,0 @@ } |
{ | ||
"name": "@libp2p/interface", | ||
"version": "0.1.2-c88de8e1", | ||
"version": "0.1.2-c97dea04", | ||
"description": "The interface implemented by a libp2p node", | ||
@@ -143,2 +143,3 @@ "license": "Apache-2.0 OR MIT", | ||
"parserOptions": { | ||
"project": true, | ||
"sourceType": "module" | ||
@@ -167,2 +168,3 @@ } | ||
"p-defer": "^4.0.0", | ||
"race-signal": "^1.0.0", | ||
"uint8arraylist": "^2.4.3" | ||
@@ -173,2 +175,5 @@ }, | ||
"aegir": "^40.0.8", | ||
"delay": "^6.0.0", | ||
"it-all": "^3.0.3", | ||
"it-drain": "^3.0.3", | ||
"sinon": "^16.0.0", | ||
@@ -175,0 +180,0 @@ "sinon-ts": "^1.0.0" |
import { abortableSource } from 'abortable-iterator' | ||
import { type Pushable, pushable } from 'it-pushable' | ||
import defer, { type DeferredPromise } from 'p-defer' | ||
import { raceSignal } from 'race-signal' | ||
import { Uint8ArrayList } from 'uint8arraylist' | ||
@@ -10,2 +11,3 @@ import { CodeError } from '../errors.js' | ||
// copied from @libp2p/logger to break a circular dependency | ||
interface Logger { | ||
@@ -20,2 +22,3 @@ (formatter: any, ...args: any[]): void | ||
const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE' | ||
const DEFAULT_SEND_CLOSE_WRITE_TIMEOUT = 5000 | ||
@@ -73,2 +76,8 @@ export interface AbstractStreamInit { | ||
closeTimeout?: number | ||
/** | ||
* After the stream sink has closed, a limit on how long it takes to send | ||
* a close-write message to the remote peer. | ||
*/ | ||
sendCloseWriteTimeout?: number | ||
} | ||
@@ -100,2 +109,3 @@ | ||
private readonly onAbort?: (err: Error) => void | ||
private readonly sendCloseWriteTimeout: number | ||
@@ -120,2 +130,3 @@ protected readonly log: Logger | ||
} | ||
this.sendCloseWriteTimeout = init.sendCloseWriteTimeout ?? DEFAULT_SEND_CLOSE_WRITE_TIMEOUT | ||
@@ -136,3 +147,2 @@ this.onEnd = init.onEnd | ||
this.readStatus = 'closed' | ||
this.onSourceEnd(err) | ||
@@ -182,7 +192,15 @@ } | ||
this.log.trace('sink finished reading from source') | ||
this.writeStatus = 'done' | ||
this.log.trace('sink finished reading from source, write status is "%s"', this.writeStatus) | ||
this.log.trace('sink calling closeWrite') | ||
await this.closeWrite(options) | ||
if (this.writeStatus === 'writing') { | ||
this.writeStatus = 'closing' | ||
this.log.trace('send close write to remote') | ||
await this.sendCloseWrite({ | ||
signal: AbortSignal.timeout(this.sendCloseWriteTimeout) | ||
}) | ||
this.writeStatus = 'closed' | ||
} | ||
this.onSinkEnd() | ||
@@ -206,2 +224,3 @@ } catch (err: any) { | ||
this.timeline.closeRead = Date.now() | ||
this.readStatus = 'closed' | ||
@@ -218,2 +237,6 @@ if (err != null && this.endErr == null) { | ||
if (this.status !== 'aborted' && this.status !== 'reset') { | ||
this.status = 'closed' | ||
} | ||
if (this.onEnd != null) { | ||
@@ -233,2 +256,3 @@ this.onEnd(this.endErr) | ||
this.timeline.closeWrite = Date.now() | ||
this.writeStatus = 'closed' | ||
@@ -245,2 +269,6 @@ if (err != null && this.endErr == null) { | ||
if (this.status !== 'aborted' && this.status !== 'reset') { | ||
this.status = 'closed' | ||
} | ||
if (this.onEnd != null) { | ||
@@ -280,2 +308,7 @@ this.onEnd(this.endErr) | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) { | ||
this.log.trace('send close read to remote') | ||
await this.sendCloseRead(options) | ||
} | ||
if (readStatus === 'ready') { | ||
@@ -286,7 +319,2 @@ this.log.trace('ending internal source queue') | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) { | ||
this.log.trace('send close read to remote') | ||
await this.sendCloseRead(options) | ||
} | ||
this.log.trace('closed readable end of stream') | ||
@@ -302,12 +330,9 @@ } | ||
const writeStatus = this.writeStatus | ||
if (this.writeStatus === 'ready') { | ||
this.log.trace('sink was never sunk, sink an empty array') | ||
await this.sink([]) | ||
await raceSignal(this.sink([]), options.signal) | ||
} | ||
this.writeStatus = 'closing' | ||
if (writeStatus === 'writing') { | ||
if (this.writeStatus === 'writing') { | ||
// stop reading from the source passed to `.sink` in the microtask queue | ||
@@ -320,3 +345,4 @@ // - this lets any data queued by the user in the current tick get read | ||
this.sinkController.abort() | ||
this.sinkEnd.promise.then(resolve, reject) | ||
raceSignal(this.sinkEnd.promise, options.signal) | ||
.then(resolve, reject) | ||
}) | ||
@@ -326,7 +352,2 @@ }) | ||
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeWrite == null) { | ||
this.log.trace('send close write to remote') | ||
await this.sendCloseWrite(options) | ||
} | ||
this.writeStatus = 'closed' | ||
@@ -376,2 +397,3 @@ | ||
this.status = 'reset' | ||
this.timeline.reset = Date.now() | ||
this._closeSinkAndSource(err) | ||
@@ -443,3 +465,3 @@ this.onReset?.() | ||
this.log.trace('muxer destroyed') | ||
this.log.trace('stream destroyed') | ||
@@ -446,0 +468,0 @@ this._closeSinkAndSource() |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
255637
6127
8
7
+ Addedrace-signal@^1.0.0
+ Addedrace-signal@1.1.0(transitive)