Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bittorrent-protocol

Package Overview
Dependencies
Maintainers
1
Versions
100
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bittorrent-protocol - npm Package Compare versions

Comparing version 0.3.1 to 0.4.0

410

index.js
module.exports = Wire
var bitfield = require('bitfield')
var BitField = require('bitfield')
var bncode = require('bncode')

@@ -18,13 +18,2 @@ var stream = require('stream')

function pull (requests, piece, offset, length) {
for (var i = 0; i < requests.length; i++) {
var req = requests[i]
if (req.piece !== piece || req.offset !== offset || req.length !== length) continue
if (i === 0) requests.shift()
else requests.splice(i, 1)
return req
}
return null
}
function Request (piece, offset, length, callback) {

@@ -35,3 +24,2 @@ this.piece = piece

this.callback = callback
this.timeout = null
}

@@ -56,96 +44,51 @@

self.requests = []
self.peerRequests = []
self.uploaded = 0
self.downloaded = 0
self.requests = []
self.peerRequests = []
self._keepAlive = null
self._finished = false
self._timeout = null
self._timeoutMs = 0
self.on('finish', function () {
self._finished = true
self.push(null) // cannot be half open
clearInterval(self._keepAlive)
self._parse(Number.MAX_VALUE, function () {})
while (self.peerRequests.length)
self.peerRequests.pop()
while (self.requests.length)
self._callback(self.requests.shift(), new Error('wire is closed'), null)
})
self._buffer = []
self._bufferSize = 0
self._parser = null
self._parserSize = 0
var ontimeout = function () {
self._callback(self.requests.shift(), new Error('request has timed out'), null)
self.emit('timeout')
}
self.on('finish', self._onfinish)
self._timeout = 0
self._ontimeout = ontimeout
self._parseHandshake()
}
var onmessagelength = function (buffer) {
var length = buffer.readUInt32BE(0)
if (length) return self._parse(length, onmessage)
self._parse(4, onmessagelength)
self.emit('keep-alive')
}
/**
* Set whether to send a "keep-alive" ping (sent every 60s)
* @param {boolean} enable
*/
Wire.prototype.setKeepAlive = function (enable) {
clearInterval(this._keepAlive)
if (enable === false) return
this._keepAlive = setInterval(this._push.bind(this, MESSAGE_KEEP_ALIVE), 60000)
}
var onmessage = function (buffer) {
self._parse(4, onmessagelength)
switch (buffer[0]) {
case 0:
return self._onchoke()
case 1:
return self._onunchoke()
case 2:
return self._oninterested()
case 3:
return self._onuninterested()
case 4:
return self._onhave(buffer.readUInt32BE(1))
case 5:
return self._onbitfield(buffer.slice(1))
case 6:
return self._onrequest(buffer.readUInt32BE(1),
buffer.readUInt32BE(5), buffer.readUInt32BE(9))
case 7:
return self._onpiece(buffer.readUInt32BE(1),
buffer.readUInt32BE(5), buffer.slice(9))
case 8:
return self._oncancel(buffer.readUInt32BE(1),
buffer.readUInt32BE(5), buffer.readUInt32BE(9))
case 9:
return self._onport(buffer.readUInt16BE(1))
case 20:
return self._onextended(bncode.decode(buffer))
}
self.emit('unknownmessage', buffer)
}
/**
* Set the amount of time to wait before considering a request to be "timed out"
* @param {number} ms
*/
Wire.prototype.setTimeout = function (ms) {
this._clearTimeout()
this._timeoutMs = ms
this._updateTimeout()
}
self._buffer = []
self._bufferSize = 0
self._parser = null
self._parserSize = 0
self._parse(1, function (buffer) {
var pstrlen = buffer.readUInt8(0)
self._parse(pstrlen + 48, function (handshake) {
var protocol = handshake.slice(0, pstrlen)
if (protocol.toString() !== 'BitTorrent protocol') {
self.emit('error', 'Wire not speaking BitTorrent protocol: ', protocol)
self.destroy()
return
}
handshake = handshake.slice(pstrlen)
self._onhandshake(handshake.slice(8, 28), handshake.slice(28, 48), {
dht: !!(handshake[7] & 0x01), // see bep_0005
extended: !!(handshake[5] & 0x10) // see bep_0010
})
self._parse(4, onmessagelength)
})
})
Wire.prototype.destroy = function () {
this.emit('close')
this.end()
}
//
// MESSAGES
// OUTGOING MESSAGES
//

@@ -168,8 +111,15 @@

var reserved = new Buffer(MESSAGE_RESERVED)
if (extensions && extensions.dht) reserved[7] |= 1
reserved[5] |= 0x10 // enable extended message
// enable extended message
reserved[5] |= 0x10
if (extensions && extensions.dht)
reserved[7] |= 1
this._push(Buffer.concat([MESSAGE_PROTOCOL, reserved, infoHash, peerId]))
}
/**
* Message "choke": <len=0001><id=0>
*/
Wire.prototype.choke = function () {

@@ -182,2 +132,5 @@ if (this.amChoking) return

/**
* Message "unchoke": <len=0001><id=1>
*/
Wire.prototype.unchoke = function () {

@@ -189,2 +142,5 @@ if (!this.amChoking) return

/**
* Message "interested": <len=0001><id=2>
*/
Wire.prototype.interested = function () {

@@ -196,2 +152,5 @@ if (this.amInterested) return

/**
* Message "uninterested": <len=0001><id=3>
*/
Wire.prototype.uninterested = function () {

@@ -203,2 +162,6 @@ if (!this.amInterested) return

/**
* Message "have": <len=0005><id=4><piece index>
* @param {number} i
*/
Wire.prototype.have = function (i) {

@@ -208,12 +171,29 @@ this._message(4, [i], null)

/**
* Message "bitfield": <len=0001+X><id=5><bitfield>
* @param {BitField|Buffer} bitfield
*/
Wire.prototype.bitfield = function (bitfield) {
if (bitfield.buffer) bitfield = bitfield.buffer // support bitfield objects
if (!Buffer.isBuffer(bitfield))
bitfield = bitfield.buffer
this._message(5, [], bitfield)
}
Wire.prototype.request = function (i, offset, length, callback) {
if (!callback) callback = function () {}
if (this._finished) return callback(new Error('wire is closed'))
if (this.peerChoking) return callback(new Error('peer is choking'))
this.requests.push(new Request(i, offset, length, callback))
/**
* Message "request": <len=0013><id=6><index><begin><length>
* @param {number} i
* @param {number} offset
* @param {number} length
* @param {function} cb
*/
Wire.prototype.request = function (i, offset, length, cb) {
if (!cb) cb = function () {}
if (this._finished)
return cb(new Error('wire is closed'))
if (this.peerChoking)
return cb(new Error('peer is choking'))
this.requests.push(new Request(i, offset, length, cb))
this._updateTimeout()

@@ -223,2 +203,8 @@ this._message(6, [i, offset, length], null)

/**
* Message "piece": <len=0009+X><id=7><index><begin><block>
* @param {number} i
* @param {number} offset
* @param {Buffer} buffer
*/
Wire.prototype.piece = function (i, offset, buffer) {

@@ -230,8 +216,13 @@ this.uploaded += buffer.length

/**
* Message "cancel": <len=0013><id=8><index><begin><length>
* @param {number} i
* @param {number} offset
* @param {number} length
*/
Wire.prototype.cancel = function (i, offset, length) {
this._callback(pull(this.requests, i, offset, length), new Error('request was cancelled'), null)
this._callback(this._pull(this.requests, i, offset, length), new Error('request was cancelled'), null)
this._message(8, [i, offset, length], null)
}
/**

@@ -247,33 +238,36 @@ * Message: "port" <len=0003><id=9><listen-port>

Wire.prototype.extended = function (ext_number, msg) {
/**
* Message: "extended" <len=0005+X><id=20><ext-number><payload>
* @param {number} extNumber
* @param {Object} obj
*/
Wire.prototype.extended = function (extNumber, obj) {
var ext_id = new Buffer(1)
ext_id.writeUInt8(ext_number, 0)
this._message(20, [], Buffer.concat([ext_id, bncode.encode(msg)]))
ext_id.writeUInt8(extNumber, 0)
this._message(20, [], Buffer.concat([ext_id, bncode.encode(obj)]))
}
//
// INCOMING MESSAGES
//
Wire.prototype.setKeepAlive = function (bool) {
clearInterval(this._keepAlive)
if (bool === false) return
this._keepAlive = setInterval(this._push.bind(this, MESSAGE_KEEP_ALIVE), 60000)
Wire.prototype._onkeepalive = function () {
this.emit('keep-alive')
}
Wire.prototype.setTimeout = function (ms, fn) {
if (this.requests.length) clearTimeout(this.requests[0].timeout)
this._timeout = ms
this._updateTimeout()
if (fn) this.on('timeout', fn)
Wire.prototype._onhandshake = function (infoHash, peerId, extensions) {
this.peerExtensions = extensions
this.emit('handshake', infoHash, peerId, extensions)
}
Wire.prototype.destroy = function () {
this.emit('close')
this.end()
Wire.prototype._onchoke = function () {
this.peerChoking = true
this.emit('choke')
while (this.requests.length)
this._callback(this.requests.shift(), new Error('peer is choking'), null)
}
// inbound
Wire.prototype._onhandshake = function (infoHash, peerId, extensions) {
this.peerExtensions = extensions
this.emit('handshake', infoHash, peerId, extensions)
Wire.prototype._onunchoke = function () {
this.peerChoking = false
this.emit('unchoke')
}

@@ -291,27 +285,15 @@

Wire.prototype._onchoke = function () {
this.peerChoking = true
this.emit('choke')
while (this.requests.length)
this._callback(this.requests.shift(), new Error('peer is choking'), null)
Wire.prototype._onhave = function (i) {
this.peerPieces[i] = true
this.emit('have', i)
}
Wire.prototype._onunchoke = function () {
this.peerChoking = false
this.emit('unchoke')
}
Wire.prototype._onbitfield = function (buffer) {
var pieces = bitfield(buffer)
var pieces = new BitField(buffer)
for (var i = 0; i < 8 * buffer.length; i++) {
this.peerPieces[i] = pieces.get(i)
}
this.emit('bitfield', buffer)
this.emit('bitfield', pieces)
}
Wire.prototype._onhave = function (i) {
this.peerPieces[i] = true
this.emit('have', i)
}
Wire.prototype._onrequest = function (i, offset, length) {

@@ -321,3 +303,3 @@ if (this.amChoking) return

var respond = function (err, buffer) {
if (err || request !== pull(this.peerRequests, i, offset, length))
if (err || request !== this._pull(this.peerRequests, i, offset, length))
return

@@ -332,9 +314,4 @@ this.piece(i, offset, buffer)

Wire.prototype._oncancel = function (i, offset, length) {
pull(this.peerRequests, i, offset, length)
this.emit('cancel', i, offset, length)
}
Wire.prototype._onpiece = function (i, offset, buffer) {
this._callback(pull(this.requests, i, offset, buffer.length), null, buffer)
this._callback(this._pull(this.requests, i, offset, buffer.length), null, buffer)
this.downloaded += buffer.length

@@ -345,2 +322,7 @@ this.emit('download', buffer.length)

Wire.prototype._oncancel = function (i, offset, length) {
this._pull(this.peerRequests, i, offset, length)
this.emit('cancel', i, offset, length)
}
Wire.prototype._onport = function (port) {

@@ -354,8 +336,28 @@ this.emit('port', port)

// helpers and streams
Wire.prototype._ontimeout = function () {
this._callback(this.requests.shift(), new Error('request has timed out'), null)
this.emit('timeout')
}
//
// HELPER METHODS
//
Wire.prototype._pull = function (requests, piece, offset, length) {
for (var i = 0; i < requests.length; i++) {
var req = requests[i]
if (req.piece !== piece || req.offset !== offset || req.length !== length) continue
if (i === 0) requests.shift()
else requests.splice(i, 1)
return req
}
return null
}
Wire.prototype._callback = function (request, err, buffer) {
if (!request) return
if (request.timeout)
clearTimeout(request.timeout)
if (!request)
return
this._clearTimeout()
if (!this.peerChoking && !this._finished)

@@ -366,8 +368,22 @@ this._updateTimeout()

Wire.prototype._clearTimeout = function () {
if (!this._timeout)
return
clearTimeout(this._timeout)
this._timeout = null
}
Wire.prototype._updateTimeout = function () {
if (!this._timeout || !this.requests.length || this.requests[0].timeout)
if (!this._timeoutMs || !this.requests.length || this._timeout)
return
this.requests[0].timeout = setTimeout(this._ontimeout, this._timeout)
this._timeout = setTimeout(this._ontimeout.bind(this), this._timeoutMs)
}
Wire.prototype._parse = function (size, parser) {
this._parserSize = size
this._parser = parser
}
Wire.prototype._message = function (id, numbers, data) {

@@ -384,5 +400,82 @@ var dataLength = data ? data.length : 0

this._push(buffer)
if (data) this._push(data)
if (data)
this._push(data)
}
Wire.prototype._onmessagelength = function (buffer) {
var length = buffer.readUInt32BE(0)
if (length > 0) {
this._parse(length, this._onmessage)
} else {
this._onkeepalive()
this._parse(4, this._onmessagelength)
}
}
Wire.prototype._onmessage = function (buffer) {
this._parse(4, this._onmessagelength)
switch (buffer[0]) {
case 0:
return this._onchoke()
case 1:
return this._onunchoke()
case 2:
return this._oninterested()
case 3:
return this._onuninterested()
case 4:
return this._onhave(buffer.readUInt32BE(1))
case 5:
return this._onbitfield(buffer.slice(1))
case 6:
return this._onrequest(buffer.readUInt32BE(1),
buffer.readUInt32BE(5), buffer.readUInt32BE(9))
case 7:
return this._onpiece(buffer.readUInt32BE(1),
buffer.readUInt32BE(5), buffer.slice(9))
case 8:
return this._oncancel(buffer.readUInt32BE(1),
buffer.readUInt32BE(5), buffer.readUInt32BE(9))
case 9:
return this._onport(buffer.readUInt16BE(1))
case 20:
return this._onextended(bncode.decode(buffer))
}
this.emit('unknownmessage', buffer)
}
Wire.prototype._parseHandshake = function () {
this._parse(1, function (buffer) {
var pstrlen = buffer.readUInt8(0)
this._parse(pstrlen + 48, function (handshake) {
var protocol = handshake.slice(0, pstrlen)
if (protocol.toString() !== 'BitTorrent protocol') {
this.emit('error', 'Wire not speaking BitTorrent protocol: ', protocol)
this.destroy()
return
}
handshake = handshake.slice(pstrlen)
this._onhandshake(handshake.slice(8, 28), handshake.slice(28, 48), {
dht: !!(handshake[7] & 0x01), // see bep_0005
extended: !!(handshake[5] & 0x10) // see bep_0010
})
this._parse(4, this._onmessagelength)
}.bind(this))
}.bind(this))
}
Wire.prototype._onfinish = function () {
this._finished = true
this.push(null) // stream cannot be half open
clearInterval(this._keepAlive)
this._parse(Number.MAX_VALUE, function () {})
this.peerRequests = []
while (this.requests.length)
this._callback(this.requests.shift(), new Error('wire was closed'), null)
}
//
// STREAM METHODS
//
/**

@@ -397,7 +490,2 @@ * Push a message to the remote peer.

Wire.prototype._parse = function (size, parser) {
this._parserSize = size
this._parser = parser
}
/**

@@ -409,3 +497,3 @@ * Duplex stream method. Called whenever the upstream has data for us.

*/
Wire.prototype._write = function (data, encoding, callback) {
Wire.prototype._write = function (data, encoding, cb) {
this._bufferSize += data.length

@@ -425,3 +513,3 @@ this._buffer.push(data)

callback(null) // Signal that we're ready for more data
cb(null) // Signal that we're ready for more data
}

@@ -428,0 +516,0 @@

{
"name": "bittorrent-protocol",
"version": "0.3.1",
"version": "0.4.0",
"description": "Simple, robust, BitTorrent peer wire protocol implementation",

@@ -40,3 +40,7 @@ "main": "index.js",

],
"author": "Feross Aboukhadijeh <feross@feross.org> (http://feross.org/)",
"author": {
"name": "Feross Aboukhadijeh",
"email": "feross@feross.org",
"url": "http://feross.org/"
},
"license": "MIT",

@@ -43,0 +47,0 @@ "bugs": {

@@ -9,3 +9,3 @@ # bittorrent-protocol

Node.js implementation of the [BitTorrent peer wire protocol specification](https://wiki.theory.org/BitTorrentSpecification#Peer_wire_protocol_.28TCP.29). This is a fork of [peer-wire-protocol](https://github.com/mafintosh/peer-wire-protocol).
Node.js implementation of the [BitTorrent peer wire protocol specification](https://wiki.theory.org/BitTorrentSpecification#Peer_wire_protocol_.28TCP.29).

@@ -194,1 +194,3 @@ The protocol is the main communication layer for BitTorrent file transfer and is used by [WebTorrent](https://github.com/feross/WebTorrent).

MIT
This was originally forked from [peer-wire-protocol](https://github.com/mafintosh/peer-wire-protocol) which is also MIT licensed.
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc