New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

bittorrent-dht

Package Overview
Dependencies
Maintainers
2
Versions
137
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bittorrent-dht - npm Package Compare versions

Comparing version 5.1.3 to 6.0.0

1921

client.js
module.exports = DHT
module.exports.dgram = require('dgram') // allow override for chrome apps (chrome-dgram)
var addrToIPPort = require('addr-to-ip-port')
var inherits = require('inherits')
var EventEmitter = require('events').EventEmitter
var krpc = require('k-rpc')
var KBucket = require('k-bucket')
var crypto = require('crypto')
var bencode = require('bencode')
var bufferEqual = require('buffer-equal')
var compact2string = require('compact2string')
var equals = require('buffer-equals')
var LRU = require('lru')
var debug = require('debug')('bittorrent-dht')
var dns = require('dns')
var EventEmitter = require('events').EventEmitter
var hat = require('hat')
var inherits = require('inherits')
var isIP = require('is-ip')
var KBucket = require('k-bucket')
var networkAddress = require('network-address')
var once = require('once')
var os = require('os')
var parallel = require('run-parallel')
var publicAddress = require('./lib/public-address')
var sha1 = require('simple-sha1')
var string2compact = require('string2compact')
var BOOTSTRAP_NODES = [
'router.bittorrent.com:6881',
'router.utorrent.com:6881',
'dht.transmissionbt.com:6881'
]
var BOOTSTRAP_TIMEOUT = 10000
var K = module.exports.K = 20 // number of nodes per bucket
var MAX_CONCURRENCY = 6 // α from Kademlia paper
var ROTATE_INTERVAL = 5 * 60 * 1000 // rotate secrets every 5 minutes
var SECRET_ENTROPY = 160 // entropy of token secrets
var SEND_TIMEOUT = 2000
var UINT16 = 0xffff
var MESSAGE_TYPE = module.exports.MESSAGE_TYPE = {
QUERY: 'q',
RESPONSE: 'r',
ERROR: 'e'
}
var ERROR_TYPE = module.exports.ERROR_TYPE = {
GENERIC: 201,
SERVER: 202,
PROTOCOL: 203, // malformed packet, invalid arguments, or bad token
METHOD_UNKNOWN: 204
}
var LOCAL_HOSTS = { 4: [], 6: [] }
var interfaces = os.networkInterfaces()
for (var i in interfaces) {
for (var j = 0; j < interfaces[i].length; j++) {
var face = interfaces[i][j]
if (face.family === 'IPv4') LOCAL_HOSTS[4].push(face.address)
if (face.family === 'IPv6') LOCAL_HOSTS[6].push(face.address)
}
}
inherits(DHT, EventEmitter)
/**
* A DHT client implementation. The DHT is the main peer discovery layer for BitTorrent,
* which allows for trackerless torrents.
* @param {string|Buffer} opts
*/
function DHT (opts) {
if (!(this instanceof DHT)) return new DHT(opts)
if (!opts) opts = {}
var self = this
if (!(self instanceof DHT)) return new DHT(opts)
EventEmitter.call(self)
if (!debug.enabled) self.setMaxListeners(0)
if (!opts) opts = {}
this._tables = LRU({maxAge: ROTATE_INTERVAL, max: opts.maxTables || 1000})
this._values = LRU(opts.maxValues || 1000)
this._peers = new PeerStore(opts.maxPeers || 10000)
self.nodeId = idToHexString(opts.nodeId || hat(160))
self.nodeIdBuffer = idToBuffer(self.nodeId)
this._secrets = null
this._rpc = krpc(opts)
this._rpc.on('query', onquery)
this._rpc.on('node', onnode)
this._rpc.on('warning', onwarning)
this._rpc.on('listening', onlistening)
this._rotateSecrets()
this._verify = opts.verify || null
this._interval = setInterval(rotateSecrets, ROTATE_INTERVAL)
self._debug('new DHT %s', self.nodeId)
this.listening = false
this.destroyed = false
this.nodeId = this._rpc.id
this.nodes = this._rpc.nodes
self.ready = false
self.listening = false
self.destroyed = false
process.nextTick(bootstrap)
self._binding = false
self._port = null
self._ipv = opts.ipv || 4
self._rotateInterval = null
self._verify = opts.verify
EventEmitter.call(this)
this._debug('new DHT %s', this.nodeId)
/**
* Query Handlers table
* @type {Object} string -> function
*/
self.queryHandler = {
ping: self._onPing,
find_node: self._onFindNode,
get_peers: self._onGetPeers,
announce_peer: self._onAnnouncePeer,
put: self._onPut,
get: self._onGet
function onlistening () {
self.listening = true
self._debug('listening %d', self.address().port)
self.emit('listening')
}
/**
* Routing table
* @type {KBucket}
*/
self.nodes = new KBucket({
localNodeId: self.nodeIdBuffer,
numberOfNodesPerKBucket: K,
numberOfNodesToPing: MAX_CONCURRENCY
})
function onquery (query, peer) {
self._onquery(query, peer)
}
/**
* Cache of routing tables used during a lookup. Saved in this object so we can access
* each node's unique token for announces later.
* TODO: Clean up tables after 5 minutes.
* @type {Object} infoHash:string -> KBucket
*/
self.tables = {}
function rotateSecrets () {
self._rotateSecrets()
}
/**
* Pending transactions (unresolved requests to peers)
* @type {Object} addr:string -> array of pending transactions
*/
self.transactions = {}
function bootstrap () {
if (!self.destroyed) self._bootstrap(opts.bootstrap !== false)
}
/**
* Peer address data (tracker storage)
* @type {Object} infoHash:string -> Object {index:Object, list:Array.<Buffer>}
*/
self.peers = {}
function onwarning (err) {
self.emit('warning', err)
}
/**
* Secrets for token generation.
*/
self.secrets = null
/**
* IP addresses of the local DHT node. Used to store the peer, controlling this DHT
* node, into the local table when `client.announce()` is called.
* @type {Array.<string>}
*/
self.localAddresses = [ networkAddress.ipv4() ]
publicAddress(function (err, ip) {
if (err) return self._debug('failed to get public ip: %s', err.message || err)
self.localAddresses.push(ip)
})
// Create socket and attach listeners
self.socket = module.exports.dgram.createSocket('udp' + self._ipv)
self.socket.on('message', self._onData.bind(self))
self.socket.on('listening', self._onListening.bind(self))
self.socket.on('error', noop) // throw away errors
self._rotateSecrets()
self._rotateInterval = setInterval(self._rotateSecrets.bind(self), ROTATE_INTERVAL)
if (self._rotateInterval.unref) self._rotateInterval.unref()
process.nextTick(function () {
if (opts.bootstrap === false) {
// Emit `ready` right away because the user does not want to bootstrap. Presumably,
// the user will call addNode() to populate the routing table manually.
self.ready = true
self.emit('ready')
} else if (typeof opts.bootstrap === 'string') {
self._bootstrap([ opts.bootstrap ])
} else if (Array.isArray(opts.bootstrap)) {
self._bootstrap(fromArray(opts.bootstrap))
} else {
// opts.bootstrap is undefined or true
self._bootstrap(BOOTSTRAP_NODES)
}
})
self.on('ready', function () {
self._debug('emit ready')
})
function onnode (node) {
self.emit('node', node)
}
}
/**
* Start listening for UDP messages on given port.
* @param {number} port
* @param {string} address
* @param {function=} onlistening added as handler for listening event
*/
DHT.prototype.listen = function (port, address, onlistening) {
DHT.prototype.addNode = function (node) {
if (typeof node === 'string') node = parseAddr(node)
var self = this
if (self.destroyed) throw new Error('dht is destroyed')
if (self._binding || self.listening) throw new Error('dht is already listening')
if (typeof port === 'string') {
onlistening = address
address = port
port = undefined
if (node.id) {
node.id = toBuffer(node.id)
var old = !!this._rpc.nodes.get(node.id)
this._rpc.nodes.add(node)
if (!old) this.emit('node', node)
return
}
if (typeof port === 'function') {
onlistening = port
port = undefined
address = undefined
}
if (typeof address === 'function') {
onlistening = address
address = undefined
}
if (onlistening) self.once('listening', onlistening)
self._binding = true
self._debug('listen %s', port)
self.socket.bind(port, address)
this._sendPing(node, function (_, node) {
if (node) self.addNode(node)
})
}
/**
* Called when DHT is listening for UDP messages.
*/
DHT.prototype._onListening = function () {
var self = this
self._binding = false
self.listening = true
self._port = self.socket.address().port
self._debug('emit listening %s', self._port)
self.emit('listening')
DHT.prototype.removeNode = function (id) {
this._rpc.nodes.remove({id: toBuffer(id)})
}
DHT.prototype.address = function () {
var self = this
return self.socket.address()
}
/**
* Announce that the peer, controlling the querying node, is downloading a torrent on a
* port.
* @param {string|Buffer} infoHash
* @param {number} port
* @param {function=} cb
*/
DHT.prototype.announce = function (infoHash, port, cb) {
var self = this
if (!cb) cb = noop
if (self.destroyed) throw new Error('dht is destroyed')
var infoHashBuffer = idToBuffer(infoHash)
infoHash = idToHexString(infoHash)
self._debug('announce %s %s', infoHash, port)
self.localAddresses.forEach(function (address) {
self._addPeer(address + ':' + port, infoHash)
})
// TODO: it would be nice to not use a table when a lookup is in progress
var table = self.tables[infoHash]
if (table) {
process.nextTick(function () {
onClosest(null, table.closest({ id: infoHashBuffer }, K))
})
} else {
self._lookup(infoHash, onClosest)
}
function onClosest (err, closest) {
DHT.prototype._sendPing = function (node, cb) {
this._rpc.query(node, {q: 'ping'}, function (err, pong, node) {
if (err) return cb(err)
closest.forEach(function (contact) {
self._sendAnnouncePeer(contact.addr, infoHashBuffer, port, contact.token)
if (!pong.r || !pong.r.id || !Buffer.isBuffer(pong.r.id) || pong.r.id.length !== 20) {
return cb(new Error('Bad reply'))
}
cb(null, {
id: pong.r.id,
host: node.host || node.address,
port: node.port
})
self._debug('announce end %s %s', infoHash, port)
cb(null)
}
})
}
/**
* Write arbitrary mutable and immutable data to the DHT.
* Specified in BEP44: http://bittorrent.org/beps/bep_0044.html
* @param {Object} opts
* @param {function=} cb
*/
DHT.prototype.toJSON =
DHT.prototype.toArray = function () {
return this._rpc.nodes.toArray().map(toNode)
}
DHT.prototype.put = function (opts, cb) {
var self = this
if (self.destroyed) throw new Error('dht is destroyed')
var isMutable = opts.k
if (Buffer.isBuffer(opts) || typeof opts === 'string') opts = {v: opts}
var isMutable = !!opts.k
if (opts.v === undefined) {

@@ -290,3 +121,2 @@ throw new Error('opts.v not given')

}
if (isMutable && opts.cas !== undefined && typeof opts.cas !== 'number') {

@@ -313,1401 +143,544 @@ throw new Error('opts.cas must be an integer if provided')

}
return self._put(opts, cb)
return this._put(opts, cb)
}
/**
* put() without type checks for internal use
* @param {Object} opts
* @param {function=} cb
*/
DHT.prototype._put = function (opts, cb) {
var self = this
var pending = 0
var errors = []
var isMutable = opts.k
var hash = isMutable
? sha1.sync(opts.salt ? Buffer.concat([ opts.salt, opts.k ]) : opts.k)
: sha1.sync(bencode.encode(opts.v))
var hashBuffer = idToBuffer(hash)
if (!cb) cb = noop
if (self.nodes.toArray().length === 0) {
process.nextTick(function () {
addLocal(null, [])
})
} else {
self._lookup(hash, {findNode: true}, onLookup)
}
var isMutable = !!opts.k
var v = typeof opts.v === 'string' ? new Buffer(opts.v) : opts.v
var key = isMutable
? sha1(opts.salt ? Buffer.concat([opts.salt, opts.k]) : opts.k)
: sha1(bencode.encode(v))
function onLookup (err, nodes) {
if (err) return cb(err)
nodes.forEach(function (node) {
put(node)
})
addLocal()
}
var table = this._tables.get(key.toString('hex'))
if (!table) return this._preput(key, opts, cb)
function addLocal () {
var localData = {
id: self.nodeIdBuffer,
v: opts.v
var message = {
q: 'put',
a: {
id: this._rpc.id,
token: null, // queryAll sets this
v: v
}
var localAddr = '127.0.0.1:' + self._port
if (isMutable) {
if (opts.cas) localData.cas = opts.cas
localData.sig = opts.sign(encodeSigData(opts))
localData.k = opts.k
localData.seq = opts.seq
localData.token = idToBuffer(opts.token || self._generateToken(localAddr))
}
self.nodes.add({
id: hashBuffer,
addr: localAddr,
data: localData
})
if (pending === 0) {
process.nextTick(function () { cb(errors, hash) })
}
}
return hash
function put (node) {
if (node.data) return // skip data nodes
pending += 1
if (isMutable) {
if (typeof opts.cas === 'number') message.a.cas = opts.cas
if (opts.salt) message.a.salt = opts.salt
message.a.k = opts.k
message.a.seq = opts.seq
if (!message.a.sig) message.a.sig = opts.sign(encodeSigData(message.a))
}
var t = self._getTransactionId(node.addr, putOnGet)
self._send(node.addr, {
a: {
id: self.nodeIdBuffer,
target: hashBuffer
},
t: transactionIdToBuffer(t),
y: 'q',
q: 'get'
})
this._values.set(key.toString('hex'), message.a)
this._rpc.queryAll(table.closest({id: key}), message, null, function (err, n) {
if (err) return cb(err, key, n)
cb(null, key, n)
})
function putOnGet (err, res) {
if (err) return next(node)(err)
var t = self._getTransactionId(node.addr, next(node))
var data = {
a: {
id: opts.id || self.nodeIdBuffer,
v: opts.v,
token: res && res.token
},
t: transactionIdToBuffer(t),
y: 'q',
q: 'put'
}
if (isMutable) {
data.a.seq = opts.seq
data.a.sig = opts.sign(encodeSigData(opts))
data.a.k = opts.k
if (opts.salt) data.a.salt = opts.salt
if (typeof opts.cas === 'number') data.a.cas = opts.cas
}
self._send(node.addr, data)
}
}
function next (node) {
return function (err) {
if (err) {
err.address = node.addr
errors.push(err)
}
if (--pending === 0) cb(errors, hash)
}
}
return key
}
DHT.prototype.get = function (hash, cb) {
DHT.prototype._preput = function (key, opts, cb) {
var self = this
if (self.destroyed) throw new Error('dht is destroyed')
var hashBuffer = idToBuffer(hash)
var local = self.nodes.get(hashBuffer)
if (local && local.data) {
return process.nextTick(function () {
cb(null, local.data)
})
}
this._closest(key, {
q: 'get',
a: {
id: this._rpc.id,
target: key
}
}, null, function (err, n) {
if (err) return cb(err)
self.put(opts, cb)
})
self._lookup(hash, {get: true}, cb)
return key
}
DHT.prototype._onPut = function (addr, message) {
var self = this
var msg = message.a
if (!msg || !msg.v || !msg.id) {
return self._sendError(addr, message.t, 203, 'not enough parameters')
DHT.prototype.get = function (key, opts, cb) {
key = toBuffer(key)
if (typeof opts === 'function') {
cb = opts
opts = null
}
var isMutable = message.a.k || message.a.sig
self._debug('put from %s', addr)
if (!opts) opts = {}
var verify = opts.verify || this._verify
var value = this._values.get(key.toString('hex')) || null
var data = {
id: message.a.id,
addr: addr,
v: message.a.v
if (value) {
value = createGetResponse(this._rpc.id, null, value)
return process.nextTick(done)
}
if (data.v && data.v.length > 1000) {
return self._sendError(addr, message.t, 205, 'data payload too large')
}
if (isMutable && !msg.k) {
return self._sendError(addr, message.t, 203, 'missing public key')
}
var hash
if (isMutable) {
hash = msg.salt
? sha1.sync(Buffer.concat([ msg.salt, msg.k ]))
: sha1.sync(msg.k)
} else {
hash = sha1.sync(bencode.encode(data.v))
}
var hashBuffer = idToBuffer(hash)
if (isMutable) {
if (!self._verify) {
return self._sendError(addr, message.t, 400, 'verification not supported')
this._closest(key, {
q: 'get',
a: {
id: this._rpc.id,
target: key
}
var sdata = encodeSigData(msg)
if (!msg.sig || !Buffer.isBuffer(msg.sig) || !self._verify(msg.sig, sdata, msg.k)) {
return self._sendError(addr, message.t, 206, 'invalid signature')
}
var prev = self.nodes.get(hashBuffer)
if (prev && prev.data && prev.data.seq !== undefined && typeof msg.cas === 'number') {
if (msg.cas !== prev.data.seq) {
return self._sendError(addr, message.t, 301,
'CAS mismatch, re-read and try again')
}
}
if (prev && prev.data && prev.data.seq !== undefined) {
if (msg.seq === undefined || msg.seq <= prev.data.seq) {
return self._sendError(addr, message.t, 302,
'sequence number less than current')
}
}
}, onreply, done)
data.sig = msg.sig
data.k = msg.k
data.seq = msg.seq
data.token = msg.token
if (msg.salt && msg.salt.length > 64) {
return self._sendError(addr, message.t, 207, 'salt too big')
}
if (msg.salt) data.salt = msg.salt
function done (err) {
if (err) return cb(err)
cb(null, value)
}
self.nodes.add({ id: hashBuffer, addr: addr, data: data })
self._send(addr, {
t: message.t,
y: MESSAGE_TYPE.RESPONSE,
r: { id: self.nodeIdBuffer }
})
}
function onreply (message) {
var r = message.r
if (!r || !r.v) return true
DHT.prototype._onGet = function (addr, message) {
var self = this
var msg = message.a
if (!msg) return self._debug('skipping malformed get request from %s', addr)
if (!msg.target) return self._debug('missing a.target in get() from %s', addr)
var isMutable = r.k || r.sig
var addrData = addrToIPPort(addr)
var hashBuffer = message.a.target
var rec = self.nodes.get(hashBuffer)
if (rec && rec.data) {
msg = {
t: message.t,
y: MESSAGE_TYPE.RESPONSE,
r: {
id: self.nodeIdBuffer,
nodes: [], // found, so we don't need to know the nodes
nodes6: [],
v: rec.data.v
}
}
var isMutable = rec.data.k || rec.data.sig
if (isMutable) {
msg.r.k = rec.data.k
msg.r.seq = rec.data.seq
msg.r.sig = rec.data.sig
msg.r.token = rec.data.token
if (rec.data.salt) {
msg.r.salt = rec.data.salt
if (!verify || !r.sig || !r.k) return true
if (!verify(r.sig, encodeSigData(r), r.k)) return true
if (equals(sha1(r.salt ? Buffer.concat([r.salt, r.k]) : r.k), key)) {
value = r
return false
}
if (rec.data.cas) {
msg.r.cas = rec.data.cas
} else {
if (equals(sha1(bencode.encode(r.v)), key)) {
value = r
return false
}
}
self._send(addr, msg)
} else {
self._lookup(hashBuffer, function (err, nodes) {
if (err && self.destroyed) return
if (err) return self._sendError(addr, message.t, 201, err)
var res = {
t: message.t,
y: MESSAGE_TYPE.RESPONSE,
r: {
token: idToBuffer(self._generateToken(addrData[0])),
id: self.nodeIdBuffer,
nodes: nodes.map(function (node) {
return node.addr
}),
nodes6: [] // todo: filter the addrs
}
}
if (rec && rec.data && rec.data.k) res.k = rec.data.k
if (rec && rec.data && rec.data.seq) res.seq = rec.data.seq
if (rec && rec.data && rec.data.sig) res.sig = rec.data.sig
if (rec && rec.data && rec.data.token) res.token = rec.data.token
if (rec && rec.data && rec.data.v) res.v = rec.data.v
self._send(addr, res)
})
return true
}
}
/**
* Destroy and cleanup the DHT.
* @param {function=} cb
*/
DHT.prototype.destroy = function (cb) {
var self = this
if (self.destroyed) throw new Error('dht is destroyed')
DHT.prototype.announce = function (infoHash, port, cb) {
infoHash = toBuffer(infoHash)
if (!cb) cb = noop
if (cb) cb = once(cb)
else cb = noop
var table = this._tables.get(infoHash.toString('hex'))
if (!table) return this._preannounce(infoHash, port, cb)
if (self._binding) return self.once('listening', self.destroy.bind(self, cb))
self._debug('destroy')
self.destroyed = true
self.listening = false
// garbage collect large data structures
self.nodes = null
self.tables = null
self.transactions = null
self.peers = null
clearInterval(self._rotateInterval)
self.socket.on('close', cb)
try {
self.socket.close()
} catch (err) {
// ignore error, socket was either already closed / not yet bound
process.nextTick(function () {
cb(null)
})
var message = {
q: 'announce_peer',
a: {
id: this._rpc.id,
token: null, // queryAll sets this
info_hash: infoHash,
port: port
}
}
}
/**
* Add a DHT node to the routing table.
* @param {string} addr
* @param {string|Buffer} nodeId
* @param {string=} from addr
*/
DHT.prototype.addNode = function (addr, nodeId) {
var self = this
if (self.destroyed) throw new Error('dht is destroyed')
// If `nodeId` is undefined, then the peer will be pinged to learn their node id.
// If the peer does not respond, the will not be added to the routing table.
if (nodeId == null) {
self._sendPing(addr, function (err, res) {
if (err) {
self._debug('skipping addNode %s; peer did not respond: %s', addr, err.message)
}
// No need to call `self._addNode()` explicitly here. `_onData` automatically
// attempts to add every node the client gets a message from to the routing table.
})
return
}
var nodeIdBuffer = idToBuffer(nodeId)
if (nodeIdBuffer.length !== 20) throw new Error('invalid node id length')
self._addNode(addr, nodeIdBuffer)
this._debug('announce %s %d', infoHash, port)
this._rpc.queryAll(table.closest({id: infoHash}), message, null, cb)
}
/**
* Internal version of `addNode` that doesn't throw errors on invalid arguments, but
* silently fails instead. Useful for dealing with potentially bad data from the network.
* @param {string} addr
* @param {string|Buffer} nodeId
* @param {string=} from addr
* @return {boolean} was the node valid and new and added to the table
*/
DHT.prototype._addNode = function (addr, nodeId, from) {
DHT.prototype._preannounce = function (infoHash, port, cb) {
var self = this
if (self.destroyed) return
var nodeIdBuffer = idToBuffer(nodeId)
if (!nodeIdBuffer) return
nodeId = idToHexString(nodeId)
if (nodeIdBuffer.length !== 20) {
self._debug('skipping addNode %s %s; invalid id length', addr, nodeId)
return
}
if (self._addrIsSelf(addr) || nodeId === self.nodeId) {
self._debug('skip addNode %s %s; that is us!', addr, nodeId)
return
}
var existing = self.nodes.get(nodeIdBuffer)
if (existing && existing.addr === addr) return
self.nodes.add({
id: nodeIdBuffer,
addr: addr
this.lookup(infoHash, function (err) {
if (err) return cb(err)
self.announce(infoHash, port, cb)
})
process.nextTick(function () {
self.emit('node', addr, nodeId, from)
})
self._debug('addNode %s %s discovered from %s', nodeId, addr, from)
}
/**
* Remove a DHT node from the routing table.
* @param {string|Buffer} nodeId
*/
DHT.prototype.removeNode = function (nodeId) {
DHT.prototype.lookup = function (infoHash, cb) {
infoHash = toBuffer(infoHash)
if (!cb) cb = noop
var self = this
if (self.destroyed) throw new Error('dht is destroyed')
var nodeIdBuffer = idToBuffer(nodeId)
var contact = self.nodes.get(nodeIdBuffer)
if (contact) {
self._debug('removeNode %s %s', contact.nodeId, contact.addr)
self.nodes.remove(contact)
}
}
process.nextTick(emit)
this._debug('lookup %s', infoHash)
this._closest(infoHash, {
q: 'get_peers',
a: {
id: this._rpc.id,
info_hash: infoHash
}
}, onreply, cb)
/**
* Store a peer in the DHT. Called when a peer sends a `announce_peer` message.
* @param {string} addr
* @param {Buffer|string} infoHash
*/
DHT.prototype._addPeer = function (addr, infoHash) {
var self = this
if (self.destroyed) return
infoHash = idToHexString(infoHash)
var peers = self.peers[infoHash]
if (!peers) {
peers = self.peers[infoHash] = {
index: {}, // addr -> true
list: [] // compactAddr
function emit (values, from) {
if (!values) values = self._peers.get(infoHash.toString('hex'))
var peers = decodePeers(values)
for (var i = 0; i < peers.length; i++) {
self.emit('peer', peers[i], infoHash, from || null)
}
}
if (!peers.index[addr]) {
peers.index[addr] = true
peers.list.push(string2compact(addr))
self._debug('addPeer %s %s', addr, infoHash)
self.emit('announce', addr, infoHash)
function onreply (message, node) {
if (message.r.values) emit(message.r.values, node)
}
}
/**
* Remove a peer from the DHT.
* @param {string} addr
* @param {Buffer|string} infoHash
*/
DHT.prototype._removePeer = function (addr, infoHash) {
var self = this
if (self.destroyed) return
infoHash = idToHexString(infoHash)
var peers = self.peers[infoHash]
if (peers && peers.index[addr]) {
peers.index[addr] = null
var compactPeerInfo = string2compact(addr)
peers.list.some(function (peer, index) {
if (bufferEqual(peer, compactPeerInfo)) {
peers.list.splice(index, 1)
self._debug('removePeer %s %s', addr, infoHash)
return true // abort early
}
})
}
DHT.prototype.address = function () {
return this._rpc.address()
}
/**
* Join the DHT network. To join initially, connect to known nodes (either public
* bootstrap nodes, or known nodes from a previous run of bittorrent-client).
* @param {Array.<string|Object>} nodes
*/
DHT.prototype._bootstrap = function (nodes) {
var self = this
if (self.destroyed) return
self._debug('bootstrap with %s', JSON.stringify(nodes))
var contacts = nodes.map(function (obj) {
if (typeof obj === 'string') {
return { addr: obj }
} else {
return obj
}
})
self._resolveContacts(contacts, function (err, contacts) {
if (self.destroyed) return
if (err) return self.emit('error', err)
// add all non-bootstrap nodes to routing table
contacts
.filter(function (contact) {
return !!contact.id
})
.forEach(function (contact) {
self._addNode(contact.addr, contact.id, contact.from)
})
// get addresses of bootstrap nodes
var addrs = contacts
.filter(function (contact) {
return !contact.id
})
.map(function (contact) {
return contact.addr
})
lookup()
function lookup () {
self._lookup(self.nodeId, {
findNode: true,
addrs: addrs.length ? addrs : null
}, function (err) {
if (err) return self._debug('lookup error during bootstrap: %s', err.message)
// emit `ready` once the recursive lookup for our own node ID is finished
// (successful or not), so that later get_peer lookups will have a good shot at
// succeeding.
if (!self.ready) {
self.ready = true
self.emit('ready')
}
})
startBootstrapTimeout()
}
function startBootstrapTimeout () {
var bootstrapTimeout = setTimeout(function () {
if (self.destroyed) return
// If 0 nodes are in the table after a timeout, retry with bootstrap nodes
if (self.nodes.count() === 0) {
self._debug('No DHT bootstrap nodes replied, retry')
lookup()
}
}, BOOTSTRAP_TIMEOUT)
if (bootstrapTimeout.unref) bootstrapTimeout.unref()
}
})
DHT.prototype.listen = function (port, cb) {
if (typeof port === 'function') return this.listen(0, port)
this._rpc.bind(port, cb)
}
/**
* Resolve the DNS for nodes whose hostname is a domain name (often the case for
* bootstrap nodes).
* @param {Array.<Object>} contacts array of contact objects with domain addresses
* @param {function} cb
*/
DHT.prototype._resolveContacts = function (contacts, cb) {
DHT.prototype.destroy = function (cb) {
if (this.destroyed) {
if (cb) process.nextTick(cb)
return
}
this.destroyed = true
var self = this
var tasks = contacts.map(function (contact) {
return function (cb) {
var addrData = addrToIPPort(contact.addr)
if (isIP(addrData[0])) {
cb(null, contact)
} else {
dns.lookup(addrData[0], self._ipv, function (err, host) {
if (err) return cb(null, null)
contact.addr = host + ':' + addrData[1]
cb(null, contact)
})
}
}
clearInterval(this._interval)
this._debug('destroying')
this._rpc.destroy(function () {
self.emit('close')
if (cb) cb()
})
parallel(tasks, function (err, contacts) {
if (err) return cb(err)
// filter out hosts that don't resolve
contacts = contacts.filter(function (contact) { return !!contact })
cb(null, contacts)
})
}
/**
* Perform a recurive node lookup for the given nodeId. If isFindNode is true, then
* `find_node` will be sent to each peer instead of `get_peers`.
* @param {Buffer|string} id node id or info hash
* @param {Object=} opts
* @param {boolean} opts.findNode
* @param {Array.<string>} opts.addrs
* @param {function} cb called with K closest nodes
*/
DHT.prototype.lookup = function (id, opts, cb) {
var self = this
if (self.destroyed) throw new Error('dht is destroyed')
self._lookup(id, opts, cb)
}
DHT.prototype._onquery = function (query, peer) {
var q = query.q.toString()
this._debug('received %s query from %s:%d', q, peer.address, peer.port)
if (!query.a) return
/**
* lookup() for private use. If DHT is destroyed, returns an error via callback.
*/
DHT.prototype._lookup = function (id, opts, cb) {
var self = this
switch (q) {
case 'ping':
return this._rpc.response(peer, query, {id: this._rpc.id})
if (self.destroyed) {
return process.nextTick(function () {
cb(new Error('dht is destroyed'))
})
}
case 'find_node':
return this._onfindnode(query, peer)
if (typeof opts === 'function') {
cb = opts
opts = {}
}
if (!opts) opts = {}
case 'get_peers':
return this._ongetpeers(query, peer)
if (cb) cb = once(cb)
else cb = noop
case 'announce_peer':
return this._onannouncepeer(query, peer)
var idBuffer = idToBuffer(id)
id = idToHexString(id)
case 'get':
return this._onget(query, peer)
if (self._binding) return self.once('listening', self._lookup.bind(self, id, opts, cb))
if (!self.listening) return self.listen(self._lookup.bind(self, id, opts, cb))
if (idBuffer.length !== 20) throw new Error('invalid node id / info hash length')
self._debug('lookup %s %s', (opts.findNode ? '(find_node)' : '(get_peers)'), id)
// Return local peers, if we have any in our table
var peers = self.peers[id]
if (peers) {
peers = parsePeerInfo(peers.list)
peers.forEach(function (peerAddr) {
self._debug('emit peer %s %s from %s', peerAddr, id, 'local')
self.emit('peer', peerAddr, id, 'local')
})
case 'put':
return this._onput(query, peer)
}
}
var table = new KBucket({
localNodeId: idBuffer,
numberOfNodesPerKBucket: K,
numberOfNodesToPing: MAX_CONCURRENCY
})
DHT.prototype._onfindnode = function (query, peer) {
var target = query.a.target
if (!target) return this._rpc.error(peer, query, [203, '`find_node` missing required `a.target` field'])
// NOT the same table as the one used for the lookup, as that table may have nodes without tokens
if (!self.tables[id]) {
self.tables[id] = new KBucket({
localNodeId: idBuffer,
numberOfNodesPerKBucket: K,
numberOfNodesToPing: MAX_CONCURRENCY
})
}
var nodes = this._rpc.nodes.closest({ id: target })
this._rpc.response(peer, query, {id: this._rpc.id}, nodes)
}
var tokenful = self.tables[id]
DHT.prototype._ongetpeers = function (query, peer) {
var host = peer.address || peer.host
var infoHash = query.a.info_hash
if (!infoHash) return this._rpc.error(peer, query, [203, '`get_peers` missing required `a.info_hash` field'])
function add (contact) {
if (self._addrIsSelf(contact.addr) || bufferEqual(contact.id, self.nodeIdBuffer)) return
if (contact.token) tokenful.add(contact)
var r = {id: this._rpc.id, token: this._generateToken(host)}
var peers = this._peers.get(infoHash.toString('hex'))
table.add(contact)
}
var queried = {}
var pending = 0 // pending queries
if (opts.addrs) {
// kick off lookup with explicitly passed nodes (usually, bootstrap servers)
opts.addrs.forEach(query)
if (peers.length) {
r.values = peers
this._rpc.response(peer, query, r)
} else {
// kick off lookup with nodes in the main table
queryClosest()
this._rpc.response(peer, query, r, this._rpc.nodes.closest({id: infoHash}))
}
}
function query (addr) {
pending += 1
queried[addr] = true
DHT.prototype._onannouncepeer = function (query, peer) {
var host = peer.address || peer.host
var port = query.a.implied_port ? peer.port : query.a.port
if (!port || typeof port !== 'number') return
var infoHash = query.a.info_hash
var token = query.a.token
if (!infoHash || !token) return
if (opts.get) {
self._sendGet(addr, idBuffer, onResponse.bind(null, addr))
} else if (opts.findNode) {
self._sendFindNode(addr, idBuffer, onResponse.bind(null, addr))
} else {
self._sendGetPeers(addr, idBuffer, onResponse.bind(null, addr))
}
if (!this._validateToken(host, token)) {
return this._rpc.error(peer, query, [203, 'cannot `announce_peer` with bad token'])
}
function queryClosest () {
self.nodes.closest({ id: idBuffer }, K).forEach(function (contact) {
query(contact.addr)
})
}
// Note: `_sendFindNode` and `_sendGetPeers` will insert newly discovered nodes into
// the routing table, so that's not done here.
function onResponse (addr, err, res) {
if (cb.called) return
if (self.destroyed) return cb(new Error('dht is destroyed'))
if (opts.get && res && res.v) {
var isMutable = res.k || res.sig
var sdata = encodeSigData(res)
if (isMutable && !self._verify) {
self._debug('ed25519 verify not provided')
} else if (isMutable && !self._verify(res.sig, sdata, res.k)) {
self._debug('invalid mutable hash from %s', addr)
} else if (!isMutable && sha1.sync(bencode.encode(res.v)) !== id) {
self._debug('invalid immutable hash from %s', addr)
} else {
return cb(null, res)
}
}
pending -= 1
var nodeIdBuffer = res && res.id
var nodeId = idToHexString(nodeIdBuffer)
// ignore errors - they are just timeouts
if (err) {
self._debug('got lookup error: %s', err.message)
} else {
self._debug('got lookup response from %s', nodeId)
// add node that sent this response
var contact = table.get(nodeIdBuffer) || { id: nodeIdBuffer, addr: addr }
contact.token = res && res.token
add(contact)
// add nodes to this routing table for this lookup
if (res && res.nodes) {
res.nodes.forEach(function (contact) {
add(contact)
})
}
}
// find closest unqueried nodes
var candidates = table.closest({ id: idBuffer }, K)
.filter(function (contact) {
return !queried[contact.addr]
})
while (pending < MAX_CONCURRENCY && candidates.length) {
// query as many candidates as our concurrency limit will allow
query(candidates.pop().addr)
}
if (pending === 0 && candidates.length === 0) {
// recursive lookup should terminate because there are no closer nodes to find
self._debug('terminating lookup %s %s',
(opts.findNode ? '(find_node)' : '(get_peers)'), id)
var closest = (opts.findNode ? table : tokenful).closest({ id: idBuffer }, K)
self._debug('K closest nodes are:')
closest.forEach(function (contact) {
self._debug(' ' + contact.addr + ' ' + idToHexString(contact.id))
})
if (opts.get) return cb(new Error('hash not found'))
cb(null, closest)
}
}
this._addPeer({host: host, port: port}, infoHash, {host: host, port: peer.port})
this._rpc.response(peer, query, {id: this._rpc.id})
}
/**
* Called when another node sends a UDP message
* @param {Buffer} data
* @param {Object} rinfo
*/
DHT.prototype._onData = function (data, rinfo) {
var self = this
var addr = rinfo.address + ':' + rinfo.port
var message, errMessage
try {
message = bencode.decode(data)
if (!message) throw new Error('message is empty')
} catch (err) {
errMessage = err.message + ' from ' + addr + ' (' + data + ')'
self._debug(errMessage)
self.emit('warning', new Error(errMessage))
return
}
var type = message.y && message.y.toString()
if (type !== MESSAGE_TYPE.QUERY && type !== MESSAGE_TYPE.RESPONSE &&
type !== MESSAGE_TYPE.ERROR) {
errMessage = 'unknown message type ' + type + ' from ' + addr
self._debug(errMessage)
self.emit('warning', new Error(errMessage))
return
}
// self._debug('got data %s from %s', JSON.stringify(message), addr)
// Attempt to add every (valid) node that we see to the routing table.
// TODO: If the node is already in the table, just update the "last heard from" time
var nodeIdBuffer = (message.r && message.r.id) || (message.a && message.a.id)
if (nodeIdBuffer) {
// self._debug('adding (potentially) new node %s %s', idToHexString(nodeId), addr)
self._addNode(addr, nodeIdBuffer, addr)
}
if (type === MESSAGE_TYPE.QUERY) {
self._onQuery(addr, message)
} else if (type === MESSAGE_TYPE.RESPONSE || type === MESSAGE_TYPE.ERROR) {
self._onResponseOrError(addr, type, message)
}
DHT.prototype._addPeer = function (peer, infoHash, from) {
if (typeof peer === 'string') peer = parseAddr(peer)
this._peers.add(infoHash.toString('hex'), encodePeer(peer.host, peer.port))
this.emit('announce', peer, infoHash, from)
}
/**
* Called when another node sends a query.
* @param {string} addr
* @param {Object} message
*/
DHT.prototype._onQuery = function (addr, message) {
var self = this
var query = message.q.toString()
DHT.prototype._onget = function (query, peer) {
var host = peer.address || peer.host
var target = query.a.target
if (!target) return
var token = this._generateToken(host)
var value = this._values.get(target.toString('hex'))
if (typeof self.queryHandler[query] === 'function') {
self.queryHandler[query].call(self, addr, message)
if (!value) {
var nodes = this._rpc.nodes.closest({id: target})
this._rpc.response(peer, query, {id: this._rpc.id, token: token}, nodes)
} else {
var errMessage = 'unexpected query type'
self._debug(errMessage)
self._sendError(addr, message.t, ERROR_TYPE.METHOD_UNKNOWN, errMessage)
this._rpc.response(peer, query, createGetResponse(this._rpc.id, token, value))
}
}
/**
* Called when another node sends a response or error.
* @param {string} addr
* @param {string} type
* @param {Object} message
*/
DHT.prototype._onResponseOrError = function (addr, type, message) {
var self = this
if (self.destroyed) return
DHT.prototype._onput = function (query, peer) {
var host = peer.address || peer.host
var transactionId = Buffer.isBuffer(message.t) && message.t.length === 2 &&
message.t.readUInt16BE(0)
var a = query.a
if (!a) return
var v = query.a.v
if (!v) return
var transaction = self.transactions && self.transactions[addr] &&
self.transactions[addr][transactionId]
var token = a.token
if (!token) return
var err = null
if (type === MESSAGE_TYPE.ERROR) {
err = new Error(Array.isArray(message.e) ? message.e.join(' ') : undefined)
if (!this._validateToken(host, token)) {
return this._rpc.error(peer, query, [203, 'cannot `put` with bad token'])
}
if (!transaction || !transaction.cb) {
// unexpected message!
var errMessage
if (err) {
errMessage = 'got unexpected error from ' + addr + ' ' + err.message
self._debug(errMessage)
self.emit('warning', new Error(errMessage))
} else {
errMessage = 'got unexpected message from ' + addr + ' ' + JSON.stringify(message)
self._debug(errMessage)
self.emit('warning', new Error(errMessage))
}
return
if (v.length > 1000) {
return this._rpc.error(peer, query, [205, 'data payload too large'])
}
transaction.cb(err, message.r)
}
var isMutable = !!(a.k || a.sig)
if (isMutable && !a.k && !a.sig) return
/**
* Send a UDP message to the given addr.
* @param {string} addr
* @param {Object} message
* @param {function=} cb called once message has been sent
*/
DHT.prototype._send = function (addr, message, cb) {
var self = this
if (self._binding) return self.once('listening', self._send.bind(self, addr, message, cb))
if (!cb) cb = noop
var addrData = addrToIPPort(addr)
var host = addrData[0]
var port = addrData[1]
var key = isMutable
? sha1(a.salt ? Buffer.concat([a.salt, a.k]) : a.k)
: sha1(bencode.encode(v))
var keyHex = key.toString('hex')
if (!(port > 0 && port < 65535)) {
return
if (isMutable) {
if (!this._verify) return this._rpc.error(peer, query, [400, 'verification not supported'])
if (!this._verify(a.sig, encodeSigData(a), a.k)) return
var prev = this._values.get(keyHex)
if (prev && typeof a.cas === 'number' && prev.seq !== a.cas) {
return this._rpc.error(peer, query, [301, 'CAS mismatch, re-read and try again'])
}
if (prev && typeof prev.seq === 'number' && !(a.seq > prev.seq)) {
return this._rpc.error(peer, query, [302, 'sequence number less than current'])
}
this._values.set(keyHex, {v: v, k: a.k, salt: a.salt, sig: a.sig, seq: a.seq})
} else {
this._values.set(keyHex, {v: v})
}
// self._debug('send %s to %s', JSON.stringify(message), addr)
message = bencode.encode(message)
self.socket.send(message, 0, message.length, port, host, cb)
this._rpc.response(peer, query, {id: this._rpc.id})
}
DHT.prototype._query = function (data, addr, cb) {
DHT.prototype._bootstrap = function (populate) {
var self = this
if (!populate) return process.nextTick(ready)
if (!data.a) data.a = {}
if (!data.a.id) data.a.id = self.nodeIdBuffer
this._rpc.populate(self._rpc.id, {
q: 'find_node',
a: {
id: self._rpc.id,
target: self._rpc.id
}
}, ready)
var transactionId = self._getTransactionId(addr, cb)
var message = {
t: transactionIdToBuffer(transactionId),
y: MESSAGE_TYPE.QUERY,
q: data.q,
a: data.a
function ready () {
self._debug('emit ready')
self.emit('ready')
}
if (data.q === 'find_node') {
self._debug('sent find_node %s to %s', data.a.target.toString('hex'), addr)
} else if (data.q === 'get_peers') {
self._debug('sent get_peers %s to %s', data.a.info_hash.toString('hex'), addr)
}
self._send(addr, message)
}
/**
* Send "ping" query to given addr.
* @param {string} addr
* @param {function} cb called with response
*/
DHT.prototype._sendPing = function (addr, cb) {
DHT.prototype._closest = function (target, message, onmessage, cb) {
var self = this
self._query({ q: 'ping' }, addr, cb)
}
/**
* Called when another node sends a "ping" query.
* @param {string} addr
* @param {Object} message
*/
DHT.prototype._onPing = function (addr, message) {
var self = this
var res = {
t: message.t,
y: MESSAGE_TYPE.RESPONSE,
r: {
id: self.nodeIdBuffer
}
}
var table = new KBucket({
localNodeId: target,
numberOfNodesPerKBucket: this._rpc.k
})
self._debug('got ping from %s', addr)
self._send(addr, res)
}
this._rpc.closest(target, message, onreply, done)
/**
* Send "find_node" query to given addr.
* @param {string} addr
* @param {Buffer|string} nodeId
* @param {function} cb called with response
*/
DHT.prototype._sendFindNode = function (addr, nodeId, cb) {
var self = this
var nodeIdBuffer = idToBuffer(nodeId)
function onResponse (err, res) {
function done (err, n) {
if (err) return cb(err)
if (res.nodes) {
res.nodes = parseNodeInfo(res.nodes)
res.nodes.forEach(function (node) {
self._addNode(node.addr, node.id, addr)
})
}
cb(null, res)
self._tables.set(target.toString('hex'), table)
self._debug('visited %d nodes', n)
cb(null, n)
}
var data = {
q: 'find_node',
a: {
id: self.nodeIdBuffer,
target: nodeIdBuffer
}
}
function onreply (message, node) {
if (!message.r) return true
self._query(data, addr, onResponse)
}
DHT.prototype._sendGet = function (addr, nodeId, cb) {
var self = this
var nodeIdBuffer = idToBuffer(nodeId)
function onResponse (err, res) {
if (err) return cb(err)
if (res.nodes) {
res.nodes = parseNodeInfo(res.nodes)
res.nodes.forEach(function (node) {
self._addNode(node.addr, node.id, addr)
if (message.r.token && message.r.id && Buffer.isBuffer(message.r.id) && message.r.id.length === 20) {
self._debug('found node %s (target: %s)', message.r.id, target)
table.add({
id: message.r.id,
host: node.host || node.address,
port: node.port,
token: message.r.token
})
}
cb(null, res)
}
var data = {
q: 'get',
a: {
id: self.nodeIdBuffer,
target: nodeIdBuffer
}
if (!onmessage) return true
return onmessage(message, node)
}
self._query(data, addr, onResponse)
}
/**
* Called when another node sends a "find_node" query.
* @param {string} addr
* @param {Object} message
*/
DHT.prototype._onFindNode = function (addr, message) {
var self = this
var nodeIdBuffer = message.a && message.a.target
var nodeId = idToHexString(nodeIdBuffer)
if (!nodeIdBuffer) {
var errMessage = '`find_node` missing required `a.target` field'
self._debug(errMessage)
self._sendError(addr, message.t, ERROR_TYPE.PROTOCOL, errMessage)
return
DHT.prototype._debug = function () {
if (!debug.enabled) return
var args = [].slice.call(arguments)
args[0] = '[' + this.nodeId.toString('hex').substring(0, 7) + '] ' + args[0]
for (var i = 1; i < args.length; i++) {
if (Buffer.isBuffer(args[i])) args[i] = args[i].toString('hex')
}
self._debug('got find_node %s from %s', nodeId, addr)
debug.apply(null, args)
}
// Convert nodes to "compact node info" representation
var nodes = convertToNodeInfo(self.nodes.closest({ id: nodeIdBuffer }, K))
var res = {
t: message.t,
y: MESSAGE_TYPE.RESPONSE,
r: {
id: self.nodeIdBuffer,
nodes: nodes
}
}
self._send(addr, res)
DHT.prototype._validateToken = function (host, token) {
var tokenA = this._generateToken(host, this._secrets[0])
var tokenB = this._generateToken(host, this._secrets[1])
return equals(token, tokenA) || equals(token, tokenB)
}
/**
* Send "get_peers" query to given addr.
* @param {string} addr
* @param {Buffer|string} infoHash
* @param {function} cb called with response
*/
DHT.prototype._sendGetPeers = function (addr, infoHash, cb) {
var self = this
var infoHashBuffer = idToBuffer(infoHash)
infoHash = idToHexString(infoHash)
function onResponse (err, res) {
if (err) return cb(err)
if (res.nodes) {
res.nodes = parseNodeInfo(res.nodes)
res.nodes.forEach(function (node) {
self._addNode(node.addr, node.id, addr)
})
}
if (res.values) {
res.values = parsePeerInfo(res.values)
res.values.forEach(function (peerAddr) {
self._debug('emit peer %s %s from %s', peerAddr, infoHash, addr)
self.emit('peer', peerAddr, infoHash, addr)
})
}
cb(null, res)
}
var data = {
q: 'get_peers',
a: {
id: self.nodeIdBuffer,
info_hash: infoHashBuffer
}
}
self._query(data, addr, onResponse)
DHT.prototype._generateToken = function (host, secret) {
if (!secret) secret = this._secrets[0]
return crypto.createHash('sha1').update(new Buffer(host, 'utf8')).update(secret).digest()
}
/**
* Called when another node sends a "get_peers" query.
* @param {string} addr
* @param {Object} message
*/
DHT.prototype._onGetPeers = function (addr, message) {
var self = this
var addrData = addrToIPPort(addr)
var infoHashBuffer = message.a && message.a.info_hash
if (!infoHashBuffer) {
var errMessage = '`get_peers` missing required `a.info_hash` field'
self._debug(errMessage)
self._sendError(addr, message.t, ERROR_TYPE.PROTOCOL, errMessage)
return
}
var infoHash = idToHexString(infoHashBuffer)
self._debug('got get_peers %s from %s', infoHash, addr)
var res = {
t: message.t,
y: MESSAGE_TYPE.RESPONSE,
r: {
id: self.nodeIdBuffer,
token: idToBuffer(self._generateToken(addrData[0]))
}
}
var peers = self.peers[infoHash] && self.peers[infoHash].list
if (peers) {
// We know of peers for the target info hash. Peers are stored as an array of
// compact peer info, so return it as-is.
res.r.values = peers
DHT.prototype._rotateSecrets = function () {
if (!this._secrets) {
this._secrets = [crypto.randomBytes(20), crypto.randomBytes(20)]
} else {
// No peers, so return the K closest nodes instead. Convert nodes to "compact node
// info" representation
res.r.nodes = convertToNodeInfo(self.nodes.closest({ id: infoHashBuffer }, K))
this._secrets[1] = this._secrets[0]
this._secrets[0] = crypto.randomBytes(20)
}
self._send(addr, res)
}
/**
* Send "announce_peer" query to given host and port.
* @param {string} addr
* @param {Buffer|string} infoHash
* @param {number} port
* @param {Buffer} token
* @param {function=} cb called with response
*/
DHT.prototype._sendAnnouncePeer = function (addr, infoHash, port, token, cb) {
var self = this
var infoHashBuffer = idToBuffer(infoHash)
if (!cb) cb = noop
function noop () {}
var data = {
q: 'announce_peer',
a: {
id: self.nodeIdBuffer,
info_hash: infoHashBuffer,
port: port,
token: token,
implied_port: 0
}
}
self._query(data, addr, cb)
function sha1 (buf) {
return crypto.createHash('sha1').update(buf).digest()
}
/**
* Called when another node sends a "announce_peer" query.
* @param {string} addr
* @param {Object} message
*/
DHT.prototype._onAnnouncePeer = function (addr, message) {
var self = this
var errMessage
var addrData = addrToIPPort(addr)
var infoHashBuffer = message.a && message.a.info_hash
if (!infoHashBuffer) {
errMessage = '`announce_peer` missing required `a.info_hash` field'
self._debug(errMessage)
self._sendError(addr, message.t, ERROR_TYPE.PROTOCOL, errMessage)
return
function createGetResponse (id, token, value) {
var r = {id: id, token: token, v: value.v}
if (value.sig) {
r.sig = value.sig
r.k = value.k
if (value.salt) r.salt = value.salt
if (typeof value.seq === 'number') r.seq = value.seq
}
var infoHash = idToHexString(infoHashBuffer)
var tokenBuffer = message.a && message.a.token
var token = idToHexString(tokenBuffer)
if (!self._isValidToken(token, addrData[0])) {
errMessage = 'cannot `announce_peer` with bad token'
self._sendError(addr, message.t, ERROR_TYPE.PROTOCOL, errMessage)
return
}
var port = message.a.implied_port !== 0
? addrData[1] // use port of udp packet
: message.a.port // use port in `announce_peer` message
self._debug(
'got announce_peer %s %s from %s with token %s',
infoHash, port, addr, token
)
self._addPeer(addrData[0] + ':' + port, infoHash)
// send acknowledgement
var res = {
t: message.t,
y: MESSAGE_TYPE.RESPONSE,
r: {
id: self.nodeIdBuffer
}
}
self._send(addr, res)
return r
}
/**
* Send an error to given host and port.
* @param {string} addr
* @param {Buffer|number} transactionId
* @param {number} code
* @param {string} errMessage
*/
DHT.prototype._sendError = function (addr, transactionId, code, errMessage) {
var self = this
if (transactionId && !Buffer.isBuffer(transactionId)) {
transactionId = transactionIdToBuffer(transactionId)
}
var message = {
y: MESSAGE_TYPE.ERROR,
e: [code, errMessage]
}
if (transactionId) {
message.t = transactionId
}
self._debug('sent error %s to %s', JSON.stringify(message), addr)
self._send(addr, message)
function encodePeer (host, port) {
var buf = new Buffer(6)
var ip = host.split('.')
for (var i = 0; i < 4; i++) buf[i] = parseInt(ip[i] || 0, 10)
buf.writeUInt16BE(port, 4)
return buf
}
/**
* Get a transaction id, and (optionally) set a function to be called
* @param {string} addr
* @param {function} fn
*/
DHT.prototype._getTransactionId = function (addr, fn) {
var self = this
fn = once(fn)
var reqs = self.transactions[addr]
if (!reqs) {
reqs = self.transactions[addr] = {}
reqs.nextTransactionId = 0
}
var transactionId = reqs.nextTransactionId
reqs.nextTransactionId = UINT16 & (reqs.nextTransactionId + 1)
function decodePeers (buf) {
var peers = []
function onTimeout () {
reqs[transactionId] = null
fn(new Error('query timed out'))
try {
for (var i = 0; i < buf.length; i++) {
var port = buf[i].readUInt16BE(4)
if (!port) continue
peers.push({
host: parseIp(buf[i], 0),
port: port
})
}
} catch (err) {
// do nothing
}
function onResponse (err, res) {
clearTimeout(reqs[transactionId].timeout)
reqs[transactionId] = null
fn(err, res)
}
var timeout = setTimeout(onTimeout, SEND_TIMEOUT)
if (timeout.unref) timeout.unref()
reqs[transactionId] = {
cb: onResponse,
timeout: timeout
}
return transactionId
return peers
}
/**
* Generate token (for response to `get_peers` query). Tokens are the SHA1 hash of
* the IP address concatenated onto a secret that changes every five minutes. Tokens up
* to ten minutes old are accepted.
* @param {string} host
* @param {string=} secret force token to use this secret, otherwise use current one
* @return {string}
*/
DHT.prototype._generateToken = function (host, secret) {
var self = this
if (!secret) secret = self.secrets[0]
return sha1.sync(host + secret)
function parseIp (buf, offset) {
return buf[offset++] + '.' + buf[offset++] + '.' + buf[offset++] + '.' + buf[offset++]
}
/**
* Checks if a token is valid for a given node's IP address.
*
* @param {string} token
* @param {string} host
* @return {boolean}
*/
DHT.prototype._isValidToken = function (token, host) {
var self = this
var validToken0 = self._generateToken(host, self.secrets[0])
var validToken1 = self._generateToken(host, self.secrets[1])
return token === validToken0 || token === validToken1
function encodeSigData (msg) {
var ref = { seq: msg.seq || 0, v: msg.v }
if (msg.salt) ref.salt = msg.salt
return bencode.encode(ref).slice(1, -1)
}
/**
* Rotate secrets. Secrets are rotated every 5 minutes and tokens up to ten minutes
* old are accepted.
*/
DHT.prototype._rotateSecrets = function () {
var self = this
function createSecret () {
return hat(SECRET_ENTROPY)
function toNode (node) {
return {
host: node.host,
port: node.port
}
}
// Initialize secrets array
// self.secrets[0] is the current secret, used to generate new tokens
// self.secrets[1] is the last secret, which is still accepted
if (!self.secrets) {
self.secrets = [ createSecret(), createSecret() ]
return
}
self.secrets[1] = self.secrets[0]
self.secrets[0] = createSecret()
function PeerStore (max) {
this.max = max || 10000
this.used = 0
this.peers = LRU(Infinity)
}
/**
* Get a string that can be used to initialize and bootstrap the DHT in the
* future.
* @return {Array.<Object>}
*/
DHT.prototype.toArray = function () {
var self = this
var nodes = self.nodes.toArray().filter(dropData).map(function (contact) {
// to remove properties added by k-bucket, like `distance`, etc.
return {
id: contact.id.toString('hex'),
addr: contact.addr
PeerStore.prototype.add = function (key, peer) {
var peers = this.peers.get(key)
if (!peers) {
peers = {
values: [],
map: LRU(Infinity)
}
})
return nodes
this.peers.set(key, peers)
}
function dropData (x) { return !x.data }
}
var id = peer.toString('hex')
if (peers.map.get(id)) return
DHT.prototype._addrIsSelf = function (addr) {
var self = this
return self._port &&
LOCAL_HOSTS[self._ipv].some(function (host) { return host + ':' + self._port === addr })
var node = {index: peers.values.length, peer: peer}
peers.map.set(id, node)
peers.values.push(node)
if (++this.used > this.max) this._evict()
}
DHT.prototype._debug = function () {
var self = this
var args = [].slice.call(arguments)
args[0] = '[' + self.nodeId.substring(0, 7) + '] ' + args[0]
debug.apply(null, args)
PeerStore.prototype._evict = function () {
var a = this.peers.peek(this.peers.tail)
var b = a.map.remove(a.map.tail).value
var values = a.values
swap(values, b.index, values.length - 1)
values.pop()
this.used--
if (!values.length) this.peers.remove(this.peers.tail)
}
/**
* Parse saved string
* @param {Array.<Object>} nodes
* @return {Buffer}
*/
function fromArray (nodes) {
nodes.forEach(function (node) {
if (node.id) node.id = idToBuffer(node.id)
})
return nodes
PeerStore.prototype.get = function (key) {
var node = this.peers.get(key)
if (!node) return []
return pick(node.values, 100)
}
/**
* Convert "contacts" from the routing table into "compact node info" representation.
* @param {Array.<Object>} contacts
* @return {Buffer}
*/
function convertToNodeInfo (contacts) {
return Buffer.concat(contacts.map(function (contact) {
return Buffer.concat([ contact.id, string2compact(contact.addr) ])
}))
function parseAddr (addr) {
return {host: addr.split(':')[0], port: Number(addr.split(':')[1])}
}
/**
* Parse "compact node info" representation into "contacts".
* @param {Buffer} nodeInfo
* @return {Array.<string>} array of
*/
function parseNodeInfo (nodeInfo) {
var contacts = []
try {
for (var i = 0; i < nodeInfo.length; i += 26) {
contacts.push({
id: nodeInfo.slice(i, i + 20),
addr: compact2string(nodeInfo.slice(i + 20, i + 26))
})
}
} catch (err) {
debug('error parsing node info ' + nodeInfo)
}
return contacts
function swap (list, a, b) {
if (a === b) return
var tmp = list[a]
list[a] = list[b]
list[b] = tmp
list[a].index = a
list[b].index = b
}
/**
* Parse list of "compact addr info" into an array of addr "host:port" strings.
* @param {Array.<Buffer>} list
* @return {Array.<string>}
*/
function parsePeerInfo (list) {
try {
return list.map(compact2string)
} catch (err) {
debug('error parsing peer info ' + list)
return []
}
}
function pick (values, n) {
var len = Math.min(values.length, n)
var ptr = 0
var res = new Array(len)
/**
* Ensure a transacation id is a 16-bit buffer, so it can be sent on the wire as
* the transaction id ("t" field).
* @param {number|Buffer} transactionId
* @return {Buffer}
*/
function transactionIdToBuffer (transactionId) {
if (Buffer.isBuffer(transactionId)) {
return transactionId
} else {
var buf = new Buffer(2)
buf.writeUInt16BE(transactionId, 0)
return buf
for (var i = 0; i < len; i++) {
var next = ptr + (Math.random() * (values.length - ptr)) | 0
res[ptr] = values[next].peer
swap(values, ptr++, next)
}
}
/**
* Ensure info hash or node id is a Buffer.
* @param {string|Buffer} id
* @return {Buffer}
*/
function idToBuffer (id) {
if (Buffer.isBuffer(id)) {
return id
} else if (typeof id === 'string') {
return new Buffer(id, 'hex')
} else {
return null
}
return res
}
/**
* Ensure info hash or node id is a hex string.
* @param {string|Buffer} id
* @return {Buffer}
*/
function idToHexString (id) {
if (Buffer.isBuffer(id)) {
return id.toString('hex')
} else {
return id
}
function toBuffer (str) {
if (Buffer.isBuffer(str)) return str
if (typeof str === 'string') return new Buffer(str, 'hex')
throw new Error('Pass a buffer or a string')
}
function encodeSigData (msg) {
var ref = { seq: msg.seq || 0, v: msg.v }
if (msg.salt) ref.salt = msg.salt
return bencode.encode(ref).slice(1, -1)
}
function noop () {}
{
"name": "bittorrent-dht",
"description": "Simple, robust, BitTorrent DHT implementation",
"version": "5.1.3",
"version": "6.0.0",
"author": {

@@ -14,19 +14,9 @@ "name": "Feross Aboukhadijeh",

"dependencies": {
"addr-to-ip-port": "^1.0.0",
"bencode": "^0.7.0",
"buffer-equal": "0.0.1",
"compact2string": "^1.2.0",
"debug": "^2.1.0",
"hat": "^0.0.3",
"buffer-equals": "^1.0.3",
"debug": "^2.2.0",
"inherits": "^2.0.1",
"is-ip": "^1.0.0",
"isarray": "^1.0.0",
"k-bucket": "^0.6.0",
"network-address": "^1.0.0",
"once": "^1.3.1",
"run-parallel": "^1.0.0",
"simple-get": "^1.3.1",
"simple-sha1": "^2.0.7",
"string2compact": "^1.1.1",
"thunky": "^0.1.0"
"k-rpc": "^3.4.1",
"lru": "^1.2.0"
},

@@ -36,4 +26,6 @@ "devDependencies": {

"ip": "^1.1.0",
"once": "^1.3.3",
"run-parallel": "^1.1.4",
"standard": "^5.4.1",
"tape": "^4.0.0"
"tape": "^4.4.0"
},

@@ -40,0 +32,0 @@ "keywords": [

@@ -61,4 +61,4 @@ # bittorrent-dht [![travis][travis-image]][travis-url] [![npm][npm-image]][npm-url] [![downloads][downloads-image]][downloads-url]

dht.on('peer', function (addr, infoHash, from) {
console.log('found potential peer ' + addr + ' through ' + from)
dht.on('peer', function (peer, infoHash, from) {
console.log('found potential peer ' + peer.host + ':' + peer.port + ' through ' + from.host + ':' + from.port)
})

@@ -106,6 +106,3 @@ ```

Note: `dht.lookup()` should only be called after the ready event has fired, otherwise the
lookup may fail because the DHT routing table doesn't contain enough nodes.
#### `dht.listen([port], [address], [onlistening])`

@@ -152,5 +149,5 @@

Returns the nodes in the DHT as an array. This is useful for persisting the DHT
to disk between restarts of a BitTorrent client (as recommended by the spec). Each node in the array is an object with `id` (hex string) and `addr` (string) properties.
to disk between restarts of a BitTorrent client (as recommended by the spec). Each node in the array is an object with `host` (string) and `port` (number) properties.
To restore the DHT nodes when instantiating a new `DHT` object, simply pass in the array as the value of the `bootstrap` option.
To restore the DHT nodes when instantiating a new `DHT` object, simply loop over the nodes in the array and add them with the `addNode` method.

@@ -169,7 +166,11 @@ ```js

// initialize a new dht with the same routing table as the first
var dht2 = new DHT({ bootstrap: arr })
var dht2 = new DHT()
arr.forEach(function (node) {
dht2.add(node)
})
```
#### `dht.addNode(addr, [nodeId])`
#### `dht.addNode(node)`

@@ -181,5 +182,15 @@ Manually add a node to the DHT routing table. If there is space in the routing table (or

If `nodeId` is undefined, then the peer will be pinged to learn their node id. If the peer does not respond, the will not be added to the routing table.
A node should look like this
``` js
{
host: nodeHost,
port: nodePort,
id: optionalNodeId
}
```
If `id` is undefined, then the peer will be pinged to learn their node id. If the peer does not respond, the will not be added to the routing table.
#### `dht.destroy([callback])`

@@ -209,4 +220,4 @@

dht.on('ready', function () {
dht.put({ v: value }, function (errors, hash) {
console.error('errors=', errors)
dht.put({ v: value }, function (err, hash) {
console.error('error=', err)
console.log('hash=', hash)

@@ -254,4 +265,4 @@ })

dht.on('ready', function () {
dht.put(opts, function (errors, hash) {
console.error('errors=', errors)
dht.put(opts, function (err, hash) {
console.error('error=', err)
console.log('hash=', hash)

@@ -262,6 +273,6 @@ })

In either mutable or immutable forms, `callback(errors, hash)` fires with an
array `errors` of any errors encountered when announcing the content to peers
and `hash`, the location where the mutable or immutable content can be retrieved
(with `dht.get(hash)`).
In either mutable or immutable forms, `callback(error, hash, n)` fires with an
`error` if no nodes were able to store the `value`. `n` is set the amount of peers
that accepted the `put` and `hash`, the location where the mutable or immutable
content can be retrieved (with `dht.get(hash)`).

@@ -293,4 +304,5 @@ Note that you should call `.put()` every hour for content that you want to keep

Emitted when the DHT is ready to handle lookups (i.e. the routing table is sufficiently
populated via the bootstrap nodes).
Emitted when the DHT is fully bootstrapped (i.e. the routing table is sufficiently
populated via the bootstrap nodes). Note that it is okay to do lookups before the 'ready'
event fires.

@@ -308,5 +320,5 @@ Note: If you initialize the DHT with the `{ bootstrap: false }` option, then the 'ready'

#### `dht.on('peer', function (addr, infoHash, from) { ... })`
#### `dht.on('peer', function (peer, infoHash, from) { ... })`
Emitted when a potential peer is found. `addr` is of the form `IP_ADDRESS:PORT`.
Emitted when a potential peer is found. `peer` is of the form `{host, port}`.
`infoHash` is the torrent info hash of the swarm that the peer belongs to. Emitted

@@ -323,3 +335,3 @@ in response to a `lookup(infoHash)` call.

#### `dht.on('node', function (addr, nodeId, from) { ... })`
#### `dht.on('node', function (node) { ... })`

@@ -329,3 +341,3 @@ Emitted when the DHT finds a new node.

#### `dht.on('announce', function (addr, infoHash) { ... })`
#### `dht.on('announce', function (peer, infoHash) { ... })`

@@ -332,0 +344,0 @@ Emitted when a peer announces itself in order to be stored in the DHT.

@@ -15,3 +15,3 @@ var common = require('./common')

t.deepEqual(dht.nodeId, nodeId.toString('hex'))
t.deepEqual(dht.nodeId, nodeId)
dht.destroy()

@@ -22,3 +22,3 @@ t.end()

test('call `addNode` with nodeId argument', function (t) {
t.plan(2)
t.plan(3)

@@ -30,13 +30,14 @@ var dht = new DHT({ bootstrap: false })

dht.on('node', function (addr, _nodeId) {
t.equal(addr, '127.0.0.1:9999')
t.deepEqual(_nodeId, nodeId.toString('hex'))
dht.on('node', function (node) {
t.equal(node.host, '127.0.0.1')
t.equal(node.port, 9999)
t.deepEqual(node.id, nodeId)
dht.destroy()
})
dht.addNode('127.0.0.1:9999', nodeId)
dht.addNode({host: '127.0.0.1', port: 9999, id: nodeId})
})
test('call `addNode` without nodeId argument', function (t) {
t.plan(2)
t.plan(3)

@@ -53,7 +54,8 @@ var dht1 = new DHT({ bootstrap: false })

// If `nodeId` is undefined, then the peer will be pinged to learn their node id.
dht2.addNode('127.0.0.1:' + port)
dht2.addNode({host: '127.0.0.1', port: port})
dht2.on('node', function (addr, _nodeId) {
t.equal(addr, '127.0.0.1:' + port)
t.deepEqual(_nodeId, dht1.nodeId)
dht2.on('node', function (node) {
t.equal(node.host, '127.0.0.1')
t.equal(node.port, port)
t.deepEqual(node.id, dht1.nodeId)
dht1.destroy()

@@ -73,3 +75,3 @@ dht2.destroy()

// If the peer DOES NOT RESPOND, the will not be added to the routing table.
dht.addNode('127.0.0.1:9999')
dht.addNode({host: '127.0.0.1', port: 9999})

@@ -98,8 +100,11 @@ dht.on('node', function () {

var nodeId = common.randomId()
dht.addNode('127.0.0.1:9999', nodeId)
dht.addNode('127.0.0.1:9999', nodeId)
dht.addNode('127.0.0.1:9999', nodeId)
dht.addNode({host: '127.0.0.1', port: 9999, id: nodeId})
dht.addNode({host: '127.0.0.1', port: 9999, id: nodeId})
dht.addNode({host: '127.0.0.1', port: 9999, id: nodeId})
var togo = 1
setTimeout(t.pass(), 100)
setTimeout(function () {
dht.destroy()
t.pass()
}, 100)
})

@@ -115,3 +120,3 @@

b.listen()
b._sendPing('127.0.0.1:' + port, function (err) {
b._sendPing({host: '127.0.0.1', port: port}, function (err) {
t.error(err)

@@ -118,0 +123,0 @@ a.destroy()

var crypto = require('crypto')
var hat = require('hat')
var ip = require('ip')

@@ -10,10 +9,16 @@

exports.randomHost = function () {
return ip.toString(crypto.randomBytes(4))
}
exports.randomPort = function () {
return crypto.randomBytes(2).readUInt16LE(0)
}
exports.randomAddr = function () {
var host = ip.toString(crypto.randomBytes(4))
var port = crypto.randomBytes(2).readUInt16LE(0)
return host + ':' + port
return exports.randomHost() + ':' + exports.randomPort()
}
exports.randomId = function () {
return new Buffer(hat(160), 'hex')
return crypto.randomBytes(20)
}

@@ -23,3 +28,7 @@

for (var i = 0; i < num; i++) {
dht.addNode(exports.randomAddr(), exports.randomId())
dht.addNode({
id: exports.randomId(),
host: exports.randomHost(),
port: exports.randomPort()
})
}

@@ -26,0 +35,0 @@ }

@@ -16,5 +16,3 @@ var common = require('./common')

var value = fill(500, 'abc')
dht.put({ v: value }, function (errors, hash) {
errors.forEach(t.error.bind(t))
dht.put({ v: value }, function (_, hash) {
t.equal(

@@ -35,3 +33,3 @@ hash.toString('hex'),

test('multi-party immutable put/get', function (t) {
t.plan(3)
t.plan(4)

@@ -61,4 +59,4 @@ var dht1 = new DHT({ bootstrap: false })

var value = fill(500, 'abc')
dht1.put({ v: value }, function (errors, hash) {
errors.forEach(t.error.bind(t))
dht1.put({ v: value }, function (err, hash) {
t.error(err)

@@ -69,2 +67,3 @@ t.equal(

)
dht2.get(hash, function (err, res) {

@@ -71,0 +70,0 @@ t.ifError(err)

var common = require('./common')
var DHT = require('../')
var ed = require('ed25519-supercop')
var sha1 = require('simple-sha1')
var test = require('tape')
var crypto = require('crypto')

@@ -26,8 +26,11 @@ test('local mutable put/get', function (t) {

}
var expectedHash = sha1.sync(opts.k)
dht.put(opts, function (errors, hash) {
errors.forEach(t.error.bind(t))
var expectedHash = crypto.createHash('sha1').update(opts.k).digest()
t.equal(hash, expectedHash, 'hash of the public key')
dht.put(opts, function (_, hash) {
t.equal(
hash.toString('hex'),
expectedHash.toString('hex'),
'hash of the public key'
)
dht.get(hash, function (err, res) {

@@ -45,3 +48,3 @@ t.ifError(err)

test('multiparty mutable put/get', function (t) {
t.plan(3)
t.plan(4)

@@ -80,8 +83,9 @@ var keypair = ed.createKeyPair(ed.createSeed())

}
var expectedHash = sha1.sync(opts.k)
dht1.put(opts, function (errors, hash) {
errors.forEach(t.error.bind(t))
var expectedHash = crypto.createHash('sha1').update(opts.k).digest()
t.equal(hash, expectedHash, 'hash of the public key')
dht1.put(opts, function (err, hash) {
t.error(err)
t.deepEqual(hash, expectedHash, 'hash of the public key')
dht2.get(hash, function (err, res) {

@@ -98,3 +102,3 @@ t.ifError(err)

test('multiparty mutable put/get sequence', function (t) {
t.plan(9)
t.plan(12)

@@ -132,8 +136,9 @@ var keypair = ed.createKeyPair(ed.createSeed())

}
var expectedHash = sha1.sync(opts.k)
dht1.put(opts, function (errors, hash) {
errors.forEach(t.error.bind(t))
var expectedHash = crypto.createHash('sha1').update(opts.k).digest()
t.equal(hash, expectedHash, 'hash of the public key')
dht1.put(opts, function (err, hash) {
t.error(err)
t.deepEqual(hash, expectedHash, 'hash of the public key')
dht2.get(hash, function (err, res) {

@@ -152,6 +157,6 @@ t.ifError(err)

dht1.put(opts, function (errors, hash) {
errors.forEach(t.error.bind(t))
dht1.put(opts, function (err, hash) {
t.error(err)
t.equal(hash, expectedHash, 'hash of the public key (again)')
t.deepEqual(hash, expectedHash, 'hash of the public key (again)')
dht2.get(hash, function (err, res) {

@@ -171,6 +176,6 @@ t.ifError(err)

dht1.put(opts, function (errors, hash) {
errors.forEach(t.error.bind(t))
dht1.put(opts, function (err, hash) {
t.error(err)
t.equal(hash, expectedHash, 'hash of the public key (yet again)')
t.deepEqual(hash, expectedHash, 'hash of the public key (yet again)')
dht2.get(hash, function (err, res) {

@@ -188,3 +193,3 @@ t.ifError(err)

test('salted multikey multiparty mutable put/get sequence', function (t) {
t.plan(9)
t.plan(12)

@@ -232,9 +237,10 @@ var keypair = ed.createKeyPair(ed.createSeed())

}
var first = sha1.sync(Buffer.concat([ new Buffer('first'), fopts.k ]))
var second = sha1.sync(Buffer.concat([ new Buffer('second'), sopts.k ]))
dht1.put(fopts, function (errors, hash) {
errors.forEach(t.error.bind(t))
var first = crypto.createHash('sha1').update('first').update(fopts.k).digest()
var second = crypto.createHash('sha1').update('second').update(sopts.k).digest()
t.equal(hash, first, 'first hash')
dht1.put(fopts, function (err, hash) {
t.error(err)
t.deepEqual(hash, first, 'first hash')
dht2.get(hash, function (err, res) {

@@ -250,6 +256,6 @@ t.ifError(err)

function putSecondKey () {
dht1.put(sopts, function (errors, hash) {
errors.forEach(t.error.bind(t))
dht1.put(sopts, function (err, hash) {
t.error(err)
t.equal(hash, second, 'second hash')
t.deepEqual(hash, second, 'second hash')
dht2.get(hash, function (err, res) {

@@ -269,6 +275,6 @@ t.ifError(err)

dht1.put(fopts, function (errors, hash) {
errors.forEach(t.error.bind(t))
dht1.put(fopts, function (err, hash) {
t.error(err)
t.equal(hash, first, 'first salt (again)')
t.deepEqual(hash, first, 'first salt (again)')
dht2.get(hash, function (err, res) {

@@ -286,3 +292,3 @@ t.ifError(err)

test('transitive mutable update', function (t) {
t.plan(3)
t.plan(4)

@@ -325,9 +331,10 @@ var keypair = ed.createKeyPair(ed.createSeed())

}
var expectedHash = sha1.sync(opts.k)
dht1.put(opts, function (errors, hash) {
errors.forEach(t.error.bind(t))
var expectedHash = crypto.createHash('sha1').update(opts.k).digest()
t.equal(hash, expectedHash, 'hash of the public key')
dht1.put(opts, function (err, hash) {
t.error(err)
t.deepEqual(hash, expectedHash, 'hash of the public key')
dht3.get(expectedHash, function (err, res) {

@@ -344,3 +351,3 @@ t.ifError(err)

test('mutable update mesh', function (t) {
t.plan(9)
t.plan(12)
/*

@@ -412,7 +419,8 @@ 0 <-> 1 <-> 2

}
var xhash = sha1.sync(opts.k)
src.put(opts, function (errors, hash) {
errors.forEach(t.error.bind(t))
t.equal(hash, xhash)
var xhash = crypto.createHash('sha1').update(opts.k).digest()
src.put(opts, function (err, hash) {
t.error(err)
t.equal(hash.toString('hex'), xhash.toString('hex'))
dst.get(xhash, function (err, res) {

@@ -429,3 +437,3 @@ t.ifError(err)

test('invalid sequence', function (t) {
t.plan(4)
t.plan(5)

@@ -464,7 +472,7 @@ var keypair = ed.createKeyPair(ed.createSeed())

dht0.put(opts0, function (errors, hash) {
errors.forEach(t.error.bind(t))
dht0.put(opts0, function (err, hash) {
t.error(err)
hash0 = hash
dht0.put(opts1, function (errors, hash) {
t.ok(errors.length, 'caught expected error: ' + errors[0])
dht0.put(opts1, function (err, hash) {
t.ok(err, 'caught expected error: ' + err)
check()

@@ -489,3 +497,3 @@ })

test('valid sequence', function (t) {
t.plan(4)
t.plan(6)

@@ -524,7 +532,7 @@ var keypair = ed.createKeyPair(ed.createSeed())

dht0.put(opts0, function (errors, hash) {
errors.forEach(t.error.bind(t))
dht0.put(opts0, function (err, hash) {
t.error(err)
hash0 = hash
dht0.put(opts1, function (errors, hash) {
errors.forEach(t.error.bind(t))
dht0.put(opts1, function (err, hash) {
t.error(err)
hash1 = hash

@@ -531,0 +539,0 @@ t.deepEqual(hash0, hash1)

var common = require('./common')
var DHT = require('../')
var sha1 = require('simple-sha1')
var test = require('tape')
var crypto = require('crypto')
var ed = require('ed25519-supercop')

@@ -45,8 +44,12 @@

}
var expectedHash = sha1.sync(opts.k)
dht.put(opts, function (errors, hash) {
errors.forEach(t.error.bind(t))
var expectedHash = crypto.createHash('sha1').update(opts.k).digest()
t.equal(hash, expectedHash, 'hash of the public key')
dht.put(opts, function (_, hash) {
t.equal(
hash.toString('hex'),
expectedHash.toString('hex'),
'hash of the public key'
)
dht.get(hash, function (err, res) {

@@ -53,0 +56,0 @@ t.ifError(err)

@@ -13,2 +13,3 @@ var common = require('./common')

if (numNodes === 100) {
dht.destroy()
t.pass('100 nodes added, 100 `node` events emitted')

@@ -30,2 +31,3 @@ t.end()

if (numNodes === 10000) {
dht.destroy()
t.pass('10000 nodes added, 10000 `node` events emitted')

@@ -47,2 +49,3 @@ t.end()

if (numPeers === 100) {
dht.destroy()
t.pass('100 peers added, 100 `announce` events emitted')

@@ -64,2 +67,3 @@ t.end()

if (numPeers === 10000) {
dht.destroy()
t.pass('10000 peers added, 10000 `announce` events emitted')

@@ -66,0 +70,0 @@ t.end()

@@ -76,5 +76,5 @@ var common = require('./common')

dhts[1].on('peer', function (addr, hash) {
t.equal(hash, infoHash)
t.equal(Number(addr.split(':')[1]), 9998)
dhts[1].on('peer', function (peer, hash) {
t.equal(hash.toString('hex'), infoHash)
t.equal(peer.port, 9998)
clearTimeout(timeoutId)

@@ -93,4 +93,4 @@ cb(null, dhts)

var next = dhts[(i + 1) % len]
dhts[i].addNode('127.0.0.1:' + next.address().port, next.nodeId)
dhts[i].addNode({host: '127.0.0.1', port: next.address().port, id: next.nodeId})
}
}

@@ -14,5 +14,10 @@ var common = require('./common')

dht1.listen(function () {
dht2._sendPing('127.0.0.1:' + dht1.address().port, function (err, res) {
dht2._rpc.query({
host: '127.0.0.1',
port: dht1.address().port
}, {
q: 'ping'
}, function (err, res) {
t.error(err)
t.deepEqual(res.id, dht1.nodeIdBuffer)
t.deepEqual(res.r.id, dht1.nodeId)

@@ -35,13 +40,17 @@ dht1.destroy()

dht1.addNode('255.255.255.255:6969', targetNodeId)
dht1.addNode({host: '255.255.255.255', port: 6969, id: targetNodeId})
dht1.listen(function () {
dht2._sendFindNode('127.0.0.1:' + dht1.address().port, targetNodeId, function (err, res) {
dht2._rpc.query({
host: '127.0.0.1',
port: dht1.address().port
}, {
q: 'find_node',
a: {target: targetNodeId}
}, function (err, res) {
t.error(err)
t.deepEqual(res.id, dht1.nodeIdBuffer)
t.deepEqual(
res.nodes.map(function (node) { return node.addr }),
[ '255.255.255.255:6969', '127.0.0.1:' + dht2.address().port ]
)
t.deepEqual(res.r.id, dht1.nodeId)
t.deepEqual(res.r.nodes.length, 2 * 26)
dht1.destroy()

@@ -61,17 +70,20 @@ dht2.destroy()

dht1.addNode('1.1.1.1:6969', common.randomId())
dht1.addNode('10.10.10.10:6969', common.randomId())
dht1.addNode('255.255.255.255:6969', common.randomId())
dht1.addNode({host: '1.1.1.1', port: 6969, id: common.randomId()})
dht1.addNode({host: '10.10.10.10', port: 6969, id: common.randomId()})
dht1.addNode({host: '255.255.255.255', port: 6969, id: common.randomId()})
dht1.listen(function () {
var targetNodeId = common.randomId()
dht2._sendFindNode('127.0.0.1:' + dht1.address().port, targetNodeId, function (err, res) {
dht2._rpc.query({
host: '127.0.0.1',
port: dht1.address().port
}, {
q: 'find_node',
a: {target: targetNodeId}
}, function (err, res) {
t.error(err)
t.deepEqual(res.id, dht1.nodeIdBuffer)
t.deepEqual(
res.nodes.map(function (node) { return node.addr }).sort(),
[ '1.1.1.1:6969', '10.10.10.10:6969', '127.0.0.1:' + dht2.address().port,
'255.255.255.255:6969' ]
)
t.deepEqual(res.r.id, dht1.nodeId)
t.deepEqual(res.r.nodes.length, 26 * 4)
dht1.destroy()

@@ -91,15 +103,20 @@ dht2.destroy()

dht1.addNode('1.1.1.1:6969', common.randomId())
dht1.addNode('2.2.2.2:6969', common.randomId())
dht1.addNode({host: '1.1.1.1', port: 6969, id: common.randomId()})
dht1.addNode({host: '2.2.2.2', port: 6969, id: common.randomId()})
dht1.listen(function () {
var targetInfoHash = common.randomId()
dht2._sendGetPeers('127.0.0.1:' + dht1.address().port, targetInfoHash, function (err, res) {
dht2._rpc.query({
host: '127.0.0.1',
port: dht1.address().port
}, {
q: 'get_peers',
a: {
info_hash: targetInfoHash
}
}, function (err, res) {
t.error(err)
t.deepEqual(res.id, dht1.nodeIdBuffer)
t.ok(Buffer.isBuffer(res.token))
t.deepEqual(
res.nodes.map(function (node) { return node.addr }).sort(),
[ '1.1.1.1:6969', '127.0.0.1:' + dht2.address().port, '2.2.2.2:6969' ]
)
t.deepEqual(res.r.id, dht1.nodeId)
t.ok(Buffer.isBuffer(res.r.token))
t.deepEqual(res.r.nodes.length, 3 * 26)

@@ -128,11 +145,17 @@ dht1.destroy()

dht1.listen(function () {
dht2._sendGetPeers('127.0.0.1:' + dht1.address().port, targetInfoHash, function (err, res) {
dht2._rpc.query({
host: '127.0.0.1',
port: dht1.address().port
}, {
q: 'get_peers',
a: {
info_hash: targetInfoHash
}
}, function (err, res) {
t.error(err)
t.deepEqual(res.id, dht1.nodeIdBuffer)
t.ok(Buffer.isBuffer(res.token))
t.deepEqual(
res.values.sort(),
['1.1.1.1:6969', '10.10.10.10:6969', '255.255.255.255:6969']
)
t.deepEqual(res.r.id, dht1.nodeId)
t.ok(Buffer.isBuffer(res.r.token))
t.deepEqual(res.r.values.length, 3)
dht1.destroy()

@@ -156,3 +179,13 @@ dht2.destroy()

var token = new Buffer('bad token')
dht2._sendAnnouncePeer('127.0.0.1:' + dht1.address().port, infoHash, 9999, token, function (err, res) {
dht2._rpc.query({
host: '127.0.0.1',
port: dht1.address().port
}, {
q: 'announce_peer',
a: {
info_hash: infoHash,
port: 9999,
token: token
}
}, function (err, res) {
t.ok(err, 'got error')

@@ -180,10 +213,29 @@ t.ok(err.message.indexOf('bad token') !== -1)

var port = dht1.address().port
dht2._sendGetPeers('127.0.0.1:' + port, infoHash, function (err, res1) {
dht2._rpc.query({
host: '127.0.0.1',
port: port
}, {
q: 'get_peers',
a: {
info_hash: infoHash
}
}, function (err, res1) {
t.error(err)
t.deepEqual(res1.id, dht1.nodeIdBuffer)
t.ok(Buffer.isBuffer(res1.token))
dht2._sendAnnouncePeer('127.0.0.1:' + port, infoHash, 9999, res1.token, function (err, res2) {
t.deepEqual(res1.r.id, dht1.nodeId)
t.ok(Buffer.isBuffer(res1.r.token))
dht2._rpc.query({
host: '127.0.0.1',
port: port
}, {
q: 'announce_peer',
a: {
info_hash: infoHash,
port: 9999,
token: res1.r.token
}
}, function (err, res2) {
t.error(err)
t.deepEqual(res2.id, dht1.nodeIdBuffer)
t.deepEqual(res2.r.id, dht1.nodeId)

@@ -190,0 +242,0 @@ dht1.destroy()

@@ -34,3 +34,3 @@ var DHT = require('../../')

t.pass('Found at least one peer that has the file')
t.equal(infoHash, pride)
t.equal(infoHash.toString('hex'), pride)
dht.destroy()

@@ -55,7 +55,7 @@ })

dht.on('peer', function (peer, infoHash) {
if (!prideDone && infoHash === pride) {
if (!prideDone && infoHash.toString('hex') === pride) {
prideDone = true
t.pass('Found at least one peer for Pride & Prejudice')
}
if (!leavesDone && infoHash === leaves) {
if (!leavesDone && infoHash.toString('hex') === leaves) {
leavesDone = true

@@ -70,1 +70,20 @@ t.pass('Found at least one peer for Leaves of Grass')

})
test('Find peers before ready is emitted', function (t) {
t.plan(3)
var dht = new DHT()
var then = Date.now()
dht.once('node', function (node) {
t.pass('Found at least one other DHT node')
})
dht.once('peer', function (peer, infoHash) {
t.pass('Found at least one peer that has the file')
t.equal(infoHash.toString('hex'), pride, 'found a peer in ' + (Date.now() - then) + ' ms')
dht.destroy()
})
dht.lookup(pride)
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc