@libp2p/webrtc
Advanced tools
Comparing version 2.0.11 to 3.0.0-72e81dc1
@@ -1,3 +0,3 @@ | ||
import { CodeError } from '@libp2p/interfaces/errors'; | ||
import type { Direction } from '@libp2p/interface-connection'; | ||
import { CodeError } from '@libp2p/interface/errors'; | ||
import type { Direction } from '@libp2p/interface/connection'; | ||
export declare enum codes { | ||
@@ -4,0 +4,0 @@ ERR_ALREADY_ABORTED = "ERR_ALREADY_ABORTED", |
@@ -1,2 +0,2 @@ | ||
import { CodeError } from '@libp2p/interfaces/errors'; | ||
import { CodeError } from '@libp2p/interface/errors'; | ||
export var codes; | ||
@@ -3,0 +3,0 @@ (function (codes) { |
import { type WebRTCTransportDirectInit, type WebRTCDirectTransportComponents } from './private-to-public/transport.js'; | ||
import type { WebRTCTransportComponents, WebRTCTransportInit } from './private-to-private/transport.js'; | ||
import type { Transport } from '@libp2p/interface-transport'; | ||
import type { Transport } from '@libp2p/interface/transport'; | ||
/** | ||
@@ -5,0 +5,0 @@ * @param {WebRTCTransportDirectInit} init - WebRTC direct transport configuration |
@@ -1,4 +0,4 @@ | ||
import type { MultiaddrConnection, MultiaddrConnectionTimeline } from '@libp2p/interface-connection'; | ||
import type { CounterGroup } from '@libp2p/interface-metrics'; | ||
import type { Multiaddr } from '@multiformats/multiaddr'; | ||
import type { MultiaddrConnection, MultiaddrConnectionTimeline } from '@libp2p/interface/connection'; | ||
import type { CounterGroup } from '@libp2p/interface/metrics'; | ||
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'; | ||
import type { Source, Sink } from 'it-stream-types'; | ||
@@ -49,5 +49,6 @@ interface WebRTCMultiaddrConnectionInit { | ||
constructor(init: WebRTCMultiaddrConnectionInit); | ||
close(err?: Error | undefined): Promise<void>; | ||
close(options?: AbortOptions): Promise<void>; | ||
abort(err: Error): void; | ||
} | ||
export {}; | ||
//# sourceMappingURL=maconn.d.ts.map |
@@ -39,12 +39,15 @@ import { logger } from '@libp2p/logger'; | ||
} | ||
async close(err) { | ||
if (err !== undefined) { | ||
log.error('error closing connection', err); | ||
} | ||
async close(options) { | ||
log.trace('closing connection'); | ||
this.peerConnection.close(); | ||
this.timeline.close = Date.now(); | ||
this.peerConnection.close(); | ||
this.metrics?.increment({ close: true }); | ||
} | ||
abort(err) { | ||
log.error('closing connection due to error', err); | ||
this.peerConnection.close(); | ||
this.timeline.close = Date.now(); | ||
this.metrics?.increment({ abort: true }); | ||
} | ||
} | ||
//# sourceMappingURL=maconn.js.map |
import type { DataChannelOpts } from './stream.js'; | ||
import type { Stream } from '@libp2p/interface-connection'; | ||
import type { CounterGroup } from '@libp2p/interface-metrics'; | ||
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface-stream-muxer'; | ||
import type { Stream } from '@libp2p/interface/connection'; | ||
import type { CounterGroup } from '@libp2p/interface/metrics'; | ||
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer'; | ||
import type { AbortOptions } from '@multiformats/multiaddr'; | ||
import type { Source, Sink } from 'it-stream-types'; | ||
@@ -54,6 +55,10 @@ import type { Uint8ArrayList } from 'uint8arraylist'; | ||
/** | ||
* Close or abort all tracked streams and stop the muxer | ||
* Gracefully close all tracked streams and stop the muxer | ||
*/ | ||
close: (err?: Error | undefined) => void; | ||
close: (options?: AbortOptions) => Promise<void>; | ||
/** | ||
* Abort all tracked streams and stop the muxer | ||
*/ | ||
abort: (err: Error) => void; | ||
/** | ||
* The stream source, a no-op as the transport natively supports multiplexing | ||
@@ -60,0 +65,0 @@ */ |
@@ -56,6 +56,10 @@ import { createStream } from './stream.js'; | ||
/** | ||
* Close or abort all tracked streams and stop the muxer | ||
* Gracefully close all tracked streams and stop the muxer | ||
*/ | ||
close = () => { }; | ||
close = async () => { }; | ||
/** | ||
* Abort all tracked streams and stop the muxer | ||
*/ | ||
abort = () => { }; | ||
/** | ||
* The stream source, a no-op as the transport natively supports multiplexing | ||
@@ -62,0 +66,0 @@ */ |
@@ -0,5 +1,6 @@ | ||
import { RTCPeerConnection } from '../webrtc/index.js'; | ||
import type { DataChannelOpts } from '../stream.js'; | ||
import type { Stream } from '@libp2p/interface-connection'; | ||
import type { IncomingStreamData } from '@libp2p/interface-registrar'; | ||
import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer'; | ||
import type { Stream } from '@libp2p/interface/connection'; | ||
import type { StreamMuxerFactory } from '@libp2p/interface/stream-muxer'; | ||
import type { IncomingStreamData } from '@libp2p/interface-internal/registrar'; | ||
export type IncomingStreamOpts = { | ||
@@ -6,0 +7,0 @@ rtcConfiguration?: RTCConfiguration; |
@@ -0,6 +1,8 @@ | ||
import { CodeError } from '@libp2p/interface/errors'; | ||
import { logger } from '@libp2p/logger'; | ||
import { abortableDuplex } from 'abortable-iterator'; | ||
import { pbStream } from 'it-pb-stream'; | ||
import pDefer from 'p-defer'; | ||
import { pbStream } from 'it-protobuf-stream'; | ||
import pDefer, {} from 'p-defer'; | ||
import { DataChannelMuxerFactory } from '../muxer.js'; | ||
import { RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js'; | ||
import { Message } from './pb/message.js'; | ||
@@ -14,49 +16,58 @@ import { readCandidatesUntilConnected, resolveOnConnected } from './util.js'; | ||
const pc = new RTCPeerConnection(rtcConfiguration); | ||
const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions }); | ||
const connectedPromise = pDefer(); | ||
const answerSentPromise = pDefer(); | ||
signal.onabort = () => { connectedPromise.reject(); }; | ||
// candidate callbacks | ||
pc.onicecandidate = ({ candidate }) => { | ||
answerSentPromise.promise.then(() => { | ||
stream.write({ | ||
type: Message.Type.ICE_CANDIDATE, | ||
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' | ||
try { | ||
const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions }); | ||
const connectedPromise = pDefer(); | ||
const answerSentPromise = pDefer(); | ||
signal.onabort = () => { | ||
connectedPromise.reject(new CodeError('Timed out while trying to connect', 'ERR_TIMEOUT')); | ||
}; | ||
// candidate callbacks | ||
pc.onicecandidate = ({ candidate }) => { | ||
answerSentPromise.promise.then(async () => { | ||
await stream.write({ | ||
type: Message.Type.ICE_CANDIDATE, | ||
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' | ||
}); | ||
}, (err) => { | ||
log.error('cannot set candidate since sending answer failed', err); | ||
connectedPromise.reject(err); | ||
}); | ||
}, (err) => { | ||
log.error('cannot set candidate since sending answer failed', err); | ||
}; | ||
resolveOnConnected(pc, connectedPromise); | ||
// read an SDP offer | ||
const pbOffer = await stream.read(); | ||
if (pbOffer.type !== Message.Type.SDP_OFFER) { | ||
throw new Error(`expected message type SDP_OFFER, received: ${pbOffer.type ?? 'undefined'} `); | ||
} | ||
const offer = new RTCSessionDescription({ | ||
type: 'offer', | ||
sdp: pbOffer.data | ||
}); | ||
}; | ||
resolveOnConnected(pc, connectedPromise); | ||
// read an SDP offer | ||
const pbOffer = await stream.read(); | ||
if (pbOffer.type !== Message.Type.SDP_OFFER) { | ||
throw new Error(`expected message type SDP_OFFER, received: ${pbOffer.type ?? 'undefined'} `); | ||
await pc.setRemoteDescription(offer).catch(err => { | ||
log.error('could not execute setRemoteDescription', err); | ||
throw new Error('Failed to set remoteDescription'); | ||
}); | ||
// create and write an SDP answer | ||
const answer = await pc.createAnswer().catch(err => { | ||
log.error('could not execute createAnswer', err); | ||
answerSentPromise.reject(err); | ||
throw new Error('Failed to create answer'); | ||
}); | ||
// write the answer to the remote | ||
await stream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp }); | ||
await pc.setLocalDescription(answer).catch(err => { | ||
log.error('could not execute setLocalDescription', err); | ||
answerSentPromise.reject(err); | ||
throw new Error('Failed to set localDescription'); | ||
}); | ||
answerSentPromise.resolve(); | ||
// wait until candidates are connected | ||
await readCandidatesUntilConnected(connectedPromise, pc, stream); | ||
const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? ''); | ||
return { pc, muxerFactory, remoteAddress }; | ||
} | ||
const offer = new RTCSessionDescription({ | ||
type: 'offer', | ||
sdp: pbOffer.data | ||
}); | ||
await pc.setRemoteDescription(offer).catch(err => { | ||
log.error('could not execute setRemoteDescription', err); | ||
throw new Error('Failed to set remoteDescription'); | ||
}); | ||
// create and write an SDP answer | ||
const answer = await pc.createAnswer().catch(err => { | ||
log.error('could not execute createAnswer', err); | ||
answerSentPromise.reject(err); | ||
throw new Error('Failed to create answer'); | ||
}); | ||
// write the answer to the remote | ||
stream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp }); | ||
await pc.setLocalDescription(answer).catch(err => { | ||
log.error('could not execute setLocalDescription', err); | ||
answerSentPromise.reject(err); | ||
throw new Error('Failed to set localDescription'); | ||
}); | ||
answerSentPromise.resolve(); | ||
// wait until candidates are connected | ||
await readCandidatesUntilConnected(connectedPromise, pc, stream); | ||
const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? ''); | ||
return { pc, muxerFactory, remoteAddress }; | ||
catch (err) { | ||
pc.close(); | ||
throw err; | ||
} | ||
} | ||
@@ -67,42 +78,51 @@ export async function initiateConnection({ rtcConfiguration, dataChannelOptions, signal, stream: rawStream }) { | ||
const pc = new RTCPeerConnection(rtcConfiguration); | ||
const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions }); | ||
const connectedPromise = pDefer(); | ||
resolveOnConnected(pc, connectedPromise); | ||
// reject the connectedPromise if the signal aborts | ||
signal.onabort = connectedPromise.reject; | ||
// we create the channel so that the peerconnection has a component for which | ||
// to collect candidates. The label is not relevant to connection initiation | ||
// but can be useful for debugging | ||
const channel = pc.createDataChannel('init'); | ||
// setup callback to write ICE candidates to the remote | ||
// peer | ||
pc.onicecandidate = ({ candidate }) => { | ||
stream.write({ | ||
type: Message.Type.ICE_CANDIDATE, | ||
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' | ||
try { | ||
const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions }); | ||
const connectedPromise = pDefer(); | ||
resolveOnConnected(pc, connectedPromise); | ||
// reject the connectedPromise if the signal aborts | ||
signal.onabort = connectedPromise.reject; | ||
// we create the channel so that the peerconnection has a component for which | ||
// to collect candidates. The label is not relevant to connection initiation | ||
// but can be useful for debugging | ||
const channel = pc.createDataChannel('init'); | ||
// setup callback to write ICE candidates to the remote | ||
// peer | ||
pc.onicecandidate = ({ candidate }) => { | ||
void stream.write({ | ||
type: Message.Type.ICE_CANDIDATE, | ||
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' | ||
}) | ||
.catch(err => { | ||
log.error('error sending ICE candidate', err); | ||
}); | ||
}; | ||
// create an offer | ||
const offerSdp = await pc.createOffer(); | ||
// write the offer to the stream | ||
await stream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp }); | ||
// set offer as local description | ||
await pc.setLocalDescription(offerSdp).catch(err => { | ||
log.error('could not execute setLocalDescription', err); | ||
throw new Error('Failed to set localDescription'); | ||
}); | ||
}; | ||
// create an offer | ||
const offerSdp = await pc.createOffer(); | ||
// write the offer to the stream | ||
stream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp }); | ||
// set offer as local description | ||
await pc.setLocalDescription(offerSdp).catch(err => { | ||
log.error('could not execute setLocalDescription', err); | ||
throw new Error('Failed to set localDescription'); | ||
}); | ||
// read answer | ||
const answerMessage = await stream.read(); | ||
if (answerMessage.type !== Message.Type.SDP_ANSWER) { | ||
throw new Error('remote should send an SDP answer'); | ||
// read answer | ||
const answerMessage = await stream.read(); | ||
if (answerMessage.type !== Message.Type.SDP_ANSWER) { | ||
throw new Error('remote should send an SDP answer'); | ||
} | ||
const answerSdp = new RTCSessionDescription({ type: 'answer', sdp: answerMessage.data }); | ||
await pc.setRemoteDescription(answerSdp).catch(err => { | ||
log.error('could not execute setRemoteDescription', err); | ||
throw new Error('Failed to set remoteDescription'); | ||
}); | ||
await readCandidatesUntilConnected(connectedPromise, pc, stream); | ||
channel.close(); | ||
const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? ''); | ||
return { pc, muxerFactory, remoteAddress }; | ||
} | ||
const answerSdp = new RTCSessionDescription({ type: 'answer', sdp: answerMessage.data }); | ||
await pc.setRemoteDescription(answerSdp).catch(err => { | ||
log.error('could not execute setRemoteDescription', err); | ||
throw new Error('Failed to set remoteDescription'); | ||
}); | ||
await readCandidatesUntilConnected(connectedPromise, pc, stream); | ||
channel.close(); | ||
const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? ''); | ||
return { pc, muxerFactory, remoteAddress }; | ||
catch (err) { | ||
pc.close(); | ||
throw err; | ||
} | ||
} | ||
@@ -109,0 +129,0 @@ function parseRemoteAddress(sdp) { |
@@ -1,4 +0,5 @@ | ||
import { EventEmitter } from '@libp2p/interfaces/events'; | ||
import type { PeerId } from '@libp2p/interface-peer-id'; | ||
import type { ListenerEvents, Listener, TransportManager } from '@libp2p/interface-transport'; | ||
import { EventEmitter } from '@libp2p/interface/events'; | ||
import type { PeerId } from '@libp2p/interface/peer-id'; | ||
import type { ListenerEvents, Listener } from '@libp2p/interface/transport'; | ||
import type { TransportManager } from '@libp2p/interface-internal/transport-manager'; | ||
import type { Multiaddr } from '@multiformats/multiaddr'; | ||
@@ -5,0 +6,0 @@ export interface ListenerOptions { |
@@ -1,2 +0,2 @@ | ||
import { EventEmitter } from '@libp2p/interfaces/events'; | ||
import { EventEmitter } from '@libp2p/interface/events'; | ||
import { Circuit } from '@multiformats/mafmt'; | ||
@@ -3,0 +3,0 @@ export class WebRTCPeerListener extends EventEmitter { |
@@ -1,8 +0,9 @@ | ||
import { type CreateListenerOptions, type DialOptions, type Listener, symbol, type Transport, type Upgrader, type TransportManager } from '@libp2p/interface-transport'; | ||
import { type CreateListenerOptions, type DialOptions, symbol, type Transport, type Listener, type Upgrader } from '@libp2p/interface/transport'; | ||
import { type Multiaddr } from '@multiformats/multiaddr'; | ||
import type { DataChannelOpts } from '../stream.js'; | ||
import type { Connection } from '@libp2p/interface-connection'; | ||
import type { PeerId } from '@libp2p/interface-peer-id'; | ||
import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar'; | ||
import type { Startable } from '@libp2p/interfaces/startable'; | ||
import type { Connection } from '@libp2p/interface/connection'; | ||
import type { PeerId } from '@libp2p/interface/peer-id'; | ||
import type { Startable } from '@libp2p/interface/startable'; | ||
import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar'; | ||
import type { TransportManager } from '@libp2p/interface-internal/transport-manager'; | ||
export interface WebRTCTransportInit { | ||
@@ -9,0 +10,0 @@ rtcConfiguration?: RTCConfiguration; |
@@ -1,3 +0,3 @@ | ||
import { symbol } from '@libp2p/interface-transport'; | ||
import { CodeError } from '@libp2p/interfaces/errors'; | ||
import { CodeError } from '@libp2p/interface/errors'; | ||
import { symbol } from '@libp2p/interface/transport'; | ||
import { logger } from '@libp2p/logger'; | ||
@@ -8,2 +8,3 @@ import { peerIdFromString } from '@libp2p/peer-id'; | ||
import { WebRTCMultiaddrConnection } from '../maconn.js'; | ||
import { cleanup } from '../webrtc/index.js'; | ||
import { initiateConnection, handleIncomingStream } from './handler.js'; | ||
@@ -30,2 +31,4 @@ import { WebRTCPeerListener } from './listener.js'; | ||
this._onProtocol(data).catch(err => { log.error('failed to handle incoming connect from %p', data.connection.remotePeer, err); }); | ||
}, { | ||
runOnTransientConnection: true | ||
}); | ||
@@ -36,2 +39,3 @@ this._started = true; | ||
await this.components.registrar.unhandle(SIGNALING_PROTO_ID); | ||
cleanup(); | ||
this._started = false; | ||
@@ -65,3 +69,6 @@ } | ||
const connection = await this.components.transportManager.dial(baseAddr, options); | ||
const signalingStream = await connection.newStream([SIGNALING_PROTO_ID], options); | ||
const signalingStream = await connection.newStream(SIGNALING_PROTO_ID, { | ||
...options, | ||
runOnTransientConnection: true | ||
}); | ||
try { | ||
@@ -84,3 +91,3 @@ const { pc, muxerFactory, remoteAddress } = await initiateConnection({ | ||
// close the stream if SDP has been exchanged successfully | ||
signalingStream.close(); | ||
await signalingStream.close(); | ||
return result; | ||
@@ -90,3 +97,3 @@ } | ||
// reset the stream in case of any error | ||
signalingStream.reset(); | ||
signalingStream.abort(err); | ||
throw err; | ||
@@ -118,3 +125,3 @@ } | ||
catch (err) { | ||
stream.reset(); | ||
stream.abort(err); | ||
throw err; | ||
@@ -121,0 +128,0 @@ } |
import { logger } from '@libp2p/logger'; | ||
import { isFirefox } from '../util.js'; | ||
import { RTCIceCandidate } from '../webrtc/index.js'; | ||
import { Message } from './pb/message.js'; | ||
@@ -4,0 +5,0 @@ const log = logger('libp2p:webrtc:peer:util'); |
@@ -1,2 +0,2 @@ | ||
import type { CreateListenerOptions, DialOptions } from '@libp2p/interface-transport'; | ||
import type { CreateListenerOptions, DialOptions } from '@libp2p/interface/transport'; | ||
export interface WebRTCListenerOptions extends CreateListenerOptions { | ||
@@ -3,0 +3,0 @@ } |
@@ -114,3 +114,3 @@ import { logger } from '@libp2p/logger'; | ||
a=sctp-port:5000 | ||
a=max-message-size:100000 | ||
a=max-message-size:16384 | ||
a=candidate:1467250027 1 UDP 1467250027 ${host} ${port} typ host\r\n`; | ||
@@ -117,0 +117,0 @@ } |
@@ -1,7 +0,7 @@ | ||
import { type CreateListenerOptions, type Listener, symbol, type Transport } from '@libp2p/interface-transport'; | ||
import { type CreateListenerOptions, symbol, type Transport, type Listener } from '@libp2p/interface/transport'; | ||
import type { WebRTCDialOptions } from './options.js'; | ||
import type { DataChannelOpts } from '../stream.js'; | ||
import type { Connection } from '@libp2p/interface-connection'; | ||
import type { CounterGroup, Metrics } from '@libp2p/interface-metrics'; | ||
import type { PeerId } from '@libp2p/interface-peer-id'; | ||
import type { Connection } from '@libp2p/interface/connection'; | ||
import type { CounterGroup, Metrics } from '@libp2p/interface/metrics'; | ||
import type { PeerId } from '@libp2p/interface/peer-id'; | ||
import type { Multiaddr } from '@multiformats/multiaddr'; | ||
@@ -8,0 +8,0 @@ /** |
import { noise as Noise } from '@chainsafe/libp2p-noise'; | ||
import { symbol } from '@libp2p/interface-transport'; | ||
import { symbol } from '@libp2p/interface/transport'; | ||
import { logger } from '@libp2p/logger'; | ||
@@ -14,2 +14,3 @@ import * as p from '@libp2p/peer-id'; | ||
import { isFirefox } from '../util.js'; | ||
import { RTCPeerConnection } from '../webrtc/index.js'; | ||
import * as sdp from './sdp.js'; | ||
@@ -100,95 +101,101 @@ import { genUfrag } from './util.js'; | ||
const peerConnection = new RTCPeerConnection({ certificates: [certificate] }); | ||
// create data channel for running the noise handshake. Once the data channel is opened, | ||
// the remote will initiate the noise handshake. This is used to confirm the identity of | ||
// the peer. | ||
const dataChannelOpenPromise = new Promise((resolve, reject) => { | ||
const handshakeDataChannel = peerConnection.createDataChannel('', { negotiated: true, id: 0 }); | ||
const handshakeTimeout = setTimeout(() => { | ||
const error = `Data channel was never opened: state: ${handshakeDataChannel.readyState}`; | ||
log.error(error); | ||
this.metrics?.dialerEvents.increment({ open_error: true }); | ||
reject(dataChannelError('data', error)); | ||
}, HANDSHAKE_TIMEOUT_MS); | ||
handshakeDataChannel.onopen = (_) => { | ||
clearTimeout(handshakeTimeout); | ||
resolve(handshakeDataChannel); | ||
try { | ||
// create data channel for running the noise handshake. Once the data channel is opened, | ||
// the remote will initiate the noise handshake. This is used to confirm the identity of | ||
// the peer. | ||
const dataChannelOpenPromise = new Promise((resolve, reject) => { | ||
const handshakeDataChannel = peerConnection.createDataChannel('', { negotiated: true, id: 0 }); | ||
const handshakeTimeout = setTimeout(() => { | ||
const error = `Data channel was never opened: state: ${handshakeDataChannel.readyState}`; | ||
log.error(error); | ||
this.metrics?.dialerEvents.increment({ open_error: true }); | ||
reject(dataChannelError('data', error)); | ||
}, HANDSHAKE_TIMEOUT_MS); | ||
handshakeDataChannel.onopen = (_) => { | ||
clearTimeout(handshakeTimeout); | ||
resolve(handshakeDataChannel); | ||
}; | ||
// ref: https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/error_event | ||
handshakeDataChannel.onerror = (event) => { | ||
clearTimeout(handshakeTimeout); | ||
const errorTarget = event.target?.toString() ?? 'not specified'; | ||
const error = `Error opening a data channel for handshaking: ${errorTarget}`; | ||
log.error(error); | ||
// NOTE: We use unknown error here but this could potentially be considered a reset by some standards. | ||
this.metrics?.dialerEvents.increment({ unknown_error: true }); | ||
reject(dataChannelError('data', error)); | ||
}; | ||
}); | ||
const ufrag = 'libp2p+webrtc+v1/' + genUfrag(32); | ||
// Create offer and munge sdp with ufrag == pwd. This allows the remote to | ||
// respond to STUN messages without performing an actual SDP exchange. | ||
// This is because it can infer the passwd field by reading the USERNAME | ||
// attribute of the STUN message. | ||
const offerSdp = await peerConnection.createOffer(); | ||
const mungedOfferSdp = sdp.munge(offerSdp, ufrag); | ||
await peerConnection.setLocalDescription(mungedOfferSdp); | ||
// construct answer sdp from multiaddr and ufrag | ||
const answerSdp = sdp.fromMultiAddr(ma, ufrag); | ||
await peerConnection.setRemoteDescription(answerSdp); | ||
// wait for peerconnection.onopen to fire, or for the datachannel to open | ||
const handshakeDataChannel = await dataChannelOpenPromise; | ||
const myPeerId = this.components.peerId; | ||
// Do noise handshake. | ||
// Set the Noise Prologue to libp2p-webrtc-noise:<FINGERPRINTS> before starting the actual Noise handshake. | ||
// <FINGERPRINTS> is the concatenation of the of the two TLS fingerprints of A and B in their multihash byte representation, sorted in ascending order. | ||
const fingerprintsPrologue = this.generateNoisePrologue(peerConnection, remoteCerthash.code, ma); | ||
// Since we use the default crypto interface and do not use a static key or early data, | ||
// we pass in undefined for these parameters. | ||
const noise = Noise({ prologueBytes: fingerprintsPrologue })(); | ||
const wrappedChannel = createStream({ channel: handshakeDataChannel, direction: 'inbound', dataChannelOptions: this.init.dataChannel }); | ||
const wrappedDuplex = { | ||
...wrappedChannel, | ||
sink: wrappedChannel.sink.bind(wrappedChannel), | ||
source: (async function* () { | ||
for await (const list of wrappedChannel.source) { | ||
for (const buf of list) { | ||
yield buf; | ||
} | ||
} | ||
}()) | ||
}; | ||
// ref: https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/error_event | ||
handshakeDataChannel.onerror = (event) => { | ||
clearTimeout(handshakeTimeout); | ||
const errorTarget = event.target?.toString() ?? 'not specified'; | ||
const error = `Error opening a data channel for handshaking: ${errorTarget}`; | ||
log.error(error); | ||
// NOTE: We use unknown error here but this could potentially be considered a reset by some standards. | ||
this.metrics?.dialerEvents.increment({ unknown_error: true }); | ||
reject(dataChannelError('data', error)); | ||
}; | ||
}); | ||
const ufrag = 'libp2p+webrtc+v1/' + genUfrag(32); | ||
// Create offer and munge sdp with ufrag == pwd. This allows the remote to | ||
// respond to STUN messages without performing an actual SDP exchange. | ||
// This is because it can infer the passwd field by reading the USERNAME | ||
// attribute of the STUN message. | ||
const offerSdp = await peerConnection.createOffer(); | ||
const mungedOfferSdp = sdp.munge(offerSdp, ufrag); | ||
await peerConnection.setLocalDescription(mungedOfferSdp); | ||
// construct answer sdp from multiaddr and ufrag | ||
const answerSdp = sdp.fromMultiAddr(ma, ufrag); | ||
await peerConnection.setRemoteDescription(answerSdp); | ||
// wait for peerconnection.onopen to fire, or for the datachannel to open | ||
const handshakeDataChannel = await dataChannelOpenPromise; | ||
const myPeerId = this.components.peerId; | ||
// Do noise handshake. | ||
// Set the Noise Prologue to libp2p-webrtc-noise:<FINGERPRINTS> before starting the actual Noise handshake. | ||
// <FINGERPRINTS> is the concatenation of the of the two TLS fingerprints of A and B in their multihash byte representation, sorted in ascending order. | ||
const fingerprintsPrologue = this.generateNoisePrologue(peerConnection, remoteCerthash.code, ma); | ||
// Since we use the default crypto interface and do not use a static key or early data, | ||
// we pass in undefined for these parameters. | ||
const noise = Noise({ prologueBytes: fingerprintsPrologue })(); | ||
const wrappedChannel = createStream({ channel: handshakeDataChannel, direction: 'inbound', dataChannelOptions: this.init.dataChannel }); | ||
const wrappedDuplex = { | ||
...wrappedChannel, | ||
sink: wrappedChannel.sink.bind(wrappedChannel), | ||
source: (async function* () { | ||
for await (const list of wrappedChannel.source) { | ||
for (const buf of list) { | ||
yield buf; | ||
} | ||
// Creating the connection before completion of the noise | ||
// handshake ensures that the stream opening callback is set up | ||
const maConn = new WebRTCMultiaddrConnection({ | ||
peerConnection, | ||
remoteAddr: ma, | ||
timeline: { | ||
open: Date.now() | ||
}, | ||
metrics: this.metrics?.dialerEvents | ||
}); | ||
const eventListeningName = isFirefox ? 'iceconnectionstatechange' : 'connectionstatechange'; | ||
peerConnection.addEventListener(eventListeningName, () => { | ||
switch (peerConnection.connectionState) { | ||
case 'failed': | ||
case 'disconnected': | ||
case 'closed': | ||
maConn.close().catch((err) => { | ||
log.error('error closing connection', err); | ||
}).finally(() => { | ||
// Remove the event listener once the connection is closed | ||
controller.abort(); | ||
}); | ||
break; | ||
default: | ||
break; | ||
} | ||
}()) | ||
}; | ||
// Creating the connection before completion of the noise | ||
// handshake ensures that the stream opening callback is set up | ||
const maConn = new WebRTCMultiaddrConnection({ | ||
peerConnection, | ||
remoteAddr: ma, | ||
timeline: { | ||
open: Date.now() | ||
}, | ||
metrics: this.metrics?.dialerEvents | ||
}); | ||
const eventListeningName = isFirefox ? 'iceconnectionstatechange' : 'connectionstatechange'; | ||
peerConnection.addEventListener(eventListeningName, () => { | ||
switch (peerConnection.connectionState) { | ||
case 'failed': | ||
case 'disconnected': | ||
case 'closed': | ||
maConn.close().catch((err) => { | ||
log.error('error closing connection', err); | ||
}).finally(() => { | ||
// Remove the event listener once the connection is closed | ||
controller.abort(); | ||
}); | ||
break; | ||
default: | ||
break; | ||
} | ||
}, { signal }); | ||
// Track opened peer connection | ||
this.metrics?.dialerEvents.increment({ peer_connection: true }); | ||
const muxerFactory = new DataChannelMuxerFactory({ peerConnection, metrics: this.metrics?.dialerEvents, dataChannelOptions: this.init.dataChannel }); | ||
// For outbound connections, the remote is expected to start the noise handshake. | ||
// Therefore, we need to secure an inbound noise connection from the remote. | ||
await noise.secureInbound(myPeerId, wrappedDuplex, theirPeerId); | ||
return options.upgrader.upgradeOutbound(maConn, { skipProtection: true, skipEncryption: true, muxerFactory }); | ||
}, { signal }); | ||
// Track opened peer connection | ||
this.metrics?.dialerEvents.increment({ peer_connection: true }); | ||
const muxerFactory = new DataChannelMuxerFactory({ peerConnection, metrics: this.metrics?.dialerEvents, dataChannelOptions: this.init.dataChannel }); | ||
// For outbound connections, the remote is expected to start the noise handshake. | ||
// Therefore, we need to secure an inbound noise connection from the remote. | ||
await noise.secureInbound(myPeerId, wrappedDuplex, theirPeerId); | ||
return await options.upgrader.upgradeOutbound(maConn, { skipProtection: true, skipEncryption: true, muxerFactory }); | ||
} | ||
catch (err) { | ||
peerConnection.close(); | ||
throw err; | ||
} | ||
} | ||
@@ -195,0 +202,0 @@ /** |
@@ -1,3 +0,4 @@ | ||
import { type AbstractStreamInit } from '@libp2p/interface-stream-muxer/stream'; | ||
import type { Direction, Stream } from '@libp2p/interface-connection'; | ||
import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface/stream-muxer/stream'; | ||
import { Uint8ArrayList } from 'uint8arraylist'; | ||
import type { Direction } from '@libp2p/interface/connection'; | ||
export interface DataChannelOpts { | ||
@@ -17,3 +18,33 @@ maxMessageSize: number; | ||
dataChannelOptions?: Partial<DataChannelOpts>; | ||
maxDataSize: number; | ||
} | ||
export declare class WebRTCStream extends AbstractStream { | ||
/** | ||
* The data channel used to send and receive data | ||
*/ | ||
private readonly channel; | ||
/** | ||
* Data channel options | ||
*/ | ||
private readonly dataChannelOptions; | ||
/** | ||
* push data from the underlying datachannel to the length prefix decoder | ||
* and then the protobuf decoder. | ||
*/ | ||
private readonly incomingData; | ||
private messageQueue?; | ||
private readonly maxDataSize; | ||
constructor(init: WebRTCStreamInit); | ||
sendNewStream(): void; | ||
_sendMessage(data: Uint8ArrayList, checkBuffer?: boolean): Promise<void>; | ||
sendData(data: Uint8ArrayList): Promise<void>; | ||
sendReset(): Promise<void>; | ||
sendCloseWrite(): Promise<void>; | ||
sendCloseRead(): Promise<void>; | ||
/** | ||
* Handle incoming | ||
*/ | ||
private processIncomingProtobuf; | ||
private _sendFlag; | ||
} | ||
export interface WebRTCStreamOptions { | ||
@@ -35,3 +66,3 @@ /** | ||
} | ||
export declare function createStream(options: WebRTCStreamOptions): Stream; | ||
export declare function createStream(options: WebRTCStreamOptions): WebRTCStream; | ||
//# sourceMappingURL=stream.d.ts.map |
@@ -1,3 +0,3 @@ | ||
import { AbstractStream } from '@libp2p/interface-stream-muxer/stream'; | ||
import { CodeError } from '@libp2p/interfaces/errors'; | ||
import { CodeError } from '@libp2p/interface/errors'; | ||
import { AbstractStream } from '@libp2p/interface/stream-muxer/stream'; | ||
import { logger } from '@libp2p/logger'; | ||
@@ -18,3 +18,3 @@ import * as lengthPrefixed from 'it-length-prefixed'; | ||
const PROTOBUF_OVERHEAD = 3; | ||
class WebRTCStream extends AbstractStream { | ||
export class WebRTCStream extends AbstractStream { | ||
/** | ||
@@ -34,2 +34,3 @@ * The data channel used to send and receive data | ||
messageQueue; | ||
maxDataSize; | ||
constructor(init) { | ||
@@ -46,2 +47,3 @@ super(init); | ||
}; | ||
this.maxDataSize = init.maxDataSize; | ||
// set up initial state | ||
@@ -53,4 +55,4 @@ switch (this.channel.readyState) { | ||
case 'closing': | ||
if (this.stat.timeline.close === undefined || this.stat.timeline.close === 0) { | ||
this.stat.timeline.close = Date.now(); | ||
if (this.timeline.close === undefined || this.timeline.close === 0) { | ||
this.timeline.close = Date.now(); | ||
} | ||
@@ -67,3 +69,3 @@ break; | ||
this.channel.onopen = (_evt) => { | ||
this.stat.timeline.open = new Date().getTime(); | ||
this.timeline.open = new Date().getTime(); | ||
if (this.messageQueue != null) { | ||
@@ -79,3 +81,5 @@ // send any queued messages | ||
this.channel.onclose = (_evt) => { | ||
this.close(); | ||
void this.close().catch(err => { | ||
log.error('error closing stream after channel closed', err); | ||
}); | ||
}; | ||
@@ -118,3 +122,2 @@ this.channel.onerror = (evt) => { | ||
if (err instanceof TimeoutError) { | ||
this.abort(err); | ||
throw new Error('Timed out waiting for DataChannel buffer to clear'); | ||
@@ -147,5 +150,11 @@ } | ||
async sendData(data) { | ||
const msgbuf = Message.encode({ message: data.subarray() }); | ||
const sendbuf = lengthPrefixed.encode.single(msgbuf); | ||
await this._sendMessage(sendbuf); | ||
data = data.sublist(); | ||
while (data.byteLength > 0) { | ||
const toSend = Math.min(data.byteLength, this.maxDataSize); | ||
const buf = data.subarray(0, toSend); | ||
const msgbuf = Message.encode({ message: buf }); | ||
const sendbuf = lengthPrefixed.encode.single(msgbuf); | ||
await this._sendMessage(sendbuf); | ||
data.consume(toSend); | ||
} | ||
} | ||
@@ -170,3 +179,3 @@ async sendReset() { | ||
this.incomingData.end(); | ||
this.closeRead(); | ||
this.remoteCloseWrite(); | ||
} | ||
@@ -179,3 +188,3 @@ if (message.flag === Message.Flag.RESET) { | ||
// The remote has stopped reading | ||
this.closeWrite(); | ||
this.remoteCloseRead(); | ||
} | ||
@@ -200,5 +209,6 @@ } | ||
onEnd, | ||
channel | ||
channel, | ||
log: logger(`libp2p:mplex:stream:${direction}:${channel.id}`) | ||
}); | ||
} | ||
//# sourceMappingURL=stream.js.map |
150
package.json
{ | ||
"name": "@libp2p/webrtc", | ||
"version": "2.0.11", | ||
"version": "3.0.0-72e81dc1", | ||
"description": "A libp2p transport using WebRTC connections", | ||
"author": "", | ||
"license": "Apache-2.0 OR MIT", | ||
"homepage": "https://github.com/libp2p/js-libp2p-webrtc#readme", | ||
"homepage": "https://github.com/libp2p/js-libp2p/tree/master/packages/transport-webrtc#readme", | ||
"repository": { | ||
"type": "git", | ||
"url": "git+https://github.com/libp2p/js-libp2p-webrtc.git" | ||
"url": "git+https://github.com/libp2p/js-libp2p.git" | ||
}, | ||
"bugs": { | ||
"url": "https://github.com/libp2p/js-libp2p-webrtc/issues" | ||
"url": "https://github.com/libp2p/js-libp2p/issues" | ||
}, | ||
"engines": { | ||
"node": ">=18.0.0", | ||
"npm": ">=8.6.0" | ||
}, | ||
"type": "module", | ||
@@ -25,4 +21,3 @@ "types": "./dist/src/index.d.ts", | ||
"!dist/test", | ||
"!**/*.tsbuildinfo", | ||
"proto_ts" | ||
"!**/*.tsbuildinfo" | ||
], | ||
@@ -41,91 +36,7 @@ "exports": { | ||
}, | ||
"release": { | ||
"branches": [ | ||
"main" | ||
], | ||
"plugins": [ | ||
[ | ||
"@semantic-release/commit-analyzer", | ||
{ | ||
"preset": "conventionalcommits", | ||
"releaseRules": [ | ||
{ | ||
"breaking": true, | ||
"release": "major" | ||
}, | ||
{ | ||
"revert": true, | ||
"release": "patch" | ||
}, | ||
{ | ||
"type": "feat", | ||
"release": "minor" | ||
}, | ||
{ | ||
"type": "fix", | ||
"release": "patch" | ||
}, | ||
{ | ||
"type": "docs", | ||
"release": "patch" | ||
}, | ||
{ | ||
"type": "test", | ||
"release": "patch" | ||
}, | ||
{ | ||
"type": "deps", | ||
"release": "patch" | ||
}, | ||
{ | ||
"scope": "no-release", | ||
"release": false | ||
} | ||
] | ||
} | ||
], | ||
[ | ||
"@semantic-release/release-notes-generator", | ||
{ | ||
"preset": "conventionalcommits", | ||
"presetConfig": { | ||
"types": [ | ||
{ | ||
"type": "feat", | ||
"section": "Features" | ||
}, | ||
{ | ||
"type": "fix", | ||
"section": "Bug Fixes" | ||
}, | ||
{ | ||
"type": "chore", | ||
"section": "Trivial Changes" | ||
}, | ||
{ | ||
"type": "docs", | ||
"section": "Documentation" | ||
}, | ||
{ | ||
"type": "deps", | ||
"section": "Dependencies" | ||
}, | ||
{ | ||
"type": "test", | ||
"section": "Tests" | ||
} | ||
] | ||
} | ||
} | ||
], | ||
"@semantic-release/changelog", | ||
"@semantic-release/npm", | ||
"@semantic-release/github", | ||
"@semantic-release/git" | ||
] | ||
}, | ||
"scripts": { | ||
"generate": "protons src/private-to-private/pb/message.proto src/pb/message.proto", | ||
"build": "aegir build", | ||
"test": "aegir test -t browser", | ||
"test": "aegir test -t node -t browser -t electron-main", | ||
"test:node": "aegir test -t node --cov", | ||
"test:chrome": "aegir test -t browser --cov", | ||
@@ -136,28 +47,23 @@ "test:firefox": "aegir test -t browser -- --browser firefox", | ||
"clean": "aegir clean", | ||
"dep-check": "aegir dep-check -i protons", | ||
"release": "aegir release" | ||
"dep-check": "aegir dep-check" | ||
}, | ||
"dependencies": { | ||
"@chainsafe/libp2p-noise": "^12.0.0", | ||
"@libp2p/interface-connection": "^5.0.2", | ||
"@libp2p/interface-metrics": "^4.0.8", | ||
"@libp2p/interface-peer-id": "^2.0.2", | ||
"@libp2p/interface-registrar": "^2.0.12", | ||
"@libp2p/interface-stream-muxer": "^4.1.2", | ||
"@libp2p/interface-transport": "^4.0.3", | ||
"@libp2p/interfaces": "^3.3.2", | ||
"@libp2p/logger": "^2.0.7", | ||
"@libp2p/peer-id": "^2.0.3", | ||
"@libp2p/interface": "0.1.0-72e81dc1", | ||
"@libp2p/interface-internal": "0.1.0-72e81dc1", | ||
"@libp2p/logger": "3.0.0-72e81dc1", | ||
"@libp2p/peer-id": "3.0.0-72e81dc1", | ||
"@multiformats/mafmt": "^12.1.2", | ||
"@multiformats/multiaddr": "^12.1.2", | ||
"@multiformats/multiaddr": "^12.1.3", | ||
"abortable-iterator": "^5.0.1", | ||
"detect-browser": "^5.3.0", | ||
"it-length-prefixed": "^9.0.1", | ||
"it-pb-stream": "^4.0.1", | ||
"it-pipe": "^3.0.1", | ||
"it-pushable": "^3.1.3", | ||
"it-protobuf-stream": "^1.0.0", | ||
"it-pushable": "^3.2.0", | ||
"it-stream-types": "^2.0.1", | ||
"it-to-buffer": "^4.0.2", | ||
"multiformats": "^11.0.2", | ||
"multiformats": "^12.0.1", | ||
"multihashes": "^4.0.3", | ||
"node-datachannel": "^0.4.3", | ||
"p-defer": "^4.0.0", | ||
@@ -167,12 +73,11 @@ "p-event": "^6.0.0", | ||
"uint8arraylist": "^2.4.3", | ||
"uint8arrays": "^4.0.3" | ||
"uint8arrays": "^4.0.4" | ||
}, | ||
"devDependencies": { | ||
"@chainsafe/libp2p-yamux": "^4.0.1", | ||
"@libp2p/interface-libp2p": "^3.1.0", | ||
"@libp2p/interface-mocks": "^12.0.1", | ||
"@libp2p/peer-id-factory": "^2.0.3", | ||
"@libp2p/websockets": "^6.0.1", | ||
"@types/sinon": "^10.0.14", | ||
"aegir": "^39.0.7", | ||
"@chainsafe/libp2p-yamux": "^4.0.0", | ||
"@libp2p/interface-compliance-tests": "4.0.0-72e81dc1", | ||
"@libp2p/peer-id-factory": "3.0.0-72e81dc1", | ||
"@libp2p/websockets": "7.0.0-72e81dc1", | ||
"@types/sinon": "^10.0.15", | ||
"aegir": "^40.0.1", | ||
"delay": "^6.0.0", | ||
@@ -182,7 +87,10 @@ "it-length": "^3.0.2", | ||
"it-pair": "^2.0.6", | ||
"libp2p": "^0.45.0", | ||
"libp2p": "0.46.0-72e81dc1", | ||
"protons": "^7.0.2", | ||
"sinon": "^15.0.4", | ||
"sinon": "^15.1.2", | ||
"sinon-ts": "^1.0.0" | ||
}, | ||
"browser": { | ||
"./dist/src/webrtc/index.js": "./dist/src/webrtc/index.browser.js" | ||
} | ||
} |
@@ -5,4 +5,4 @@ # @libp2p/webrtc <!-- omit in toc --> | ||
[![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io) | ||
[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-webrtc.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-webrtc) | ||
[![CI](https://img.shields.io/github/actions/workflow/status/libp2p/js-libp2p-webrtc/js-test-and-release.yml?branch=main\&style=flat-square)](https://github.com/libp2p/js-libp2p-webrtc/actions/workflows/js-test-and-release.yml?query=branch%3Amain) | ||
[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p) | ||
[![CI](https://img.shields.io/github/actions/workflow/status/libp2p/js-libp2p/main.yml?branch=master\&style=flat-square)](https://github.com/libp2p/js-libp2p/actions/workflows/main.yml?query=branch%3Amaster) | ||
@@ -27,2 +27,3 @@ > A libp2p transport using WebRTC connections | ||
- [Check Dependencies](#check-dependencies) | ||
- [API Docs](#api-docs) | ||
- [License](#license) | ||
@@ -176,2 +177,6 @@ - [Contribution](#contribution) | ||
## API Docs | ||
- <https://libp2p.github.io/js-libp2p/modules/_libp2p_webrtc.html> | ||
## License | ||
@@ -178,0 +183,0 @@ |
@@ -1,3 +0,3 @@ | ||
import { CodeError } from '@libp2p/interfaces/errors' | ||
import type { Direction } from '@libp2p/interface-connection' | ||
import { CodeError } from '@libp2p/interface/errors' | ||
import type { Direction } from '@libp2p/interface/connection' | ||
@@ -4,0 +4,0 @@ export enum codes { |
import { WebRTCTransport } from './private-to-private/transport.js' | ||
import { WebRTCDirectTransport, type WebRTCTransportDirectInit, type WebRTCDirectTransportComponents } from './private-to-public/transport.js' | ||
import type { WebRTCTransportComponents, WebRTCTransportInit } from './private-to-private/transport.js' | ||
import type { Transport } from '@libp2p/interface-transport' | ||
import type { Transport } from '@libp2p/interface/transport' | ||
@@ -6,0 +6,0 @@ /** |
import { logger } from '@libp2p/logger' | ||
import { nopSink, nopSource } from './util.js' | ||
import type { MultiaddrConnection, MultiaddrConnectionTimeline } from '@libp2p/interface-connection' | ||
import type { CounterGroup } from '@libp2p/interface-metrics' | ||
import type { Multiaddr } from '@multiformats/multiaddr' | ||
import type { MultiaddrConnection, MultiaddrConnectionTimeline } from '@libp2p/interface/connection' | ||
import type { CounterGroup } from '@libp2p/interface/metrics' | ||
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr' | ||
import type { Source, Sink } from 'it-stream-types' | ||
@@ -75,12 +75,17 @@ | ||
async close (err?: Error | undefined): Promise<void> { | ||
if (err !== undefined) { | ||
log.error('error closing connection', err) | ||
} | ||
async close (options?: AbortOptions): Promise<void> { | ||
log.trace('closing connection') | ||
this.peerConnection.close() | ||
this.timeline.close = Date.now() | ||
this.peerConnection.close() | ||
this.metrics?.increment({ close: true }) | ||
} | ||
abort (err: Error): void { | ||
log.error('closing connection due to error', err) | ||
this.peerConnection.close() | ||
this.timeline.close = Date.now() | ||
this.metrics?.increment({ abort: true }) | ||
} | ||
} |
import { createStream } from './stream.js' | ||
import { nopSink, nopSource } from './util.js' | ||
import type { DataChannelOpts } from './stream.js' | ||
import type { Stream } from '@libp2p/interface-connection' | ||
import type { CounterGroup } from '@libp2p/interface-metrics' | ||
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface-stream-muxer' | ||
import type { Stream } from '@libp2p/interface/connection' | ||
import type { CounterGroup } from '@libp2p/interface/metrics' | ||
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer' | ||
import type { AbortOptions } from '@multiformats/multiaddr' | ||
import type { Source, Sink } from 'it-stream-types' | ||
@@ -96,7 +97,12 @@ import type { Uint8ArrayList } from 'uint8arraylist' | ||
/** | ||
* Close or abort all tracked streams and stop the muxer | ||
* Gracefully close all tracked streams and stop the muxer | ||
*/ | ||
close: (err?: Error | undefined) => void = () => { } | ||
close: (options?: AbortOptions) => Promise<void> = async () => { } | ||
/** | ||
* Abort all tracked streams and stop the muxer | ||
*/ | ||
abort: (err: Error) => void = () => { } | ||
/** | ||
* The stream source, a no-op as the transport natively supports multiplexing | ||
@@ -103,0 +109,0 @@ */ |
@@ -0,12 +1,14 @@ | ||
import { CodeError } from '@libp2p/interface/errors' | ||
import { logger } from '@libp2p/logger' | ||
import { abortableDuplex } from 'abortable-iterator' | ||
import { pbStream } from 'it-pb-stream' | ||
import { pbStream } from 'it-protobuf-stream' | ||
import pDefer, { type DeferredPromise } from 'p-defer' | ||
import { DataChannelMuxerFactory } from '../muxer.js' | ||
import { RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js' | ||
import { Message } from './pb/message.js' | ||
import { readCandidatesUntilConnected, resolveOnConnected } from './util.js' | ||
import type { DataChannelOpts } from '../stream.js' | ||
import type { Stream } from '@libp2p/interface-connection' | ||
import type { IncomingStreamData } from '@libp2p/interface-registrar' | ||
import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer' | ||
import type { Stream } from '@libp2p/interface/connection' | ||
import type { StreamMuxerFactory } from '@libp2p/interface/stream-muxer' | ||
import type { IncomingStreamData } from '@libp2p/interface-internal/registrar' | ||
@@ -23,62 +25,71 @@ const DEFAULT_TIMEOUT = 30 * 1000 | ||
const pc = new RTCPeerConnection(rtcConfiguration) | ||
const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions }) | ||
const connectedPromise: DeferredPromise<void> = pDefer() | ||
const answerSentPromise: DeferredPromise<void> = pDefer() | ||
signal.onabort = () => { connectedPromise.reject() } | ||
// candidate callbacks | ||
pc.onicecandidate = ({ candidate }) => { | ||
answerSentPromise.promise.then( | ||
() => { | ||
stream.write({ | ||
type: Message.Type.ICE_CANDIDATE, | ||
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' | ||
}) | ||
}, | ||
(err) => { | ||
log.error('cannot set candidate since sending answer failed', err) | ||
} | ||
) | ||
} | ||
try { | ||
const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions }) | ||
const connectedPromise: DeferredPromise<void> = pDefer() | ||
const answerSentPromise: DeferredPromise<void> = pDefer() | ||
resolveOnConnected(pc, connectedPromise) | ||
signal.onabort = () => { | ||
connectedPromise.reject(new CodeError('Timed out while trying to connect', 'ERR_TIMEOUT')) | ||
} | ||
// candidate callbacks | ||
pc.onicecandidate = ({ candidate }) => { | ||
answerSentPromise.promise.then( | ||
async () => { | ||
await stream.write({ | ||
type: Message.Type.ICE_CANDIDATE, | ||
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' | ||
}) | ||
}, | ||
(err) => { | ||
log.error('cannot set candidate since sending answer failed', err) | ||
connectedPromise.reject(err) | ||
} | ||
) | ||
} | ||
// read an SDP offer | ||
const pbOffer = await stream.read() | ||
if (pbOffer.type !== Message.Type.SDP_OFFER) { | ||
throw new Error(`expected message type SDP_OFFER, received: ${pbOffer.type ?? 'undefined'} `) | ||
} | ||
const offer = new RTCSessionDescription({ | ||
type: 'offer', | ||
sdp: pbOffer.data | ||
}) | ||
resolveOnConnected(pc, connectedPromise) | ||
await pc.setRemoteDescription(offer).catch(err => { | ||
log.error('could not execute setRemoteDescription', err) | ||
throw new Error('Failed to set remoteDescription') | ||
}) | ||
// read an SDP offer | ||
const pbOffer = await stream.read() | ||
if (pbOffer.type !== Message.Type.SDP_OFFER) { | ||
throw new Error(`expected message type SDP_OFFER, received: ${pbOffer.type ?? 'undefined'} `) | ||
} | ||
const offer = new RTCSessionDescription({ | ||
type: 'offer', | ||
sdp: pbOffer.data | ||
}) | ||
// create and write an SDP answer | ||
const answer = await pc.createAnswer().catch(err => { | ||
log.error('could not execute createAnswer', err) | ||
answerSentPromise.reject(err) | ||
throw new Error('Failed to create answer') | ||
}) | ||
// write the answer to the remote | ||
stream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp }) | ||
await pc.setRemoteDescription(offer).catch(err => { | ||
log.error('could not execute setRemoteDescription', err) | ||
throw new Error('Failed to set remoteDescription') | ||
}) | ||
await pc.setLocalDescription(answer).catch(err => { | ||
log.error('could not execute setLocalDescription', err) | ||
answerSentPromise.reject(err) | ||
throw new Error('Failed to set localDescription') | ||
}) | ||
// create and write an SDP answer | ||
const answer = await pc.createAnswer().catch(err => { | ||
log.error('could not execute createAnswer', err) | ||
answerSentPromise.reject(err) | ||
throw new Error('Failed to create answer') | ||
}) | ||
// write the answer to the remote | ||
await stream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp }) | ||
answerSentPromise.resolve() | ||
await pc.setLocalDescription(answer).catch(err => { | ||
log.error('could not execute setLocalDescription', err) | ||
answerSentPromise.reject(err) | ||
throw new Error('Failed to set localDescription') | ||
}) | ||
// wait until candidates are connected | ||
await readCandidatesUntilConnected(connectedPromise, pc, stream) | ||
answerSentPromise.resolve() | ||
const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '') | ||
// wait until candidates are connected | ||
await readCandidatesUntilConnected(connectedPromise, pc, stream) | ||
return { pc, muxerFactory, remoteAddress } | ||
const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '') | ||
return { pc, muxerFactory, remoteAddress } | ||
} catch (err) { | ||
pc.close() | ||
throw err | ||
} | ||
} | ||
@@ -97,49 +108,59 @@ | ||
const pc = new RTCPeerConnection(rtcConfiguration) | ||
const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions }) | ||
const connectedPromise: DeferredPromise<void> = pDefer() | ||
resolveOnConnected(pc, connectedPromise) | ||
try { | ||
const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions }) | ||
// reject the connectedPromise if the signal aborts | ||
signal.onabort = connectedPromise.reject | ||
// we create the channel so that the peerconnection has a component for which | ||
// to collect candidates. The label is not relevant to connection initiation | ||
// but can be useful for debugging | ||
const channel = pc.createDataChannel('init') | ||
// setup callback to write ICE candidates to the remote | ||
// peer | ||
pc.onicecandidate = ({ candidate }) => { | ||
stream.write({ | ||
type: Message.Type.ICE_CANDIDATE, | ||
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' | ||
const connectedPromise: DeferredPromise<void> = pDefer() | ||
resolveOnConnected(pc, connectedPromise) | ||
// reject the connectedPromise if the signal aborts | ||
signal.onabort = connectedPromise.reject | ||
// we create the channel so that the peerconnection has a component for which | ||
// to collect candidates. The label is not relevant to connection initiation | ||
// but can be useful for debugging | ||
const channel = pc.createDataChannel('init') | ||
// setup callback to write ICE candidates to the remote | ||
// peer | ||
pc.onicecandidate = ({ candidate }) => { | ||
void stream.write({ | ||
type: Message.Type.ICE_CANDIDATE, | ||
data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' | ||
}) | ||
.catch(err => { | ||
log.error('error sending ICE candidate', err) | ||
}) | ||
} | ||
// create an offer | ||
const offerSdp = await pc.createOffer() | ||
// write the offer to the stream | ||
await stream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp }) | ||
// set offer as local description | ||
await pc.setLocalDescription(offerSdp).catch(err => { | ||
log.error('could not execute setLocalDescription', err) | ||
throw new Error('Failed to set localDescription') | ||
}) | ||
} | ||
// create an offer | ||
const offerSdp = await pc.createOffer() | ||
// write the offer to the stream | ||
stream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp }) | ||
// set offer as local description | ||
await pc.setLocalDescription(offerSdp).catch(err => { | ||
log.error('could not execute setLocalDescription', err) | ||
throw new Error('Failed to set localDescription') | ||
}) | ||
// read answer | ||
const answerMessage = await stream.read() | ||
if (answerMessage.type !== Message.Type.SDP_ANSWER) { | ||
throw new Error('remote should send an SDP answer') | ||
} | ||
// read answer | ||
const answerMessage = await stream.read() | ||
if (answerMessage.type !== Message.Type.SDP_ANSWER) { | ||
throw new Error('remote should send an SDP answer') | ||
} | ||
const answerSdp = new RTCSessionDescription({ type: 'answer', sdp: answerMessage.data }) | ||
await pc.setRemoteDescription(answerSdp).catch(err => { | ||
log.error('could not execute setRemoteDescription', err) | ||
throw new Error('Failed to set remoteDescription') | ||
}) | ||
const answerSdp = new RTCSessionDescription({ type: 'answer', sdp: answerMessage.data }) | ||
await pc.setRemoteDescription(answerSdp).catch(err => { | ||
log.error('could not execute setRemoteDescription', err) | ||
throw new Error('Failed to set remoteDescription') | ||
}) | ||
await readCandidatesUntilConnected(connectedPromise, pc, stream) | ||
channel.close() | ||
await readCandidatesUntilConnected(connectedPromise, pc, stream) | ||
channel.close() | ||
const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '') | ||
const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '') | ||
return { pc, muxerFactory, remoteAddress } | ||
return { pc, muxerFactory, remoteAddress } | ||
} catch (err) { | ||
pc.close() | ||
throw err | ||
} | ||
} | ||
@@ -146,0 +167,0 @@ |
@@ -1,5 +0,6 @@ | ||
import { EventEmitter } from '@libp2p/interfaces/events' | ||
import { EventEmitter } from '@libp2p/interface/events' | ||
import { Circuit } from '@multiformats/mafmt' | ||
import type { PeerId } from '@libp2p/interface-peer-id' | ||
import type { ListenerEvents, Listener, TransportManager } from '@libp2p/interface-transport' | ||
import type { PeerId } from '@libp2p/interface/peer-id' | ||
import type { ListenerEvents, Listener } from '@libp2p/interface/transport' | ||
import type { TransportManager } from '@libp2p/interface-internal/transport-manager' | ||
import type { Multiaddr } from '@multiformats/multiaddr' | ||
@@ -6,0 +7,0 @@ |
@@ -1,3 +0,3 @@ | ||
import { type CreateListenerOptions, type DialOptions, type Listener, symbol, type Transport, type Upgrader, type TransportManager } from '@libp2p/interface-transport' | ||
import { CodeError } from '@libp2p/interfaces/errors' | ||
import { CodeError } from '@libp2p/interface/errors' | ||
import { type CreateListenerOptions, type DialOptions, symbol, type Transport, type Listener, type Upgrader } from '@libp2p/interface/transport' | ||
import { logger } from '@libp2p/logger' | ||
@@ -8,9 +8,11 @@ import { peerIdFromString } from '@libp2p/peer-id' | ||
import { WebRTCMultiaddrConnection } from '../maconn.js' | ||
import { cleanup } from '../webrtc/index.js' | ||
import { initiateConnection, handleIncomingStream } from './handler.js' | ||
import { WebRTCPeerListener } from './listener.js' | ||
import type { DataChannelOpts } from '../stream.js' | ||
import type { Connection } from '@libp2p/interface-connection' | ||
import type { PeerId } from '@libp2p/interface-peer-id' | ||
import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar' | ||
import type { Startable } from '@libp2p/interfaces/startable' | ||
import type { Connection } from '@libp2p/interface/connection' | ||
import type { PeerId } from '@libp2p/interface/peer-id' | ||
import type { Startable } from '@libp2p/interface/startable' | ||
import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar' | ||
import type { TransportManager } from '@libp2p/interface-internal/transport-manager' | ||
@@ -52,2 +54,4 @@ const log = logger('libp2p:webrtc:peer') | ||
this._onProtocol(data).catch(err => { log.error('failed to handle incoming connect from %p', data.connection.remotePeer, err) }) | ||
}, { | ||
runOnTransientConnection: true | ||
}) | ||
@@ -59,2 +63,3 @@ this._started = true | ||
await this.components.registrar.unhandle(SIGNALING_PROTO_ID) | ||
cleanup() | ||
this._started = false | ||
@@ -95,3 +100,6 @@ } | ||
const connection = await this.components.transportManager.dial(baseAddr, options) | ||
const signalingStream = await connection.newStream([SIGNALING_PROTO_ID], options) | ||
const signalingStream = await connection.newStream(SIGNALING_PROTO_ID, { | ||
...options, | ||
runOnTransientConnection: true | ||
}) | ||
@@ -120,7 +128,7 @@ try { | ||
// close the stream if SDP has been exchanged successfully | ||
signalingStream.close() | ||
await signalingStream.close() | ||
return result | ||
} catch (err) { | ||
} catch (err: any) { | ||
// reset the stream in case of any error | ||
signalingStream.reset() | ||
signalingStream.abort(err) | ||
throw err | ||
@@ -151,4 +159,4 @@ } finally { | ||
}) | ||
} catch (err) { | ||
stream.reset() | ||
} catch (err: any) { | ||
stream.abort(err) | ||
throw err | ||
@@ -155,0 +163,0 @@ } finally { |
import { logger } from '@libp2p/logger' | ||
import { isFirefox } from '../util.js' | ||
import { RTCIceCandidate } from '../webrtc/index.js' | ||
import { Message } from './pb/message.js' | ||
@@ -4,0 +5,0 @@ import type { DeferredPromise } from 'p-defer' |
@@ -1,4 +0,4 @@ | ||
import type { CreateListenerOptions, DialOptions } from '@libp2p/interface-transport' | ||
import type { CreateListenerOptions, DialOptions } from '@libp2p/interface/transport' | ||
export interface WebRTCListenerOptions extends CreateListenerOptions {} | ||
export interface WebRTCDialOptions extends DialOptions {} |
@@ -136,3 +136,3 @@ import { logger } from '@libp2p/logger' | ||
a=sctp-port:5000 | ||
a=max-message-size:100000 | ||
a=max-message-size:16384 | ||
a=candidate:1467250027 1 UDP 1467250027 ${host} ${port} typ host\r\n` | ||
@@ -139,0 +139,0 @@ } |
import { noise as Noise } from '@chainsafe/libp2p-noise' | ||
import { type CreateListenerOptions, type Listener, symbol, type Transport } from '@libp2p/interface-transport' | ||
import { type CreateListenerOptions, symbol, type Transport, type Listener } from '@libp2p/interface/transport' | ||
import { logger } from '@libp2p/logger' | ||
@@ -14,2 +14,3 @@ import * as p from '@libp2p/peer-id' | ||
import { isFirefox } from '../util.js' | ||
import { RTCPeerConnection } from '../webrtc/index.js' | ||
import * as sdp from './sdp.js' | ||
@@ -19,5 +20,5 @@ import { genUfrag } from './util.js' | ||
import type { DataChannelOpts } from '../stream.js' | ||
import type { Connection } from '@libp2p/interface-connection' | ||
import type { CounterGroup, Metrics } from '@libp2p/interface-metrics' | ||
import type { PeerId } from '@libp2p/interface-peer-id' | ||
import type { Connection } from '@libp2p/interface/connection' | ||
import type { CounterGroup, Metrics } from '@libp2p/interface/metrics' | ||
import type { PeerId } from '@libp2p/interface/peer-id' | ||
import type { Multiaddr } from '@multiformats/multiaddr' | ||
@@ -139,112 +140,117 @@ | ||
// create data channel for running the noise handshake. Once the data channel is opened, | ||
// the remote will initiate the noise handshake. This is used to confirm the identity of | ||
// the peer. | ||
const dataChannelOpenPromise = new Promise<RTCDataChannel>((resolve, reject) => { | ||
const handshakeDataChannel = peerConnection.createDataChannel('', { negotiated: true, id: 0 }) | ||
const handshakeTimeout = setTimeout(() => { | ||
const error = `Data channel was never opened: state: ${handshakeDataChannel.readyState}` | ||
log.error(error) | ||
this.metrics?.dialerEvents.increment({ open_error: true }) | ||
reject(dataChannelError('data', error)) | ||
}, HANDSHAKE_TIMEOUT_MS) | ||
try { | ||
// create data channel for running the noise handshake. Once the data channel is opened, | ||
// the remote will initiate the noise handshake. This is used to confirm the identity of | ||
// the peer. | ||
const dataChannelOpenPromise = new Promise<RTCDataChannel>((resolve, reject) => { | ||
const handshakeDataChannel = peerConnection.createDataChannel('', { negotiated: true, id: 0 }) | ||
const handshakeTimeout = setTimeout(() => { | ||
const error = `Data channel was never opened: state: ${handshakeDataChannel.readyState}` | ||
log.error(error) | ||
this.metrics?.dialerEvents.increment({ open_error: true }) | ||
reject(dataChannelError('data', error)) | ||
}, HANDSHAKE_TIMEOUT_MS) | ||
handshakeDataChannel.onopen = (_) => { | ||
clearTimeout(handshakeTimeout) | ||
resolve(handshakeDataChannel) | ||
} | ||
handshakeDataChannel.onopen = (_) => { | ||
clearTimeout(handshakeTimeout) | ||
resolve(handshakeDataChannel) | ||
} | ||
// ref: https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/error_event | ||
handshakeDataChannel.onerror = (event: Event) => { | ||
clearTimeout(handshakeTimeout) | ||
const errorTarget = event.target?.toString() ?? 'not specified' | ||
const error = `Error opening a data channel for handshaking: ${errorTarget}` | ||
log.error(error) | ||
// NOTE: We use unknown error here but this could potentially be considered a reset by some standards. | ||
this.metrics?.dialerEvents.increment({ unknown_error: true }) | ||
reject(dataChannelError('data', error)) | ||
} | ||
}) | ||
// ref: https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/error_event | ||
handshakeDataChannel.onerror = (event: Event) => { | ||
clearTimeout(handshakeTimeout) | ||
const errorTarget = event.target?.toString() ?? 'not specified' | ||
const error = `Error opening a data channel for handshaking: ${errorTarget}` | ||
log.error(error) | ||
// NOTE: We use unknown error here but this could potentially be considered a reset by some standards. | ||
this.metrics?.dialerEvents.increment({ unknown_error: true }) | ||
reject(dataChannelError('data', error)) | ||
} | ||
}) | ||
const ufrag = 'libp2p+webrtc+v1/' + genUfrag(32) | ||
const ufrag = 'libp2p+webrtc+v1/' + genUfrag(32) | ||
// Create offer and munge sdp with ufrag == pwd. This allows the remote to | ||
// respond to STUN messages without performing an actual SDP exchange. | ||
// This is because it can infer the passwd field by reading the USERNAME | ||
// attribute of the STUN message. | ||
const offerSdp = await peerConnection.createOffer() | ||
const mungedOfferSdp = sdp.munge(offerSdp, ufrag) | ||
await peerConnection.setLocalDescription(mungedOfferSdp) | ||
// Create offer and munge sdp with ufrag == pwd. This allows the remote to | ||
// respond to STUN messages without performing an actual SDP exchange. | ||
// This is because it can infer the passwd field by reading the USERNAME | ||
// attribute of the STUN message. | ||
const offerSdp = await peerConnection.createOffer() | ||
const mungedOfferSdp = sdp.munge(offerSdp, ufrag) | ||
await peerConnection.setLocalDescription(mungedOfferSdp) | ||
// construct answer sdp from multiaddr and ufrag | ||
const answerSdp = sdp.fromMultiAddr(ma, ufrag) | ||
await peerConnection.setRemoteDescription(answerSdp) | ||
// construct answer sdp from multiaddr and ufrag | ||
const answerSdp = sdp.fromMultiAddr(ma, ufrag) | ||
await peerConnection.setRemoteDescription(answerSdp) | ||
// wait for peerconnection.onopen to fire, or for the datachannel to open | ||
const handshakeDataChannel = await dataChannelOpenPromise | ||
// wait for peerconnection.onopen to fire, or for the datachannel to open | ||
const handshakeDataChannel = await dataChannelOpenPromise | ||
const myPeerId = this.components.peerId | ||
const myPeerId = this.components.peerId | ||
// Do noise handshake. | ||
// Set the Noise Prologue to libp2p-webrtc-noise:<FINGERPRINTS> before starting the actual Noise handshake. | ||
// <FINGERPRINTS> is the concatenation of the of the two TLS fingerprints of A and B in their multihash byte representation, sorted in ascending order. | ||
const fingerprintsPrologue = this.generateNoisePrologue(peerConnection, remoteCerthash.code, ma) | ||
// Do noise handshake. | ||
// Set the Noise Prologue to libp2p-webrtc-noise:<FINGERPRINTS> before starting the actual Noise handshake. | ||
// <FINGERPRINTS> is the concatenation of the of the two TLS fingerprints of A and B in their multihash byte representation, sorted in ascending order. | ||
const fingerprintsPrologue = this.generateNoisePrologue(peerConnection, remoteCerthash.code, ma) | ||
// Since we use the default crypto interface and do not use a static key or early data, | ||
// we pass in undefined for these parameters. | ||
const noise = Noise({ prologueBytes: fingerprintsPrologue })() | ||
// Since we use the default crypto interface and do not use a static key or early data, | ||
// we pass in undefined for these parameters. | ||
const noise = Noise({ prologueBytes: fingerprintsPrologue })() | ||
const wrappedChannel = createStream({ channel: handshakeDataChannel, direction: 'inbound', dataChannelOptions: this.init.dataChannel }) | ||
const wrappedDuplex = { | ||
...wrappedChannel, | ||
sink: wrappedChannel.sink.bind(wrappedChannel), | ||
source: (async function * () { | ||
for await (const list of wrappedChannel.source) { | ||
for (const buf of list) { | ||
yield buf | ||
const wrappedChannel = createStream({ channel: handshakeDataChannel, direction: 'inbound', dataChannelOptions: this.init.dataChannel }) | ||
const wrappedDuplex = { | ||
...wrappedChannel, | ||
sink: wrappedChannel.sink.bind(wrappedChannel), | ||
source: (async function * () { | ||
for await (const list of wrappedChannel.source) { | ||
for (const buf of list) { | ||
yield buf | ||
} | ||
} | ||
} | ||
}()) | ||
} | ||
}()) | ||
} | ||
// Creating the connection before completion of the noise | ||
// handshake ensures that the stream opening callback is set up | ||
const maConn = new WebRTCMultiaddrConnection({ | ||
peerConnection, | ||
remoteAddr: ma, | ||
timeline: { | ||
open: Date.now() | ||
}, | ||
metrics: this.metrics?.dialerEvents | ||
}) | ||
// Creating the connection before completion of the noise | ||
// handshake ensures that the stream opening callback is set up | ||
const maConn = new WebRTCMultiaddrConnection({ | ||
peerConnection, | ||
remoteAddr: ma, | ||
timeline: { | ||
open: Date.now() | ||
}, | ||
metrics: this.metrics?.dialerEvents | ||
}) | ||
const eventListeningName = isFirefox ? 'iceconnectionstatechange' : 'connectionstatechange' | ||
const eventListeningName = isFirefox ? 'iceconnectionstatechange' : 'connectionstatechange' | ||
peerConnection.addEventListener(eventListeningName, () => { | ||
switch (peerConnection.connectionState) { | ||
case 'failed': | ||
case 'disconnected': | ||
case 'closed': | ||
maConn.close().catch((err) => { | ||
log.error('error closing connection', err) | ||
}).finally(() => { | ||
// Remove the event listener once the connection is closed | ||
controller.abort() | ||
}) | ||
break | ||
default: | ||
break | ||
} | ||
}, { signal }) | ||
peerConnection.addEventListener(eventListeningName, () => { | ||
switch (peerConnection.connectionState) { | ||
case 'failed': | ||
case 'disconnected': | ||
case 'closed': | ||
maConn.close().catch((err) => { | ||
log.error('error closing connection', err) | ||
}).finally(() => { | ||
// Remove the event listener once the connection is closed | ||
controller.abort() | ||
}) | ||
break | ||
default: | ||
break | ||
} | ||
}, { signal }) | ||
// Track opened peer connection | ||
this.metrics?.dialerEvents.increment({ peer_connection: true }) | ||
// Track opened peer connection | ||
this.metrics?.dialerEvents.increment({ peer_connection: true }) | ||
const muxerFactory = new DataChannelMuxerFactory({ peerConnection, metrics: this.metrics?.dialerEvents, dataChannelOptions: this.init.dataChannel }) | ||
const muxerFactory = new DataChannelMuxerFactory({ peerConnection, metrics: this.metrics?.dialerEvents, dataChannelOptions: this.init.dataChannel }) | ||
// For outbound connections, the remote is expected to start the noise handshake. | ||
// Therefore, we need to secure an inbound noise connection from the remote. | ||
await noise.secureInbound(myPeerId, wrappedDuplex, theirPeerId) | ||
// For outbound connections, the remote is expected to start the noise handshake. | ||
// Therefore, we need to secure an inbound noise connection from the remote. | ||
await noise.secureInbound(myPeerId, wrappedDuplex, theirPeerId) | ||
return options.upgrader.upgradeOutbound(maConn, { skipProtection: true, skipEncryption: true, muxerFactory }) | ||
return await options.upgrader.upgradeOutbound(maConn, { skipProtection: true, skipEncryption: true, muxerFactory }) | ||
} catch (err) { | ||
peerConnection.close() | ||
throw err | ||
} | ||
} | ||
@@ -251,0 +257,0 @@ |
@@ -1,3 +0,2 @@ | ||
const charset = Array.from('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/') | ||
export const genUfrag = (len: number): string => [...Array(len)].map(() => charset.at(Math.floor(Math.random() * charset.length))).join('') |
@@ -1,3 +0,3 @@ | ||
import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface-stream-muxer/stream' | ||
import { CodeError } from '@libp2p/interfaces/errors' | ||
import { CodeError } from '@libp2p/interface/errors' | ||
import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface/stream-muxer/stream' | ||
import { logger } from '@libp2p/logger' | ||
@@ -9,3 +9,3 @@ import * as lengthPrefixed from 'it-length-prefixed' | ||
import { Message } from './pb/message.js' | ||
import type { Direction, Stream } from '@libp2p/interface-connection' | ||
import type { Direction } from '@libp2p/interface/connection' | ||
@@ -30,2 +30,4 @@ const log = logger('libp2p:webrtc:stream') | ||
dataChannelOptions?: Partial<DataChannelOpts> | ||
maxDataSize: number | ||
} | ||
@@ -45,3 +47,3 @@ | ||
class WebRTCStream extends AbstractStream { | ||
export class WebRTCStream extends AbstractStream { | ||
/** | ||
@@ -64,2 +66,3 @@ * The data channel used to send and receive data | ||
private messageQueue?: Uint8ArrayList | ||
private readonly maxDataSize: number | ||
@@ -78,2 +81,3 @@ constructor (init: WebRTCStreamInit) { | ||
} | ||
this.maxDataSize = init.maxDataSize | ||
@@ -87,4 +91,4 @@ // set up initial state | ||
case 'closing': | ||
if (this.stat.timeline.close === undefined || this.stat.timeline.close === 0) { | ||
this.stat.timeline.close = Date.now() | ||
if (this.timeline.close === undefined || this.timeline.close === 0) { | ||
this.timeline.close = Date.now() | ||
} | ||
@@ -103,3 +107,3 @@ break | ||
this.channel.onopen = (_evt) => { | ||
this.stat.timeline.open = new Date().getTime() | ||
this.timeline.open = new Date().getTime() | ||
@@ -117,3 +121,5 @@ if (this.messageQueue != null) { | ||
this.channel.onclose = (_evt) => { | ||
this.close() | ||
void this.close().catch(err => { | ||
log.error('error closing stream after channel closed', err) | ||
}) | ||
} | ||
@@ -164,3 +170,2 @@ | ||
if (err instanceof TimeoutError) { | ||
this.abort(err) | ||
throw new Error('Timed out waiting for DataChannel buffer to clear') | ||
@@ -196,6 +201,13 @@ } | ||
async sendData (data: Uint8ArrayList): Promise<void> { | ||
const msgbuf = Message.encode({ message: data.subarray() }) | ||
const sendbuf = lengthPrefixed.encode.single(msgbuf) | ||
data = data.sublist() | ||
await this._sendMessage(sendbuf) | ||
while (data.byteLength > 0) { | ||
const toSend = Math.min(data.byteLength, this.maxDataSize) | ||
const buf = data.subarray(0, toSend) | ||
const msgbuf = Message.encode({ message: buf }) | ||
const sendbuf = lengthPrefixed.encode.single(msgbuf) | ||
await this._sendMessage(sendbuf) | ||
data.consume(toSend) | ||
} | ||
} | ||
@@ -225,3 +237,3 @@ | ||
this.incomingData.end() | ||
this.closeRead() | ||
this.remoteCloseWrite() | ||
} | ||
@@ -236,3 +248,3 @@ | ||
// The remote has stopped reading | ||
this.closeWrite() | ||
this.remoteCloseRead() | ||
} | ||
@@ -274,3 +286,3 @@ } | ||
export function createStream (options: WebRTCStreamOptions): Stream { | ||
export function createStream (options: WebRTCStreamOptions): WebRTCStream { | ||
const { channel, direction, onEnd, dataChannelOptions } = options | ||
@@ -284,4 +296,5 @@ | ||
onEnd, | ||
channel | ||
channel, | ||
log: logger(`libp2p:mplex:stream:${direction}:${channel.id}`) | ||
}) | ||
} |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 2 instances in 1 package
682396
23
14
121
6077
189
4
2
+ Addedit-protobuf-stream@^1.0.0
+ Addednode-datachannel@^0.4.3
+ Added@libp2p/interface@0.1.0-72e81dc1(transitive)
+ Added@libp2p/interface-internal@0.1.0-72e81dc1(transitive)
+ Added@libp2p/logger@3.0.0-72e81dc1(transitive)
+ Added@libp2p/peer-collections@4.0.0-72e81dc1(transitive)
+ Added@libp2p/peer-id@3.0.0-72e81dc1(transitive)
+ Addedit-byte-stream@1.1.0(transitive)
+ Addedit-length-prefixed-stream@1.2.0(transitive)
+ Addedit-protobuf-stream@1.1.5(transitive)
+ Addedit-queueless-pushable@1.0.0(transitive)
+ Addednode-datachannel@0.4.3(transitive)
+ Addedrace-signal@1.1.0(transitive)
- Removed@libp2p/interface-connection@^5.0.2
- Removed@libp2p/interface-metrics@^4.0.8
- Removed@libp2p/interface-peer-id@^2.0.2
- Removed@libp2p/interface-registrar@^2.0.12
- Removed@libp2p/interface-transport@^4.0.3
- Removed@libp2p/interfaces@^3.3.2
- Removedit-pb-stream@^4.0.1
- Removed@libp2p/interface-registrar@2.0.12(transitive)
- Removed@libp2p/interface-stream-muxer@4.1.2(transitive)
- Removed@libp2p/interface-transport@4.0.3(transitive)
- Removedany-signal@4.1.1(transitive)
Updatedit-pushable@^3.2.0
Updatedmultiformats@^12.0.1
Updateduint8arrays@^4.0.4