@libp2p/webrtc
Advanced tools
Comparing version 2.0.10-8b0e6bef to 2.0.10-a1ec46b5
import type { MultiaddrConnection, MultiaddrConnectionTimeline } from '@libp2p/interface/connection'; | ||
import type { CounterGroup } from '@libp2p/interface/metrics'; | ||
import type { Multiaddr } from '@multiformats/multiaddr'; | ||
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'; | ||
import type { Source, Sink } from 'it-stream-types'; | ||
@@ -49,5 +49,6 @@ interface WebRTCMultiaddrConnectionInit { | ||
constructor(init: WebRTCMultiaddrConnectionInit); | ||
close(err?: Error | undefined): Promise<void>; | ||
close(options?: AbortOptions): Promise<void>; | ||
abort(err: Error): void; | ||
} | ||
export {}; | ||
//# sourceMappingURL=maconn.d.ts.map |
@@ -39,12 +39,15 @@ import { logger } from '@libp2p/logger'; | ||
} | ||
async close(err) { | ||
if (err !== undefined) { | ||
log.error('error closing connection', err); | ||
} | ||
async close(options) { | ||
log.trace('closing connection'); | ||
this.peerConnection.close(); | ||
this.timeline.close = Date.now(); | ||
this.peerConnection.close(); | ||
this.metrics?.increment({ close: true }); | ||
} | ||
abort(err) { | ||
log.error('closing connection due to error', err); | ||
this.peerConnection.close(); | ||
this.timeline.close = Date.now(); | ||
this.metrics?.increment({ abort: true }); | ||
} | ||
} | ||
//# sourceMappingURL=maconn.js.map |
@@ -5,2 +5,3 @@ import type { DataChannelOpts } from './stream.js'; | ||
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer'; | ||
import type { AbortOptions } from '@multiformats/multiaddr'; | ||
import type { Source, Sink } from 'it-stream-types'; | ||
@@ -55,6 +56,10 @@ import type { Uint8ArrayList } from 'uint8arraylist'; | ||
/** | ||
* Close or abort all tracked streams and stop the muxer | ||
* Gracefully close all tracked streams and stop the muxer | ||
*/ | ||
close: (err?: Error | undefined) => void; | ||
close: (options?: AbortOptions) => Promise<void>; | ||
/** | ||
* Abort all tracked streams and stop the muxer | ||
*/ | ||
abort: (err: Error) => void; | ||
/** | ||
* The stream source, a no-op as the transport natively supports multiplexing | ||
@@ -61,0 +66,0 @@ */ |
@@ -56,6 +56,10 @@ import { createStream } from './stream.js'; | ||
/** | ||
* Close or abort all tracked streams and stop the muxer | ||
* Gracefully close all tracked streams and stop the muxer | ||
*/ | ||
close = () => { }; | ||
close = async () => { }; | ||
/** | ||
* Abort all tracked streams and stop the muxer | ||
*/ | ||
abort = () => { }; | ||
/** | ||
* The stream source, a no-op as the transport natively supports multiplexing | ||
@@ -62,0 +66,0 @@ */ |
import { logger } from '@libp2p/logger'; | ||
import { abortableDuplex } from 'abortable-iterator'; | ||
import { pbStream } from 'it-pb-stream'; | ||
import { pbStream } from 'it-protobuf-stream'; | ||
import pDefer from 'p-defer'; | ||
@@ -20,4 +20,4 @@ import { DataChannelMuxerFactory } from '../muxer.js'; | ||
pc.onicecandidate = ({ candidate }) => { | ||
answerSentPromise.promise.then(() => { | ||
stream.write({ | ||
answerSentPromise.promise.then(async () => { | ||
await stream.write({ | ||
type: Message.Type.ICE_CANDIDATE, | ||
@@ -51,3 +51,3 @@ data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' | ||
// write the answer to the remote | ||
stream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp }); | ||
await stream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp }); | ||
await pc.setLocalDescription(answer).catch(err => { | ||
@@ -80,5 +80,8 @@ log.error('could not execute setLocalDescription', err); | ||
pc.onicecandidate = ({ candidate }) => { | ||
stream.write({ | ||
void stream.write({ | ||
type: Message.Type.ICE_CANDIDATE, | ||
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' | ||
}) | ||
.catch(err => { | ||
log.error('error sending ICE candidate', err); | ||
}); | ||
@@ -89,3 +92,3 @@ }; | ||
// write the offer to the stream | ||
stream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp }); | ||
await stream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp }); | ||
// set offer as local description | ||
@@ -92,0 +95,0 @@ await pc.setLocalDescription(offerSdp).catch(err => { |
@@ -29,2 +29,4 @@ import { CodeError } from '@libp2p/interface/errors'; | ||
this._onProtocol(data).catch(err => { log.error('failed to handle incoming connect from %p', data.connection.remotePeer, err); }); | ||
}, { | ||
runOnTransientConnection: true | ||
}); | ||
@@ -63,3 +65,6 @@ this._started = true; | ||
const connection = await this.components.transportManager.dial(baseAddr, options); | ||
const signalingStream = await connection.newStream([SIGNALING_PROTO_ID], options); | ||
const signalingStream = await connection.newStream(SIGNALING_PROTO_ID, { | ||
...options, | ||
runOnTransientConnection: true | ||
}); | ||
try { | ||
@@ -82,3 +87,3 @@ const { pc, muxerFactory, remoteAddress } = await initiateConnection({ | ||
// close the stream if SDP has been exchanged successfully | ||
signalingStream.close(); | ||
await signalingStream.close(); | ||
return result; | ||
@@ -88,3 +93,3 @@ } | ||
// reset the stream in case of any error | ||
signalingStream.reset(); | ||
signalingStream.abort(err); | ||
throw err; | ||
@@ -116,3 +121,3 @@ } | ||
catch (err) { | ||
stream.reset(); | ||
stream.abort(err); | ||
throw err; | ||
@@ -119,0 +124,0 @@ } |
@@ -1,3 +0,4 @@ | ||
import { type AbstractStreamInit } from '@libp2p/interface/stream-muxer/stream'; | ||
import type { Direction, Stream } from '@libp2p/interface/connection'; | ||
import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface/stream-muxer/stream'; | ||
import { Uint8ArrayList } from 'uint8arraylist'; | ||
import type { Direction } from '@libp2p/interface/connection'; | ||
export interface DataChannelOpts { | ||
@@ -17,3 +18,33 @@ maxMessageSize: number; | ||
dataChannelOptions?: Partial<DataChannelOpts>; | ||
maxDataSize: number; | ||
} | ||
export declare class WebRTCStream extends AbstractStream { | ||
/** | ||
* The data channel used to send and receive data | ||
*/ | ||
private readonly channel; | ||
/** | ||
* Data channel options | ||
*/ | ||
private readonly dataChannelOptions; | ||
/** | ||
* push data from the underlying datachannel to the length prefix decoder | ||
* and then the protobuf decoder. | ||
*/ | ||
private readonly incomingData; | ||
private messageQueue?; | ||
private readonly maxDataSize; | ||
constructor(init: WebRTCStreamInit); | ||
sendNewStream(): void; | ||
_sendMessage(data: Uint8ArrayList, checkBuffer?: boolean): Promise<void>; | ||
sendData(data: Uint8ArrayList): Promise<void>; | ||
sendReset(): Promise<void>; | ||
sendCloseWrite(): Promise<void>; | ||
sendCloseRead(): Promise<void>; | ||
/** | ||
* Handle incoming | ||
*/ | ||
private processIncomingProtobuf; | ||
private _sendFlag; | ||
} | ||
export interface WebRTCStreamOptions { | ||
@@ -35,3 +66,3 @@ /** | ||
} | ||
export declare function createStream(options: WebRTCStreamOptions): Stream; | ||
export declare function createStream(options: WebRTCStreamOptions): WebRTCStream; | ||
//# sourceMappingURL=stream.d.ts.map |
@@ -18,3 +18,3 @@ import { CodeError } from '@libp2p/interface/errors'; | ||
const PROTOBUF_OVERHEAD = 3; | ||
class WebRTCStream extends AbstractStream { | ||
export class WebRTCStream extends AbstractStream { | ||
/** | ||
@@ -34,2 +34,3 @@ * The data channel used to send and receive data | ||
messageQueue; | ||
maxDataSize; | ||
constructor(init) { | ||
@@ -46,2 +47,3 @@ super(init); | ||
}; | ||
this.maxDataSize = init.maxDataSize; | ||
// set up initial state | ||
@@ -53,4 +55,4 @@ switch (this.channel.readyState) { | ||
case 'closing': | ||
if (this.stat.timeline.close === undefined || this.stat.timeline.close === 0) { | ||
this.stat.timeline.close = Date.now(); | ||
if (this.timeline.close === undefined || this.timeline.close === 0) { | ||
this.timeline.close = Date.now(); | ||
} | ||
@@ -67,3 +69,3 @@ break; | ||
this.channel.onopen = (_evt) => { | ||
this.stat.timeline.open = new Date().getTime(); | ||
this.timeline.open = new Date().getTime(); | ||
if (this.messageQueue != null) { | ||
@@ -79,3 +81,5 @@ // send any queued messages | ||
this.channel.onclose = (_evt) => { | ||
this.close(); | ||
void this.close().catch(err => { | ||
log.error('error closing stream after channel closed', err); | ||
}); | ||
}; | ||
@@ -118,3 +122,2 @@ this.channel.onerror = (evt) => { | ||
if (err instanceof TimeoutError) { | ||
this.abort(err); | ||
throw new Error('Timed out waiting for DataChannel buffer to clear'); | ||
@@ -147,5 +150,11 @@ } | ||
async sendData(data) { | ||
const msgbuf = Message.encode({ message: data.subarray() }); | ||
const sendbuf = lengthPrefixed.encode.single(msgbuf); | ||
await this._sendMessage(sendbuf); | ||
data = data.sublist(); | ||
while (data.byteLength > 0) { | ||
const toSend = Math.min(data.byteLength, this.maxDataSize); | ||
const buf = data.subarray(0, toSend); | ||
const msgbuf = Message.encode({ message: buf }); | ||
const sendbuf = lengthPrefixed.encode.single(msgbuf); | ||
await this._sendMessage(sendbuf); | ||
data.consume(toSend); | ||
} | ||
} | ||
@@ -170,3 +179,3 @@ async sendReset() { | ||
this.incomingData.end(); | ||
this.closeRead(); | ||
this.remoteCloseWrite(); | ||
} | ||
@@ -179,3 +188,3 @@ if (message.flag === Message.Flag.RESET) { | ||
// The remote has stopped reading | ||
this.closeWrite(); | ||
this.remoteCloseRead(); | ||
} | ||
@@ -200,5 +209,6 @@ } | ||
onEnd, | ||
channel | ||
channel, | ||
log: logger(`libp2p:mplex:stream:${direction}:${channel.id}`) | ||
}); | ||
} | ||
//# sourceMappingURL=stream.js.map |
{ | ||
"name": "@libp2p/webrtc", | ||
"version": "2.0.10-8b0e6bef", | ||
"version": "2.0.10-a1ec46b5", | ||
"description": "A libp2p transport using WebRTC connections", | ||
@@ -48,6 +48,6 @@ "author": "", | ||
"@chainsafe/libp2p-noise": "^12.0.1", | ||
"@libp2p/interface": "0.0.1-8b0e6bef", | ||
"@libp2p/interface-internal": "0.0.1-8b0e6bef", | ||
"@libp2p/logger": "2.1.1-8b0e6bef", | ||
"@libp2p/peer-id": "2.0.3-8b0e6bef", | ||
"@libp2p/interface": "0.0.1-a1ec46b5", | ||
"@libp2p/interface-internal": "0.0.1-a1ec46b5", | ||
"@libp2p/logger": "2.1.1-a1ec46b5", | ||
"@libp2p/peer-id": "2.0.3-a1ec46b5", | ||
"@multiformats/mafmt": "^12.1.2", | ||
@@ -58,5 +58,5 @@ "@multiformats/multiaddr": "^12.1.3", | ||
"it-length-prefixed": "^9.0.1", | ||
"it-pb-stream": "^4.0.1", | ||
"it-protobuf-stream": "^1.0.0", | ||
"it-pipe": "^3.0.1", | ||
"it-pushable": "^3.1.3", | ||
"it-pushable": "^3.2.0", | ||
"it-stream-types": "^2.0.1", | ||
@@ -73,8 +73,8 @@ "it-to-buffer": "^4.0.2", | ||
"devDependencies": { | ||
"@chainsafe/libp2p-yamux": "^4.0.1", | ||
"@libp2p/interface-compliance-tests": "3.0.7-8b0e6bef", | ||
"@libp2p/peer-id-factory": "2.0.3-8b0e6bef", | ||
"@libp2p/websockets": "6.0.3-8b0e6bef", | ||
"@chainsafe/libp2p-yamux": "^4.0.0", | ||
"@libp2p/interface-compliance-tests": "3.0.7-a1ec46b5", | ||
"@libp2p/peer-id-factory": "2.0.3-a1ec46b5", | ||
"@libp2p/websockets": "6.0.3-a1ec46b5", | ||
"@types/sinon": "^10.0.15", | ||
"aegir": "^39.0.10", | ||
"aegir": "^39.0.13", | ||
"delay": "^6.0.0", | ||
@@ -84,3 +84,3 @@ "it-length": "^3.0.2", | ||
"it-pair": "^2.0.6", | ||
"libp2p": "0.45.9-8b0e6bef", | ||
"libp2p": "0.45.9-a1ec46b5", | ||
"protons": "^7.0.2", | ||
@@ -87,0 +87,0 @@ "sinon": "^15.1.2", |
@@ -5,3 +5,3 @@ import { logger } from '@libp2p/logger' | ||
import type { CounterGroup } from '@libp2p/interface/metrics' | ||
import type { Multiaddr } from '@multiformats/multiaddr' | ||
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr' | ||
import type { Source, Sink } from 'it-stream-types' | ||
@@ -76,12 +76,17 @@ | ||
async close (err?: Error | undefined): Promise<void> { | ||
if (err !== undefined) { | ||
log.error('error closing connection', err) | ||
} | ||
async close (options?: AbortOptions): Promise<void> { | ||
log.trace('closing connection') | ||
this.peerConnection.close() | ||
this.timeline.close = Date.now() | ||
this.peerConnection.close() | ||
this.metrics?.increment({ close: true }) | ||
} | ||
abort (err: Error): void { | ||
log.error('closing connection due to error', err) | ||
this.peerConnection.close() | ||
this.timeline.close = Date.now() | ||
this.metrics?.increment({ abort: true }) | ||
} | ||
} |
@@ -7,2 +7,3 @@ import { createStream } from './stream.js' | ||
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer' | ||
import type { AbortOptions } from '@multiformats/multiaddr' | ||
import type { Source, Sink } from 'it-stream-types' | ||
@@ -97,7 +98,12 @@ import type { Uint8ArrayList } from 'uint8arraylist' | ||
/** | ||
* Close or abort all tracked streams and stop the muxer | ||
* Gracefully close all tracked streams and stop the muxer | ||
*/ | ||
close: (err?: Error | undefined) => void = () => { } | ||
close: (options?: AbortOptions) => Promise<void> = async () => { } | ||
/** | ||
* Abort all tracked streams and stop the muxer | ||
*/ | ||
abort: (err: Error) => void = () => { } | ||
/** | ||
* The stream source, a no-op as the transport natively supports multiplexing | ||
@@ -104,0 +110,0 @@ */ |
import { logger } from '@libp2p/logger' | ||
import { abortableDuplex } from 'abortable-iterator' | ||
import { pbStream } from 'it-pb-stream' | ||
import { pbStream } from 'it-protobuf-stream' | ||
import pDefer, { type DeferredPromise } from 'p-defer' | ||
@@ -31,4 +31,4 @@ import { DataChannelMuxerFactory } from '../muxer.js' | ||
answerSentPromise.promise.then( | ||
() => { | ||
stream.write({ | ||
async () => { | ||
await stream.write({ | ||
type: Message.Type.ICE_CANDIDATE, | ||
@@ -68,3 +68,3 @@ data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' | ||
// write the answer to the remote | ||
stream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp }) | ||
await stream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp }) | ||
@@ -112,6 +112,9 @@ await pc.setLocalDescription(answer).catch(err => { | ||
pc.onicecandidate = ({ candidate }) => { | ||
stream.write({ | ||
void stream.write({ | ||
type: Message.Type.ICE_CANDIDATE, | ||
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' | ||
}) | ||
.catch(err => { | ||
log.error('error sending ICE candidate', err) | ||
}) | ||
} | ||
@@ -121,3 +124,3 @@ // create an offer | ||
// write the offer to the stream | ||
stream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp }) | ||
await stream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp }) | ||
// set offer as local description | ||
@@ -124,0 +127,0 @@ await pc.setLocalDescription(offerSdp).catch(err => { |
@@ -52,2 +52,4 @@ import { CodeError } from '@libp2p/interface/errors' | ||
this._onProtocol(data).catch(err => { log.error('failed to handle incoming connect from %p', data.connection.remotePeer, err) }) | ||
}, { | ||
runOnTransientConnection: true | ||
}) | ||
@@ -94,3 +96,6 @@ this._started = true | ||
const connection = await this.components.transportManager.dial(baseAddr, options) | ||
const signalingStream = await connection.newStream([SIGNALING_PROTO_ID], options) | ||
const signalingStream = await connection.newStream(SIGNALING_PROTO_ID, { | ||
...options, | ||
runOnTransientConnection: true | ||
}) | ||
@@ -119,7 +124,7 @@ try { | ||
// close the stream if SDP has been exchanged successfully | ||
signalingStream.close() | ||
await signalingStream.close() | ||
return result | ||
} catch (err) { | ||
} catch (err: any) { | ||
// reset the stream in case of any error | ||
signalingStream.reset() | ||
signalingStream.abort(err) | ||
throw err | ||
@@ -150,4 +155,4 @@ } finally { | ||
}) | ||
} catch (err) { | ||
stream.reset() | ||
} catch (err: any) { | ||
stream.abort(err) | ||
throw err | ||
@@ -154,0 +159,0 @@ } finally { |
@@ -9,3 +9,3 @@ import { CodeError } from '@libp2p/interface/errors' | ||
import { Message } from './pb/message.js' | ||
import type { Direction, Stream } from '@libp2p/interface/connection' | ||
import type { Direction } from '@libp2p/interface/connection' | ||
@@ -30,2 +30,4 @@ const log = logger('libp2p:webrtc:stream') | ||
dataChannelOptions?: Partial<DataChannelOpts> | ||
maxDataSize: number | ||
} | ||
@@ -45,3 +47,3 @@ | ||
class WebRTCStream extends AbstractStream { | ||
export class WebRTCStream extends AbstractStream { | ||
/** | ||
@@ -64,2 +66,3 @@ * The data channel used to send and receive data | ||
private messageQueue?: Uint8ArrayList | ||
private readonly maxDataSize: number | ||
@@ -78,2 +81,3 @@ constructor (init: WebRTCStreamInit) { | ||
} | ||
this.maxDataSize = init.maxDataSize | ||
@@ -87,4 +91,4 @@ // set up initial state | ||
case 'closing': | ||
if (this.stat.timeline.close === undefined || this.stat.timeline.close === 0) { | ||
this.stat.timeline.close = Date.now() | ||
if (this.timeline.close === undefined || this.timeline.close === 0) { | ||
this.timeline.close = Date.now() | ||
} | ||
@@ -103,3 +107,3 @@ break | ||
this.channel.onopen = (_evt) => { | ||
this.stat.timeline.open = new Date().getTime() | ||
this.timeline.open = new Date().getTime() | ||
@@ -117,3 +121,5 @@ if (this.messageQueue != null) { | ||
this.channel.onclose = (_evt) => { | ||
this.close() | ||
void this.close().catch(err => { | ||
log.error('error closing stream after channel closed', err) | ||
}) | ||
} | ||
@@ -164,3 +170,2 @@ | ||
if (err instanceof TimeoutError) { | ||
this.abort(err) | ||
throw new Error('Timed out waiting for DataChannel buffer to clear') | ||
@@ -196,6 +201,13 @@ } | ||
async sendData (data: Uint8ArrayList): Promise<void> { | ||
const msgbuf = Message.encode({ message: data.subarray() }) | ||
const sendbuf = lengthPrefixed.encode.single(msgbuf) | ||
data = data.sublist() | ||
await this._sendMessage(sendbuf) | ||
while (data.byteLength > 0) { | ||
const toSend = Math.min(data.byteLength, this.maxDataSize) | ||
const buf = data.subarray(0, toSend) | ||
const msgbuf = Message.encode({ message: buf }) | ||
const sendbuf = lengthPrefixed.encode.single(msgbuf) | ||
await this._sendMessage(sendbuf) | ||
data.consume(toSend) | ||
} | ||
} | ||
@@ -225,3 +237,3 @@ | ||
this.incomingData.end() | ||
this.closeRead() | ||
this.remoteCloseWrite() | ||
} | ||
@@ -236,3 +248,3 @@ | ||
// The remote has stopped reading | ||
this.closeWrite() | ||
this.remoteCloseRead() | ||
} | ||
@@ -274,3 +286,3 @@ } | ||
export function createStream (options: WebRTCStreamOptions): Stream { | ||
export function createStream (options: WebRTCStreamOptions): WebRTCStream { | ||
const { channel, direction, onEnd, dataChannelOptions } = options | ||
@@ -284,4 +296,5 @@ | ||
onEnd, | ||
channel | ||
channel, | ||
log: logger(`libp2p:mplex:stream:${direction}:${channel.id}`) | ||
}) | ||
} |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
627909
5024
+ Addedit-protobuf-stream@^1.0.0
+ Added@libp2p/interface@0.0.1-a1ec46b5(transitive)
+ Added@libp2p/interface-internal@0.0.1-a1ec46b5(transitive)
+ Added@libp2p/logger@2.1.1-a1ec46b5(transitive)
+ Added@libp2p/peer-collections@3.0.2-a1ec46b5(transitive)
+ Added@libp2p/peer-id@2.0.3-a1ec46b5(transitive)
+ Addedit-byte-stream@1.1.0(transitive)
+ Addedit-length-prefixed-stream@1.2.0(transitive)
+ Addedit-protobuf-stream@1.1.5(transitive)
+ Addedit-queueless-pushable@1.0.0(transitive)
+ Addedrace-signal@1.1.0(transitive)
- Removedit-pb-stream@^4.0.1
- Removed@libp2p/interface@0.0.1-8b0e6bef(transitive)
- Removed@libp2p/interface-internal@0.0.1-8b0e6bef(transitive)
- Removed@libp2p/logger@2.1.1-8b0e6bef(transitive)
- Removed@libp2p/peer-collections@3.0.2-8b0e6bef(transitive)
- Removed@libp2p/peer-id@2.0.3-8b0e6bef(transitive)
- Removedany-signal@4.1.1(transitive)
Updatedit-pushable@^3.2.0