Socket
Socket
Sign inDemoInstall

libp2p-kad-dht

Package Overview
Dependencies
Maintainers
2
Versions
109
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

libp2p-kad-dht - npm Package Compare versions

Comparing version 0.12.1 to 0.13.0

test/utils/create-disjoint-tracks.js

15

CHANGELOG.md

@@ -0,1 +1,16 @@

<a name="0.13.0"></a>
# [0.13.0](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.12.1...v0.13.0) (2018-12-05)
### Bug Fixes
* make 'find peer query' test reliable ([#58](https://github.com/libp2p/js-libp2p-kad-dht/issues/58)) ([54336dd](https://github.com/libp2p/js-libp2p-kad-dht/commit/54336dd))
### Features
* run queries on disjoint paths ([#37](https://github.com/libp2p/js-libp2p-kad-dht/issues/37)) ([#39](https://github.com/libp2p/js-libp2p-kad-dht/issues/39)) ([742b3fb](https://github.com/libp2p/js-libp2p-kad-dht/commit/742b3fb))
<a name="0.12.1"></a>

@@ -2,0 +17,0 @@ ## [0.12.1](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.12.0...v0.12.1) (2018-11-30)

5

package.json
{
"name": "libp2p-kad-dht",
"version": "0.12.1",
"version": "0.13.0",
"description": "JavaScript implementation of the Kad-DHT for libp2p",

@@ -82,9 +82,10 @@ "leadMaintainer": "Vasco Santos <vasco.santos@moxy.studio>",

"Jacob Heun <jacobheun@gmail.com>",
"John Hiesey <jhiesey@cs.stanford.edu>",
"Lars Gierth <larsg@systemli.org>",
"Pedro Teixeira <i@pgte.me>",
"Richard Schneider <makaretu@gmail.com>",
"Vasco Santos <vasco.santos@moxy.studio>",
"Vasco Santos <vasco.santos@ua.pt>",
"Vasco Santos <vasco.santos@moxy.studio>",
"ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ <victorbjelkholm@gmail.com>"
]
}

4

src/constants.js

@@ -34,2 +34,6 @@ 'use strict'

// Number of disjoint query paths to use
// This is set to K/2 per the S/Kademlia paper
exports.DISJOINT_PATHS = 10
exports.maxMessageSize = 2 << 22 // 4MB

@@ -256,3 +256,3 @@ 'use strict'

this._log('getMany %b (%s)', key, nvals)
const vals = []
let vals = []

@@ -275,2 +275,3 @@ this._getLocal(key, (err, localRec) => {

const paths = []
waterfall([

@@ -289,28 +290,36 @@ (cb) => utils.convertBuffer(key, cb),

// we have peers, lets do the actualy query to them
const query = new Query(this, key, (peer, cb) => {
this._getValueOrPeers(peer, key, (err, rec, peers) => {
if (err) {
// If we have an invalid record we just want to continue and fetch a new one.
if (!(err.code === 'ERR_INVALID_RECORD')) {
return cb(err)
// we have peers, lets do the actual query to them
const query = new Query(this, key, (pathIndex, numPaths) => {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(nvals - vals.length, numPaths)
const pathVals = []
paths.push(pathVals)
// Here we return the query function to use on this particular disjoint path
return (peer, cb) => {
this._getValueOrPeers(peer, key, (err, rec, peers) => {
if (err) {
// If we have an invalid record we just want to continue and fetch a new one.
if (!(err.code === 'ERR_INVALID_RECORD')) {
return cb(err)
}
}
}
const res = { closerPeers: peers }
const res = { closerPeers: peers }
if ((rec && rec.value) || (err && err.code === 'ERR_INVALID_RECORD')) {
vals.push({
val: rec && rec.value,
from: peer
})
}
if ((rec && rec.value) || (err && err.code === 'ERR_INVALID_RECORD')) {
pathVals.push({
val: rec && rec.value,
from: peer
})
}
// enough is enough
if (vals.length >= nvals) {
res.success = true
}
// enough is enough
if (pathVals.length >= pathSize) {
res.success = true
}
cb(null, res)
})
cb(null, res)
})
}
})

@@ -322,2 +331,5 @@

], (err) => {
// combine vals from each path
vals = [].concat.apply(vals, paths).slice(0, nvals)
if (err && vals.length === 0) {

@@ -348,11 +360,16 @@ return callback(err)

const q = new Query(this, key, (peer, callback) => {
waterfall([
(cb) => this._closerPeersSingle(key, peer, cb),
(closer, cb) => {
cb(null, {
closerPeers: closer
})
}
], callback)
const q = new Query(this, key, () => {
// There is no distinction between the disjoint paths,
// so there are no per-path variables in this scope.
// Just return the actual query function.
return (peer, callback) => {
waterfall([
(cb) => this._closerPeersSingle(key, peer, cb),
(closer, cb) => {
cb(null, {
closerPeers: closer
})
}
], callback)
}
})

@@ -557,21 +574,26 @@

// query the network
const query = new Query(this, id.id, (peer, cb) => {
waterfall([
(cb) => this._findPeerSingle(peer, id, cb),
(msg, cb) => {
const match = msg.closerPeers.find((p) => p.id.isEqual(id))
const query = new Query(this, id.id, () => {
// There is no distinction between the disjoint paths,
// so there are no per-path variables in this scope.
// Just return the actual query function.
return (peer, cb) => {
waterfall([
(cb) => this._findPeerSingle(peer, id, cb),
(msg, cb) => {
const match = msg.closerPeers.find((p) => p.id.isEqual(id))
// found it
if (match) {
return cb(null, {
peer: match,
success: true
// found it
if (match) {
return cb(null, {
peer: match,
success: true
})
}
cb(null, {
closerPeers: msg.closerPeers
})
}
cb(null, {
closerPeers: msg.closerPeers
})
}
], cb)
], cb)
}
})

@@ -584,7 +606,14 @@

(result, cb) => {
this._log('findPeer %s: %s', id.toB58String(), result.success)
if (!result.peer) {
let success = false
result.paths.forEach((result) => {
if (result.success) {
success = true
this.peerBook.put(result.peer)
}
})
this._log('findPeer %s: %s', id.toB58String(), success)
if (!success) {
return cb(errcode(new Error('No peer found'), 'ERR_NOT_FOUND'))
}
cb(null, result.peer)
cb(null, this.peerBook.get(id))
}

@@ -591,0 +620,0 @@ ], callback)

@@ -492,24 +492,33 @@ 'use strict'

// need more, query the network
const query = new Query(dht, key.buffer, (peer, cb) => {
waterfall([
(cb) => dht._findProvidersSingle(peer, key, cb),
(msg, cb) => {
const provs = msg.providerPeers
dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length)
const paths = []
const query = new Query(dht, key.buffer, (pathIndex, numPaths) => {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(out.length - n, numPaths)
const pathProviders = new LimitedPeerList(pathSize)
paths.push(pathProviders)
provs.forEach((prov) => {
out.push(dht.peerBook.put(prov))
})
// Here we return the query function to use on this particular disjoint path
return (peer, cb) => {
waterfall([
(cb) => dht._findProvidersSingle(peer, key, cb),
(msg, cb) => {
const provs = msg.providerPeers
dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length)
// hooray we have all that we want
if (out.length >= n) {
return cb(null, {success: true})
provs.forEach((prov) => {
pathProviders.push(dht.peerBook.put(prov))
})
// hooray we have all that we want
if (pathProviders.length >= pathSize) {
return cb(null, {success: true})
}
// it looks like we want some more
cb(null, {
closerPeers: msg.closerPeers
})
}
// it looks like we want some more
cb(null, {
closerPeers: msg.closerPeers
})
}
], cb)
], cb)
}
})

@@ -520,2 +529,9 @@

timeout((cb) => query.run(peers, cb), maxTimeout)((err) => {
// combine peers from each path
paths.forEach((path) => {
path.toArray().forEach((peer) => {
out.push(peer)
})
})
if (err) {

@@ -522,0 +538,0 @@ if (err.code === 'ETIMEDOUT' && out.length > 0) {

@@ -13,17 +13,34 @@ 'use strict'

/**
* Query peers from closest to farthest away.
* Divide peers up into disjoint paths (subqueries). Any peer can only be used once over all paths.
* Within each path, query peers from closest to farthest away.
*/
class Query {
/**
* Create a new query.
* User-supplied function to set up an individual disjoint path. Per-path
* query state should be held in this function's closure.
* @typedef {makePath} function
* @param {number} pathNum - Numeric index from zero to numPaths - 1
* @returns {queryFunc} - Function to call on each peer in the query
*/
/**
* Query function.
* @typedef {queryFunc} function
* @param {PeerId} next - Peer to query
* @param {function(Error, Object)} callback - Query result callback
*/
/**
* Create a new query. The makePath function is called once per disjoint path, so that per-path
* variables can be created in that scope. makePath then returns the actual query function (queryFunc) to
* use when on that path.
*
* @param {DHT} dht - DHT instance
* @param {Buffer} key
* @param {function(PeerId, function(Error, Object))} query - The query function to exectue
*
* @param {makePath} makePath - Called to set up each disjoint path. Must return the query function.
*/
constructor (dht, key, query) {
constructor (dht, key, makePath) {
this.dht = dht
this.key = key
this.query = query
this.makePath = makePath
this.concurrency = c.ALPHA

@@ -44,3 +61,3 @@ this._log = utils.logger(this.dht.peerInfo.id, 'query:' + mh.toB58String(key))

errors: [],
peersToQuery: null
paths: null // array of states per disjoint path
}

@@ -53,10 +70,32 @@

waterfall([
(cb) => PeerQueue.fromKey(this.key, cb),
(q, cb) => {
run.peersToQuery = q
each(peers, (p, cb) => addPeerToQuery(p, this.dht, run, cb), cb)
},
(cb) => workerQueue(this, run, cb)
], (err) => {
// create correct number of paths
const numPaths = Math.min(c.DISJOINT_PATHS, peers.length)
const pathPeers = []
for (let i = 0; i < numPaths; i++) {
pathPeers.push([])
}
// assign peers to paths round-robin style
peers.forEach((peer, i) => {
pathPeers[i % numPaths].push(peer)
})
run.paths = pathPeers.map((peers, i) => {
return {
peers,
run,
query: this.makePath(i, numPaths),
peersToQuery: null
}
})
each(run.paths, (path, cb) => {
waterfall([
(cb) => PeerQueue.fromKey(this.key, cb),
(q, cb) => {
path.peersToQuery = q
each(path.peers, (p, cb) => addPeerToQuery(p, this.dht, path, cb), cb)
},
(cb) => workerQueue(this, path, cb)
], cb)
}, (err, results) => {
this._log('query:done')

@@ -70,10 +109,15 @@ if (err) {

}
if (run.res && run.res.success) {
run.res.finalSet = run.peersSeen
return callback(null, run.res)
run.res = {
finalSet: run.peersSeen,
paths: []
}
callback(null, {
finalSet: run.peersSeen
run.paths.forEach((path) => {
if (path.res && path.res.success) {
run.res.paths.push(path.res)
}
})
callback(null, run.res)
})

@@ -84,14 +128,16 @@ }

/**
* Use the queue from async to keep `concurrency` amount items running.
* Use the queue from async to keep `concurrency` amount items running
* per path.
*
* @param {Query} query
* @param {Object} run
* @param {Object} path
* @param {function(Error)} callback
* @returns {void}
* @private
*/
function workerQueue (query, run, callback) {
function workerQueue (query, path, callback) {
let killed = false
const q = queue((next, cb) => {
query._log('queue:work')
execQuery(next, query, run, (err, done) => {
execQuery(next, query, path, (err, done) => {
// Ignore after kill

@@ -117,4 +163,4 @@ if (killed) {

while (q.length() < query.concurrency &&
run.peersToQuery.length > 0) {
q.push(run.peersToQuery.dequeue())
path.peersToQuery.length > 0) {
q.push(path.peersToQuery.dequeue())
}

@@ -149,3 +195,3 @@ }

* @param {Query} query
* @param {Object} run
* @param {Object} path
* @param {function(Error)} callback

@@ -155,9 +201,9 @@ * @returns {void}

*/
function execQuery (next, query, run, callback) {
query.query(next, (err, res) => {
function execQuery (next, query, path, callback) {
path.query(next, (err, res) => {
if (err) {
run.errors.push(err)
path.run.errors.push(err)
callback()
} else if (res.success) {
run.res = res
path.res = res
callback(null, true)

@@ -171,3 +217,3 @@ } else if (res.closerPeers && res.closerPeers.length > 0) {

closer = query.dht.peerBook.put(closer)
addPeerToQuery(closer.id, query.dht, run, cb)
addPeerToQuery(closer.id, query.dht, path, cb)
}, callback)

@@ -185,3 +231,3 @@ } else {

* @param {DHT} dht
* @param {Object} run
* @param {Object} path
* @param {function(Error)} callback

@@ -191,3 +237,4 @@ * @returns {void}

*/
function addPeerToQuery (next, dht, run, callback) {
function addPeerToQuery (next, dht, path, callback) {
const run = path.run
if (dht._isSelf(next)) {

@@ -202,5 +249,5 @@ return callback()

run.peersSeen.add(next)
run.peersToQuery.enqueue(next, callback)
path.peersToQuery.enqueue(next, callback)
}
module.exports = Query

@@ -137,2 +137,14 @@ 'use strict'

/**
* Computes how many results to collect on each disjoint path, rounding up.
* This ensures that we look for at least one result per path.
*
* @param {number} resultsWanted
* @param {number} numPaths - total number of paths
* @returns {number}
*/
exports.pathSize = (resultsWanted, numPaths) => {
return Math.ceil(resultsWanted / numPaths)
}
/**
* Create a new put record, encodes and signs it if enabled.

@@ -139,0 +151,0 @@ *

@@ -589,2 +589,3 @@ /* eslint-env mocha */

const val = Buffer.from('foobar')
const connected = {} // indexes in others that are reachable from guy

@@ -595,2 +596,3 @@ series([

const t = 20 + random(79)
connected[t] = true
connect(others[i], others[t], cb)

@@ -600,2 +602,3 @@ }, cb)

(cb) => times(20, (i, cb) => {
connected[i] = true
connect(guy, others[i], cb)

@@ -616,5 +619,7 @@ }, cb),

const connectedIds = ids.slice(1).filter((id, i) => connected[i])
series([
(cb) => guy.getClosestPeers(val, cb),
(cb) => kadUtils.sortClosestPeers(ids.slice(1), rtval, cb)
(cb) => kadUtils.sortClosestPeers(connectedIds, rtval, cb)
], (err, res) => {

@@ -621,0 +626,0 @@ expect(err).to.not.exist()

@@ -16,2 +16,3 @@ /* eslint-env mocha */

const createPeerInfo = require('./utils/create-peer-info')
const createDisjointTracks = require('./utils/create-disjoint-tracks')

@@ -24,3 +25,3 @@ describe('Query', () => {

this.timeout(5 * 1000)
createPeerInfo(3, (err, result) => {
createPeerInfo(10, (err, result) => {
if (err) {

@@ -63,7 +64,7 @@ return done(err)

const q = new Query(dht, peer.id.id, query)
const q = new Query(dht, peer.id.id, () => query)
q.run([peerInfos[1].id], (err, res) => {
expect(err).to.not.exist()
expect(res.value).to.eql(Buffer.from('cool'))
expect(res.success).to.eql(true)
expect(res.paths[0].value).to.eql(Buffer.from('cool'))
expect(res.paths[0].success).to.eql(true)
expect(res.finalSet.size).to.eql(2)

@@ -82,3 +83,3 @@ done()

const q = new Query(dht, peer.id.id, query)
const q = new Query(dht, peer.id.id, () => query)
q.run([peerInfos[1].id], (err, res) => {

@@ -103,3 +104,3 @@ expect(err).to.exist()

const q = new Query(dht, peer.id.id, query)
const q = new Query(dht, peer.id.id, () => query)
q.run([peerInfos[1].id], (err, res) => {

@@ -111,2 +112,55 @@ expect(err).to.not.exist()

})
/*
* This test creates two disjoint tracks of peers, one for
* each of the query's two paths to follow. The "good"
* track that leads to the target initially has high
* distances to the target, while the "bad" track that
* goes nowhere has small distances to the target.
* Only by going down both simultaneously will it find
* the target before the end of the bad track. The greedy
* behavior without disjoint paths would reach the target
* only after visiting every single peer.
*
* xor distance to target
* far <-----------------------------------------------> close
* <us>
* <good 0> <g 1> <g 2> <target>
* <bad 0> <b 1> ... <b n>
*
*/
it('uses disjoint paths', (done) => {
const goodLength = 3
createDisjointTracks(peerInfos, goodLength, (err, targetId, starts, getResponse) => {
expect(err).to.not.exist()
// mock this so we can dial non existing peers
dht.switch.dial = (peer, callback) => callback()
let badEndVisited = false
const q = new Query(dht, targetId, (trackNum) => {
return (p, cb) => {
const response = getResponse(p, trackNum)
expect(response).to.exist() // or we aren't on the right track
if (response.end && !response.success) {
badEndVisited = true
}
if (response.success) {
expect(badEndVisited).to.eql(false)
}
cb(null, response)
}
})
q.concurrency = 1
// due to round-robin allocation of peers from starts, first
// path is good, second bad
q.run(starts, (err, res) => {
expect(err).to.not.exist()
// we should visit all nodes (except the target)
expect(res.finalSet.size).to.eql(peerInfos.length - 1)
// there should be one successful path
expect(res.paths.length).to.eql(1)
done()
})
})
})
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc