bittorrent-dht
Comparing version 5.1.3 to 6.0.0
client.js
module.exports = DHT
module.exports.dgram = require('dgram') // allow override for chrome apps (chrome-dgram)
var addrToIPPort = require('addr-to-ip-port')
var inherits = require('inherits')
var EventEmitter = require('events').EventEmitter
var krpc = require('k-rpc')
var KBucket = require('k-bucket')
var crypto = require('crypto')
var bencode = require('bencode')
var bufferEqual = require('buffer-equal')
var compact2string = require('compact2string')
var equals = require('buffer-equals')
var LRU = require('lru')
var debug = require('debug')('bittorrent-dht')
var dns = require('dns')
var EventEmitter = require('events').EventEmitter
var hat = require('hat')
var inherits = require('inherits')
var isIP = require('is-ip')
var KBucket = require('k-bucket')
var networkAddress = require('network-address')
var once = require('once')
var os = require('os')
var parallel = require('run-parallel')
var publicAddress = require('./lib/public-address')
var sha1 = require('simple-sha1')
var string2compact = require('string2compact')
var BOOTSTRAP_NODES = [
  'router.bittorrent.com:6881',
  'router.utorrent.com:6881',
  'dht.transmissionbt.com:6881'
]
var BOOTSTRAP_TIMEOUT = 10000
var K = module.exports.K = 20 // number of nodes per bucket
var MAX_CONCURRENCY = 6 // α from Kademlia paper
var ROTATE_INTERVAL = 5 * 60 * 1000 // rotate secrets every 5 minutes
var SECRET_ENTROPY = 160 // entropy of token secrets
var SEND_TIMEOUT = 2000
var UINT16 = 0xffff
var MESSAGE_TYPE = module.exports.MESSAGE_TYPE = {
  QUERY: 'q',
  RESPONSE: 'r',
  ERROR: 'e'
}
var ERROR_TYPE = module.exports.ERROR_TYPE = {
  GENERIC: 201,
  SERVER: 202,
  PROTOCOL: 203, // malformed packet, invalid arguments, or bad token
  METHOD_UNKNOWN: 204
}
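// Illustrative sketch (not part of the diff): these constants mirror the bencoded
// KRPC wire format from BEP 5. A ping query and its matching response would look
// roughly like this ('aa' is an arbitrary transaction id, nodeIdBuffer a 20-byte id):
//
//   var ping = bencode.encode({ t: new Buffer('aa'), y: MESSAGE_TYPE.QUERY, q: 'ping', a: { id: nodeIdBuffer } })
//   var pong = bencode.encode({ t: new Buffer('aa'), y: MESSAGE_TYPE.RESPONSE, r: { id: nodeIdBuffer } })
//
// An error reply instead carries a [code, message] pair, e.g.
//   { t: ..., y: MESSAGE_TYPE.ERROR, e: [ERROR_TYPE.PROTOCOL, 'bad token'] }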
var LOCAL_HOSTS = { 4: [], 6: [] }
var interfaces = os.networkInterfaces()
for (var i in interfaces) {
  for (var j = 0; j < interfaces[i].length; j++) {
    var face = interfaces[i][j]
    if (face.family === 'IPv4') LOCAL_HOSTS[4].push(face.address)
    if (face.family === 'IPv6') LOCAL_HOSTS[6].push(face.address)
  }
}
inherits(DHT, EventEmitter)
/**
 * A DHT client implementation. The DHT is the main peer discovery layer for BitTorrent,
 * which allows for trackerless torrents.
 * @param {Object=} opts
 */
function DHT (opts) {
  if (!(this instanceof DHT)) return new DHT(opts)
  if (!opts) opts = {}
  var self = this
  if (!(self instanceof DHT)) return new DHT(opts)
  EventEmitter.call(self)
  if (!debug.enabled) self.setMaxListeners(0)
  if (!opts) opts = {}
  this._tables = LRU({maxAge: ROTATE_INTERVAL, max: opts.maxTables || 1000})
  this._values = LRU(opts.maxValues || 1000)
  this._peers = new PeerStore(opts.maxPeers || 10000)
  self.nodeId = idToHexString(opts.nodeId || hat(160))
  self.nodeIdBuffer = idToBuffer(self.nodeId)
  this._secrets = null
  this._rpc = krpc(opts)
  this._rpc.on('query', onquery)
  this._rpc.on('node', onnode)
  this._rpc.on('warning', onwarning)
  this._rpc.on('listening', onlistening)
  this._rotateSecrets()
  this._verify = opts.verify || null
  this._interval = setInterval(rotateSecrets, ROTATE_INTERVAL)
  self._debug('new DHT %s', self.nodeId)
  this.listening = false
  this.destroyed = false
  this.nodeId = this._rpc.id
  this.nodes = this._rpc.nodes
  self.ready = false
  self.listening = false
  self.destroyed = false
  process.nextTick(bootstrap)
  self._binding = false
  self._port = null
  self._ipv = opts.ipv || 4
  self._rotateInterval = null
  self._verify = opts.verify
  EventEmitter.call(this)
  this._debug('new DHT %s', this.nodeId)
  /**
   * Query Handlers table
   * @type {Object} string -> function
   */
  self.queryHandler = {
    ping: self._onPing,
    find_node: self._onFindNode,
    get_peers: self._onGetPeers,
    announce_peer: self._onAnnouncePeer,
    put: self._onPut,
    get: self._onGet
  function onlistening () {
    self.listening = true
    self._debug('listening %d', self.address().port)
    self.emit('listening')
  }
  /**
   * Routing table
   * @type {KBucket}
   */
  self.nodes = new KBucket({
    localNodeId: self.nodeIdBuffer,
    numberOfNodesPerKBucket: K,
    numberOfNodesToPing: MAX_CONCURRENCY
  })
  function onquery (query, peer) {
    self._onquery(query, peer)
  }
  /**
   * Cache of routing tables used during a lookup. Saved in this object so we can access
   * each node's unique token for announces later.
   * TODO: Clean up tables after 5 minutes.
   * @type {Object} infoHash:string -> KBucket
   */
  self.tables = {}
  function rotateSecrets () {
    self._rotateSecrets()
  }
  /**
   * Pending transactions (unresolved requests to peers)
   * @type {Object} addr:string -> array of pending transactions
   */
  self.transactions = {}
  function bootstrap () {
    if (!self.destroyed) self._bootstrap(opts.bootstrap !== false)
  }
  /**
   * Peer address data (tracker storage)
   * @type {Object} infoHash:string -> Object {index:Object, list:Array.<Buffer>}
   */
  self.peers = {}
  function onwarning (err) {
    self.emit('warning', err)
  }
  /**
   * Secrets for token generation.
   */
  self.secrets = null
  /**
   * IP addresses of the local DHT node. Used to store the peer, controlling this DHT
   * node, into the local table when `client.announce()` is called.
   * @type {Array.<string>}
   */
  self.localAddresses = [ networkAddress.ipv4() ]
  publicAddress(function (err, ip) {
    if (err) return self._debug('failed to get public ip: %s', err.message || err)
    self.localAddresses.push(ip)
  })
  // Create socket and attach listeners
  self.socket = module.exports.dgram.createSocket('udp' + self._ipv)
  self.socket.on('message', self._onData.bind(self))
  self.socket.on('listening', self._onListening.bind(self))
  self.socket.on('error', noop) // throw away errors
  self._rotateSecrets()
  self._rotateInterval = setInterval(self._rotateSecrets.bind(self), ROTATE_INTERVAL)
  if (self._rotateInterval.unref) self._rotateInterval.unref()
  process.nextTick(function () {
    if (opts.bootstrap === false) {
      // Emit `ready` right away because the user does not want to bootstrap. Presumably,
      // the user will call addNode() to populate the routing table manually.
      self.ready = true
      self.emit('ready')
    } else if (typeof opts.bootstrap === 'string') {
      self._bootstrap([ opts.bootstrap ])
    } else if (Array.isArray(opts.bootstrap)) {
      self._bootstrap(fromArray(opts.bootstrap))
    } else {
      // opts.bootstrap is undefined or true
      self._bootstrap(BOOTSTRAP_NODES)
    }
  })
  self.on('ready', function () {
    self._debug('emit ready')
  })
  function onnode (node) {
    self.emit('node', node)
  }
}
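// Illustrative usage sketch (not part of the diff): basic construction and lookup
// work the same way on both sides of this rewrite. The info hash is a made-up
// placeholder; note that in 6.0.0 a peer is emitted as a {host, port} object
// (see decodePeers below), whereas 5.1.3 emitted 'host:port' strings.
//
//   var dht = new DHT()
//   dht.listen(20000, function () {
//     console.log('now listening')
//   })
//   dht.on('peer', function (peer, infoHash, from) {
//     console.log('potential peer ' + peer.host + ':' + peer.port)
//   })
//   dht.lookup('e3811b9539cacff680e418124272177c47477157')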
/**
 * Start listening for UDP messages on given port.
 * @param {number} port
 * @param {string} address
 * @param {function=} onlistening added as handler for listening event
 */
DHT.prototype.listen = function (port, address, onlistening) {
DHT.prototype.addNode = function (node) {
  if (typeof node === 'string') node = parseAddr(node)
  var self = this
  if (self.destroyed) throw new Error('dht is destroyed')
  if (self._binding || self.listening) throw new Error('dht is already listening')
  if (typeof port === 'string') {
    onlistening = address
    address = port
    port = undefined
  if (node.id) {
    node.id = toBuffer(node.id)
    var old = !!this._rpc.nodes.get(node.id)
    this._rpc.nodes.add(node)
    if (!old) this.emit('node', node)
    return
  }
  if (typeof port === 'function') {
    onlistening = port
    port = undefined
    address = undefined
  }
  if (typeof address === 'function') {
    onlistening = address
    address = undefined
  }
  if (onlistening) self.once('listening', onlistening)
  self._binding = true
  self._debug('listen %s', port)
  self.socket.bind(port, address)
  this._sendPing(node, function (_, node) {
    if (node) self.addNode(node)
  })
}
/**
 * Called when DHT is listening for UDP messages.
 */
DHT.prototype._onListening = function () {
  var self = this
  self._binding = false
  self.listening = true
  self._port = self.socket.address().port
  self._debug('emit listening %s', self._port)
  self.emit('listening')
DHT.prototype.removeNode = function (id) {
  this._rpc.nodes.remove({id: toBuffer(id)})
}
DHT.prototype.address = function () {
  var self = this
  return self.socket.address()
}
/**
 * Announce that the peer, controlling the querying node, is downloading a torrent on a
 * port.
 * @param {string|Buffer} infoHash
 * @param {number} port
 * @param {function=} cb
 */
DHT.prototype.announce = function (infoHash, port, cb) {
  var self = this
  if (!cb) cb = noop
  if (self.destroyed) throw new Error('dht is destroyed')
  var infoHashBuffer = idToBuffer(infoHash)
  infoHash = idToHexString(infoHash)
  self._debug('announce %s %s', infoHash, port)
  self.localAddresses.forEach(function (address) {
    self._addPeer(address + ':' + port, infoHash)
  })
  // TODO: it would be nice to not use a table when a lookup is in progress
  var table = self.tables[infoHash]
  if (table) {
    process.nextTick(function () {
      onClosest(null, table.closest({ id: infoHashBuffer }, K))
    })
  } else {
    self._lookup(infoHash, onClosest)
  }
  function onClosest (err, closest) {
DHT.prototype._sendPing = function (node, cb) {
  this._rpc.query(node, {q: 'ping'}, function (err, pong, node) {
    if (err) return cb(err)
    closest.forEach(function (contact) {
      self._sendAnnouncePeer(contact.addr, infoHashBuffer, port, contact.token)
    if (!pong.r || !pong.r.id || !Buffer.isBuffer(pong.r.id) || pong.r.id.length !== 20) {
      return cb(new Error('Bad reply'))
    }
    cb(null, {
      id: pong.r.id,
      host: node.host || node.address,
      port: node.port
    })
    self._debug('announce end %s %s', infoHash, port)
    cb(null)
  }
  })
}
/**
 * Write arbitrary mutable and immutable data to the DHT.
 * Specified in BEP44: http://bittorrent.org/beps/bep_0044.html
 * @param {Object} opts
 * @param {function=} cb
 */
DHT.prototype.toJSON =
DHT.prototype.toArray = function () {
  return this._rpc.nodes.toArray().map(toNode)
}
DHT.prototype.put = function (opts, cb) {
  var self = this
  if (self.destroyed) throw new Error('dht is destroyed')
  var isMutable = opts.k
  if (Buffer.isBuffer(opts) || typeof opts === 'string') opts = {v: opts}
  var isMutable = !!opts.k
  if (opts.v === undefined) {
@@ -290,3 +121,2 @@
    throw new Error('opts.v not given')
  }
  if (isMutable && opts.cas !== undefined && typeof opts.cas !== 'number') {
@@ -313,1401 +143,544 @@
    throw new Error('opts.cas must be an integer if provided')
  }
  return self._put(opts, cb)
  return this._put(opts, cb)
}
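// Illustrative sketch (not part of the diff): storing an immutable BEP 44 value via
// the new put(). In the 6.0.0 code path above the callback receives the sha1 key of
// the stored value plus the number of nodes written to; treat the exact callback
// arity as an assumption.
//
//   dht.put({ v: new Buffer('hello world') }, function (err, key, n) {
//     if (err) return console.error(err)
//     console.log('stored under key %s on %d nodes', key.toString('hex'), n)
//   })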
/**
 * put() without type checks for internal use
 * @param {Object} opts
 * @param {function=} cb
 */
DHT.prototype._put = function (opts, cb) {
  var self = this
  var pending = 0
  var errors = []
  var isMutable = opts.k
  var hash = isMutable
    ? sha1.sync(opts.salt ? Buffer.concat([ opts.salt, opts.k ]) : opts.k)
    : sha1.sync(bencode.encode(opts.v))
  var hashBuffer = idToBuffer(hash)
  if (!cb) cb = noop
  if (self.nodes.toArray().length === 0) {
    process.nextTick(function () {
      addLocal(null, [])
    })
  } else {
    self._lookup(hash, {findNode: true}, onLookup)
  }
  var isMutable = !!opts.k
  var v = typeof opts.v === 'string' ? new Buffer(opts.v) : opts.v
  var key = isMutable
    ? sha1(opts.salt ? Buffer.concat([opts.salt, opts.k]) : opts.k)
    : sha1(bencode.encode(v))
  function onLookup (err, nodes) {
    if (err) return cb(err)
    nodes.forEach(function (node) {
      put(node)
    })
    addLocal()
  }
  var table = this._tables.get(key.toString('hex'))
  if (!table) return this._preput(key, opts, cb)
  function addLocal () {
    var localData = {
      id: self.nodeIdBuffer,
      v: opts.v
  var message = {
    q: 'put',
    a: {
      id: this._rpc.id,
      token: null, // queryAll sets this
      v: v
    }
    var localAddr = '127.0.0.1:' + self._port
    if (isMutable) {
      if (opts.cas) localData.cas = opts.cas
      localData.sig = opts.sign(encodeSigData(opts))
      localData.k = opts.k
      localData.seq = opts.seq
      localData.token = idToBuffer(opts.token || self._generateToken(localAddr))
    }
    self.nodes.add({
      id: hashBuffer,
      addr: localAddr,
      data: localData
    })
    if (pending === 0) {
      process.nextTick(function () { cb(errors, hash) })
    }
  }
  return hash
  function put (node) {
    if (node.data) return // skip data nodes
    pending += 1
  if (isMutable) {
    if (typeof opts.cas === 'number') message.a.cas = opts.cas
    if (opts.salt) message.a.salt = opts.salt
    message.a.k = opts.k
    message.a.seq = opts.seq
    if (!message.a.sig) message.a.sig = opts.sign(encodeSigData(message.a))
  }
    var t = self._getTransactionId(node.addr, putOnGet)
    self._send(node.addr, {
      a: {
        id: self.nodeIdBuffer,
        target: hashBuffer
      },
      t: transactionIdToBuffer(t),
      y: 'q',
      q: 'get'
    })
  this._values.set(key.toString('hex'), message.a)
  this._rpc.queryAll(table.closest({id: key}), message, null, function (err, n) {
    if (err) return cb(err, key, n)
    cb(null, key, n)
  })
    function putOnGet (err, res) {
      if (err) return next(node)(err)
      var t = self._getTransactionId(node.addr, next(node))
      var data = {
        a: {
          id: opts.id || self.nodeIdBuffer,
          v: opts.v,
          token: res && res.token
        },
        t: transactionIdToBuffer(t),
        y: 'q',
        q: 'put'
      }
      if (isMutable) {
        data.a.seq = opts.seq
        data.a.sig = opts.sign(encodeSigData(opts))
        data.a.k = opts.k
        if (opts.salt) data.a.salt = opts.salt
        if (typeof opts.cas === 'number') data.a.cas = opts.cas
      }
      self._send(node.addr, data)
    }
  }
  function next (node) {
    return function (err) {
      if (err) {
        err.address = node.addr
        errors.push(err)
      }
      if (--pending === 0) cb(errors, hash)
    }
  }
  return key
}
DHT.prototype._onPut = function (addr, message) {
  var self = this
  var msg = message.a
  if (!msg || !msg.v || !msg.id) {
    return self._sendError(addr, message.t, 203, 'not enough parameters')
DHT.prototype.get = function (key, opts, cb) {
  key = toBuffer(key)
  if (typeof opts === 'function') {
    cb = opts
    opts = null
  }
  var isMutable = message.a.k || message.a.sig
  self._debug('put from %s', addr)
  if (!opts) opts = {}
  var verify = opts.verify || this._verify
  var value = this._values.get(key.toString('hex')) || null
  var data = {
    id: message.a.id,
    addr: addr,
    v: message.a.v
  if (value) {
    value = createGetResponse(this._rpc.id, null, value)
    return process.nextTick(done)
  }
  if (data.v && data.v.length > 1000) {
    return self._sendError(addr, message.t, 205, 'data payload too large')
  }
  if (isMutable && !msg.k) {
    return self._sendError(addr, message.t, 203, 'missing public key')
  }
  var hash
  if (isMutable) {
    hash = msg.salt
      ? sha1.sync(Buffer.concat([ msg.salt, msg.k ]))
      : sha1.sync(msg.k)
  } else {
    hash = sha1.sync(bencode.encode(data.v))
  }
  var hashBuffer = idToBuffer(hash)
  if (isMutable) {
    if (!self._verify) {
      return self._sendError(addr, message.t, 400, 'verification not supported')
  this._closest(key, {
    q: 'get',
    a: {
      id: this._rpc.id,
      target: key
    }
    var sdata = encodeSigData(msg)
    if (!msg.sig || !Buffer.isBuffer(msg.sig) || !self._verify(msg.sig, sdata, msg.k)) {
      return self._sendError(addr, message.t, 206, 'invalid signature')
    }
    var prev = self.nodes.get(hashBuffer)
    if (prev && prev.data && prev.data.seq !== undefined && typeof msg.cas === 'number') {
      if (msg.cas !== prev.data.seq) {
        return self._sendError(addr, message.t, 301,
          'CAS mismatch, re-read and try again')
      }
    }
    if (prev && prev.data && prev.data.seq !== undefined) {
      if (msg.seq === undefined || msg.seq <= prev.data.seq) {
        return self._sendError(addr, message.t, 302,
          'sequence number less than current')
      }
    }
  }, onreply, done)
  data.sig = msg.sig
  data.k = msg.k
  data.seq = msg.seq
  data.token = msg.token
  if (msg.salt && msg.salt.length > 64) {
    return self._sendError(addr, message.t, 207, 'salt too big')
  }
  if (msg.salt) data.salt = msg.salt
  function done (err) {
    if (err) return cb(err)
    cb(null, value)
  }
  self.nodes.add({ id: hashBuffer, addr: addr, data: data })
  self._send(addr, {
    t: message.t,
    y: MESSAGE_TYPE.RESPONSE,
    r: { id: self.nodeIdBuffer }
  })
}
  function onreply (message) {
    var r = message.r
    if (!r || !r.v) return true
DHT.prototype._onGet = function (addr, message) {
  var self = this
  var msg = message.a
  if (!msg) return self._debug('skipping malformed get request from %s', addr)
  if (!msg.target) return self._debug('missing a.target in get() from %s', addr)
    var isMutable = r.k || r.sig
  var addrData = addrToIPPort(addr)
  var hashBuffer = message.a.target
  var rec = self.nodes.get(hashBuffer)
  if (rec && rec.data) {
    msg = {
      t: message.t,
      y: MESSAGE_TYPE.RESPONSE,
      r: {
        id: self.nodeIdBuffer,
        nodes: [], // found, so we don't need to know the nodes
        nodes6: [],
        v: rec.data.v
      }
    }
    var isMutable = rec.data.k || rec.data.sig
    if (isMutable) {
      msg.r.k = rec.data.k
      msg.r.seq = rec.data.seq
      msg.r.sig = rec.data.sig
      msg.r.token = rec.data.token
      if (rec.data.salt) {
        msg.r.salt = rec.data.salt
    if (!verify || !r.sig || !r.k) return true
    if (!verify(r.sig, encodeSigData(r), r.k)) return true
    if (equals(sha1(r.salt ? Buffer.concat([r.salt, r.k]) : r.k), key)) {
      value = r
      return false
    }
      if (rec.data.cas) {
        msg.r.cas = rec.data.cas
    } else {
      if (equals(sha1(bencode.encode(r.v)), key)) {
        value = r
        return false
      }
    }
    self._send(addr, msg)
  } else {
    self._lookup(hashBuffer, function (err, nodes) {
      if (err && self.destroyed) return
      if (err) return self._sendError(addr, message.t, 201, err)
      var res = {
        t: message.t,
        y: MESSAGE_TYPE.RESPONSE,
        r: {
          token: idToBuffer(self._generateToken(addrData[0])),
          id: self.nodeIdBuffer,
          nodes: nodes.map(function (node) {
            return node.addr
          }),
          nodes6: [] // todo: filter the addrs
        }
      }
      if (rec && rec.data && rec.data.k) res.k = rec.data.k
      if (rec && rec.data && rec.data.seq) res.seq = rec.data.seq
      if (rec && rec.data && rec.data.sig) res.sig = rec.data.sig
      if (rec && rec.data && rec.data.token) res.token = rec.data.token
      if (rec && rec.data && rec.data.v) res.v = rec.data.v
      self._send(addr, res)
    })
    return true
  }
}
/**
 * Destroy and cleanup the DHT.
 * @param {function=} cb
 */
DHT.prototype.destroy = function (cb) {
  var self = this
  if (self.destroyed) throw new Error('dht is destroyed')
DHT.prototype.announce = function (infoHash, port, cb) {
  infoHash = toBuffer(infoHash)
  if (!cb) cb = noop
  if (cb) cb = once(cb)
  else cb = noop
  var table = this._tables.get(infoHash.toString('hex'))
  if (!table) return this._preannounce(infoHash, port, cb)
  if (self._binding) return self.once('listening', self.destroy.bind(self, cb))
  self._debug('destroy')
  self.destroyed = true
  self.listening = false
  // garbage collect large data structures
  self.nodes = null
  self.tables = null
  self.transactions = null
  self.peers = null
  clearInterval(self._rotateInterval)
  self.socket.on('close', cb)
  try {
    self.socket.close()
  } catch (err) {
    // ignore error, socket was either already closed / not yet bound
    process.nextTick(function () {
      cb(null)
    })
  var message = {
    q: 'announce_peer',
    a: {
      id: this._rpc.id,
      token: null, // queryAll sets this
      info_hash: infoHash,
      port: port
    }
  }
}
/**
 * Add a DHT node to the routing table.
 * @param {string} addr
 * @param {string|Buffer} nodeId
 * @param {string=} from addr
 */
DHT.prototype.addNode = function (addr, nodeId) {
  var self = this
  if (self.destroyed) throw new Error('dht is destroyed')
  // If `nodeId` is undefined, then the peer will be pinged to learn their node id.
  // If the peer does not respond, they will not be added to the routing table.
  if (nodeId == null) {
    self._sendPing(addr, function (err, res) {
      if (err) {
        self._debug('skipping addNode %s; peer did not respond: %s', addr, err.message)
      }
      // No need to call `self._addNode()` explicitly here. `_onData` automatically
      // attempts to add every node the client gets a message from to the routing table.
    })
    return
  }
  var nodeIdBuffer = idToBuffer(nodeId)
  if (nodeIdBuffer.length !== 20) throw new Error('invalid node id length')
  self._addNode(addr, nodeIdBuffer)
  this._debug('announce %s %d', infoHash, port)
  this._rpc.queryAll(table.closest({id: infoHash}), message, null, cb)
}
/**
 * Internal version of `addNode` that doesn't throw errors on invalid arguments, but
 * silently fails instead. Useful for dealing with potentially bad data from the network.
 * @param {string} addr
 * @param {string|Buffer} nodeId
 * @param {string=} from addr
 * @return {boolean} was the node valid and new and added to the table
 */
DHT.prototype._addNode = function (addr, nodeId, from) {
DHT.prototype._preannounce = function (infoHash, port, cb) {
  var self = this
  if (self.destroyed) return
  var nodeIdBuffer = idToBuffer(nodeId)
  if (!nodeIdBuffer) return
  nodeId = idToHexString(nodeId)
  if (nodeIdBuffer.length !== 20) {
    self._debug('skipping addNode %s %s; invalid id length', addr, nodeId)
    return
  }
  if (self._addrIsSelf(addr) || nodeId === self.nodeId) {
    self._debug('skip addNode %s %s; that is us!', addr, nodeId)
    return
  }
  var existing = self.nodes.get(nodeIdBuffer)
  if (existing && existing.addr === addr) return
  self.nodes.add({
    id: nodeIdBuffer,
    addr: addr
  this.lookup(infoHash, function (err) {
    if (err) return cb(err)
    self.announce(infoHash, port, cb)
  })
  process.nextTick(function () {
    self.emit('node', addr, nodeId, from)
  })
  self._debug('addNode %s %s discovered from %s', nodeId, addr, from)
}
/**
 * Remove a DHT node from the routing table.
 * @param {string|Buffer} nodeId
 */
DHT.prototype.removeNode = function (nodeId) {
DHT.prototype.lookup = function (infoHash, cb) {
  infoHash = toBuffer(infoHash)
  if (!cb) cb = noop
  var self = this
  if (self.destroyed) throw new Error('dht is destroyed')
  var nodeIdBuffer = idToBuffer(nodeId)
  var contact = self.nodes.get(nodeIdBuffer)
  if (contact) {
    self._debug('removeNode %s %s', contact.nodeId, contact.addr)
    self.nodes.remove(contact)
  }
}
  process.nextTick(emit)
  this._debug('lookup %s', infoHash)
  this._closest(infoHash, {
    q: 'get_peers',
    a: {
      id: this._rpc.id,
      info_hash: infoHash
    }
  }, onreply, cb)
/**
 * Store a peer in the DHT. Called when a peer sends an `announce_peer` message.
 * @param {string} addr
 * @param {Buffer|string} infoHash
 */
DHT.prototype._addPeer = function (addr, infoHash) {
  var self = this
  if (self.destroyed) return
  infoHash = idToHexString(infoHash)
  var peers = self.peers[infoHash]
  if (!peers) {
    peers = self.peers[infoHash] = {
      index: {}, // addr -> true
      list: [] // compactAddr
  function emit (values, from) {
    if (!values) values = self._peers.get(infoHash.toString('hex'))
    var peers = decodePeers(values)
    for (var i = 0; i < peers.length; i++) {
      self.emit('peer', peers[i], infoHash, from || null)
    }
  }
  if (!peers.index[addr]) {
    peers.index[addr] = true
    peers.list.push(string2compact(addr))
    self._debug('addPeer %s %s', addr, infoHash)
    self.emit('announce', addr, infoHash)
  function onreply (message, node) {
    if (message.r.values) emit(message.r.values, node)
  }
}
/**
 * Remove a peer from the DHT.
 * @param {string} addr
 * @param {Buffer|string} infoHash
 */
DHT.prototype._removePeer = function (addr, infoHash) {
  var self = this
  if (self.destroyed) return
  infoHash = idToHexString(infoHash)
  var peers = self.peers[infoHash]
  if (peers && peers.index[addr]) {
    peers.index[addr] = null
    var compactPeerInfo = string2compact(addr)
    peers.list.some(function (peer, index) {
      if (bufferEqual(peer, compactPeerInfo)) {
        peers.list.splice(index, 1)
        self._debug('removePeer %s %s', addr, infoHash)
        return true // abort early
      }
    })
  }
DHT.prototype.address = function () {
  return this._rpc.address()
}
/**
 * Join the DHT network. To join initially, connect to known nodes (either public
 * bootstrap nodes, or known nodes from a previous run of bittorrent-client).
 * @param {Array.<string|Object>} nodes
 */
DHT.prototype._bootstrap = function (nodes) {
  var self = this
  if (self.destroyed) return
  self._debug('bootstrap with %s', JSON.stringify(nodes))
  var contacts = nodes.map(function (obj) {
    if (typeof obj === 'string') {
      return { addr: obj }
    } else {
      return obj
    }
  })
  self._resolveContacts(contacts, function (err, contacts) {
    if (self.destroyed) return
    if (err) return self.emit('error', err)
    // add all non-bootstrap nodes to routing table
    contacts
      .filter(function (contact) {
        return !!contact.id
      })
      .forEach(function (contact) {
        self._addNode(contact.addr, contact.id, contact.from)
      })
    // get addresses of bootstrap nodes
    var addrs = contacts
      .filter(function (contact) {
        return !contact.id
      })
      .map(function (contact) {
        return contact.addr
      })
    lookup()
    function lookup () {
      self._lookup(self.nodeId, {
        findNode: true,
        addrs: addrs.length ? addrs : null
      }, function (err) {
        if (err) return self._debug('lookup error during bootstrap: %s', err.message)
        // emit `ready` once the recursive lookup for our own node ID is finished
        // (successful or not), so that later get_peer lookups will have a good shot at
        // succeeding.
        if (!self.ready) {
          self.ready = true
          self.emit('ready')
        }
      })
      startBootstrapTimeout()
    }
    function startBootstrapTimeout () {
      var bootstrapTimeout = setTimeout(function () {
        if (self.destroyed) return
        // If 0 nodes are in the table after a timeout, retry with bootstrap nodes
        if (self.nodes.count() === 0) {
          self._debug('No DHT bootstrap nodes replied, retry')
          lookup()
        }
      }, BOOTSTRAP_TIMEOUT)
      if (bootstrapTimeout.unref) bootstrapTimeout.unref()
    }
  })
DHT.prototype.listen = function (port, cb) {
  if (typeof port === 'function') return this.listen(0, port)
  this._rpc.bind(port, cb)
}
/**
 * Resolve the DNS for nodes whose hostname is a domain name (often the case for
 * bootstrap nodes).
 * @param {Array.<Object>} contacts array of contact objects with domain addresses
 * @param {function} cb
 */
DHT.prototype._resolveContacts = function (contacts, cb) {
DHT.prototype.destroy = function (cb) {
  if (this.destroyed) {
    if (cb) process.nextTick(cb)
    return
  }
  this.destroyed = true
  var self = this
  var tasks = contacts.map(function (contact) {
    return function (cb) {
      var addrData = addrToIPPort(contact.addr)
      if (isIP(addrData[0])) {
        cb(null, contact)
      } else {
        dns.lookup(addrData[0], self._ipv, function (err, host) {
          if (err) return cb(null, null)
          contact.addr = host + ':' + addrData[1]
          cb(null, contact)
        })
      }
    }
  clearInterval(this._interval)
  this._debug('destroying')
  this._rpc.destroy(function () {
    self.emit('close')
    if (cb) cb()
  })
  parallel(tasks, function (err, contacts) {
    if (err) return cb(err)
    // filter out hosts that don't resolve
    contacts = contacts.filter(function (contact) { return !!contact })
    cb(null, contacts)
  })
}
/**
 * Perform a recursive node lookup for the given nodeId. If `opts.findNode` is true, then
 * `find_node` will be sent to each peer instead of `get_peers`.
 * @param {Buffer|string} id node id or info hash
 * @param {Object=} opts
 * @param {boolean} opts.findNode
 * @param {Array.<string>} opts.addrs
 * @param {function} cb called with K closest nodes
 */
DHT.prototype.lookup = function (id, opts, cb) {
  var self = this
  if (self.destroyed) throw new Error('dht is destroyed')
  self._lookup(id, opts, cb)
}
DHT.prototype._onquery = function (query, peer) {
  var q = query.q.toString()
  this._debug('received %s query from %s:%d', q, peer.address, peer.port)
  if (!query.a) return
/**
 * lookup() for private use. If DHT is destroyed, returns an error via callback.
 */
DHT.prototype._lookup = function (id, opts, cb) {
  var self = this
  switch (q) {
    case 'ping':
      return this._rpc.response(peer, query, {id: this._rpc.id})
  if (self.destroyed) {
    return process.nextTick(function () {
      cb(new Error('dht is destroyed'))
    })
  }
    case 'find_node':
      return this._onfindnode(query, peer)
  if (typeof opts === 'function') {
    cb = opts
    opts = {}
  }
  if (!opts) opts = {}
    case 'get_peers':
      return this._ongetpeers(query, peer)
  if (cb) cb = once(cb)
  else cb = noop
    case 'announce_peer':
      return this._onannouncepeer(query, peer)
  var idBuffer = idToBuffer(id)
  id = idToHexString(id)
    case 'get':
      return this._onget(query, peer)
  if (self._binding) return self.once('listening', self._lookup.bind(self, id, opts, cb))
  if (!self.listening) return self.listen(self._lookup.bind(self, id, opts, cb))
  if (idBuffer.length !== 20) throw new Error('invalid node id / info hash length')
  self._debug('lookup %s %s', (opts.findNode ? '(find_node)' : '(get_peers)'), id)
  // Return local peers, if we have any in our table
  var peers = self.peers[id]
  if (peers) {
    peers = parsePeerInfo(peers.list)
    peers.forEach(function (peerAddr) {
      self._debug('emit peer %s %s from %s', peerAddr, id, 'local')
      self.emit('peer', peerAddr, id, 'local')
    })
    case 'put':
      return this._onput(query, peer)
  }
}
  var table = new KBucket({
    localNodeId: idBuffer,
    numberOfNodesPerKBucket: K,
    numberOfNodesToPing: MAX_CONCURRENCY
  })
DHT.prototype._onfindnode = function (query, peer) {
  var target = query.a.target
  if (!target) return this._rpc.error(peer, query, [203, '`find_node` missing required `a.target` field'])
  // NOT the same table as the one used for the lookup, as that table may have nodes without tokens
  if (!self.tables[id]) {
    self.tables[id] = new KBucket({
      localNodeId: idBuffer,
      numberOfNodesPerKBucket: K,
      numberOfNodesToPing: MAX_CONCURRENCY
    })
  }
  var nodes = this._rpc.nodes.closest({ id: target })
  this._rpc.response(peer, query, {id: this._rpc.id}, nodes)
}
  var tokenful = self.tables[id]
DHT.prototype._ongetpeers = function (query, peer) {
  var host = peer.address || peer.host
  var infoHash = query.a.info_hash
  if (!infoHash) return this._rpc.error(peer, query, [203, '`get_peers` missing required `a.info_hash` field'])
  function add (contact) {
    if (self._addrIsSelf(contact.addr) || bufferEqual(contact.id, self.nodeIdBuffer)) return
    if (contact.token) tokenful.add(contact)
  var r = {id: this._rpc.id, token: this._generateToken(host)}
  var peers = this._peers.get(infoHash.toString('hex'))
    table.add(contact)
  }
  var queried = {}
  var pending = 0 // pending queries
  if (opts.addrs) {
    // kick off lookup with explicitly passed nodes (usually, bootstrap servers)
    opts.addrs.forEach(query)
  if (peers.length) {
    r.values = peers
    this._rpc.response(peer, query, r)
  } else {
    // kick off lookup with nodes in the main table
    queryClosest()
    this._rpc.response(peer, query, r, this._rpc.nodes.closest({id: infoHash}))
  }
}
  function query (addr) {
    pending += 1
    queried[addr] = true
DHT.prototype._onannouncepeer = function (query, peer) {
  var host = peer.address || peer.host
  var port = query.a.implied_port ? peer.port : query.a.port
  if (!port || typeof port !== 'number') return
  var infoHash = query.a.info_hash
  var token = query.a.token
  if (!infoHash || !token) return
    if (opts.get) {
      self._sendGet(addr, idBuffer, onResponse.bind(null, addr))
    } else if (opts.findNode) {
      self._sendFindNode(addr, idBuffer, onResponse.bind(null, addr))
    } else {
      self._sendGetPeers(addr, idBuffer, onResponse.bind(null, addr))
    }
  if (!this._validateToken(host, token)) {
    return this._rpc.error(peer, query, [203, 'cannot `announce_peer` with bad token'])
  }
  function queryClosest () {
    self.nodes.closest({ id: idBuffer }, K).forEach(function (contact) {
      query(contact.addr)
    })
  }
  // Note: `_sendFindNode` and `_sendGetPeers` will insert newly discovered nodes into
  // the routing table, so that's not done here.
  function onResponse (addr, err, res) {
    if (cb.called) return
    if (self.destroyed) return cb(new Error('dht is destroyed'))
    if (opts.get && res && res.v) {
      var isMutable = res.k || res.sig
      var sdata = encodeSigData(res)
      if (isMutable && !self._verify) {
        self._debug('ed25519 verify not provided')
      } else if (isMutable && !self._verify(res.sig, sdata, res.k)) {
        self._debug('invalid mutable hash from %s', addr)
      } else if (!isMutable && sha1.sync(bencode.encode(res.v)) !== id) {
        self._debug('invalid immutable hash from %s', addr)
      } else {
        return cb(null, res)
      }
    }
    pending -= 1
    var nodeIdBuffer = res && res.id
    var nodeId = idToHexString(nodeIdBuffer)
    // ignore errors - they are just timeouts
    if (err) {
      self._debug('got lookup error: %s', err.message)
    } else {
      self._debug('got lookup response from %s', nodeId)
      // add node that sent this response
      var contact = table.get(nodeIdBuffer) || { id: nodeIdBuffer, addr: addr }
      contact.token = res && res.token
      add(contact)
      // add nodes to this routing table for this lookup
      if (res && res.nodes) {
        res.nodes.forEach(function (contact) {
          add(contact)
        })
      }
    }
    // find closest unqueried nodes
    var candidates = table.closest({ id: idBuffer }, K)
      .filter(function (contact) {
        return !queried[contact.addr]
      })
    while (pending < MAX_CONCURRENCY && candidates.length) {
      // query as many candidates as our concurrency limit will allow
      query(candidates.pop().addr)
    }
    if (pending === 0 && candidates.length === 0) {
      // recursive lookup should terminate because there are no closer nodes to find
      self._debug('terminating lookup %s %s',
        (opts.findNode ? '(find_node)' : '(get_peers)'), id)
      var closest = (opts.findNode ? table : tokenful).closest({ id: idBuffer }, K)
      self._debug('K closest nodes are:')
      closest.forEach(function (contact) {
        self._debug('  ' + contact.addr + ' ' + idToHexString(contact.id))
      })
      if (opts.get) return cb(new Error('hash not found'))
      cb(null, closest)
    }
  }
  this._addPeer({host: host, port: port}, infoHash, {host: host, port: peer.port})
  this._rpc.response(peer, query, {id: this._rpc.id})
}
/**
 * Called when another node sends a UDP message
 * @param {Buffer} data
 * @param {Object} rinfo
 */
DHT.prototype._onData = function (data, rinfo) {
  var self = this
  var addr = rinfo.address + ':' + rinfo.port
  var message, errMessage
  try {
    message = bencode.decode(data)
    if (!message) throw new Error('message is empty')
  } catch (err) {
    errMessage = err.message + ' from ' + addr + ' (' + data + ')'
    self._debug(errMessage)
    self.emit('warning', new Error(errMessage))
    return
  }
  var type = message.y && message.y.toString()
  if (type !== MESSAGE_TYPE.QUERY && type !== MESSAGE_TYPE.RESPONSE &&
      type !== MESSAGE_TYPE.ERROR) {
    errMessage = 'unknown message type ' + type + ' from ' + addr
    self._debug(errMessage)
    self.emit('warning', new Error(errMessage))
    return
  }
  // self._debug('got data %s from %s', JSON.stringify(message), addr)
  // Attempt to add every (valid) node that we see to the routing table.
  // TODO: If the node is already in the table, just update the "last heard from" time
  var nodeIdBuffer = (message.r && message.r.id) || (message.a && message.a.id)
  if (nodeIdBuffer) {
    // self._debug('adding (potentially) new node %s %s', idToHexString(nodeId), addr)
    self._addNode(addr, nodeIdBuffer, addr)
  }
  if (type === MESSAGE_TYPE.QUERY) {
    self._onQuery(addr, message)
  } else if (type === MESSAGE_TYPE.RESPONSE || type === MESSAGE_TYPE.ERROR) {
    self._onResponseOrError(addr, type, message)
  }
DHT.prototype._addPeer = function (peer, infoHash, from) {
  if (typeof peer === 'string') peer = parseAddr(peer)
  this._peers.add(infoHash.toString('hex'), encodePeer(peer.host, peer.port))
  this.emit('announce', peer, infoHash, from)
}
/**
 * Called when another node sends a query.
 * @param {string} addr
 * @param {Object} message
 */
DHT.prototype._onQuery = function (addr, message) {
  var self = this
  var query = message.q.toString()
DHT.prototype._onget = function (query, peer) {
  var host = peer.address || peer.host
  var target = query.a.target
  if (!target) return
  var token = this._generateToken(host)
  var value = this._values.get(target.toString('hex'))
  if (typeof self.queryHandler[query] === 'function') {
    self.queryHandler[query].call(self, addr, message)
  if (!value) {
    var nodes = this._rpc.nodes.closest({id: target})
    this._rpc.response(peer, query, {id: this._rpc.id, token: token}, nodes)
  } else {
    var errMessage = 'unexpected query type'
    self._debug(errMessage)
    self._sendError(addr, message.t, ERROR_TYPE.METHOD_UNKNOWN, errMessage)
    this._rpc.response(peer, query, createGetResponse(this._rpc.id, token, value))
  }
}
/**
 * Called when another node sends a response or error.
 * @param {string} addr
 * @param {string} type
 * @param {Object} message
 */
DHT.prototype._onResponseOrError = function (addr, type, message) {
  var self = this
  if (self.destroyed) return
DHT.prototype._onput = function (query, peer) {
  var host = peer.address || peer.host
  var transactionId = Buffer.isBuffer(message.t) && message.t.length === 2 &&
    message.t.readUInt16BE(0)
  var a = query.a
  if (!a) return
  var v = query.a.v
  if (!v) return
  var transaction = self.transactions && self.transactions[addr] &&
    self.transactions[addr][transactionId]
  var token = a.token
  if (!token) return
  var err = null
  if (type === MESSAGE_TYPE.ERROR) {
    err = new Error(Array.isArray(message.e) ? message.e.join(' ') : undefined)
  if (!this._validateToken(host, token)) {
    return this._rpc.error(peer, query, [203, 'cannot `put` with bad token'])
  }
  if (!transaction || !transaction.cb) {
    // unexpected message!
    var errMessage
    if (err) {
      errMessage = 'got unexpected error from ' + addr + ' ' + err.message
      self._debug(errMessage)
      self.emit('warning', new Error(errMessage))
    } else {
      errMessage = 'got unexpected message from ' + addr + ' ' + JSON.stringify(message)
      self._debug(errMessage)
      self.emit('warning', new Error(errMessage))
    }
    return
  if (v.length > 1000) {
    return this._rpc.error(peer, query, [205, 'data payload too large'])
  }
  transaction.cb(err, message.r)
}
  var isMutable = !!(a.k || a.sig)
  if (isMutable && !a.k && !a.sig) return
/**
 * Send a UDP message to the given addr.
 * @param {string} addr
 * @param {Object} message
 * @param {function=} cb called once message has been sent
 */
DHT.prototype._send = function (addr, message, cb) {
  var self = this
  if (self._binding) return self.once('listening', self._send.bind(self, addr, message, cb))
  if (!cb) cb = noop
  var addrData = addrToIPPort(addr)
  var host = addrData[0]
  var port = addrData[1]
  var key = isMutable
    ? sha1(a.salt ? Buffer.concat([a.salt, a.k]) : a.k)
    : sha1(bencode.encode(v))
  var keyHex = key.toString('hex')
  if (!(port > 0 && port < 65535)) {
    return
  if (isMutable) {
    if (!this._verify) return this._rpc.error(peer, query, [400, 'verification not supported'])
    if (!this._verify(a.sig, encodeSigData(a), a.k)) return
    var prev = this._values.get(keyHex)
    if (prev && typeof a.cas === 'number' && prev.seq !== a.cas) {
      return this._rpc.error(peer, query, [301, 'CAS mismatch, re-read and try again'])
    }
    if (prev && typeof prev.seq === 'number' && !(a.seq > prev.seq)) {
      return this._rpc.error(peer, query, [302, 'sequence number less than current'])
    }
    this._values.set(keyHex, {v: v, k: a.k, salt: a.salt, sig: a.sig, seq: a.seq})
  } else {
    this._values.set(keyHex, {v: v})
  }
  // self._debug('send %s to %s', JSON.stringify(message), addr)
  message = bencode.encode(message)
  self.socket.send(message, 0, message.length, port, host, cb)
  this._rpc.response(peer, query, {id: this._rpc.id})
}
DHT.prototype._query = function (data, addr, cb) {
DHT.prototype._bootstrap = function (populate) {
  var self = this
  if (!populate) return process.nextTick(ready)
  if (!data.a) data.a = {}
  if (!data.a.id) data.a.id = self.nodeIdBuffer
  this._rpc.populate(self._rpc.id, {
    q: 'find_node',
    a: {
      id: self._rpc.id,
      target: self._rpc.id
    }
  }, ready)
  var transactionId = self._getTransactionId(addr, cb)
  var message = {
    t: transactionIdToBuffer(transactionId),
    y: MESSAGE_TYPE.QUERY,
    q: data.q,
    a: data.a
  function ready () {
    self._debug('emit ready')
    self.emit('ready')
  }
  if (data.q === 'find_node') {
    self._debug('sent find_node %s to %s', data.a.target.toString('hex'), addr)
  } else if (data.q === 'get_peers') {
    self._debug('sent get_peers %s to %s', data.a.info_hash.toString('hex'), addr)
  }
  self._send(addr, message)
}
/**
 * Send "ping" query to given addr.
 * @param {string} addr
 * @param {function} cb called with response
 */
DHT.prototype._sendPing = function (addr, cb) {
DHT.prototype._closest = function (target, message, onmessage, cb) {
  var self = this
  self._query({ q: 'ping' }, addr, cb)
}
/**
 * Called when another node sends a "ping" query.
 * @param {string} addr
 * @param {Object} message
 */
DHT.prototype._onPing = function (addr, message) {
  var self = this
  var res = {
    t: message.t,
    y: MESSAGE_TYPE.RESPONSE,
    r: {
      id: self.nodeIdBuffer
    }
  }
  var table = new KBucket({
    localNodeId: target,
    numberOfNodesPerKBucket: this._rpc.k
  })
  self._debug('got ping from %s', addr)
  self._send(addr, res)
}
  this._rpc.closest(target, message, onreply, done)
/**
 * Send "find_node" query to given addr.
 * @param {string} addr
 * @param {Buffer|string} nodeId
 * @param {function} cb called with response
 */
DHT.prototype._sendFindNode = function (addr, nodeId, cb) {
  var self = this
  var nodeIdBuffer = idToBuffer(nodeId)
  function onResponse (err, res) {
  function done (err, n) {
    if (err) return cb(err)
    if (res.nodes) {
      res.nodes = parseNodeInfo(res.nodes)
      res.nodes.forEach(function (node) {
        self._addNode(node.addr, node.id, addr)
      })
    }
    cb(null, res)
    self._tables.set(target.toString('hex'), table)
    self._debug('visited %d nodes', n)
    cb(null, n)
  }
  var data = {
    q: 'find_node',
    a: {
      id: self.nodeIdBuffer,
      target: nodeIdBuffer
    }
  }
  function onreply (message, node) {
    if (!message.r) return true
  self._query(data, addr, onResponse)
}
DHT.prototype._sendGet = function (addr, nodeId, cb) {
  var self = this
  var nodeIdBuffer = idToBuffer(nodeId)
  function onResponse (err, res) {
    if (err) return cb(err)
    if (res.nodes) {
      res.nodes = parseNodeInfo(res.nodes)
      res.nodes.forEach(function (node) {
        self._addNode(node.addr, node.id, addr)
    if (message.r.token && message.r.id && Buffer.isBuffer(message.r.id) && message.r.id.length === 20) {
      self._debug('found node %s (target: %s)', message.r.id, target)
      table.add({
        id: message.r.id,
        host: node.host || node.address,
        port: node.port,
        token: message.r.token
      })
    }
    cb(null, res)
  }
  var data = {
    q: 'get',
    a: {
      id: self.nodeIdBuffer,
      target: nodeIdBuffer
    }
    if (!onmessage) return true
    return onmessage(message, node)
  }
  self._query(data, addr, onResponse)
}
/**
 * Called when another node sends a "find_node" query.
 * @param {string} addr
 * @param {Object} message
 */
DHT.prototype._onFindNode = function (addr, message) {
  var self = this
  var nodeIdBuffer = message.a && message.a.target
  var nodeId = idToHexString(nodeIdBuffer)
  if (!nodeIdBuffer) {
    var errMessage = '`find_node` missing required `a.target` field'
    self._debug(errMessage)
    self._sendError(addr, message.t, ERROR_TYPE.PROTOCOL, errMessage)
    return
DHT.prototype._debug = function () {
  if (!debug.enabled) return
  var args = [].slice.call(arguments)
  args[0] = '[' + this.nodeId.toString('hex').substring(0, 7) + '] ' + args[0]
  for (var i = 1; i < args.length; i++) {
    if (Buffer.isBuffer(args[i])) args[i] = args[i].toString('hex')
  }
  self._debug('got find_node %s from %s', nodeId, addr)
  debug.apply(null, args)
}
  // Convert nodes to "compact node info" representation
  var nodes = convertToNodeInfo(self.nodes.closest({ id: nodeIdBuffer }, K))
  var res = {
    t: message.t,
    y: MESSAGE_TYPE.RESPONSE,
    r: {
      id: self.nodeIdBuffer,
      nodes: nodes
    }
  }
  self._send(addr, res)
DHT.prototype._validateToken = function (host, token) {
  var tokenA = this._generateToken(host, this._secrets[0])
  var tokenB = this._generateToken(host, this._secrets[1])
  return equals(token, tokenA) || equals(token, tokenB)
}
/**
 * Send "get_peers" query to given addr.
 * @param {string} addr
 * @param {Buffer|string} infoHash
 * @param {function} cb called with response
 */
DHT.prototype._sendGetPeers = function (addr, infoHash, cb) {
  var self = this
  var infoHashBuffer = idToBuffer(infoHash)
  infoHash = idToHexString(infoHash)
  function onResponse (err, res) {
    if (err) return cb(err)
    if (res.nodes) {
      res.nodes = parseNodeInfo(res.nodes)
      res.nodes.forEach(function (node) {
        self._addNode(node.addr, node.id, addr)
      })
    }
    if (res.values) {
      res.values = parsePeerInfo(res.values)
      res.values.forEach(function (peerAddr) {
        self._debug('emit peer %s %s from %s', peerAddr, infoHash, addr)
        self.emit('peer', peerAddr, infoHash, addr)
      })
    }
    cb(null, res)
  }
  var data = {
    q: 'get_peers',
    a: {
      id: self.nodeIdBuffer,
      info_hash: infoHashBuffer
    }
  }
  self._query(data, addr, onResponse)
DHT.prototype._generateToken = function (host, secret) {
  if (!secret) secret = this._secrets[0]
  return crypto.createHash('sha1').update(new Buffer(host, 'utf8')).update(secret).digest()
}
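// Illustrative sketch (not part of the diff): the new token scheme hashes the
// requesting host together with one of the two most recent 20-byte secrets, so a
// handed-out token survives exactly one rotation (i.e. stays valid for 5-10 minutes):
//
//   var token = dht._generateToken('1.2.3.4')  // sha1(host bytes + secrets[0])
//   dht._validateToken('1.2.3.4', token)       // true
//   dht._rotateSecrets()                       // secrets[0] shifts into secrets[1]
//   dht._validateToken('1.2.3.4', token)       // still true, matches secrets[1]
//   dht._rotateSecrets()
//   dht._validateToken('1.2.3.4', token)       // false, token has aged out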
/**
 * Called when another node sends a "get_peers" query.
 * @param {string} addr
 * @param {Object} message
 */
DHT.prototype._onGetPeers = function (addr, message) {
  var self = this
  var addrData = addrToIPPort(addr)
  var infoHashBuffer = message.a && message.a.info_hash
  if (!infoHashBuffer) {
    var errMessage = '`get_peers` missing required `a.info_hash` field'
    self._debug(errMessage)
    self._sendError(addr, message.t, ERROR_TYPE.PROTOCOL, errMessage)
    return
  }
  var infoHash = idToHexString(infoHashBuffer)
  self._debug('got get_peers %s from %s', infoHash, addr)
  var res = {
    t: message.t,
    y: MESSAGE_TYPE.RESPONSE,
    r: {
      id: self.nodeIdBuffer,
      token: idToBuffer(self._generateToken(addrData[0]))
    }
  }
  var peers = self.peers[infoHash] && self.peers[infoHash].list
  if (peers) {
    // We know of peers for the target info hash. Peers are stored as an array of
    // compact peer info, so return it as-is.
    res.r.values = peers
DHT.prototype._rotateSecrets = function () {
  if (!this._secrets) {
    this._secrets = [crypto.randomBytes(20), crypto.randomBytes(20)]
  } else {
    // No peers, so return the K closest nodes instead. Convert nodes to "compact node
    // info" representation
    res.r.nodes = convertToNodeInfo(self.nodes.closest({ id: infoHashBuffer }, K))
    this._secrets[1] = this._secrets[0]
    this._secrets[0] = crypto.randomBytes(20)
  }
  self._send(addr, res)
}
/**
 * Send "announce_peer" query to given host and port.
 * @param {string} addr
 * @param {Buffer|string} infoHash
 * @param {number} port
 * @param {Buffer} token
 * @param {function=} cb called with response
 */
DHT.prototype._sendAnnouncePeer = function (addr, infoHash, port, token, cb) {
  var self = this
  var infoHashBuffer = idToBuffer(infoHash)
  if (!cb) cb = noop
function noop () {}
  var data = {
    q: 'announce_peer',
    a: {
      id: self.nodeIdBuffer,
      info_hash: infoHashBuffer,
      port: port,
      token: token,
      implied_port: 0
    }
  }
  self._query(data, addr, cb)
function sha1 (buf) {
  return crypto.createHash('sha1').update(buf).digest()
}
/**
 * Called when another node sends an "announce_peer" query.
 * @param {string} addr
 * @param {Object} message
 */
DHT.prototype._onAnnouncePeer = function (addr, message) {
  var self = this
  var errMessage
  var addrData = addrToIPPort(addr)
  var infoHashBuffer = message.a && message.a.info_hash
  if (!infoHashBuffer) {
    errMessage = '`announce_peer` missing required `a.info_hash` field'
    self._debug(errMessage)
    self._sendError(addr, message.t, ERROR_TYPE.PROTOCOL, errMessage)
    return
function createGetResponse (id, token, value) {
  var r = {id: id, token: token, v: value.v}
  if (value.sig) {
    r.sig = value.sig
    r.k = value.k
    if (value.salt) r.salt = value.salt
    if (typeof value.seq === 'number') r.seq = value.seq
  }
  var infoHash = idToHexString(infoHashBuffer)
  var tokenBuffer = message.a && message.a.token
  var token = idToHexString(tokenBuffer)
  if (!self._isValidToken(token, addrData[0])) {
    errMessage = 'cannot `announce_peer` with bad token'
    self._sendError(addr, message.t, ERROR_TYPE.PROTOCOL, errMessage)
    return
  }
  var port = message.a.implied_port !== 0
    ? addrData[1] // use port of udp packet
    : message.a.port // use port in `announce_peer` message
  self._debug(
    'got announce_peer %s %s from %s with token %s',
    infoHash, port, addr, token
  )
  self._addPeer(addrData[0] + ':' + port, infoHash)
  // send acknowledgement
  var res = {
    t: message.t,
    y: MESSAGE_TYPE.RESPONSE,
    r: {
      id: self.nodeIdBuffer
    }
  }
  self._send(addr, res)
  return r
}
/**
 * Send an error to given host and port.
 * @param {string} addr
 * @param {Buffer|number} transactionId
 * @param {number} code
 * @param {string} errMessage
 */
DHT.prototype._sendError = function (addr, transactionId, code, errMessage) {
  var self = this
  if (transactionId && !Buffer.isBuffer(transactionId)) {
    transactionId = transactionIdToBuffer(transactionId)
  }
  var message = {
    y: MESSAGE_TYPE.ERROR,
    e: [code, errMessage]
  }
  if (transactionId) {
    message.t = transactionId
  }
  self._debug('sent error %s to %s', JSON.stringify(message), addr)
  self._send(addr, message)
function encodePeer (host, port) {
  var buf = new Buffer(6)
  var ip = host.split('.')
  for (var i = 0; i < 4; i++) buf[i] = parseInt(ip[i] || 0, 10)
  buf.writeUInt16BE(port, 4)
  return buf
}
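// Illustrative sketch (not part of the diff): encodePeer emits the 6-byte "compact
// peer info" form from BEP 5 (4 bytes of IPv4 address, then a big-endian uint16 port):
//
//   encodePeer('10.0.0.1', 6881)                   // => <Buffer 0a 00 00 01 1a e1>
//   decodePeers([ encodePeer('10.0.0.1', 6881) ])  // => [ { host: '10.0.0.1', port: 6881 } ]
//
// decodePeers (interleaved below) is the matching decoder.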
/**
 * Get a transaction id, and (optionally) set a function to be called when a
 * response arrives or the query times out.
 * @param {string} addr
 * @param {function} fn
 */
DHT.prototype._getTransactionId = function (addr, fn) {
  var self = this
  fn = once(fn)

  var reqs = self.transactions[addr]
  if (!reqs) {
    reqs = self.transactions[addr] = {}
    reqs.nextTransactionId = 0
  }

  var transactionId = reqs.nextTransactionId
  reqs.nextTransactionId = UINT16 & (reqs.nextTransactionId + 1)

  function onTimeout () {
    reqs[transactionId] = null
    fn(new Error('query timed out'))
  }

  function onResponse (err, res) {
    clearTimeout(reqs[transactionId].timeout)
    reqs[transactionId] = null
    fn(err, res)
  }

  var timeout = setTimeout(onTimeout, SEND_TIMEOUT)
  if (timeout.unref) timeout.unref()

  reqs[transactionId] = {
    cb: onResponse,
    timeout: timeout
  }

  return transactionId
}

function decodePeers (buf) {
  var peers = []
  try {
    for (var i = 0; i < buf.length; i++) {
      var port = buf[i].readUInt16BE(4)
      if (!port) continue
      peers.push({
        host: parseIp(buf[i], 0),
        port: port
      })
    }
  } catch (err) {
    // do nothing
  }
  return peers
}
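// Note (added for clarity): transaction ids are per-address 16-bit counters;
// `UINT16 & (reqs.nextTransactionId + 1)` wraps 0xffff back around to 0.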
/**
 * Generate token (for response to `get_peers` query). Tokens are the SHA1 hash of
 * the IP address concatenated onto a secret that changes every five minutes. Tokens up
 * to ten minutes old are accepted.
 * @param {string} host
 * @param {string=} secret force token to use this secret, otherwise use current one
 * @return {string}
 */
DHT.prototype._generateToken = function (host, secret) {
  var self = this
  if (!secret) secret = self.secrets[0]
  return sha1.sync(host + secret)
}

function parseIp (buf, offset) {
  return buf[offset++] + '.' + buf[offset++] + '.' + buf[offset++] + '.' + buf[offset++]
}
/**
 * Checks if a token is valid for a given node's IP address.
 *
 * @param {string} token
 * @param {string} host
 * @return {boolean}
 */
DHT.prototype._isValidToken = function (token, host) {
  var self = this
  var validToken0 = self._generateToken(host, self.secrets[0])
  var validToken1 = self._generateToken(host, self.secrets[1])
  return token === validToken0 || token === validToken1
}
/**
 * Rotate secrets. Secrets are rotated every 5 minutes and tokens up to ten minutes
 * old are accepted.
 */
DHT.prototype._rotateSecrets = function () {
  var self = this

  function createSecret () {
    return hat(SECRET_ENTROPY)
  }

  // Initialize secrets array
  // self.secrets[0] is the current secret, used to generate new tokens
  // self.secrets[1] is the last secret, which is still accepted
  if (!self.secrets) {
    self.secrets = [ createSecret(), createSecret() ]
    return
  }

  self.secrets[1] = self.secrets[0]
  self.secrets[0] = createSecret()
}

function toNode (node) {
  return {
    host: node.host,
    port: node.port
  }
}

function PeerStore (max) {
  this.max = max || 10000
  this.used = 0
  this.peers = LRU(Infinity)
}
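// Illustrative usage (not part of the original source; `infoHash` is a
// hypothetical hex-string key):
//
//   var store = new PeerStore(10000)
//   store.add(infoHash, encodePeer('10.0.0.1', 6881))
//   store.get(infoHash) // => up to 100 randomly sampled compact peer buffers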
/**
 * Get an array of nodes that can be used to initialize and bootstrap the DHT in the
 * future.
 * @return {Array.<Object>}
 */
DHT.prototype.toArray = function () {
  var self = this
  var nodes = self.nodes.toArray().filter(dropData).map(function (contact) {
    // to remove properties added by k-bucket, like `distance`, etc.
    return {
      id: contact.id.toString('hex'),
      addr: contact.addr
    }
  })
  return nodes

  function dropData (x) { return !x.data }
}

PeerStore.prototype.add = function (key, peer) {
  var peers = this.peers.get(key)
  if (!peers) {
    peers = {
      values: [],
      map: LRU(Infinity)
    }
    this.peers.set(key, peers)
  }

  var id = peer.toString('hex')
  if (peers.map.get(id)) return

  var node = {index: peers.values.length, peer: peer}
  peers.map.set(id, node)
  peers.values.push(node)
  if (++this.used > this.max) this._evict()
}

DHT.prototype._addrIsSelf = function (addr) {
  var self = this
  return self._port &&
    LOCAL_HOSTS[self._ipv].some(function (host) { return host + ':' + self._port === addr })
}
DHT.prototype._debug = function () {
  var self = this
  var args = [].slice.call(arguments)
  args[0] = '[' + self.nodeId.substring(0, 7) + '] ' + args[0]
  debug.apply(null, args)
}

PeerStore.prototype._evict = function () {
  var a = this.peers.peek(this.peers.tail)
  var b = a.map.remove(a.map.tail).value
  var values = a.values
  swap(values, b.index, values.length - 1)
  values.pop()
  this.used--
  if (!values.length) this.peers.remove(this.peers.tail)
}
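// Note (added for clarity): eviction drops a single peer from the
// least-recently-used info hash (the LRU tail), swapping it with the last
// element of that bucket's `values` array so removal stays O(1).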
/**
 * Parse a saved array of nodes, converting hex-string ids back into buffers.
 * @param {Array.<Object>} nodes
 * @return {Array.<Object>}
 */
function fromArray (nodes) {
  nodes.forEach(function (node) {
    if (node.id) node.id = idToBuffer(node.id)
  })
  return nodes
}

PeerStore.prototype.get = function (key) {
  var node = this.peers.get(key)
  if (!node) return []
  return pick(node.values, 100)
}
/**
 * Convert "contacts" from the routing table into "compact node info" representation.
 * @param {Array.<Object>} contacts
 * @return {Buffer}
 */
function convertToNodeInfo (contacts) {
  return Buffer.concat(contacts.map(function (contact) {
    return Buffer.concat([ contact.id, string2compact(contact.addr) ])
  }))
}

function parseAddr (addr) {
  return {host: addr.split(':')[0], port: Number(addr.split(':')[1])}
}
/**
 * Parse "compact node info" representation into "contacts".
 * @param {Buffer} nodeInfo
 * @return {Array.<Object>} array of contacts
 */
function parseNodeInfo (nodeInfo) {
  var contacts = []
  try {
    for (var i = 0; i < nodeInfo.length; i += 26) {
      contacts.push({
        id: nodeInfo.slice(i, i + 20),
        addr: compact2string(nodeInfo.slice(i + 20, i + 26))
      })
    }
  } catch (err) {
    debug('error parsing node info ' + nodeInfo)
  }
  return contacts
}

function swap (list, a, b) {
  if (a === b) return
  var tmp = list[a]
  list[a] = list[b]
  list[b] = tmp
  list[a].index = a
  list[b].index = b
}
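// Illustrative sketch (not part of the original source; assumes `nodeId` is a
// 20-byte buffer): "compact node info" is a sequence of 26-byte records, a
// 20-byte node id followed by 6 bytes of compact IP/port.
//
//   var buf = Buffer.concat([ nodeId, encodePeer('10.0.0.1', 6881) ])
//   parseNodeInfo(buf) // => [ { id: nodeId, addr: '10.0.0.1:6881' } ]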
/**
 * Parse list of "compact addr info" into an array of addr "host:port" strings.
 * @param {Array.<Buffer>} list
 * @return {Array.<string>}
 */
function parsePeerInfo (list) {
  try {
    return list.map(compact2string)
  } catch (err) {
    debug('error parsing peer info ' + list)
    return []
  }
}
function pick (values, n) {
  var len = Math.min(values.length, n)
  var ptr = 0
  var res = new Array(len)
  for (var i = 0; i < len; i++) {
    var next = ptr + (Math.random() * (values.length - ptr)) | 0
    res[ptr] = values[next].peer
    swap(values, ptr++, next)
  }
  return res
}

/**
 * Ensure a transaction id is a 16-bit buffer, so it can be sent on the wire as
 * the transaction id ("t" field).
 * @param {number|Buffer} transactionId
 * @return {Buffer}
 */
function transactionIdToBuffer (transactionId) {
  if (Buffer.isBuffer(transactionId)) {
    return transactionId
  } else {
    var buf = new Buffer(2)
    buf.writeUInt16BE(transactionId, 0)
    return buf
  }
}

/**
 * Ensure info hash or node id is a Buffer.
 * @param {string|Buffer} id
 * @return {Buffer}
 */
function idToBuffer (id) {
  if (Buffer.isBuffer(id)) {
    return id
  } else if (typeof id === 'string') {
    return new Buffer(id, 'hex')
  } else {
    return null
  }
}
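// Note (added for clarity): `pick` draws up to `n` peers without replacement
// using a partial Fisher-Yates shuffle, so each call returns a fresh random
// sample rather than always the same slice of stored peers.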
/**
 * Ensure info hash or node id is a hex string.
 * @param {string|Buffer} id
 * @return {string}
 */
function idToHexString (id) {
  if (Buffer.isBuffer(id)) {
    return id.toString('hex')
  } else {
    return id
  }
}

function toBuffer (str) {
  if (Buffer.isBuffer(str)) return str
  if (typeof str === 'string') return new Buffer(str, 'hex')
  throw new Error('Pass a buffer or a string')
}
function encodeSigData (msg) {
  var ref = { seq: msg.seq || 0, v: msg.v }
  if (msg.salt) ref.salt = msg.salt
  return bencode.encode(ref).slice(1, -1)
}

function noop () {}
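// Illustrative sketch (not part of the original source): BEP 44 signatures
// cover the bencoded dictionary *contents*, so `.slice(1, -1)` strips the
// outer 'd' ... 'e' wrapper from the encoded dictionary.
//
//   encodeSigData({ seq: 1, v: new Buffer('abc') })
//   // => contents of 'd3:seqi1e1:v3:abce', i.e. '3:seqi1e1:v3:abc'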
{ | ||
"name": "bittorrent-dht", | ||
"description": "Simple, robust, BitTorrent DHT implementation", | ||
"version": "5.1.3", | ||
"version": "6.0.0", | ||
"author": { | ||
@@ -14,19 +14,9 @@ "name": "Feross Aboukhadijeh", | ||
"dependencies": { | ||
"addr-to-ip-port": "^1.0.0", | ||
"bencode": "^0.7.0", | ||
"buffer-equal": "0.0.1", | ||
"compact2string": "^1.2.0", | ||
"debug": "^2.1.0", | ||
"hat": "^0.0.3", | ||
"buffer-equals": "^1.0.3", | ||
"debug": "^2.2.0", | ||
"inherits": "^2.0.1", | ||
"is-ip": "^1.0.0", | ||
"isarray": "^1.0.0", | ||
"k-bucket": "^0.6.0", | ||
"network-address": "^1.0.0", | ||
"once": "^1.3.1", | ||
"run-parallel": "^1.0.0", | ||
"simple-get": "^1.3.1", | ||
"simple-sha1": "^2.0.7", | ||
"string2compact": "^1.1.1", | ||
"thunky": "^0.1.0" | ||
"k-rpc": "^3.4.1", | ||
"lru": "^1.2.0" | ||
}, | ||
@@ -36,4 +26,6 @@ "devDependencies": { | ||
"ip": "^1.1.0", | ||
"once": "^1.3.3", | ||
"run-parallel": "^1.1.4", | ||
"standard": "^5.4.1", | ||
"tape": "^4.0.0" | ||
"tape": "^4.4.0" | ||
}, | ||
@@ -40,0 +32,0 @@ "keywords": [ |
@@ -61,4 +61,4 @@ # bittorrent-dht [![travis][travis-image]][travis-url] [![npm][npm-image]][npm-url] [![downloads][downloads-image]][downloads-url] | ||
dht.on('peer', function (addr, infoHash, from) { | ||
console.log('found potential peer ' + addr + ' through ' + from) | ||
dht.on('peer', function (peer, infoHash, from) { | ||
console.log('found potential peer ' + peer.host + ':' + peer.port + ' through ' + from.host + ':' + from.port) | ||
}) | ||
@@ -106,6 +106,3 @@ ``` | ||
Note: `dht.lookup()` should only be called after the ready event has fired, otherwise the | ||
lookup may fail because the DHT routing table doesn't contain enough nodes. | ||
#### `dht.listen([port], [address], [onlistening])` | ||
@@ -152,5 +149,5 @@ | ||
Returns the nodes in the DHT as an array. This is useful for persisting the DHT | ||
to disk between restarts of a BitTorrent client (as recommended by the spec). Each node in the array is an object with `id` (hex string) and `addr` (string) properties. | ||
to disk between restarts of a BitTorrent client (as recommended by the spec). Each node in the array is an object with `host` (string) and `port` (number) properties. | ||
To restore the DHT nodes when instantiating a new `DHT` object, simply pass in the array as the value of the `bootstrap` option. | ||
To restore the DHT nodes when instantiating a new `DHT` object, simply loop over the nodes in the array and add them with the `addNode` method. | ||
@@ -169,7 +166,11 @@ ```js | ||
// initialize a new dht with the same routing table as the first | ||
var dht2 = new DHT({ bootstrap: arr }) | ||
var dht2 = new DHT() | ||
arr.forEach(function (node) { | ||
dht2.addNode(node) | ||
}) | ||
``` | ||
#### `dht.addNode(addr, [nodeId])` | ||
#### `dht.addNode(node)` | ||
@@ -181,5 +182,15 @@ Manually add a node to the DHT routing table. If there is space in the routing table (or | ||
If `nodeId` is undefined, then the peer will be pinged to learn their node id. If the peer does not respond, they will not be added to the routing table. | ||
A node should look like this: | ||
``` js | ||
{ | ||
host: nodeHost, | ||
port: nodePort, | ||
id: optionalNodeId | ||
} | ||
``` | ||
If `id` is undefined, then the peer will be pinged to learn their node id. If the peer does not respond, they will not be added to the routing table. | ||
#### `dht.destroy([callback])` | ||
@@ -209,4 +220,4 @@ | ||
dht.on('ready', function () { | ||
dht.put({ v: value }, function (errors, hash) { | ||
console.error('errors=', errors) | ||
dht.put({ v: value }, function (err, hash) { | ||
console.error('error=', err) | ||
console.log('hash=', hash) | ||
@@ -254,4 +265,4 @@ }) | ||
dht.on('ready', function () { | ||
dht.put(opts, function (errors, hash) { | ||
console.error('errors=', errors) | ||
dht.put(opts, function (err, hash) { | ||
console.error('error=', err) | ||
console.log('hash=', hash) | ||
@@ -262,6 +273,6 @@ }) | ||
In either mutable or immutable forms, `callback(errors, hash)` fires with an | ||
array `errors` of any errors encountered when announcing the content to peers | ||
and `hash`, the location where the mutable or immutable content can be retrieved | ||
(with `dht.get(hash)`). | ||
In either mutable or immutable forms, `callback(error, hash, n)` fires with an | ||
`error` if no nodes were able to store the `value`. `n` is set to the number of | ||
nodes that accepted the `put`, and `hash` is the location where the mutable or | ||
immutable content can be retrieved (with `dht.get(hash)`). | ||
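For example, a minimal sketch of the new callback shape (`someValue` is a stand-in for your content):

``` js
dht.put({ v: someValue }, function (err, hash, n) {
  if (err) throw err // no node accepted the put
  console.log('stored on ' + n + ' nodes under ' + hash.toString('hex'))
})
```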
@@ -293,4 +304,5 @@ Note that you should call `.put()` every hour for content that you want to keep | ||
Emitted when the DHT is ready to handle lookups (i.e. the routing table is sufficiently | ||
populated via the bootstrap nodes). | ||
Emitted when the DHT is fully bootstrapped (i.e. the routing table is sufficiently | ||
populated via the bootstrap nodes). Note that it is okay to do lookups before the 'ready' | ||
event fires. | ||
@@ -308,5 +320,5 @@ Note: If you initialize the DHT with the `{ bootstrap: false }` option, then the 'ready' | ||
#### `dht.on('peer', function (addr, infoHash, from) { ... })` | ||
#### `dht.on('peer', function (peer, infoHash, from) { ... })` | ||
Emitted when a potential peer is found. `addr` is of the form `IP_ADDRESS:PORT`. | ||
Emitted when a potential peer is found. `peer` is of the form `{host, port}`. | ||
`infoHash` is the torrent info hash of the swarm that the peer belongs to. Emitted | ||
@@ -323,3 +335,3 @@ in response to a `lookup(infoHash)` call. | ||
#### `dht.on('node', function (addr, nodeId, from) { ... })` | ||
#### `dht.on('node', function (node) { ... })` | ||
@@ -329,3 +341,3 @@ Emitted when the DHT finds a new node. | ||
#### `dht.on('announce', function (addr, infoHash) { ... })` | ||
#### `dht.on('announce', function (peer, infoHash) { ... })` | ||
@@ -332,0 +344,0 @@ Emitted when a peer announces itself in order to be stored in the DHT. |
@@ -15,3 +15,3 @@ var common = require('./common') | ||
t.deepEqual(dht.nodeId, nodeId.toString('hex')) | ||
t.deepEqual(dht.nodeId, nodeId) | ||
dht.destroy() | ||
@@ -22,3 +22,3 @@ t.end() | ||
test('call `addNode` with nodeId argument', function (t) { | ||
t.plan(2) | ||
t.plan(3) | ||
@@ -30,13 +30,14 @@ var dht = new DHT({ bootstrap: false }) | ||
dht.on('node', function (addr, _nodeId) { | ||
t.equal(addr, '127.0.0.1:9999') | ||
t.deepEqual(_nodeId, nodeId.toString('hex')) | ||
dht.on('node', function (node) { | ||
t.equal(node.host, '127.0.0.1') | ||
t.equal(node.port, 9999) | ||
t.deepEqual(node.id, nodeId) | ||
dht.destroy() | ||
}) | ||
dht.addNode('127.0.0.1:9999', nodeId) | ||
dht.addNode({host: '127.0.0.1', port: 9999, id: nodeId}) | ||
}) | ||
test('call `addNode` without nodeId argument', function (t) { | ||
t.plan(2) | ||
t.plan(3) | ||
@@ -53,7 +54,8 @@ var dht1 = new DHT({ bootstrap: false }) | ||
// If `nodeId` is undefined, then the peer will be pinged to learn their node id. | ||
dht2.addNode('127.0.0.1:' + port) | ||
dht2.addNode({host: '127.0.0.1', port: port}) | ||
dht2.on('node', function (addr, _nodeId) { | ||
t.equal(addr, '127.0.0.1:' + port) | ||
t.deepEqual(_nodeId, dht1.nodeId) | ||
dht2.on('node', function (node) { | ||
t.equal(node.host, '127.0.0.1') | ||
t.equal(node.port, port) | ||
t.deepEqual(node.id, dht1.nodeId) | ||
dht1.destroy() | ||
@@ -73,3 +75,3 @@ dht2.destroy() | ||
// If the peer DOES NOT RESPOND, the will not be added to the routing table. | ||
dht.addNode('127.0.0.1:9999') | ||
dht.addNode({host: '127.0.0.1', port: 9999}) | ||
@@ -98,8 +100,11 @@ dht.on('node', function () { | ||
var nodeId = common.randomId() | ||
dht.addNode('127.0.0.1:9999', nodeId) | ||
dht.addNode('127.0.0.1:9999', nodeId) | ||
dht.addNode('127.0.0.1:9999', nodeId) | ||
dht.addNode({host: '127.0.0.1', port: 9999, id: nodeId}) | ||
dht.addNode({host: '127.0.0.1', port: 9999, id: nodeId}) | ||
dht.addNode({host: '127.0.0.1', port: 9999, id: nodeId}) | ||
var togo = 1 | ||
setTimeout(t.pass(), 100) | ||
setTimeout(function () { | ||
dht.destroy() | ||
t.pass() | ||
}, 100) | ||
}) | ||
@@ -115,3 +120,3 @@ | ||
b.listen() | ||
b._sendPing('127.0.0.1:' + port, function (err) { | ||
b._sendPing({host: '127.0.0.1', port: port}, function (err) { | ||
t.error(err) | ||
@@ -118,0 +123,0 @@ a.destroy() |
var crypto = require('crypto') | ||
var hat = require('hat') | ||
var ip = require('ip') | ||
@@ -10,10 +9,16 @@ | ||
exports.randomHost = function () { | ||
return ip.toString(crypto.randomBytes(4)) | ||
} | ||
exports.randomPort = function () { | ||
return crypto.randomBytes(2).readUInt16LE(0) | ||
} | ||
exports.randomAddr = function () { | ||
var host = ip.toString(crypto.randomBytes(4)) | ||
var port = crypto.randomBytes(2).readUInt16LE(0) | ||
return host + ':' + port | ||
return exports.randomHost() + ':' + exports.randomPort() | ||
} | ||
exports.randomId = function () { | ||
return new Buffer(hat(160), 'hex') | ||
return crypto.randomBytes(20) | ||
} | ||
@@ -23,3 +28,7 @@ | ||
for (var i = 0; i < num; i++) { | ||
dht.addNode(exports.randomAddr(), exports.randomId()) | ||
dht.addNode({ | ||
id: exports.randomId(), | ||
host: exports.randomHost(), | ||
port: exports.randomPort() | ||
}) | ||
} | ||
@@ -26,0 +35,0 @@ } |
@@ -16,5 +16,3 @@ var common = require('./common') | ||
var value = fill(500, 'abc') | ||
dht.put({ v: value }, function (errors, hash) { | ||
errors.forEach(t.error.bind(t)) | ||
dht.put({ v: value }, function (_, hash) { | ||
t.equal( | ||
@@ -35,3 +33,3 @@ hash.toString('hex'), | ||
test('multi-party immutable put/get', function (t) { | ||
t.plan(3) | ||
t.plan(4) | ||
@@ -61,4 +59,4 @@ var dht1 = new DHT({ bootstrap: false }) | ||
var value = fill(500, 'abc') | ||
dht1.put({ v: value }, function (errors, hash) { | ||
errors.forEach(t.error.bind(t)) | ||
dht1.put({ v: value }, function (err, hash) { | ||
t.error(err) | ||
@@ -69,2 +67,3 @@ t.equal( | ||
) | ||
dht2.get(hash, function (err, res) { | ||
@@ -71,0 +70,0 @@ t.ifError(err) |
var common = require('./common') | ||
var DHT = require('../') | ||
var ed = require('ed25519-supercop') | ||
var sha1 = require('simple-sha1') | ||
var test = require('tape') | ||
var crypto = require('crypto') | ||
@@ -26,8 +26,11 @@ test('local mutable put/get', function (t) { | ||
} | ||
var expectedHash = sha1.sync(opts.k)
dht.put(opts, function (errors, hash) {
errors.forEach(t.error.bind(t))
t.equal(hash, expectedHash, 'hash of the public key')
var expectedHash = crypto.createHash('sha1').update(opts.k).digest()
dht.put(opts, function (_, hash) {
t.equal(
hash.toString('hex'),
expectedHash.toString('hex'),
'hash of the public key'
)
dht.get(hash, function (err, res) { | ||
@@ -45,3 +48,3 @@ t.ifError(err) | ||
test('multiparty mutable put/get', function (t) { | ||
t.plan(3) | ||
t.plan(4) | ||
@@ -80,8 +83,9 @@ var keypair = ed.createKeyPair(ed.createSeed()) | ||
} | ||
var expectedHash = sha1.sync(opts.k)
dht1.put(opts, function (errors, hash) {
errors.forEach(t.error.bind(t))
t.equal(hash, expectedHash, 'hash of the public key')
var expectedHash = crypto.createHash('sha1').update(opts.k).digest()
dht1.put(opts, function (err, hash) {
t.error(err)
t.deepEqual(hash, expectedHash, 'hash of the public key')
dht2.get(hash, function (err, res) { | ||
@@ -98,3 +102,3 @@ t.ifError(err) | ||
test('multiparty mutable put/get sequence', function (t) { | ||
t.plan(9) | ||
t.plan(12) | ||
@@ -132,8 +136,9 @@ var keypair = ed.createKeyPair(ed.createSeed()) | ||
} | ||
var expectedHash = sha1.sync(opts.k)
dht1.put(opts, function (errors, hash) {
errors.forEach(t.error.bind(t))
t.equal(hash, expectedHash, 'hash of the public key')
var expectedHash = crypto.createHash('sha1').update(opts.k).digest()
dht1.put(opts, function (err, hash) {
t.error(err)
t.deepEqual(hash, expectedHash, 'hash of the public key')
dht2.get(hash, function (err, res) { | ||
@@ -152,6 +157,6 @@ t.ifError(err) | ||
dht1.put(opts, function (errors, hash) { | ||
errors.forEach(t.error.bind(t)) | ||
dht1.put(opts, function (err, hash) { | ||
t.error(err) | ||
t.equal(hash, expectedHash, 'hash of the public key (again)') | ||
t.deepEqual(hash, expectedHash, 'hash of the public key (again)') | ||
dht2.get(hash, function (err, res) { | ||
@@ -171,6 +176,6 @@ t.ifError(err) | ||
dht1.put(opts, function (errors, hash) { | ||
errors.forEach(t.error.bind(t)) | ||
dht1.put(opts, function (err, hash) { | ||
t.error(err) | ||
t.equal(hash, expectedHash, 'hash of the public key (yet again)') | ||
t.deepEqual(hash, expectedHash, 'hash of the public key (yet again)') | ||
dht2.get(hash, function (err, res) { | ||
@@ -188,3 +193,3 @@ t.ifError(err) | ||
test('salted multikey multiparty mutable put/get sequence', function (t) { | ||
t.plan(9) | ||
t.plan(12) | ||
@@ -232,9 +237,10 @@ var keypair = ed.createKeyPair(ed.createSeed()) | ||
} | ||
var first = sha1.sync(Buffer.concat([ new Buffer('first'), fopts.k ]))
var second = sha1.sync(Buffer.concat([ new Buffer('second'), sopts.k ]))
dht1.put(fopts, function (errors, hash) {
errors.forEach(t.error.bind(t))
t.equal(hash, first, 'first hash')
var first = crypto.createHash('sha1').update('first').update(fopts.k).digest()
var second = crypto.createHash('sha1').update('second').update(sopts.k).digest()
dht1.put(fopts, function (err, hash) {
t.error(err)
t.deepEqual(hash, first, 'first hash')
dht2.get(hash, function (err, res) { | ||
@@ -250,6 +256,6 @@ t.ifError(err) | ||
function putSecondKey () { | ||
dht1.put(sopts, function (errors, hash) { | ||
errors.forEach(t.error.bind(t)) | ||
dht1.put(sopts, function (err, hash) { | ||
t.error(err) | ||
t.equal(hash, second, 'second hash') | ||
t.deepEqual(hash, second, 'second hash') | ||
dht2.get(hash, function (err, res) { | ||
@@ -269,6 +275,6 @@ t.ifError(err) | ||
dht1.put(fopts, function (errors, hash) { | ||
errors.forEach(t.error.bind(t)) | ||
dht1.put(fopts, function (err, hash) { | ||
t.error(err) | ||
t.equal(hash, first, 'first salt (again)') | ||
t.deepEqual(hash, first, 'first salt (again)') | ||
dht2.get(hash, function (err, res) { | ||
@@ -286,3 +292,3 @@ t.ifError(err) | ||
test('transitive mutable update', function (t) { | ||
t.plan(3) | ||
t.plan(4) | ||
@@ -325,9 +331,10 @@ var keypair = ed.createKeyPair(ed.createSeed()) | ||
} | ||
var expectedHash = sha1.sync(opts.k)
dht1.put(opts, function (errors, hash) {
errors.forEach(t.error.bind(t))
t.equal(hash, expectedHash, 'hash of the public key')
var expectedHash = crypto.createHash('sha1').update(opts.k).digest()
dht1.put(opts, function (err, hash) {
t.error(err)
t.deepEqual(hash, expectedHash, 'hash of the public key')
dht3.get(expectedHash, function (err, res) { | ||
@@ -344,3 +351,3 @@ t.ifError(err) | ||
test('mutable update mesh', function (t) { | ||
t.plan(9) | ||
t.plan(12) | ||
/* | ||
@@ -412,7 +419,8 @@ 0 <-> 1 <-> 2 | ||
} | ||
var xhash = sha1.sync(opts.k) | ||
src.put(opts, function (errors, hash) { | ||
errors.forEach(t.error.bind(t)) | ||
t.equal(hash, xhash) | ||
var xhash = crypto.createHash('sha1').update(opts.k).digest() | ||
src.put(opts, function (err, hash) { | ||
t.error(err) | ||
t.equal(hash.toString('hex'), xhash.toString('hex')) | ||
dst.get(xhash, function (err, res) { | ||
@@ -429,3 +437,3 @@ t.ifError(err) | ||
test('invalid sequence', function (t) { | ||
t.plan(4) | ||
t.plan(5) | ||
@@ -464,7 +472,7 @@ var keypair = ed.createKeyPair(ed.createSeed()) | ||
dht0.put(opts0, function (errors, hash) { | ||
errors.forEach(t.error.bind(t)) | ||
dht0.put(opts0, function (err, hash) { | ||
t.error(err) | ||
hash0 = hash | ||
dht0.put(opts1, function (errors, hash) { | ||
t.ok(errors.length, 'caught expected error: ' + errors[0]) | ||
dht0.put(opts1, function (err, hash) { | ||
t.ok(err, 'caught expected error: ' + err) | ||
check() | ||
@@ -489,3 +497,3 @@ }) | ||
test('valid sequence', function (t) { | ||
t.plan(4) | ||
t.plan(6) | ||
@@ -524,7 +532,7 @@ var keypair = ed.createKeyPair(ed.createSeed()) | ||
dht0.put(opts0, function (errors, hash) { | ||
errors.forEach(t.error.bind(t)) | ||
dht0.put(opts0, function (err, hash) { | ||
t.error(err) | ||
hash0 = hash | ||
dht0.put(opts1, function (errors, hash) { | ||
errors.forEach(t.error.bind(t)) | ||
dht0.put(opts1, function (err, hash) { | ||
t.error(err) | ||
hash1 = hash | ||
@@ -531,0 +539,0 @@ t.deepEqual(hash0, hash1) |
var common = require('./common') | ||
var DHT = require('../') | ||
var sha1 = require('simple-sha1') | ||
var test = require('tape') | ||
var crypto = require('crypto') | ||
var ed = require('ed25519-supercop') | ||
@@ -45,8 +44,12 @@ | ||
} | ||
var expectedHash = sha1.sync(opts.k)
dht.put(opts, function (errors, hash) {
errors.forEach(t.error.bind(t))
t.equal(hash, expectedHash, 'hash of the public key')
var expectedHash = crypto.createHash('sha1').update(opts.k).digest()
dht.put(opts, function (_, hash) {
t.equal(
hash.toString('hex'),
expectedHash.toString('hex'),
'hash of the public key'
)
dht.get(hash, function (err, res) { | ||
@@ -53,0 +56,0 @@ t.ifError(err) |
@@ -13,2 +13,3 @@ var common = require('./common') | ||
if (numNodes === 100) { | ||
dht.destroy() | ||
t.pass('100 nodes added, 100 `node` events emitted') | ||
@@ -30,2 +31,3 @@ t.end() | ||
if (numNodes === 10000) { | ||
dht.destroy() | ||
t.pass('10000 nodes added, 10000 `node` events emitted') | ||
@@ -47,2 +49,3 @@ t.end() | ||
if (numPeers === 100) { | ||
dht.destroy() | ||
t.pass('100 peers added, 100 `announce` events emitted') | ||
@@ -64,2 +67,3 @@ t.end() | ||
if (numPeers === 10000) { | ||
dht.destroy() | ||
t.pass('10000 peers added, 10000 `announce` events emitted') | ||
@@ -66,0 +70,0 @@ t.end() |
@@ -76,5 +76,5 @@ var common = require('./common') | ||
dhts[1].on('peer', function (addr, hash) { | ||
t.equal(hash, infoHash) | ||
t.equal(Number(addr.split(':')[1]), 9998) | ||
dhts[1].on('peer', function (peer, hash) { | ||
t.equal(hash.toString('hex'), infoHash) | ||
t.equal(peer.port, 9998) | ||
clearTimeout(timeoutId) | ||
@@ -93,4 +93,4 @@ cb(null, dhts) | ||
var next = dhts[(i + 1) % len] | ||
dhts[i].addNode('127.0.0.1:' + next.address().port, next.nodeId) | ||
dhts[i].addNode({host: '127.0.0.1', port: next.address().port, id: next.nodeId}) | ||
} | ||
} |
@@ -14,5 +14,10 @@ var common = require('./common') | ||
dht1.listen(function () { | ||
dht2._sendPing('127.0.0.1:' + dht1.address().port, function (err, res) { | ||
dht2._rpc.query({ | ||
host: '127.0.0.1', | ||
port: dht1.address().port | ||
}, { | ||
q: 'ping' | ||
}, function (err, res) { | ||
t.error(err) | ||
t.deepEqual(res.id, dht1.nodeIdBuffer) | ||
t.deepEqual(res.r.id, dht1.nodeId) | ||
@@ -35,13 +40,17 @@ dht1.destroy() | ||
dht1.addNode('255.255.255.255:6969', targetNodeId) | ||
dht1.addNode({host: '255.255.255.255', port: 6969, id: targetNodeId}) | ||
dht1.listen(function () { | ||
dht2._sendFindNode('127.0.0.1:' + dht1.address().port, targetNodeId, function (err, res) { | ||
dht2._rpc.query({ | ||
host: '127.0.0.1', | ||
port: dht1.address().port | ||
}, { | ||
q: 'find_node', | ||
a: {target: targetNodeId} | ||
}, function (err, res) { | ||
t.error(err) | ||
t.deepEqual(res.id, dht1.nodeIdBuffer) | ||
t.deepEqual( | ||
res.nodes.map(function (node) { return node.addr }), | ||
[ '255.255.255.255:6969', '127.0.0.1:' + dht2.address().port ] | ||
) | ||
t.deepEqual(res.r.id, dht1.nodeId) | ||
t.deepEqual(res.r.nodes.length, 2 * 26) | ||
dht1.destroy() | ||
@@ -61,17 +70,20 @@ dht2.destroy() | ||
dht1.addNode('1.1.1.1:6969', common.randomId()) | ||
dht1.addNode('10.10.10.10:6969', common.randomId()) | ||
dht1.addNode('255.255.255.255:6969', common.randomId()) | ||
dht1.addNode({host: '1.1.1.1', port: 6969, id: common.randomId()}) | ||
dht1.addNode({host: '10.10.10.10', port: 6969, id: common.randomId()}) | ||
dht1.addNode({host: '255.255.255.255', port: 6969, id: common.randomId()}) | ||
dht1.listen(function () { | ||
var targetNodeId = common.randomId() | ||
dht2._sendFindNode('127.0.0.1:' + dht1.address().port, targetNodeId, function (err, res) { | ||
dht2._rpc.query({ | ||
host: '127.0.0.1', | ||
port: dht1.address().port | ||
}, { | ||
q: 'find_node', | ||
a: {target: targetNodeId} | ||
}, function (err, res) { | ||
t.error(err) | ||
t.deepEqual(res.id, dht1.nodeIdBuffer) | ||
t.deepEqual( | ||
res.nodes.map(function (node) { return node.addr }).sort(), | ||
[ '1.1.1.1:6969', '10.10.10.10:6969', '127.0.0.1:' + dht2.address().port, | ||
'255.255.255.255:6969' ] | ||
) | ||
t.deepEqual(res.r.id, dht1.nodeId) | ||
t.deepEqual(res.r.nodes.length, 26 * 4) | ||
dht1.destroy() | ||
@@ -91,15 +103,20 @@ dht2.destroy() | ||
dht1.addNode('1.1.1.1:6969', common.randomId()) | ||
dht1.addNode('2.2.2.2:6969', common.randomId()) | ||
dht1.addNode({host: '1.1.1.1', port: 6969, id: common.randomId()}) | ||
dht1.addNode({host: '2.2.2.2', port: 6969, id: common.randomId()}) | ||
dht1.listen(function () { | ||
var targetInfoHash = common.randomId() | ||
dht2._sendGetPeers('127.0.0.1:' + dht1.address().port, targetInfoHash, function (err, res) { | ||
dht2._rpc.query({ | ||
host: '127.0.0.1', | ||
port: dht1.address().port | ||
}, { | ||
q: 'get_peers', | ||
a: { | ||
info_hash: targetInfoHash | ||
} | ||
}, function (err, res) { | ||
t.error(err) | ||
t.deepEqual(res.id, dht1.nodeIdBuffer) | ||
t.ok(Buffer.isBuffer(res.token)) | ||
t.deepEqual( | ||
res.nodes.map(function (node) { return node.addr }).sort(), | ||
[ '1.1.1.1:6969', '127.0.0.1:' + dht2.address().port, '2.2.2.2:6969' ] | ||
) | ||
t.deepEqual(res.r.id, dht1.nodeId) | ||
t.ok(Buffer.isBuffer(res.r.token)) | ||
t.deepEqual(res.r.nodes.length, 3 * 26) | ||
@@ -128,11 +145,17 @@ dht1.destroy() | ||
dht1.listen(function () { | ||
dht2._sendGetPeers('127.0.0.1:' + dht1.address().port, targetInfoHash, function (err, res) { | ||
dht2._rpc.query({ | ||
host: '127.0.0.1', | ||
port: dht1.address().port | ||
}, { | ||
q: 'get_peers', | ||
a: { | ||
info_hash: targetInfoHash | ||
} | ||
}, function (err, res) { | ||
t.error(err) | ||
t.deepEqual(res.id, dht1.nodeIdBuffer) | ||
t.ok(Buffer.isBuffer(res.token)) | ||
t.deepEqual( | ||
res.values.sort(), | ||
['1.1.1.1:6969', '10.10.10.10:6969', '255.255.255.255:6969'] | ||
) | ||
t.deepEqual(res.r.id, dht1.nodeId) | ||
t.ok(Buffer.isBuffer(res.r.token)) | ||
t.deepEqual(res.r.values.length, 3) | ||
dht1.destroy() | ||
@@ -156,3 +179,13 @@ dht2.destroy() | ||
var token = new Buffer('bad token') | ||
dht2._sendAnnouncePeer('127.0.0.1:' + dht1.address().port, infoHash, 9999, token, function (err, res) { | ||
dht2._rpc.query({ | ||
host: '127.0.0.1', | ||
port: dht1.address().port | ||
}, { | ||
q: 'announce_peer', | ||
a: { | ||
info_hash: infoHash, | ||
port: 9999, | ||
token: token | ||
} | ||
}, function (err, res) { | ||
t.ok(err, 'got error') | ||
@@ -180,10 +213,29 @@ t.ok(err.message.indexOf('bad token') !== -1) | ||
var port = dht1.address().port | ||
dht2._sendGetPeers('127.0.0.1:' + port, infoHash, function (err, res1) { | ||
dht2._rpc.query({ | ||
host: '127.0.0.1', | ||
port: port | ||
}, { | ||
q: 'get_peers', | ||
a: { | ||
info_hash: infoHash | ||
} | ||
}, function (err, res1) { | ||
t.error(err) | ||
t.deepEqual(res1.id, dht1.nodeIdBuffer) | ||
t.ok(Buffer.isBuffer(res1.token)) | ||
dht2._sendAnnouncePeer('127.0.0.1:' + port, infoHash, 9999, res1.token, function (err, res2) { | ||
t.deepEqual(res1.r.id, dht1.nodeId) | ||
t.ok(Buffer.isBuffer(res1.r.token)) | ||
dht2._rpc.query({ | ||
host: '127.0.0.1', | ||
port: port | ||
}, { | ||
q: 'announce_peer', | ||
a: { | ||
info_hash: infoHash, | ||
port: 9999, | ||
token: res1.r.token | ||
} | ||
}, function (err, res2) { | ||
t.error(err) | ||
t.deepEqual(res2.id, dht1.nodeIdBuffer) | ||
t.deepEqual(res2.r.id, dht1.nodeId) | ||
@@ -190,0 +242,0 @@ dht1.destroy() |
@@ -34,3 +34,3 @@ var DHT = require('../../') | ||
t.pass('Found at least one peer that has the file') | ||
t.equal(infoHash, pride) | ||
t.equal(infoHash.toString('hex'), pride) | ||
dht.destroy() | ||
@@ -55,7 +55,7 @@ }) | ||
dht.on('peer', function (peer, infoHash) { | ||
if (!prideDone && infoHash === pride) { | ||
if (!prideDone && infoHash.toString('hex') === pride) { | ||
prideDone = true | ||
t.pass('Found at least one peer for Pride & Prejudice') | ||
} | ||
if (!leavesDone && infoHash === leaves) { | ||
if (!leavesDone && infoHash.toString('hex') === leaves) { | ||
leavesDone = true | ||
@@ -70,1 +70,20 @@ t.pass('Found at least one peer for Leaves of Grass') | ||
}) | ||
test('Find peers before ready is emitted', function (t) { | ||
t.plan(3) | ||
var dht = new DHT() | ||
var then = Date.now() | ||
dht.once('node', function (node) { | ||
t.pass('Found at least one other DHT node') | ||
}) | ||
dht.once('peer', function (peer, infoHash) { | ||
t.pass('Found at least one peer that has the file') | ||
t.equal(infoHash.toString('hex'), pride, 'found a peer in ' + (Date.now() - then) + ' ms') | ||
dht.destroy() | ||
}) | ||
dht.lookup(pride) | ||
}) |
Network access
Supply chain risk: This module accesses the network.
Found 3 instances in 1 package.
+ Added buffer-equals@^1.0.3
+ Added k-rpc@^3.4.1
+ Added lru@^1.2.0
+ Added bencode@2.0.3 (transitive)
+ Added buffer-equals@1.0.4 (transitive)
+ Added chrome-dgram@3.0.6 (transitive)
+ Added chrome-dns@1.0.1 (transitive)
+ Added chrome-net@3.3.4 (transitive)
+ Added k-bucket@2.0.1 (transitive)
+ Added k-rpc@3.7.0 (transitive)
+ Added k-rpc-socket@1.11.1 (transitive)
+ Added lru@1.2.1 (transitive)
+ Added randombytes@2.1.0 (transitive)
+ Added run-series@1.1.9 (transitive)
+ Added safe-buffer@5.2.1 (transitive)
- Removed addr-to-ip-port@^1.0.0
- Removed buffer-equal@0.0.1
- Removed compact2string@^1.2.0
- Removed hat@^0.0.3
- Removed is-ip@^1.0.0
- Removed isarray@^1.0.0
- Removed network-address@^1.0.0
- Removed once@^1.3.1
- Removed run-parallel@^1.0.0
- Removed simple-get@^1.3.1
- Removed simple-sha1@^2.0.7
- Removed string2compact@^1.1.1
- Removed thunky@^0.1.0
- Removed addr-to-ip-port@1.5.4 (transitive)
- Removed compact2string@1.4.1 (transitive)
- Removed hat@0.0.3 (transitive)
- Removed ip-regex@1.0.3 (transitive)
- Removed ipaddr.js@2.2.0 (transitive)
- Removed is-ip@1.0.0 (transitive)
- Removed isarray@1.0.0 (transitive)
- Removed network-address@1.1.2 (transitive)
- Removed once@1.4.0 (transitive)
- Removed queue-microtask@1.2.3 (transitive)
- Removed run-parallel@1.2.0 (transitive)
- Removed rusha@0.8.14 (transitive)
- Removed simple-get@1.4.3 (transitive)
- Removed simple-sha1@2.1.2 (transitive)
- Removed string2compact@1.3.2 (transitive)
- Removed thunky@0.1.0 (transitive)
- Removed unzip-response@1.0.2 (transitive)
- Removed wrappy@1.0.2 (transitive)
- Removed xtend@4.0.2 (transitive)
Updated debug@^2.2.0