Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bittorrent-protocol

Package Overview
Dependencies
Maintainers
8
Versions
100
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bittorrent-protocol - npm Package Compare versions

Comparing version 2.4.2 to 3.0.0

1230

index.js

@@ -1,745 +0,747 @@

module.exports = Wire
const arrayRemove = require('unordered-array-remove')
const bencode = require('bencode')
const BitField = require('bitfield')
const debug = require('debug')('bittorrent-protocol')
const extend = require('xtend')
const randombytes = require('randombytes')
const speedometer = require('speedometer')
const stream = require('readable-stream')
var arrayRemove = require('unordered-array-remove')
var bencode = require('bencode')
var BitField = require('bitfield')
var Buffer = require('safe-buffer').Buffer
var debug = require('debug')('bittorrent-protocol')
var extend = require('xtend')
var inherits = require('inherits')
var randombytes = require('randombytes')
var speedometer = require('speedometer')
var stream = require('readable-stream')
const BITFIELD_GROW = 400000
const KEEP_ALIVE_TIMEOUT = 55000
var BITFIELD_GROW = 400000
var KEEP_ALIVE_TIMEOUT = 55000
const MESSAGE_PROTOCOL = Buffer.from('\u0013BitTorrent protocol')
const MESSAGE_KEEP_ALIVE = Buffer.from([0x00, 0x00, 0x00, 0x00])
const MESSAGE_CHOKE = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x00])
const MESSAGE_UNCHOKE = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x01])
const MESSAGE_INTERESTED = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x02])
const MESSAGE_UNINTERESTED = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x03])
var MESSAGE_PROTOCOL = Buffer.from('\u0013BitTorrent protocol')
var MESSAGE_KEEP_ALIVE = Buffer.from([0x00, 0x00, 0x00, 0x00])
var MESSAGE_CHOKE = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x00])
var MESSAGE_UNCHOKE = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x01])
var MESSAGE_INTERESTED = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x02])
var MESSAGE_UNINTERESTED = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x03])
const MESSAGE_RESERVED = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
const MESSAGE_PORT = [0x00, 0x00, 0x00, 0x03, 0x09, 0x00, 0x00]
var MESSAGE_RESERVED = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
var MESSAGE_PORT = [0x00, 0x00, 0x00, 0x03, 0x09, 0x00, 0x00]
function Request (piece, offset, length, callback) {
this.piece = piece
this.offset = offset
this.length = length
this.callback = callback
class Request {
constructor (piece, offset, length, callback) {
this.piece = piece
this.offset = offset
this.length = length
this.callback = callback
}
}
inherits(Wire, stream.Duplex)
class Wire extends stream.Duplex {
constructor () {
super()
function Wire () {
if (!(this instanceof Wire)) return new Wire()
stream.Duplex.call(this)
this._debugId = randombytes(4).toString('hex')
this._debug('new wire')
this._debugId = randombytes(4).toString('hex')
this._debug('new wire')
this.peerId = null // remote peer id (hex string)
this.peerIdBuffer = null // remote peer id (buffer)
this.type = null // connection type ('webrtc', 'tcpIncoming', 'tcpOutgoing', 'webSeed')
this.peerId = null // remote peer id (hex string)
this.peerIdBuffer = null // remote peer id (buffer)
this.type = null // connection type ('webrtc', 'tcpIncoming', 'tcpOutgoing', 'webSeed')
this.amChoking = true // are we choking the peer?
this.amInterested = false // are we interested in the peer?
this.amChoking = true // are we choking the peer?
this.amInterested = false // are we interested in the peer?
this.peerChoking = true // is the peer choking us?
this.peerInterested = false // is the peer interested in us?
this.peerChoking = true // is the peer choking us?
this.peerInterested = false // is the peer interested in us?
// The largest torrent that I know of (the Geocities archive) is ~641 GB and has
// ~41,000 pieces. Therefore, cap bitfield to 10x larger (400,000 bits) to support all
// possible torrents but prevent malicious peers from growing bitfield to fill memory.
this.peerPieces = new BitField(0, { grow: BITFIELD_GROW })
// The largest torrent that I know of (the Geocities archive) is ~641 GB and has
// ~41,000 pieces. Therefore, cap bitfield to 10x larger (400,000 bits) to support all
// possible torrents but prevent malicious peers from growing bitfield to fill memory.
this.peerPieces = new BitField(0, { grow: BITFIELD_GROW })
this.peerExtensions = {}
this.peerExtensions = {}
this.requests = [] // outgoing
this.peerRequests = [] // incoming
this.requests = [] // outgoing
this.peerRequests = [] // incoming
this.extendedMapping = {} // number -> string, ex: 1 -> 'ut_metadata'
this.peerExtendedMapping = {} // string -> number, ex: 9 -> 'ut_metadata'
this.extendedMapping = {} // number -> string, ex: 1 -> 'ut_metadata'
this.peerExtendedMapping = {} // string -> number, ex: 9 -> 'ut_metadata'
// The extended handshake to send, minus the "m" field, which gets automatically
// filled from `this.extendedMapping`
this.extendedHandshake = {}
// The extended handshake to send, minus the "m" field, which gets automatically
// filled from `this.extendedMapping`
this.extendedHandshake = {}
this.peerExtendedHandshake = {} // remote peer's extended handshake
this.peerExtendedHandshake = {} // remote peer's extended handshake
this._ext = {} // string -> function, ex 'ut_metadata' -> ut_metadata()
this._nextExt = 1
this._ext = {} // string -> function, ex 'ut_metadata' -> ut_metadata()
this._nextExt = 1
this.uploaded = 0
this.downloaded = 0
this.uploadSpeed = speedometer()
this.downloadSpeed = speedometer()
this.uploaded = 0
this.downloaded = 0
this.uploadSpeed = speedometer()
this.downloadSpeed = speedometer()
this._keepAliveInterval = null
this._timeout = null
this._timeoutMs = 0
this._keepAliveInterval = null
this._timeout = null
this._timeoutMs = 0
this.destroyed = false // was the wire ended by calling `destroy`?
this._finished = false
this.destroyed = false // was the wire ended by calling `destroy`?
this._finished = false
this._parserSize = 0 // number of needed bytes to parse next message from remote peer
this._parser = null // function to call once `this._parserSize` bytes are available
this._parserSize = 0 // number of needed bytes to parse next message from remote peer
this._parser = null // function to call once `this._parserSize` bytes are available
this._buffer = [] // incomplete message data
this._bufferSize = 0 // cached total length of buffers in `this._buffer`
this._buffer = [] // incomplete message data
this._bufferSize = 0 // cached total length of buffers in `this._buffer`
this.on('finish', this._onFinish)
this.on('finish', this._onFinish)
this._parseHandshake()
}
this._parseHandshake()
}
/**
* Set whether to send a "keep-alive" ping (sent every 55s)
* @param {boolean} enable
*/
setKeepAlive (enable) {
this._debug('setKeepAlive %s', enable)
clearInterval(this._keepAliveInterval)
if (enable === false) return
this._keepAliveInterval = setInterval(() => {
this.keepAlive()
}, KEEP_ALIVE_TIMEOUT)
}
/**
* Set whether to send a "keep-alive" ping (sent every 55s)
* @param {boolean} enable
*/
Wire.prototype.setKeepAlive = function (enable) {
var self = this
self._debug('setKeepAlive %s', enable)
clearInterval(self._keepAliveInterval)
if (enable === false) return
self._keepAliveInterval = setInterval(function () {
self.keepAlive()
}, KEEP_ALIVE_TIMEOUT)
}
/**
* Set the amount of time to wait before considering a request to be "timed out"
* @param {number} ms
* @param {boolean=} unref (should the timer be unref'd? default: false)
*/
setTimeout (ms, unref) {
this._debug('setTimeout ms=%d unref=%s', ms, unref)
this._clearTimeout()
this._timeoutMs = ms
this._timeoutUnref = !!unref
this._updateTimeout()
}
/**
* Set the amount of time to wait before considering a request to be "timed out"
* @param {number} ms
* @param {boolean=} unref (should the timer be unref'd? default: false)
*/
Wire.prototype.setTimeout = function (ms, unref) {
this._debug('setTimeout ms=%d unref=%s', ms, unref)
this._clearTimeout()
this._timeoutMs = ms
this._timeoutUnref = !!unref
this._updateTimeout()
}
destroy () {
if (this.destroyed) return
this.destroyed = true
this._debug('destroy')
this.emit('close')
this.end()
}
Wire.prototype.destroy = function () {
if (this.destroyed) return
this.destroyed = true
this._debug('destroy')
this.emit('close')
this.end()
}
Wire.prototype.end = function () {
this._debug('end')
this._onUninterested()
this._onChoke()
stream.Duplex.prototype.end.apply(this, arguments)
}
/**
* Use the specified protocol extension.
* @param {function} Extension
*/
Wire.prototype.use = function (Extension) {
var name = Extension.prototype.name
if (!name) {
throw new Error('Extension class requires a "name" property on the prototype')
end (...args) {
this._debug('end')
this._onUninterested()
this._onChoke()
super.end(...args)
}
this._debug('use extension.name=%s', name)
var ext = this._nextExt
var handler = new Extension(this)
/**
* Use the specified protocol extension.
* @param {function} Extension
*/
use (Extension) {
const name = Extension.prototype.name
if (!name) {
throw new Error('Extension class requires a "name" property on the prototype')
}
this._debug('use extension.name=%s', name)
function noop () {}
const ext = this._nextExt
const handler = new Extension(this)
if (typeof handler.onHandshake !== 'function') {
handler.onHandshake = noop
}
if (typeof handler.onExtendedHandshake !== 'function') {
handler.onExtendedHandshake = noop
}
if (typeof handler.onMessage !== 'function') {
handler.onMessage = noop
}
function noop () {}
this.extendedMapping[ext] = name
this._ext[name] = handler
this[name] = handler
if (typeof handler.onHandshake !== 'function') {
handler.onHandshake = noop
}
if (typeof handler.onExtendedHandshake !== 'function') {
handler.onExtendedHandshake = noop
}
if (typeof handler.onMessage !== 'function') {
handler.onMessage = noop
}
this._nextExt += 1
}
this.extendedMapping[ext] = name
this._ext[name] = handler
this[name] = handler
//
// OUTGOING MESSAGES
//
this._nextExt += 1
}
/**
* Message "keep-alive": <len=0000>
*/
Wire.prototype.keepAlive = function () {
this._debug('keep-alive')
this._push(MESSAGE_KEEP_ALIVE)
}
//
// OUTGOING MESSAGES
//
/**
* Message: "handshake" <pstrlen><pstr><reserved><info_hash><peer_id>
* @param {Buffer|string} infoHash (as Buffer or *hex* string)
* @param {Buffer|string} peerId
* @param {Object} extensions
*/
Wire.prototype.handshake = function (infoHash, peerId, extensions) {
var infoHashBuffer, peerIdBuffer
if (typeof infoHash === 'string') {
infoHash = infoHash.toLowerCase()
infoHashBuffer = Buffer.from(infoHash, 'hex')
} else {
infoHashBuffer = infoHash
infoHash = infoHashBuffer.toString('hex')
/**
* Message "keep-alive": <len=0000>
*/
keepAlive () {
this._debug('keep-alive')
this._push(MESSAGE_KEEP_ALIVE)
}
if (typeof peerId === 'string') {
peerIdBuffer = Buffer.from(peerId, 'hex')
} else {
peerIdBuffer = peerId
peerId = peerIdBuffer.toString('hex')
}
if (infoHashBuffer.length !== 20 || peerIdBuffer.length !== 20) {
throw new Error('infoHash and peerId MUST have length 20')
}
/**
* Message: "handshake" <pstrlen><pstr><reserved><info_hash><peer_id>
* @param {Buffer|string} infoHash (as Buffer or *hex* string)
* @param {Buffer|string} peerId
* @param {Object} extensions
*/
handshake (infoHash, peerId, extensions) {
let infoHashBuffer
let peerIdBuffer
if (typeof infoHash === 'string') {
infoHash = infoHash.toLowerCase()
infoHashBuffer = Buffer.from(infoHash, 'hex')
} else {
infoHashBuffer = infoHash
infoHash = infoHashBuffer.toString('hex')
}
if (typeof peerId === 'string') {
peerIdBuffer = Buffer.from(peerId, 'hex')
} else {
peerIdBuffer = peerId
peerId = peerIdBuffer.toString('hex')
}
this._debug('handshake i=%s p=%s exts=%o', infoHash, peerId, extensions)
if (infoHashBuffer.length !== 20 || peerIdBuffer.length !== 20) {
throw new Error('infoHash and peerId MUST have length 20')
}
var reserved = Buffer.from(MESSAGE_RESERVED)
this._debug('handshake i=%s p=%s exts=%o', infoHash, peerId, extensions)
// enable extended message
reserved[5] |= 0x10
const reserved = Buffer.from(MESSAGE_RESERVED)
if (extensions && extensions.dht) reserved[7] |= 1
// enable extended message
reserved[5] |= 0x10
this._push(Buffer.concat([MESSAGE_PROTOCOL, reserved, infoHashBuffer, peerIdBuffer]))
this._handshakeSent = true
if (extensions && extensions.dht) reserved[7] |= 1
if (this.peerExtensions.extended && !this._extendedHandshakeSent) {
// Peer's handshake indicated support already
// (incoming connection)
this._sendExtendedHandshake()
}
}
this._push(Buffer.concat([MESSAGE_PROTOCOL, reserved, infoHashBuffer, peerIdBuffer]))
this._handshakeSent = true
/* Peer supports BEP-0010, send extended handshake.
*
* This comes after the 'handshake' event to give the user a chance to populate
* `this.extendedHandshake` and `this.extendedMapping` before the extended handshake
* is sent to the remote peer.
*/
Wire.prototype._sendExtendedHandshake = function () {
// Create extended message object from registered extensions
var msg = extend(this.extendedHandshake)
msg.m = {}
for (var ext in this.extendedMapping) {
var name = this.extendedMapping[ext]
msg.m[name] = Number(ext)
if (this.peerExtensions.extended && !this._extendedHandshakeSent) {
// Peer's handshake indicated support already
// (incoming connection)
this._sendExtendedHandshake()
}
}
// Send extended handshake
this.extended(0, bencode.encode(msg))
this._extendedHandshakeSent = true
}
/* Peer supports BEP-0010, send extended handshake.
*
* This comes after the 'handshake' event to give the user a chance to populate
* `this.extendedHandshake` and `this.extendedMapping` before the extended handshake
* is sent to the remote peer.
*/
_sendExtendedHandshake () {
// Create extended message object from registered extensions
const msg = extend(this.extendedHandshake)
msg.m = {}
for (const ext in this.extendedMapping) {
const name = this.extendedMapping[ext]
msg.m[name] = Number(ext)
}
/**
* Message "choke": <len=0001><id=0>
*/
Wire.prototype.choke = function () {
if (this.amChoking) return
this.amChoking = true
this._debug('choke')
while (this.peerRequests.length) {
this.peerRequests.pop()
// Send extended handshake
this.extended(0, bencode.encode(msg))
this._extendedHandshakeSent = true
}
this._push(MESSAGE_CHOKE)
}
/**
* Message "unchoke": <len=0001><id=1>
*/
Wire.prototype.unchoke = function () {
if (!this.amChoking) return
this.amChoking = false
this._debug('unchoke')
this._push(MESSAGE_UNCHOKE)
}
/**
* Message "choke": <len=0001><id=0>
*/
choke () {
if (this.amChoking) return
this.amChoking = true
this._debug('choke')
while (this.peerRequests.length) {
this.peerRequests.pop()
}
this._push(MESSAGE_CHOKE)
}
/**
* Message "interested": <len=0001><id=2>
*/
Wire.prototype.interested = function () {
if (this.amInterested) return
this.amInterested = true
this._debug('interested')
this._push(MESSAGE_INTERESTED)
}
/**
* Message "unchoke": <len=0001><id=1>
*/
unchoke () {
if (!this.amChoking) return
this.amChoking = false
this._debug('unchoke')
this._push(MESSAGE_UNCHOKE)
}
/**
* Message "uninterested": <len=0001><id=3>
*/
Wire.prototype.uninterested = function () {
if (!this.amInterested) return
this.amInterested = false
this._debug('uninterested')
this._push(MESSAGE_UNINTERESTED)
}
/**
* Message "interested": <len=0001><id=2>
*/
interested () {
if (this.amInterested) return
this.amInterested = true
this._debug('interested')
this._push(MESSAGE_INTERESTED)
}
/**
* Message "have": <len=0005><id=4><piece index>
* @param {number} index
*/
Wire.prototype.have = function (index) {
this._debug('have %d', index)
this._message(4, [index], null)
}
/**
* Message "uninterested": <len=0001><id=3>
*/
uninterested () {
if (!this.amInterested) return
this.amInterested = false
this._debug('uninterested')
this._push(MESSAGE_UNINTERESTED)
}
/**
* Message "bitfield": <len=0001+X><id=5><bitfield>
* @param {BitField|Buffer} bitfield
*/
Wire.prototype.bitfield = function (bitfield) {
this._debug('bitfield')
if (!Buffer.isBuffer(bitfield)) bitfield = bitfield.buffer
this._message(5, [], bitfield)
}
/**
* Message "have": <len=0005><id=4><piece index>
* @param {number} index
*/
have (index) {
this._debug('have %d', index)
this._message(4, [index], null)
}
/**
* Message "request": <len=0013><id=6><index><begin><length>
* @param {number} index
* @param {number} offset
* @param {number} length
* @param {function} cb
*/
Wire.prototype.request = function (index, offset, length, cb) {
if (!cb) cb = function () {}
if (this._finished) return cb(new Error('wire is closed'))
if (this.peerChoking) return cb(new Error('peer is choking'))
/**
* Message "bitfield": <len=0001+X><id=5><bitfield>
* @param {BitField|Buffer} bitfield
*/
bitfield (bitfield) {
this._debug('bitfield')
if (!Buffer.isBuffer(bitfield)) bitfield = bitfield.buffer
this._message(5, [], bitfield)
}
this._debug('request index=%d offset=%d length=%d', index, offset, length)
/**
* Message "request": <len=0013><id=6><index><begin><length>
* @param {number} index
* @param {number} offset
* @param {number} length
* @param {function} cb
*/
request (index, offset, length, cb) {
if (!cb) cb = () => {}
if (this._finished) return cb(new Error('wire is closed'))
if (this.peerChoking) return cb(new Error('peer is choking'))
this.requests.push(new Request(index, offset, length, cb))
this._updateTimeout()
this._message(6, [index, offset, length], null)
}
this._debug('request index=%d offset=%d length=%d', index, offset, length)
/**
* Message "piece": <len=0009+X><id=7><index><begin><block>
* @param {number} index
* @param {number} offset
* @param {Buffer} buffer
*/
Wire.prototype.piece = function (index, offset, buffer) {
this._debug('piece index=%d offset=%d', index, offset)
this.uploaded += buffer.length
this.uploadSpeed(buffer.length)
this.emit('upload', buffer.length)
this._message(7, [index, offset], buffer)
}
this.requests.push(new Request(index, offset, length, cb))
this._updateTimeout()
this._message(6, [index, offset, length], null)
}
/**
* Message "cancel": <len=0013><id=8><index><begin><length>
* @param {number} index
* @param {number} offset
* @param {number} length
*/
Wire.prototype.cancel = function (index, offset, length) {
this._debug('cancel index=%d offset=%d length=%d', index, offset, length)
this._callback(
pull(this.requests, index, offset, length),
new Error('request was cancelled'),
null
)
this._message(8, [index, offset, length], null)
}
/**
* Message "piece": <len=0009+X><id=7><index><begin><block>
* @param {number} index
* @param {number} offset
* @param {Buffer} buffer
*/
piece (index, offset, buffer) {
this._debug('piece index=%d offset=%d', index, offset)
this.uploaded += buffer.length
this.uploadSpeed(buffer.length)
this.emit('upload', buffer.length)
this._message(7, [index, offset], buffer)
}
/**
* Message: "port" <len=0003><id=9><listen-port>
* @param {Number} port
*/
Wire.prototype.port = function (port) {
this._debug('port %d', port)
var message = Buffer.from(MESSAGE_PORT)
message.writeUInt16BE(port, 5)
this._push(message)
}
/**
* Message "cancel": <len=0013><id=8><index><begin><length>
* @param {number} index
* @param {number} offset
* @param {number} length
*/
cancel (index, offset, length) {
this._debug('cancel index=%d offset=%d length=%d', index, offset, length)
this._callback(
this._pull(this.requests, index, offset, length),
new Error('request was cancelled'),
null
)
this._message(8, [index, offset, length], null)
}
/**
* Message: "extended" <len=0005+X><id=20><ext-number><payload>
* @param {number|string} ext
* @param {Object} obj
*/
Wire.prototype.extended = function (ext, obj) {
this._debug('extended ext=%s', ext)
if (typeof ext === 'string' && this.peerExtendedMapping[ext]) {
ext = this.peerExtendedMapping[ext]
/**
* Message: "port" <len=0003><id=9><listen-port>
* @param {Number} port
*/
port (port) {
this._debug('port %d', port)
const message = Buffer.from(MESSAGE_PORT)
message.writeUInt16BE(port, 5)
this._push(message)
}
if (typeof ext === 'number') {
var extId = Buffer.from([ext])
var buf = Buffer.isBuffer(obj) ? obj : bencode.encode(obj)
this._message(20, [], Buffer.concat([extId, buf]))
} else {
throw new Error('Unrecognized extension: ' + ext)
/**
* Message: "extended" <len=0005+X><id=20><ext-number><payload>
* @param {number|string} ext
* @param {Object} obj
*/
extended (ext, obj) {
this._debug('extended ext=%s', ext)
if (typeof ext === 'string' && this.peerExtendedMapping[ext]) {
ext = this.peerExtendedMapping[ext]
}
if (typeof ext === 'number') {
const extId = Buffer.from([ext])
const buf = Buffer.isBuffer(obj) ? obj : bencode.encode(obj)
this._message(20, [], Buffer.concat([extId, buf]))
} else {
throw new Error(`Unrecognized extension: ${ext}`)
}
}
}
/**
* Duplex stream method. Called whenever the remote peer stream wants data. No-op
* since we'll just push data whenever we get it.
*/
Wire.prototype._read = function () {}
/**
* Duplex stream method. Called whenever the remote peer stream wants data. No-op
* since we'll just push data whenever we get it.
*/
_read () {}
/**
* Send a message to the remote peer.
*/
Wire.prototype._message = function (id, numbers, data) {
var dataLength = data ? data.length : 0
var buffer = Buffer.allocUnsafe(5 + (4 * numbers.length))
/**
* Send a message to the remote peer.
*/
_message (id, numbers, data) {
const dataLength = data ? data.length : 0
const buffer = Buffer.allocUnsafe(5 + (4 * numbers.length))
buffer.writeUInt32BE(buffer.length + dataLength - 4, 0)
buffer[4] = id
for (var i = 0; i < numbers.length; i++) {
buffer.writeUInt32BE(numbers[i], 5 + (4 * i))
buffer.writeUInt32BE(buffer.length + dataLength - 4, 0)
buffer[4] = id
for (let i = 0; i < numbers.length; i++) {
buffer.writeUInt32BE(numbers[i], 5 + (4 * i))
}
this._push(buffer)
if (data) this._push(data)
}
this._push(buffer)
if (data) this._push(data)
}
_push (data) {
if (this._finished) return
return this.push(data)
}
Wire.prototype._push = function (data) {
if (this._finished) return
return this.push(data)
}
//
// INCOMING MESSAGES
//
//
// INCOMING MESSAGES
//
_onKeepAlive () {
this._debug('got keep-alive')
this.emit('keep-alive')
}
Wire.prototype._onKeepAlive = function () {
this._debug('got keep-alive')
this.emit('keep-alive')
}
_onHandshake (infoHashBuffer, peerIdBuffer, extensions) {
const infoHash = infoHashBuffer.toString('hex')
const peerId = peerIdBuffer.toString('hex')
Wire.prototype._onHandshake = function (infoHashBuffer, peerIdBuffer, extensions) {
var infoHash = infoHashBuffer.toString('hex')
var peerId = peerIdBuffer.toString('hex')
this._debug('got handshake i=%s p=%s exts=%o', infoHash, peerId, extensions)
this._debug('got handshake i=%s p=%s exts=%o', infoHash, peerId, extensions)
this.peerId = peerId
this.peerIdBuffer = peerIdBuffer
this.peerExtensions = extensions
this.peerId = peerId
this.peerIdBuffer = peerIdBuffer
this.peerExtensions = extensions
this.emit('handshake', infoHash, peerId, extensions)
this.emit('handshake', infoHash, peerId, extensions)
let name
for (name in this._ext) {
this._ext[name].onHandshake(infoHash, peerId, extensions)
}
var name
for (name in this._ext) {
this._ext[name].onHandshake(infoHash, peerId, extensions)
if (extensions.extended && this._handshakeSent &&
!this._extendedHandshakeSent) {
// outgoing connection
this._sendExtendedHandshake()
}
}
if (extensions.extended && this._handshakeSent &&
!this._extendedHandshakeSent) {
// outgoing connection
this._sendExtendedHandshake()
_onChoke () {
this.peerChoking = true
this._debug('got choke')
this.emit('choke')
while (this.requests.length) {
this._callback(this.requests.pop(), new Error('peer is choking'), null)
}
}
}
Wire.prototype._onChoke = function () {
this.peerChoking = true
this._debug('got choke')
this.emit('choke')
while (this.requests.length) {
this._callback(this.requests.pop(), new Error('peer is choking'), null)
_onUnchoke () {
this.peerChoking = false
this._debug('got unchoke')
this.emit('unchoke')
}
}
Wire.prototype._onUnchoke = function () {
this.peerChoking = false
this._debug('got unchoke')
this.emit('unchoke')
}
_onInterested () {
this.peerInterested = true
this._debug('got interested')
this.emit('interested')
}
Wire.prototype._onInterested = function () {
this.peerInterested = true
this._debug('got interested')
this.emit('interested')
}
_onUninterested () {
this.peerInterested = false
this._debug('got uninterested')
this.emit('uninterested')
}
Wire.prototype._onUninterested = function () {
this.peerInterested = false
this._debug('got uninterested')
this.emit('uninterested')
}
_onHave (index) {
if (this.peerPieces.get(index)) return
this._debug('got have %d', index)
Wire.prototype._onHave = function (index) {
if (this.peerPieces.get(index)) return
this._debug('got have %d', index)
this.peerPieces.set(index, true)
this.emit('have', index)
}
this.peerPieces.set(index, true)
this.emit('have', index)
}
_onBitField (buffer) {
this.peerPieces = new BitField(buffer)
this._debug('got bitfield')
this.emit('bitfield', this.peerPieces)
}
Wire.prototype._onBitField = function (buffer) {
this.peerPieces = new BitField(buffer)
this._debug('got bitfield')
this.emit('bitfield', this.peerPieces)
}
_onRequest (index, offset, length) {
if (this.amChoking) return
this._debug('got request index=%d offset=%d length=%d', index, offset, length)
Wire.prototype._onRequest = function (index, offset, length) {
var self = this
if (self.amChoking) return
self._debug('got request index=%d offset=%d length=%d', index, offset, length)
const respond = (err, buffer) => {
if (request !== this._pull(this.peerRequests, index, offset, length)) return
if (err) return this._debug('error satisfying request index=%d offset=%d length=%d (%s)', index, offset, length, err.message)
this.piece(index, offset, buffer)
}
var respond = function (err, buffer) {
if (request !== pull(self.peerRequests, index, offset, length)) return
if (err) return self._debug('error satisfying request index=%d offset=%d length=%d (%s)', index, offset, length, err.message)
self.piece(index, offset, buffer)
var request = new Request(index, offset, length, respond)
this.peerRequests.push(request)
this.emit('request', index, offset, length, respond)
}
var request = new Request(index, offset, length, respond)
self.peerRequests.push(request)
self.emit('request', index, offset, length, respond)
}
_onPiece (index, offset, buffer) {
this._debug('got piece index=%d offset=%d', index, offset)
this._callback(this._pull(this.requests, index, offset, buffer.length), null, buffer)
this.downloaded += buffer.length
this.downloadSpeed(buffer.length)
this.emit('download', buffer.length)
this.emit('piece', index, offset, buffer)
}
Wire.prototype._onPiece = function (index, offset, buffer) {
this._debug('got piece index=%d offset=%d', index, offset)
this._callback(pull(this.requests, index, offset, buffer.length), null, buffer)
this.downloaded += buffer.length
this.downloadSpeed(buffer.length)
this.emit('download', buffer.length)
this.emit('piece', index, offset, buffer)
}
_onCancel (index, offset, length) {
this._debug('got cancel index=%d offset=%d length=%d', index, offset, length)
this._pull(this.peerRequests, index, offset, length)
this.emit('cancel', index, offset, length)
}
Wire.prototype._onCancel = function (index, offset, length) {
this._debug('got cancel index=%d offset=%d length=%d', index, offset, length)
pull(this.peerRequests, index, offset, length)
this.emit('cancel', index, offset, length)
}
_onPort (port) {
this._debug('got port %d', port)
this.emit('port', port)
}
Wire.prototype._onPort = function (port) {
this._debug('got port %d', port)
this.emit('port', port)
}
_onExtended (ext, buf) {
if (ext === 0) {
let info
try {
info = bencode.decode(buf)
} catch (err) {
this._debug('ignoring invalid extended handshake: %s', err.message || err)
}
Wire.prototype._onExtended = function (ext, buf) {
if (ext === 0) {
var info
try {
info = bencode.decode(buf)
} catch (err) {
this._debug('ignoring invalid extended handshake: %s', err.message || err)
}
if (!info) return
this.peerExtendedHandshake = info
if (!info) return
this.peerExtendedHandshake = info
var name
if (typeof info.m === 'object') {
for (name in info.m) {
this.peerExtendedMapping[name] = Number(info.m[name].toString())
let name
if (typeof info.m === 'object') {
for (name in info.m) {
this.peerExtendedMapping[name] = Number(info.m[name].toString())
}
}
}
for (name in this._ext) {
if (this.peerExtendedMapping[name]) {
this._ext[name].onExtendedHandshake(this.peerExtendedHandshake)
for (name in this._ext) {
if (this.peerExtendedMapping[name]) {
this._ext[name].onExtendedHandshake(this.peerExtendedHandshake)
}
}
}
this._debug('got extended handshake')
this.emit('extended', 'handshake', this.peerExtendedHandshake)
} else {
if (this.extendedMapping[ext]) {
ext = this.extendedMapping[ext] // friendly name for extension
if (this._ext[ext]) {
// there is an registered extension handler, so call it
this._ext[ext].onMessage(buf)
this._debug('got extended handshake')
this.emit('extended', 'handshake', this.peerExtendedHandshake)
} else {
if (this.extendedMapping[ext]) {
ext = this.extendedMapping[ext] // friendly name for extension
if (this._ext[ext]) {
// there is an registered extension handler, so call it
this._ext[ext].onMessage(buf)
}
}
this._debug('got extended message ext=%s', ext)
this.emit('extended', ext, buf)
}
this._debug('got extended message ext=%s', ext)
this.emit('extended', ext, buf)
}
}
Wire.prototype._onTimeout = function () {
this._debug('request timed out')
this._callback(this.requests.shift(), new Error('request has timed out'), null)
this.emit('timeout')
}
_onTimeout () {
this._debug('request timed out')
this._callback(this.requests.shift(), new Error('request has timed out'), null)
this.emit('timeout')
}
/**
* Duplex stream method. Called whenever the remote peer has data for us. Data that the
* remote peer sends gets buffered (i.e. not actually processed) until the right number
* of bytes have arrived, determined by the last call to `this._parse(number, callback)`.
* Once enough bytes have arrived to process the message, the callback function
* (i.e. `this._parser`) gets called with the full buffer of data.
* @param {Buffer} data
* @param {string} encoding
* @param {function} cb
*/
Wire.prototype._write = function (data, encoding, cb) {
this._bufferSize += data.length
this._buffer.push(data)
/**
* Duplex stream method. Called whenever the remote peer has data for us. Data that the
* remote peer sends gets buffered (i.e. not actually processed) until the right number
* of bytes have arrived, determined by the last call to `this._parse(number, callback)`.
* Once enough bytes have arrived to process the message, the callback function
* (i.e. `this._parser`) gets called with the full buffer of data.
* @param {Buffer} data
* @param {string} encoding
* @param {function} cb
*/
_write (data, encoding, cb) {
this._bufferSize += data.length
this._buffer.push(data)
while (this._bufferSize >= this._parserSize) {
var buffer = (this._buffer.length === 1)
? this._buffer[0]
: Buffer.concat(this._buffer)
this._bufferSize -= this._parserSize
this._buffer = this._bufferSize
? [buffer.slice(this._parserSize)]
: []
this._parser(buffer.slice(0, this._parserSize))
while (this._bufferSize >= this._parserSize) {
const buffer = (this._buffer.length === 1)
? this._buffer[0]
: Buffer.concat(this._buffer)
this._bufferSize -= this._parserSize
this._buffer = this._bufferSize
? [buffer.slice(this._parserSize)]
: []
this._parser(buffer.slice(0, this._parserSize))
}
cb(null) // Signal that we're ready for more data
}
cb(null) // Signal that we're ready for more data
}
_callback (request, err, buffer) {
if (!request) return
Wire.prototype._callback = function (request, err, buffer) {
if (!request) return
this._clearTimeout()
this._clearTimeout()
if (!this.peerChoking && !this._finished) this._updateTimeout()
request.callback(err, buffer)
}
if (!this.peerChoking && !this._finished) this._updateTimeout()
request.callback(err, buffer)
}
_clearTimeout () {
if (!this._timeout) return
Wire.prototype._clearTimeout = function () {
if (!this._timeout) return
clearTimeout(this._timeout)
this._timeout = null
}
clearTimeout(this._timeout)
this._timeout = null
}
_updateTimeout () {
if (!this._timeoutMs || !this.requests.length || this._timeout) return
Wire.prototype._updateTimeout = function () {
var self = this
if (!self._timeoutMs || !self.requests.length || self._timeout) return
this._timeout = setTimeout(() => this._onTimeout(), this._timeoutMs)
if (this._timeoutUnref && this._timeout.unref) this._timeout.unref()
}
self._timeout = setTimeout(function () {
self._onTimeout()
}, self._timeoutMs)
if (self._timeoutUnref && self._timeout.unref) self._timeout.unref()
}
/**
* Takes a number of bytes that the local peer is waiting to receive from the remote peer
* in order to parse a complete message, and a callback function to be called once enough
* bytes have arrived.
* @param {number} size
* @param {function} parser
*/
_parse (size, parser) {
this._parserSize = size
this._parser = parser
}
/**
* Takes a number of bytes that the local peer is waiting to receive from the remote peer
* in order to parse a complete message, and a callback function to be called once enough
* bytes have arrived.
* @param {number} size
* @param {function} parser
*/
Wire.prototype._parse = function (size, parser) {
this._parserSize = size
this._parser = parser
}
/**
* Handle the first 4 bytes of a message, to determine the length of bytes that must be
* waited for in order to have the whole message.
* @param {Buffer} buffer
*/
_onMessageLength (buffer) {
const length = buffer.readUInt32BE(0)
if (length > 0) {
this._parse(length, this._onMessage)
} else {
this._onKeepAlive()
this._parse(4, this._onMessageLength)
}
}
/**
* Handle the first 4 bytes of a message, to determine the length of bytes that must be
* waited for in order to have the whole message.
* @param {Buffer} buffer
*/
Wire.prototype._onMessageLength = function (buffer) {
var length = buffer.readUInt32BE(0)
if (length > 0) {
this._parse(length, this._onMessage)
} else {
this._onKeepAlive()
/**
* Handle a message from the remote peer.
* @param {Buffer} buffer
*/
_onMessage (buffer) {
this._parse(4, this._onMessageLength)
switch (buffer[0]) {
case 0:
return this._onChoke()
case 1:
return this._onUnchoke()
case 2:
return this._onInterested()
case 3:
return this._onUninterested()
case 4:
return this._onHave(buffer.readUInt32BE(1))
case 5:
return this._onBitField(buffer.slice(1))
case 6:
return this._onRequest(
buffer.readUInt32BE(1),
buffer.readUInt32BE(5),
buffer.readUInt32BE(9)
)
case 7:
return this._onPiece(
buffer.readUInt32BE(1),
buffer.readUInt32BE(5),
buffer.slice(9)
)
case 8:
return this._onCancel(
buffer.readUInt32BE(1),
buffer.readUInt32BE(5),
buffer.readUInt32BE(9)
)
case 9:
return this._onPort(buffer.readUInt16BE(1))
case 20:
return this._onExtended(buffer.readUInt8(1), buffer.slice(2))
default:
this._debug('got unknown message')
return this.emit('unknownmessage', buffer)
}
}
}
/**
* Handle a message from the remote peer.
* @param {Buffer} buffer
*/
Wire.prototype._onMessage = function (buffer) {
this._parse(4, this._onMessageLength)
switch (buffer[0]) {
case 0:
return this._onChoke()
case 1:
return this._onUnchoke()
case 2:
return this._onInterested()
case 3:
return this._onUninterested()
case 4:
return this._onHave(buffer.readUInt32BE(1))
case 5:
return this._onBitField(buffer.slice(1))
case 6:
return this._onRequest(buffer.readUInt32BE(1),
buffer.readUInt32BE(5), buffer.readUInt32BE(9))
case 7:
return this._onPiece(buffer.readUInt32BE(1),
buffer.readUInt32BE(5), buffer.slice(9))
case 8:
return this._onCancel(buffer.readUInt32BE(1),
buffer.readUInt32BE(5), buffer.readUInt32BE(9))
case 9:
return this._onPort(buffer.readUInt16BE(1))
case 20:
return this._onExtended(buffer.readUInt8(1), buffer.slice(2))
default:
this._debug('got unknown message')
return this.emit('unknownmessage', buffer)
}
}
Wire.prototype._parseHandshake = function () {
var self = this
self._parse(1, function (buffer) {
var pstrlen = buffer.readUInt8(0)
self._parse(pstrlen + 48, function (handshake) {
var protocol = handshake.slice(0, pstrlen)
if (protocol.toString() !== 'BitTorrent protocol') {
self._debug('Error: wire not speaking BitTorrent protocol (%s)', protocol.toString())
self.end()
return
}
handshake = handshake.slice(pstrlen)
self._onHandshake(handshake.slice(8, 28), handshake.slice(28, 48), {
dht: !!(handshake[7] & 0x01), // see bep_0005
extended: !!(handshake[5] & 0x10) // see bep_0010
_parseHandshake () {
this._parse(1, buffer => {
const pstrlen = buffer.readUInt8(0)
this._parse(pstrlen + 48, handshake => {
const protocol = handshake.slice(0, pstrlen)
if (protocol.toString() !== 'BitTorrent protocol') {
this._debug('Error: wire not speaking BitTorrent protocol (%s)', protocol.toString())
this.end()
return
}
handshake = handshake.slice(pstrlen)
this._onHandshake(handshake.slice(8, 28), handshake.slice(28, 48), {
dht: !!(handshake[7] & 0x01), // see bep_0005
extended: !!(handshake[5] & 0x10) // see bep_0010
})
this._parse(4, this._onMessageLength)
})
self._parse(4, self._onMessageLength)
})
})
}
}
Wire.prototype._onFinish = function () {
this._finished = true
_onFinish () {
this._finished = true
this.push(null) // stream cannot be half open, so signal the end of it
while (this.read()) {} // consume and discard the rest of the stream data
this.push(null) // stream cannot be half open, so signal the end of it
while (this.read()) {} // consume and discard the rest of the stream data
clearInterval(this._keepAliveInterval)
this._parse(Number.MAX_VALUE, function () {})
while (this.peerRequests.length) {
this.peerRequests.pop()
clearInterval(this._keepAliveInterval)
this._parse(Number.MAX_VALUE, () => {})
while (this.peerRequests.length) {
this.peerRequests.pop()
}
while (this.requests.length) {
this._callback(this.requests.pop(), new Error('wire was closed'), null)
}
}
while (this.requests.length) {
this._callback(this.requests.pop(), new Error('wire was closed'), null)
_debug (...args) {
args[0] = `[${this._debugId}] ${args[0]}`
debug(...args)
}
}
Wire.prototype._debug = function () {
var args = [].slice.call(arguments)
args[0] = '[' + this._debugId + '] ' + args[0]
debug.apply(null, args)
}
function pull (requests, piece, offset, length) {
for (var i = 0; i < requests.length; i++) {
var req = requests[i]
if (req.piece === piece && req.offset === offset && req.length === length) {
arrayRemove(requests, i)
return req
_pull (requests, piece, offset, length) {
for (let i = 0; i < requests.length; i++) {
const req = requests[i]
if (req.piece === piece && req.offset === offset && req.length === length) {
arrayRemove(requests, i)
return req
}
}
return null
}
return null
}
module.exports = Wire
{
"name": "bittorrent-protocol",
"description": "Simple, robust, BitTorrent peer wire protocol implementation",
"version": "2.4.2",
"version": "3.0.0",
"author": {

@@ -17,6 +17,4 @@ "name": "WebTorrent, LLC",

"debug": "^3.1.0",
"inherits": "^2.0.1",
"randombytes": "^2.0.5",
"readable-stream": "^2.3.2",
"safe-buffer": "^5.1.1",
"speedometer": "^1.0.0",

@@ -27,3 +25,3 @@ "unordered-array-remove": "^1.0.2",

"devDependencies": {
"airtap": "0.0.7",
"airtap": "0.1.0",
"standard": "*",

@@ -30,0 +28,0 @@ "tape": "^4.0.0"

@@ -40,7 +40,7 @@ # bittorrent-protocol [![travis][travis-image]][travis-url] [![npm][npm-image]][npm-url] [![downloads][downloads-image]][downloads-url] [![javascript style guide][standard-image]][standard-url]

```js
var Protocol = require('bittorrent-protocol')
var net = require('net')
const Protocol = require('bittorrent-protocol')
const net = require('net')
net.createServer(function (socket) {
var wire = new Protocol()
net.createServer(socket => {
const wire = new Protocol()

@@ -50,3 +50,3 @@ // pipe to and from the protocol

wire.on('handshake', function (infoHash, peerId) {
wire.on('handshake', (infoHash, peerId) => {
// receive a handshake (infoHash and peerId are hex strings)

@@ -58,3 +58,3 @@

wire.on('unchoke', function () {
wire.on('unchoke', () => {
console.log('peer is no longer choking us: ' + wire.peerChoking)

@@ -74,3 +74,3 @@ })

wire.handshake(infoHash, peerId, { dht: true })
wire.on('handshake', function (infoHash, peerId, extensions) {
wire.on('handshake', (infoHash, peerId, extensions) => {
// receive a handshake (infoHash and peerId are hex strings)

@@ -92,6 +92,6 @@ console.log(extensions.dht) // supports DHT (BEP-0005)

wire.on('choke', function () {
wire.on('choke', () => {
// the peer is now choking us
})
wire.on('unchoke', function () {
wire.on('unchoke', () => {
// peer is no longer choking us

@@ -109,6 +109,6 @@ })

wire.on('interested', function () {
wire.on('interested', () => {
// peer is now interested
})
wire.on('uninterested', function () {
wire.on('uninterested', () => {
// peer is no longer interested

@@ -125,3 +125,3 @@ })

wire.bitfield(buffer)
wire.on('bitfield', function (bitfield) {
wire.on('bitfield', bitfield => {
// bitfield received from the peer

@@ -132,3 +132,3 @@ })

wire.have(pieceIndex)
wire.on('have', function (pieceIndex) {
wire.on('have', pieceIndex => {
// peer has sent you a have message

@@ -152,3 +152,3 @@ })

// request a block from a peer
wire.request(pieceIndex, offset, length, function (err, block) {
wire.request(pieceIndex, offset, length, (err, block) => {
if (err) {

@@ -165,3 +165,3 @@ // there was an error (peer has started choking us etc)

// receive a request from a peer
wire.on('request', function (pieceIndex, offset, length, callback) {
wire.on('request', (pieceIndex, offset, length, callback) => {
// ... read block ...

@@ -192,3 +192,3 @@ callback(null, block) // respond back to the peer

wire.port(dhtPort)
wire.on('port', function (dhtPort) {
wire.on('port', dhtPort => {
// peer has sent a port to us

@@ -212,3 +212,3 @@ })

wire.setKeepAlive(true)
wire.on('keep-alive', function () {
wire.on('keep-alive', () => {
// peer sent a keep alive - just ignore it

@@ -253,6 +253,6 @@ })

wire.on('download', function (numberOfBytes) {
wire.on('download', numberOfBytes => {
...
})
wire.on('upload', function (numberOfBytes) {
wire.on('upload', numberOfBytes => {
...

@@ -283,8 +283,8 @@ })

```js
var Protocol = require('bittorrent-protocol')
var net = require('net')
var ut_metadata = require('ut_metadata')
const Protocol = require('bittorrent-protocol')
const net = require('net')
const ut_metadata = require('ut_metadata')
net.createServer(function (socket) {
var wire = new Protocol()
net.createServer(socket => {
const wire = new Protocol()
socket.pipe(wire).pipe(socket)

@@ -301,3 +301,3 @@

// 'metadata' event will fire when the metadata arrives and is verified to be correct!
wire.ut_metadata.on('metadata', function (metadata) {
wire.ut_metadata.on('metadata', metadata => {
// got metadata!

@@ -312,3 +312,3 @@

// probably not going to arrive for one of the above reasons.
wire.ut_metadata.on('warning', function (err) {
wire.ut_metadata.on('warning', err => {
console.log(err.message)

@@ -318,3 +318,3 @@ })

// handle handshake
wire.on('handshake', function (infoHash, peerId) {
wire.on('handshake', (infoHash, peerId) => {
// receive a handshake (infoHash and peerId are hex strings)

@@ -321,0 +321,0 @@ wire.handshake(new Buffer('my info hash'), new Buffer('my peer id'))

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc