libp2p-kad-dht
Advanced tools
Comparing version 0.15.2 to 0.15.3
module.exports = { | ||
bundlesize: { maxSize: '197kB' } | ||
bundlesize: { maxSize: '222kB' } | ||
} | ||
@@ -0,1 +1,11 @@ | ||
<a name="0.15.3"></a> | ||
## [0.15.3](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.15.2...v0.15.3) (2019-07-29) | ||
### Bug Fixes | ||
* _findNProvidersAsync discarding search results ([#137](https://github.com/libp2p/js-libp2p-kad-dht/issues/137)) ([e656c6b](https://github.com/libp2p/js-libp2p-kad-dht/commit/e656c6b)) | ||
<a name="0.15.2"></a> | ||
@@ -2,0 +12,0 @@ ## [0.15.2](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.15.1...v0.15.2) (2019-05-31) |
{ | ||
"name": "libp2p-kad-dht", | ||
"version": "0.15.2", | ||
"version": "0.15.3", | ||
"description": "JavaScript implementation of the Kad-DHT for libp2p", | ||
@@ -57,2 +57,4 @@ "leadMaintainer": "Vasco Santos <vasco.santos@moxy.studio>", | ||
"multihashing-async": "~0.5.2", | ||
"p-queue": "^5.0.0", | ||
"p-times": "^2.1.0", | ||
"peer-id": "~0.12.2", | ||
@@ -66,2 +68,3 @@ "peer-info": "~0.15.1", | ||
"pull-stream": "^3.6.9", | ||
"pull-stream-to-async-iterator": "^1.0.1", | ||
"varint": "^5.0.0", | ||
@@ -95,2 +98,3 @@ "xor-distance": "^2.0.0" | ||
"Pedro Teixeira <i@pgte.me>", | ||
"Qmstream <51881352+Qmstream@users.noreply.github.com>", | ||
"Richard Schneider <makaretu@gmail.com>", | ||
@@ -97,0 +101,0 @@ "Thomas Eizinger <thomas@eizinger.io>", |
@@ -65,2 +65,6 @@ # js-libp2p-kad-dht | ||
### Implementation Summary | ||
A [summary](docs/IMPL_SUMMARY.MD) of the algorithms and API for this implementation of Kademlia. | ||
## Contribute | ||
@@ -67,0 +71,0 @@ |
@@ -350,27 +350,31 @@ 'use strict' | ||
// Here we return the query function to use on this particular disjoint path | ||
return (peer, cb) => { | ||
this._getValueOrPeers(peer, key, (err, rec, peers) => { | ||
if (err) { | ||
// If we have an invalid record we just want to continue and fetch a new one. | ||
if (!(err.code === 'ERR_INVALID_RECORD')) { | ||
return cb(err) | ||
} | ||
return async (peer) => { | ||
let rec, peers, lookupErr | ||
try { | ||
const results = await this._getValueOrPeersAsync(peer, key) | ||
rec = results.record | ||
peers = results.peers | ||
} catch (err) { | ||
// If we have an invalid record we just want to continue and fetch a new one. | ||
if (err.code !== 'ERR_INVALID_RECORD') { | ||
throw err | ||
} | ||
lookupErr = err | ||
} | ||
const res = { closerPeers: peers } | ||
const res = { closerPeers: peers } | ||
if ((rec && rec.value) || (err && err.code === 'ERR_INVALID_RECORD')) { | ||
pathVals.push({ | ||
val: rec && rec.value, | ||
from: peer | ||
}) | ||
} | ||
if ((rec && rec.value) || lookupErr) { | ||
pathVals.push({ | ||
val: rec && rec.value, | ||
from: peer | ||
}) | ||
} | ||
// enough is enough | ||
if (pathVals.length >= pathSize) { | ||
res.pathComplete = true | ||
} | ||
// enough is enough | ||
if (pathVals.length >= pathSize) { | ||
res.pathComplete = true | ||
} | ||
cb(null, res) | ||
}) | ||
return res | ||
} | ||
@@ -430,12 +434,8 @@ }) | ||
// Just return the actual query function. | ||
return (peer, callback) => { | ||
waterfall([ | ||
(cb) => this._closerPeersSingle(key, peer, cb), | ||
(closer, cb) => { | ||
cb(null, { | ||
closerPeers: closer, | ||
pathComplete: options.shallow ? true : undefined | ||
}) | ||
} | ||
], callback) | ||
return async (peer) => { | ||
const closer = await this._closerPeersSingleAsync(key, peer) | ||
return { | ||
closerPeers: closer, | ||
pathComplete: options.shallow ? true : undefined | ||
} | ||
} | ||
@@ -542,3 +542,4 @@ }) | ||
waterfall([ | ||
(cb) => this.providers.addProvider(key, this.peerInfo.id, cb), | ||
// TODO: refactor this in method in async and remove this wrapper | ||
(cb) => promiseToCallback(this.providers.addProvider(key, this.peerInfo.id))(err => cb(err)), | ||
(cb) => this.getClosestPeers(key.buffer, cb), | ||
@@ -657,21 +658,17 @@ (peers, cb) => { | ||
// Just return the actual query function. | ||
return (peer, cb) => { | ||
waterfall([ | ||
(cb) => this._findPeerSingle(peer, id, cb), | ||
(msg, cb) => { | ||
const match = msg.closerPeers.find((p) => p.id.isEqual(id)) | ||
return async (peer) => { | ||
const msg = await this._findPeerSingleAsync(peer, id) | ||
const match = msg.closerPeers.find((p) => p.id.isEqual(id)) | ||
// found it | ||
if (match) { | ||
return cb(null, { | ||
peer: match, | ||
queryComplete: true | ||
}) | ||
} | ||
// found it | ||
if (match) { | ||
return { | ||
peer: match, | ||
queryComplete: true | ||
} | ||
} | ||
cb(null, { | ||
closerPeers: msg.closerPeers | ||
}) | ||
} | ||
], cb) | ||
return { | ||
closerPeers: msg.closerPeers | ||
} | ||
} | ||
@@ -678,0 +675,0 @@ }) |
@@ -9,4 +9,2 @@ 'use strict' | ||
const promiseToCallback = require('promise-to-callback') | ||
const callbackify = require('callbackify') | ||
const errcode = require('err-code') | ||
@@ -167,3 +165,2 @@ | ||
))() | ||
return undefined | ||
}, | ||
@@ -341,4 +338,3 @@ | ||
try { | ||
await promisify(cb => dht._putLocal(key, fixupRec, cb))() | ||
// await dht._putLocalAsync(key, fixupRec) | ||
await dht._putLocalAsync(key, fixupRec) | ||
} catch (err) { | ||
@@ -352,4 +348,3 @@ dht._log.error('Failed error correcting self', err) | ||
try { | ||
await promisify(cb => dht._putValueToPeer(key, fixupRec, v.from, cb))() | ||
// await dht._putValueToPeerAsync(key, fixupRec, v.from) | ||
await dht._putValueToPeerAsync(key, fixupRec, v.from) | ||
} catch (err) { | ||
@@ -416,4 +411,3 @@ dht._log.error('Failed error correcting entry', err) | ||
try { | ||
// await dht._verifyRecordOnlineAsync(record) | ||
await promisify(cb => dht._verifyRecordOnline(record, cb))() | ||
await dht._verifyRecordOnlineAsync(record) | ||
} catch (err) { | ||
@@ -471,3 +465,3 @@ const errMsg = 'invalid record received, discarded' | ||
async _verifyRecordOnlineAsync (record) { | ||
return promisify(cb => libp2pRecord.validator.verifyRecord(dht.validators, record, cb))() | ||
await promisify(cb => libp2pRecord.validator.verifyRecord(dht.validators, record, cb))() | ||
}, | ||
@@ -526,3 +520,3 @@ | ||
const provs = await promisify(cb => dht.providers.getProviders(key, cb))() | ||
const provs = await dht.providers.getProviders(key) | ||
@@ -553,3 +547,3 @@ provs.forEach((id) => { | ||
// Here we return the query function to use on this particular disjoint path | ||
return callbackify(async (peer) => { | ||
return async (peer) => { | ||
const msg = await dht._findProvidersSingleAsync(peer, key) | ||
@@ -570,3 +564,3 @@ const provs = msg.providerPeers | ||
return { closerPeers: msg.closerPeers } | ||
}) | ||
} | ||
}) | ||
@@ -581,3 +575,3 @@ | ||
} catch (err) { | ||
if (err.code !== 'ETIMEDOUT' || out.length === 0) { | ||
if (err.code !== 'ETIMEDOUT') { | ||
throw err | ||
@@ -596,2 +590,6 @@ } | ||
if (out.length === 0) { | ||
throw errcode(new Error('no providers found'), 'ERR_NOT_FOUND') | ||
} | ||
return out.toArray() | ||
@@ -598,0 +596,0 @@ }, |
@@ -5,7 +5,7 @@ 'use strict' | ||
const varint = require('varint') | ||
const each = require('async/each') | ||
const pull = require('pull-stream') | ||
const CID = require('cids') | ||
const PeerId = require('peer-id') | ||
const Key = require('interface-datastore').Key | ||
const Queue = require('p-queue') | ||
const promisify = require('promisify-es6') | ||
const toIterator = require('pull-stream-to-async-iterator') | ||
@@ -60,2 +60,4 @@ const c = require('./constants') | ||
this.providers = cache(this.lruCacheSize) | ||
this.syncQueue = new Queue({ concurrency: 1 }) | ||
} | ||
@@ -76,6 +78,5 @@ | ||
/** | ||
* Check all providers if they are still valid, and if not | ||
* delete them. | ||
* Check all providers if they are still valid, and if not delete them. | ||
* | ||
* @returns {undefined} | ||
* @returns {Promise} | ||
* | ||
@@ -85,33 +86,59 @@ * @private | ||
_cleanup () { | ||
this._getProviderCids((err, cids) => { | ||
if (err) { | ||
return this._log.error('Failed to get cids', err) | ||
} | ||
return this.syncQueue.add(async () => { | ||
this._log('start cleanup') | ||
const start = Date.now() | ||
each(cids, (cid, cb) => { | ||
this._getProvidersMap(cid, (err, provs) => { | ||
if (err) { | ||
return cb(err) | ||
let count = 0 | ||
let deleteCount = 0 | ||
const deleted = new Map() | ||
const batch = this.datastore.batch() | ||
// Get all provider entries from the datastore | ||
const query = this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX }) | ||
for await (const entry of toIterator(query)) { | ||
try { | ||
// Add a delete to the batch for each expired entry | ||
const { cid, peerId } = parseProviderKey(entry.key) | ||
const time = readTime(entry.value) | ||
const now = Date.now() | ||
const delta = now - time | ||
const expired = delta > this.provideValidity | ||
this._log('comparing: %d - %d = %d > %d %s', | ||
now, time, delta, this.provideValidity, expired ? '(expired)' : '') | ||
if (expired) { | ||
deleteCount++ | ||
batch.delete(entry.key) | ||
const peers = deleted.get(cid) || new Set() | ||
peers.add(peerId) | ||
deleted.set(cid, peers) | ||
} | ||
count++ | ||
} catch (err) { | ||
this._log.error(err.message) | ||
} | ||
} | ||
this._log('deleting %d / %d entries', deleteCount, count) | ||
provs.forEach((time, provider) => { | ||
this._log('comparing: %s - %s > %s', Date.now(), time, this.provideValidity) | ||
if (Date.now() - time > this.provideValidity) { | ||
provs.delete(provider) | ||
} | ||
}) | ||
// Commit the deletes to the datastore | ||
if (deleted.size) { | ||
await promisify(cb => batch.commit(cb))() | ||
} | ||
// Clear expired entries from the cache | ||
for (const [cid, peers] of deleted) { | ||
const key = makeProviderKey(cid) | ||
const provs = this.providers.get(key) | ||
if (provs) { | ||
for (const peerId of peers) { | ||
provs.delete(peerId) | ||
} | ||
if (provs.size === 0) { | ||
return this._deleteProvidersMap(cid, cb) | ||
this.providers.remove(key) | ||
} else { | ||
this.providers.set(key, provs) | ||
} | ||
cb() | ||
}) | ||
}, (err) => { | ||
if (err) { | ||
return this._log.error('Failed to cleanup', err) | ||
} | ||
} | ||
this._log('Cleanup successfull') | ||
}) | ||
this._log('Cleanup successful (%dms)', Date.now() - start) | ||
}) | ||
@@ -121,89 +148,19 @@ } | ||
/** | ||
* Get a list of all cids that providers are known for. | ||
* Get the currently known provider peer ids for a given CID. | ||
* | ||
* @param {function(Error, Array<CID>)} callback | ||
* @returns {undefined} | ||
* | ||
* @private | ||
*/ | ||
_getProviderCids (callback) { | ||
pull( | ||
this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX }), | ||
pull.map((entry) => { | ||
const parts = entry.key.toString().split('/') | ||
if (parts.length !== 4) { | ||
this._log.error('incorrectly formatted provider entry in datastore: %s', entry.key) | ||
return | ||
} | ||
let decoded | ||
try { | ||
decoded = utils.decodeBase32(parts[2]) | ||
} catch (err) { | ||
this._log.error('error decoding base32 provider key: %s', parts[2]) | ||
return | ||
} | ||
let cid | ||
try { | ||
cid = new CID(decoded) | ||
} catch (err) { | ||
this._log.error('error converting key to cid from datastore: %s', err.message) | ||
} | ||
return cid | ||
}), | ||
pull.filter(Boolean), | ||
pull.collect(callback) | ||
) | ||
} | ||
/** | ||
* Get the currently known provider maps for a given CID. | ||
* | ||
* @param {CID} cid | ||
* @param {function(Error, Map<PeerId, Date>)} callback | ||
* @returns {undefined} | ||
* @returns {Promise<Map<String, Date>>} | ||
* | ||
* @private | ||
*/ | ||
_getProvidersMap (cid, callback) { | ||
const provs = this.providers.get(makeProviderKey(cid)) | ||
async _getProvidersMap (cid) { | ||
const cacheKey = makeProviderKey(cid) | ||
let provs = this.providers.get(cacheKey) | ||
if (!provs) { | ||
return loadProviders(this.datastore, cid, callback) | ||
provs = await loadProviders(this.datastore, cid) | ||
this.providers.set(cacheKey, provs) | ||
} | ||
callback(null, provs) | ||
return provs | ||
} | ||
/** | ||
* Completely remove a providers map entry for a given CID. | ||
* | ||
* @param {CID} cid | ||
* @param {function(Error)} callback | ||
* @returns {undefined} | ||
* | ||
* @private | ||
*/ | ||
_deleteProvidersMap (cid, callback) { | ||
const dsKey = makeProviderKey(cid) | ||
this.providers.set(dsKey, null) | ||
const batch = this.datastore.batch() | ||
pull( | ||
this.datastore.query({ | ||
keysOnly: true, | ||
prefix: dsKey | ||
}), | ||
pull.through((entry) => batch.delete(entry.key)), | ||
pull.onEnd((err) => { | ||
if (err) { | ||
return callback(err) | ||
} | ||
batch.commit(callback) | ||
}) | ||
) | ||
} | ||
get cleanupInterval () { | ||
@@ -227,32 +184,21 @@ return this._cleanupInterval | ||
/** | ||
* Add a new provider. | ||
* Add a new provider for the given CID. | ||
* | ||
* @param {CID} cid | ||
* @param {PeerId} provider | ||
* @param {function(Error)} callback | ||
* @returns {undefined} | ||
* @returns {Promise} | ||
*/ | ||
addProvider (cid, provider, callback) { | ||
this._log('addProvider %s', cid.toBaseEncodedString()) | ||
const dsKey = makeProviderKey(cid) | ||
const provs = this.providers.get(dsKey) | ||
async addProvider (cid, provider) { | ||
return this.syncQueue.add(async () => { | ||
this._log('addProvider %s', cid.toBaseEncodedString()) | ||
const provs = await this._getProvidersMap(cid) | ||
const next = (err, provs) => { | ||
if (err) { | ||
return callback(err) | ||
} | ||
this._log('loaded %s provs', provs.size) | ||
const now = Date.now() | ||
provs.set(provider, now) | ||
provs.set(utils.encodeBase32(provider.id), now) | ||
const dsKey = makeProviderKey(cid) | ||
this.providers.set(dsKey, provs) | ||
writeProviderEntry(this.datastore, cid, provider, now, callback) | ||
} | ||
if (!provs) { | ||
loadProviders(this.datastore, cid, next) | ||
} else { | ||
next(null, provs) | ||
} | ||
return writeProviderEntry(this.datastore, cid, provider, now) | ||
}) | ||
} | ||
@@ -264,13 +210,11 @@ | ||
* @param {CID} cid | ||
* @param {function(Error, Array<PeerId>)} callback | ||
* @returns {undefined} | ||
* @returns {Promise<Array<PeerId>>} | ||
*/ | ||
getProviders (cid, callback) { | ||
this._log('getProviders %s', cid.toBaseEncodedString()) | ||
this._getProvidersMap(cid, (err, provs) => { | ||
if (err) { | ||
return callback(err) | ||
} | ||
callback(null, Array.from(provs.keys())) | ||
async getProviders (cid) { | ||
return this.syncQueue.add(async () => { | ||
this._log('getProviders %s', cid.toBaseEncodedString()) | ||
const provs = await this._getProvidersMap(cid) | ||
return [...provs.keys()].map((base32PeerId) => { | ||
return new PeerId(utils.decodeBase32(base32PeerId)) | ||
}) | ||
}) | ||
@@ -283,3 +227,3 @@ } | ||
* | ||
* @param {CID} cid | ||
* @param {CID|string} cid - cid or base32 encoded string | ||
* @returns {string} | ||
@@ -290,3 +234,4 @@ * | ||
function makeProviderKey (cid) { | ||
return c.PROVIDERS_KEY_PREFIX + utils.encodeBase32(cid.buffer) | ||
cid = typeof cid === 'string' ? cid : utils.encodeBase32(cid.buffer) | ||
return c.PROVIDERS_KEY_PREFIX + cid | ||
} | ||
@@ -301,8 +246,7 @@ | ||
* @param {number} time | ||
* @param {function(Error)} callback | ||
* @returns {undefined} | ||
* @returns {Promise} | ||
* | ||
* @private | ||
*/ | ||
function writeProviderEntry (store, cid, peer, time, callback) { | ||
async function writeProviderEntry (store, cid, peer, time) { | ||
const dsKey = [ | ||
@@ -314,32 +258,44 @@ makeProviderKey(cid), | ||
store.put(new Key(dsKey), Buffer.from(varint.encode(time)), callback) | ||
const key = new Key(dsKey) | ||
const buffer = Buffer.from(varint.encode(time)) | ||
return promisify(cb => store.put(key, buffer, cb))() | ||
} | ||
/** | ||
* Load providers from the store. | ||
* Parse the CID and provider peer id from the key | ||
* | ||
* @param {DKey} key | ||
* @returns {Object} object with peer id and cid | ||
* | ||
* @private | ||
*/ | ||
function parseProviderKey (key) { | ||
const parts = key.toString().split('/') | ||
if (parts.length !== 4) { | ||
throw new Error('incorrectly formatted provider entry key in datastore: ' + key) | ||
} | ||
return { | ||
cid: parts[2], | ||
peerId: parts[3] | ||
} | ||
} | ||
/** | ||
* Load providers for the given CID from the store. | ||
* | ||
* @param {Datastore} store | ||
* @param {CID} cid | ||
* @param {function(Error, Map<PeerId, Date>)} callback | ||
* @returns {undefined} | ||
* @returns {Promise<Map<PeerId, Date>>} | ||
* | ||
* @private | ||
*/ | ||
function loadProviders (store, cid, callback) { | ||
pull( | ||
store.query({ prefix: makeProviderKey(cid) }), | ||
pull.map((entry) => { | ||
const parts = entry.key.toString().split('/') | ||
const lastPart = parts[parts.length - 1] | ||
const rawPeerId = utils.decodeBase32(lastPart) | ||
return [new PeerId(rawPeerId), readTime(entry.value)] | ||
}), | ||
pull.collect((err, res) => { | ||
if (err) { | ||
return callback(err) | ||
} | ||
return callback(null, new Map(res)) | ||
}) | ||
) | ||
async function loadProviders (store, cid) { | ||
const providers = new Map() | ||
const query = store.query({ prefix: makeProviderKey(cid) }) | ||
for await (const entry of toIterator(query)) { | ||
const { peerId } = parseProviderKey(entry.key) | ||
providers.set(peerId, readTime(entry.value)) | ||
} | ||
return providers | ||
} | ||
@@ -346,0 +302,0 @@ |
'use strict' | ||
const timeout = require('async/timeout') | ||
const promisify = require('promisify-es6') | ||
const PeerQueue = require('../peer-queue') | ||
const utils = require('../utils') | ||
@@ -25,4 +24,5 @@ // TODO: Temporary until parallel dial in Switch have a proper | ||
this.run = run | ||
this.queryFunc = timeout(queryFunc, QUERY_FUNC_TIMEOUT) | ||
this.queryFuncAsync = promisify(this.queryFunc) | ||
this.queryFunc = utils.withTimeout(queryFunc, QUERY_FUNC_TIMEOUT) | ||
if (!this.queryFunc) throw new Error('Path requires a `queryFn` to be specified') | ||
if (typeof this.queryFunc !== 'function') throw new Error('Path expected `queryFn` to be a function. Got ' + typeof this.queryFunc) | ||
@@ -29,0 +29,0 @@ /** |
@@ -213,3 +213,3 @@ 'use strict' | ||
try { | ||
res = await this.path.queryFuncAsync(peer) | ||
res = await this.path.queryFunc(peer) | ||
} catch (err) { | ||
@@ -216,0 +216,0 @@ queryError = err |
'use strict' | ||
const times = require('async/times') | ||
const promisify = require('promisify-es6') | ||
const crypto = require('libp2p-crypto') | ||
const waterfall = require('async/waterfall') | ||
const multihashing = require('multihashing-async') | ||
const multihashing = promisify(require('multihashing-async')) | ||
const PeerId = require('peer-id') | ||
const assert = require('assert') | ||
const AbortController = require('abort-controller') | ||
const errcode = require('err-code') | ||
const times = require('p-times') | ||
const c = require('./constants') | ||
const { logger } = require('./utils') | ||
const AbortController = require('abort-controller') | ||
const errcode = require('err-code') | ||
class RandomWalk { | ||
@@ -32,2 +31,3 @@ /** | ||
this.log = logger(dht.peerInfo.id, 'random-walk') | ||
this._timeoutId = undefined | ||
} | ||
@@ -49,6 +49,3 @@ | ||
// Start runner immediately | ||
this._runPeriodically((done) => { | ||
// Each subsequent walk should run on a `this._options.interval` interval | ||
this._walk(this._options.queriesPerPeriod, this._options.timeout, () => done(this._options.interval)) | ||
}, 0) | ||
this._runPeriodically() | ||
}, this._options.delay) | ||
@@ -64,4 +61,6 @@ } | ||
stop () { | ||
clearTimeout(this._timeoutId) | ||
this._timeoutId = null | ||
if (this._timeoutId) { | ||
clearTimeout(this._timeoutId) | ||
this._timeoutId = undefined | ||
} | ||
this._controller && this._controller.abort() | ||
@@ -71,15 +70,19 @@ } | ||
/** | ||
* Run function `walk` on every `interval` ms | ||
* @param {function(callback)} walk The function to execute on `interval` | ||
* @param {number} interval The interval to run on in ms | ||
* Run function `randomWalk._walk` on every `options.interval` ms | ||
* | ||
* @private | ||
*/ | ||
_runPeriodically (walk, interval) { | ||
this._timeoutId = setTimeout(() => { | ||
walk((nextInterval) => { | ||
// Schedule next | ||
this._runPeriodically(walk, nextInterval) | ||
async _runPeriodically () { | ||
// run until the walk has been stopped | ||
while (this._timeoutId) { | ||
try { | ||
await this._walk(this._options.queriesPerPeriod, this._options.timeout) | ||
} catch (err) { | ||
this._kadDHT._log.error('random-walk:error', err) | ||
} | ||
// Each subsequent walk should run on a `this._options.interval` interval | ||
await new Promise(resolve => { | ||
this._timeoutId = setTimeout(resolve, this._options.interval) | ||
}) | ||
}, interval) | ||
} | ||
} | ||
@@ -92,40 +95,36 @@ | ||
* @param {number} walkTimeout | ||
* @param {function(Error)} callback | ||
* @returns {void} | ||
* @returns {Promise} | ||
* | ||
* @private | ||
*/ | ||
_walk (queries, walkTimeout, callback) { | ||
async _walk (queries, walkTimeout) { | ||
this.log('start') | ||
this._controller = new AbortController() | ||
times(queries, (i, next) => { | ||
this.log('running query %d', i) | ||
try { | ||
await times(queries, async (index) => { | ||
this.log('running query %d', index) | ||
try { | ||
const id = await this._randomPeerId() | ||
// Perform the walk | ||
waterfall([ | ||
(cb) => this._randomPeerId(cb), | ||
(id, cb) => { | ||
// Check if we've happened to already abort | ||
if (!this._controller) return cb() | ||
if (!this._controller) return | ||
this._query(id, { | ||
await this._query(id, { | ||
timeout: walkTimeout, | ||
signal: this._controller.signal | ||
}, cb) | ||
}) | ||
} catch (err) { | ||
if (err && err.code !== 'ETIMEDOUT') { | ||
this.log.error('query %d finished with error', index, err) | ||
throw err | ||
} | ||
} | ||
], (err) => { | ||
if (err && err.code !== 'ETIMEDOUT') { | ||
this.log.error('query %d finished with error', i, err) | ||
return next(err) | ||
} | ||
this.log('finished query %d', i) | ||
next(null) | ||
this.log('finished query %d', index) | ||
}) | ||
}, (err) => { | ||
} finally { | ||
this._controller = null | ||
this.log('finished queries') | ||
callback(err) | ||
}) | ||
} | ||
} | ||
@@ -147,23 +146,25 @@ | ||
* @param {AbortControllerSignal} options.signal | ||
* @param {function(Error)} callback | ||
* @returns {void} | ||
* @returns {Promise} | ||
* | ||
* @private | ||
*/ | ||
_query (id, options, callback) { | ||
async _query (id, options) { | ||
this.log('query:%s', id.toB58String()) | ||
this._kadDHT.findPeer(id, options, (err, peer) => { | ||
let peer | ||
try { | ||
peer = await promisify(cb => this._kadDHT.findPeer(id, options, cb))() | ||
} catch (err) { | ||
if (err && err.code === 'ERR_NOT_FOUND') { | ||
// expected case, we asked for random stuff after all | ||
return callback() | ||
return | ||
} | ||
if (err) { | ||
return callback(err) | ||
} | ||
this.log('query:found', peer) | ||
// wait what, there was something found? Lucky day! | ||
callback(errcode(new Error(`random-walk: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`), 'ERR_FOUND_RANDOM_PEER')) | ||
}) | ||
throw err | ||
} | ||
this.log('query:found', peer) | ||
// wait what, there was something found? Lucky day! | ||
throw errcode(`random-walk: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`, 'ERR_FOUND_RANDOM_PEER') | ||
} | ||
@@ -174,14 +175,9 @@ | ||
* | ||
* @param {function(Error, PeerId)} callback | ||
* @returns {void} | ||
* @returns {Promise<PeerId>} | ||
* | ||
* @private | ||
*/ | ||
_randomPeerId (callback) { | ||
multihashing(crypto.randomBytes(16), 'sha2-256', (err, digest) => { | ||
if (err) { | ||
return callback(err) | ||
} | ||
callback(null, new PeerId(digest)) | ||
}) | ||
async _randomPeerId () { | ||
const digest = await multihashing(crypto.randomBytes(16), 'sha2-256') | ||
return new PeerId(digest) | ||
} | ||
@@ -188,0 +184,0 @@ } |
'use strict' | ||
const CID = require('cids') | ||
const utils = require('../../utils') | ||
const errcode = require('err-code') | ||
const promiseToCallback = require('promise-to-callback') | ||
const utils = require('../../utils') | ||
module.exports = (dht) => { | ||
@@ -14,3 +16,3 @@ const log = utils.logger(dht.peerInfo.id, 'rpc:add-provider') | ||
* @param {Message} msg | ||
* @param {function(Error, Message)} callback | ||
* @param {function(Error)} callback | ||
* @returns {undefined} | ||
@@ -52,3 +54,3 @@ */ | ||
dht.peerBook.put(pi) | ||
dht.providers.addProvider(cid, pi.id, callback) | ||
promiseToCallback(dht.providers.addProvider(cid, pi.id))(err => callback(err)) | ||
} | ||
@@ -64,5 +66,5 @@ }) | ||
if (!foundProvider) { | ||
dht.providers.addProvider(cid, peer.id, callback) | ||
promiseToCallback(dht.providers.addProvider(cid, peer.id))(err => callback(err)) | ||
} | ||
} | ||
} |
@@ -6,3 +6,3 @@ 'use strict' | ||
const PeerInfo = require('peer-info') | ||
const promiseToCallback = require('promise-to-callback') | ||
const errcode = require('err-code') | ||
@@ -45,3 +45,3 @@ | ||
}), | ||
(cb) => dht.providers.getProviders(cid, cb), | ||
(cb) => promiseToCallback(dht.providers.getProviders(cid))(cb), | ||
(cb) => dht._betterPeersToQuery(msg, peer, cb) | ||
@@ -48,0 +48,0 @@ ], (err, res) => { |
@@ -13,2 +13,3 @@ 'use strict' | ||
const PeerId = require('peer-id') | ||
const errcode = require('err-code') | ||
@@ -194,1 +195,30 @@ /** | ||
} | ||
exports.TimeoutError = class TimeoutError extends Error { | ||
get code () { | ||
return 'ETIMEDOUT' | ||
} | ||
} | ||
/** | ||
* Creates an async function that calls the given `asyncFn` and Errors | ||
* if it does not resolve within `time` ms | ||
* | ||
* @param {Function} [asyncFn] | ||
* @param {Number} [time] | ||
* @returns {Function} | ||
* | ||
* @private | ||
*/ | ||
exports.withTimeout = (asyncFn, time) => { | ||
return async (...args) => { | ||
return Promise.race([ | ||
asyncFn(...args), | ||
new Promise((resolve, reject) => { | ||
setTimeout(() => { | ||
reject(errcode(new Error('Async function did not complete before timeout'), 'ETIMEDOUT')) | ||
}, time) | ||
}) | ||
]) | ||
} | ||
} |
@@ -305,3 +305,3 @@ /* eslint-env mocha */ | ||
const dhtD = dhts[3] | ||
const stub = sinon.stub(dhtD, '_verifyRecordLocally').callsArgWithAsync(1, error) | ||
const stub = sinon.stub(dhtD, '_verifyRecordLocallyAsync').rejects(error) | ||
@@ -339,4 +339,4 @@ waterfall([ | ||
const dhtD = dhts[3] | ||
const stub = sinon.stub(dhtD, '_verifyRecordLocally').callsArgWithAsync(1, error) | ||
const stub2 = sinon.stub(dhtC, '_verifyRecordLocally').callsArgWithAsync(1, error) | ||
const stub = sinon.stub(dhtD, '_verifyRecordLocallyAsync').rejects(error) | ||
const stub2 = sinon.stub(dhtC, '_verifyRecordLocallyAsync').rejects(error) | ||
@@ -375,3 +375,3 @@ waterfall([ | ||
const dhtC = dhts[2] | ||
const stub = sinon.stub(dhtC, '_verifyRecordLocally').callsArgWithAsync(1, error) | ||
const stub = sinon.stub(dhtC, '_verifyRecordLocallyAsync').rejects(error) | ||
@@ -483,3 +483,3 @@ waterfall([ | ||
const dhtASpy = sinon.spy(dhtA, '_putValueToPeer') | ||
const dhtASpy = sinon.spy(dhtA, '_putValueToPeerAsync') | ||
@@ -1017,3 +1017,5 @@ series([ | ||
const dht = new KadDHT(sw) | ||
dht.start(() => { | ||
dht.start((err) => { | ||
expect(err).to.not.exist() | ||
const key = Buffer.from('/v/hello') | ||
@@ -1027,5 +1029,3 @@ const value = Buffer.from('world') | ||
// Simulate going out to the network and returning the record | ||
sinon.stub(dht, '_getValueOrPeers').callsFake((peer, k, cb) => { | ||
cb(null, rec) | ||
}) | ||
sinon.stub(dht, '_getValueOrPeersAsync').callsFake(async () => ({ record: rec })) | ||
] | ||
@@ -1079,3 +1079,3 @@ | ||
const dhtB = dhts[1] | ||
const stub = sinon.stub(dhtA, '_getValueOrPeers').callsArgWithAsync(2, error) | ||
const stub = sinon.stub(dhtA, '_getValueOrPeersAsync').rejects(error) | ||
@@ -1108,3 +1108,3 @@ waterfall([ | ||
const dhtB = dhts[1] | ||
const stub = sinon.stub(dhtA, '_getValueOrPeers').callsArgWithAsync(2, error) | ||
const stub = sinon.stub(dhtA, '_getValueOrPeersAsync').rejects(error) | ||
@@ -1111,0 +1111,0 @@ waterfall([ |
@@ -42,2 +42,18 @@ /* eslint-env mocha */ | ||
describe('withTimeout', () => { | ||
it('rejects with the error in the original function', async () => { | ||
const original = async () => { throw new Error('explode') } | ||
const asyncFn = utils.withTimeout(original, 100) | ||
let err | ||
try { | ||
await asyncFn() | ||
} catch (_err) { | ||
err = _err | ||
} | ||
expect(err).to.exist() | ||
expect(err.message).to.include('explode') | ||
}) | ||
}) | ||
describe('sortClosestPeers', () => { | ||
@@ -44,0 +60,0 @@ it('sorts a list of PeerInfos', (done) => { |
@@ -7,90 +7,84 @@ /* eslint-env mocha */ | ||
const expect = chai.expect | ||
const promisify = require('promisify-es6') | ||
const Store = require('interface-datastore').MemoryDatastore | ||
const parallel = require('async/parallel') | ||
const waterfall = require('async/waterfall') | ||
const CID = require('cids') | ||
const multihashing = require('multihashing-async') | ||
const map = require('async/map') | ||
const timesSeries = require('async/timesSeries') | ||
const each = require('async/each') | ||
const eachSeries = require('async/eachSeries') | ||
const range = require('lodash.range') | ||
const LevelStore = require('datastore-level') | ||
const path = require('path') | ||
const os = require('os') | ||
const multihashing = promisify(require('multihashing-async')) | ||
const Providers = require('../src/providers') | ||
const createPeerInfo = require('./utils/create-peer-info') | ||
const createValues = require('./utils/create-values') | ||
const createPeerInfo = promisify(require('./utils/create-peer-info')) | ||
const createValues = promisify(require('./utils/create-values')) | ||
describe('Providers', () => { | ||
let infos | ||
let providers | ||
before(function (done) { | ||
before(async function () { | ||
this.timeout(10 * 1000) | ||
createPeerInfo(3, (err, peers) => { | ||
if (err) { | ||
return done(err) | ||
} | ||
infos = await createPeerInfo(3) | ||
}) | ||
infos = peers | ||
done() | ||
}) | ||
afterEach(() => { | ||
providers && providers.stop() | ||
}) | ||
it('simple add and get of providers', (done) => { | ||
const providers = new Providers(new Store(), infos[2].id) | ||
it('simple add and get of providers', async () => { | ||
providers = new Providers(new Store(), infos[2].id) | ||
const cid = new CID('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n') | ||
parallel([ | ||
(cb) => providers.addProvider(cid, infos[0].id, cb), | ||
(cb) => providers.addProvider(cid, infos[1].id, cb) | ||
], (err) => { | ||
expect(err).to.not.exist() | ||
providers.getProviders(cid, (err, provs) => { | ||
expect(err).to.not.exist() | ||
expect(provs).to.be.eql([infos[0].id, infos[1].id]) | ||
providers.stop() | ||
await Promise.all([ | ||
providers.addProvider(cid, infos[0].id), | ||
providers.addProvider(cid, infos[1].id) | ||
]) | ||
done() | ||
}) | ||
}) | ||
const provs = await providers.getProviders(cid) | ||
const ids = new Set(provs.map((peerId) => peerId.toB58String())) | ||
expect(ids.has(infos[0].id.toB58String())).to.be.eql(true) | ||
expect(ids.has(infos[1].id.toB58String())).to.be.eql(true) | ||
}) | ||
it('more providers than space in the lru cache', (done) => { | ||
const providers = new Providers(new Store(), infos[2].id, 10) | ||
it('duplicate add of provider is deduped', async () => { | ||
providers = new Providers(new Store(), infos[2].id) | ||
waterfall([ | ||
(cb) => map( | ||
range(100), | ||
(i, cb) => multihashing(Buffer.from(`hello ${i}`), 'sha2-256', cb), | ||
cb | ||
), | ||
(hashes, cb) => { | ||
const cids = hashes.map((h) => new CID(h)) | ||
const cid = new CID('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n') | ||
map(cids, (cid, cb) => { | ||
providers.addProvider(cid, infos[0].id, cb) | ||
}, (err) => cb(err, cids)) | ||
}, | ||
(cids, cb) => { | ||
map(cids, (cid, cb) => { | ||
providers.getProviders(cid, cb) | ||
}, (err, provs) => { | ||
expect(err).to.not.exist() | ||
expect(provs).to.have.length(100) | ||
provs.forEach((p) => { | ||
expect(p[0].id).to.be.eql(infos[0].id.id) | ||
}) | ||
providers.stop() | ||
cb() | ||
}) | ||
} | ||
], done) | ||
await Promise.all([ | ||
providers.addProvider(cid, infos[0].id), | ||
providers.addProvider(cid, infos[0].id), | ||
providers.addProvider(cid, infos[1].id), | ||
providers.addProvider(cid, infos[1].id), | ||
providers.addProvider(cid, infos[1].id) | ||
]) | ||
const provs = await providers.getProviders(cid) | ||
expect(provs).to.have.length(2) | ||
const ids = new Set(provs.map((peerId) => peerId.toB58String())) | ||
expect(ids.has(infos[0].id.toB58String())).to.be.eql(true) | ||
expect(ids.has(infos[1].id.toB58String())).to.be.eql(true) | ||
}) | ||
it('expires', (done) => { | ||
const providers = new Providers(new Store(), infos[2].id) | ||
it('more providers than space in the lru cache', async () => { | ||
providers = new Providers(new Store(), infos[2].id, 10) | ||
const hashes = await Promise.all([...new Array(100)].map((i) => { | ||
return multihashing(Buffer.from(`hello ${i}`), 'sha2-256') | ||
})) | ||
const cids = hashes.map((h) => new CID(h)) | ||
await Promise.all(cids.map(cid => providers.addProvider(cid, infos[0].id))) | ||
const provs = await Promise.all(cids.map(cid => providers.getProviders(cid))) | ||
expect(provs).to.have.length(100) | ||
for (const p of provs) { | ||
expect(p[0].id).to.be.eql(infos[0].id.id) | ||
} | ||
}) | ||
it('expires', async () => { | ||
providers = new Providers(new Store(), infos[2].id) | ||
providers.cleanupInterval = 100 | ||
@@ -100,29 +94,21 @@ providers.provideValidity = 200 | ||
const cid = new CID('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n') | ||
parallel([ | ||
(cb) => providers.addProvider(cid, infos[0].id, cb), | ||
(cb) => providers.addProvider(cid, infos[1].id, cb) | ||
], (err) => { | ||
expect(err).to.not.exist() | ||
await Promise.all([ | ||
providers.addProvider(cid, infos[0].id), | ||
providers.addProvider(cid, infos[1].id) | ||
]) | ||
providers.getProviders(cid, (err, provs) => { | ||
expect(err).to.not.exist() | ||
expect(provs).to.have.length(2) | ||
expect(provs[0].id).to.be.eql(infos[0].id.id) | ||
expect(provs[1].id).to.be.eql(infos[1].id.id) | ||
}) | ||
const provs = await providers.getProviders(cid) | ||
setTimeout(() => { | ||
providers.getProviders(cid, (err, provs) => { | ||
expect(err).to.not.exist() | ||
expect(provs).to.have.length(0) | ||
providers.stop() | ||
done() | ||
}) | ||
// TODO: this is a timeout based check, make cleanup monitorable | ||
}, 400) | ||
}) | ||
expect(provs).to.have.length(2) | ||
expect(provs[0].id).to.be.eql(infos[0].id.id) | ||
expect(provs[1].id).to.be.eql(infos[1].id.id) | ||
await new Promise(resolve => setTimeout(resolve, 400)) | ||
const provsAfter = await providers.getProviders(cid) | ||
expect(provsAfter).to.have.length(0) | ||
}) | ||
// slooow so only run when you need to | ||
it.skip('many', (done) => { | ||
it.skip('many', async function () { | ||
const p = path.join( | ||
@@ -132,40 +118,34 @@ os.tmpdir(), (Math.random() * 100).toString() | ||
const store = new LevelStore(p) | ||
const providers = new Providers(store, infos[2].id, 10) | ||
providers = new Providers(store, infos[2].id, 10) | ||
console.log('starting') | ||
waterfall([ | ||
(cb) => parallel([ | ||
(cb) => createValues(100, cb), | ||
(cb) => createPeerInfo(600, cb) | ||
], cb), | ||
(res, cb) => { | ||
console.log('got values and peers') | ||
const values = res[0] | ||
const peers = res[1] | ||
let total = Date.now() | ||
eachSeries(values, (v, cb) => { | ||
eachSeries(peers, (p, cb) => { | ||
providers.addProvider(v.cid, p.id, cb) | ||
}, cb) | ||
}, (err) => { | ||
console.log('addProvider %s peers %s cids in %sms', peers.length, values.length, Date.now() - total) | ||
expect(err).to.not.exist() | ||
console.log('starting profile with %s peers and %s cids', peers.length, values.length) | ||
timesSeries(3, (i, cb) => { | ||
const start = Date.now() | ||
each(values, (v, cb) => { | ||
providers.getProviders(v.cid, cb) | ||
}, (err) => { | ||
expect(err).to.not.exist() | ||
console.log('query %sms', (Date.now() - start)) | ||
cb() | ||
}) | ||
}, cb) | ||
}) | ||
const res = await Promise.all([ | ||
createValues(100), | ||
createPeerInfo(600) | ||
]) | ||
console.log('got values and peers') | ||
const values = res[0] | ||
const peers = res[1] | ||
let total = Date.now() | ||
for (const v of values) { | ||
for (const p of peers) { | ||
await providers.addProvider(v.cid, p.id) | ||
} | ||
], (err) => { | ||
expect(err).to.not.exist() | ||
store.close(done) | ||
}) | ||
} | ||
console.log('addProvider %s peers %s cids in %sms', peers.length, values.length, Date.now() - total) | ||
console.log('starting profile with %s peers and %s cids', peers.length, values.length) | ||
for (let i = 0; i < 3; i++) { | ||
const start = Date.now() | ||
for (const v of values) { | ||
await providers.getProviders(v.cid) | ||
console.log('query %sms', (Date.now() - start)) | ||
} | ||
} | ||
await store.close() | ||
}) | ||
}) |
@@ -11,4 +11,4 @@ /* eslint-env mocha */ | ||
const Mplex = require('libp2p-mplex') | ||
const setImmediate = require('async/setImmediate') | ||
const promiseToCallback = require('promise-to-callback') | ||
const promisify = require('promisify-es6') | ||
@@ -61,18 +61,18 @@ const DHT = require('../src') | ||
let i = 0 | ||
const query = (p, cb) => { | ||
const queryFunc = async (p) => { | ||
if (i++ === 1) { | ||
expect(p.id).to.eql(peerInfos[2].id.id) | ||
return cb(null, { | ||
return { | ||
value: Buffer.from('cool'), | ||
pathComplete: true | ||
}) | ||
} | ||
} | ||
expect(p.id).to.eql(peerInfos[1].id.id) | ||
cb(null, { | ||
return { | ||
closerPeers: [peerInfos[2]] | ||
}) | ||
} | ||
} | ||
const q = new Query(dht, peer.id.id, () => query) | ||
const q = new Query(dht, peer.id.id, () => queryFunc) | ||
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { | ||
@@ -95,15 +95,15 @@ expect(err).to.not.exist() | ||
const visited = [] | ||
const query = (p, cb) => { | ||
const queryFunc = async (p) => { | ||
visited.push(p) | ||
if (i++ === 1) { | ||
return cb(new Error('fail')) | ||
throw new Error('fail') | ||
} | ||
cb(null, { | ||
return { | ||
closerPeers: [peerInfos[2]] | ||
}) | ||
} | ||
} | ||
const q = new Query(dht, peer.id.id, () => query) | ||
const q = new Query(dht, peer.id.id, () => queryFunc) | ||
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { | ||
@@ -132,5 +132,5 @@ expect(err).not.to.exist() | ||
const query = (p, cb) => cb(new Error('fail')) | ||
const queryFunc = async (p) => { throw new Error('fail') } | ||
const q = new Query(dht, peer.id.id, () => query) | ||
const q = new Query(dht, peer.id.id, () => queryFunc) | ||
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { | ||
@@ -146,7 +146,6 @@ expect(err).to.exist() | ||
const query = (p, cb) => {} | ||
const queryFunc = async (p) => {} | ||
const q = new Query(dht, peer.id.id, () => query) | ||
const q = new Query(dht, peer.id.id, () => queryFunc) | ||
promiseToCallback(q.run([]))((err, res) => { | ||
if (err) console.error(err) | ||
expect(err).to.not.exist() | ||
@@ -168,9 +167,9 @@ | ||
const query = (p, cb) => { | ||
cb(null, { | ||
const queryFunc = async (p) => { | ||
return { | ||
closerPeers: [peerInfos[2]] | ||
}) | ||
} | ||
} | ||
const q = new Query(dht, peer.id.id, () => query) | ||
const q = new Query(dht, peer.id.id, () => queryFunc) | ||
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { | ||
@@ -217,10 +216,10 @@ expect(err).to.not.exist() | ||
const query = (p, cb) => { | ||
const queryFunc = async (p) => { | ||
const closer = topology[p.toB58String()] | ||
cb(null, { | ||
return { | ||
closerPeers: closer || [] | ||
}) | ||
} | ||
} | ||
const q = new Query(dht, peer.id.id, () => query) | ||
const q = new Query(dht, peer.id.id, () => queryFunc) | ||
promiseToCallback(q.run([peerInfos[1].id, peerInfos[2].id, peerInfos[3].id]))((err, res) => { | ||
@@ -257,12 +256,12 @@ expect(err).to.not.exist() | ||
const query = (p, cb) => { | ||
const queryFunc = async (p) => { | ||
const res = topology[p.toB58String()] || {} | ||
cb(null, { | ||
return { | ||
closerPeers: res.closer || [], | ||
value: res.value, | ||
pathComplete: res.pathComplete | ||
}) | ||
} | ||
} | ||
const q = new Query(dht, peer.id.id, () => query) | ||
const q = new Query(dht, peer.id.id, () => queryFunc) | ||
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { | ||
@@ -308,10 +307,13 @@ expect(err).to.not.exist() | ||
const visited = [] | ||
const query = (p, cb) => { | ||
const queryFunc = async (p) => { | ||
visited.push(p) | ||
const invokeCb = () => { | ||
const getResult = async () => { | ||
const res = topology[p.toB58String()] || {} | ||
cb(null, { | ||
// this timeout is necesary so `dhtA.stop` has time to stop the | ||
// requests before they all complete | ||
await new Promise(resolve => setTimeout(resolve, 100)) | ||
return { | ||
closerPeers: res.closer || [] | ||
}) | ||
} | ||
} | ||
@@ -321,10 +323,10 @@ | ||
if (p.toB58String() === peerInfos[2].id.toB58String()) { | ||
dhtA.stop(invokeCb) | ||
await promisify(cb => dhtA.stop(cb)) | ||
setTimeout(checkExpectations, 100) | ||
} else { | ||
invokeCb() | ||
return getResult() | ||
} | ||
return getResult() | ||
} | ||
const q = new Query(dhtA, peer.id.id, () => query) | ||
const q = new Query(dhtA, peer.id.id, () => queryFunc) | ||
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { | ||
@@ -364,10 +366,10 @@ expect(err).to.not.exist() | ||
const query = (p, cb) => { | ||
const queryFunc = async (p) => { | ||
const res = topology[p.toB58String()] || {} | ||
cb(null, { | ||
return { | ||
closerPeers: res.closer || [] | ||
}) | ||
} | ||
} | ||
const q = new Query(dhtA, peer.id.id, () => query) | ||
const q = new Query(dhtA, peer.id.id, () => queryFunc) | ||
@@ -422,14 +424,13 @@ dhtA.stop(() => { | ||
const query = (p, cb) => { | ||
const queryFunc = async (p) => { | ||
const res = topology[p.toB58String()] || {} | ||
setTimeout(() => { | ||
cb(null, { | ||
closerPeers: res.closer || [], | ||
value: res.value, | ||
pathComplete: res.pathComplete | ||
}) | ||
}, res.delay) | ||
await new Promise(resolve => setTimeout(resolve, res.delay)) | ||
return { | ||
closerPeers: res.closer || [], | ||
value: res.value, | ||
pathComplete: res.pathComplete | ||
} | ||
} | ||
const q = new Query(dht, peer.id.id, () => query) | ||
const q = new Query(dht, peer.id.id, () => queryFunc) | ||
promiseToCallback(q.run([peerInfos[1].id, peerInfos[4].id]))((err, res) => { | ||
@@ -489,17 +490,16 @@ expect(err).to.not.exist() | ||
const visited = [] | ||
const query = (p, cb) => { | ||
const queryFunc = async (p) => { | ||
visited.push(p) | ||
const res = topology[p.toB58String()] || {} | ||
setTimeout(() => { | ||
cb(null, { | ||
closerPeers: res.closer || [], | ||
value: res.value, | ||
pathComplete: res.pathComplete, | ||
queryComplete: res.queryComplete | ||
}) | ||
}, res.delay) | ||
await new Promise(resolve => setTimeout(resolve, res.delay)) | ||
return { | ||
closerPeers: res.closer || [], | ||
value: res.value, | ||
pathComplete: res.pathComplete, | ||
queryComplete: res.queryComplete | ||
} | ||
} | ||
const q = new Query(dht, peer.id.id, () => query) | ||
const q = new Query(dht, peer.id.id, () => queryFunc) | ||
promiseToCallback(q.run([peerInfos[1].id, peerInfos[4].id]))((err, res) => { | ||
@@ -567,19 +567,18 @@ expect(err).to.not.exist() | ||
const visited = [] | ||
const query = (p, cb) => { | ||
const queryFunc = async (p) => { | ||
visited.push(p) | ||
const res = topology[p.toB58String()] || {} | ||
setTimeout(() => { | ||
if (res.error) { | ||
return cb(new Error('path error')) | ||
} | ||
cb(null, { | ||
closerPeers: res.closer || [], | ||
value: res.value, | ||
pathComplete: res.pathComplete | ||
}) | ||
}, res.delay) | ||
await new Promise(resolve => setTimeout(resolve, res.delay)) | ||
if (res.error) { | ||
throw new Error('path error') | ||
} | ||
return { | ||
closerPeers: res.closer || [], | ||
value: res.value, | ||
pathComplete: res.pathComplete | ||
} | ||
} | ||
const q = new Query(dht, peer.id.id, () => query) | ||
const q = new Query(dht, peer.id.id, () => queryFunc) | ||
promiseToCallback(q.run([peerInfos[1].id, peerInfos[4].id]))((err, res) => { | ||
@@ -657,3 +656,3 @@ expect(err).to.not.exist() | ||
const visited = [] | ||
const query = (peerId, cb) => { | ||
const queryFunc = async (peerId) => { | ||
visited.push(peerId) | ||
@@ -663,6 +662,6 @@ const i = peerIndex(peerId) | ||
const closerPeers = closerIndexes.map(j => peerIdToInfo(sorted[j])) | ||
setTimeout(() => cb(null, { closerPeers })) | ||
return { closerPeers } | ||
} | ||
const q = new Query(dht, peerInfos[0].id.id, () => query) | ||
const q = new Query(dht, peerInfos[0].id.id, () => queryFunc) | ||
promiseToCallback(q.run(initial))((err, res) => { | ||
@@ -726,3 +725,3 @@ expect(err).to.not.exist() | ||
const q = new Query(dht, targetId, (trackNum) => { | ||
return (p, cb) => { | ||
return async (p) => { | ||
const response = getResponse(p, trackNum) | ||
@@ -737,3 +736,3 @@ expect(response).to.exist() // or we aren't on the right track | ||
} | ||
setImmediate(() => cb(null, response)) | ||
return response | ||
} | ||
@@ -763,9 +762,9 @@ }) | ||
const query = (p, cb) => { | ||
cb(null, { | ||
const queryFunc = async (p) => { | ||
return { | ||
closerPeers: [peerInfos[2]] | ||
}) | ||
} | ||
} | ||
const q = new Query(dht, peer.id.id, () => query) | ||
const q = new Query(dht, peer.id.id, () => queryFunc) | ||
promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { | ||
@@ -772,0 +771,0 @@ expect(err).to.not.exist() |
@@ -70,3 +70,3 @@ /* eslint-env mocha */ | ||
sinon.stub(dht._queryManager, 'running').value(true) | ||
let querySpy = sinon.stub().callsArgWith(1, null, {}) | ||
const querySpy = sinon.stub().resolves({}) | ||
@@ -126,3 +126,3 @@ let query = new Query(dht, targetKey.key, () => querySpy) | ||
let querySpy = sinon.stub().callsArgWith(1, null, {}) | ||
const querySpy = sinon.stub().resolves({}) | ||
let query = new Query(dht, targetKey.key, () => querySpy) | ||
@@ -152,4 +152,7 @@ | ||
// return peers 16 - 19 | ||
querySpy.onCall(1).callsFake((_, cb) => { | ||
setTimeout(() => cb(null, { closerPeers: returnPeers }), 10) | ||
querySpy.onCall(1).callsFake(async () => { | ||
// this timeout ensures the queries finish in serial | ||
// see https://github.com/libp2p/js-libp2p-kad-dht/pull/121#discussion_r286437978 | ||
await new Promise(resolve => setTimeout(resolve, 10)) | ||
return { closerPeers: returnPeers } | ||
}) | ||
@@ -156,0 +159,0 @@ |
@@ -57,13 +57,10 @@ /* eslint-env mocha */ | ||
it('should be able to specify the number of queries', (done) => { | ||
it('should be able to specify the number of queries', async () => { | ||
const queries = 5 | ||
sinon.stub(randomWalk, '_query').callsArgWith(2, null) | ||
randomWalk._walk(queries, 1e3, (err) => { | ||
expect(err).to.not.exist() | ||
expect(randomWalk._query.callCount).to.eql(queries) | ||
done() | ||
}) | ||
sinon.stub(randomWalk, '_query').resolves(null) | ||
await randomWalk._walk(queries, 1e3) | ||
expect(randomWalk._query.callCount).to.eql(queries) | ||
}) | ||
it('should stop walking if a query errors', (done) => { | ||
it('should NOT stop walking if a query errors', async () => { | ||
const queries = 5 | ||
@@ -75,60 +72,57 @@ const error = new Error('ERR_BOOM') | ||
randomWalk._walk(queries, 1e3, (err) => { | ||
expect(err).to.eql(error) | ||
// 2 successes and error on the 3rd | ||
expect(findPeerStub.callCount).to.eql(3) | ||
done() | ||
}) | ||
let err | ||
try { | ||
await randomWalk._walk(queries, 1e3) | ||
} catch (_err) { | ||
err = _err | ||
} | ||
expect(err.message).to.include('ERR_BOOM') | ||
expect(findPeerStub.callCount).to.eql(5) | ||
}) | ||
it('should ignore timeout errors and keep walking', (done) => { | ||
it('should ignore timeout errors and keep walking', async () => { | ||
const queries = 5 | ||
const _queryStub = sinon.stub(randomWalk, '_query') | ||
_queryStub.onCall(2).callsArgWith(2, { | ||
code: 'ETIMEDOUT' | ||
}) | ||
_queryStub.callsArgWith(2, null) | ||
_queryStub.onCall(2).rejects({ code: 'ETIMEDOUT' }) | ||
_queryStub.resolves(null) | ||
randomWalk._walk(queries, 1e3, (err) => { | ||
expect(err).to.not.exist() | ||
expect(randomWalk._query.callCount).to.eql(queries) | ||
done() | ||
}) | ||
await randomWalk._walk(queries, 1e3) | ||
expect(randomWalk._query.callCount).to.eql(queries) | ||
}) | ||
it('should pass its timeout to the find peer query', (done) => { | ||
it('should pass its timeout to the find peer query', async () => { | ||
sinon.stub(randomWalk._kadDHT, 'findPeer').callsArgWith(2, { code: 'ERR_NOT_FOUND' }) | ||
randomWalk._walk(1, 111, (err) => { | ||
const mockCalls = randomWalk._kadDHT.findPeer.getCalls() | ||
expect(err).to.not.exist() | ||
expect(mockCalls).to.have.length(1) | ||
expect(mockCalls[0].args[1]).to.include({ | ||
timeout: 111 | ||
}) | ||
done() | ||
}) | ||
await randomWalk._walk(1, 111) | ||
const mockCalls = randomWalk._kadDHT.findPeer.getCalls() | ||
expect(mockCalls).to.have.length(1) | ||
expect(mockCalls[0].args[1]).to.include({ timeout: 111 }) | ||
}) | ||
it('should error if the random id peer is found', (done) => { | ||
it('should error if the random id peer is found', async () => { | ||
const queries = 5 | ||
const findPeerStub = sinon.stub(randomWalk._kadDHT, 'findPeer').callsArgWith(2, { code: 'ERR_NOT_FOUND' }) | ||
findPeerStub.onCall(2).callsArgWith(2, null, { | ||
id: 'QmB' | ||
}) | ||
findPeerStub.onCall(2).callsArgWith(2, null, { id: 'QmB' }) | ||
randomWalk._walk(queries, 1e3, (err) => { | ||
expect(err).to.exist() | ||
expect(findPeerStub.callCount).to.eql(3) | ||
done() | ||
}) | ||
let err | ||
try { | ||
await randomWalk._walk(queries, 1e3) | ||
} catch (_err) { | ||
err = _err | ||
} | ||
expect(err).to.exist() | ||
expect(findPeerStub.callCount).to.eql(5) | ||
}) | ||
it('should error if random id generation errors', (done) => { | ||
it('should error if random id generation errors', async () => { | ||
const error = new Error('ERR_BOOM') | ||
sinon.stub(randomWalk, '_randomPeerId').callsArgWith(0, error) | ||
randomWalk._walk(1, 1e3, (err) => { | ||
expect(err).to.eql(error) | ||
done() | ||
}) | ||
sinon.stub(randomWalk, '_randomPeerId').rejects(error) | ||
let err | ||
try { | ||
await randomWalk._walk(1, 1e3) | ||
} catch (_err) { | ||
err = _err | ||
} | ||
expect(err).to.eql(error) | ||
}) | ||
@@ -146,3 +140,3 @@ }) | ||
sinon.stub(randomWalk, '_walk').callsFake(() => { | ||
sinon.stub(randomWalk, '_walk').callsFake(async () => { | ||
// Try to start again | ||
@@ -186,3 +180,3 @@ randomWalk.start() | ||
const randomWalk = new RandomWalk(mockDHT, options) | ||
sinon.stub(randomWalk, '_walk').callsFake((queries, timeout) => { | ||
sinon.stub(randomWalk, '_walk').callsFake(async (queries, timeout) => { | ||
expect(queries).to.eql(options.queriesPerPeriod) | ||
@@ -207,3 +201,3 @@ expect(timeout).to.eql(options.timeout) | ||
expect(opts.timeout).to.eql(options.timeout).mark() | ||
callback(error) | ||
setTimeout(() => callback(error), 100) | ||
}) | ||
@@ -232,3 +226,3 @@ | ||
it('should cancel the timer if the walk is not active', () => { | ||
it('should not be running if the walk is not active', () => { | ||
const randomWalk = new RandomWalk(mockDHT, { | ||
@@ -242,3 +236,3 @@ enabled: true, | ||
randomWalk.stop() | ||
expect(randomWalk._timeoutId).to.not.exist() | ||
expect(randomWalk._timeoutId).to.eql(undefined) | ||
}) | ||
@@ -245,0 +239,0 @@ |
@@ -11,2 +11,3 @@ /* eslint-env mocha */ | ||
const _ = require('lodash') | ||
const promiseToCallback = require('promise-to-callback') | ||
@@ -86,3 +87,3 @@ const Message = require('../../../src/message') | ||
(cb) => handler(dht)(sender, msg, cb), | ||
(cb) => dht.providers.getProviders(cid, cb), | ||
(cb) => promiseToCallback(dht.providers.getProviders(cid))(cb), | ||
(provs, cb) => { | ||
@@ -111,3 +112,3 @@ expect(provs).to.have.length(1) | ||
(cb) => handler(dht)(sender, msg, cb), | ||
(cb) => dht.providers.getProviders(cid, cb), | ||
(cb) => promiseToCallback(dht.providers.getProviders(cid))(cb), | ||
(provs, cb) => { | ||
@@ -114,0 +115,0 @@ expect(dht.peerBook.has(provider.id)).to.equal(false) |
@@ -9,2 +9,3 @@ /* eslint-env mocha */ | ||
const waterfall = require('async/waterfall') | ||
const promiseToCallback = require('promise-to-callback') | ||
@@ -93,3 +94,3 @@ const Message = require('../../../src/message') | ||
(cb) => dht._add(closer, cb), | ||
(cb) => dht.providers.addProvider(v.cid, prov, cb), | ||
(cb) => promiseToCallback(dht.providers.addProvider(v.cid, prov))(err => cb(err)), | ||
(cb) => handler(dht)(peers[0], msg, cb) | ||
@@ -96,0 +97,0 @@ ], (err, response) => { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
458169
70
80
30
7335
+ Addedp-queue@^5.0.0
+ Addedp-times@^2.1.0
+ Addedeventemitter3@3.1.2(transitive)
+ Addedp-map@2.1.0(transitive)
+ Addedp-queue@5.0.0(transitive)
+ Addedp-times@2.1.0(transitive)
+ Addedpull-stream-to-async-iterator@1.0.2(transitive)