libp2p-kad-dht
Advanced tools
Comparing version 0.15.3 to 0.16.0
@@ -0,1 +1,19 @@ | ||
<a name="0.16.0"></a> | ||
# [0.16.0](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.15.3...v0.16.0) (2019-08-16) | ||
### Code Refactoring | ||
* use async datastore ([#140](https://github.com/libp2p/js-libp2p-kad-dht/issues/140)) ([daf9b00](https://github.com/libp2p/js-libp2p-kad-dht/commit/daf9b00)) | ||
### BREAKING CHANGES | ||
* The DHT now requires its datastore to have | ||
a promise based api, instead of callbacks. Datastores that use | ||
ipfs/interface-datastore@0.7 or later should be used. | ||
https://github.com/ipfs/interface-datastore/releases/tag/v0.7.0 | ||
<a name="0.15.3"></a> | ||
@@ -2,0 +20,0 @@ ## [0.15.3](https://github.com/libp2p/js-libp2p-kad-dht/compare/v0.15.2...v0.15.3) (2019-07-29) |
{ | ||
"name": "libp2p-kad-dht", | ||
"version": "0.15.3", | ||
"version": "0.16.0", | ||
"description": "JavaScript implementation of the Kad-DHT for libp2p", | ||
@@ -50,3 +50,3 @@ "leadMaintainer": "Vasco Santos <vasco.santos@moxy.studio>", | ||
"heap": "~0.2.6", | ||
"interface-datastore": "~0.6.0", | ||
"interface-datastore": "~0.7.0", | ||
"k-bucket": "^5.0.0", | ||
@@ -68,3 +68,2 @@ "libp2p-crypto": "~0.16.1", | ||
"pull-stream": "^3.6.9", | ||
"pull-stream-to-async-iterator": "^1.0.1", | ||
"varint": "^5.0.0", | ||
@@ -74,5 +73,5 @@ "xor-distance": "^2.0.0" | ||
"devDependencies": { | ||
"aegir": "^18.2.1", | ||
"aegir": "^20.0.0", | ||
"chai": "^4.2.0", | ||
"datastore-level": "~0.10.0", | ||
"datastore-level": "~0.12.1", | ||
"dirty-chai": "^2.0.1", | ||
@@ -79,0 +78,0 @@ "interface-connection": "~0.3.3", |
@@ -98,3 +98,3 @@ 'use strict' | ||
try { | ||
rawRecord = await promisify(cb => dht.datastore.get(dsKey, cb))() | ||
rawRecord = await dht.datastore.get(dsKey) | ||
} catch (err) { | ||
@@ -118,3 +118,3 @@ if (err.code === 'ERR_NOT_FOUND') { | ||
// If record is bad delete it and return | ||
await promisify(cb => dht.datastore.delete(dsKey, cb))() | ||
await dht.datastore.delete(dsKey) | ||
return undefined | ||
@@ -219,3 +219,3 @@ } | ||
async _findPeerSingleAsync (peer, target) { | ||
async _findPeerSingleAsync (peer, target) { // eslint-disable-line require-await | ||
dht._log('_findPeerSingle %s', peer.toB58String()) | ||
@@ -268,3 +268,3 @@ const msg = new Message(Message.TYPES.FIND_NODE, target.id, 0) | ||
async _putLocalAsync (key, rec) { | ||
await promisify(cb => dht.datastore.put(utils.bufferToKey(key), rec, cb))() | ||
await dht.datastore.put(utils.bufferToKey(key), rec) | ||
return undefined | ||
@@ -373,3 +373,3 @@ }, | ||
const raw = await promisify(cb => dht.datastore.get(utils.bufferToKey(key), cb))() | ||
const raw = await dht.datastore.get(utils.bufferToKey(key)) | ||
dht._log('found %b in local datastore', key) | ||
@@ -444,3 +444,3 @@ const rec = Record.deserialize(raw) | ||
async _getValueSingleAsync (peer, key) { | ||
async _getValueSingleAsync (peer, key) { // eslint-disable-line require-await | ||
const msg = new Message(Message.TYPES.GET_VALUE, key, 0) | ||
@@ -607,3 +607,3 @@ return promisify(cb => dht.network.sendRequest(peer, msg, cb))() | ||
async _findProvidersSingleAsync (peer, key) { | ||
async _findProvidersSingleAsync (peer, key) { // eslint-disable-line require-await | ||
const msg = new Message(Message.TYPES.GET_PROVIDERS, key.buffer, 0) | ||
@@ -610,0 +610,0 @@ return promisify(cb => dht.network.sendRequest(peer, msg, cb))() |
@@ -8,4 +8,2 @@ 'use strict' | ||
const Queue = require('p-queue') | ||
const promisify = require('promisify-es6') | ||
const toIterator = require('pull-stream-to-async-iterator') | ||
@@ -95,3 +93,3 @@ const c = require('./constants') | ||
const query = this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX }) | ||
for await (const entry of toIterator(query)) { | ||
for await (const entry of query) { | ||
try { | ||
@@ -122,3 +120,3 @@ // Add a delete to the batch for each expired entry | ||
if (deleted.size) { | ||
await promisify(cb => batch.commit(cb))() | ||
await batch.commit() | ||
} | ||
@@ -188,3 +186,3 @@ | ||
*/ | ||
async addProvider (cid, provider) { | ||
async addProvider (cid, provider) { // eslint-disable-line require-await | ||
return this.syncQueue.add(async () => { | ||
@@ -210,3 +208,3 @@ this._log('addProvider %s', cid.toBaseEncodedString()) | ||
*/ | ||
async getProviders (cid) { | ||
async getProviders (cid) { // eslint-disable-line require-await | ||
return this.syncQueue.add(async () => { | ||
@@ -246,3 +244,3 @@ this._log('getProviders %s', cid.toBaseEncodedString()) | ||
*/ | ||
async function writeProviderEntry (store, cid, peer, time) { | ||
async function writeProviderEntry (store, cid, peer, time) { // eslint-disable-line require-await | ||
const dsKey = [ | ||
@@ -256,3 +254,3 @@ makeProviderKey(cid), | ||
const buffer = Buffer.from(varint.encode(time)) | ||
return promisify(cb => store.put(key, buffer, cb))() | ||
return store.put(key, buffer) | ||
} | ||
@@ -292,3 +290,3 @@ | ||
const query = store.query({ prefix: makeProviderKey(cid) }) | ||
for await (const entry of toIterator(query)) { | ||
for await (const entry of query) { | ||
const { peerId } = parseProviderKey(entry.key) | ||
@@ -295,0 +293,0 @@ providers.set(peerId, readTime(entry.value)) |
@@ -55,3 +55,3 @@ 'use strict' | ||
*/ | ||
async run (peers) { | ||
async run (peers) { // eslint-disable-line require-await | ||
if (!this.dht._queryManager.running) { | ||
@@ -58,0 +58,0 @@ this._log.error('Attempt to run query after shutdown') |
'use strict' | ||
const CID = require('cids') | ||
const parallel = require('async/parallel') | ||
const PeerInfo = require('peer-info') | ||
@@ -20,6 +19,5 @@ const promiseToCallback = require('promise-to-callback') | ||
* @param {Message} msg | ||
* @param {function(Error, Message)} callback | ||
* @returns {undefined} | ||
* @returns {Promise<Message>} Resolves a `Message` response | ||
*/ | ||
return function getProviders (peer, msg, callback) { | ||
async function getProvidersAsync (peer, msg) { | ||
let cid | ||
@@ -29,53 +27,51 @@ try { | ||
} catch (err) { | ||
return callback(errcode(new Error(`Invalid CID: ${err.message}`), 'ERR_INVALID_CID')) | ||
throw errcode(new Error(`Invalid CID: ${err.message}`), 'ERR_INVALID_CID') | ||
} | ||
log('%s', cid.toBaseEncodedString()) | ||
const dsKey = utils.bufferToKey(cid.buffer) | ||
parallel([ | ||
(cb) => dht.datastore.has(dsKey, (err, exists) => { | ||
if (err) { | ||
log.error('Failed to check datastore existence', err) | ||
return cb(null, false) | ||
} | ||
const [has, peers, closer] = await Promise.all([ | ||
dht.datastore.has(dsKey), | ||
dht.providers.getProviders(cid), | ||
dht._betterPeersToQueryAsync(msg, peer) | ||
]) | ||
cb(null, exists) | ||
}), | ||
(cb) => promiseToCallback(dht.providers.getProviders(cid))(cb), | ||
(cb) => dht._betterPeersToQuery(msg, peer, cb) | ||
], (err, res) => { | ||
if (err) { | ||
return callback(err) | ||
const providers = peers.map((p) => { | ||
if (dht.peerBook.has(p)) { | ||
return dht.peerBook.get(p) | ||
} | ||
const has = res[0] | ||
const closer = res[2] | ||
const providers = res[1].map((p) => { | ||
if (dht.peerBook.has(p)) { | ||
return dht.peerBook.get(p) | ||
} | ||
return dht.peerBook.put(new PeerInfo(p)) | ||
}) | ||
return dht.peerBook.put(new PeerInfo(p)) | ||
}) | ||
if (has) { | ||
providers.push(dht.peerInfo) | ||
} | ||
if (has) { | ||
providers.push(dht.peerInfo) | ||
} | ||
const response = new Message(msg.type, msg.key, msg.clusterLevel) | ||
const response = new Message(msg.type, msg.key, msg.clusterLevel) | ||
if (providers.length > 0) { | ||
response.providerPeers = providers | ||
} | ||
if (providers.length > 0) { | ||
response.providerPeers = providers | ||
} | ||
if (closer.length > 0) { | ||
response.closerPeers = closer | ||
} | ||
if (closer.length > 0) { | ||
response.closerPeers = closer | ||
} | ||
log('got %s providers %s closerPeers', providers.length, closer.length) | ||
log('got %s providers %s closerPeers', providers.length, closer.length) | ||
return response | ||
} | ||
callback(null, response) | ||
}) | ||
/** | ||
* Process `GetProviders` DHT messages. | ||
* | ||
* @param {PeerInfo} peer | ||
* @param {Message} msg | ||
* @param {function(Error, Message)} callback | ||
* @returns {undefined} | ||
*/ | ||
return function getProviders (peer, msg, callback) { | ||
promiseToCallback(getProvidersAsync(peer, msg))(callback) | ||
} | ||
} |
@@ -5,2 +5,3 @@ 'use strict' | ||
const errcode = require('err-code') | ||
const promiseToCallback = require('promise-to-callback') | ||
@@ -41,3 +42,3 @@ module.exports = (dht) => { | ||
dht.datastore.put(key, record.serialize(), (err) => { | ||
promiseToCallback(dht.datastore.put(key, record.serialize()))(err => { | ||
if (err) { | ||
@@ -44,0 +45,0 @@ return callback(err) |
@@ -212,3 +212,3 @@ 'use strict' | ||
exports.withTimeout = (asyncFn, time) => { | ||
return async (...args) => { | ||
return async (...args) => { // eslint-disable-line require-await | ||
return Promise.race([ | ||
@@ -215,0 +215,0 @@ asyncFn(...args), |
@@ -23,3 +23,3 @@ /* eslint-env mocha */ | ||
const Mplex = require('libp2p-mplex') | ||
const promiseToCallback = require('promise-to-callback') | ||
const errcode = require('err-code') | ||
@@ -959,3 +959,3 @@ | ||
) | ||
let received = new Date() | ||
const received = new Date() | ||
received.setDate(received.getDate() - 2) | ||
@@ -967,3 +967,5 @@ | ||
(cb) => dht._putLocal(record.key, record.serialize(), cb), | ||
(cb) => dht.datastore.get(kadUtils.bufferToKey(record.key), cb), | ||
(cb) => { | ||
promiseToCallback(dht.datastore.get(kadUtils.bufferToKey(record.key)))(cb) | ||
}, | ||
(lookup, cb) => { | ||
@@ -978,3 +980,3 @@ expect(lookup).to.exist('Record should be in the local datastore') | ||
dht.datastore.get(kadUtils.bufferToKey(record.key), (err, lookup) => { | ||
promiseToCallback(dht.datastore.get(kadUtils.bufferToKey(record.key)))((err, lookup) => { | ||
expect(err).to.exist('Should throw error for not existing') | ||
@@ -1028,3 +1030,3 @@ expect(lookup).to.not.exist('Record should be removed from datastore') | ||
// Simulate going out to the network and returning the record | ||
sinon.stub(dht, '_getValueOrPeersAsync').callsFake(async () => ({ record: rec })) | ||
sinon.stub(dht, '_getValueOrPeersAsync').callsFake(async () => ({ record: rec })) // eslint-disable-line require-await | ||
] | ||
@@ -1031,0 +1033,0 @@ |
@@ -44,3 +44,3 @@ /* eslint-env mocha */ | ||
it('rejects with the error in the original function', async () => { | ||
const original = async () => { throw new Error('explode') } | ||
const original = async () => { throw new Error('explode') } // eslint-disable-line require-await | ||
const asyncFn = utils.withTimeout(original, 100) | ||
@@ -47,0 +47,0 @@ let err |
@@ -118,3 +118,3 @@ /* eslint-env mocha */ | ||
console.log('starting') | ||
console.log('starting') // eslint-disable-line no-console | ||
const res = await Promise.all([ | ||
@@ -125,6 +125,6 @@ createValues(100), | ||
console.log('got values and peers') | ||
console.log('got values and peers') // eslint-disable-line no-console | ||
const values = res[0] | ||
const peers = res[1] | ||
let total = Date.now() | ||
const total = Date.now() | ||
@@ -137,4 +137,4 @@ for (const v of values) { | ||
console.log('addProvider %s peers %s cids in %sms', peers.length, values.length, Date.now() - total) | ||
console.log('starting profile with %s peers and %s cids', peers.length, values.length) | ||
console.log('addProvider %s peers %s cids in %sms', peers.length, values.length, Date.now() - total) // eslint-disable-line no-console | ||
console.log('starting profile with %s peers and %s cids', peers.length, values.length) // eslint-disable-line no-console | ||
@@ -145,3 +145,3 @@ for (let i = 0; i < 3; i++) { | ||
await providers.getProviders(v.cid) | ||
console.log('query %sms', (Date.now() - start)) | ||
console.log('query %sms', (Date.now() - start)) // eslint-disable-line no-console | ||
} | ||
@@ -148,0 +148,0 @@ } |
@@ -60,3 +60,3 @@ /* eslint-env mocha */ | ||
let i = 0 | ||
const queryFunc = async (p) => { | ||
const queryFunc = async (p) => { // eslint-disable-line require-await | ||
if (i++ === 1) { | ||
@@ -94,3 +94,3 @@ expect(p.id).to.eql(peerInfos[2].id.id) | ||
const visited = [] | ||
const queryFunc = async (p) => { | ||
const queryFunc = async (p) => { // eslint-disable-line require-await | ||
visited.push(p) | ||
@@ -131,3 +131,3 @@ | ||
const queryFunc = async (p) => { throw new Error('fail') } | ||
const queryFunc = async (p) => { throw new Error('fail') } // eslint-disable-line require-await | ||
@@ -165,3 +165,3 @@ const q = new Query(dht, peer.id.id, () => queryFunc) | ||
const queryFunc = async (p) => { | ||
const queryFunc = async (p) => { // eslint-disable-line require-await | ||
return { | ||
@@ -214,3 +214,3 @@ closerPeers: [peerInfos[2]] | ||
const queryFunc = async (p) => { | ||
const queryFunc = async (p) => { // eslint-disable-line require-await | ||
const closer = topology[p.toB58String()] | ||
@@ -254,3 +254,3 @@ return { | ||
const queryFunc = async (p) => { | ||
const queryFunc = async (p) => { // eslint-disable-line require-await | ||
const res = topology[p.toB58String()] || {} | ||
@@ -362,3 +362,3 @@ return { | ||
const queryFunc = async (p) => { | ||
const queryFunc = async (p) => { // eslint-disable-line require-await | ||
const res = topology[p.toB58String()] || {} | ||
@@ -649,3 +649,3 @@ return { | ||
const visited = [] | ||
const queryFunc = async (peerId) => { | ||
const queryFunc = async (peerId) => { // eslint-disable-line require-await | ||
visited.push(peerId) | ||
@@ -717,3 +717,3 @@ const i = peerIndex(peerId) | ||
const q = new Query(dht, targetId, (trackNum) => { | ||
return async (p) => { | ||
return async (p) => { // eslint-disable-line require-await | ||
const response = getResponse(p, trackNum) | ||
@@ -753,3 +753,3 @@ expect(response).to.exist() // or we aren't on the right track | ||
const queryFunc = async (p) => { | ||
const queryFunc = async (p) => { // eslint-disable-line require-await | ||
return { | ||
@@ -756,0 +756,0 @@ closerPeers: [peerInfos[2]] |
@@ -36,3 +36,3 @@ /* eslint-env mocha */ | ||
describe('get closest peers', () => { | ||
let targetKey = { | ||
const targetKey = { | ||
key: Buffer.from('A key to find'), | ||
@@ -73,14 +73,14 @@ dhtKey: null | ||
let query = new Query(dht, targetKey.key, () => querySpy) | ||
const query = new Query(dht, targetKey.key, () => querySpy) | ||
let run = new Run(query) | ||
const run = new Run(query) | ||
promiseToCallback(run.init())(() => { | ||
// Add the sorted peers into 5 paths. This will weight | ||
// the paths with increasingly further peers | ||
let sortedPeerIds = sortedPeers.map(peerInfo => peerInfo.id) | ||
let peersPerPath = sortedPeerIds.length / PATHS | ||
let paths = [...new Array(PATHS)].map((_, index) => { | ||
let path = new Path(run, query.makePath()) | ||
let start = index * peersPerPath | ||
let peers = sortedPeerIds.slice(start, start + peersPerPath) | ||
const sortedPeerIds = sortedPeers.map(peerInfo => peerInfo.id) | ||
const peersPerPath = sortedPeerIds.length / PATHS | ||
const paths = [...new Array(PATHS)].map((_, index) => { | ||
const path = new Path(run, query.makePath()) | ||
const start = index * peersPerPath | ||
const peers = sortedPeerIds.slice(start, start + peersPerPath) | ||
peers.forEach(p => path.addInitialPeer(p)) | ||
@@ -93,3 +93,3 @@ return path | ||
// already queried. | ||
let queriedPeers = paths.splice(1, 1)[0].initialPeers | ||
const queriedPeers = paths.splice(1, 1)[0].initialPeers | ||
each(queriedPeers, (peerId, cb) => { | ||
@@ -129,11 +129,11 @@ run.peersQueried.add(peerId, cb) | ||
const querySpy = sinon.stub().resolves({}) | ||
let query = new Query(dht, targetKey.key, () => querySpy) | ||
const query = new Query(dht, targetKey.key, () => querySpy) | ||
let run = new Run(query) | ||
const run = new Run(query) | ||
promiseToCallback(run.init())(() => { | ||
let sortedPeerIds = sortedPeers.map(peerInfo => peerInfo.id) | ||
const sortedPeerIds = sortedPeers.map(peerInfo => peerInfo.id) | ||
// Take the top 15 peers and peers 20 - 25 to seed `run.peersQueried` | ||
// This leaves us with only 16 - 19 as closer peers | ||
let queriedPeers = [ | ||
const queriedPeers = [ | ||
...sortedPeerIds.slice(0, 15), | ||
@@ -143,5 +143,5 @@ ...sortedPeerIds.slice(20, 25) | ||
let path = new Path(run, query.makePath()) | ||
const path = new Path(run, query.makePath()) | ||
// Give the path a closet peer and 15 further peers | ||
let pathPeers = [ | ||
const pathPeers = [ | ||
...sortedPeerIds.slice(15, 16), // 1 closer | ||
@@ -148,0 +148,0 @@ ...sortedPeerIds.slice(80, 95) |
@@ -14,3 +14,3 @@ /* eslint-env mocha */ | ||
describe('Random Walk', () => { | ||
let mockDHT = { | ||
const mockDHT = { | ||
peerInfo: { | ||
@@ -139,3 +139,3 @@ id: { | ||
sinon.stub(randomWalk, '_walk').callsFake(async () => { | ||
sinon.stub(randomWalk, '_walk').callsFake(async () => { // eslint-disable-line require-await | ||
// Try to start again | ||
@@ -179,3 +179,3 @@ randomWalk.start() | ||
const randomWalk = new RandomWalk(mockDHT, options) | ||
sinon.stub(randomWalk, '_walk').callsFake(async (queries, timeout) => { | ||
sinon.stub(randomWalk, '_walk').callsFake(async (queries, timeout) => { // eslint-disable-line require-await | ||
expect(queries).to.eql(options.queriesPerPeriod) | ||
@@ -182,0 +182,0 @@ expect(timeout).to.eql(options.timeout) |
@@ -70,4 +70,4 @@ /* eslint-env mocha */ | ||
waterfall([ | ||
(cb) => dht.datastore.put(dsKey, v.value, cb), | ||
(cb) => handler(dht)(peers[0], msg, cb) | ||
(cb) => promiseToCallback(dht.datastore.put(dsKey, v.value))(cb), | ||
(_, cb) => handler(dht)(peers[0], msg, cb) | ||
], (err, response) => { | ||
@@ -74,0 +74,0 @@ expect(err).to.not.exist() |
@@ -9,2 +9,3 @@ /* eslint-env mocha */ | ||
const Record = require('libp2p-record').Record | ||
const promiseToCallback = require('promise-to-callback') | ||
@@ -69,3 +70,3 @@ const Message = require('../../../src/message') | ||
const key = utils.bufferToKey(Buffer.from('hello')) | ||
dht.datastore.get(key, (err, res) => { | ||
promiseToCallback(dht.datastore.get(key))((err, res) => { | ||
expect(err).to.not.exist() | ||
@@ -72,0 +73,0 @@ const rec = Record.deserialize(res) |
/* eslint-env mocha */ | ||
/* eslint max-nested-callbacks: ["error", 6] */ | ||
/* eslint-disable no-console */ | ||
'use strict' | ||
@@ -50,3 +52,3 @@ const { promisify } = require('util') | ||
console.log('Starting %d runs with concurrency %d...', RUNS, ALPHA) | ||
let topRunIds = [] | ||
const topRunIds = [] | ||
for (var i = 0; i < RUNS; i++) { | ||
@@ -103,3 +105,3 @@ const { closestPeers, runTime } = await GetClosestPeersSimulation() | ||
// Add random peers to our table | ||
let ourPeers = randomMembers(peers, randomInteger(MIN_PEERS_KNOWN, MAX_PEERS_KNOWN)) | ||
const ourPeers = randomMembers(peers, randomInteger(MIN_PEERS_KNOWN, MAX_PEERS_KNOWN)) | ||
for (const peer of ourPeers) { | ||
@@ -169,3 +171,3 @@ await promisify((peer, callback) => dht._add(peer, callback))(peer) | ||
async function MockNetwork (peers) { | ||
let network = { | ||
const network = { | ||
peers: {} | ||
@@ -183,3 +185,3 @@ } | ||
for (const peer of peers.slice(NUM_DEAD_NODES)) { | ||
let netPeer = network.peers[peer.id.toB58String()] = { | ||
const netPeer = network.peers[peer.id.toB58String()] = { | ||
// dial latency | ||
@@ -216,3 +218,3 @@ latency: randomInteger(LATENCY_MIN, LATENCY_MAX), | ||
function randomMembers (list, num) { | ||
let randomMembers = [] | ||
const randomMembers = [] | ||
@@ -219,0 +221,0 @@ if (list.length < num) throw new Error(`cant get random members, ${num} is less than ${list.length}`) |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Copyleft License
License(Experimental) Copyleft license information was found.
Found 1 instance in 1 package
Mixed license
License(Experimental) Package contains multiple licenses.
Found 1 instance in 1 package
Non-permissive License
License(Experimental) A license not known to be considered permissive was found.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
2031169
29
94
9157
3
70
+ Addedinterface-datastore@0.7.0(transitive)
- Removedpull-stream-to-async-iterator@^1.0.1
- Removedinterface-datastore@0.6.0(transitive)
- Removedpull-defer@0.2.3(transitive)
- Removedpull-stream-to-async-iterator@1.0.2(transitive)
Updatedinterface-datastore@~0.7.0