bittorrent-protocol
Advanced tools
Comparing version 0.3.1 to 0.4.0
410
index.js
module.exports = Wire | ||
var bitfield = require('bitfield') | ||
var BitField = require('bitfield') | ||
var bncode = require('bncode') | ||
@@ -18,13 +18,2 @@ var stream = require('stream') | ||
function pull (requests, piece, offset, length) { | ||
for (var i = 0; i < requests.length; i++) { | ||
var req = requests[i] | ||
if (req.piece !== piece || req.offset !== offset || req.length !== length) continue | ||
if (i === 0) requests.shift() | ||
else requests.splice(i, 1) | ||
return req | ||
} | ||
return null | ||
} | ||
function Request (piece, offset, length, callback) { | ||
@@ -35,3 +24,2 @@ this.piece = piece | ||
this.callback = callback | ||
this.timeout = null | ||
} | ||
@@ -56,96 +44,51 @@ | ||
self.requests = [] | ||
self.peerRequests = [] | ||
self.uploaded = 0 | ||
self.downloaded = 0 | ||
self.requests = [] | ||
self.peerRequests = [] | ||
self._keepAlive = null | ||
self._finished = false | ||
self._timeout = null | ||
self._timeoutMs = 0 | ||
self.on('finish', function () { | ||
self._finished = true | ||
self.push(null) // cannot be half open | ||
clearInterval(self._keepAlive) | ||
self._parse(Number.MAX_VALUE, function () {}) | ||
while (self.peerRequests.length) | ||
self.peerRequests.pop() | ||
while (self.requests.length) | ||
self._callback(self.requests.shift(), new Error('wire is closed'), null) | ||
}) | ||
self._buffer = [] | ||
self._bufferSize = 0 | ||
self._parser = null | ||
self._parserSize = 0 | ||
var ontimeout = function () { | ||
self._callback(self.requests.shift(), new Error('request has timed out'), null) | ||
self.emit('timeout') | ||
} | ||
self.on('finish', self._onfinish) | ||
self._timeout = 0 | ||
self._ontimeout = ontimeout | ||
self._parseHandshake() | ||
} | ||
var onmessagelength = function (buffer) { | ||
var length = buffer.readUInt32BE(0) | ||
if (length) return self._parse(length, onmessage) | ||
self._parse(4, onmessagelength) | ||
self.emit('keep-alive') | ||
} | ||
/** | ||
* Set whether to send a "keep-alive" ping (sent every 60s) | ||
* @param {boolean} enable | ||
*/ | ||
Wire.prototype.setKeepAlive = function (enable) { | ||
clearInterval(this._keepAlive) | ||
if (enable === false) return | ||
this._keepAlive = setInterval(this._push.bind(this, MESSAGE_KEEP_ALIVE), 60000) | ||
} | ||
var onmessage = function (buffer) { | ||
self._parse(4, onmessagelength) | ||
switch (buffer[0]) { | ||
case 0: | ||
return self._onchoke() | ||
case 1: | ||
return self._onunchoke() | ||
case 2: | ||
return self._oninterested() | ||
case 3: | ||
return self._onuninterested() | ||
case 4: | ||
return self._onhave(buffer.readUInt32BE(1)) | ||
case 5: | ||
return self._onbitfield(buffer.slice(1)) | ||
case 6: | ||
return self._onrequest(buffer.readUInt32BE(1), | ||
buffer.readUInt32BE(5), buffer.readUInt32BE(9)) | ||
case 7: | ||
return self._onpiece(buffer.readUInt32BE(1), | ||
buffer.readUInt32BE(5), buffer.slice(9)) | ||
case 8: | ||
return self._oncancel(buffer.readUInt32BE(1), | ||
buffer.readUInt32BE(5), buffer.readUInt32BE(9)) | ||
case 9: | ||
return self._onport(buffer.readUInt16BE(1)) | ||
case 20: | ||
return self._onextended(bncode.decode(buffer)) | ||
} | ||
self.emit('unknownmessage', buffer) | ||
} | ||
/** | ||
* Set the amount of time to wait before considering a request to be "timed out" | ||
* @param {number} ms | ||
*/ | ||
Wire.prototype.setTimeout = function (ms) { | ||
this._clearTimeout() | ||
this._timeoutMs = ms | ||
this._updateTimeout() | ||
} | ||
self._buffer = [] | ||
self._bufferSize = 0 | ||
self._parser = null | ||
self._parserSize = 0 | ||
self._parse(1, function (buffer) { | ||
var pstrlen = buffer.readUInt8(0) | ||
self._parse(pstrlen + 48, function (handshake) { | ||
var protocol = handshake.slice(0, pstrlen) | ||
if (protocol.toString() !== 'BitTorrent protocol') { | ||
self.emit('error', 'Wire not speaking BitTorrent protocol: ', protocol) | ||
self.destroy() | ||
return | ||
} | ||
handshake = handshake.slice(pstrlen) | ||
self._onhandshake(handshake.slice(8, 28), handshake.slice(28, 48), { | ||
dht: !!(handshake[7] & 0x01), // see bep_0005 | ||
extended: !!(handshake[5] & 0x10) // see bep_0010 | ||
}) | ||
self._parse(4, onmessagelength) | ||
}) | ||
}) | ||
Wire.prototype.destroy = function () { | ||
this.emit('close') | ||
this.end() | ||
} | ||
// | ||
// MESSAGES | ||
// OUTGOING MESSAGES | ||
// | ||
@@ -168,8 +111,15 @@ | ||
var reserved = new Buffer(MESSAGE_RESERVED) | ||
if (extensions && extensions.dht) reserved[7] |= 1 | ||
reserved[5] |= 0x10 // enable extended message | ||
// enable extended message | ||
reserved[5] |= 0x10 | ||
if (extensions && extensions.dht) | ||
reserved[7] |= 1 | ||
this._push(Buffer.concat([MESSAGE_PROTOCOL, reserved, infoHash, peerId])) | ||
} | ||
/** | ||
* Message "choke": <len=0001><id=0> | ||
*/ | ||
Wire.prototype.choke = function () { | ||
@@ -182,2 +132,5 @@ if (this.amChoking) return | ||
/** | ||
* Message "unchoke": <len=0001><id=1> | ||
*/ | ||
Wire.prototype.unchoke = function () { | ||
@@ -189,2 +142,5 @@ if (!this.amChoking) return | ||
/** | ||
* Message "interested": <len=0001><id=2> | ||
*/ | ||
Wire.prototype.interested = function () { | ||
@@ -196,2 +152,5 @@ if (this.amInterested) return | ||
/** | ||
* Message "uninterested": <len=0001><id=3> | ||
*/ | ||
Wire.prototype.uninterested = function () { | ||
@@ -203,2 +162,6 @@ if (!this.amInterested) return | ||
/** | ||
* Message "have": <len=0005><id=4><piece index> | ||
* @param {number} i | ||
*/ | ||
Wire.prototype.have = function (i) { | ||
@@ -208,12 +171,29 @@ this._message(4, [i], null) | ||
/** | ||
* Message "bitfield": <len=0001+X><id=5><bitfield> | ||
* @param {BitField|Buffer} bitfield | ||
*/ | ||
Wire.prototype.bitfield = function (bitfield) { | ||
if (bitfield.buffer) bitfield = bitfield.buffer // support bitfield objects | ||
if (!Buffer.isBuffer(bitfield)) | ||
bitfield = bitfield.buffer | ||
this._message(5, [], bitfield) | ||
} | ||
Wire.prototype.request = function (i, offset, length, callback) { | ||
if (!callback) callback = function () {} | ||
if (this._finished) return callback(new Error('wire is closed')) | ||
if (this.peerChoking) return callback(new Error('peer is choking')) | ||
this.requests.push(new Request(i, offset, length, callback)) | ||
/** | ||
* Message "request": <len=0013><id=6><index><begin><length> | ||
* @param {number} i | ||
* @param {number} offset | ||
* @param {number} length | ||
* @param {function} cb | ||
*/ | ||
Wire.prototype.request = function (i, offset, length, cb) { | ||
if (!cb) cb = function () {} | ||
if (this._finished) | ||
return cb(new Error('wire is closed')) | ||
if (this.peerChoking) | ||
return cb(new Error('peer is choking')) | ||
this.requests.push(new Request(i, offset, length, cb)) | ||
this._updateTimeout() | ||
@@ -223,2 +203,8 @@ this._message(6, [i, offset, length], null) | ||
/** | ||
* Message "piece": <len=0009+X><id=7><index><begin><block> | ||
* @param {number} i | ||
* @param {number} offset | ||
* @param {Buffer} buffer | ||
*/ | ||
Wire.prototype.piece = function (i, offset, buffer) { | ||
@@ -230,8 +216,13 @@ this.uploaded += buffer.length | ||
/** | ||
* Message "cancel": <len=0013><id=8><index><begin><length> | ||
* @param {number} i | ||
* @param {number} offset | ||
* @param {number} length | ||
*/ | ||
Wire.prototype.cancel = function (i, offset, length) { | ||
this._callback(pull(this.requests, i, offset, length), new Error('request was cancelled'), null) | ||
this._callback(this._pull(this.requests, i, offset, length), new Error('request was cancelled'), null) | ||
this._message(8, [i, offset, length], null) | ||
} | ||
/** | ||
@@ -247,33 +238,36 @@ * Message: "port" <len=0003><id=9><listen-port> | ||
Wire.prototype.extended = function (ext_number, msg) { | ||
/** | ||
* Message: "extended" <len=0005+X><id=20><ext-number><payload> | ||
* @param {number} extNumber | ||
* @param {Object} obj | ||
*/ | ||
Wire.prototype.extended = function (extNumber, obj) { | ||
var ext_id = new Buffer(1) | ||
ext_id.writeUInt8(ext_number, 0) | ||
this._message(20, [], Buffer.concat([ext_id, bncode.encode(msg)])) | ||
ext_id.writeUInt8(extNumber, 0) | ||
this._message(20, [], Buffer.concat([ext_id, bncode.encode(obj)])) | ||
} | ||
// | ||
// INCOMING MESSAGES | ||
// | ||
Wire.prototype.setKeepAlive = function (bool) { | ||
clearInterval(this._keepAlive) | ||
if (bool === false) return | ||
this._keepAlive = setInterval(this._push.bind(this, MESSAGE_KEEP_ALIVE), 60000) | ||
Wire.prototype._onkeepalive = function () { | ||
this.emit('keep-alive') | ||
} | ||
Wire.prototype.setTimeout = function (ms, fn) { | ||
if (this.requests.length) clearTimeout(this.requests[0].timeout) | ||
this._timeout = ms | ||
this._updateTimeout() | ||
if (fn) this.on('timeout', fn) | ||
Wire.prototype._onhandshake = function (infoHash, peerId, extensions) { | ||
this.peerExtensions = extensions | ||
this.emit('handshake', infoHash, peerId, extensions) | ||
} | ||
Wire.prototype.destroy = function () { | ||
this.emit('close') | ||
this.end() | ||
Wire.prototype._onchoke = function () { | ||
this.peerChoking = true | ||
this.emit('choke') | ||
while (this.requests.length) | ||
this._callback(this.requests.shift(), new Error('peer is choking'), null) | ||
} | ||
// inbound | ||
Wire.prototype._onhandshake = function (infoHash, peerId, extensions) { | ||
this.peerExtensions = extensions | ||
this.emit('handshake', infoHash, peerId, extensions) | ||
Wire.prototype._onunchoke = function () { | ||
this.peerChoking = false | ||
this.emit('unchoke') | ||
} | ||
@@ -291,27 +285,15 @@ | ||
Wire.prototype._onchoke = function () { | ||
this.peerChoking = true | ||
this.emit('choke') | ||
while (this.requests.length) | ||
this._callback(this.requests.shift(), new Error('peer is choking'), null) | ||
Wire.prototype._onhave = function (i) { | ||
this.peerPieces[i] = true | ||
this.emit('have', i) | ||
} | ||
Wire.prototype._onunchoke = function () { | ||
this.peerChoking = false | ||
this.emit('unchoke') | ||
} | ||
Wire.prototype._onbitfield = function (buffer) { | ||
var pieces = bitfield(buffer) | ||
var pieces = new BitField(buffer) | ||
for (var i = 0; i < 8 * buffer.length; i++) { | ||
this.peerPieces[i] = pieces.get(i) | ||
} | ||
this.emit('bitfield', buffer) | ||
this.emit('bitfield', pieces) | ||
} | ||
Wire.prototype._onhave = function (i) { | ||
this.peerPieces[i] = true | ||
this.emit('have', i) | ||
} | ||
Wire.prototype._onrequest = function (i, offset, length) { | ||
@@ -321,3 +303,3 @@ if (this.amChoking) return | ||
var respond = function (err, buffer) { | ||
if (err || request !== pull(this.peerRequests, i, offset, length)) | ||
if (err || request !== this._pull(this.peerRequests, i, offset, length)) | ||
return | ||
@@ -332,9 +314,4 @@ this.piece(i, offset, buffer) | ||
Wire.prototype._oncancel = function (i, offset, length) { | ||
pull(this.peerRequests, i, offset, length) | ||
this.emit('cancel', i, offset, length) | ||
} | ||
Wire.prototype._onpiece = function (i, offset, buffer) { | ||
this._callback(pull(this.requests, i, offset, buffer.length), null, buffer) | ||
this._callback(this._pull(this.requests, i, offset, buffer.length), null, buffer) | ||
this.downloaded += buffer.length | ||
@@ -345,2 +322,7 @@ this.emit('download', buffer.length) | ||
Wire.prototype._oncancel = function (i, offset, length) { | ||
this._pull(this.peerRequests, i, offset, length) | ||
this.emit('cancel', i, offset, length) | ||
} | ||
Wire.prototype._onport = function (port) { | ||
@@ -354,8 +336,28 @@ this.emit('port', port) | ||
// helpers and streams | ||
Wire.prototype._ontimeout = function () { | ||
this._callback(this.requests.shift(), new Error('request has timed out'), null) | ||
this.emit('timeout') | ||
} | ||
// | ||
// HELPER METHODS | ||
// | ||
Wire.prototype._pull = function (requests, piece, offset, length) { | ||
for (var i = 0; i < requests.length; i++) { | ||
var req = requests[i] | ||
if (req.piece !== piece || req.offset !== offset || req.length !== length) continue | ||
if (i === 0) requests.shift() | ||
else requests.splice(i, 1) | ||
return req | ||
} | ||
return null | ||
} | ||
Wire.prototype._callback = function (request, err, buffer) { | ||
if (!request) return | ||
if (request.timeout) | ||
clearTimeout(request.timeout) | ||
if (!request) | ||
return | ||
this._clearTimeout() | ||
if (!this.peerChoking && !this._finished) | ||
@@ -366,8 +368,22 @@ this._updateTimeout() | ||
Wire.prototype._clearTimeout = function () { | ||
if (!this._timeout) | ||
return | ||
clearTimeout(this._timeout) | ||
this._timeout = null | ||
} | ||
Wire.prototype._updateTimeout = function () { | ||
if (!this._timeout || !this.requests.length || this.requests[0].timeout) | ||
if (!this._timeoutMs || !this.requests.length || this._timeout) | ||
return | ||
this.requests[0].timeout = setTimeout(this._ontimeout, this._timeout) | ||
this._timeout = setTimeout(this._ontimeout.bind(this), this._timeoutMs) | ||
} | ||
Wire.prototype._parse = function (size, parser) { | ||
this._parserSize = size | ||
this._parser = parser | ||
} | ||
Wire.prototype._message = function (id, numbers, data) { | ||
@@ -384,5 +400,82 @@ var dataLength = data ? data.length : 0 | ||
this._push(buffer) | ||
if (data) this._push(data) | ||
if (data) | ||
this._push(data) | ||
} | ||
Wire.prototype._onmessagelength = function (buffer) { | ||
var length = buffer.readUInt32BE(0) | ||
if (length > 0) { | ||
this._parse(length, this._onmessage) | ||
} else { | ||
this._onkeepalive() | ||
this._parse(4, this._onmessagelength) | ||
} | ||
} | ||
Wire.prototype._onmessage = function (buffer) { | ||
this._parse(4, this._onmessagelength) | ||
switch (buffer[0]) { | ||
case 0: | ||
return this._onchoke() | ||
case 1: | ||
return this._onunchoke() | ||
case 2: | ||
return this._oninterested() | ||
case 3: | ||
return this._onuninterested() | ||
case 4: | ||
return this._onhave(buffer.readUInt32BE(1)) | ||
case 5: | ||
return this._onbitfield(buffer.slice(1)) | ||
case 6: | ||
return this._onrequest(buffer.readUInt32BE(1), | ||
buffer.readUInt32BE(5), buffer.readUInt32BE(9)) | ||
case 7: | ||
return this._onpiece(buffer.readUInt32BE(1), | ||
buffer.readUInt32BE(5), buffer.slice(9)) | ||
case 8: | ||
return this._oncancel(buffer.readUInt32BE(1), | ||
buffer.readUInt32BE(5), buffer.readUInt32BE(9)) | ||
case 9: | ||
return this._onport(buffer.readUInt16BE(1)) | ||
case 20: | ||
return this._onextended(bncode.decode(buffer)) | ||
} | ||
this.emit('unknownmessage', buffer) | ||
} | ||
Wire.prototype._parseHandshake = function () { | ||
this._parse(1, function (buffer) { | ||
var pstrlen = buffer.readUInt8(0) | ||
this._parse(pstrlen + 48, function (handshake) { | ||
var protocol = handshake.slice(0, pstrlen) | ||
if (protocol.toString() !== 'BitTorrent protocol') { | ||
this.emit('error', 'Wire not speaking BitTorrent protocol: ', protocol) | ||
this.destroy() | ||
return | ||
} | ||
handshake = handshake.slice(pstrlen) | ||
this._onhandshake(handshake.slice(8, 28), handshake.slice(28, 48), { | ||
dht: !!(handshake[7] & 0x01), // see bep_0005 | ||
extended: !!(handshake[5] & 0x10) // see bep_0010 | ||
}) | ||
this._parse(4, this._onmessagelength) | ||
}.bind(this)) | ||
}.bind(this)) | ||
} | ||
Wire.prototype._onfinish = function () { | ||
this._finished = true | ||
this.push(null) // stream cannot be half open | ||
clearInterval(this._keepAlive) | ||
this._parse(Number.MAX_VALUE, function () {}) | ||
this.peerRequests = [] | ||
while (this.requests.length) | ||
this._callback(this.requests.shift(), new Error('wire was closed'), null) | ||
} | ||
// | ||
// STREAM METHODS | ||
// | ||
/** | ||
@@ -397,7 +490,2 @@ * Push a message to the remote peer. | ||
Wire.prototype._parse = function (size, parser) { | ||
this._parserSize = size | ||
this._parser = parser | ||
} | ||
/** | ||
@@ -409,3 +497,3 @@ * Duplex stream method. Called whenever the upstream has data for us. | ||
*/ | ||
Wire.prototype._write = function (data, encoding, callback) { | ||
Wire.prototype._write = function (data, encoding, cb) { | ||
this._bufferSize += data.length | ||
@@ -425,3 +513,3 @@ this._buffer.push(data) | ||
callback(null) // Signal that we're ready for more data | ||
cb(null) // Signal that we're ready for more data | ||
} | ||
@@ -428,0 +516,0 @@ |
{ | ||
"name": "bittorrent-protocol", | ||
"version": "0.3.1", | ||
"version": "0.4.0", | ||
"description": "Simple, robust, BitTorrent peer wire protocol implementation", | ||
@@ -40,3 +40,7 @@ "main": "index.js", | ||
], | ||
"author": "Feross Aboukhadijeh <feross@feross.org> (http://feross.org/)", | ||
"author": { | ||
"name": "Feross Aboukhadijeh", | ||
"email": "feross@feross.org", | ||
"url": "http://feross.org/" | ||
}, | ||
"license": "MIT", | ||
@@ -43,0 +47,0 @@ "bugs": { |
@@ -9,3 +9,3 @@ # bittorrent-protocol | ||
Node.js implementation of the [BitTorrent peer wire protocol specification](https://wiki.theory.org/BitTorrentSpecification#Peer_wire_protocol_.28TCP.29). This is a fork of [peer-wire-protocol](https://github.com/mafintosh/peer-wire-protocol). | ||
Node.js implementation of the [BitTorrent peer wire protocol specification](https://wiki.theory.org/BitTorrentSpecification#Peer_wire_protocol_.28TCP.29). | ||
@@ -194,1 +194,3 @@ The protocol is the main communication layer for BitTorrent file transfer and is used by [WebTorrent](https://github.com/feross/WebTorrent). | ||
MIT | ||
This was originally forked from [peer-wire-protocol](https://github.com/mafintosh/peer-wire-protocol) which is also MIT licensed. |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
24170
576
195