Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@libp2p/interface

Package Overview
Dependencies
Maintainers
6
Versions
535
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@libp2p/interface - npm Package Compare versions

Comparing version 0.1.2-7b2ddc17 to 0.1.2-7d8b1551

6

dist/src/connection/index.d.ts

@@ -128,3 +128,3 @@ import type { AbortOptions } from '../index.js';

/**
* Once a protocol has been negotiated for this stream, it will be set on the stat object
* The protocol negotiated for this stream
*/

@@ -199,7 +199,7 @@ protocol?: string;

/**
* Once a multiplexer has been negotiated for this stream, it will be set on the stat object
* The multiplexer negotiated for this connection
*/
multiplexer?: string;
/**
* Once a connection encrypter has been negotiated for this stream, it will be set on the stat object
* The encryption protocol negotiated for this connection
*/

@@ -206,0 +206,0 @@ encryption?: string;

@@ -53,2 +53,7 @@ import { Uint8ArrayList } from 'uint8arraylist';

closeTimeout?: number;
/**
* After the stream sink has closed, a limit on how long it takes to send
* a close-write message to the remote peer.
*/
sendCloseWriteTimeout?: number;
}

@@ -74,2 +79,3 @@ export declare abstract class AbstractStream implements Stream {

private readonly onAbort?;
private readonly sendCloseWriteTimeout;
protected readonly log: Logger;

@@ -76,0 +82,0 @@ constructor(init: AbstractStreamInit);

import { abortableSource } from 'abortable-iterator';
import { pushable } from 'it-pushable';
import defer, {} from 'p-defer';
import { raceSignal } from 'race-signal';
import { Uint8ArrayList } from 'uint8arraylist';

@@ -8,2 +9,3 @@ import { CodeError } from '../errors.js';

const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE';
const DEFAULT_SEND_CLOSE_WRITE_TIMEOUT = 5000;
function isPromise(res) {

@@ -31,2 +33,3 @@ return res != null && typeof res.then === 'function';

onAbort;
sendCloseWriteTimeout;
log;

@@ -47,2 +50,3 @@ constructor(init) {

};
this.sendCloseWriteTimeout = init.sendCloseWriteTimeout ?? DEFAULT_SEND_CLOSE_WRITE_TIMEOUT;
this.onEnd = init.onEnd;

@@ -61,3 +65,2 @@ this.onCloseRead = init?.onCloseRead;

}
this.readStatus = 'closed';
this.onSourceEnd(err);

@@ -95,6 +98,11 @@ }

}
this.log.trace('sink finished reading from source');
this.writeStatus = 'done';
this.log.trace('sink calling closeWrite');
await this.closeWrite(options);
this.log.trace('sink finished reading from source, write status is "%s"', this.writeStatus);
if (this.writeStatus === 'writing') {
this.writeStatus = 'closing';
this.log.trace('send close write to remote');
await this.sendCloseWrite({
signal: AbortSignal.timeout(this.sendCloseWriteTimeout)
});
this.writeStatus = 'closed';
}
this.onSinkEnd();

@@ -117,2 +125,3 @@ }

this.timeline.closeRead = Date.now();
this.readStatus = 'closed';
if (err != null && this.endErr == null) {

@@ -125,2 +134,5 @@ this.endErr = err;

this.timeline.close = Date.now();
if (this.status !== 'aborted' && this.status !== 'reset') {
this.status = 'closed';
}
if (this.onEnd != null) {

@@ -139,2 +151,3 @@ this.onEnd(this.endErr);

this.timeline.closeWrite = Date.now();
this.writeStatus = 'closed';
if (err != null && this.endErr == null) {

@@ -147,2 +160,5 @@ this.endErr = err;

this.timeline.close = Date.now();
if (this.status !== 'aborted' && this.status !== 'reset') {
this.status = 'closed';
}
if (this.onEnd != null) {

@@ -174,2 +190,6 @@ this.onEnd(this.endErr);

this.readStatus = 'closing';
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) {
this.log.trace('send close read to remote');
await this.sendCloseRead(options);
}
if (readStatus === 'ready') {

@@ -179,6 +199,2 @@ this.log.trace('ending internal source queue');

}
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) {
this.log.trace('send close read to remote');
await this.sendCloseRead(options);
}
this.log.trace('closed readable end of stream');

@@ -191,9 +207,7 @@ }

this.log.trace('closing writable end of stream with starting write status "%s"', this.writeStatus);
const writeStatus = this.writeStatus;
if (this.writeStatus === 'ready') {
this.log.trace('sink was never sunk, sink an empty array');
await this.sink([]);
await raceSignal(this.sink([]), options.signal);
}
this.writeStatus = 'closing';
if (writeStatus === 'writing') {
if (this.writeStatus === 'writing') {
// stop reading from the source passed to `.sink` in the microtask queue

@@ -206,10 +220,7 @@ // - this lets any data queued by the user in the current tick get read

this.sinkController.abort();
this.sinkEnd.promise.then(resolve, reject);
raceSignal(this.sinkEnd.promise, options.signal)
.then(resolve, reject);
});
});
}
if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeWrite == null) {
this.log.trace('send close write to remote');
await this.sendCloseWrite(options);
}
this.writeStatus = 'closed';

@@ -250,2 +261,3 @@ this.log.trace('closed writable end of stream');

this.status = 'reset';
this.timeline.reset = Date.now();
this._closeSinkAndSource(err);

@@ -307,3 +319,3 @@ this.onReset?.();

}
this.log.trace('muxer destroyed');
this.log.trace('stream destroyed');
this._closeSinkAndSource();

@@ -310,0 +322,0 @@ }

{
"name": "@libp2p/interface",
"version": "0.1.2-7b2ddc17",
"version": "0.1.2-7d8b1551",
"description": "The interface implemented by a libp2p node",

@@ -143,2 +143,3 @@ "license": "Apache-2.0 OR MIT",

"parserOptions": {
"project": true,
"sourceType": "module"

@@ -167,2 +168,3 @@ }

"p-defer": "^4.0.0",
"race-signal": "^1.0.0",
"uint8arraylist": "^2.4.3"

@@ -173,5 +175,8 @@ },

"aegir": "^40.0.8",
"sinon": "^15.1.2",
"delay": "^6.0.0",
"it-all": "^3.0.3",
"it-drain": "^3.0.3",
"sinon": "^16.0.0",
"sinon-ts": "^1.0.0"
}
}

@@ -149,3 +149,3 @@ import type { AbortOptions } from '../index.js'

/**
* Once a protocol has been negotiated for this stream, it will be set on the stat object
* The protocol negotiated for this stream
*/

@@ -235,3 +235,3 @@ protocol?: string

/**
* Once a multiplexer has been negotiated for this stream, it will be set on the stat object
* The multiplexer negotiated for this connection
*/

@@ -241,3 +241,3 @@ multiplexer?: string

/**
* Once a connection encrypter has been negotiated for this stream, it will be set on the stat object
* The encryption protocol negotiated for this connection
*/

@@ -244,0 +244,0 @@ encryption?: string

import { abortableSource } from 'abortable-iterator'
import { type Pushable, pushable } from 'it-pushable'
import defer, { type DeferredPromise } from 'p-defer'
import { raceSignal } from 'race-signal'
import { Uint8ArrayList } from 'uint8arraylist'

@@ -10,2 +11,3 @@ import { CodeError } from '../errors.js'

// copied from @libp2p/logger to break a circular dependency
interface Logger {

@@ -20,2 +22,3 @@ (formatter: any, ...args: any[]): void

const ERR_SINK_INVALID_STATE = 'ERR_SINK_INVALID_STATE'
const DEFAULT_SEND_CLOSE_WRITE_TIMEOUT = 5000

@@ -73,2 +76,8 @@ export interface AbstractStreamInit {

closeTimeout?: number
/**
* After the stream sink has closed, a limit on how long it takes to send
* a close-write message to the remote peer.
*/
sendCloseWriteTimeout?: number
}

@@ -100,2 +109,3 @@

private readonly onAbort?: (err: Error) => void
private readonly sendCloseWriteTimeout: number

@@ -120,2 +130,3 @@ protected readonly log: Logger

}
this.sendCloseWriteTimeout = init.sendCloseWriteTimeout ?? DEFAULT_SEND_CLOSE_WRITE_TIMEOUT

@@ -136,3 +147,2 @@ this.onEnd = init.onEnd

this.readStatus = 'closed'
this.onSourceEnd(err)

@@ -182,7 +192,15 @@ }

this.log.trace('sink finished reading from source')
this.writeStatus = 'done'
this.log.trace('sink finished reading from source, write status is "%s"', this.writeStatus)
this.log.trace('sink calling closeWrite')
await this.closeWrite(options)
if (this.writeStatus === 'writing') {
this.writeStatus = 'closing'
this.log.trace('send close write to remote')
await this.sendCloseWrite({
signal: AbortSignal.timeout(this.sendCloseWriteTimeout)
})
this.writeStatus = 'closed'
}
this.onSinkEnd()

@@ -206,2 +224,3 @@ } catch (err: any) {

this.timeline.closeRead = Date.now()
this.readStatus = 'closed'

@@ -218,2 +237,6 @@ if (err != null && this.endErr == null) {

if (this.status !== 'aborted' && this.status !== 'reset') {
this.status = 'closed'
}
if (this.onEnd != null) {

@@ -233,2 +256,3 @@ this.onEnd(this.endErr)

this.timeline.closeWrite = Date.now()
this.writeStatus = 'closed'

@@ -245,2 +269,6 @@ if (err != null && this.endErr == null) {

if (this.status !== 'aborted' && this.status !== 'reset') {
this.status = 'closed'
}
if (this.onEnd != null) {

@@ -280,2 +308,7 @@ this.onEnd(this.endErr)

if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) {
this.log.trace('send close read to remote')
await this.sendCloseRead(options)
}
if (readStatus === 'ready') {

@@ -286,7 +319,2 @@ this.log.trace('ending internal source queue')

if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeRead == null) {
this.log.trace('send close read to remote')
await this.sendCloseRead(options)
}
this.log.trace('closed readable end of stream')

@@ -302,12 +330,9 @@ }

const writeStatus = this.writeStatus
if (this.writeStatus === 'ready') {
this.log.trace('sink was never sunk, sink an empty array')
await this.sink([])
await raceSignal(this.sink([]), options.signal)
}
this.writeStatus = 'closing'
if (writeStatus === 'writing') {
if (this.writeStatus === 'writing') {
// stop reading from the source passed to `.sink` in the microtask queue

@@ -320,3 +345,4 @@ // - this lets any data queued by the user in the current tick get read

this.sinkController.abort()
this.sinkEnd.promise.then(resolve, reject)
raceSignal(this.sinkEnd.promise, options.signal)
.then(resolve, reject)
})

@@ -326,7 +352,2 @@ })

if (this.status !== 'reset' && this.status !== 'aborted' && this.timeline.closeWrite == null) {
this.log.trace('send close write to remote')
await this.sendCloseWrite(options)
}
this.writeStatus = 'closed'

@@ -376,2 +397,3 @@

this.status = 'reset'
this.timeline.reset = Date.now()
this._closeSinkAndSource(err)

@@ -443,3 +465,3 @@ this.onReset?.()

this.log.trace('muxer destroyed')
this.log.trace('stream destroyed')

@@ -446,0 +468,0 @@ this._closeSinkAndSource()

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc