Socket
Socket
Sign inDemoInstall

libp2p-kad-dht

Package Overview
Dependencies
Maintainers
2
Versions
109
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

libp2p-kad-dht - npm Package Compare versions

Comparing version 0.14.10 to 0.14.11

11

CHANGELOG.md

@@ -0,1 +1,12 @@

<a name="0.14.11"></a>
## [0.14.11](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.14.10...v0.14.11) (2019-03-28)
### Bug Fixes
* ensure queries stop after error or success ([#93](https://github.com/libp2p/js-libp2p-kad-dht/issues/93)) ([0e55b20](https://github.com/libp2p/js-libp2p-kad-dht/commit/0e55b20))
* getMany with nvals=1 now goes out to network if no local val ([#91](https://github.com/libp2p/js-libp2p-kad-dht/issues/91)) ([478ee88](https://github.com/libp2p/js-libp2p-kad-dht/commit/478ee88))
<a name="0.14.10"></a>

@@ -2,0 +13,0 @@ ## [0.14.10](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.14.9...v0.14.10) (2019-03-27)

3

package.json
{
"name": "libp2p-kad-dht",
"version": "0.14.10",
"version": "0.14.11",
"description": "JavaScript implementation of the Kad-DHT for libp2p",

@@ -93,4 +93,5 @@ "leadMaintainer": "Vasco Santos <vasco.santos@moxy.studio>",

"Vasco Santos <vasco.santos@ua.pt>",
"dirkmc <dirk@mccormick.cx>",
"ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ <victorbjelkholm@gmail.com>"
]
}

@@ -28,2 +28,5 @@ 'use strict'

// The number of records that will be retrieved on a call to getMany()
exports.GET_MANY_RECORD_COUNT = 16
// K is the maximum number of requests to perform before returning failue

@@ -30,0 +33,0 @@ exports.K = 20

@@ -316,3 +316,3 @@ 'use strict'

if (nvals <= 1) {
if (vals.length >= nvals) {
return callback(null, vals)

@@ -363,3 +363,3 @@ }

if (pathVals.length >= pathSize) {
res.success = true
res.pathComplete = true
}

@@ -373,3 +373,8 @@

// run our query
timeout((cb) => query.run(rtp, cb), options.timeout)(cb)
timeout((_cb) => {
query.run(rtp, _cb)
}, options.timeout)((err, res) => {
query.stop()
cb(err, res)
})
}

@@ -425,3 +430,3 @@ ], (err) => {

closerPeers: closer,
success: options.shallow ? true : undefined
pathComplete: options.shallow ? true : undefined
})

@@ -643,3 +648,3 @@ }

peer: match,
success: true
queryComplete: true
})

@@ -656,5 +661,8 @@ }

timeout((cb) => {
query.run(peers, cb)
}, options.timeout)(cb)
timeout((_cb) => {
query.run(peers, _cb)
}, options.timeout)((err, res) => {
query.stop()
cb(err, res)
})
},

@@ -661,0 +669,0 @@ (result, cb) => {

@@ -272,3 +272,3 @@ 'use strict'

waterfall([
(cb) => dht.getMany(key, 16, options, cb),
(cb) => dht.getMany(key, c.GET_MANY_RECORD_COUNT, options, cb),
(vals, cb) => {

@@ -514,3 +514,3 @@ const recs = vals.map((v) => v.val)

if (pathProviders.length >= pathSize) {
return cb(null, { success: true })
return cb(null, { pathComplete: true })
}

@@ -530,2 +530,4 @@

timeout((cb) => query.run(peers, cb), providerTimeout)((err) => {
query.stop()
// combine peers from each path

@@ -532,0 +534,0 @@ paths.forEach((path) => {

@@ -88,2 +88,4 @@ 'use strict'

// Create a manager to keep track of the worker queue for each path
this.workerManager = new WorkerManager()
each(run.paths, (path, cb) => {

@@ -96,6 +98,12 @@ waterfall([

},
(cb) => workerQueue(this, path, cb)
(cb) => {
this.workerManager.workerQueue(this, path, cb)
}
], cb)
}, (err, results) => {
this._log('query:done')
// Ensure worker queues for all paths are stopped at the end of the query
this.workerManager.stop()
if (err) {

@@ -115,3 +123,4 @@ return callback(err)

run.paths.forEach((path) => {
if (path.res && path.res.success) {
if (path.res && (path.res.pathComplete || path.res.queryComplete)) {
path.res.success = true
run.res.paths.push(path.res)

@@ -124,97 +133,161 @@ }

}
/**
* Stop the query
*/
stop () {
this.workerManager && this.workerManager.stop()
}
}
/**
* Use the queue from async to keep `concurrency` amount items running
* per path.
*
* @param {Query} query
* @param {Object} path
* @param {function(Error)} callback
* @returns {void}
* @private
* Manages the worker queues for each path through the DHT
*/
function workerQueue (query, path, callback) {
let killed = false
const q = queue((next, cb) => {
query._log('queue:work')
execQuery(next, query, path, (err, done) => {
// Ignore after kill
if (killed) {
return cb()
}
query._log('queue:work:done', err, done)
if (err) {
return cb(err)
}
if (done) {
q.kill()
killed = true
return callback()
}
cb()
})
}, query.concurrency)
class WorkerManager {
/**
* Creates a new WorkerManager
*/
constructor () {
this.running = true
this.workers = []
}
const fill = () => {
query._log('queue:fill')
while (q.length() < query.concurrency &&
path.peersToQuery.length > 0) {
q.push(path.peersToQuery.dequeue())
/**
* Stop all the workers
*/
stop () {
this.running = false
for (const worker of this.workers) {
worker.stop()
}
}
fill()
/**
* Use the queue from async to keep `concurrency` amount items running
* per path.
*
* @param {Query} query
* @param {Object} path
* @param {function(Error)} callback
*/
workerQueue (query, path, callback) {
let workerRunning = true
const q = queue((next, cb) => {
query._log('queue:work')
this.execQuery(next, query, path, (err, state) => {
// Ignore response after worker killed
if (!workerRunning || !this.running) {
return cb()
}
// callback handling
q.error = (err) => {
query._log.error('queue', err)
callback(err)
}
query._log('queue:work:done', err, state)
if (err) {
return cb(err)
}
q.drain = () => {
query._log('queue:drain')
callback()
}
// If query is complete, stop all workers.
// Note: this.stop() calls stop() on all the workers, which kills the
// queue and calls callback(), so we don't need to call cb() here
if (state && state.queryComplete) {
query._log('query:complete')
return this.stop()
}
q.unsaturated = () => {
query._log('queue:unsatured')
// If path is complete, just stop this worker.
// Note: worker.stop() kills the queue and calls callback() so we don't
// need to call cb() here
if (state && state.pathComplete) {
return worker.stop()
}
// Otherwise, process next peer
cb()
})
}, query.concurrency)
// Keeps track of a running worker
const worker = {
stop: (err) => {
if (workerRunning) {
q.kill()
workerRunning = false
callback(err)
}
}
}
this.workers.push(worker)
// Add peers to the queue until there are enough to satisfy the concurrency
const fill = () => {
query._log('queue:fill')
while (q.length() < query.concurrency &&
path.peersToQuery.length > 0) {
q.push(path.peersToQuery.dequeue())
}
}
fill()
// If there's an error, stop the worker
q.error = (err) => {
query._log.error('queue', err)
worker.stop(err)
}
// When all peers in the queue have been processed, stop the worker
q.drain = () => {
query._log('queue:drain')
worker.stop()
}
// When a space opens up in the queue, add some more peers
q.unsaturated = () => {
query._log('queue:unsaturated')
fill()
}
q.buffer = 0
}
q.buffer = 0
}
/**
* Execute a query on the `next` peer.
*
* @param {PeerId} next
* @param {Query} query
* @param {Object} path
* @param {function(Error)} callback
* @returns {void}
* @private
*/
execQuery (next, query, path, callback) {
path.query(next, (err, res) => {
// If the run has completed, bail out
if (!this.running) {
return callback()
}
/**
* Execute a query on the `next` peer.
*
* @param {PeerId} next
* @param {Query} query
* @param {Object} path
* @param {function(Error)} callback
* @returns {void}
* @private
*/
function execQuery (next, query, path, callback) {
path.query(next, (err, res) => {
if (err) {
path.run.errors.push(err)
callback()
} else if (res.success) {
path.res = res
callback(null, true)
} else if (res.closerPeers && res.closerPeers.length > 0) {
each(res.closerPeers, (closer, cb) => {
// don't add ourselves
if (query.dht._isSelf(closer.id)) {
return cb()
}
closer = query.dht.peerBook.put(closer)
query.dht._peerDiscovered(closer)
addPeerToQuery(closer.id, query.dht, path, cb)
}, callback)
} else {
callback()
}
})
if (err) {
path.run.errors.push(err)
callback()
} else if (res.pathComplete || res.queryComplete) {
path.res = res
callback(null, {
pathComplete: res.pathComplete,
queryComplete: res.queryComplete
})
} else if (res.closerPeers && res.closerPeers.length > 0) {
each(res.closerPeers, (closer, cb) => {
// don't add ourselves
if (query.dht._isSelf(closer.id)) {
return cb()
}
closer = query.dht.peerBook.put(closer)
query.dht._peerDiscovered(closer)
addPeerToQuery(closer.id, query.dht, path, cb)
}, callback)
} else {
callback()
}
})
}
}

@@ -221,0 +294,0 @@

@@ -889,2 +889,3 @@ /* eslint-env mocha */

})
it('delete entries received from peers that have expired', (done) => {

@@ -949,2 +950,36 @@ const sw = new Switch(peerInfos[0], new PeerBook())

describe('getMany', () => {
it('getMany with nvals=1 goes out to swarm if there is no local value', (done) => {
const sw = new Switch(peerInfos[0], new PeerBook())
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const dht = new KadDHT(sw)
const key = Buffer.from('/v/hello')
const value = Buffer.from('world')
const rec = new Record(key, value)
const stubs = [
// Simulate returning a peer id to query
sinon.stub(dht.routingTable, 'closestPeers').returns([peerInfos[1].id]),
// Simulate going out to the network and returning the record
sinon.stub(dht, '_getValueOrPeers').callsFake((peer, k, cb) => {
cb(null, rec)
})
]
dht.getMany(key, 1, (err, res) => {
expect(err).to.not.exist()
expect(res.length).to.eql(1)
expect(res[0].val).to.eql(value)
for (const stub of stubs) {
stub.restore()
}
done()
})
})
})
describe('errors', () => {

@@ -951,0 +986,0 @@ it('get many should fail if only has one peer', function (done) {

@@ -11,2 +11,3 @@ /* eslint-env mocha */

const Mplex = require('libp2p-mplex')
const setImmediate = require('async/setImmediate')

@@ -25,3 +26,3 @@ const DHT = require('../src')

this.timeout(5 * 1000)
createPeerInfo(10, (err, result) => {
createPeerInfo(12, (err, result) => {
if (err) {

@@ -55,3 +56,3 @@ return done(err)

value: Buffer.from('cool'),
success: true
pathComplete: true
})

@@ -75,2 +76,33 @@ }

it('does not return an error if only some queries error', (done) => {
const peer = peerInfos[0]
// mock this so we can dial non existing peers
dht.switch.dial = (peer, callback) => callback()
let i = 0
const query = (p, cb) => {
if (i++ === 1) {
return cb(new Error('fail'))
}
cb(null, {
closerPeers: [peerInfos[2]]
})
}
const q = new Query(dht, peer.id.id, () => query)
q.run([peerInfos[1].id], (err, res) => {
expect(err).not.to.exist()
// Should have visited
// - the initial peer passed to the query: peerInfos[1]
// - the peer returned in closerPeers: peerInfos[2]
expect(res.finalSet.size).to.eql(2)
expect(res.finalSet.has(peerInfos[1].id)).to.equal(true)
expect(res.finalSet.has(peerInfos[2].id)).to.equal(true)
done()
})
})
it('returns an error if all queries error', (done) => {

@@ -112,2 +144,305 @@ const peer = peerInfos[0]

it('only closerPeers concurrent', (done) => {
const peer = peerInfos[0]
// mock this so we can dial non existing peers
dht.switch.dial = (peer, callback) => callback()
// 1 -> 8
// 2 -> 4 -> 5
// 6 -> 7
// 3 -> 9 -> 10
const topology = {
[peerInfos[1].id.toB58String()]: [
peerInfos[8]
],
[peerInfos[2].id.toB58String()]: [
peerInfos[4],
peerInfos[6]
],
[peerInfos[4].id.toB58String()]: [
peerInfos[5]
],
[peerInfos[6].id.toB58String()]: [
peerInfos[7]
],
[peerInfos[3].id.toB58String()]: [
peerInfos[9]
],
[peerInfos[9].id.toB58String()]: [
peerInfos[10]
]
}
const query = (p, cb) => {
const closer = topology[p.toB58String()]
cb(null, {
closerPeers: closer || []
})
}
const q = new Query(dht, peer.id.id, () => query)
q.run([peerInfos[1].id, peerInfos[2].id, peerInfos[3].id], (err, res) => {
expect(err).to.not.exist()
// Should visit all peers
expect(res.finalSet.size).to.eql(10)
done()
})
})
it('early success', (done) => {
const peer = peerInfos[0]
// mock this so we can dial non existing peers
dht.switch.dial = (peer, callback) => callback()
// 1 -> 2 -> 3 -> 4
const topology = {
[peerInfos[1].id.toB58String()]: {
closer: [peerInfos[2]]
},
// Should stop here because pathComplete is true
[peerInfos[2].id.toB58String()]: {
closer: [peerInfos[3]],
pathComplete: true
},
// Should not reach here because previous query returns pathComplete
[peerInfos[3].id.toB58String()]: {
closer: [peerInfos[4]]
}
}
const query = (p, cb) => {
const res = topology[p.toB58String()] || {}
cb(null, {
closerPeers: res.closer || [],
value: res.value,
pathComplete: res.pathComplete
})
}
const q = new Query(dht, peer.id.id, () => query)
q.run([peerInfos[1].id], (err, res) => {
expect(err).to.not.exist()
// Should complete successfully
expect(res.paths.length).to.eql(1)
expect(res.paths[0].success).to.eql(true)
// Should only visit peers up to the success peer
expect(res.finalSet.size).to.eql(2)
done()
})
})
it('disjoint path values', (done) => {
const peer = peerInfos[0]
const values = ['v0', 'v1'].map(Buffer.from)
// mock this so we can dial non existing peers
dht.switch.dial = (peer, callback) => callback()
// 1 -> 2 -> 3 (v0)
// 4 -> 5 (v1)
const topology = {
// Top level node
[peerInfos[1].id.toB58String()]: {
closer: [peerInfos[2]]
},
[peerInfos[2].id.toB58String()]: {
closer: [peerInfos[3]]
},
// v0
[peerInfos[3].id.toB58String()]: {
value: values[0],
pathComplete: true
},
// Top level node
[peerInfos[4].id.toB58String()]: {
closer: [peerInfos[5]]
},
// v1
[peerInfos[5].id.toB58String()]: {
value: values[1],
pathComplete: true
}
}
const query = (p, cb) => {
const res = topology[p.toB58String()] || {}
setTimeout(() => {
cb(null, {
closerPeers: res.closer || [],
value: res.value,
pathComplete: res.pathComplete
})
}, res.delay)
}
const q = new Query(dht, peer.id.id, () => query)
q.run([peerInfos[1].id, peerInfos[4].id], (err, res) => {
expect(err).to.not.exist()
// We should get back the values from both paths
expect(res.paths.length).to.eql(2)
expect(res.paths[0].value).to.eql(values[0])
expect(res.paths[0].success).to.eql(true)
expect(res.paths[1].value).to.eql(values[1])
expect(res.paths[1].success).to.eql(true)
done()
})
})
it('disjoint path values with early completion', (done) => {
const peer = peerInfos[0]
const values = ['v0', 'v1'].map(Buffer.from)
// mock this so we can dial non existing peers
dht.switch.dial = (peer, callback) => callback()
// 1 -> 2 (delay) -> 3
// 4 -> 5 [query complete]
const topology = {
// Top level node
[peerInfos[1].id.toB58String()]: {
closer: [peerInfos[2]]
},
// This query has a delay which means it only returns after the other
// path has already indicated the query is complete, so its result
// should be ignored
[peerInfos[2].id.toB58String()]: {
delay: 100,
closer: [peerInfos[3]]
},
// Query has stopped by the time we reach here, should be ignored
[peerInfos[3].id.toB58String()]: {
value: values[0],
pathComplete: true
},
// Top level node
[peerInfos[4].id.toB58String()]: {
closer: [peerInfos[5]]
},
// This peer indicates that the query is complete
[peerInfos[5].id.toB58String()]: {
closer: [peerInfos[2]],
value: values[1],
queryComplete: true
}
}
const visited = []
const query = (p, cb) => {
visited.push(p)
const res = topology[p.toB58String()] || {}
setTimeout(() => {
cb(null, {
closerPeers: res.closer || [],
value: res.value,
pathComplete: res.pathComplete,
queryComplete: res.queryComplete
})
}, res.delay)
}
const q = new Query(dht, peer.id.id, () => query)
q.run([peerInfos[1].id, peerInfos[4].id], (err, res) => {
expect(err).to.not.exist()
// We should only get back the value from the path 4 -> 5
expect(res.paths.length).to.eql(1)
expect(res.paths[0].value).to.eql(values[1])
expect(res.paths[0].success).to.eql(true)
// Wait a little bit to make sure we don't continue down another path
// after finding a successful path
setTimeout(() => {
if (visited.indexOf(peerInfos[3].id) !== -1) {
expect.fail('Query continued after success was returned')
}
done()
}, 300)
})
})
it('disjoint path continue other paths after error on one path', (done) => {
const peer = peerInfos[0]
const values = ['v0', 'v1'].map(Buffer.from)
// mock this so we can dial non existing peers
dht.switch.dial = (peer, callback) => callback()
// 1 -> 2 (delay) -> 3 [pathComplete]
// 4 -> 5 [error] -> 6
const topology = {
// Top level node
[peerInfos[1].id.toB58String()]: {
closer: [peerInfos[2]]
},
// This query has a delay which means it only returns after the other
// path has already returned an error
[peerInfos[2].id.toB58String()]: {
delay: 100,
closer: [peerInfos[3]]
},
// Success peer, should get this value back at the end
[peerInfos[3].id.toB58String()]: {
value: values[0],
pathComplete: true
},
// Top level node
[peerInfos[4].id.toB58String()]: {
closer: [peerInfos[5]]
},
// Return an error at this point
[peerInfos[5].id.toB58String()]: {
closer: [peerInfos[6]],
error: true
},
// Should never reach here
[peerInfos[6].id.toB58String()]: {
value: values[1],
pathComplete: true
}
}
const visited = []
const query = (p, cb) => {
visited.push(p)
const res = topology[p.toB58String()] || {}
setTimeout(() => {
if (res.error) {
return cb(new Error('path error'))
}
cb(null, {
closerPeers: res.closer || [],
value: res.value,
pathComplete: res.pathComplete
})
}, res.delay)
}
const q = new Query(dht, peer.id.id, () => query)
q.run([peerInfos[1].id, peerInfos[4].id], (err, res) => {
expect(err).to.not.exist()
// We should only get back the value from the path 1 -> 2 -> 3
expect(res.paths.length).to.eql(1)
expect(res.paths[0].value).to.eql(values[0])
expect(res.paths[0].success).to.eql(true)
done()
})
})
/*

@@ -138,2 +473,3 @@ * This test creates two disjoint tracks of peers, one for

let badEndVisited = false
let targetVisited = false

@@ -144,9 +480,10 @@ const q = new Query(dht, targetId, (trackNum) => {

expect(response).to.exist() // or we aren't on the right track
if (response.end && !response.success) {
if (response.end && !response.pathComplete) {
badEndVisited = true
}
if (response.success) {
if (response.pathComplete) {
targetVisited = true
expect(badEndVisited).to.eql(false)
}
cb(null, response)
setImmediate(() => cb(null, response))
}

@@ -159,2 +496,4 @@ })

expect(err).to.not.exist()
// we should reach the target node
expect(targetVisited).to.eql(true)
// we should visit all nodes (except the target)

@@ -161,0 +500,0 @@ expect(res.finalSet.size).to.eql(peerInfos.length - 1)

@@ -73,6 +73,6 @@ 'use strict'

if (nextPos === track.length) {
if (trackNum === 0) { // good track; success
if (trackNum === 0) { // good track; pathComplete
return {
end: true,
success: true
pathComplete: true
}

@@ -79,0 +79,0 @@ } else { // bad track; dead end

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc