New Case Study: See how Anthropic automated 95% of dependency reviews with Socket. Learn More
Socket
Sign inDemoInstall
Socket

bittorrent-dht

Package Overview
Dependencies
Maintainers
8
Versions
137
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bittorrent-dht - npm Package Compare versions

Comparing version 9.0.3 to 10.0.0

1163

client.js

@@ -1,729 +0,724 @@

module.exports = DHT
const bencode = require('bencode')
const debug = require('debug')('bittorrent-dht')
const KBucket = require('k-bucket')
const krpc = require('k-rpc')
const low = require('last-one-wins')
const LRU = require('lru')
const randombytes = require('randombytes')
const records = require('record-cache')
const simpleSha1 = require('simple-sha1')
const { EventEmitter } = require('events')
var bencode = require('bencode')
var debug = require('debug')('bittorrent-dht')
var EventEmitter = require('events').EventEmitter
var inherits = require('inherits')
var KBucket = require('k-bucket')
var krpc = require('k-rpc')
var LRU = require('lru')
var randombytes = require('randombytes')
var simpleSha1 = require('simple-sha1')
var records = require('record-cache')
var low = require('last-one-wins')
const ROTATE_INTERVAL = 5 * 60 * 1000 // rotate secrets every 5 minutes
const BUCKET_OUTDATED_TIMESPAN = 15 * 60 * 1000 // check nodes in bucket in 15 minutes old buckets
var ROTATE_INTERVAL = 5 * 60 * 1000 // rotate secrets every 5 minutes
var BUCKET_OUTDATED_TIMESPAN = 15 * 60 * 1000 // check nodes in bucket in 15 minutes old buckets
class DHT extends EventEmitter {
constructor (opts = {}) {
super()
inherits(DHT, EventEmitter)
this._tables = new LRU({ maxAge: ROTATE_INTERVAL, max: opts.maxTables || 1000 })
this._values = new LRU(opts.maxValues || 1000)
this._peers = records({
maxAge: opts.maxAge || 0,
maxSize: opts.maxPeers || 10000
})
function DHT (opts) {
if (!(this instanceof DHT)) return new DHT(opts)
if (!opts) opts = {}
this._secrets = null
this._hash = opts.hash || sha1
this._hashLength = this._hash(Buffer.from('')).length
this._rpc = opts.krpc || krpc(Object.assign({ idLength: this._hashLength }, opts))
this._rpc.on('query', onquery)
this._rpc.on('node', onnode)
this._rpc.on('warning', onwarning)
this._rpc.on('error', onerror)
this._rpc.on('listening', onlistening)
this._rotateSecrets()
this._verify = opts.verify || null
this._host = opts.host || null
this._interval = setInterval(rotateSecrets, ROTATE_INTERVAL)
this._runningBucketCheck = false
this._bucketCheckTimeout = null
this._bucketOutdatedTimeSpan = opts.timeBucketOutdated || BUCKET_OUTDATED_TIMESPAN
var self = this
this.listening = false
this.destroyed = false
this.nodeId = this._rpc.id
this.nodes = this._rpc.nodes
this._tables = new LRU({ maxAge: ROTATE_INTERVAL, max: opts.maxTables || 1000 })
this._values = new LRU(opts.maxValues || 1000)
this._peers = records({
maxAge: opts.maxAge || 0,
maxSize: opts.maxPeers || 10000
})
// ensure only *one* ping it running at the time to avoid infinite async
// ping recursion, and make the latest one is always ran, but inbetween ones
// are disregarded
const onping = low(ping)
this._secrets = null
this._hash = opts.hash || sha1
this._hashLength = this._hash(Buffer.from('')).length
this._rpc = opts.krpc || krpc(Object.assign({ idLength: this._hashLength }, opts))
this._rpc.on('query', onquery)
this._rpc.on('node', onnode)
this._rpc.on('warning', onwarning)
this._rpc.on('error', onerror)
this._rpc.on('listening', onlistening)
this._rotateSecrets()
this._verify = opts.verify || null
this._host = opts.host || null
this._interval = setInterval(rotateSecrets, ROTATE_INTERVAL)
this._runningBucketCheck = false
this._bucketCheckTimeout = null
this._bucketOutdatedTimeSpan = opts.timeBucketOutdated || BUCKET_OUTDATED_TIMESPAN
this._rpc.on('ping', (older, swap) => {
onping({ older, swap })
})
this.listening = false
this.destroyed = false
this.nodeId = this._rpc.id
this.nodes = this._rpc.nodes
process.nextTick(bootstrap)
// ensure only *one* ping it running at the time to avoid infinite async
// ping recursion, and make the latest one is always ran, but inbetween ones
// are disregarded
var onping = low(ping)
this._debug('new DHT %s', this.nodeId)
this._rpc.on('ping', function (older, swap) {
onping({ older: older, swap: swap })
})
const self = this
process.nextTick(bootstrap)
function ping (opts, cb) {
const older = opts.older
const swap = opts.swap
EventEmitter.call(this)
this._debug('new DHT %s', this.nodeId)
self._debug('received ping', older)
self._checkNodes(older, false, (_, deadNode) => {
if (deadNode) {
self._debug('swaping dead node with newer', deadNode)
swap(deadNode)
return cb()
}
function ping (opts, cb) {
var older = opts.older
var swap = opts.swap
self._debug('no node added, all other nodes ok')
cb()
})
}
self._debug('received ping', older)
self._checkNodes(older, false, function (_, deadNode) {
if (deadNode) {
self._debug('swaping dead node with newer', deadNode)
swap(deadNode)
return cb()
}
// Fired when the underlying RPC socket starts listening: record the state,
// start the periodic bucket-freshness checks, and re-emit 'listening'.
// Must stay a hoisted function declaration: it is registered on the RPC
// emitter before this point in the constructor.
function onlistening () {
self.listening = true
self._debug('listening %d', self.address().port)
self.updateBucketTimestamp()
self._setBucketCheckInterval()
self.emit('listening')
}
self._debug('no node added, all other nodes ok')
cb()
})
}
// Forward incoming KRPC queries to the instance handler.
function onquery (query, peer) {
self._onquery(query, peer)
}
// Fired when the underlying RPC socket starts listening: record the state,
// start the periodic bucket-freshness checks, and re-emit 'listening'.
function onlistening () {
self.listening = true
self._debug('listening %d', self.address().port)
self.updateBucketTimestamp()
self._setBucketCheckInterval()
self.emit('listening')
}
// setInterval-friendly wrapper around the instance secret rotation.
function rotateSecrets () {
self._rotateSecrets()
}
// Forward incoming KRPC queries to the instance handler.
function onquery (query, peer) {
self._onquery(query, peer)
}
// Deferred bootstrap, run on nextTick from the constructor.
// `opts.bootstrap === false` skips contacting bootstrap nodes but the
// 'ready' event still fires (see _bootstrap's populate flag).
function bootstrap () {
if (!self.destroyed) self._bootstrap(opts.bootstrap !== false)
}
// setInterval-friendly wrapper around the instance secret rotation.
function rotateSecrets () {
self._rotateSecrets()
}
// Surface non-fatal RPC problems as 'warning' events.
function onwarning (err) {
self.emit('warning', err)
}
// Deferred bootstrap, run on nextTick from the constructor.
// `opts.bootstrap === false` skips contacting bootstrap nodes but the
// 'ready' event still fires (see _bootstrap's populate flag).
function bootstrap () {
if (!self.destroyed) self._bootstrap(opts.bootstrap !== false)
}
// Re-emit fatal RPC errors on the DHT instance.
function onerror (err) {
self.emit('error', err)
}
function onwarning (err) {
self.emit('warning', err)
function onnode (node) {
self.emit('node', node)
}
}
// Re-emit fatal RPC errors on the DHT instance.
function onerror (err) {
self.emit('error', err)
}
_setBucketCheckInterval () {
const self = this
const interval = 1 * 60 * 1000 // check age of bucket every minute
// Re-emit nodes discovered by the RPC layer.
function onnode (node) {
self.emit('node', node)
}
}
this._runningBucketCheck = true
queueNext()
DHT.prototype._setBucketCheckInterval = function () {
var self = this
var interval = 1 * 60 * 1000 // check age of bucket every minute
function checkBucket () {
const diff = Date.now() - self._rpc.nodes.metadata.lastChange
this._runningBucketCheck = true
queueNext()
if (diff < self._bucketOutdatedTimeSpan) return queueNext()
function checkBucket () {
const diff = Date.now() - self._rpc.nodes.metadata.lastChange
self._pingAll(() => {
if (self.destroyed) return
if (diff < self._bucketOutdatedTimeSpan) return queueNext()
if (self.nodes.toArray().length < 1) {
// node is currently isolated,
// retry with initial bootstrap nodes
self._bootstrap(true)
}
self._pingAll(function () {
if (self.destroyed) return
queueNext()
})
}
if (self.nodes.toArray().length < 1) {
// node is currently isolated,
// retry with initial bootstrap nodes
self._bootstrap(true)
}
function queueNext () {
if (!self._runningBucketCheck || self.destroyed) return
const nextTimeout = Math.floor(Math.random() * interval + interval / 2)
self._bucketCheckTimeout = setTimeout(checkBucket, nextTimeout)
}
}
queueNext()
})
_pingAll (cb) {
this._checkAndRemoveNodes(this.nodes.toArray(), cb)
}
function queueNext () {
if (!self._runningBucketCheck || self.destroyed) return
var nextTimeout = Math.floor(Math.random() * interval + interval / 2)
self._bucketCheckTimeout = setTimeout(checkBucket, nextTimeout)
// Stop the periodic bucket-freshness check. Clearing the flag also stops
// any in-flight checkBucket from scheduling a successor (queueNext bails
// when _runningBucketCheck is false).
removeBucketCheckInterval () {
this._runningBucketCheck = false
clearTimeout(this._bucketCheckTimeout)
}
}
// Ping every node currently in the routing table; _checkAndRemoveNodes
// evicts the first node that fails to respond.
DHT.prototype._pingAll = function (cb) {
this._checkAndRemoveNodes(this.nodes.toArray(), cb)
}
// Record routing-table activity; the bucket-check loop compares this
// timestamp against _bucketOutdatedTimeSpan to decide when to re-ping.
updateBucketTimestamp () {
this._rpc.nodes.metadata.lastChange = Date.now()
}
// Stop the periodic bucket-freshness check. Clearing the flag also stops
// any in-flight checkBucket from scheduling a successor.
DHT.prototype.removeBucketCheckInterval = function () {
this._runningBucketCheck = false
clearTimeout(this._bucketCheckTimeout)
}
_checkAndRemoveNodes (nodes, cb) {
const self = this
// Record routing-table activity; the bucket-check loop compares this
// timestamp against _bucketOutdatedTimeSpan to decide when to re-ping.
DHT.prototype.updateBucketTimestamp = function () {
this._rpc.nodes.metadata.lastChange = Date.now()
}
this._checkNodes(nodes, true, (_, node) => {
if (node) self.removeNode(node.id)
cb(null, node)
})
}
DHT.prototype._checkAndRemoveNodes = function (nodes, cb) {
var self = this
_checkNodes (nodes, force, cb) {
const self = this
this._checkNodes(nodes, true, function (_, node) {
if (node) self.removeNode(node.id)
cb(null, node)
})
}
test(nodes)
DHT.prototype._checkNodes = function (nodes, force, cb) {
var self = this
function test (acc) {
let current = null
test(nodes)
while (acc.length) {
current = acc.pop()
if (!current.id || force) break
if (Date.now() - (current.seen || 0) > 10000) break // not pinged within 10s
current = null
}
function test (acc) {
var current = null
if (!current) return cb(null)
while (acc.length) {
current = acc.pop()
if (!current.id || force) break
if (Date.now() - (current.seen || 0) > 10000) break // not pinged within 10s
current = null
self._sendPing(current, err => {
if (!err) {
self.updateBucketTimestamp()
return test(acc)
}
cb(null, current)
})
}
}
if (!current) return cb(null)
self._sendPing(current, function (err) {
if (!err) {
self.updateBucketTimestamp()
return test(acc)
addNode (node) {
const self = this
if (node.id) {
node.id = toBuffer(node.id)
const old = !!this._rpc.nodes.get(node.id)
this._rpc.nodes.add(node)
if (!old) {
this.emit('node', node)
this.updateBucketTimestamp()
}
cb(null, current)
return
}
this._sendPing(node, (_, node) => {
if (node) self.addNode(node)
})
}
}
DHT.prototype.addNode = function (node) {
var self = this
if (node.id) {
node.id = toBuffer(node.id)
var old = !!this._rpc.nodes.get(node.id)
this._rpc.nodes.add(node)
if (!old) {
this.emit('node', node)
this.updateBucketTimestamp()
}
return
removeNode (id) {
this._rpc.nodes.remove(toBuffer(id))
}
this._sendPing(node, function (_, node) {
if (node) self.addNode(node)
})
}
/**
 * Drop a node from the routing table.
 * @param {Buffer|string} id - node id (normalized via toBuffer)
 */
DHT.prototype.removeNode = function (id) {
  const nodeId = toBuffer(id)
  this._rpc.nodes.remove(nodeId)
}
_sendPing (node, cb) {
const self = this
const expectedId = node.id
this._rpc.query(node, { q: 'ping' }, (err, pong, node) => {
if (err) return cb(err)
if (!pong.r || !pong.r.id || !Buffer.isBuffer(pong.r.id) || pong.r.id.length !== self._hashLength) {
return cb(new Error('Bad reply'))
}
if (Buffer.isBuffer(expectedId) && !expectedId.equals(pong.r.id)) {
return cb(new Error('Unexpected node id'))
}
DHT.prototype._sendPing = function (node, cb) {
var self = this
var expectedId = node.id
this._rpc.query(node, { q: 'ping' }, function (err, pong, node) {
if (err) return cb(err)
if (!pong.r || !pong.r.id || !Buffer.isBuffer(pong.r.id) || pong.r.id.length !== self._hashLength) {
return cb(new Error('Bad reply'))
}
if (Buffer.isBuffer(expectedId) && !expectedId.equals(pong.r.id)) {
return cb(new Error('Unexpected node id'))
}
self.updateBucketTimestamp()
cb(null, {
id: pong.r.id,
host: node.host || node.address,
port: node.port
})
})
}
self.updateBucketTimestamp()
cb(null, {
id: pong.r.id,
host: node.host || node.address,
port: node.port
toJSON () {
const self = this
const values = {}
Object.keys(this._values.cache).forEach(key => {
const value = self._values.cache[key].value
values[key] = {
v: value.v.toString('hex'),
id: value.id.toString('hex')
}
if (value.seq != null) values[key].seq = value.seq
if (value.sig != null) values[key].sig = value.sig.toString('hex')
if (value.k != null) values[key].k = value.k.toString('hex')
})
})
}
DHT.prototype.toJSON = function () {
var self = this
var values = {}
Object.keys(this._values.cache).forEach(function (key) {
var value = self._values.cache[key].value
values[key] = {
v: value.v.toString('hex'),
id: value.id.toString('hex')
return {
nodes: this._rpc.nodes.toArray().map(toNode),
values
}
if (value.seq != null) values[key].seq = value.seq
if (value.sig != null) values[key].sig = value.sig.toString('hex')
if (value.k != null) values[key].k = value.k.toString('hex')
})
return {
nodes: this._rpc.nodes.toArray().map(toNode),
values: values
}
}
DHT.prototype.put = function (opts, cb) {
if (Buffer.isBuffer(opts) || typeof opts === 'string') opts = { v: opts }
var isMutable = !!opts.k
if (opts.v === undefined) {
throw new Error('opts.v not given')
/**
 * Store a value in the DHT (BEP 44 style put).
 * A bare Buffer/string argument is shorthand for `{ v: value }`.
 * A put is mutable when `opts.k` (32-byte ed25519 public key) is present;
 * mutable puts also require a numeric `seq` and either a `sign(buf)`
 * function or a precomputed `sig` Buffer. All validation failures throw
 * synchronously; the network work is delegated to `_put`, whose return
 * value (the target key) is returned.
 * @param {Object|Buffer|string} opts
 * @param {Function} [cb] - called with (err, key, n)
 * @throws {Error} when any of the validations below fails
 */
put (opts, cb) {
if (Buffer.isBuffer(opts) || typeof opts === 'string') opts = { v: opts }
const isMutable = !!opts.k
if (opts.v === undefined) {
throw new Error('opts.v not given')
}
// NOTE(review): rejects v of exactly 1000 bytes ('>= 1000'), while the
// inbound _onput handler rejects only '> 1000' — confirm intended.
if (opts.v.length >= 1000) {
throw new Error('v must be less than 1000 bytes in put()')
}
if (isMutable && opts.cas !== undefined && typeof opts.cas !== 'number') {
throw new Error('opts.cas must be an integer if provided')
}
if (isMutable && opts.k.length !== 32) {
throw new Error('opts.k ed25519 public key must be 32 bytes')
}
if (isMutable && typeof opts.sign !== 'function' && !Buffer.isBuffer(opts.sig)) {
throw new Error('opts.sign function or options.sig signature is required for mutable put')
}
if (isMutable && opts.salt && opts.salt.length > 64) {
throw new Error('opts.salt is > 64 bytes long')
}
if (isMutable && opts.seq === undefined) {
throw new Error('opts.seq not provided for a mutable update')
}
if (isMutable && typeof opts.seq !== 'number') {
throw new Error('opts.seq not an integer')
}
return this._put(opts, cb)
}
if (opts.v.length >= 1000) {
throw new Error('v must be less than 1000 bytes in put()')
}
if (isMutable && opts.cas !== undefined && typeof opts.cas !== 'number') {
throw new Error('opts.cas must be an integer if provided')
}
if (isMutable && opts.k.length !== 32) {
throw new Error('opts.k ed25519 public key must be 32 bytes')
}
if (isMutable && typeof opts.sign !== 'function' && !Buffer.isBuffer(opts.sig)) {
throw new Error('opts.sign function or options.sig signature is required for mutable put')
}
if (isMutable && opts.salt && opts.salt.length > 64) {
throw new Error('opts.salt is > 64 bytes long')
}
if (isMutable && opts.seq === undefined) {
throw new Error('opts.seq not provided for a mutable update')
}
if (isMutable && typeof opts.seq !== 'number') {
throw new Error('opts.seq not an integer')
}
return this._put(opts, cb)
}
_put (opts, cb) {
if (!cb) cb = noop
DHT.prototype._put = function (opts, cb) {
if (!cb) cb = noop
const isMutable = !!opts.k
const v = typeof opts.v === 'string' ? Buffer.from(opts.v) : opts.v
const key = isMutable
? this._hash(opts.salt ? Buffer.concat([opts.k, opts.salt]) : opts.k)
: this._hash(bencode.encode(v))
var isMutable = !!opts.k
var v = typeof opts.v === 'string' ? Buffer.from(opts.v) : opts.v
var key = isMutable
? this._hash(opts.salt ? Buffer.concat([opts.k, opts.salt]) : opts.k)
: this._hash(bencode.encode(v))
const table = this._tables.get(key.toString('hex'))
if (!table) return this._preput(key, opts, cb)
var table = this._tables.get(key.toString('hex'))
if (!table) return this._preput(key, opts, cb)
const message = {
q: 'put',
a: {
id: this._rpc.id,
token: null, // queryAll sets this
v
}
}
var message = {
q: 'put',
a: {
id: this._rpc.id,
token: null, // queryAll sets this
v: v
if (isMutable) {
if (typeof opts.cas === 'number') message.a.cas = opts.cas
if (opts.salt) message.a.salt = opts.salt
message.a.k = opts.k
message.a.seq = opts.seq
if (typeof opts.sign === 'function') message.a.sig = opts.sign(encodeSigData(message.a))
else if (Buffer.isBuffer(opts.sig)) message.a.sig = opts.sig
} else {
this._values.set(key.toString('hex'), message.a)
}
}
if (isMutable) {
if (typeof opts.cas === 'number') message.a.cas = opts.cas
if (opts.salt) message.a.salt = opts.salt
message.a.k = opts.k
message.a.seq = opts.seq
if (typeof opts.sign === 'function') message.a.sig = opts.sign(encodeSigData(message.a))
else if (Buffer.isBuffer(opts.sig)) message.a.sig = opts.sig
} else {
this._values.set(key.toString('hex'), message.a)
this._rpc.queryAll(table.closest(key), message, null, (err, n) => {
if (err) return cb(err, key, n)
cb(null, key, n)
})
return key
}
this._rpc.queryAll(table.closest(key), message, null, function (err, n) {
if (err) return cb(err, key, n)
cb(null, key, n)
})
_preput (key, opts, cb) {
const self = this
return key
}
this._closest(key, {
q: 'get',
a: {
id: this._rpc.id,
target: key
}
}, null, (err, n) => {
if (err) return cb(err)
self.put(opts, cb)
})
DHT.prototype._preput = function (key, opts, cb) {
var self = this
return key
}
this._closest(key, {
q: 'get',
a: {
id: this._rpc.id,
target: key
get (key, opts, cb) {
key = toBuffer(key)
if (typeof opts === 'function') {
cb = opts
opts = null
}
}, null, function (err, n) {
if (err) return cb(err)
self.put(opts, cb)
})
return key
}
if (!opts) opts = {}
const verify = opts.verify || this._verify
const hash = this._hash
let value = this._values.get(key.toString('hex')) || null
DHT.prototype.get = function (key, opts, cb) {
key = toBuffer(key)
if (typeof opts === 'function') {
cb = opts
opts = null
}
if (value && (opts.cache !== false)) {
value = createGetResponse(this._rpc.id, null, value)
return process.nextTick(done)
}
if (!opts) opts = {}
var verify = opts.verify || this._verify
var hash = this._hash
var value = this._values.get(key.toString('hex')) || null
this._closest(key, {
q: 'get',
a: {
id: this._rpc.id,
target: key
}
}, onreply, done)
if (value && (opts.cache !== false)) {
value = createGetResponse(this._rpc.id, null, value)
return process.nextTick(done)
}
this._closest(key, {
q: 'get',
a: {
id: this._rpc.id,
target: key
function done (err) {
if (err) return cb(err)
cb(null, value)
}
}, onreply, done)
function done (err) {
if (err) return cb(err)
cb(null, value)
}
function onreply (message) {
const r = message.r
if (!r || !r.v) return true
function onreply (message) {
var r = message.r
if (!r || !r.v) return true
const isMutable = r.k || r.sig
var isMutable = r.k || r.sig
if (opts.salt) r.salt = Buffer.from(opts.salt)
if (opts.salt) r.salt = Buffer.from(opts.salt)
if (isMutable) {
if (!verify || !r.sig || !r.k) return true
if (!verify(r.sig, encodeSigData(r), r.k)) return true
if (hash(r.salt ? Buffer.concat([r.k, r.salt]) : r.k).equals(key)) {
if (!value || r.seq > value.seq) value = r
}
} else {
if (hash(bencode.encode(r.v)).equals(key)) {
value = r
return false
}
}
if (isMutable) {
if (!verify || !r.sig || !r.k) return true
if (!verify(r.sig, encodeSigData(r), r.k)) return true
if (hash(r.salt ? Buffer.concat([r.k, r.salt]) : r.k).equals(key)) {
if (!value || r.seq > value.seq) value = r
}
} else {
if (hash(bencode.encode(r.v)).equals(key)) {
value = r
return false
}
return true
}
return true
}
}
DHT.prototype.announce = function (infoHash, port, cb) {
if (typeof port === 'function') return this.announce(infoHash, 0, port)
infoHash = toBuffer(infoHash)
if (!cb) cb = noop
announce (infoHash, port, cb) {
if (typeof port === 'function') return this.announce(infoHash, 0, port)
infoHash = toBuffer(infoHash)
if (!cb) cb = noop
var table = this._tables.get(infoHash.toString('hex'))
if (!table) return this._preannounce(infoHash, port, cb)
const table = this._tables.get(infoHash.toString('hex'))
if (!table) return this._preannounce(infoHash, port, cb)
if (this._host) {
var dhtPort = this.listening ? this.address().port : 0
this._addPeer(
{ host: this._host, port: port || dhtPort },
infoHash,
{ host: this._host, port: dhtPort }
)
}
if (this._host) {
const dhtPort = this.listening ? this.address().port : 0
this._addPeer(
{ host: this._host, port: port || dhtPort },
infoHash,
{ host: this._host, port: dhtPort }
)
}
var message = {
q: 'announce_peer',
a: {
id: this._rpc.id,
token: null, // queryAll sets this
info_hash: infoHash,
port: port,
implied_port: port ? 0 : 1
const message = {
q: 'announce_peer',
a: {
id: this._rpc.id,
token: null, // queryAll sets this
info_hash: infoHash,
port,
implied_port: port ? 0 : 1
}
}
this._debug('announce %s %d', infoHash, port)
this._rpc.queryAll(table.closest(infoHash), message, null, cb)
}
this._debug('announce %s %d', infoHash, port)
this._rpc.queryAll(table.closest(infoHash), message, null, cb)
}
_preannounce (infoHash, port, cb) {
const self = this
DHT.prototype._preannounce = function (infoHash, port, cb) {
var self = this
this.lookup(infoHash, err => {
if (self.destroyed) return cb(new Error('dht is destroyed'))
if (err) return cb(err)
self.announce(infoHash, port, cb)
})
}
this.lookup(infoHash, function (err) {
if (self.destroyed) return cb(new Error('dht is destroyed'))
if (err) return cb(err)
self.announce(infoHash, port, cb)
})
}
lookup (infoHash, cb) {
infoHash = toBuffer(infoHash)
if (!cb) cb = noop
const self = this
let aborted = false
DHT.prototype.lookup = function (infoHash, cb) {
infoHash = toBuffer(infoHash)
if (!cb) cb = noop
var self = this
var aborted = false
this._debug('lookup %s', infoHash)
process.nextTick(emit)
this._closest(infoHash, {
q: 'get_peers',
a: {
id: this._rpc.id,
info_hash: infoHash
}
}, onreply, cb)
this._debug('lookup %s', infoHash)
process.nextTick(emit)
this._closest(infoHash, {
q: 'get_peers',
a: {
id: this._rpc.id,
info_hash: infoHash
function emit (values, from) {
if (!values) values = self._peers.get(infoHash.toString('hex'), 100)
const peers = decodePeers(values)
for (let i = 0; i < peers.length; i++) {
self.emit('peer', peers[i], infoHash, from || null)
}
}
}, onreply, cb)
function emit (values, from) {
if (!values) values = self._peers.get(infoHash.toString('hex'), 100)
var peers = decodePeers(values)
for (var i = 0; i < peers.length; i++) {
self.emit('peer', peers[i], infoHash, from || null)
function onreply (message, node) {
if (aborted) return false
if (message.r.values) emit(message.r.values, node)
}
return function abort () { aborted = true }
}
function onreply (message, node) {
if (aborted) return false
if (message.r.values) emit(message.r.values, node)
// Expose the bound socket address of the underlying RPC instance.
address () {
return this._rpc.address()
}
return function abort () { aborted = true }
}
// listen([port], [address], [onlistening])
// Bind the RPC socket; forwards ([port], [address], [onlistening]) to k-rpc.
listen (...args) {
this._rpc.bind(...args)
}
// Expose the bound socket address of the underlying RPC instance.
DHT.prototype.address = function () {
return this._rpc.address()
}
// listen([port], [address], [onlistening])
// Bind the RPC socket; forwards ([port], [address], [onlistening]) to k-rpc.
DHT.prototype.listen = function () {
this._rpc.bind.apply(this._rpc, arguments)
}
DHT.prototype.destroy = function (cb) {
if (this.destroyed) {
if (cb) process.nextTick(cb)
return
/**
 * Tear down the DHT: stops secret rotation and the bucket check, destroys
 * the peer store and the underlying RPC socket, then emits 'close'.
 * Idempotent — repeated calls only invoke `cb` on the next tick.
 * @param {Function} [cb] - called once the RPC socket has closed
 */
destroy (cb) {
if (this.destroyed) {
if (cb) process.nextTick(cb)
return
}
this.destroyed = true
const self = this
clearInterval(this._interval)
this.removeBucketCheckInterval()
this._peers.destroy()
this._debug('destroying')
this._rpc.destroy(() => {
self.emit('close')
if (cb) cb()
})
}
this.destroyed = true
var self = this
clearInterval(this._interval)
this.removeBucketCheckInterval()
this._peers.destroy()
this._debug('destroying')
this._rpc.destroy(function () {
self.emit('close')
if (cb) cb()
})
}
DHT.prototype._onquery = function (query, peer) {
var q = query.q.toString()
this._debug('received %s query from %s:%d', q, peer.address, peer.port)
if (!query.a) return
_onquery (query, peer) {
const q = query.q.toString()
this._debug('received %s query from %s:%d', q, peer.address, peer.port)
if (!query.a) return
switch (q) {
case 'ping':
return this._rpc.response(peer, query, { id: this._rpc.id })
switch (q) {
case 'ping':
return this._rpc.response(peer, query, { id: this._rpc.id })
case 'find_node':
return this._onfindnode(query, peer)
case 'find_node':
return this._onfindnode(query, peer)
case 'get_peers':
return this._ongetpeers(query, peer)
case 'get_peers':
return this._ongetpeers(query, peer)
case 'announce_peer':
return this._onannouncepeer(query, peer)
case 'announce_peer':
return this._onannouncepeer(query, peer)
case 'get':
return this._onget(query, peer)
case 'get':
return this._onget(query, peer)
case 'put':
return this._onput(query, peer)
case 'put':
return this._onput(query, peer)
}
}
}
DHT.prototype._onfindnode = function (query, peer) {
var target = query.a.target
if (!target) return this._rpc.error(peer, query, [203, '`find_node` missing required `a.target` field'])
_onfindnode (query, peer) {
const target = query.a.target
if (!target) return this._rpc.error(peer, query, [203, '`find_node` missing required `a.target` field'])
this.emit('find_node', target)
this.emit('find_node', target)
var nodes = this._rpc.nodes.closest(target)
this._rpc.response(peer, query, { id: this._rpc.id }, nodes)
}
const nodes = this._rpc.nodes.closest(target)
this._rpc.response(peer, query, { id: this._rpc.id }, nodes)
}
DHT.prototype._ongetpeers = function (query, peer) {
var host = peer.address || peer.host
var infoHash = query.a.info_hash
if (!infoHash) return this._rpc.error(peer, query, [203, '`get_peers` missing required `a.info_hash` field'])
_ongetpeers (query, peer) {
const host = peer.address || peer.host
const infoHash = query.a.info_hash
if (!infoHash) return this._rpc.error(peer, query, [203, '`get_peers` missing required `a.info_hash` field'])
this.emit('get_peers', infoHash)
this.emit('get_peers', infoHash)
var r = { id: this._rpc.id, token: this._generateToken(host) }
var peers = this._peers.get(infoHash.toString('hex'))
const r = { id: this._rpc.id, token: this._generateToken(host) }
const peers = this._peers.get(infoHash.toString('hex'))
if (peers.length) {
r.values = peers
this._rpc.response(peer, query, r)
} else {
this._rpc.response(peer, query, r, this._rpc.nodes.closest(infoHash))
if (peers.length) {
r.values = peers
this._rpc.response(peer, query, r)
} else {
this._rpc.response(peer, query, r, this._rpc.nodes.closest(infoHash))
}
}
}
DHT.prototype._onannouncepeer = function (query, peer) {
var host = peer.address || peer.host
var port = query.a.implied_port ? peer.port : query.a.port
if (!port || typeof port !== 'number' || port <= 0 || port > 65535) return
var infoHash = query.a.info_hash
var token = query.a.token
if (!infoHash || !token) return
_onannouncepeer (query, peer) {
const host = peer.address || peer.host
const port = query.a.implied_port ? peer.port : query.a.port
if (!port || typeof port !== 'number' || port <= 0 || port > 65535) return
const infoHash = query.a.info_hash
const token = query.a.token
if (!infoHash || !token) return
if (!this._validateToken(host, token)) {
return this._rpc.error(peer, query, [203, 'cannot `announce_peer` with bad token'])
}
if (!this._validateToken(host, token)) {
return this._rpc.error(peer, query, [203, 'cannot `announce_peer` with bad token'])
}
this.emit('announce_peer', infoHash, { host: host, port: peer.port })
this.emit('announce_peer', infoHash, { host, port: peer.port })
this._addPeer({ host: host, port: port }, infoHash, { host: host, port: peer.port })
this._rpc.response(peer, query, { id: this._rpc.id })
}
this._addPeer({ host, port }, infoHash, { host, port: peer.port })
this._rpc.response(peer, query, { id: this._rpc.id })
}
// Record an announced peer under the info-hash's hex key and notify
// 'announce' listeners with the peer, info-hash, and origin.
DHT.prototype._addPeer = function (peer, infoHash, from) {
  const key = infoHash.toString('hex')
  this._peers.add(key, encodePeer(peer.host, peer.port))
  this.emit('announce', peer, infoHash, from)
}
// Record an announced peer for an info-hash and emit 'announce'.
_addPeer (peer, infoHash, from) {
this._peers.add(infoHash.toString('hex'), encodePeer(peer.host, peer.port))
this.emit('announce', peer, infoHash, from)
}
DHT.prototype._onget = function (query, peer) {
var host = peer.address || peer.host
var target = query.a.target
if (!target) return
var token = this._generateToken(host)
var value = this._values.get(target.toString('hex'))
_onget (query, peer) {
const host = peer.address || peer.host
const target = query.a.target
if (!target) return
const token = this._generateToken(host)
const value = this._values.get(target.toString('hex'))
this.emit('get', target, value)
this.emit('get', target, value)
if (!value) {
var nodes = this._rpc.nodes.closest(target)
this._rpc.response(peer, query, { id: this._rpc.id, token: token }, nodes)
} else {
this._rpc.response(peer, query, createGetResponse(this._rpc.id, token, value))
if (!value) {
const nodes = this._rpc.nodes.closest(target)
this._rpc.response(peer, query, { id: this._rpc.id, token }, nodes)
} else {
this._rpc.response(peer, query, createGetResponse(this._rpc.id, token, value))
}
}
}
DHT.prototype._onput = function (query, peer) {
var host = peer.address || peer.host
_onput (query, peer) {
const host = peer.address || peer.host
var a = query.a
if (!a) return
var v = query.a.v
if (!v) return
var id = query.a.id
if (!id) return
const a = query.a
if (!a) return
const v = query.a.v
if (!v) return
const id = query.a.id
if (!id) return
var token = a.token
if (!token) return
const token = a.token
if (!token) return
if (!this._validateToken(host, token)) {
return this._rpc.error(peer, query, [203, 'cannot `put` with bad token'])
}
if (v.length > 1000) {
return this._rpc.error(peer, query, [205, 'data payload too large'])
}
if (!this._validateToken(host, token)) {
return this._rpc.error(peer, query, [203, 'cannot `put` with bad token'])
}
if (v.length > 1000) {
return this._rpc.error(peer, query, [205, 'data payload too large'])
}
var isMutable = !!(a.k || a.sig)
if (isMutable && !a.k && !a.sig) return
const isMutable = !!(a.k || a.sig)
if (isMutable && !a.k && !a.sig) return
var key = isMutable
? this._hash(a.salt ? Buffer.concat([a.k, a.salt]) : a.k)
: this._hash(bencode.encode(v))
var keyHex = key.toString('hex')
const key = isMutable
? this._hash(a.salt ? Buffer.concat([a.k, a.salt]) : a.k)
: this._hash(bencode.encode(v))
const keyHex = key.toString('hex')
this.emit('put', key, v)
this.emit('put', key, v)
if (isMutable) {
if (!this._verify) return this._rpc.error(peer, query, [400, 'verification not supported'])
if (!this._verify(a.sig, encodeSigData(a), a.k)) return
var prev = this._values.get(keyHex)
if (prev && typeof a.cas === 'number' && prev.seq !== a.cas) {
return this._rpc.error(peer, query, [301, 'CAS mismatch, re-read and try again'])
if (isMutable) {
if (!this._verify) return this._rpc.error(peer, query, [400, 'verification not supported'])
if (!this._verify(a.sig, encodeSigData(a), a.k)) return
const prev = this._values.get(keyHex)
if (prev && typeof a.cas === 'number' && prev.seq !== a.cas) {
return this._rpc.error(peer, query, [301, 'CAS mismatch, re-read and try again'])
}
if (prev && typeof prev.seq === 'number' && !(a.seq > prev.seq)) {
return this._rpc.error(peer, query, [302, 'sequence number less than current'])
}
this._values.set(keyHex, { v, k: a.k, salt: a.salt, sig: a.sig, seq: a.seq, id })
} else {
this._values.set(keyHex, { v, id })
}
if (prev && typeof prev.seq === 'number' && !(a.seq > prev.seq)) {
return this._rpc.error(peer, query, [302, 'sequence number less than current'])
}
this._values.set(keyHex, { v: v, k: a.k, salt: a.salt, sig: a.sig, seq: a.seq, id: id })
} else {
this._values.set(keyHex, { v: v, id: id })
this._rpc.response(peer, query, { id: this._rpc.id })
}
this._rpc.response(peer, query, { id: this._rpc.id })
}
_bootstrap (populate) {
const self = this
if (!populate) return process.nextTick(ready)
DHT.prototype._bootstrap = function (populate) {
var self = this
if (!populate) return process.nextTick(ready)
this._rpc.populate(self._rpc.id, {
q: 'find_node',
a: {
id: self._rpc.id,
target: self._rpc.id
}
}, ready)
this._rpc.populate(self._rpc.id, {
q: 'find_node',
a: {
id: self._rpc.id,
target: self._rpc.id
function ready () {
if (self.ready) return
self._debug('emit ready')
self.ready = true
self.emit('ready')
}
}, ready)
}
function ready () {
if (self.ready) return
_closest (target, message, onmessage, cb) {
const self = this
self._debug('emit ready')
self.ready = true
self.emit('ready')
}
}
const table = new KBucket({
localNodeId: target,
numberOfNodesPerKBucket: this._rpc.k
})
DHT.prototype._closest = function (target, message, onmessage, cb) {
var self = this
this._rpc.closest(target, message, onreply, done)
var table = new KBucket({
localNodeId: target,
numberOfNodesPerKBucket: this._rpc.k
})
function done (err, n) {
if (err) return cb(err)
self._tables.set(target.toString('hex'), table)
self._debug('visited %d nodes', n)
cb(null, n)
}
this._rpc.closest(target, message, onreply, done)
function onreply (message, node) {
if (!message.r) return true
function done (err, n) {
if (err) return cb(err)
self._tables.set(target.toString('hex'), table)
self._debug('visited %d nodes', n)
cb(null, n)
if (message.r.token && message.r.id && Buffer.isBuffer(message.r.id) && message.r.id.length === self._hashLength) {
self._debug('found node %s (target: %s)', message.r.id, target)
table.add({
id: message.r.id,
host: node.host || node.address,
port: node.port,
token: message.r.token
})
}
if (!onmessage) return true
return onmessage(message, node)
}
}
function onreply (message, node) {
if (!message.r) return true
if (message.r.token && message.r.id && Buffer.isBuffer(message.r.id) && message.r.id.length === self._hashLength) {
self._debug('found node %s (target: %s)', message.r.id, target)
table.add({
id: message.r.id,
host: node.host || node.address,
port: node.port,
token: message.r.token
})
_debug () {
if (!debug.enabled) return
const args = [].slice.call(arguments)
args[0] = `[${this.nodeId.toString('hex').substring(0, 7)}] ${args[0]}`
for (let i = 1; i < args.length; i++) {
if (Buffer.isBuffer(args[i])) args[i] = args[i].toString('hex')
}
debug(...args)
}
if (!onmessage) return true
return onmessage(message, node)
_validateToken (host, token) {
const tokenA = this._generateToken(host, this._secrets[0])
const tokenB = this._generateToken(host, this._secrets[1])
return token.equals(tokenA) || token.equals(tokenB)
}
}
DHT.prototype._debug = function () {
if (!debug.enabled) return
var args = [].slice.call(arguments)
args[0] = '[' + this.nodeId.toString('hex').substring(0, 7) + '] ' + args[0]
for (var i = 1; i < args.length; i++) {
if (Buffer.isBuffer(args[i])) args[i] = args[i].toString('hex')
_generateToken (host, secret) {
if (!secret) secret = this._secrets[0]
return this._hash(Buffer.concat([Buffer.from(host), secret]))
}
debug.apply(null, args)
}
DHT.prototype._validateToken = function (host, token) {
var tokenA = this._generateToken(host, this._secrets[0])
var tokenB = this._generateToken(host, this._secrets[1])
return token.equals(tokenA) || token.equals(tokenB)
}
DHT.prototype._generateToken = function (host, secret) {
if (!secret) secret = this._secrets[0]
return this._hash(Buffer.concat([Buffer.from(host), secret]))
}
DHT.prototype._rotateSecrets = function () {
if (!this._secrets) {
this._secrets = [randombytes(this._hashLength), randombytes(this._hashLength)]
} else {
this._secrets[1] = this._secrets[0]
this._secrets[0] = randombytes(this._hashLength)
_rotateSecrets () {
if (!this._secrets) {
this._secrets = [randombytes(this._hashLength), randombytes(this._hashLength)]
} else {
this._secrets[1] = this._secrets[0]
this._secrets[0] = randombytes(this._hashLength)
}
}

@@ -739,3 +734,3 @@ }

function createGetResponse (id, token, value) {
var r = { id: id, token: token, v: value.v }
const r = { id, token, v: value.v }
if (value.sig) {

@@ -750,5 +745,5 @@ r.sig = value.sig

function encodePeer (host, port) {
var buf = Buffer.allocUnsafe(6)
var ip = host.split('.')
for (var i = 0; i < 4; i++) buf[i] = parseInt(ip[i] || 0, 10)
const buf = Buffer.allocUnsafe(6)
const ip = host.split('.')
for (let i = 0; i < 4; i++) buf[i] = parseInt(ip[i] || 0, 10)
buf.writeUInt16BE(port, 4)

@@ -759,11 +754,11 @@ return buf

function decodePeers (buf) {
var peers = []
const peers = []
try {
for (var i = 0; i < buf.length; i++) {
var port = buf[i].readUInt16BE(4)
for (let i = 0; i < buf.length; i++) {
const port = buf[i].readUInt16BE(4)
if (!port) continue
peers.push({
host: parseIp(buf[i], 0),
port: port
port
})

@@ -779,7 +774,7 @@ }

// Render four consecutive bytes of `buf` starting at `offset` as a
// dotted-quad IPv4 string, e.g. bytes [192, 168, 0, 1] -> '192.168.0.1'.
// (The original contained a duplicated, unreachable second return.)
function parseIp (buf, offset) {
  return `${buf[offset]}.${buf[offset + 1]}.${buf[offset + 2]}.${buf[offset + 3]}`
}
function encodeSigData (msg) {
var ref = { seq: msg.seq || 0, v: msg.v }
const ref = { seq: msg.seq || 0, v: msg.v }
if (msg.salt) ref.salt = msg.salt

@@ -802,1 +797,3 @@ return bencode.encode(ref).slice(1, -1)

}
module.exports = DHT

@@ -1,3 +0,4 @@

var Client = require('./client')
var Server = require('./server')
/*! bittorrent-dht. MIT License. WebTorrent LLC <https://webtorrent.io/opensource> */
const Client = require('./client')
const Server = require('./server')

@@ -4,0 +5,0 @@ module.exports = Client

{
"name": "bittorrent-dht",
"description": "Simple, robust, BitTorrent DHT implementation",
"version": "9.0.3",
"version": "10.0.0",
"author": {
"name": "WebTorrent, LLC",
"name": "WebTorrent LLC",
"email": "feross@webtorrent.io",

@@ -16,3 +16,2 @@ "url": "https://webtorrent.io"

"debug": "^4.1.1",
"inherits": "^2.0.1",
"k-bucket": "^5.0.0",

@@ -27,3 +26,3 @@ "k-rpc": "^5.0.0",

"devDependencies": {
"ed25519-supercop": "^1.0.0",
"bittorrent-dht-sodium": "^1.1.0",
"ip": "^1.1.0",

@@ -33,3 +32,3 @@ "once": "^1.3.3",

"standard": "*",
"tape": "^4.4.0"
"tape": "^5.0.0"
},

@@ -46,2 +45,5 @@ "keywords": [

],
"engines": {
"node": ">=10"
},
"license": "MIT",

@@ -57,3 +59,17 @@ "main": "index.js",

"test-live": "tape test/live/*.js"
}
},
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
]
}

@@ -44,11 +44,11 @@ # bittorrent-dht [![travis][travis-image]][travis-url] [![npm][npm-image]][npm-url] [![downloads][downloads-image]][downloads-url] [![javascript style guide][standard-image]][standard-url]

```js
var DHT = require('bittorrent-dht')
var magnet = require('magnet-uri')
const DHT = require('bittorrent-dht')
const magnet = require('magnet-uri')
var uri = 'magnet:?xt=urn:btih:e3811b9539cacff680e418124272177c47477157'
var parsed = magnet(uri)
const uri = 'magnet:?xt=urn:btih:e3811b9539cacff680e418124272177c47477157'
const parsed = magnet(uri)
console.log(parsed.infoHash) // 'e3811b9539cacff680e418124272177c47477157'
var dht = new DHT()
const dht = new DHT()

@@ -97,4 +97,4 @@ dht.listen(20000, function () {

``` js
var ed = require('ed25519-supercop')
var dht = new DHT({ verify: ed.verify })
const ed = require('ed25519-supercop')
const dht = new DHT({ verify: ed.verify })
```

@@ -175,3 +175,3 @@

```js
var dht1 = new DHT()
const dht1 = new DHT()

@@ -181,3 +181,3 @@ // some time passes ...

// destroy the dht
var nodes = dht1.toJSON().nodes
const nodes = dht1.toJSON().nodes
dht1.destroy()

@@ -188,3 +188,3 @@

// initialize a new dht with the same routing table as the first
var dht2 = new DHT()
const dht2 = new DHT()

@@ -232,5 +232,5 @@ nodes.forEach(function (node) {

``` js
var DHT = require('bittorrent-dht')
var dht = new DHT()
var value = Buffer.alloc(200).fill('abc')
const DHT = require('bittorrent-dht')
const dht = new DHT()
const value = Buffer.alloc(200).fill('abc')

@@ -250,10 +250,10 @@ dht.put({ v: value }, function (err, hash) {

* `opts.cas` - optional previous sequence for compare-and-swap
* `opts.salt` - optional salt buffer to include (< 64 bytes) when calculating
* `opts.salt` - optional salt buffer to include (<= 64 bytes) when calculating
the hash of the content. You can use a salt to have multiple mutable addresses
for the same public key `opts.k`.
Note that bittorrent bep44 uses ed25519 supercop/ref10 keys, NOT nacl/sodium
keys. You can use the [ed25519-supercop](https://npmjs.com/package/ed25519-supercop)
package to generate the appropriate signatures or
[bittorrent-dht-store-keypair](https://npmjs.com/package/bittorrent-dht-store-keypair)
Note that bittorrent bep44 uses ed25519. You can use the [ed25519-supercop](https://npmjs.com/package/ed25519-supercop)
package to generate the appropriate signatures, or the
[bittorrent-dht-store-keypair](https://npmjs.com/package/bittorrent-dht-store-keypair) or
[bittorrent-dht-sodium](https://npmjs.com/package/bittorrent-dht-sodium) packages
for a more convenient version.

@@ -265,17 +265,17 @@

``` js
var ed = require('ed25519-supercop')
var keypair = ed.createKeyPair(ed.createSeed())
const ed = require('bittorrent-dht-sodium')
const keypair = ed.keygen()
var value = Buffer.alloc(200).fill('whatever') // the payload you want to send
var opts = {
k: keypair.publicKey,
const value = Buffer.alloc(200).fill('whatever') // the payload you want to send
const opts = {
k: keypair.pk,
seq: 0,
v: value,
sign: function (buf) {
return ed.sign(buf, keypair.publicKey, keypair.secretKey)
return ed.sign(buf, keypair.sk)
}
}
var DHT = require('bittorrent-dht')
var dht = new DHT
const DHT = require('bittorrent-dht')
const dht = new DHT

@@ -300,4 +300,4 @@ dht.put(opts, function (err, hash) {

``` js
var ed = require('ed25519-supercop')
var dht = new DHT({ verify: ed.verify }) // you MUST specify the "verify" param if you want to get mutable content, otherwise null will be returned
const ed = require('bittorrent-dht-sodium')
const dht = new DHT({ verify: ed.verify }) // you MUST specify the "verify" param if you want to get mutable content, otherwise null will be returned

@@ -304,0 +304,0 @@ dht.get(key, function (err, res) {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc