bittorrent-protocol
Advanced tools
Comparing version 2.4.2 to 3.0.0
1230
index.js
@@ -1,745 +0,747 @@ | ||
module.exports = Wire | ||
const arrayRemove = require('unordered-array-remove') | ||
const bencode = require('bencode') | ||
const BitField = require('bitfield') | ||
const debug = require('debug')('bittorrent-protocol') | ||
const extend = require('xtend') | ||
const randombytes = require('randombytes') | ||
const speedometer = require('speedometer') | ||
const stream = require('readable-stream') | ||
var arrayRemove = require('unordered-array-remove') | ||
var bencode = require('bencode') | ||
var BitField = require('bitfield') | ||
var Buffer = require('safe-buffer').Buffer | ||
var debug = require('debug')('bittorrent-protocol') | ||
var extend = require('xtend') | ||
var inherits = require('inherits') | ||
var randombytes = require('randombytes') | ||
var speedometer = require('speedometer') | ||
var stream = require('readable-stream') | ||
const BITFIELD_GROW = 400000 | ||
const KEEP_ALIVE_TIMEOUT = 55000 | ||
var BITFIELD_GROW = 400000 | ||
var KEEP_ALIVE_TIMEOUT = 55000 | ||
const MESSAGE_PROTOCOL = Buffer.from('\u0013BitTorrent protocol') | ||
const MESSAGE_KEEP_ALIVE = Buffer.from([0x00, 0x00, 0x00, 0x00]) | ||
const MESSAGE_CHOKE = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x00]) | ||
const MESSAGE_UNCHOKE = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x01]) | ||
const MESSAGE_INTERESTED = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x02]) | ||
const MESSAGE_UNINTERESTED = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x03]) | ||
var MESSAGE_PROTOCOL = Buffer.from('\u0013BitTorrent protocol') | ||
var MESSAGE_KEEP_ALIVE = Buffer.from([0x00, 0x00, 0x00, 0x00]) | ||
var MESSAGE_CHOKE = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x00]) | ||
var MESSAGE_UNCHOKE = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x01]) | ||
var MESSAGE_INTERESTED = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x02]) | ||
var MESSAGE_UNINTERESTED = Buffer.from([0x00, 0x00, 0x00, 0x01, 0x03]) | ||
const MESSAGE_RESERVED = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] | ||
const MESSAGE_PORT = [0x00, 0x00, 0x00, 0x03, 0x09, 0x00, 0x00] | ||
var MESSAGE_RESERVED = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] | ||
var MESSAGE_PORT = [0x00, 0x00, 0x00, 0x03, 0x09, 0x00, 0x00] | ||
function Request (piece, offset, length, callback) { | ||
this.piece = piece | ||
this.offset = offset | ||
this.length = length | ||
this.callback = callback | ||
class Request { | ||
constructor (piece, offset, length, callback) { | ||
this.piece = piece | ||
this.offset = offset | ||
this.length = length | ||
this.callback = callback | ||
} | ||
} | ||
inherits(Wire, stream.Duplex) | ||
class Wire extends stream.Duplex { | ||
constructor () { | ||
super() | ||
function Wire () { | ||
if (!(this instanceof Wire)) return new Wire() | ||
stream.Duplex.call(this) | ||
this._debugId = randombytes(4).toString('hex') | ||
this._debug('new wire') | ||
this._debugId = randombytes(4).toString('hex') | ||
this._debug('new wire') | ||
this.peerId = null // remote peer id (hex string) | ||
this.peerIdBuffer = null // remote peer id (buffer) | ||
this.type = null // connection type ('webrtc', 'tcpIncoming', 'tcpOutgoing', 'webSeed') | ||
this.peerId = null // remote peer id (hex string) | ||
this.peerIdBuffer = null // remote peer id (buffer) | ||
this.type = null // connection type ('webrtc', 'tcpIncoming', 'tcpOutgoing', 'webSeed') | ||
this.amChoking = true // are we choking the peer? | ||
this.amInterested = false // are we interested in the peer? | ||
this.amChoking = true // are we choking the peer? | ||
this.amInterested = false // are we interested in the peer? | ||
this.peerChoking = true // is the peer choking us? | ||
this.peerInterested = false // is the peer interested in us? | ||
this.peerChoking = true // is the peer choking us? | ||
this.peerInterested = false // is the peer interested in us? | ||
// The largest torrent that I know of (the Geocities archive) is ~641 GB and has | ||
// ~41,000 pieces. Therefore, cap bitfield to 10x larger (400,000 bits) to support all | ||
// possible torrents but prevent malicious peers from growing bitfield to fill memory. | ||
this.peerPieces = new BitField(0, { grow: BITFIELD_GROW }) | ||
// The largest torrent that I know of (the Geocities archive) is ~641 GB and has | ||
// ~41,000 pieces. Therefore, cap bitfield to 10x larger (400,000 bits) to support all | ||
// possible torrents but prevent malicious peers from growing bitfield to fill memory. | ||
this.peerPieces = new BitField(0, { grow: BITFIELD_GROW }) | ||
this.peerExtensions = {} | ||
this.peerExtensions = {} | ||
this.requests = [] // outgoing | ||
this.peerRequests = [] // incoming | ||
this.requests = [] // outgoing | ||
this.peerRequests = [] // incoming | ||
this.extendedMapping = {} // number -> string, ex: 1 -> 'ut_metadata' | ||
this.peerExtendedMapping = {} // string -> number, ex: 9 -> 'ut_metadata' | ||
this.extendedMapping = {} // number -> string, ex: 1 -> 'ut_metadata' | ||
this.peerExtendedMapping = {} // string -> number, ex: 9 -> 'ut_metadata' | ||
// The extended handshake to send, minus the "m" field, which gets automatically | ||
// filled from `this.extendedMapping` | ||
this.extendedHandshake = {} | ||
// The extended handshake to send, minus the "m" field, which gets automatically | ||
// filled from `this.extendedMapping` | ||
this.extendedHandshake = {} | ||
this.peerExtendedHandshake = {} // remote peer's extended handshake | ||
this.peerExtendedHandshake = {} // remote peer's extended handshake | ||
this._ext = {} // string -> function, ex 'ut_metadata' -> ut_metadata() | ||
this._nextExt = 1 | ||
this._ext = {} // string -> function, ex 'ut_metadata' -> ut_metadata() | ||
this._nextExt = 1 | ||
this.uploaded = 0 | ||
this.downloaded = 0 | ||
this.uploadSpeed = speedometer() | ||
this.downloadSpeed = speedometer() | ||
this.uploaded = 0 | ||
this.downloaded = 0 | ||
this.uploadSpeed = speedometer() | ||
this.downloadSpeed = speedometer() | ||
this._keepAliveInterval = null | ||
this._timeout = null | ||
this._timeoutMs = 0 | ||
this._keepAliveInterval = null | ||
this._timeout = null | ||
this._timeoutMs = 0 | ||
this.destroyed = false // was the wire ended by calling `destroy`? | ||
this._finished = false | ||
this.destroyed = false // was the wire ended by calling `destroy`? | ||
this._finished = false | ||
this._parserSize = 0 // number of needed bytes to parse next message from remote peer | ||
this._parser = null // function to call once `this._parserSize` bytes are available | ||
this._parserSize = 0 // number of needed bytes to parse next message from remote peer | ||
this._parser = null // function to call once `this._parserSize` bytes are available | ||
this._buffer = [] // incomplete message data | ||
this._bufferSize = 0 // cached total length of buffers in `this._buffer` | ||
this._buffer = [] // incomplete message data | ||
this._bufferSize = 0 // cached total length of buffers in `this._buffer` | ||
this.on('finish', this._onFinish) | ||
this.on('finish', this._onFinish) | ||
this._parseHandshake() | ||
} | ||
this._parseHandshake() | ||
} | ||
/** | ||
* Set whether to send a "keep-alive" ping (sent every 55s) | ||
* @param {boolean} enable | ||
*/ | ||
setKeepAlive (enable) { | ||
this._debug('setKeepAlive %s', enable) | ||
clearInterval(this._keepAliveInterval) | ||
if (enable === false) return | ||
this._keepAliveInterval = setInterval(() => { | ||
this.keepAlive() | ||
}, KEEP_ALIVE_TIMEOUT) | ||
} | ||
/** | ||
* Set whether to send a "keep-alive" ping (sent every 55s) | ||
* @param {boolean} enable | ||
*/ | ||
Wire.prototype.setKeepAlive = function (enable) { | ||
var self = this | ||
self._debug('setKeepAlive %s', enable) | ||
clearInterval(self._keepAliveInterval) | ||
if (enable === false) return | ||
self._keepAliveInterval = setInterval(function () { | ||
self.keepAlive() | ||
}, KEEP_ALIVE_TIMEOUT) | ||
} | ||
/** | ||
* Set the amount of time to wait before considering a request to be "timed out" | ||
* @param {number} ms | ||
* @param {boolean=} unref (should the timer be unref'd? default: false) | ||
*/ | ||
setTimeout (ms, unref) { | ||
this._debug('setTimeout ms=%d unref=%s', ms, unref) | ||
this._clearTimeout() | ||
this._timeoutMs = ms | ||
this._timeoutUnref = !!unref | ||
this._updateTimeout() | ||
} | ||
/** | ||
* Set the amount of time to wait before considering a request to be "timed out" | ||
* @param {number} ms | ||
* @param {boolean=} unref (should the timer be unref'd? default: false) | ||
*/ | ||
Wire.prototype.setTimeout = function (ms, unref) { | ||
this._debug('setTimeout ms=%d unref=%s', ms, unref) | ||
this._clearTimeout() | ||
this._timeoutMs = ms | ||
this._timeoutUnref = !!unref | ||
this._updateTimeout() | ||
} | ||
destroy () { | ||
if (this.destroyed) return | ||
this.destroyed = true | ||
this._debug('destroy') | ||
this.emit('close') | ||
this.end() | ||
} | ||
Wire.prototype.destroy = function () { | ||
if (this.destroyed) return | ||
this.destroyed = true | ||
this._debug('destroy') | ||
this.emit('close') | ||
this.end() | ||
} | ||
Wire.prototype.end = function () { | ||
this._debug('end') | ||
this._onUninterested() | ||
this._onChoke() | ||
stream.Duplex.prototype.end.apply(this, arguments) | ||
} | ||
/** | ||
* Use the specified protocol extension. | ||
* @param {function} Extension | ||
*/ | ||
Wire.prototype.use = function (Extension) { | ||
var name = Extension.prototype.name | ||
if (!name) { | ||
throw new Error('Extension class requires a "name" property on the prototype') | ||
end (...args) { | ||
this._debug('end') | ||
this._onUninterested() | ||
this._onChoke() | ||
super.end(...args) | ||
} | ||
this._debug('use extension.name=%s', name) | ||
var ext = this._nextExt | ||
var handler = new Extension(this) | ||
/** | ||
* Use the specified protocol extension. | ||
* @param {function} Extension | ||
*/ | ||
use (Extension) { | ||
const name = Extension.prototype.name | ||
if (!name) { | ||
throw new Error('Extension class requires a "name" property on the prototype') | ||
} | ||
this._debug('use extension.name=%s', name) | ||
function noop () {} | ||
const ext = this._nextExt | ||
const handler = new Extension(this) | ||
if (typeof handler.onHandshake !== 'function') { | ||
handler.onHandshake = noop | ||
} | ||
if (typeof handler.onExtendedHandshake !== 'function') { | ||
handler.onExtendedHandshake = noop | ||
} | ||
if (typeof handler.onMessage !== 'function') { | ||
handler.onMessage = noop | ||
} | ||
function noop () {} | ||
this.extendedMapping[ext] = name | ||
this._ext[name] = handler | ||
this[name] = handler | ||
if (typeof handler.onHandshake !== 'function') { | ||
handler.onHandshake = noop | ||
} | ||
if (typeof handler.onExtendedHandshake !== 'function') { | ||
handler.onExtendedHandshake = noop | ||
} | ||
if (typeof handler.onMessage !== 'function') { | ||
handler.onMessage = noop | ||
} | ||
this._nextExt += 1 | ||
} | ||
this.extendedMapping[ext] = name | ||
this._ext[name] = handler | ||
this[name] = handler | ||
// | ||
// OUTGOING MESSAGES | ||
// | ||
this._nextExt += 1 | ||
} | ||
/** | ||
* Message "keep-alive": <len=0000> | ||
*/ | ||
Wire.prototype.keepAlive = function () { | ||
this._debug('keep-alive') | ||
this._push(MESSAGE_KEEP_ALIVE) | ||
} | ||
// | ||
// OUTGOING MESSAGES | ||
// | ||
/** | ||
* Message: "handshake" <pstrlen><pstr><reserved><info_hash><peer_id> | ||
* @param {Buffer|string} infoHash (as Buffer or *hex* string) | ||
* @param {Buffer|string} peerId | ||
* @param {Object} extensions | ||
*/ | ||
Wire.prototype.handshake = function (infoHash, peerId, extensions) { | ||
var infoHashBuffer, peerIdBuffer | ||
if (typeof infoHash === 'string') { | ||
infoHash = infoHash.toLowerCase() | ||
infoHashBuffer = Buffer.from(infoHash, 'hex') | ||
} else { | ||
infoHashBuffer = infoHash | ||
infoHash = infoHashBuffer.toString('hex') | ||
/** | ||
* Message "keep-alive": <len=0000> | ||
*/ | ||
keepAlive () { | ||
this._debug('keep-alive') | ||
this._push(MESSAGE_KEEP_ALIVE) | ||
} | ||
if (typeof peerId === 'string') { | ||
peerIdBuffer = Buffer.from(peerId, 'hex') | ||
} else { | ||
peerIdBuffer = peerId | ||
peerId = peerIdBuffer.toString('hex') | ||
} | ||
if (infoHashBuffer.length !== 20 || peerIdBuffer.length !== 20) { | ||
throw new Error('infoHash and peerId MUST have length 20') | ||
} | ||
/** | ||
* Message: "handshake" <pstrlen><pstr><reserved><info_hash><peer_id> | ||
* @param {Buffer|string} infoHash (as Buffer or *hex* string) | ||
* @param {Buffer|string} peerId | ||
* @param {Object} extensions | ||
*/ | ||
handshake (infoHash, peerId, extensions) { | ||
let infoHashBuffer | ||
let peerIdBuffer | ||
if (typeof infoHash === 'string') { | ||
infoHash = infoHash.toLowerCase() | ||
infoHashBuffer = Buffer.from(infoHash, 'hex') | ||
} else { | ||
infoHashBuffer = infoHash | ||
infoHash = infoHashBuffer.toString('hex') | ||
} | ||
if (typeof peerId === 'string') { | ||
peerIdBuffer = Buffer.from(peerId, 'hex') | ||
} else { | ||
peerIdBuffer = peerId | ||
peerId = peerIdBuffer.toString('hex') | ||
} | ||
this._debug('handshake i=%s p=%s exts=%o', infoHash, peerId, extensions) | ||
if (infoHashBuffer.length !== 20 || peerIdBuffer.length !== 20) { | ||
throw new Error('infoHash and peerId MUST have length 20') | ||
} | ||
var reserved = Buffer.from(MESSAGE_RESERVED) | ||
this._debug('handshake i=%s p=%s exts=%o', infoHash, peerId, extensions) | ||
// enable extended message | ||
reserved[5] |= 0x10 | ||
const reserved = Buffer.from(MESSAGE_RESERVED) | ||
if (extensions && extensions.dht) reserved[7] |= 1 | ||
// enable extended message | ||
reserved[5] |= 0x10 | ||
this._push(Buffer.concat([MESSAGE_PROTOCOL, reserved, infoHashBuffer, peerIdBuffer])) | ||
this._handshakeSent = true | ||
if (extensions && extensions.dht) reserved[7] |= 1 | ||
if (this.peerExtensions.extended && !this._extendedHandshakeSent) { | ||
// Peer's handshake indicated support already | ||
// (incoming connection) | ||
this._sendExtendedHandshake() | ||
} | ||
} | ||
this._push(Buffer.concat([MESSAGE_PROTOCOL, reserved, infoHashBuffer, peerIdBuffer])) | ||
this._handshakeSent = true | ||
/* Peer supports BEP-0010, send extended handshake. | ||
* | ||
* This comes after the 'handshake' event to give the user a chance to populate | ||
* `this.extendedHandshake` and `this.extendedMapping` before the extended handshake | ||
* is sent to the remote peer. | ||
*/ | ||
Wire.prototype._sendExtendedHandshake = function () { | ||
// Create extended message object from registered extensions | ||
var msg = extend(this.extendedHandshake) | ||
msg.m = {} | ||
for (var ext in this.extendedMapping) { | ||
var name = this.extendedMapping[ext] | ||
msg.m[name] = Number(ext) | ||
if (this.peerExtensions.extended && !this._extendedHandshakeSent) { | ||
// Peer's handshake indicated support already | ||
// (incoming connection) | ||
this._sendExtendedHandshake() | ||
} | ||
} | ||
// Send extended handshake | ||
this.extended(0, bencode.encode(msg)) | ||
this._extendedHandshakeSent = true | ||
} | ||
/* Peer supports BEP-0010, send extended handshake. | ||
* | ||
* This comes after the 'handshake' event to give the user a chance to populate | ||
* `this.extendedHandshake` and `this.extendedMapping` before the extended handshake | ||
* is sent to the remote peer. | ||
*/ | ||
_sendExtendedHandshake () { | ||
// Create extended message object from registered extensions | ||
const msg = extend(this.extendedHandshake) | ||
msg.m = {} | ||
for (const ext in this.extendedMapping) { | ||
const name = this.extendedMapping[ext] | ||
msg.m[name] = Number(ext) | ||
} | ||
/** | ||
* Message "choke": <len=0001><id=0> | ||
*/ | ||
Wire.prototype.choke = function () { | ||
if (this.amChoking) return | ||
this.amChoking = true | ||
this._debug('choke') | ||
while (this.peerRequests.length) { | ||
this.peerRequests.pop() | ||
// Send extended handshake | ||
this.extended(0, bencode.encode(msg)) | ||
this._extendedHandshakeSent = true | ||
} | ||
this._push(MESSAGE_CHOKE) | ||
} | ||
/** | ||
* Message "unchoke": <len=0001><id=1> | ||
*/ | ||
Wire.prototype.unchoke = function () { | ||
if (!this.amChoking) return | ||
this.amChoking = false | ||
this._debug('unchoke') | ||
this._push(MESSAGE_UNCHOKE) | ||
} | ||
/** | ||
* Message "choke": <len=0001><id=0> | ||
*/ | ||
choke () { | ||
if (this.amChoking) return | ||
this.amChoking = true | ||
this._debug('choke') | ||
while (this.peerRequests.length) { | ||
this.peerRequests.pop() | ||
} | ||
this._push(MESSAGE_CHOKE) | ||
} | ||
/** | ||
* Message "interested": <len=0001><id=2> | ||
*/ | ||
Wire.prototype.interested = function () { | ||
if (this.amInterested) return | ||
this.amInterested = true | ||
this._debug('interested') | ||
this._push(MESSAGE_INTERESTED) | ||
} | ||
/** | ||
* Message "unchoke": <len=0001><id=1> | ||
*/ | ||
unchoke () { | ||
if (!this.amChoking) return | ||
this.amChoking = false | ||
this._debug('unchoke') | ||
this._push(MESSAGE_UNCHOKE) | ||
} | ||
/** | ||
* Message "uninterested": <len=0001><id=3> | ||
*/ | ||
Wire.prototype.uninterested = function () { | ||
if (!this.amInterested) return | ||
this.amInterested = false | ||
this._debug('uninterested') | ||
this._push(MESSAGE_UNINTERESTED) | ||
} | ||
/** | ||
* Message "interested": <len=0001><id=2> | ||
*/ | ||
interested () { | ||
if (this.amInterested) return | ||
this.amInterested = true | ||
this._debug('interested') | ||
this._push(MESSAGE_INTERESTED) | ||
} | ||
/** | ||
* Message "have": <len=0005><id=4><piece index> | ||
* @param {number} index | ||
*/ | ||
Wire.prototype.have = function (index) { | ||
this._debug('have %d', index) | ||
this._message(4, [index], null) | ||
} | ||
/** | ||
* Message "uninterested": <len=0001><id=3> | ||
*/ | ||
uninterested () { | ||
if (!this.amInterested) return | ||
this.amInterested = false | ||
this._debug('uninterested') | ||
this._push(MESSAGE_UNINTERESTED) | ||
} | ||
/** | ||
* Message "bitfield": <len=0001+X><id=5><bitfield> | ||
* @param {BitField|Buffer} bitfield | ||
*/ | ||
Wire.prototype.bitfield = function (bitfield) { | ||
this._debug('bitfield') | ||
if (!Buffer.isBuffer(bitfield)) bitfield = bitfield.buffer | ||
this._message(5, [], bitfield) | ||
} | ||
/** | ||
* Message "have": <len=0005><id=4><piece index> | ||
* @param {number} index | ||
*/ | ||
have (index) { | ||
this._debug('have %d', index) | ||
this._message(4, [index], null) | ||
} | ||
/** | ||
* Message "request": <len=0013><id=6><index><begin><length> | ||
* @param {number} index | ||
* @param {number} offset | ||
* @param {number} length | ||
* @param {function} cb | ||
*/ | ||
Wire.prototype.request = function (index, offset, length, cb) { | ||
if (!cb) cb = function () {} | ||
if (this._finished) return cb(new Error('wire is closed')) | ||
if (this.peerChoking) return cb(new Error('peer is choking')) | ||
/** | ||
* Message "bitfield": <len=0001+X><id=5><bitfield> | ||
* @param {BitField|Buffer} bitfield | ||
*/ | ||
bitfield (bitfield) { | ||
this._debug('bitfield') | ||
if (!Buffer.isBuffer(bitfield)) bitfield = bitfield.buffer | ||
this._message(5, [], bitfield) | ||
} | ||
this._debug('request index=%d offset=%d length=%d', index, offset, length) | ||
/** | ||
* Message "request": <len=0013><id=6><index><begin><length> | ||
* @param {number} index | ||
* @param {number} offset | ||
* @param {number} length | ||
* @param {function} cb | ||
*/ | ||
request (index, offset, length, cb) { | ||
if (!cb) cb = () => {} | ||
if (this._finished) return cb(new Error('wire is closed')) | ||
if (this.peerChoking) return cb(new Error('peer is choking')) | ||
this.requests.push(new Request(index, offset, length, cb)) | ||
this._updateTimeout() | ||
this._message(6, [index, offset, length], null) | ||
} | ||
this._debug('request index=%d offset=%d length=%d', index, offset, length) | ||
/** | ||
* Message "piece": <len=0009+X><id=7><index><begin><block> | ||
* @param {number} index | ||
* @param {number} offset | ||
* @param {Buffer} buffer | ||
*/ | ||
Wire.prototype.piece = function (index, offset, buffer) { | ||
this._debug('piece index=%d offset=%d', index, offset) | ||
this.uploaded += buffer.length | ||
this.uploadSpeed(buffer.length) | ||
this.emit('upload', buffer.length) | ||
this._message(7, [index, offset], buffer) | ||
} | ||
this.requests.push(new Request(index, offset, length, cb)) | ||
this._updateTimeout() | ||
this._message(6, [index, offset, length], null) | ||
} | ||
/** | ||
* Message "cancel": <len=0013><id=8><index><begin><length> | ||
* @param {number} index | ||
* @param {number} offset | ||
* @param {number} length | ||
*/ | ||
Wire.prototype.cancel = function (index, offset, length) { | ||
this._debug('cancel index=%d offset=%d length=%d', index, offset, length) | ||
this._callback( | ||
pull(this.requests, index, offset, length), | ||
new Error('request was cancelled'), | ||
null | ||
) | ||
this._message(8, [index, offset, length], null) | ||
} | ||
/** | ||
* Message "piece": <len=0009+X><id=7><index><begin><block> | ||
* @param {number} index | ||
* @param {number} offset | ||
* @param {Buffer} buffer | ||
*/ | ||
piece (index, offset, buffer) { | ||
this._debug('piece index=%d offset=%d', index, offset) | ||
this.uploaded += buffer.length | ||
this.uploadSpeed(buffer.length) | ||
this.emit('upload', buffer.length) | ||
this._message(7, [index, offset], buffer) | ||
} | ||
/** | ||
* Message: "port" <len=0003><id=9><listen-port> | ||
* @param {Number} port | ||
*/ | ||
Wire.prototype.port = function (port) { | ||
this._debug('port %d', port) | ||
var message = Buffer.from(MESSAGE_PORT) | ||
message.writeUInt16BE(port, 5) | ||
this._push(message) | ||
} | ||
/** | ||
* Message "cancel": <len=0013><id=8><index><begin><length> | ||
* @param {number} index | ||
* @param {number} offset | ||
* @param {number} length | ||
*/ | ||
cancel (index, offset, length) { | ||
this._debug('cancel index=%d offset=%d length=%d', index, offset, length) | ||
this._callback( | ||
this._pull(this.requests, index, offset, length), | ||
new Error('request was cancelled'), | ||
null | ||
) | ||
this._message(8, [index, offset, length], null) | ||
} | ||
/** | ||
* Message: "extended" <len=0005+X><id=20><ext-number><payload> | ||
* @param {number|string} ext | ||
* @param {Object} obj | ||
*/ | ||
Wire.prototype.extended = function (ext, obj) { | ||
this._debug('extended ext=%s', ext) | ||
if (typeof ext === 'string' && this.peerExtendedMapping[ext]) { | ||
ext = this.peerExtendedMapping[ext] | ||
/** | ||
* Message: "port" <len=0003><id=9><listen-port> | ||
* @param {Number} port | ||
*/ | ||
port (port) { | ||
this._debug('port %d', port) | ||
const message = Buffer.from(MESSAGE_PORT) | ||
message.writeUInt16BE(port, 5) | ||
this._push(message) | ||
} | ||
if (typeof ext === 'number') { | ||
var extId = Buffer.from([ext]) | ||
var buf = Buffer.isBuffer(obj) ? obj : bencode.encode(obj) | ||
this._message(20, [], Buffer.concat([extId, buf])) | ||
} else { | ||
throw new Error('Unrecognized extension: ' + ext) | ||
/** | ||
* Message: "extended" <len=0005+X><id=20><ext-number><payload> | ||
* @param {number|string} ext | ||
* @param {Object} obj | ||
*/ | ||
extended (ext, obj) { | ||
this._debug('extended ext=%s', ext) | ||
if (typeof ext === 'string' && this.peerExtendedMapping[ext]) { | ||
ext = this.peerExtendedMapping[ext] | ||
} | ||
if (typeof ext === 'number') { | ||
const extId = Buffer.from([ext]) | ||
const buf = Buffer.isBuffer(obj) ? obj : bencode.encode(obj) | ||
this._message(20, [], Buffer.concat([extId, buf])) | ||
} else { | ||
throw new Error(`Unrecognized extension: ${ext}`) | ||
} | ||
} | ||
} | ||
/** | ||
* Duplex stream method. Called whenever the remote peer stream wants data. No-op | ||
* since we'll just push data whenever we get it. | ||
*/ | ||
Wire.prototype._read = function () {} | ||
/** | ||
* Duplex stream method. Called whenever the remote peer stream wants data. No-op | ||
* since we'll just push data whenever we get it. | ||
*/ | ||
_read () {} | ||
/** | ||
* Send a message to the remote peer. | ||
*/ | ||
Wire.prototype._message = function (id, numbers, data) { | ||
var dataLength = data ? data.length : 0 | ||
var buffer = Buffer.allocUnsafe(5 + (4 * numbers.length)) | ||
/** | ||
* Send a message to the remote peer. | ||
*/ | ||
_message (id, numbers, data) { | ||
const dataLength = data ? data.length : 0 | ||
const buffer = Buffer.allocUnsafe(5 + (4 * numbers.length)) | ||
buffer.writeUInt32BE(buffer.length + dataLength - 4, 0) | ||
buffer[4] = id | ||
for (var i = 0; i < numbers.length; i++) { | ||
buffer.writeUInt32BE(numbers[i], 5 + (4 * i)) | ||
buffer.writeUInt32BE(buffer.length + dataLength - 4, 0) | ||
buffer[4] = id | ||
for (let i = 0; i < numbers.length; i++) { | ||
buffer.writeUInt32BE(numbers[i], 5 + (4 * i)) | ||
} | ||
this._push(buffer) | ||
if (data) this._push(data) | ||
} | ||
this._push(buffer) | ||
if (data) this._push(data) | ||
} | ||
_push (data) { | ||
if (this._finished) return | ||
return this.push(data) | ||
} | ||
Wire.prototype._push = function (data) { | ||
if (this._finished) return | ||
return this.push(data) | ||
} | ||
// | ||
// INCOMING MESSAGES | ||
// | ||
// | ||
// INCOMING MESSAGES | ||
// | ||
_onKeepAlive () { | ||
this._debug('got keep-alive') | ||
this.emit('keep-alive') | ||
} | ||
Wire.prototype._onKeepAlive = function () { | ||
this._debug('got keep-alive') | ||
this.emit('keep-alive') | ||
} | ||
_onHandshake (infoHashBuffer, peerIdBuffer, extensions) { | ||
const infoHash = infoHashBuffer.toString('hex') | ||
const peerId = peerIdBuffer.toString('hex') | ||
Wire.prototype._onHandshake = function (infoHashBuffer, peerIdBuffer, extensions) { | ||
var infoHash = infoHashBuffer.toString('hex') | ||
var peerId = peerIdBuffer.toString('hex') | ||
this._debug('got handshake i=%s p=%s exts=%o', infoHash, peerId, extensions) | ||
this._debug('got handshake i=%s p=%s exts=%o', infoHash, peerId, extensions) | ||
this.peerId = peerId | ||
this.peerIdBuffer = peerIdBuffer | ||
this.peerExtensions = extensions | ||
this.peerId = peerId | ||
this.peerIdBuffer = peerIdBuffer | ||
this.peerExtensions = extensions | ||
this.emit('handshake', infoHash, peerId, extensions) | ||
this.emit('handshake', infoHash, peerId, extensions) | ||
let name | ||
for (name in this._ext) { | ||
this._ext[name].onHandshake(infoHash, peerId, extensions) | ||
} | ||
var name | ||
for (name in this._ext) { | ||
this._ext[name].onHandshake(infoHash, peerId, extensions) | ||
if (extensions.extended && this._handshakeSent && | ||
!this._extendedHandshakeSent) { | ||
// outgoing connection | ||
this._sendExtendedHandshake() | ||
} | ||
} | ||
if (extensions.extended && this._handshakeSent && | ||
!this._extendedHandshakeSent) { | ||
// outgoing connection | ||
this._sendExtendedHandshake() | ||
_onChoke () { | ||
this.peerChoking = true | ||
this._debug('got choke') | ||
this.emit('choke') | ||
while (this.requests.length) { | ||
this._callback(this.requests.pop(), new Error('peer is choking'), null) | ||
} | ||
} | ||
} | ||
Wire.prototype._onChoke = function () { | ||
this.peerChoking = true | ||
this._debug('got choke') | ||
this.emit('choke') | ||
while (this.requests.length) { | ||
this._callback(this.requests.pop(), new Error('peer is choking'), null) | ||
_onUnchoke () { | ||
this.peerChoking = false | ||
this._debug('got unchoke') | ||
this.emit('unchoke') | ||
} | ||
} | ||
Wire.prototype._onUnchoke = function () { | ||
this.peerChoking = false | ||
this._debug('got unchoke') | ||
this.emit('unchoke') | ||
} | ||
_onInterested () { | ||
this.peerInterested = true | ||
this._debug('got interested') | ||
this.emit('interested') | ||
} | ||
Wire.prototype._onInterested = function () { | ||
this.peerInterested = true | ||
this._debug('got interested') | ||
this.emit('interested') | ||
} | ||
_onUninterested () { | ||
this.peerInterested = false | ||
this._debug('got uninterested') | ||
this.emit('uninterested') | ||
} | ||
Wire.prototype._onUninterested = function () { | ||
this.peerInterested = false | ||
this._debug('got uninterested') | ||
this.emit('uninterested') | ||
} | ||
_onHave (index) { | ||
if (this.peerPieces.get(index)) return | ||
this._debug('got have %d', index) | ||
Wire.prototype._onHave = function (index) { | ||
if (this.peerPieces.get(index)) return | ||
this._debug('got have %d', index) | ||
this.peerPieces.set(index, true) | ||
this.emit('have', index) | ||
} | ||
this.peerPieces.set(index, true) | ||
this.emit('have', index) | ||
} | ||
_onBitField (buffer) { | ||
this.peerPieces = new BitField(buffer) | ||
this._debug('got bitfield') | ||
this.emit('bitfield', this.peerPieces) | ||
} | ||
Wire.prototype._onBitField = function (buffer) { | ||
this.peerPieces = new BitField(buffer) | ||
this._debug('got bitfield') | ||
this.emit('bitfield', this.peerPieces) | ||
} | ||
_onRequest (index, offset, length) { | ||
if (this.amChoking) return | ||
this._debug('got request index=%d offset=%d length=%d', index, offset, length) | ||
Wire.prototype._onRequest = function (index, offset, length) { | ||
var self = this | ||
if (self.amChoking) return | ||
self._debug('got request index=%d offset=%d length=%d', index, offset, length) | ||
const respond = (err, buffer) => { | ||
if (request !== this._pull(this.peerRequests, index, offset, length)) return | ||
if (err) return this._debug('error satisfying request index=%d offset=%d length=%d (%s)', index, offset, length, err.message) | ||
this.piece(index, offset, buffer) | ||
} | ||
var respond = function (err, buffer) { | ||
if (request !== pull(self.peerRequests, index, offset, length)) return | ||
if (err) return self._debug('error satisfying request index=%d offset=%d length=%d (%s)', index, offset, length, err.message) | ||
self.piece(index, offset, buffer) | ||
var request = new Request(index, offset, length, respond) | ||
this.peerRequests.push(request) | ||
this.emit('request', index, offset, length, respond) | ||
} | ||
var request = new Request(index, offset, length, respond) | ||
self.peerRequests.push(request) | ||
self.emit('request', index, offset, length, respond) | ||
} | ||
_onPiece (index, offset, buffer) { | ||
this._debug('got piece index=%d offset=%d', index, offset) | ||
this._callback(this._pull(this.requests, index, offset, buffer.length), null, buffer) | ||
this.downloaded += buffer.length | ||
this.downloadSpeed(buffer.length) | ||
this.emit('download', buffer.length) | ||
this.emit('piece', index, offset, buffer) | ||
} | ||
Wire.prototype._onPiece = function (index, offset, buffer) { | ||
this._debug('got piece index=%d offset=%d', index, offset) | ||
this._callback(pull(this.requests, index, offset, buffer.length), null, buffer) | ||
this.downloaded += buffer.length | ||
this.downloadSpeed(buffer.length) | ||
this.emit('download', buffer.length) | ||
this.emit('piece', index, offset, buffer) | ||
} | ||
_onCancel (index, offset, length) { | ||
this._debug('got cancel index=%d offset=%d length=%d', index, offset, length) | ||
this._pull(this.peerRequests, index, offset, length) | ||
this.emit('cancel', index, offset, length) | ||
} | ||
Wire.prototype._onCancel = function (index, offset, length) { | ||
this._debug('got cancel index=%d offset=%d length=%d', index, offset, length) | ||
pull(this.peerRequests, index, offset, length) | ||
this.emit('cancel', index, offset, length) | ||
} | ||
_onPort (port) { | ||
this._debug('got port %d', port) | ||
this.emit('port', port) | ||
} | ||
Wire.prototype._onPort = function (port) { | ||
this._debug('got port %d', port) | ||
this.emit('port', port) | ||
} | ||
_onExtended (ext, buf) { | ||
if (ext === 0) { | ||
let info | ||
try { | ||
info = bencode.decode(buf) | ||
} catch (err) { | ||
this._debug('ignoring invalid extended handshake: %s', err.message || err) | ||
} | ||
Wire.prototype._onExtended = function (ext, buf) { | ||
if (ext === 0) { | ||
var info | ||
try { | ||
info = bencode.decode(buf) | ||
} catch (err) { | ||
this._debug('ignoring invalid extended handshake: %s', err.message || err) | ||
} | ||
if (!info) return | ||
this.peerExtendedHandshake = info | ||
if (!info) return | ||
this.peerExtendedHandshake = info | ||
var name | ||
if (typeof info.m === 'object') { | ||
for (name in info.m) { | ||
this.peerExtendedMapping[name] = Number(info.m[name].toString()) | ||
let name | ||
if (typeof info.m === 'object') { | ||
for (name in info.m) { | ||
this.peerExtendedMapping[name] = Number(info.m[name].toString()) | ||
} | ||
} | ||
} | ||
for (name in this._ext) { | ||
if (this.peerExtendedMapping[name]) { | ||
this._ext[name].onExtendedHandshake(this.peerExtendedHandshake) | ||
for (name in this._ext) { | ||
if (this.peerExtendedMapping[name]) { | ||
this._ext[name].onExtendedHandshake(this.peerExtendedHandshake) | ||
} | ||
} | ||
} | ||
this._debug('got extended handshake') | ||
this.emit('extended', 'handshake', this.peerExtendedHandshake) | ||
} else { | ||
if (this.extendedMapping[ext]) { | ||
ext = this.extendedMapping[ext] // friendly name for extension | ||
if (this._ext[ext]) { | ||
// there is an registered extension handler, so call it | ||
this._ext[ext].onMessage(buf) | ||
this._debug('got extended handshake') | ||
this.emit('extended', 'handshake', this.peerExtendedHandshake) | ||
} else { | ||
if (this.extendedMapping[ext]) { | ||
ext = this.extendedMapping[ext] // friendly name for extension | ||
if (this._ext[ext]) { | ||
// there is an registered extension handler, so call it | ||
this._ext[ext].onMessage(buf) | ||
} | ||
} | ||
this._debug('got extended message ext=%s', ext) | ||
this.emit('extended', ext, buf) | ||
} | ||
this._debug('got extended message ext=%s', ext) | ||
this.emit('extended', ext, buf) | ||
} | ||
} | ||
Wire.prototype._onTimeout = function () { | ||
this._debug('request timed out') | ||
this._callback(this.requests.shift(), new Error('request has timed out'), null) | ||
this.emit('timeout') | ||
} | ||
_onTimeout () { | ||
this._debug('request timed out') | ||
this._callback(this.requests.shift(), new Error('request has timed out'), null) | ||
this.emit('timeout') | ||
} | ||
/** | ||
* Duplex stream method. Called whenever the remote peer has data for us. Data that the | ||
* remote peer sends gets buffered (i.e. not actually processed) until the right number | ||
* of bytes have arrived, determined by the last call to `this._parse(number, callback)`. | ||
* Once enough bytes have arrived to process the message, the callback function | ||
* (i.e. `this._parser`) gets called with the full buffer of data. | ||
* @param {Buffer} data | ||
* @param {string} encoding | ||
* @param {function} cb | ||
*/ | ||
Wire.prototype._write = function (data, encoding, cb) { | ||
this._bufferSize += data.length | ||
this._buffer.push(data) | ||
/** | ||
* Duplex stream method. Called whenever the remote peer has data for us. Data that the | ||
* remote peer sends gets buffered (i.e. not actually processed) until the right number | ||
* of bytes have arrived, determined by the last call to `this._parse(number, callback)`. | ||
* Once enough bytes have arrived to process the message, the callback function | ||
* (i.e. `this._parser`) gets called with the full buffer of data. | ||
* @param {Buffer} data | ||
* @param {string} encoding | ||
* @param {function} cb | ||
*/ | ||
_write (data, encoding, cb) { | ||
this._bufferSize += data.length | ||
this._buffer.push(data) | ||
while (this._bufferSize >= this._parserSize) { | ||
var buffer = (this._buffer.length === 1) | ||
? this._buffer[0] | ||
: Buffer.concat(this._buffer) | ||
this._bufferSize -= this._parserSize | ||
this._buffer = this._bufferSize | ||
? [buffer.slice(this._parserSize)] | ||
: [] | ||
this._parser(buffer.slice(0, this._parserSize)) | ||
while (this._bufferSize >= this._parserSize) { | ||
const buffer = (this._buffer.length === 1) | ||
? this._buffer[0] | ||
: Buffer.concat(this._buffer) | ||
this._bufferSize -= this._parserSize | ||
this._buffer = this._bufferSize | ||
? [buffer.slice(this._parserSize)] | ||
: [] | ||
this._parser(buffer.slice(0, this._parserSize)) | ||
} | ||
cb(null) // Signal that we're ready for more data | ||
} | ||
cb(null) // Signal that we're ready for more data | ||
} | ||
_callback (request, err, buffer) { | ||
if (!request) return | ||
Wire.prototype._callback = function (request, err, buffer) { | ||
if (!request) return | ||
this._clearTimeout() | ||
this._clearTimeout() | ||
if (!this.peerChoking && !this._finished) this._updateTimeout() | ||
request.callback(err, buffer) | ||
} | ||
if (!this.peerChoking && !this._finished) this._updateTimeout() | ||
request.callback(err, buffer) | ||
} | ||
_clearTimeout () { | ||
if (!this._timeout) return | ||
Wire.prototype._clearTimeout = function () { | ||
if (!this._timeout) return | ||
clearTimeout(this._timeout) | ||
this._timeout = null | ||
} | ||
clearTimeout(this._timeout) | ||
this._timeout = null | ||
} | ||
_updateTimeout () { | ||
if (!this._timeoutMs || !this.requests.length || this._timeout) return | ||
Wire.prototype._updateTimeout = function () { | ||
var self = this | ||
if (!self._timeoutMs || !self.requests.length || self._timeout) return | ||
this._timeout = setTimeout(() => this._onTimeout(), this._timeoutMs) | ||
if (this._timeoutUnref && this._timeout.unref) this._timeout.unref() | ||
} | ||
self._timeout = setTimeout(function () { | ||
self._onTimeout() | ||
}, self._timeoutMs) | ||
if (self._timeoutUnref && self._timeout.unref) self._timeout.unref() | ||
} | ||
/** | ||
* Takes a number of bytes that the local peer is waiting to receive from the remote peer | ||
* in order to parse a complete message, and a callback function to be called once enough | ||
* bytes have arrived. | ||
* @param {number} size | ||
* @param {function} parser | ||
*/ | ||
_parse (size, parser) { | ||
this._parserSize = size | ||
this._parser = parser | ||
} | ||
/** | ||
* Takes a number of bytes that the local peer is waiting to receive from the remote peer | ||
* in order to parse a complete message, and a callback function to be called once enough | ||
* bytes have arrived. | ||
* @param {number} size | ||
* @param {function} parser | ||
*/ | ||
Wire.prototype._parse = function (size, parser) { | ||
this._parserSize = size | ||
this._parser = parser | ||
} | ||
/** | ||
* Handle the first 4 bytes of a message, to determine the length of bytes that must be | ||
* waited for in order to have the whole message. | ||
* @param {Buffer} buffer | ||
*/ | ||
_onMessageLength (buffer) { | ||
const length = buffer.readUInt32BE(0) | ||
if (length > 0) { | ||
this._parse(length, this._onMessage) | ||
} else { | ||
this._onKeepAlive() | ||
this._parse(4, this._onMessageLength) | ||
} | ||
} | ||
/** | ||
* Handle the first 4 bytes of a message, to determine the length of bytes that must be | ||
* waited for in order to have the whole message. | ||
* @param {Buffer} buffer | ||
*/ | ||
Wire.prototype._onMessageLength = function (buffer) { | ||
var length = buffer.readUInt32BE(0) | ||
if (length > 0) { | ||
this._parse(length, this._onMessage) | ||
} else { | ||
this._onKeepAlive() | ||
/** | ||
* Handle a message from the remote peer. | ||
* @param {Buffer} buffer | ||
*/ | ||
_onMessage (buffer) { | ||
this._parse(4, this._onMessageLength) | ||
switch (buffer[0]) { | ||
case 0: | ||
return this._onChoke() | ||
case 1: | ||
return this._onUnchoke() | ||
case 2: | ||
return this._onInterested() | ||
case 3: | ||
return this._onUninterested() | ||
case 4: | ||
return this._onHave(buffer.readUInt32BE(1)) | ||
case 5: | ||
return this._onBitField(buffer.slice(1)) | ||
case 6: | ||
return this._onRequest( | ||
buffer.readUInt32BE(1), | ||
buffer.readUInt32BE(5), | ||
buffer.readUInt32BE(9) | ||
) | ||
case 7: | ||
return this._onPiece( | ||
buffer.readUInt32BE(1), | ||
buffer.readUInt32BE(5), | ||
buffer.slice(9) | ||
) | ||
case 8: | ||
return this._onCancel( | ||
buffer.readUInt32BE(1), | ||
buffer.readUInt32BE(5), | ||
buffer.readUInt32BE(9) | ||
) | ||
case 9: | ||
return this._onPort(buffer.readUInt16BE(1)) | ||
case 20: | ||
return this._onExtended(buffer.readUInt8(1), buffer.slice(2)) | ||
default: | ||
this._debug('got unknown message') | ||
return this.emit('unknownmessage', buffer) | ||
} | ||
} | ||
} | ||
/** | ||
* Handle a message from the remote peer. | ||
* @param {Buffer} buffer | ||
*/ | ||
Wire.prototype._onMessage = function (buffer) { | ||
this._parse(4, this._onMessageLength) | ||
switch (buffer[0]) { | ||
case 0: | ||
return this._onChoke() | ||
case 1: | ||
return this._onUnchoke() | ||
case 2: | ||
return this._onInterested() | ||
case 3: | ||
return this._onUninterested() | ||
case 4: | ||
return this._onHave(buffer.readUInt32BE(1)) | ||
case 5: | ||
return this._onBitField(buffer.slice(1)) | ||
case 6: | ||
return this._onRequest(buffer.readUInt32BE(1), | ||
buffer.readUInt32BE(5), buffer.readUInt32BE(9)) | ||
case 7: | ||
return this._onPiece(buffer.readUInt32BE(1), | ||
buffer.readUInt32BE(5), buffer.slice(9)) | ||
case 8: | ||
return this._onCancel(buffer.readUInt32BE(1), | ||
buffer.readUInt32BE(5), buffer.readUInt32BE(9)) | ||
case 9: | ||
return this._onPort(buffer.readUInt16BE(1)) | ||
case 20: | ||
return this._onExtended(buffer.readUInt8(1), buffer.slice(2)) | ||
default: | ||
this._debug('got unknown message') | ||
return this.emit('unknownmessage', buffer) | ||
} | ||
} | ||
Wire.prototype._parseHandshake = function () { | ||
var self = this | ||
self._parse(1, function (buffer) { | ||
var pstrlen = buffer.readUInt8(0) | ||
self._parse(pstrlen + 48, function (handshake) { | ||
var protocol = handshake.slice(0, pstrlen) | ||
if (protocol.toString() !== 'BitTorrent protocol') { | ||
self._debug('Error: wire not speaking BitTorrent protocol (%s)', protocol.toString()) | ||
self.end() | ||
return | ||
} | ||
handshake = handshake.slice(pstrlen) | ||
self._onHandshake(handshake.slice(8, 28), handshake.slice(28, 48), { | ||
dht: !!(handshake[7] & 0x01), // see bep_0005 | ||
extended: !!(handshake[5] & 0x10) // see bep_0010 | ||
_parseHandshake () { | ||
this._parse(1, buffer => { | ||
const pstrlen = buffer.readUInt8(0) | ||
this._parse(pstrlen + 48, handshake => { | ||
const protocol = handshake.slice(0, pstrlen) | ||
if (protocol.toString() !== 'BitTorrent protocol') { | ||
this._debug('Error: wire not speaking BitTorrent protocol (%s)', protocol.toString()) | ||
this.end() | ||
return | ||
} | ||
handshake = handshake.slice(pstrlen) | ||
this._onHandshake(handshake.slice(8, 28), handshake.slice(28, 48), { | ||
dht: !!(handshake[7] & 0x01), // see bep_0005 | ||
extended: !!(handshake[5] & 0x10) // see bep_0010 | ||
}) | ||
this._parse(4, this._onMessageLength) | ||
}) | ||
self._parse(4, self._onMessageLength) | ||
}) | ||
}) | ||
} | ||
} | ||
Wire.prototype._onFinish = function () { | ||
this._finished = true | ||
_onFinish () { | ||
this._finished = true | ||
this.push(null) // stream cannot be half open, so signal the end of it | ||
while (this.read()) {} // consume and discard the rest of the stream data | ||
this.push(null) // stream cannot be half open, so signal the end of it | ||
while (this.read()) {} // consume and discard the rest of the stream data | ||
clearInterval(this._keepAliveInterval) | ||
this._parse(Number.MAX_VALUE, function () {}) | ||
while (this.peerRequests.length) { | ||
this.peerRequests.pop() | ||
clearInterval(this._keepAliveInterval) | ||
this._parse(Number.MAX_VALUE, () => {}) | ||
while (this.peerRequests.length) { | ||
this.peerRequests.pop() | ||
} | ||
while (this.requests.length) { | ||
this._callback(this.requests.pop(), new Error('wire was closed'), null) | ||
} | ||
} | ||
while (this.requests.length) { | ||
this._callback(this.requests.pop(), new Error('wire was closed'), null) | ||
_debug (...args) { | ||
args[0] = `[${this._debugId}] ${args[0]}` | ||
debug(...args) | ||
} | ||
} | ||
Wire.prototype._debug = function () { | ||
var args = [].slice.call(arguments) | ||
args[0] = '[' + this._debugId + '] ' + args[0] | ||
debug.apply(null, args) | ||
} | ||
function pull (requests, piece, offset, length) { | ||
for (var i = 0; i < requests.length; i++) { | ||
var req = requests[i] | ||
if (req.piece === piece && req.offset === offset && req.length === length) { | ||
arrayRemove(requests, i) | ||
return req | ||
_pull (requests, piece, offset, length) { | ||
for (let i = 0; i < requests.length; i++) { | ||
const req = requests[i] | ||
if (req.piece === piece && req.offset === offset && req.length === length) { | ||
arrayRemove(requests, i) | ||
return req | ||
} | ||
} | ||
return null | ||
} | ||
return null | ||
} | ||
module.exports = Wire |
{ | ||
"name": "bittorrent-protocol", | ||
"description": "Simple, robust, BitTorrent peer wire protocol implementation", | ||
"version": "2.4.2", | ||
"version": "3.0.0", | ||
"author": { | ||
@@ -17,6 +17,4 @@ "name": "WebTorrent, LLC", | ||
"debug": "^3.1.0", | ||
"inherits": "^2.0.1", | ||
"randombytes": "^2.0.5", | ||
"readable-stream": "^2.3.2", | ||
"safe-buffer": "^5.1.1", | ||
"speedometer": "^1.0.0", | ||
@@ -27,3 +25,3 @@ "unordered-array-remove": "^1.0.2", | ||
"devDependencies": { | ||
"airtap": "0.0.7", | ||
"airtap": "0.1.0", | ||
"standard": "*", | ||
@@ -30,0 +28,0 @@ "tape": "^4.0.0" |
@@ -40,7 +40,7 @@ # bittorrent-protocol [![travis][travis-image]][travis-url] [![npm][npm-image]][npm-url] [![downloads][downloads-image]][downloads-url] [![javascript style guide][standard-image]][standard-url] | ||
```js | ||
var Protocol = require('bittorrent-protocol') | ||
var net = require('net') | ||
const Protocol = require('bittorrent-protocol') | ||
const net = require('net') | ||
net.createServer(function (socket) { | ||
var wire = new Protocol() | ||
net.createServer(socket => { | ||
const wire = new Protocol() | ||
@@ -50,3 +50,3 @@ // pipe to and from the protocol | ||
wire.on('handshake', function (infoHash, peerId) { | ||
wire.on('handshake', (infoHash, peerId) => { | ||
// receive a handshake (infoHash and peerId are hex strings) | ||
@@ -58,3 +58,3 @@ | ||
wire.on('unchoke', function () { | ||
wire.on('unchoke', () => { | ||
console.log('peer is no longer choking us: ' + wire.peerChoking) | ||
@@ -74,3 +74,3 @@ }) | ||
wire.handshake(infoHash, peerId, { dht: true }) | ||
wire.on('handshake', function (infoHash, peerId, extensions) { | ||
wire.on('handshake', (infoHash, peerId, extensions) => { | ||
// receive a handshake (infoHash and peerId are hex strings) | ||
@@ -92,6 +92,6 @@ console.log(extensions.dht) // supports DHT (BEP-0005) | ||
wire.on('choke', function () { | ||
wire.on('choke', () => { | ||
// the peer is now choking us | ||
}) | ||
wire.on('unchoke', function () { | ||
wire.on('unchoke', () => { | ||
// peer is no longer choking us | ||
@@ -109,6 +109,6 @@ }) | ||
wire.on('interested', function () { | ||
wire.on('interested', () => { | ||
// peer is now interested | ||
}) | ||
wire.on('uninterested', function () { | ||
wire.on('uninterested', () => { | ||
// peer is no longer interested | ||
@@ -125,3 +125,3 @@ }) | ||
wire.bitfield(buffer) | ||
wire.on('bitfield', function (bitfield) { | ||
wire.on('bitfield', bitfield => { | ||
// bitfield received from the peer | ||
@@ -132,3 +132,3 @@ }) | ||
wire.have(pieceIndex) | ||
wire.on('have', function (pieceIndex) { | ||
wire.on('have', pieceIndex => { | ||
// peer has sent you a have message | ||
@@ -152,3 +152,3 @@ }) | ||
// request a block from a peer | ||
wire.request(pieceIndex, offset, length, function (err, block) { | ||
wire.request(pieceIndex, offset, length, (err, block) => { | ||
if (err) { | ||
@@ -165,3 +165,3 @@ // there was an error (peer has started choking us etc) | ||
// receive a request from a peer | ||
wire.on('request', function (pieceIndex, offset, length, callback) { | ||
wire.on('request', (pieceIndex, offset, length, callback) => { | ||
// ... read block ... | ||
@@ -192,3 +192,3 @@ callback(null, block) // respond back to the peer | ||
wire.port(dhtPort) | ||
wire.on('port', function (dhtPort) { | ||
wire.on('port', dhtPort => { | ||
// peer has sent a port to us | ||
@@ -212,3 +212,3 @@ }) | ||
wire.setKeepAlive(true) | ||
wire.on('keep-alive', function () { | ||
wire.on('keep-alive', () => { | ||
// peer sent a keep alive - just ignore it | ||
@@ -253,6 +253,6 @@ }) | ||
wire.on('download', function (numberOfBytes) { | ||
wire.on('download', numberOfBytes => { | ||
... | ||
}) | ||
wire.on('upload', function (numberOfBytes) { | ||
wire.on('upload', numberOfBytes => { | ||
... | ||
@@ -283,8 +283,8 @@ }) | ||
```js | ||
var Protocol = require('bittorrent-protocol') | ||
var net = require('net') | ||
var ut_metadata = require('ut_metadata') | ||
const Protocol = require('bittorrent-protocol') | ||
const net = require('net') | ||
const ut_metadata = require('ut_metadata') | ||
net.createServer(function (socket) { | ||
var wire = new Protocol() | ||
net.createServer(socket => { | ||
const wire = new Protocol() | ||
socket.pipe(wire).pipe(socket) | ||
@@ -301,3 +301,3 @@ | ||
// 'metadata' event will fire when the metadata arrives and is verified to be correct! | ||
wire.ut_metadata.on('metadata', function (metadata) { | ||
wire.ut_metadata.on('metadata', metadata => { | ||
// got metadata! | ||
@@ -312,3 +312,3 @@ | ||
// probably not going to arrive for one of the above reasons. | ||
wire.ut_metadata.on('warning', function (err) { | ||
wire.ut_metadata.on('warning', err => { | ||
console.log(err.message) | ||
@@ -318,3 +318,3 @@ }) | ||
// handle handshake | ||
wire.on('handshake', function (infoHash, peerId) { | ||
wire.on('handshake', (infoHash, peerId) => { | ||
// receive a handshake (infoHash and peerId are hex strings) | ||
@@ -321,0 +321,0 @@ wire.handshake(new Buffer('my info hash'), new Buffer('my peer id')) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
8
638
33170
- Removedinherits@^2.0.1
- Removedsafe-buffer@^5.1.1