Socket
Socket
Sign inDemoInstall

libp2p-kad-dht

Package Overview
Dependencies
Maintainers
2
Versions
109
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

libp2p-kad-dht - npm Package Compare versions

Comparing version 0.15.2 to 0.15.3

docs/index.html

2

.aegir.js
module.exports = {
bundlesize: { maxSize: '197kB' }
bundlesize: { maxSize: '222kB' }
}

@@ -0,1 +1,11 @@

<a name="0.15.3"></a>
## [0.15.3](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.15.2...v0.15.3) (2019-07-29)
### Bug Fixes
* _findNProvidersAsync discarding search results ([#137](https://github.com/libp2p/js-libp2p-kad-dht/issues/137)) ([e656c6b](https://github.com/libp2p/js-libp2p-kad-dht/commit/e656c6b))
<a name="0.15.2"></a>

@@ -2,0 +12,0 @@ ## [0.15.2](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.15.1...v0.15.2) (2019-05-31)

{
"name": "libp2p-kad-dht",
"version": "0.15.2",
"version": "0.15.3",
"description": "JavaScript implementation of the Kad-DHT for libp2p",

@@ -57,2 +57,4 @@ "leadMaintainer": "Vasco Santos <vasco.santos@moxy.studio>",

"multihashing-async": "~0.5.2",
"p-queue": "^5.0.0",
"p-times": "^2.1.0",
"peer-id": "~0.12.2",

@@ -66,2 +68,3 @@ "peer-info": "~0.15.1",

"pull-stream": "^3.6.9",
"pull-stream-to-async-iterator": "^1.0.1",
"varint": "^5.0.0",

@@ -95,2 +98,3 @@ "xor-distance": "^2.0.0"

"Pedro Teixeira <i@pgte.me>",
"Qmstream <51881352+Qmstream@users.noreply.github.com>",
"Richard Schneider <makaretu@gmail.com>",

@@ -97,0 +101,0 @@ "Thomas Eizinger <thomas@eizinger.io>",

@@ -65,2 +65,6 @@ # js-libp2p-kad-dht

### Implementation Summary
A [summary](docs/IMPL_SUMMARY.MD) of the algorithms and API for this implementation of Kademlia.
## Contribute

@@ -67,0 +71,0 @@

@@ -350,27 +350,31 @@ 'use strict'

// Here we return the query function to use on this particular disjoint path
return (peer, cb) => {
this._getValueOrPeers(peer, key, (err, rec, peers) => {
if (err) {
// If we have an invalid record we just want to continue and fetch a new one.
if (!(err.code === 'ERR_INVALID_RECORD')) {
return cb(err)
}
return async (peer) => {
let rec, peers, lookupErr
try {
const results = await this._getValueOrPeersAsync(peer, key)
rec = results.record
peers = results.peers
} catch (err) {
// If we have an invalid record we just want to continue and fetch a new one.
if (err.code !== 'ERR_INVALID_RECORD') {
throw err
}
lookupErr = err
}
const res = { closerPeers: peers }
const res = { closerPeers: peers }
if ((rec && rec.value) || (err && err.code === 'ERR_INVALID_RECORD')) {
pathVals.push({
val: rec && rec.value,
from: peer
})
}
if ((rec && rec.value) || lookupErr) {
pathVals.push({
val: rec && rec.value,
from: peer
})
}
// enough is enough
if (pathVals.length >= pathSize) {
res.pathComplete = true
}
// enough is enough
if (pathVals.length >= pathSize) {
res.pathComplete = true
}
cb(null, res)
})
return res
}

@@ -430,12 +434,8 @@ })

// Just return the actual query function.
return (peer, callback) => {
waterfall([
(cb) => this._closerPeersSingle(key, peer, cb),
(closer, cb) => {
cb(null, {
closerPeers: closer,
pathComplete: options.shallow ? true : undefined
})
}
], callback)
return async (peer) => {
const closer = await this._closerPeersSingleAsync(key, peer)
return {
closerPeers: closer,
pathComplete: options.shallow ? true : undefined
}
}

@@ -542,3 +542,4 @@ })

waterfall([
(cb) => this.providers.addProvider(key, this.peerInfo.id, cb),
// TODO: refactor this in method in async and remove this wrapper
(cb) => promiseToCallback(this.providers.addProvider(key, this.peerInfo.id))(err => cb(err)),
(cb) => this.getClosestPeers(key.buffer, cb),

@@ -657,21 +658,17 @@ (peers, cb) => {

// Just return the actual query function.
return (peer, cb) => {
waterfall([
(cb) => this._findPeerSingle(peer, id, cb),
(msg, cb) => {
const match = msg.closerPeers.find((p) => p.id.isEqual(id))
return async (peer) => {
const msg = await this._findPeerSingleAsync(peer, id)
const match = msg.closerPeers.find((p) => p.id.isEqual(id))
// found it
if (match) {
return cb(null, {
peer: match,
queryComplete: true
})
}
// found it
if (match) {
return {
peer: match,
queryComplete: true
}
}
cb(null, {
closerPeers: msg.closerPeers
})
}
], cb)
return {
closerPeers: msg.closerPeers
}
}

@@ -678,0 +675,0 @@ })

@@ -9,4 +9,2 @@ 'use strict'

const promiseToCallback = require('promise-to-callback')
const callbackify = require('callbackify')
const errcode = require('err-code')

@@ -167,3 +165,2 @@

))()
return undefined
},

@@ -341,4 +338,3 @@

try {
await promisify(cb => dht._putLocal(key, fixupRec, cb))()
// await dht._putLocalAsync(key, fixupRec)
await dht._putLocalAsync(key, fixupRec)
} catch (err) {

@@ -352,4 +348,3 @@ dht._log.error('Failed error correcting self', err)

try {
await promisify(cb => dht._putValueToPeer(key, fixupRec, v.from, cb))()
// await dht._putValueToPeerAsync(key, fixupRec, v.from)
await dht._putValueToPeerAsync(key, fixupRec, v.from)
} catch (err) {

@@ -416,4 +411,3 @@ dht._log.error('Failed error correcting entry', err)

try {
// await dht._verifyRecordOnlineAsync(record)
await promisify(cb => dht._verifyRecordOnline(record, cb))()
await dht._verifyRecordOnlineAsync(record)
} catch (err) {

@@ -471,3 +465,3 @@ const errMsg = 'invalid record received, discarded'

async _verifyRecordOnlineAsync (record) {
return promisify(cb => libp2pRecord.validator.verifyRecord(dht.validators, record, cb))()
await promisify(cb => libp2pRecord.validator.verifyRecord(dht.validators, record, cb))()
},

@@ -526,3 +520,3 @@

const provs = await promisify(cb => dht.providers.getProviders(key, cb))()
const provs = await dht.providers.getProviders(key)

@@ -553,3 +547,3 @@ provs.forEach((id) => {

// Here we return the query function to use on this particular disjoint path
return callbackify(async (peer) => {
return async (peer) => {
const msg = await dht._findProvidersSingleAsync(peer, key)

@@ -570,3 +564,3 @@ const provs = msg.providerPeers

return { closerPeers: msg.closerPeers }
})
}
})

@@ -581,3 +575,3 @@

} catch (err) {
if (err.code !== 'ETIMEDOUT' || out.length === 0) {
if (err.code !== 'ETIMEDOUT') {
throw err

@@ -596,2 +590,6 @@ }

if (out.length === 0) {
throw errcode(new Error('no providers found'), 'ERR_NOT_FOUND')
}
return out.toArray()

@@ -598,0 +596,0 @@ },

@@ -5,7 +5,7 @@ 'use strict'

const varint = require('varint')
const each = require('async/each')
const pull = require('pull-stream')
const CID = require('cids')
const PeerId = require('peer-id')
const Key = require('interface-datastore').Key
const Queue = require('p-queue')
const promisify = require('promisify-es6')
const toIterator = require('pull-stream-to-async-iterator')

@@ -60,2 +60,4 @@ const c = require('./constants')

this.providers = cache(this.lruCacheSize)
this.syncQueue = new Queue({ concurrency: 1 })
}

@@ -76,6 +78,5 @@

/**
* Check all providers if they are still valid, and if not
* delete them.
* Check all providers if they are still valid, and if not delete them.
*
* @returns {undefined}
* @returns {Promise}
*

@@ -85,33 +86,59 @@ * @private

_cleanup () {
this._getProviderCids((err, cids) => {
if (err) {
return this._log.error('Failed to get cids', err)
}
return this.syncQueue.add(async () => {
this._log('start cleanup')
const start = Date.now()
each(cids, (cid, cb) => {
this._getProvidersMap(cid, (err, provs) => {
if (err) {
return cb(err)
let count = 0
let deleteCount = 0
const deleted = new Map()
const batch = this.datastore.batch()
// Get all provider entries from the datastore
const query = this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX })
for await (const entry of toIterator(query)) {
try {
// Add a delete to the batch for each expired entry
const { cid, peerId } = parseProviderKey(entry.key)
const time = readTime(entry.value)
const now = Date.now()
const delta = now - time
const expired = delta > this.provideValidity
this._log('comparing: %d - %d = %d > %d %s',
now, time, delta, this.provideValidity, expired ? '(expired)' : '')
if (expired) {
deleteCount++
batch.delete(entry.key)
const peers = deleted.get(cid) || new Set()
peers.add(peerId)
deleted.set(cid, peers)
}
count++
} catch (err) {
this._log.error(err.message)
}
}
this._log('deleting %d / %d entries', deleteCount, count)
provs.forEach((time, provider) => {
this._log('comparing: %s - %s > %s', Date.now(), time, this.provideValidity)
if (Date.now() - time > this.provideValidity) {
provs.delete(provider)
}
})
// Commit the deletes to the datastore
if (deleted.size) {
await promisify(cb => batch.commit(cb))()
}
// Clear expired entries from the cache
for (const [cid, peers] of deleted) {
const key = makeProviderKey(cid)
const provs = this.providers.get(key)
if (provs) {
for (const peerId of peers) {
provs.delete(peerId)
}
if (provs.size === 0) {
return this._deleteProvidersMap(cid, cb)
this.providers.remove(key)
} else {
this.providers.set(key, provs)
}
cb()
})
}, (err) => {
if (err) {
return this._log.error('Failed to cleanup', err)
}
}
this._log('Cleanup successfull')
})
this._log('Cleanup successful (%dms)', Date.now() - start)
})

@@ -121,89 +148,19 @@ }

/**
* Get a list of all cids that providers are known for.
* Get the currently known provider peer ids for a given CID.
*
* @param {function(Error, Array<CID>)} callback
* @returns {undefined}
*
* @private
*/
_getProviderCids (callback) {
pull(
this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX }),
pull.map((entry) => {
const parts = entry.key.toString().split('/')
if (parts.length !== 4) {
this._log.error('incorrectly formatted provider entry in datastore: %s', entry.key)
return
}
let decoded
try {
decoded = utils.decodeBase32(parts[2])
} catch (err) {
this._log.error('error decoding base32 provider key: %s', parts[2])
return
}
let cid
try {
cid = new CID(decoded)
} catch (err) {
this._log.error('error converting key to cid from datastore: %s', err.message)
}
return cid
}),
pull.filter(Boolean),
pull.collect(callback)
)
}
/**
* Get the currently known provider maps for a given CID.
*
* @param {CID} cid
* @param {function(Error, Map<PeerId, Date>)} callback
* @returns {undefined}
* @returns {Promise<Map<String, Date>>}
*
* @private
*/
_getProvidersMap (cid, callback) {
const provs = this.providers.get(makeProviderKey(cid))
async _getProvidersMap (cid) {
const cacheKey = makeProviderKey(cid)
let provs = this.providers.get(cacheKey)
if (!provs) {
return loadProviders(this.datastore, cid, callback)
provs = await loadProviders(this.datastore, cid)
this.providers.set(cacheKey, provs)
}
callback(null, provs)
return provs
}
/**
* Completely remove a providers map entry for a given CID.
*
* @param {CID} cid
* @param {function(Error)} callback
* @returns {undefined}
*
* @private
*/
_deleteProvidersMap (cid, callback) {
const dsKey = makeProviderKey(cid)
this.providers.set(dsKey, null)
const batch = this.datastore.batch()
pull(
this.datastore.query({
keysOnly: true,
prefix: dsKey
}),
pull.through((entry) => batch.delete(entry.key)),
pull.onEnd((err) => {
if (err) {
return callback(err)
}
batch.commit(callback)
})
)
}
get cleanupInterval () {

@@ -227,32 +184,21 @@ return this._cleanupInterval

/**
* Add a new provider.
* Add a new provider for the given CID.
*
* @param {CID} cid
* @param {PeerId} provider
* @param {function(Error)} callback
* @returns {undefined}
* @returns {Promise}
*/
addProvider (cid, provider, callback) {
this._log('addProvider %s', cid.toBaseEncodedString())
const dsKey = makeProviderKey(cid)
const provs = this.providers.get(dsKey)
async addProvider (cid, provider) {
return this.syncQueue.add(async () => {
this._log('addProvider %s', cid.toBaseEncodedString())
const provs = await this._getProvidersMap(cid)
const next = (err, provs) => {
if (err) {
return callback(err)
}
this._log('loaded %s provs', provs.size)
const now = Date.now()
provs.set(provider, now)
provs.set(utils.encodeBase32(provider.id), now)
const dsKey = makeProviderKey(cid)
this.providers.set(dsKey, provs)
writeProviderEntry(this.datastore, cid, provider, now, callback)
}
if (!provs) {
loadProviders(this.datastore, cid, next)
} else {
next(null, provs)
}
return writeProviderEntry(this.datastore, cid, provider, now)
})
}

@@ -264,13 +210,11 @@

* @param {CID} cid
* @param {function(Error, Array<PeerId>)} callback
* @returns {undefined}
* @returns {Promise<Array<PeerId>>}
*/
getProviders (cid, callback) {
this._log('getProviders %s', cid.toBaseEncodedString())
this._getProvidersMap(cid, (err, provs) => {
if (err) {
return callback(err)
}
callback(null, Array.from(provs.keys()))
async getProviders (cid) {
return this.syncQueue.add(async () => {
this._log('getProviders %s', cid.toBaseEncodedString())
const provs = await this._getProvidersMap(cid)
return [...provs.keys()].map((base32PeerId) => {
return new PeerId(utils.decodeBase32(base32PeerId))
})
})

@@ -283,3 +227,3 @@ }

*
* @param {CID} cid
* @param {CID|string} cid - cid or base32 encoded string
* @returns {string}

@@ -290,3 +234,4 @@ *

function makeProviderKey (cid) {
return c.PROVIDERS_KEY_PREFIX + utils.encodeBase32(cid.buffer)
cid = typeof cid === 'string' ? cid : utils.encodeBase32(cid.buffer)
return c.PROVIDERS_KEY_PREFIX + cid
}

@@ -301,8 +246,7 @@

* @param {number} time
* @param {function(Error)} callback
* @returns {undefined}
* @returns {Promise}
*
* @private
*/
function writeProviderEntry (store, cid, peer, time, callback) {
async function writeProviderEntry (store, cid, peer, time) {
const dsKey = [

@@ -314,32 +258,44 @@ makeProviderKey(cid),

store.put(new Key(dsKey), Buffer.from(varint.encode(time)), callback)
const key = new Key(dsKey)
const buffer = Buffer.from(varint.encode(time))
return promisify(cb => store.put(key, buffer, cb))()
}
/**
* Load providers from the store.
* Parse the CID and provider peer id from the key
*
* @param {DKey} key
* @returns {Object} object with peer id and cid
*
* @private
*/
function parseProviderKey (key) {
const parts = key.toString().split('/')
if (parts.length !== 4) {
throw new Error('incorrectly formatted provider entry key in datastore: ' + key)
}
return {
cid: parts[2],
peerId: parts[3]
}
}
/**
* Load providers for the given CID from the store.
*
* @param {Datastore} store
* @param {CID} cid
* @param {function(Error, Map<PeerId, Date>)} callback
* @returns {undefined}
* @returns {Promise<Map<PeerId, Date>>}
*
* @private
*/
function loadProviders (store, cid, callback) {
pull(
store.query({ prefix: makeProviderKey(cid) }),
pull.map((entry) => {
const parts = entry.key.toString().split('/')
const lastPart = parts[parts.length - 1]
const rawPeerId = utils.decodeBase32(lastPart)
return [new PeerId(rawPeerId), readTime(entry.value)]
}),
pull.collect((err, res) => {
if (err) {
return callback(err)
}
return callback(null, new Map(res))
})
)
async function loadProviders (store, cid) {
const providers = new Map()
const query = store.query({ prefix: makeProviderKey(cid) })
for await (const entry of toIterator(query)) {
const { peerId } = parseProviderKey(entry.key)
providers.set(peerId, readTime(entry.value))
}
return providers
}

@@ -346,0 +302,0 @@

'use strict'
const timeout = require('async/timeout')
const promisify = require('promisify-es6')
const PeerQueue = require('../peer-queue')
const utils = require('../utils')

@@ -25,4 +24,5 @@ // TODO: Temporary until parallel dial in Switch have a proper

this.run = run
this.queryFunc = timeout(queryFunc, QUERY_FUNC_TIMEOUT)
this.queryFuncAsync = promisify(this.queryFunc)
this.queryFunc = utils.withTimeout(queryFunc, QUERY_FUNC_TIMEOUT)
if (!this.queryFunc) throw new Error('Path requires a `queryFn` to be specified')
if (typeof this.queryFunc !== 'function') throw new Error('Path expected `queryFn` to be a function. Got ' + typeof this.queryFunc)

@@ -29,0 +29,0 @@ /**

@@ -213,3 +213,3 @@ 'use strict'

try {
res = await this.path.queryFuncAsync(peer)
res = await this.path.queryFunc(peer)
} catch (err) {

@@ -216,0 +216,0 @@ queryError = err

'use strict'
const times = require('async/times')
const promisify = require('promisify-es6')
const crypto = require('libp2p-crypto')
const waterfall = require('async/waterfall')
const multihashing = require('multihashing-async')
const multihashing = promisify(require('multihashing-async'))
const PeerId = require('peer-id')
const assert = require('assert')
const AbortController = require('abort-controller')
const errcode = require('err-code')
const times = require('p-times')
const c = require('./constants')
const { logger } = require('./utils')
const AbortController = require('abort-controller')
const errcode = require('err-code')
class RandomWalk {

@@ -32,2 +31,3 @@ /**

this.log = logger(dht.peerInfo.id, 'random-walk')
this._timeoutId = undefined
}

@@ -49,6 +49,3 @@

// Start runner immediately
this._runPeriodically((done) => {
// Each subsequent walk should run on a `this._options.interval` interval
this._walk(this._options.queriesPerPeriod, this._options.timeout, () => done(this._options.interval))
}, 0)
this._runPeriodically()
}, this._options.delay)

@@ -64,4 +61,6 @@ }

stop () {
clearTimeout(this._timeoutId)
this._timeoutId = null
if (this._timeoutId) {
clearTimeout(this._timeoutId)
this._timeoutId = undefined
}
this._controller && this._controller.abort()

@@ -71,15 +70,19 @@ }

/**
* Run function `walk` on every `interval` ms
* @param {function(callback)} walk The function to execute on `interval`
* @param {number} interval The interval to run on in ms
* Run function `randomWalk._walk` on every `options.interval` ms
*
* @private
*/
_runPeriodically (walk, interval) {
this._timeoutId = setTimeout(() => {
walk((nextInterval) => {
// Schedule next
this._runPeriodically(walk, nextInterval)
async _runPeriodically () {
// run until the walk has been stopped
while (this._timeoutId) {
try {
await this._walk(this._options.queriesPerPeriod, this._options.timeout)
} catch (err) {
this._kadDHT._log.error('random-walk:error', err)
}
// Each subsequent walk should run on a `this._options.interval` interval
await new Promise(resolve => {
this._timeoutId = setTimeout(resolve, this._options.interval)
})
}, interval)
}
}

@@ -92,40 +95,36 @@

* @param {number} walkTimeout
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise}
*
* @private
*/
_walk (queries, walkTimeout, callback) {
async _walk (queries, walkTimeout) {
this.log('start')
this._controller = new AbortController()
times(queries, (i, next) => {
this.log('running query %d', i)
try {
await times(queries, async (index) => {
this.log('running query %d', index)
try {
const id = await this._randomPeerId()
// Perform the walk
waterfall([
(cb) => this._randomPeerId(cb),
(id, cb) => {
// Check if we've happened to already abort
if (!this._controller) return cb()
if (!this._controller) return
this._query(id, {
await this._query(id, {
timeout: walkTimeout,
signal: this._controller.signal
}, cb)
})
} catch (err) {
if (err && err.code !== 'ETIMEDOUT') {
this.log.error('query %d finished with error', index, err)
throw err
}
}
], (err) => {
if (err && err.code !== 'ETIMEDOUT') {
this.log.error('query %d finished with error', i, err)
return next(err)
}
this.log('finished query %d', i)
next(null)
this.log('finished query %d', index)
})
}, (err) => {
} finally {
this._controller = null
this.log('finished queries')
callback(err)
})
}
}

@@ -147,23 +146,25 @@

* @param {AbortControllerSignal} options.signal
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise}
*
* @private
*/
_query (id, options, callback) {
async _query (id, options) {
this.log('query:%s', id.toB58String())
this._kadDHT.findPeer(id, options, (err, peer) => {
let peer
try {
peer = await promisify(cb => this._kadDHT.findPeer(id, options, cb))()
} catch (err) {
if (err && err.code === 'ERR_NOT_FOUND') {
// expected case, we asked for random stuff after all
return callback()
return
}
if (err) {
return callback(err)
}
this.log('query:found', peer)
// wait what, there was something found? Lucky day!
callback(errcode(new Error(`random-walk: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`), 'ERR_FOUND_RANDOM_PEER'))
})
throw err
}
this.log('query:found', peer)
// wait what, there was something found? Lucky day!
throw errcode(`random-walk: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`, 'ERR_FOUND_RANDOM_PEER')
}

@@ -174,14 +175,9 @@

*
* @param {function(Error, PeerId)} callback
* @returns {void}
* @returns {Promise<PeerId>}
*
* @private
*/
_randomPeerId (callback) {
multihashing(crypto.randomBytes(16), 'sha2-256', (err, digest) => {
if (err) {
return callback(err)
}
callback(null, new PeerId(digest))
})
async _randomPeerId () {
const digest = await multihashing(crypto.randomBytes(16), 'sha2-256')
return new PeerId(digest)
}

@@ -188,0 +184,0 @@ }

'use strict'
const CID = require('cids')
const utils = require('../../utils')
const errcode = require('err-code')
const promiseToCallback = require('promise-to-callback')
const utils = require('../../utils')
module.exports = (dht) => {

@@ -14,3 +16,3 @@ const log = utils.logger(dht.peerInfo.id, 'rpc:add-provider')

* @param {Message} msg
* @param {function(Error, Message)} callback
* @param {function(Error)} callback
* @returns {undefined}

@@ -52,3 +54,3 @@ */

dht.peerBook.put(pi)
dht.providers.addProvider(cid, pi.id, callback)
promiseToCallback(dht.providers.addProvider(cid, pi.id))(err => callback(err))
}

@@ -64,5 +66,5 @@ })

if (!foundProvider) {
dht.providers.addProvider(cid, peer.id, callback)
promiseToCallback(dht.providers.addProvider(cid, peer.id))(err => callback(err))
}
}
}

@@ -6,3 +6,3 @@ 'use strict'

const PeerInfo = require('peer-info')
const promiseToCallback = require('promise-to-callback')
const errcode = require('err-code')

@@ -45,3 +45,3 @@

}),
(cb) => dht.providers.getProviders(cid, cb),
(cb) => promiseToCallback(dht.providers.getProviders(cid))(cb),
(cb) => dht._betterPeersToQuery(msg, peer, cb)

@@ -48,0 +48,0 @@ ], (err, res) => {

@@ -13,2 +13,3 @@ 'use strict'

const PeerId = require('peer-id')
const errcode = require('err-code')

@@ -194,1 +195,30 @@ /**

}
exports.TimeoutError = class TimeoutError extends Error {
get code () {
return 'ETIMEDOUT'
}
}
/**
* Creates an async function that calls the given `asyncFn` and Errors
* if it does not resolve within `time` ms
*
* @param {Function} [asyncFn]
* @param {Number} [time]
* @returns {Function}
*
* @private
*/
exports.withTimeout = (asyncFn, time) => {
return async (...args) => {
return Promise.race([
asyncFn(...args),
new Promise((resolve, reject) => {
setTimeout(() => {
reject(errcode(new Error('Async function did not complete before timeout'), 'ETIMEDOUT'))
}, time)
})
])
}
}

@@ -305,3 +305,3 @@ /* eslint-env mocha */

const dhtD = dhts[3]
const stub = sinon.stub(dhtD, '_verifyRecordLocally').callsArgWithAsync(1, error)
const stub = sinon.stub(dhtD, '_verifyRecordLocallyAsync').rejects(error)

@@ -339,4 +339,4 @@ waterfall([

const dhtD = dhts[3]
const stub = sinon.stub(dhtD, '_verifyRecordLocally').callsArgWithAsync(1, error)
const stub2 = sinon.stub(dhtC, '_verifyRecordLocally').callsArgWithAsync(1, error)
const stub = sinon.stub(dhtD, '_verifyRecordLocallyAsync').rejects(error)
const stub2 = sinon.stub(dhtC, '_verifyRecordLocallyAsync').rejects(error)

@@ -375,3 +375,3 @@ waterfall([

const dhtC = dhts[2]
const stub = sinon.stub(dhtC, '_verifyRecordLocally').callsArgWithAsync(1, error)
const stub = sinon.stub(dhtC, '_verifyRecordLocallyAsync').rejects(error)

@@ -483,3 +483,3 @@ waterfall([

const dhtASpy = sinon.spy(dhtA, '_putValueToPeer')
const dhtASpy = sinon.spy(dhtA, '_putValueToPeerAsync')

@@ -1017,3 +1017,5 @@ series([

const dht = new KadDHT(sw)
dht.start(() => {
dht.start((err) => {
expect(err).to.not.exist()
const key = Buffer.from('/v/hello')

@@ -1027,5 +1029,3 @@ const value = Buffer.from('world')

// Simulate going out to the network and returning the record
sinon.stub(dht, '_getValueOrPeers').callsFake((peer, k, cb) => {
cb(null, rec)
})
sinon.stub(dht, '_getValueOrPeersAsync').callsFake(async () => ({ record: rec }))
]

@@ -1079,3 +1079,3 @@

const dhtB = dhts[1]
const stub = sinon.stub(dhtA, '_getValueOrPeers').callsArgWithAsync(2, error)
const stub = sinon.stub(dhtA, '_getValueOrPeersAsync').rejects(error)

@@ -1108,3 +1108,3 @@ waterfall([

const dhtB = dhts[1]
const stub = sinon.stub(dhtA, '_getValueOrPeers').callsArgWithAsync(2, error)
const stub = sinon.stub(dhtA, '_getValueOrPeersAsync').rejects(error)

@@ -1111,0 +1111,0 @@ waterfall([

@@ -42,2 +42,18 @@ /* eslint-env mocha */

describe('withTimeout', () => {
it('rejects with the error in the original function', async () => {
const original = async () => { throw new Error('explode') }
const asyncFn = utils.withTimeout(original, 100)
let err
try {
await asyncFn()
} catch (_err) {
err = _err
}
expect(err).to.exist()
expect(err.message).to.include('explode')
})
})
describe('sortClosestPeers', () => {

@@ -44,0 +60,0 @@ it('sorts a list of PeerInfos', (done) => {

@@ -7,90 +7,84 @@ /* eslint-env mocha */

const expect = chai.expect
const promisify = require('promisify-es6')
const Store = require('interface-datastore').MemoryDatastore
const parallel = require('async/parallel')
const waterfall = require('async/waterfall')
const CID = require('cids')
const multihashing = require('multihashing-async')
const map = require('async/map')
const timesSeries = require('async/timesSeries')
const each = require('async/each')
const eachSeries = require('async/eachSeries')
const range = require('lodash.range')
const LevelStore = require('datastore-level')
const path = require('path')
const os = require('os')
const multihashing = promisify(require('multihashing-async'))
const Providers = require('../src/providers')
const createPeerInfo = require('./utils/create-peer-info')
const createValues = require('./utils/create-values')
const createPeerInfo = promisify(require('./utils/create-peer-info'))
const createValues = promisify(require('./utils/create-values'))
describe('Providers', () => {
let infos
let providers
before(function (done) {
before(async function () {
this.timeout(10 * 1000)
createPeerInfo(3, (err, peers) => {
if (err) {
return done(err)
}
infos = await createPeerInfo(3)
})
infos = peers
done()
})
afterEach(() => {
providers && providers.stop()
})
it('simple add and get of providers', (done) => {
const providers = new Providers(new Store(), infos[2].id)
it('simple add and get of providers', async () => {
providers = new Providers(new Store(), infos[2].id)
const cid = new CID('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n')
parallel([
(cb) => providers.addProvider(cid, infos[0].id, cb),
(cb) => providers.addProvider(cid, infos[1].id, cb)
], (err) => {
expect(err).to.not.exist()
providers.getProviders(cid, (err, provs) => {
expect(err).to.not.exist()
expect(provs).to.be.eql([infos[0].id, infos[1].id])
providers.stop()
await Promise.all([
providers.addProvider(cid, infos[0].id),
providers.addProvider(cid, infos[1].id)
])
done()
})
})
const provs = await providers.getProviders(cid)
const ids = new Set(provs.map((peerId) => peerId.toB58String()))
expect(ids.has(infos[0].id.toB58String())).to.be.eql(true)
expect(ids.has(infos[1].id.toB58String())).to.be.eql(true)
})
it('more providers than space in the lru cache', (done) => {
const providers = new Providers(new Store(), infos[2].id, 10)
it('duplicate add of provider is deduped', async () => {
providers = new Providers(new Store(), infos[2].id)
waterfall([
(cb) => map(
range(100),
(i, cb) => multihashing(Buffer.from(`hello ${i}`), 'sha2-256', cb),
cb
),
(hashes, cb) => {
const cids = hashes.map((h) => new CID(h))
const cid = new CID('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n')
map(cids, (cid, cb) => {
providers.addProvider(cid, infos[0].id, cb)
}, (err) => cb(err, cids))
},
(cids, cb) => {
map(cids, (cid, cb) => {
providers.getProviders(cid, cb)
}, (err, provs) => {
expect(err).to.not.exist()
expect(provs).to.have.length(100)
provs.forEach((p) => {
expect(p[0].id).to.be.eql(infos[0].id.id)
})
providers.stop()
cb()
})
}
], done)
await Promise.all([
providers.addProvider(cid, infos[0].id),
providers.addProvider(cid, infos[0].id),
providers.addProvider(cid, infos[1].id),
providers.addProvider(cid, infos[1].id),
providers.addProvider(cid, infos[1].id)
])
const provs = await providers.getProviders(cid)
expect(provs).to.have.length(2)
const ids = new Set(provs.map((peerId) => peerId.toB58String()))
expect(ids.has(infos[0].id.toB58String())).to.be.eql(true)
expect(ids.has(infos[1].id.toB58String())).to.be.eql(true)
})
it('expires', (done) => {
const providers = new Providers(new Store(), infos[2].id)
it('more providers than space in the lru cache', async () => {
providers = new Providers(new Store(), infos[2].id, 10)
const hashes = await Promise.all([...new Array(100)].map((i) => {
return multihashing(Buffer.from(`hello ${i}`), 'sha2-256')
}))
const cids = hashes.map((h) => new CID(h))
await Promise.all(cids.map(cid => providers.addProvider(cid, infos[0].id)))
const provs = await Promise.all(cids.map(cid => providers.getProviders(cid)))
expect(provs).to.have.length(100)
for (const p of provs) {
expect(p[0].id).to.be.eql(infos[0].id.id)
}
})
it('expires', async () => {
providers = new Providers(new Store(), infos[2].id)
providers.cleanupInterval = 100

@@ -100,29 +94,21 @@ providers.provideValidity = 200

const cid = new CID('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n')
parallel([
(cb) => providers.addProvider(cid, infos[0].id, cb),
(cb) => providers.addProvider(cid, infos[1].id, cb)
], (err) => {
expect(err).to.not.exist()
await Promise.all([
providers.addProvider(cid, infos[0].id),
providers.addProvider(cid, infos[1].id)
])
providers.getProviders(cid, (err, provs) => {
expect(err).to.not.exist()
expect(provs).to.have.length(2)
expect(provs[0].id).to.be.eql(infos[0].id.id)
expect(provs[1].id).to.be.eql(infos[1].id.id)
})
const provs = await providers.getProviders(cid)
setTimeout(() => {
providers.getProviders(cid, (err, provs) => {
expect(err).to.not.exist()
expect(provs).to.have.length(0)
providers.stop()
done()
})
// TODO: this is a timeout based check, make cleanup monitorable
}, 400)
})
expect(provs).to.have.length(2)
expect(provs[0].id).to.be.eql(infos[0].id.id)
expect(provs[1].id).to.be.eql(infos[1].id.id)
await new Promise(resolve => setTimeout(resolve, 400))
const provsAfter = await providers.getProviders(cid)
expect(provsAfter).to.have.length(0)
})
// slooow so only run when you need to
it.skip('many', (done) => {
it.skip('many', async function () {
const p = path.join(

@@ -132,40 +118,34 @@ os.tmpdir(), (Math.random() * 100).toString()

const store = new LevelStore(p)
const providers = new Providers(store, infos[2].id, 10)
providers = new Providers(store, infos[2].id, 10)
console.log('starting')
waterfall([
(cb) => parallel([
(cb) => createValues(100, cb),
(cb) => createPeerInfo(600, cb)
], cb),
(res, cb) => {
console.log('got values and peers')
const values = res[0]
const peers = res[1]
let total = Date.now()
eachSeries(values, (v, cb) => {
eachSeries(peers, (p, cb) => {
providers.addProvider(v.cid, p.id, cb)
}, cb)
}, (err) => {
console.log('addProvider %s peers %s cids in %sms', peers.length, values.length, Date.now() - total)
expect(err).to.not.exist()
console.log('starting profile with %s peers and %s cids', peers.length, values.length)
timesSeries(3, (i, cb) => {
const start = Date.now()
each(values, (v, cb) => {
providers.getProviders(v.cid, cb)
}, (err) => {
expect(err).to.not.exist()
console.log('query %sms', (Date.now() - start))
cb()
})
}, cb)
})
const res = await Promise.all([
createValues(100),
createPeerInfo(600)
])
console.log('got values and peers')
const values = res[0]
const peers = res[1]
let total = Date.now()
for (const v of values) {
for (const p of peers) {
await providers.addProvider(v.cid, p.id)
}
], (err) => {
expect(err).to.not.exist()
store.close(done)
})
}
console.log('addProvider %s peers %s cids in %sms', peers.length, values.length, Date.now() - total)
console.log('starting profile with %s peers and %s cids', peers.length, values.length)
for (let i = 0; i < 3; i++) {
const start = Date.now()
for (const v of values) {
await providers.getProviders(v.cid)
console.log('query %sms', (Date.now() - start))
}
}
await store.close()
})
})

@@ -11,4 +11,4 @@ /* eslint-env mocha */

const Mplex = require('libp2p-mplex')
const setImmediate = require('async/setImmediate')
const promiseToCallback = require('promise-to-callback')
const promisify = require('promisify-es6')

@@ -61,18 +61,18 @@ const DHT = require('../src')

let i = 0
const query = (p, cb) => {
const queryFunc = async (p) => {
if (i++ === 1) {
expect(p.id).to.eql(peerInfos[2].id.id)
return cb(null, {
return {
value: Buffer.from('cool'),
pathComplete: true
})
}
}
expect(p.id).to.eql(peerInfos[1].id.id)
cb(null, {
return {
closerPeers: [peerInfos[2]]
})
}
}
const q = new Query(dht, peer.id.id, () => query)
const q = new Query(dht, peer.id.id, () => queryFunc)
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => {

@@ -95,15 +95,15 @@ expect(err).to.not.exist()

const visited = []
const query = (p, cb) => {
const queryFunc = async (p) => {
visited.push(p)
if (i++ === 1) {
return cb(new Error('fail'))
throw new Error('fail')
}
cb(null, {
return {
closerPeers: [peerInfos[2]]
})
}
}
const q = new Query(dht, peer.id.id, () => query)
const q = new Query(dht, peer.id.id, () => queryFunc)
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => {

@@ -132,5 +132,5 @@ expect(err).not.to.exist()

const query = (p, cb) => cb(new Error('fail'))
const queryFunc = async (p) => { throw new Error('fail') }
const q = new Query(dht, peer.id.id, () => query)
const q = new Query(dht, peer.id.id, () => queryFunc)
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => {

@@ -146,7 +146,6 @@ expect(err).to.exist()

const query = (p, cb) => {}
const queryFunc = async (p) => {}
const q = new Query(dht, peer.id.id, () => query)
const q = new Query(dht, peer.id.id, () => queryFunc)
promiseToCallback(q.run([]))((err, res) => {
if (err) console.error(err)
expect(err).to.not.exist()

@@ -168,9 +167,9 @@

const query = (p, cb) => {
cb(null, {
const queryFunc = async (p) => {
return {
closerPeers: [peerInfos[2]]
})
}
}
const q = new Query(dht, peer.id.id, () => query)
const q = new Query(dht, peer.id.id, () => queryFunc)
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => {

@@ -217,10 +216,10 @@ expect(err).to.not.exist()

const query = (p, cb) => {
const queryFunc = async (p) => {
const closer = topology[p.toB58String()]
cb(null, {
return {
closerPeers: closer || []
})
}
}
const q = new Query(dht, peer.id.id, () => query)
const q = new Query(dht, peer.id.id, () => queryFunc)
promiseToCallback(q.run([peerInfos[1].id, peerInfos[2].id, peerInfos[3].id]))((err, res) => {

@@ -257,12 +256,12 @@ expect(err).to.not.exist()

const query = (p, cb) => {
const queryFunc = async (p) => {
const res = topology[p.toB58String()] || {}
cb(null, {
return {
closerPeers: res.closer || [],
value: res.value,
pathComplete: res.pathComplete
})
}
}
const q = new Query(dht, peer.id.id, () => query)
const q = new Query(dht, peer.id.id, () => queryFunc)
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => {

@@ -308,10 +307,13 @@ expect(err).to.not.exist()

const visited = []
const query = (p, cb) => {
const queryFunc = async (p) => {
visited.push(p)
const invokeCb = () => {
const getResult = async () => {
const res = topology[p.toB58String()] || {}
cb(null, {
// this timeout is necesary so `dhtA.stop` has time to stop the
// requests before they all complete
await new Promise(resolve => setTimeout(resolve, 100))
return {
closerPeers: res.closer || []
})
}
}

@@ -321,10 +323,10 @@

if (p.toB58String() === peerInfos[2].id.toB58String()) {
dhtA.stop(invokeCb)
await promisify(cb => dhtA.stop(cb))
setTimeout(checkExpectations, 100)
} else {
invokeCb()
return getResult()
}
return getResult()
}
const q = new Query(dhtA, peer.id.id, () => query)
const q = new Query(dhtA, peer.id.id, () => queryFunc)
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => {

@@ -364,10 +366,10 @@ expect(err).to.not.exist()

const query = (p, cb) => {
const queryFunc = async (p) => {
const res = topology[p.toB58String()] || {}
cb(null, {
return {
closerPeers: res.closer || []
})
}
}
const q = new Query(dhtA, peer.id.id, () => query)
const q = new Query(dhtA, peer.id.id, () => queryFunc)

@@ -422,14 +424,13 @@ dhtA.stop(() => {

const query = (p, cb) => {
const queryFunc = async (p) => {
const res = topology[p.toB58String()] || {}
setTimeout(() => {
cb(null, {
closerPeers: res.closer || [],
value: res.value,
pathComplete: res.pathComplete
})
}, res.delay)
await new Promise(resolve => setTimeout(resolve, res.delay))
return {
closerPeers: res.closer || [],
value: res.value,
pathComplete: res.pathComplete
}
}
const q = new Query(dht, peer.id.id, () => query)
const q = new Query(dht, peer.id.id, () => queryFunc)
promiseToCallback(q.run([peerInfos[1].id, peerInfos[4].id]))((err, res) => {

@@ -489,17 +490,16 @@ expect(err).to.not.exist()

const visited = []
const query = (p, cb) => {
const queryFunc = async (p) => {
visited.push(p)
const res = topology[p.toB58String()] || {}
setTimeout(() => {
cb(null, {
closerPeers: res.closer || [],
value: res.value,
pathComplete: res.pathComplete,
queryComplete: res.queryComplete
})
}, res.delay)
await new Promise(resolve => setTimeout(resolve, res.delay))
return {
closerPeers: res.closer || [],
value: res.value,
pathComplete: res.pathComplete,
queryComplete: res.queryComplete
}
}
const q = new Query(dht, peer.id.id, () => query)
const q = new Query(dht, peer.id.id, () => queryFunc)
promiseToCallback(q.run([peerInfos[1].id, peerInfos[4].id]))((err, res) => {

@@ -567,19 +567,18 @@ expect(err).to.not.exist()

const visited = []
const query = (p, cb) => {
const queryFunc = async (p) => {
visited.push(p)
const res = topology[p.toB58String()] || {}
setTimeout(() => {
if (res.error) {
return cb(new Error('path error'))
}
cb(null, {
closerPeers: res.closer || [],
value: res.value,
pathComplete: res.pathComplete
})
}, res.delay)
await new Promise(resolve => setTimeout(resolve, res.delay))
if (res.error) {
throw new Error('path error')
}
return {
closerPeers: res.closer || [],
value: res.value,
pathComplete: res.pathComplete
}
}
const q = new Query(dht, peer.id.id, () => query)
const q = new Query(dht, peer.id.id, () => queryFunc)
promiseToCallback(q.run([peerInfos[1].id, peerInfos[4].id]))((err, res) => {

@@ -657,3 +656,3 @@ expect(err).to.not.exist()

const visited = []
const query = (peerId, cb) => {
const queryFunc = async (peerId) => {
visited.push(peerId)

@@ -663,6 +662,6 @@ const i = peerIndex(peerId)

const closerPeers = closerIndexes.map(j => peerIdToInfo(sorted[j]))
setTimeout(() => cb(null, { closerPeers }))
return { closerPeers }
}
const q = new Query(dht, peerInfos[0].id.id, () => query)
const q = new Query(dht, peerInfos[0].id.id, () => queryFunc)
promiseToCallback(q.run(initial))((err, res) => {

@@ -726,3 +725,3 @@ expect(err).to.not.exist()

const q = new Query(dht, targetId, (trackNum) => {
return (p, cb) => {
return async (p) => {
const response = getResponse(p, trackNum)

@@ -737,3 +736,3 @@ expect(response).to.exist() // or we aren't on the right track

}
setImmediate(() => cb(null, response))
return response
}

@@ -763,9 +762,9 @@ })

const query = (p, cb) => {
cb(null, {
const queryFunc = async (p) => {
return {
closerPeers: [peerInfos[2]]
})
}
}
const q = new Query(dht, peer.id.id, () => query)
const q = new Query(dht, peer.id.id, () => queryFunc)
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => {

@@ -772,0 +771,0 @@ expect(err).to.not.exist()

@@ -70,3 +70,3 @@ /* eslint-env mocha */

sinon.stub(dht._queryManager, 'running').value(true)
let querySpy = sinon.stub().callsArgWith(1, null, {})
const querySpy = sinon.stub().resolves({})

@@ -126,3 +126,3 @@ let query = new Query(dht, targetKey.key, () => querySpy)

let querySpy = sinon.stub().callsArgWith(1, null, {})
const querySpy = sinon.stub().resolves({})
let query = new Query(dht, targetKey.key, () => querySpy)

@@ -152,4 +152,7 @@

// return peers 16 - 19
querySpy.onCall(1).callsFake((_, cb) => {
setTimeout(() => cb(null, { closerPeers: returnPeers }), 10)
querySpy.onCall(1).callsFake(async () => {
// this timeout ensures the queries finish in serial
// see https://github.com/libp2p/js-libp2p-kad-dht/pull/121#discussion_r286437978
await new Promise(resolve => setTimeout(resolve, 10))
return { closerPeers: returnPeers }
})

@@ -156,0 +159,0 @@

@@ -57,13 +57,10 @@ /* eslint-env mocha */

it('should be able to specify the number of queries', (done) => {
it('should be able to specify the number of queries', async () => {
const queries = 5
sinon.stub(randomWalk, '_query').callsArgWith(2, null)
randomWalk._walk(queries, 1e3, (err) => {
expect(err).to.not.exist()
expect(randomWalk._query.callCount).to.eql(queries)
done()
})
sinon.stub(randomWalk, '_query').resolves(null)
await randomWalk._walk(queries, 1e3)
expect(randomWalk._query.callCount).to.eql(queries)
})
it('should stop walking if a query errors', (done) => {
it('should NOT stop walking if a query errors', async () => {
const queries = 5

@@ -75,60 +72,57 @@ const error = new Error('ERR_BOOM')

randomWalk._walk(queries, 1e3, (err) => {
expect(err).to.eql(error)
// 2 successes and error on the 3rd
expect(findPeerStub.callCount).to.eql(3)
done()
})
let err
try {
await randomWalk._walk(queries, 1e3)
} catch (_err) {
err = _err
}
expect(err.message).to.include('ERR_BOOM')
expect(findPeerStub.callCount).to.eql(5)
})
it('should ignore timeout errors and keep walking', (done) => {
it('should ignore timeout errors and keep walking', async () => {
const queries = 5
const _queryStub = sinon.stub(randomWalk, '_query')
_queryStub.onCall(2).callsArgWith(2, {
code: 'ETIMEDOUT'
})
_queryStub.callsArgWith(2, null)
_queryStub.onCall(2).rejects({ code: 'ETIMEDOUT' })
_queryStub.resolves(null)
randomWalk._walk(queries, 1e3, (err) => {
expect(err).to.not.exist()
expect(randomWalk._query.callCount).to.eql(queries)
done()
})
await randomWalk._walk(queries, 1e3)
expect(randomWalk._query.callCount).to.eql(queries)
})
it('should pass its timeout to the find peer query', (done) => {
it('should pass its timeout to the find peer query', async () => {
sinon.stub(randomWalk._kadDHT, 'findPeer').callsArgWith(2, { code: 'ERR_NOT_FOUND' })
randomWalk._walk(1, 111, (err) => {
const mockCalls = randomWalk._kadDHT.findPeer.getCalls()
expect(err).to.not.exist()
expect(mockCalls).to.have.length(1)
expect(mockCalls[0].args[1]).to.include({
timeout: 111
})
done()
})
await randomWalk._walk(1, 111)
const mockCalls = randomWalk._kadDHT.findPeer.getCalls()
expect(mockCalls).to.have.length(1)
expect(mockCalls[0].args[1]).to.include({ timeout: 111 })
})
it('should error if the random id peer is found', (done) => {
it('should error if the random id peer is found', async () => {
const queries = 5
const findPeerStub = sinon.stub(randomWalk._kadDHT, 'findPeer').callsArgWith(2, { code: 'ERR_NOT_FOUND' })
findPeerStub.onCall(2).callsArgWith(2, null, {
id: 'QmB'
})
findPeerStub.onCall(2).callsArgWith(2, null, { id: 'QmB' })
randomWalk._walk(queries, 1e3, (err) => {
expect(err).to.exist()
expect(findPeerStub.callCount).to.eql(3)
done()
})
let err
try {
await randomWalk._walk(queries, 1e3)
} catch (_err) {
err = _err
}
expect(err).to.exist()
expect(findPeerStub.callCount).to.eql(5)
})
it('should error if random id generation errors', (done) => {
it('should error if random id generation errors', async () => {
const error = new Error('ERR_BOOM')
sinon.stub(randomWalk, '_randomPeerId').callsArgWith(0, error)
randomWalk._walk(1, 1e3, (err) => {
expect(err).to.eql(error)
done()
})
sinon.stub(randomWalk, '_randomPeerId').rejects(error)
let err
try {
await randomWalk._walk(1, 1e3)
} catch (_err) {
err = _err
}
expect(err).to.eql(error)
})

@@ -146,3 +140,3 @@ })

sinon.stub(randomWalk, '_walk').callsFake(() => {
sinon.stub(randomWalk, '_walk').callsFake(async () => {
// Try to start again

@@ -186,3 +180,3 @@ randomWalk.start()

const randomWalk = new RandomWalk(mockDHT, options)
sinon.stub(randomWalk, '_walk').callsFake((queries, timeout) => {
sinon.stub(randomWalk, '_walk').callsFake(async (queries, timeout) => {
expect(queries).to.eql(options.queriesPerPeriod)

@@ -207,3 +201,3 @@ expect(timeout).to.eql(options.timeout)

expect(opts.timeout).to.eql(options.timeout).mark()
callback(error)
setTimeout(() => callback(error), 100)
})

@@ -232,3 +226,3 @@

it('should cancel the timer if the walk is not active', () => {
it('should not be running if the walk is not active', () => {
const randomWalk = new RandomWalk(mockDHT, {

@@ -242,3 +236,3 @@ enabled: true,

randomWalk.stop()
expect(randomWalk._timeoutId).to.not.exist()
expect(randomWalk._timeoutId).to.eql(undefined)
})

@@ -245,0 +239,0 @@

@@ -11,2 +11,3 @@ /* eslint-env mocha */

const _ = require('lodash')
const promiseToCallback = require('promise-to-callback')

@@ -86,3 +87,3 @@ const Message = require('../../../src/message')

(cb) => handler(dht)(sender, msg, cb),
(cb) => dht.providers.getProviders(cid, cb),
(cb) => promiseToCallback(dht.providers.getProviders(cid))(cb),
(provs, cb) => {

@@ -111,3 +112,3 @@ expect(provs).to.have.length(1)

(cb) => handler(dht)(sender, msg, cb),
(cb) => dht.providers.getProviders(cid, cb),
(cb) => promiseToCallback(dht.providers.getProviders(cid))(cb),
(provs, cb) => {

@@ -114,0 +115,0 @@ expect(dht.peerBook.has(provider.id)).to.equal(false)

@@ -9,2 +9,3 @@ /* eslint-env mocha */

const waterfall = require('async/waterfall')
const promiseToCallback = require('promise-to-callback')

@@ -93,3 +94,3 @@ const Message = require('../../../src/message')

(cb) => dht._add(closer, cb),
(cb) => dht.providers.addProvider(v.cid, prov, cb),
(cb) => promiseToCallback(dht.providers.addProvider(v.cid, prov))(err => cb(err)),
(cb) => handler(dht)(peers[0], msg, cb)

@@ -96,0 +97,0 @@ ], (err, response) => {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc