Socket
Socket
Sign inDemoInstall

libp2p-kad-dht

Package Overview
Dependencies
Maintainers
2
Versions
109
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

libp2p-kad-dht - npm Package Compare versions

Comparing version 0.14.11 to 0.14.12

src/query-manager.js

10

CHANGELOG.md

@@ -0,1 +1,11 @@

<a name="0.14.12"></a>
## [0.14.12](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.14.11...v0.14.12) (2019-04-04)
### Bug Fixes
* stop running queries on shutdown ([#95](https://github.com/libp2p/js-libp2p-kad-dht/issues/95)) ([e137297](https://github.com/libp2p/js-libp2p-kad-dht/commit/e137297))
<a name="0.14.11"></a>

@@ -2,0 +12,0 @@ ## [0.14.11](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.14.10...v0.14.11) (2019-03-28)

2

package.json
{
"name": "libp2p-kad-dht",
"version": "0.14.11",
"version": "0.14.12",
"description": "JavaScript implementation of the Kad-DHT for libp2p",

@@ -5,0 +5,0 @@ "leadMaintainer": "Vasco Santos <vasco.santos@moxy.studio>",

@@ -25,2 +25,3 @@ 'use strict'

const RandomWalk = require('./random-walk')
const QueryManager = require('./query-manager')
const assert = require('assert')

@@ -125,3 +126,3 @@ const mergeOptions = require('merge-options')

/**
* Provider management
* Random walk management
*

@@ -139,2 +140,9 @@ * @type {RandomWalk}

this.randomWalkTimeout = parseInt(options.randomWalk.timeout)
/**
* Keeps track of running queries
*
* @type {QueryManager}
*/
this._queryManager = new QueryManager()
}

@@ -159,2 +167,3 @@

this._running = true
this._queryManager.start()
this.network.start((err) => {

@@ -184,2 +193,3 @@ if (err) {

})
this._queryManager.stop()
}

@@ -186,0 +196,0 @@

@@ -57,2 +57,11 @@ 'use strict'

run (peers, callback) {
if (!this.dht._queryManager.running) {
this._log.error('Attempt to run query after shutdown')
return callback(null, { finalSet: new Set(), paths: [] })
}
if (peers.length === 0) {
this._log.error('Running query with no peers')
return callback(null, { finalSet: new Set(), paths: [] })
}
const run = {

@@ -64,7 +73,2 @@ peersSeen: new Set(),

if (peers.length === 0) {
this._log.error('Running query with no peers')
return callback()
}
// create correct number of paths

@@ -90,2 +94,5 @@ const numPaths = Math.min(c.DISJOINT_PATHS, peers.length)

// Register this query so we stop it if the DHT stops
this.dht._queryManager.queryStarted(this)
// Create a manager to keep track of the worker queue for each path

@@ -139,2 +146,3 @@ this.workerManager = new WorkerManager()

this.workerManager && this.workerManager.stop()
this.dht._queryManager.queryCompleted(this)
}

@@ -141,0 +149,0 @@ }

@@ -956,25 +956,26 @@ /* eslint-env mocha */

const dht = new KadDHT(sw)
dht.start(() => {
const key = Buffer.from('/v/hello')
const value = Buffer.from('world')
const rec = new Record(key, value)
const key = Buffer.from('/v/hello')
const value = Buffer.from('world')
const rec = new Record(key, value)
const stubs = [
// Simulate returning a peer id to query
sinon.stub(dht.routingTable, 'closestPeers').returns([peerInfos[1].id]),
// Simulate going out to the network and returning the record
sinon.stub(dht, '_getValueOrPeers').callsFake((peer, k, cb) => {
cb(null, rec)
})
]
const stubs = [
// Simulate returning a peer id to query
sinon.stub(dht.routingTable, 'closestPeers').returns([peerInfos[1].id]),
// Simulate going out to the network and returning the record
sinon.stub(dht, '_getValueOrPeers').callsFake((peer, k, cb) => {
cb(null, rec)
dht.getMany(key, 1, (err, res) => {
expect(err).to.not.exist()
expect(res.length).to.eql(1)
expect(res[0].val).to.eql(value)
for (const stub of stubs) {
stub.restore()
}
done()
})
]
dht.getMany(key, 1, (err, res) => {
expect(err).to.not.exist()
expect(res.length).to.eql(1)
expect(res[0].val).to.eql(value)
for (const stub of stubs) {
stub.restore()
}
done()
})

@@ -981,0 +982,0 @@ })

@@ -19,2 +19,11 @@ /* eslint-env mocha */

const createDHT = (peerInfos, cb) => {
const sw = new Switch(peerInfos[0], new PeerBook())
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
const d = new DHT(sw)
d.start(() => cb(null, d))
}
describe('Query', () => {

@@ -32,9 +41,10 @@ let peerInfos

peerInfos = result
const sw = new Switch(peerInfos[0], new PeerBook())
sw.transport.add('tcp', new TCP())
sw.connection.addStreamMuxer(Mplex)
sw.connection.reuse()
dht = new DHT(sw)
createDHT(peerInfos, (err, d) => {
if (err) {
return done(err)
}
done()
dht = d
done()
})
})

@@ -122,2 +132,19 @@ })

it('returns empty run if initial peer list is empty', (done) => {
const peer = peerInfos[0]
const query = (p, cb) => {}
const q = new Query(dht, peer.id.id, () => query)
q.run([], (err, res) => {
expect(err).to.not.exist()
// Should not visit any peers
expect(res.paths.length).to.eql(0)
expect(res.finalSet.size).to.eql(0)
done()
})
})
it('only closerPeers', (done) => {

@@ -240,2 +267,105 @@ const peer = peerInfos[0]

it('all queries stop after shutdown', (done) => {
createDHT(peerInfos, (err, dhtA) => {
if (err) {
return done(err)
}
const peer = peerInfos[0]
// mock this so we can dial non existing peers
dhtA.switch.dial = (peer, callback) => callback()
// 1 -> 2 -> 3 -> 4
const topology = {
[peerInfos[1].id.toB58String()]: {
closer: [peerInfos[2]]
},
[peerInfos[2].id.toB58String()]: {
closer: [peerInfos[3]]
},
// Should not reach here because query gets shut down
[peerInfos[3].id.toB58String()]: {
closer: [peerInfos[4]]
}
}
const visited = []
const query = (p, cb) => {
visited.push(p)
const invokeCb = () => {
const res = topology[p.toB58String()] || {}
cb(null, {
closerPeers: res.closer || []
})
}
// Shut down after visiting peerInfos[2]
if (p.toB58String() === peerInfos[2].id.toB58String()) {
dhtA.stop(invokeCb)
setTimeout(checkExpectations, 100)
} else {
invokeCb()
}
}
const q = new Query(dhtA, peer.id.id, () => query)
q.run([peerInfos[1].id], (err, res) => {
expect(err).to.not.exist()
})
function checkExpectations () {
// Should only visit peers up to the point where we shut down
expect(visited).to.eql([peerInfos[1].id, peerInfos[2].id])
done()
}
})
})
it('queries run after shutdown return immediately', (done) => {
createDHT(peerInfos, (err, dhtA) => {
if (err) {
return done(err)
}
const peer = peerInfos[0]
// mock this so we can dial non existing peers
dhtA.switch.dial = (peer, callback) => callback()
// 1 -> 2 -> 3
const topology = {
[peerInfos[1].id.toB58String()]: {
closer: [peerInfos[2]]
},
[peerInfos[2].id.toB58String()]: {
closer: [peerInfos[3]]
}
}
const query = (p, cb) => {
const res = topology[p.toB58String()] || {}
cb(null, {
closerPeers: res.closer || []
})
}
const q = new Query(dhtA, peer.id.id, () => query)
dhtA.stop(() => {
q.run([peerInfos[1].id], (err, res) => {
expect(err).to.not.exist()
// Should not visit any peers
expect(res.paths.length).to.eql(0)
expect(res.finalSet.size).to.eql(0)
done()
})
})
})
})
it('disjoint path values', (done) => {

@@ -242,0 +372,0 @@ const peer = peerInfos[0]

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc