libp2p-kad-dht
Advanced tools
Comparing version 0.12.1 to 0.13.0
@@ -0,1 +1,16 @@ | ||
<a name="0.13.0"></a> | ||
# [0.13.0](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.12.1...v0.13.0) (2018-12-05) | ||
### Bug Fixes | ||
* make 'find peer query' test reliable ([#58](https://github.com/libp2p/js-libp2p-kad-dht/issues/58)) ([54336dd](https://github.com/libp2p/js-libp2p-kad-dht/commit/54336dd)) | ||
### Features | ||
* run queries on disjoint paths ([#37](https://github.com/libp2p/js-libp2p-kad-dht/issues/37)) ([#39](https://github.com/libp2p/js-libp2p-kad-dht/issues/39)) ([742b3fb](https://github.com/libp2p/js-libp2p-kad-dht/commit/742b3fb)) | ||
<a name="0.12.1"></a> | ||
@@ -2,0 +17,0 @@ ## [0.12.1](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.12.0...v0.12.1) (2018-11-30) |
{ | ||
"name": "libp2p-kad-dht", | ||
"version": "0.12.1", | ||
"version": "0.13.0", | ||
"description": "JavaScript implementation of the Kad-DHT for libp2p", | ||
@@ -82,9 +82,10 @@ "leadMaintainer": "Vasco Santos <vasco.santos@moxy.studio>", | ||
"Jacob Heun <jacobheun@gmail.com>", | ||
"John Hiesey <jhiesey@cs.stanford.edu>", | ||
"Lars Gierth <larsg@systemli.org>", | ||
"Pedro Teixeira <i@pgte.me>", | ||
"Richard Schneider <makaretu@gmail.com>", | ||
"Vasco Santos <vasco.santos@moxy.studio>", | ||
"Vasco Santos <vasco.santos@ua.pt>", | ||
"Vasco Santos <vasco.santos@moxy.studio>", | ||
"ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ <victorbjelkholm@gmail.com>" | ||
] | ||
} |
@@ -34,2 +34,6 @@ 'use strict' | ||
// Number of disjoint query paths to use | ||
// This is set to K/2 per the S/Kademlia paper | ||
exports.DISJOINT_PATHS = 10 | ||
exports.maxMessageSize = 2 << 22 // 4MB |
129
src/index.js
@@ -256,3 +256,3 @@ 'use strict' | ||
this._log('getMany %b (%s)', key, nvals) | ||
const vals = [] | ||
let vals = [] | ||
@@ -275,2 +275,3 @@ this._getLocal(key, (err, localRec) => { | ||
const paths = [] | ||
waterfall([ | ||
@@ -289,28 +290,36 @@ (cb) => utils.convertBuffer(key, cb), | ||
// we have peers, lets do the actualy query to them | ||
const query = new Query(this, key, (peer, cb) => { | ||
this._getValueOrPeers(peer, key, (err, rec, peers) => { | ||
if (err) { | ||
// If we have an invalid record we just want to continue and fetch a new one. | ||
if (!(err.code === 'ERR_INVALID_RECORD')) { | ||
return cb(err) | ||
// we have peers, lets do the actual query to them | ||
const query = new Query(this, key, (pathIndex, numPaths) => { | ||
// This function body runs once per disjoint path | ||
const pathSize = utils.pathSize(nvals - vals.length, numPaths) | ||
const pathVals = [] | ||
paths.push(pathVals) | ||
// Here we return the query function to use on this particular disjoint path | ||
return (peer, cb) => { | ||
this._getValueOrPeers(peer, key, (err, rec, peers) => { | ||
if (err) { | ||
// If we have an invalid record we just want to continue and fetch a new one. | ||
if (!(err.code === 'ERR_INVALID_RECORD')) { | ||
return cb(err) | ||
} | ||
} | ||
} | ||
const res = { closerPeers: peers } | ||
const res = { closerPeers: peers } | ||
if ((rec && rec.value) || (err && err.code === 'ERR_INVALID_RECORD')) { | ||
vals.push({ | ||
val: rec && rec.value, | ||
from: peer | ||
}) | ||
} | ||
if ((rec && rec.value) || (err && err.code === 'ERR_INVALID_RECORD')) { | ||
pathVals.push({ | ||
val: rec && rec.value, | ||
from: peer | ||
}) | ||
} | ||
// enough is enough | ||
if (vals.length >= nvals) { | ||
res.success = true | ||
} | ||
// enough is enough | ||
if (pathVals.length >= pathSize) { | ||
res.success = true | ||
} | ||
cb(null, res) | ||
}) | ||
cb(null, res) | ||
}) | ||
} | ||
}) | ||
@@ -322,2 +331,5 @@ | ||
], (err) => { | ||
// combine vals from each path | ||
vals = [].concat.apply(vals, paths).slice(0, nvals) | ||
if (err && vals.length === 0) { | ||
@@ -348,11 +360,16 @@ return callback(err) | ||
const q = new Query(this, key, (peer, callback) => { | ||
waterfall([ | ||
(cb) => this._closerPeersSingle(key, peer, cb), | ||
(closer, cb) => { | ||
cb(null, { | ||
closerPeers: closer | ||
}) | ||
} | ||
], callback) | ||
const q = new Query(this, key, () => { | ||
// There is no distinction between the disjoint paths, | ||
// so there are no per-path variables in this scope. | ||
// Just return the actual query function. | ||
return (peer, callback) => { | ||
waterfall([ | ||
(cb) => this._closerPeersSingle(key, peer, cb), | ||
(closer, cb) => { | ||
cb(null, { | ||
closerPeers: closer | ||
}) | ||
} | ||
], callback) | ||
} | ||
}) | ||
@@ -557,21 +574,26 @@ | ||
// query the network | ||
const query = new Query(this, id.id, (peer, cb) => { | ||
waterfall([ | ||
(cb) => this._findPeerSingle(peer, id, cb), | ||
(msg, cb) => { | ||
const match = msg.closerPeers.find((p) => p.id.isEqual(id)) | ||
const query = new Query(this, id.id, () => { | ||
// There is no distinction between the disjoint paths, | ||
// so there are no per-path variables in this scope. | ||
// Just return the actual query function. | ||
return (peer, cb) => { | ||
waterfall([ | ||
(cb) => this._findPeerSingle(peer, id, cb), | ||
(msg, cb) => { | ||
const match = msg.closerPeers.find((p) => p.id.isEqual(id)) | ||
// found it | ||
if (match) { | ||
return cb(null, { | ||
peer: match, | ||
success: true | ||
// found it | ||
if (match) { | ||
return cb(null, { | ||
peer: match, | ||
success: true | ||
}) | ||
} | ||
cb(null, { | ||
closerPeers: msg.closerPeers | ||
}) | ||
} | ||
cb(null, { | ||
closerPeers: msg.closerPeers | ||
}) | ||
} | ||
], cb) | ||
], cb) | ||
} | ||
}) | ||
@@ -584,7 +606,14 @@ | ||
(result, cb) => { | ||
this._log('findPeer %s: %s', id.toB58String(), result.success) | ||
if (!result.peer) { | ||
let success = false | ||
result.paths.forEach((result) => { | ||
if (result.success) { | ||
success = true | ||
this.peerBook.put(result.peer) | ||
} | ||
}) | ||
this._log('findPeer %s: %s', id.toB58String(), success) | ||
if (!success) { | ||
return cb(errcode(new Error('No peer found'), 'ERR_NOT_FOUND')) | ||
} | ||
cb(null, result.peer) | ||
cb(null, this.peerBook.get(id)) | ||
} | ||
@@ -591,0 +620,0 @@ ], callback) |
@@ -492,24 +492,33 @@ 'use strict' | ||
// need more, query the network | ||
const query = new Query(dht, key.buffer, (peer, cb) => { | ||
waterfall([ | ||
(cb) => dht._findProvidersSingle(peer, key, cb), | ||
(msg, cb) => { | ||
const provs = msg.providerPeers | ||
dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length) | ||
const paths = [] | ||
const query = new Query(dht, key.buffer, (pathIndex, numPaths) => { | ||
// This function body runs once per disjoint path | ||
const pathSize = utils.pathSize(out.length - n, numPaths) | ||
const pathProviders = new LimitedPeerList(pathSize) | ||
paths.push(pathProviders) | ||
provs.forEach((prov) => { | ||
out.push(dht.peerBook.put(prov)) | ||
}) | ||
// Here we return the query function to use on this particular disjoint path | ||
return (peer, cb) => { | ||
waterfall([ | ||
(cb) => dht._findProvidersSingle(peer, key, cb), | ||
(msg, cb) => { | ||
const provs = msg.providerPeers | ||
dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length) | ||
// hooray we have all that we want | ||
if (out.length >= n) { | ||
return cb(null, {success: true}) | ||
provs.forEach((prov) => { | ||
pathProviders.push(dht.peerBook.put(prov)) | ||
}) | ||
// hooray we have all that we want | ||
if (pathProviders.length >= pathSize) { | ||
return cb(null, {success: true}) | ||
} | ||
// it looks like we want some more | ||
cb(null, { | ||
closerPeers: msg.closerPeers | ||
}) | ||
} | ||
// it looks like we want some more | ||
cb(null, { | ||
closerPeers: msg.closerPeers | ||
}) | ||
} | ||
], cb) | ||
], cb) | ||
} | ||
}) | ||
@@ -520,2 +529,9 @@ | ||
timeout((cb) => query.run(peers, cb), maxTimeout)((err) => { | ||
// combine peers from each path | ||
paths.forEach((path) => { | ||
path.toArray().forEach((peer) => { | ||
out.push(peer) | ||
}) | ||
}) | ||
if (err) { | ||
@@ -522,0 +538,0 @@ if (err.code === 'ETIMEDOUT' && out.length > 0) { |
117
src/query.js
@@ -13,17 +13,34 @@ 'use strict' | ||
/** | ||
* Query peers from closest to farthest away. | ||
* Divide peers up into disjoint paths (subqueries). Any peer can only be used once over all paths. | ||
* Within each path, query peers from closest to farthest away. | ||
*/ | ||
class Query { | ||
/** | ||
* Create a new query. | ||
* User-supplied function to set up an individual disjoint path. Per-path | ||
* query state should be held in this function's closure. | ||
* @typedef {makePath} function | ||
* @param {number} pathNum - Numeric index from zero to numPaths - 1 | ||
* @returns {queryFunc} - Function to call on each peer in the query | ||
*/ | ||
/** | ||
* Query function. | ||
* @typedef {queryFunc} function | ||
* @param {PeerId} next - Peer to query | ||
* @param {function(Error, Object)} callback - Query result callback | ||
*/ | ||
/** | ||
* Create a new query. The makePath function is called once per disjoint path, so that per-path | ||
* variables can be created in that scope. makePath then returns the actual query function (queryFunc) to | ||
* use when on that path. | ||
* | ||
* @param {DHT} dht - DHT instance | ||
* @param {Buffer} key | ||
* @param {function(PeerId, function(Error, Object))} query - The query function to exectue | ||
* | ||
* @param {makePath} makePath - Called to set up each disjoint path. Must return the query function. | ||
*/ | ||
constructor (dht, key, query) { | ||
constructor (dht, key, makePath) { | ||
this.dht = dht | ||
this.key = key | ||
this.query = query | ||
this.makePath = makePath | ||
this.concurrency = c.ALPHA | ||
@@ -44,3 +61,3 @@ this._log = utils.logger(this.dht.peerInfo.id, 'query:' + mh.toB58String(key)) | ||
errors: [], | ||
peersToQuery: null | ||
paths: null // array of states per disjoint path | ||
} | ||
@@ -53,10 +70,32 @@ | ||
waterfall([ | ||
(cb) => PeerQueue.fromKey(this.key, cb), | ||
(q, cb) => { | ||
run.peersToQuery = q | ||
each(peers, (p, cb) => addPeerToQuery(p, this.dht, run, cb), cb) | ||
}, | ||
(cb) => workerQueue(this, run, cb) | ||
], (err) => { | ||
// create correct number of paths | ||
const numPaths = Math.min(c.DISJOINT_PATHS, peers.length) | ||
const pathPeers = [] | ||
for (let i = 0; i < numPaths; i++) { | ||
pathPeers.push([]) | ||
} | ||
// assign peers to paths round-robin style | ||
peers.forEach((peer, i) => { | ||
pathPeers[i % numPaths].push(peer) | ||
}) | ||
run.paths = pathPeers.map((peers, i) => { | ||
return { | ||
peers, | ||
run, | ||
query: this.makePath(i, numPaths), | ||
peersToQuery: null | ||
} | ||
}) | ||
each(run.paths, (path, cb) => { | ||
waterfall([ | ||
(cb) => PeerQueue.fromKey(this.key, cb), | ||
(q, cb) => { | ||
path.peersToQuery = q | ||
each(path.peers, (p, cb) => addPeerToQuery(p, this.dht, path, cb), cb) | ||
}, | ||
(cb) => workerQueue(this, path, cb) | ||
], cb) | ||
}, (err, results) => { | ||
this._log('query:done') | ||
@@ -70,10 +109,15 @@ if (err) { | ||
} | ||
if (run.res && run.res.success) { | ||
run.res.finalSet = run.peersSeen | ||
return callback(null, run.res) | ||
run.res = { | ||
finalSet: run.peersSeen, | ||
paths: [] | ||
} | ||
callback(null, { | ||
finalSet: run.peersSeen | ||
run.paths.forEach((path) => { | ||
if (path.res && path.res.success) { | ||
run.res.paths.push(path.res) | ||
} | ||
}) | ||
callback(null, run.res) | ||
}) | ||
@@ -84,14 +128,16 @@ } | ||
/** | ||
* Use the queue from async to keep `concurrency` amount items running. | ||
* Use the queue from async to keep `concurrency` amount items running | ||
* per path. | ||
* | ||
* @param {Query} query | ||
* @param {Object} run | ||
* @param {Object} path | ||
* @param {function(Error)} callback | ||
* @returns {void} | ||
* @private | ||
*/ | ||
function workerQueue (query, run, callback) { | ||
function workerQueue (query, path, callback) { | ||
let killed = false | ||
const q = queue((next, cb) => { | ||
query._log('queue:work') | ||
execQuery(next, query, run, (err, done) => { | ||
execQuery(next, query, path, (err, done) => { | ||
// Ignore after kill | ||
@@ -117,4 +163,4 @@ if (killed) { | ||
while (q.length() < query.concurrency && | ||
run.peersToQuery.length > 0) { | ||
q.push(run.peersToQuery.dequeue()) | ||
path.peersToQuery.length > 0) { | ||
q.push(path.peersToQuery.dequeue()) | ||
} | ||
@@ -149,3 +195,3 @@ } | ||
* @param {Query} query | ||
* @param {Object} run | ||
* @param {Object} path | ||
* @param {function(Error)} callback | ||
@@ -155,9 +201,9 @@ * @returns {void} | ||
*/ | ||
function execQuery (next, query, run, callback) { | ||
query.query(next, (err, res) => { | ||
function execQuery (next, query, path, callback) { | ||
path.query(next, (err, res) => { | ||
if (err) { | ||
run.errors.push(err) | ||
path.run.errors.push(err) | ||
callback() | ||
} else if (res.success) { | ||
run.res = res | ||
path.res = res | ||
callback(null, true) | ||
@@ -171,3 +217,3 @@ } else if (res.closerPeers && res.closerPeers.length > 0) { | ||
closer = query.dht.peerBook.put(closer) | ||
addPeerToQuery(closer.id, query.dht, run, cb) | ||
addPeerToQuery(closer.id, query.dht, path, cb) | ||
}, callback) | ||
@@ -185,3 +231,3 @@ } else { | ||
* @param {DHT} dht | ||
* @param {Object} run | ||
* @param {Object} path | ||
* @param {function(Error)} callback | ||
@@ -191,3 +237,4 @@ * @returns {void} | ||
*/ | ||
function addPeerToQuery (next, dht, run, callback) { | ||
function addPeerToQuery (next, dht, path, callback) { | ||
const run = path.run | ||
if (dht._isSelf(next)) { | ||
@@ -202,5 +249,5 @@ return callback() | ||
run.peersSeen.add(next) | ||
run.peersToQuery.enqueue(next, callback) | ||
path.peersToQuery.enqueue(next, callback) | ||
} | ||
module.exports = Query |
@@ -137,2 +137,14 @@ 'use strict' | ||
/** | ||
* Computes how many results to collect on each disjoint path, rounding up. | ||
* This ensures that we look for at least one result per path. | ||
* | ||
* @param {number} resultsWanted | ||
* @param {number} numPaths - total number of paths | ||
* @returns {number} | ||
*/ | ||
exports.pathSize = (resultsWanted, numPaths) => { | ||
return Math.ceil(resultsWanted / numPaths) | ||
} | ||
/** | ||
* Create a new put record, encodes and signs it if enabled. | ||
@@ -139,0 +151,0 @@ * |
@@ -589,2 +589,3 @@ /* eslint-env mocha */ | ||
const val = Buffer.from('foobar') | ||
const connected = {} // indexes in others that are reachable from guy | ||
@@ -595,2 +596,3 @@ series([ | ||
const t = 20 + random(79) | ||
connected[t] = true | ||
connect(others[i], others[t], cb) | ||
@@ -600,2 +602,3 @@ }, cb) | ||
(cb) => times(20, (i, cb) => { | ||
connected[i] = true | ||
connect(guy, others[i], cb) | ||
@@ -616,5 +619,7 @@ }, cb), | ||
const connectedIds = ids.slice(1).filter((id, i) => connected[i]) | ||
series([ | ||
(cb) => guy.getClosestPeers(val, cb), | ||
(cb) => kadUtils.sortClosestPeers(ids.slice(1), rtval, cb) | ||
(cb) => kadUtils.sortClosestPeers(connectedIds, rtval, cb) | ||
], (err, res) => { | ||
@@ -621,0 +626,0 @@ expect(err).to.not.exist() |
@@ -16,2 +16,3 @@ /* eslint-env mocha */ | ||
const createPeerInfo = require('./utils/create-peer-info') | ||
const createDisjointTracks = require('./utils/create-disjoint-tracks') | ||
@@ -24,3 +25,3 @@ describe('Query', () => { | ||
this.timeout(5 * 1000) | ||
createPeerInfo(3, (err, result) => { | ||
createPeerInfo(10, (err, result) => { | ||
if (err) { | ||
@@ -63,7 +64,7 @@ return done(err) | ||
const q = new Query(dht, peer.id.id, query) | ||
const q = new Query(dht, peer.id.id, () => query) | ||
q.run([peerInfos[1].id], (err, res) => { | ||
expect(err).to.not.exist() | ||
expect(res.value).to.eql(Buffer.from('cool')) | ||
expect(res.success).to.eql(true) | ||
expect(res.paths[0].value).to.eql(Buffer.from('cool')) | ||
expect(res.paths[0].success).to.eql(true) | ||
expect(res.finalSet.size).to.eql(2) | ||
@@ -82,3 +83,3 @@ done() | ||
const q = new Query(dht, peer.id.id, query) | ||
const q = new Query(dht, peer.id.id, () => query) | ||
q.run([peerInfos[1].id], (err, res) => { | ||
@@ -103,3 +104,3 @@ expect(err).to.exist() | ||
const q = new Query(dht, peer.id.id, query) | ||
const q = new Query(dht, peer.id.id, () => query) | ||
q.run([peerInfos[1].id], (err, res) => { | ||
@@ -111,2 +112,55 @@ expect(err).to.not.exist() | ||
}) | ||
/* | ||
* This test creates two disjoint tracks of peers, one for | ||
* each of the query's two paths to follow. The "good" | ||
* track that leads to the target initially has high | ||
* distances to the target, while the "bad" track that | ||
* goes nowhere has small distances to the target. | ||
* Only by going down both simultaneously will it find | ||
* the target before the end of the bad track. The greedy | ||
* behavior without disjoint paths would reach the target | ||
* only after visiting every single peer. | ||
* | ||
* xor distance to target | ||
* far <-----------------------------------------------> close | ||
* <us> | ||
* <good 0> <g 1> <g 2> <target> | ||
* <bad 0> <b 1> ... <b n> | ||
* | ||
*/ | ||
it('uses disjoint paths', (done) => { | ||
const goodLength = 3 | ||
createDisjointTracks(peerInfos, goodLength, (err, targetId, starts, getResponse) => { | ||
expect(err).to.not.exist() | ||
// mock this so we can dial non existing peers | ||
dht.switch.dial = (peer, callback) => callback() | ||
let badEndVisited = false | ||
const q = new Query(dht, targetId, (trackNum) => { | ||
return (p, cb) => { | ||
const response = getResponse(p, trackNum) | ||
expect(response).to.exist() // or we aren't on the right track | ||
if (response.end && !response.success) { | ||
badEndVisited = true | ||
} | ||
if (response.success) { | ||
expect(badEndVisited).to.eql(false) | ||
} | ||
cb(null, response) | ||
} | ||
}) | ||
q.concurrency = 1 | ||
// due to round-robin allocation of peers from starts, first | ||
// path is good, second bad | ||
q.run(starts, (err, res) => { | ||
expect(err).to.not.exist() | ||
// we should visit all nodes (except the target) | ||
expect(res.finalSet.size).to.eql(peerInfos.length - 1) | ||
// there should be one successful path | ||
expect(res.paths.length).to.eql(1) | ||
done() | ||
}) | ||
}) | ||
}) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
172185
57
5295