libp2p-kad-dht
Advanced tools
Comparing version 0.25.0 to 0.26.0
@@ -14,3 +14,4 @@ export var second: number; | ||
export var ALPHA: number; | ||
export var QUERY_SELF_INTERVAL: number; | ||
declare const hour: number; | ||
//# sourceMappingURL=constants.d.ts.map |
@@ -1,31 +0,89 @@ | ||
declare function _exports(dht: import('../')): { | ||
export type PeerId = import('peer-id'); | ||
export type ValueEvent = import('../types').ValueEvent; | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../types').ValueEvent} ValueEvent | ||
*/ | ||
export class ContentFetching { | ||
/** | ||
* Store the given key/value pair locally, in the datastore. | ||
* @param {object} params | ||
* @param {import('peer-id')} params.peerId | ||
* @param {import('interface-datastore').Datastore} params.datastore | ||
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators | ||
* @param {import('libp2p-interfaces/src/types').DhtSelectors} params.selectors | ||
* @param {import('../peer-routing').PeerRouting} params.peerRouting | ||
* @param {import('../query/manager').QueryManager} params.queryManager | ||
* @param {import('../routing-table').RoutingTable} params.routingTable | ||
* @param {import('../network').Network} params.network | ||
* @param {boolean} params.lan | ||
*/ | ||
constructor({ peerId, datastore, validators, selectors, peerRouting, queryManager, routingTable, network, lan }: { | ||
peerId: import('peer-id'); | ||
datastore: import('interface-datastore').Datastore; | ||
validators: import('libp2p-interfaces/src/types').DhtValidators; | ||
selectors: import('libp2p-interfaces/src/types').DhtSelectors; | ||
peerRouting: import('../peer-routing').PeerRouting; | ||
queryManager: import('../query/manager').QueryManager; | ||
routingTable: import('../routing-table').RoutingTable; | ||
network: import('../network').Network; | ||
lan: boolean; | ||
}); | ||
_log: debug.Debugger & { | ||
error: debug.Debugger; | ||
}; | ||
_peerId: import("peer-id"); | ||
_datastore: import("interface-datastore").Datastore; | ||
_validators: import("libp2p-interfaces/src/types").DhtValidators; | ||
_selectors: import("libp2p-interfaces/src/types").DhtSelectors; | ||
_peerRouting: import("../peer-routing").PeerRouting; | ||
_queryManager: import("../query/manager").QueryManager; | ||
_routingTable: import("../routing-table").RoutingTable; | ||
_network: import("../network").Network; | ||
/** | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} rec | ||
*/ | ||
putLocal(key: Uint8Array, rec: Uint8Array): Promise<void>; | ||
/** | ||
* Attempt to retrieve the value for the given key from | ||
* the local datastore. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} rec - encoded record | ||
*/ | ||
_putLocal(key: Uint8Array, rec: Uint8Array): Promise<void>; | ||
getLocal(key: Uint8Array): Promise<import("libp2p-record/dist/src/record")>; | ||
/** | ||
* Store the given key/value pair in the DHT. | ||
* Send the best record found to any peers that have an out of date record. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {ValueEvent[]} vals - values retrieved from the DHT | ||
* @param {Uint8Array} best - the best record that was found | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
sendCorrectionRecord(key: Uint8Array, vals: ValueEvent[], best: Uint8Array, options?: { | ||
signal?: AbortSignal | undefined; | ||
} | undefined): AsyncGenerator<import("../types").SendingQueryEvent | import("../types").PeerResponseEvent | import("../types").QueryErrorEvent | import("../types").DialingPeerEvent, void, unknown>; | ||
/** | ||
* Store the given key/value pair in the DHT | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} value | ||
* @param {object} [options] - put options | ||
* @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length) | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
put(key: Uint8Array, value: Uint8Array, options?: { | ||
minPeers?: number | undefined; | ||
} | undefined): Promise<void>; | ||
signal?: AbortSignal | undefined; | ||
} | undefined): AsyncGenerator<import("../types").SendingQueryEvent | import("../types").PeerResponseEvent | import("../types").QueryErrorEvent | import("../types").ProviderEvent | import("../types").ValueEvent | import("../types").AddingPeerEvent | import("../types").DialingPeerEvent, void, undefined>; | ||
/** | ||
* Get the value to the given key. | ||
* Times out after 1 minute by default. | ||
* Get the value to the given key | ||
* | ||
* @param {Uint8Array} key | ||
* @param {object} [options] - get options | ||
* @param {number} [options.timeout] - optional timeout (default: 60000) | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
* @param {number} [options.queryFuncTimeout] | ||
*/ | ||
get(key: Uint8Array, options?: { | ||
timeout?: number | undefined; | ||
} | undefined): Promise<Uint8Array>; | ||
signal?: AbortSignal | undefined; | ||
queryFuncTimeout?: number | undefined; | ||
} | undefined): AsyncGenerator<import("../types").QueryEvent, void, unknown>; | ||
/** | ||
@@ -35,16 +93,11 @@ * Get the `n` values to the given key without sorting. | ||
* @param {Uint8Array} key | ||
* @param {number} nvals | ||
* @param {object} [options] - get options | ||
* @param {number} [options.timeout] - optional timeout (default: 60000) | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
* @param {number} [options.queryFuncTimeout] | ||
*/ | ||
getMany(key: Uint8Array, nvals: number, options?: { | ||
timeout?: number | undefined; | ||
} | undefined): Promise<{ | ||
val: Uint8Array; | ||
from: import("peer-id"); | ||
}[]>; | ||
}; | ||
export = _exports; | ||
export type PeerId = import('peer-id'); | ||
export type DHTQueryResult = import('../query').DHTQueryResult; | ||
getMany(key: Uint8Array, options?: { | ||
signal?: AbortSignal | undefined; | ||
queryFuncTimeout?: number | undefined; | ||
} | undefined): AsyncGenerator<import("../types").QueryEvent, void, undefined>; | ||
} | ||
//# sourceMappingURL=index.d.ts.map |
@@ -1,8 +0,53 @@ | ||
declare function _exports(dht: import('../')): { | ||
export type CID = import('multiformats/cid').CID; | ||
export type PeerId = import('peer-id'); | ||
export type Multiaddr = import('multiaddr').Multiaddr; | ||
/** | ||
* @typedef {import('multiformats/cid').CID} CID | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('multiaddr').Multiaddr} Multiaddr | ||
*/ | ||
export class ContentRouting { | ||
/** | ||
* Announce to the network that we can provide the value for a given key | ||
* @param {object} params | ||
* @param {import('peer-id')} params.peerId | ||
* @param {import('../network').Network} params.network | ||
* @param {import('../peer-routing').PeerRouting} params.peerRouting | ||
* @param {import('../query/manager').QueryManager} params.queryManager | ||
* @param {import('../routing-table').RoutingTable} params.routingTable | ||
* @param {import('../providers').Providers} params.providers | ||
* @param {import('../types').PeerStore} params.peerStore | ||
* @param {boolean} params.lan | ||
*/ | ||
constructor({ peerId, network, peerRouting, queryManager, routingTable, providers, peerStore, lan }: { | ||
peerId: import('peer-id'); | ||
network: import('../network').Network; | ||
peerRouting: import('../peer-routing').PeerRouting; | ||
queryManager: import('../query/manager').QueryManager; | ||
routingTable: import('../routing-table').RoutingTable; | ||
providers: import('../providers').Providers; | ||
peerStore: import('../types').PeerStore; | ||
lan: boolean; | ||
}); | ||
_log: debug.Debugger & { | ||
error: debug.Debugger; | ||
}; | ||
_peerId: import("peer-id"); | ||
_network: import("../network").Network; | ||
_peerRouting: import("../peer-routing").PeerRouting; | ||
_queryManager: import("../query/manager").QueryManager; | ||
_routingTable: import("../routing-table").RoutingTable; | ||
_providers: import("../providers").Providers; | ||
_peerStore: import("../types").PeerStore; | ||
/** | ||
* Announce to the network that we can provide the value for a given key and | ||
* are contactable on the given multiaddrs | ||
* | ||
* @param {CID} key | ||
* @param {Multiaddr[]} multiaddrs | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
provide(key: CID): Promise<void>; | ||
provide(key: CID, multiaddrs: Multiaddr[], options?: { | ||
signal?: AbortSignal | undefined; | ||
} | undefined): AsyncGenerator<import("../types").SendingQueryEvent | import("../types").PeerResponseEvent | import("../types").QueryErrorEvent | import("../types").ProviderEvent | import("../types").ValueEvent | import("../types").AddingPeerEvent | import("../types").DialingPeerEvent, void, undefined>; | ||
/** | ||
@@ -12,19 +57,13 @@ * Search the dht for up to `K` providers of the given CID. | ||
* @param {CID} key | ||
* @param {Object} [options] - findProviders options | ||
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds | ||
* @param {object} [options] - findProviders options | ||
* @param {number} [options.maxNumProviders=5] - maximum number of providers to find | ||
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} | ||
* @param {AbortSignal} [options.signal] | ||
* @param {number} [options.queryFuncTimeout] | ||
*/ | ||
findProviders(key: CID, options?: { | ||
timeout?: number | undefined; | ||
maxNumProviders?: number | undefined; | ||
} | undefined): AsyncIterable<{ | ||
id: PeerId; | ||
multiaddrs: Multiaddr[]; | ||
}>; | ||
}; | ||
export = _exports; | ||
export type CID = import('multiformats/cid').CID; | ||
export type PeerId = import('peer-id'); | ||
export type Multiaddr = import('multiaddr').Multiaddr; | ||
signal?: AbortSignal | undefined; | ||
queryFuncTimeout?: number | undefined; | ||
} | undefined): AsyncGenerator<import("../types").QueryEvent, void, unknown>; | ||
} | ||
//# sourceMappingURL=index.d.ts.map |
@@ -1,421 +0,5 @@ | ||
export = KadDHT; | ||
/** | ||
* @typedef {*} Libp2p | ||
* @typedef {*} PeerStore | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('interface-datastore').Datastore} Datastore | ||
* @typedef {*} Dialer | ||
* @typedef {*} Registrar | ||
* @typedef {import('multiformats/cid').CID} CID | ||
* @typedef {import('multiaddr').Multiaddr} Multiaddr | ||
* @typedef {object} PeerData | ||
* @property {PeerId} id | ||
* @property {Multiaddr[]} multiaddrs | ||
*/ | ||
/** | ||
* A DHT implementation modeled after Kademlia with S/Kademlia modifications. | ||
* Original implementation in go: https://github.com/libp2p/go-libp2p-kad-dht. | ||
*/ | ||
declare class KadDHT extends EventEmitter { | ||
/** | ||
* Create a new KadDHT. | ||
* | ||
* @param {Object} props | ||
* @param {Libp2p} props.libp2p - the libp2p instance | ||
* @param {Dialer} props.dialer - libp2p dialer instance | ||
* @param {PeerId} props.peerId - peer's peerId | ||
* @param {PeerStore} props.peerStore - libp2p peerStore | ||
* @param {Registrar} props.registrar - libp2p registrar instance | ||
* @param {string} [props.protocolPrefix = '/ipfs'] - libp2p registrar handle protocol | ||
* @param {boolean} [props.forceProtocolLegacy = false] - WARNING: this is not recommended and should only be used for legacy purposes | ||
* @param {number} props.kBucketSize - k-bucket size (default 20) | ||
* @param {boolean} props.clientMode - If true, the DHT will not respond to queries. This should be true if your node will not be dialable. (default: false) | ||
* @param {number} props.concurrency - alpha concurrency of queries (default 3) | ||
* @param {Datastore} props.datastore - datastore (default MemoryDatastore) | ||
* @param {object} props.validators - validators object with namespace as keys and function(key, record, callback) | ||
* @param {object} props.selectors - selectors object with namespace as keys and function(key, records) | ||
* @param {function(import('libp2p-record').Record, PeerId): void} [props.onPut] - Called when an entry is added to or changed in the datastore | ||
* @param {function(import('libp2p-record').Record): void} [props.onRemove] - Called when an entry is removed from the datastore | ||
*/ | ||
constructor({ libp2p, dialer, peerId, peerStore, registrar, protocolPrefix, forceProtocolLegacy, datastore, kBucketSize, clientMode, concurrency, validators, selectors, onPut, onRemove }: { | ||
libp2p: Libp2p; | ||
dialer: Dialer; | ||
peerId: PeerId; | ||
peerStore: PeerStore; | ||
registrar: Registrar; | ||
protocolPrefix?: string | undefined; | ||
forceProtocolLegacy?: boolean | undefined; | ||
kBucketSize: number; | ||
clientMode: boolean; | ||
concurrency: number; | ||
datastore: Datastore; | ||
validators: object; | ||
selectors: object; | ||
onPut?: ((arg0: import("libp2p-record/dist/src/record"), arg1: PeerId) => void) | undefined; | ||
onRemove?: ((arg0: import("libp2p-record/dist/src/record")) => void) | undefined; | ||
}); | ||
/** | ||
* Local reference to the libp2p instance. May be undefined. | ||
* | ||
* @type {Libp2p} | ||
*/ | ||
libp2p: Libp2p; | ||
/** | ||
* Local reference to the libp2p dialer instance | ||
* | ||
* @type {Dialer} | ||
*/ | ||
dialer: Dialer; | ||
/** | ||
* Local peer-id | ||
* | ||
* @type {PeerId} | ||
*/ | ||
peerId: PeerId; | ||
/** | ||
* Local PeerStore | ||
* | ||
* @type {PeerStore} | ||
*/ | ||
peerStore: PeerStore; | ||
/** | ||
* Local peer info | ||
* | ||
* @type {Registrar} | ||
*/ | ||
registrar: Registrar; | ||
/** | ||
* Registrar protocol | ||
* | ||
* @type {string} | ||
*/ | ||
protocol: string; | ||
/** | ||
* k-bucket size | ||
* | ||
* @type {number} | ||
*/ | ||
kBucketSize: number; | ||
_clientMode: boolean; | ||
/** | ||
* ALPHA concurrency at which each query path with run, defaults to 3 | ||
* | ||
* @type {number} | ||
*/ | ||
concurrency: number; | ||
/** | ||
* Number of disjoint query paths to use | ||
* This is set to `kBucketSize`/2 per the S/Kademlia paper | ||
* | ||
* @type {number} | ||
*/ | ||
disjointPaths: number; | ||
/** | ||
* The routing table. | ||
* | ||
* @type {RoutingTable} | ||
*/ | ||
routingTable: RoutingTable; | ||
/** | ||
* Reference to the datastore, uses an in-memory store if none given. | ||
* | ||
* @type {Datastore} | ||
*/ | ||
datastore: Datastore; | ||
/** | ||
* Provider management | ||
* | ||
* @type {Providers} | ||
*/ | ||
providers: Providers; | ||
validators: { | ||
pk: { | ||
func: (key: Uint8Array, publicKey: Uint8Array) => Promise<void>; | ||
sign: boolean; | ||
}; | ||
}; | ||
selectors: { | ||
pk: (k: Uint8Array, records: Uint8Array[]) => number; | ||
}; | ||
network: Network; | ||
_log: debug.Debugger & { | ||
error: debug.Debugger; | ||
}; | ||
/** | ||
* Keeps track of running queries | ||
* | ||
* @type {QueryManager} | ||
*/ | ||
_queryManager: QueryManager; | ||
_running: boolean; | ||
contentFetching: { | ||
_putLocal(key: Uint8Array, rec: Uint8Array): Promise<void>; | ||
put(key: Uint8Array, value: Uint8Array, options?: { | ||
minPeers?: number | undefined; | ||
} | undefined): Promise<void>; | ||
get(key: Uint8Array, options?: { | ||
timeout?: number | undefined; | ||
} | undefined): Promise<Uint8Array>; | ||
getMany(key: Uint8Array, nvals: number, options?: { | ||
timeout?: number | undefined; | ||
} | undefined): Promise<{ | ||
val: Uint8Array; | ||
from: import("peer-id"); | ||
}[]>; | ||
}; | ||
contentRouting: { | ||
provide(key: import("multiformats/cid").CID): Promise<void>; | ||
findProviders(key: import("multiformats/cid").CID, options?: { | ||
timeout?: number | undefined; | ||
maxNumProviders?: number | undefined; | ||
} | undefined): AsyncIterable<{ | ||
id: import("peer-id"); | ||
multiaddrs: import("multiaddr").Multiaddr[]; | ||
}>; | ||
}; | ||
peerRouting: { | ||
_findPeerSingle(peer: import("peer-id"), target: import("peer-id")): Promise<Message>; | ||
findPeer(id: import("peer-id"), options?: { | ||
timeout?: number | undefined; | ||
} | undefined): Promise<{ | ||
id: import("peer-id"); | ||
multiaddrs: import("multiaddr").Multiaddr[]; | ||
}>; | ||
getClosestPeers(key: Uint8Array, options?: { | ||
shallow?: boolean | undefined; | ||
} | undefined): AsyncIterable<import("peer-id")>; | ||
getPublicKey(peer: import("peer-id")): Promise<import("libp2p-crypto").PublicKey>; | ||
}; | ||
onPut: (arg0: import("libp2p-record/dist/src/record"), arg1: PeerId) => void; | ||
onRemove: (arg0: import("libp2p-record/dist/src/record")) => void; | ||
/** | ||
* Is this DHT running. | ||
*/ | ||
get isStarted(): boolean; | ||
/** | ||
* Start listening to incoming connections. | ||
*/ | ||
start(): Promise<[void, void, void, void]>; | ||
/** | ||
* Stop accepting incoming connections and sending outgoing | ||
* messages. | ||
*/ | ||
stop(): Promise<[void, void, void, void]>; | ||
/** | ||
* Store the given key/value pair in the DHT. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} value | ||
* @param {Object} [options] - put options | ||
* @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length) | ||
* @returns {Promise<void>} | ||
*/ | ||
put(key: Uint8Array, value: Uint8Array, options?: { | ||
minPeers?: number | undefined; | ||
} | undefined): Promise<void>; | ||
/** | ||
* Get the value to the given key. | ||
* Times out after 1 minute by default. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Object} [options] - get options | ||
* @param {number} [options.timeout] - optional timeout (default: 60000) | ||
* @returns {Promise<Uint8Array>} | ||
*/ | ||
get(key: Uint8Array, options?: { | ||
timeout?: number | undefined; | ||
} | undefined): Promise<Uint8Array>; | ||
/** | ||
* Get the `n` values to the given key without sorting. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {number} nvals | ||
* @param {Object} [options] - get options | ||
* @param {number} [options.timeout] - optional timeout (default: 60000) | ||
*/ | ||
getMany(key: Uint8Array, nvals: number, options?: { | ||
timeout?: number | undefined; | ||
} | undefined): Promise<{ | ||
val: Uint8Array; | ||
from: import("peer-id"); | ||
}[]>; | ||
/** | ||
* Remove the given key from the local datastore. | ||
* | ||
* @param {Uint8Array} key | ||
*/ | ||
removeLocal(key: Uint8Array): Promise<undefined>; | ||
/** | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} value | ||
*/ | ||
_putLocal(key: Uint8Array, value: Uint8Array): Promise<void>; | ||
/** | ||
* Announce to the network that we can provide given key's value. | ||
* | ||
* @param {CID} key | ||
* @returns {Promise<void>} | ||
*/ | ||
provide(key: CID): Promise<void>; | ||
/** | ||
* Search the dht for up to `K` providers of the given CID. | ||
* | ||
* @param {CID} key | ||
* @param {Object} [options] - findProviders options | ||
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds (default: 60000) | ||
* @param {number} [options.maxNumProviders=5] - maximum number of providers to find | ||
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} | ||
*/ | ||
findProviders(key: CID, options?: { | ||
timeout?: number | undefined; | ||
maxNumProviders?: number | undefined; | ||
} | undefined): AsyncIterable<{ | ||
id: PeerId; | ||
multiaddrs: Multiaddr[]; | ||
}>; | ||
/** | ||
* Search for a peer with the given ID. | ||
* | ||
* @param {PeerId} id | ||
* @param {Object} [options] - findPeer options | ||
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds (default: 60000) | ||
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>} | ||
*/ | ||
findPeer(id: PeerId, options?: { | ||
timeout?: number | undefined; | ||
} | undefined): Promise<{ | ||
id: PeerId; | ||
multiaddrs: Multiaddr[]; | ||
}>; | ||
/** | ||
* Kademlia 'node lookup' operation. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Object} [options] | ||
* @param {boolean} [options.shallow = false] - shallow query | ||
*/ | ||
getClosestPeers(key: Uint8Array, options?: { | ||
shallow?: boolean | undefined; | ||
} | undefined): AsyncGenerator<import("peer-id"), void, undefined>; | ||
/** | ||
* Get the public key for the given peer id. | ||
* | ||
* @param {PeerId} peer | ||
*/ | ||
getPublicKey(peer: PeerId): Promise<import("libp2p-crypto").PublicKey>; | ||
/** | ||
* @param {PeerId} peerId | ||
* @param {Multiaddr[]} multiaddrs | ||
*/ | ||
_peerDiscovered(peerId: PeerId, multiaddrs: Multiaddr[]): void; | ||
/** | ||
* Returns the routing tables closest peers, for the key of | ||
* the message. | ||
* | ||
* @param {Message} msg | ||
*/ | ||
_nearestPeersToQuery(msg: Message): Promise<{ | ||
id: import("peer-id"); | ||
multiaddrs: import("multiaddr").Multiaddr[]; | ||
}[]>; | ||
/** | ||
* Get the nearest peers to the given query, but iff closer | ||
* than self. | ||
* | ||
* @param {Message} msg | ||
* @param {PeerId} peerId | ||
*/ | ||
_betterPeersToQuery(msg: Message, peerId: PeerId): Promise<{ | ||
id: import("peer-id"); | ||
multiaddrs: import("multiaddr").Multiaddr[]; | ||
}[]>; | ||
/** | ||
* Try to fetch a given record by from the local datastore. | ||
* Returns the record iff it is still valid, meaning | ||
* - it was either authored by this node, or | ||
* - it was received less than `MAX_RECORD_AGE` ago. | ||
* | ||
* @param {Uint8Array} key | ||
*/ | ||
_checkLocalDatastore(key: Uint8Array): Promise<import("libp2p-record/dist/src/record") | undefined>; | ||
/** | ||
* Add the peer to the routing table and update it in the peerStore. | ||
* | ||
* @param {PeerId} peerId | ||
*/ | ||
_add(peerId: PeerId): Promise<void>; | ||
/** | ||
* Verify a record without searching the DHT. | ||
* | ||
* @param {import('libp2p-record').Record} record | ||
*/ | ||
_verifyRecordLocally(record: import("libp2p-record/dist/src/record")): Promise<void>; | ||
/** | ||
* Is the given peer id our PeerId? | ||
* | ||
* @param {PeerId} other | ||
*/ | ||
_isSelf(other: PeerId): boolean; | ||
/** | ||
* Store the given key/value pair at the peer `target`. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} rec - encoded record | ||
* @param {PeerId} target | ||
*/ | ||
_putValueToPeer(key: Uint8Array, rec: Uint8Array, target: PeerId): Promise<void>; | ||
/** | ||
* Query a particular peer for the value for the given key. | ||
* It will either return the value or a list of closer peers. | ||
* | ||
* Note: The peerStore is updated with new addresses found for the given peer. | ||
* | ||
* @param {PeerId} peer | ||
* @param {Uint8Array} key | ||
*/ | ||
_getValueOrPeers(peer: PeerId, key: Uint8Array): Promise<{ | ||
record: import("libp2p-record/dist/src/record"); | ||
peers: PeerData[]; | ||
} | { | ||
peers: PeerData[]; | ||
record?: undefined; | ||
}>; | ||
/** | ||
* Get a value via rpc call for the given parameters. | ||
* | ||
* @param {PeerId} peer | ||
* @param {Uint8Array} key | ||
*/ | ||
_getValueSingle(peer: PeerId, key: Uint8Array): Promise<Message>; | ||
/** | ||
* Verify a record, fetching missing public keys from the network. | ||
* Calls back with an error if the record is invalid. | ||
* | ||
* @param {import('libp2p-record').Record} record | ||
* @returns {Promise<void>} | ||
*/ | ||
_verifyRecordOnline(record: import("libp2p-record/dist/src/record")): Promise<void>; | ||
} | ||
declare namespace KadDHT { | ||
export { multicodec, Libp2p, PeerStore, PeerId, Datastore, Dialer, Registrar, CID, Multiaddr, PeerData }; | ||
} | ||
import { EventEmitter } from "events"; | ||
type Libp2p = any; | ||
type Dialer = any; | ||
type PeerId = import('peer-id'); | ||
type PeerStore = any; | ||
type Registrar = any; | ||
import RoutingTable = require("./routing-table"); | ||
type Datastore = import('interface-datastore').Datastore; | ||
import Providers = require("./providers"); | ||
import Network = require("./network"); | ||
import QueryManager = require("./query-manager"); | ||
import Message = require("./message"); | ||
type CID = import('multiformats/cid').CID; | ||
type Multiaddr = import('multiaddr').Multiaddr; | ||
type PeerData = { | ||
id: PeerId; | ||
multiaddrs: Multiaddr[]; | ||
}; | ||
declare var multicodec: string; | ||
export type DHT = import('./types').DHT; | ||
export type KadDHTOps = import('./kad-dht').KadDHTOps; | ||
declare function create(opts: import("./kad-dht").KadDHTOps): import("./types").DHT; | ||
export {}; | ||
//# sourceMappingURL=index.d.ts.map |
@@ -1,2 +0,8 @@ | ||
export = Message; | ||
export type ConnectionType = 0 | 1 | 2 | 3 | 4; | ||
export type PBPeer = { | ||
id: Uint8Array; | ||
addrs: Uint8Array[]; | ||
connection: ConnectionType; | ||
}; | ||
export type PeerData = import('../types').PeerData; | ||
/** | ||
@@ -10,3 +16,3 @@ * @typedef {0|1|2|3|4} ConnectionType | ||
* | ||
* @typedef {import('../index').PeerData} PeerData | ||
* @typedef {import('../types').PeerData} PeerData | ||
*/ | ||
@@ -16,3 +22,3 @@ /** | ||
*/ | ||
declare class Message { | ||
export class Message { | ||
/** | ||
@@ -49,15 +55,11 @@ * Decode from protobuf | ||
} | ||
declare namespace Message { | ||
export { MESSAGE_TYPE as TYPES, CONNECTION_TYPE as CONNECTION_TYPES, ConnectionType, PBPeer, PeerData }; | ||
export namespace Message { | ||
export { MESSAGE_TYPE as TYPES }; | ||
export { CONNECTION_TYPE as CONNECTION_TYPES }; | ||
} | ||
export const MESSAGE_TYPE: typeof Proto.Message.MessageType; | ||
export const MESSAGE_TYPE_LOOKUP: string[]; | ||
import Proto = require("./dht"); | ||
type PeerData = import('../index').PeerData; | ||
declare const MESSAGE_TYPE: typeof Proto.Message.MessageType; | ||
declare const CONNECTION_TYPE: typeof Proto.Message.ConnectionType; | ||
type ConnectionType = 0 | 1 | 2 | 3 | 4; | ||
type PBPeer = { | ||
id: Uint8Array; | ||
addrs: Uint8Array[]; | ||
connection: ConnectionType; | ||
}; | ||
export {}; | ||
//# sourceMappingURL=index.d.ts.map |
@@ -1,5 +0,10 @@ | ||
export = Network; | ||
export type PeerId = import('peer-id'); | ||
export type MuxedStream = import('libp2p-interfaces/src/stream-muxer/types').MuxedStream; | ||
export type QueryEvent = import('./types').QueryEvent; | ||
export type PeerData = import('./types').PeerData; | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream | ||
* @typedef {import('./types').QueryEvent} QueryEvent | ||
* @typedef {import('./types').PeerData} PeerData | ||
*/ | ||
@@ -9,25 +14,31 @@ /** | ||
*/ | ||
declare class Network { | ||
export class Network extends EventEmitter { | ||
/** | ||
* Create a new network | ||
* | ||
* @param {import('./index')} dht | ||
* @param {object} params | ||
* @param {import('./types').Dialer} params.dialer | ||
* @param {string} params.protocol | ||
* @param {boolean} params.lan | ||
*/ | ||
constructor(dht: import('./index')); | ||
dht: import("./index"); | ||
readMessageTimeout: number; | ||
constructor({ dialer, protocol, lan }: { | ||
dialer: import('./types').Dialer; | ||
protocol: string; | ||
lan: boolean; | ||
}); | ||
_log: debug.Debugger & { | ||
error: debug.Debugger; | ||
error: debug.Debugger; /** | ||
* Write a message and read its response. | ||
* If no response is received after the specified timeout | ||
* this will error out. | ||
* | ||
* @param {MuxedStream} stream - the stream to use | ||
* @param {Uint8Array} msg - the message to send | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
}; | ||
_rpc: ({ stream, connection }: { | ||
stream: import("libp2p-interfaces/src/stream-muxer/types").MuxedStream; | ||
connection: import("libp2p-interfaces/src/connection/connection"); | ||
}) => Promise<void>; | ||
/** | ||
* Registrar notifies a connection successfully with dht protocol. | ||
* | ||
* @param {PeerId} peerId - remote peer id | ||
*/ | ||
_onPeerConnected(peerId: PeerId): Promise<void>; | ||
_running: boolean; | ||
_dialer: import("./types").Dialer; | ||
_protocol: string; | ||
/** | ||
@@ -37,3 +48,2 @@ * Start the network | ||
start(): void; | ||
_registrarId: any; | ||
/** | ||
@@ -50,15 +60,12 @@ * Stop all network activity | ||
/** | ||
* Are all network components there? | ||
* Send a request and record RTT for latency measurements | ||
* | ||
* @type {boolean} | ||
*/ | ||
get isConnected(): boolean; | ||
/** | ||
* Send a request and record RTT for latency measurements. | ||
* | ||
* @async | ||
* @param {PeerId} to - The peer that should receive a message | ||
* @param {Message} msg - The message to send. | ||
* @param {Message} msg - The message to send | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
sendRequest(to: PeerId, msg: Message): Promise<Message>; | ||
sendRequest(to: PeerId, msg: Message, options?: { | ||
signal?: AbortSignal | undefined; | ||
} | undefined): AsyncGenerator<import("./types").SendingQueryEvent | import("./types").PeerResponseEvent | import("./types").QueryErrorEvent | import("./types").DialingPeerEvent, void, unknown>; | ||
/** | ||
@@ -69,27 +76,35 @@ * Sends a message without expecting an answer. | ||
* @param {Message} msg | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
sendMessage(to: PeerId, msg: Message): Promise<any>; | ||
sendMessage(to: PeerId, msg: Message, options?: { | ||
signal?: AbortSignal | undefined; | ||
} | undefined): AsyncGenerator<import("./types").SendingQueryEvent | import("./types").PeerResponseEvent | import("./types").QueryErrorEvent | import("./types").DialingPeerEvent, void, unknown>; | ||
/** | ||
* Write a message and read its response. | ||
* If no response is received after the specified timeout | ||
* this will error out. | ||
* Write a message to the given stream | ||
* | ||
* @param {MuxedStream} stream - the stream to use | ||
* @param {Uint8Array} msg - the message to send | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
_writeReadMessage(stream: MuxedStream, msg: Uint8Array): Promise<Message>; | ||
_writeMessage(stream: MuxedStream, msg: Uint8Array, options?: { | ||
signal?: AbortSignal | undefined; | ||
} | undefined): Promise<void>; | ||
/** | ||
* Write a message to the given stream. | ||
* Write a message and read its response. | ||
* If no response is received after the specified timeout | ||
* this will error out. | ||
* | ||
* @param {MuxedStream} stream - the stream to use | ||
* @param {Uint8Array} msg - the message to send | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
_writeMessage(stream: MuxedStream, msg: Uint8Array): any; | ||
_writeReadMessage(stream: MuxedStream, msg: Uint8Array, options?: { | ||
signal?: AbortSignal | undefined; | ||
} | undefined): Promise<Message>; | ||
} | ||
declare namespace Network { | ||
export { PeerId, MuxedStream }; | ||
} | ||
type PeerId = import('peer-id'); | ||
import Message = require("./message"); | ||
type MuxedStream = import('libp2p-interfaces/src/stream-muxer/types').MuxedStream; | ||
import { EventEmitter } from "events"; | ||
import { Message } from "./message"; | ||
//# sourceMappingURL=network.d.ts.map |
export = PeerList; | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../').PeerData} PeerData | ||
*/ | ||
@@ -10,10 +9,10 @@ /** | ||
declare class PeerList { | ||
/** @type {PeerData[]} */ | ||
list: PeerData[]; | ||
/** @type {PeerId[]} */ | ||
list: PeerId[]; | ||
/** | ||
* Add a new peer. Returns `true` if it was a new one | ||
* | ||
* @param {PeerData} peerData | ||
* @param {PeerId} peerId | ||
*/ | ||
push(peerData: PeerData): boolean; | ||
push(peerId: PeerId): boolean; | ||
/** | ||
@@ -28,7 +27,7 @@ * Check if this PeerData is already in here. | ||
*/ | ||
toArray(): import("../").PeerData[]; | ||
toArray(): import("peer-id")[]; | ||
/** | ||
* Remove the last element | ||
*/ | ||
pop(): import("../").PeerData | undefined; | ||
pop(): import("peer-id") | undefined; | ||
/** | ||
@@ -40,6 +39,5 @@ * The length of the list | ||
declare namespace PeerList { | ||
export { PeerId, PeerData }; | ||
export { PeerId }; | ||
} | ||
type PeerData = import('../').PeerData; | ||
type PeerId = import('peer-id'); | ||
//# sourceMappingURL=index.d.ts.map |
export = PeerDistanceList; | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../').PeerData} PeerData | ||
*/ | ||
@@ -47,6 +46,5 @@ /** | ||
declare namespace PeerDistanceList { | ||
export { PeerId, PeerData }; | ||
export { PeerId }; | ||
} | ||
type PeerId = import('peer-id'); | ||
type PeerData = import('../').PeerData; | ||
//# sourceMappingURL=peer-distance-list.d.ts.map |
@@ -1,48 +0,126 @@ | ||
declare function _exports(dht: import('../index')): { | ||
export type Multiaddr = import('multiaddr').Multiaddr; | ||
export type PeerData = import('../types').PeerData; | ||
/** | ||
* @typedef {import('multiaddr').Multiaddr} Multiaddr | ||
* @typedef {import('../types').PeerData} PeerData | ||
*/ | ||
export class PeerRouting { | ||
/** | ||
* Ask peer `peer` if they know where the peer with id `target` is. | ||
* @param {object} params | ||
* @param {import('peer-id')} params.peerId | ||
* @param {import('../routing-table').RoutingTable} params.routingTable | ||
* @param {import('../types').PeerStore} params.peerStore | ||
* @param {import('../network').Network} params.network | ||
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators | ||
* @param {import('../query/manager').QueryManager} params.queryManager | ||
* @param {boolean} params.lan | ||
*/ | ||
constructor({ peerId, routingTable, peerStore, network, validators, queryManager, lan }: { | ||
peerId: import('peer-id'); | ||
routingTable: import('../routing-table').RoutingTable; | ||
peerStore: import('../types').PeerStore; | ||
network: import('../network').Network; | ||
validators: import('libp2p-interfaces/src/types').DhtValidators; | ||
queryManager: import('../query/manager').QueryManager; | ||
lan: boolean; | ||
}); | ||
_peerId: PeerId; | ||
_routingTable: import("../routing-table").RoutingTable; | ||
_peerStore: import("../types").PeerStore; | ||
_network: import("../network").Network; | ||
_validators: import("libp2p-interfaces/src/types").DhtValidators; | ||
_queryManager: import("../query/manager").QueryManager; | ||
_log: debug.Debugger & { | ||
error: debug.Debugger; | ||
}; | ||
/** | ||
* Look if we are connected to a peer with the given id. | ||
* Returns its id and addresses, if found, otherwise `undefined`. | ||
* | ||
* @param {PeerId} peer | ||
* @param {PeerId} target | ||
* @returns {Promise<Message>} | ||
* @private | ||
*/ | ||
_findPeerSingle(peer: PeerId, target: PeerId): Promise<Message>; | ||
findPeerLocal(peer: PeerId): Promise<{ | ||
id: PeerId; | ||
multiaddrs: import("multiaddr").Multiaddr[]; | ||
} | undefined>; | ||
/** | ||
* Get a value via rpc call for the given parameters. | ||
* | ||
* @param {PeerId} peer | ||
* @param {Uint8Array} key | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
_getValueSingle(peer: PeerId, key: Uint8Array, options?: { | ||
signal?: AbortSignal | undefined; | ||
} | undefined): AsyncGenerator<import("../types").SendingQueryEvent | import("../types").PeerResponseEvent | import("../types").QueryErrorEvent | import("../types").DialingPeerEvent, void, unknown>; | ||
/** | ||
* Get the public key directly from a node. | ||
* | ||
* @param {PeerId} peer | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
getPublicKeyFromNode(peer: PeerId, options?: { | ||
signal?: AbortSignal | undefined; | ||
} | undefined): AsyncGenerator<import("../types").SendingQueryEvent | import("../types").PeerResponseEvent | import("../types").QueryErrorEvent | import("../types").ValueEvent | import("../types").DialingPeerEvent, void, unknown>; | ||
/** | ||
* Search for a peer with the given ID. | ||
* | ||
* @param {PeerId} id | ||
* @param {Object} [options] - findPeer options | ||
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds | ||
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>} | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
* @param {number} [options.queryFuncTimeout] | ||
*/ | ||
findPeer(id: PeerId, options?: { | ||
timeout?: number | undefined; | ||
} | undefined): Promise<{ | ||
id: PeerId; | ||
multiaddrs: Multiaddr[]; | ||
}>; | ||
signal?: AbortSignal | undefined; | ||
queryFuncTimeout?: number | undefined; | ||
} | undefined): AsyncGenerator<import("../types").QueryEvent, void, unknown>; | ||
/** | ||
* Kademlia 'node lookup' operation. | ||
* Kademlia 'node lookup' operation | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Object} [options] | ||
* @param {boolean} [options.shallow=false] - shallow query | ||
* @returns {AsyncIterable<PeerId>} | ||
* @param {Uint8Array} key - the key to look up, could be a the bytes from a multihash or a peer ID | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
* @param {number} [options.queryFuncTimeout] | ||
*/ | ||
getClosestPeers(key: Uint8Array, options?: { | ||
shallow?: boolean | undefined; | ||
} | undefined): AsyncIterable<PeerId>; | ||
signal?: AbortSignal | undefined; | ||
queryFuncTimeout?: number | undefined; | ||
} | undefined): AsyncGenerator<import("../types").QueryEvent, void, undefined>; | ||
/** | ||
* Get the public key for the given peer id. | ||
* Query a particular peer for the value for the given key. | ||
* It will either return the value or a list of closer peers. | ||
* | ||
* Note: The peerStore is updated with new addresses found for the given peer. | ||
* | ||
* @param {PeerId} peer | ||
* @param {Uint8Array} key | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
getPublicKey(peer: PeerId): Promise<crypto.PublicKey>; | ||
}; | ||
export = _exports; | ||
export type Multiaddr = import('multiaddr').Multiaddr; | ||
getValueOrPeers(peer: PeerId, key: Uint8Array, options?: { | ||
signal?: AbortSignal | undefined; | ||
} | undefined): AsyncGenerator<import("../types").SendingQueryEvent | import("../types").PeerResponseEvent | import("../types").QueryErrorEvent | import("../types").DialingPeerEvent, void, unknown>; | ||
/** | ||
* Verify a record, fetching missing public keys from the network. | ||
* Calls back with an error if the record is invalid. | ||
* | ||
* @param {import('../types').DHTRecord} record | ||
* @returns {Promise<void>} | ||
*/ | ||
_verifyRecordOnline({ key, value, timeReceived }: import('../types').DHTRecord): Promise<void>; | ||
/** | ||
* Get the nearest peers to the given query, but if closer | ||
* than self | ||
* | ||
* @param {Uint8Array} key | ||
* @param {PeerId} closerThan | ||
*/ | ||
getCloserPeersOffline(key: Uint8Array, closerThan: PeerId): Promise<{ | ||
id: PeerId; | ||
multiaddrs: import("multiaddr").Multiaddr[]; | ||
}[]>; | ||
} | ||
import PeerId = require("peer-id"); | ||
import Message = require("../message"); | ||
import crypto = require("libp2p-crypto"); | ||
//# sourceMappingURL=index.d.ts.map |
@@ -1,2 +0,3 @@ | ||
export = Providers; | ||
export type CID = import('multiformats/cid').CID; | ||
export type Datastore = import('interface-datastore').Datastore; | ||
/** | ||
@@ -18,13 +19,9 @@ * @typedef {import('multiformats/cid').CID} CID | ||
*/ | ||
declare class Providers { | ||
export class Providers { | ||
/** | ||
* @param {Datastore} datastore | ||
* @param {PeerId} [self] | ||
* @param {number} [cacheSize=256] | ||
*/ | ||
constructor(datastore: Datastore, self?: PeerId | undefined, cacheSize?: number | undefined); | ||
constructor(datastore: Datastore, cacheSize?: number | undefined); | ||
datastore: import("interface-datastore").Datastore; | ||
_log: debug.Debugger & { | ||
error: debug.Debugger; | ||
}; | ||
/** | ||
@@ -92,9 +89,4 @@ * How often invalid records are cleaned. (in seconds) | ||
} | ||
declare namespace Providers { | ||
export { CID, Datastore }; | ||
} | ||
import { default as Queue } from "p-queue"; | ||
type CID = import('multiformats/cid').CID; | ||
import PeerId = require("peer-id"); | ||
type Datastore = import('interface-datastore').Datastore; | ||
//# sourceMappingURL=providers.d.ts.map |
@@ -1,21 +0,38 @@ | ||
export = RoutingTable; | ||
export type KBucketPeer = import('./types').KBucketPeer; | ||
export type KBucket = import('./types').KBucket; | ||
export type KBucketTree = import('./types').KBucketTree; | ||
export type PeerId = import('peer-id'); | ||
/** | ||
* @typedef {import('./types').KBucketPeer} KBucketPeer | ||
* @typedef {import('./types').KBucket} KBucket | ||
* @typedef {import('./types').KBucketTree} KBucketTree | ||
* @typedef {import('peer-id')} PeerId | ||
*/ | ||
/** | ||
* A wrapper around `k-bucket`, to provide easy store and | ||
* retrieval for peers. | ||
*/ | ||
declare class RoutingTable { | ||
export class RoutingTable { | ||
/** | ||
* @param {import('../')} dht | ||
* @param {object} [options] | ||
* @param {number} [options.kBucketSize=20] | ||
* @param {number} [options.refreshInterval=30000] | ||
* @param {object} params | ||
* @param {import('peer-id')} params.peerId | ||
* @param {import('../types').Dialer} params.dialer | ||
* @param {boolean} params.lan | ||
* @param {number} [params.kBucketSize=20] | ||
* @param {number} [params.pingTimeout=10000] | ||
*/ | ||
constructor(dht: import('../'), { kBucketSize, refreshInterval }?: { | ||
constructor({ peerId, dialer, kBucketSize, pingTimeout, lan }: { | ||
peerId: import('peer-id'); | ||
dialer: import('../types').Dialer; | ||
lan: boolean; | ||
kBucketSize?: number | undefined; | ||
refreshInterval?: number | undefined; | ||
} | undefined); | ||
peerId: PeerId; | ||
dht: import("../"); | ||
pingTimeout?: number | undefined; | ||
}); | ||
_log: debug.Debugger & { | ||
error: debug.Debugger; | ||
}; | ||
_peerId: import("peer-id"); | ||
_dialer: import("../types").Dialer; | ||
_kBucketSize: number; | ||
_refreshInterval: number; | ||
_pingTimeout: number; | ||
/** @type {KBucketTree} */ | ||
@@ -26,17 +43,11 @@ kb: KBucketTree; | ||
/** | ||
* To speed lookups, we seed the table with random PeerIds. This means | ||
* when we are asked to locate a peer on the network, we can find a KadId | ||
* that is close to the requested peer ID and query that, then network | ||
* peers will tell us who they know who is close to the fake ID | ||
* Called on the `ping` event from `k-bucket` when a bucket is full | ||
* and cannot split. | ||
* | ||
* @param {boolean} [force=false] | ||
*/ | ||
_refreshTable(force?: boolean | undefined): Promise<void>; | ||
/** | ||
* Called on the `ping` event from `k-bucket`. | ||
* Currently this just removes the oldest contact from | ||
* the list, without actually pinging the individual peers. | ||
* This is the same as go does, but should probably | ||
* be upgraded to actually ping the individual peers. | ||
* `oldContacts.length` is defined by the `numberOfNodesToPing` param | ||
* passed to the `k-bucket` constructor. | ||
* | ||
* `oldContacts` will not be empty and is the list of contacts that | ||
* have not been contacted for the longest. | ||
* | ||
* @param {KBucketPeer[]} oldContacts | ||
@@ -46,42 +57,6 @@ * @param {KBucketPeer} newContact | ||
_onPing(oldContacts: KBucketPeer[], newContact: KBucketPeer): void; | ||
_pingQueue: Queue<import("p-queue/dist/priority-queue").default, import("p-queue").DefaultAddOptions>; | ||
start(): Promise<void>; | ||
stop(): Promise<void>; | ||
_refreshTimeoutId: NodeJS.Timeout | undefined; | ||
/** | ||
* @param {number} cpl | ||
* @param {Date} lastRefresh | ||
* @param {boolean} force | ||
*/ | ||
_refreshCommonPrefixLength(cpl: number, lastRefresh: Date, force: boolean): Promise<void>; | ||
/** | ||
* @param {number} maxCommonPrefix | ||
*/ | ||
_getTrackedCommonPrefixLengthsForRefresh(maxCommonPrefix: number): Date[]; | ||
/** | ||
* | ||
* @param {number} targetCommonPrefixLength | ||
*/ | ||
_generateRandomPeerId(targetCommonPrefixLength: number): Promise<PeerId>; | ||
/** | ||
* @param {Uint8Array} localKadId | ||
* @param {number} randomPrefix | ||
* @param {number} targetCommonPrefixLength | ||
*/ | ||
_makePeerId(localKadId: Uint8Array, randomPrefix: number, targetCommonPrefixLength: number): Promise<Uint8Array>; | ||
/** | ||
* returns the maximum common prefix length between any peer in the table | ||
* and the current peer | ||
*/ | ||
_maxCommonPrefix(): number; | ||
/** | ||
* Returns the number of peers in the table with a given prefix length | ||
* | ||
* @param {number} prefixLength | ||
*/ | ||
_numPeersForCpl(prefixLength: number): number; | ||
/** | ||
* Yields the common prefix length of every peer in the table | ||
*/ | ||
_prefixLengths(): Generator<number, void, unknown>; | ||
/** | ||
* Amount of currently stored peers. | ||
@@ -95,3 +70,3 @@ */ | ||
*/ | ||
find(peer: PeerId): Promise<PeerId | undefined>; | ||
find(peer: PeerId): Promise<import("peer-id") | undefined>; | ||
/** | ||
@@ -102,3 +77,3 @@ * Retrieve the closest peers to the given key. | ||
*/ | ||
closestPeer(key: Uint8Array): PeerId | undefined; | ||
closestPeer(key: Uint8Array): import("peer-id") | undefined; | ||
/** | ||
@@ -108,5 +83,5 @@ * Retrieve the `count`-closest peers to the given key. | ||
* @param {Uint8Array} key | ||
* @param {number} count | ||
* @param {number} [count] - defaults to kBucketSize | ||
*/ | ||
closestPeers(key: Uint8Array, count: number): PeerId[]; | ||
closestPeers(key: Uint8Array, count?: number | undefined): import("peer-id")[]; | ||
/** | ||
@@ -125,28 +100,3 @@ * Add or update the routing table with the given peer. | ||
} | ||
declare namespace RoutingTable { | ||
export { KBucketPeer, KBucket, KBucketTree }; | ||
} | ||
import PeerId = require("peer-id"); | ||
type KBucketTree = { | ||
root: KBucket; | ||
localNodeId: Uint8Array; | ||
on: (event: string, callback: Function) => void; | ||
closest: (key: Uint8Array, count: number) => KBucketPeer[]; | ||
closestPeer: (key: Uint8Array) => KBucketPeer; | ||
remove: (key: Uint8Array) => void; | ||
add: (peer: KBucketPeer) => void; | ||
count: () => number; | ||
toIterable: () => Iterable<KBucket>; | ||
}; | ||
type KBucketPeer = { | ||
id: Uint8Array; | ||
peer: PeerId; | ||
}; | ||
type KBucket = { | ||
id: Uint8Array; | ||
contacts: KBucketPeer[]; | ||
dontSplit: boolean; | ||
left: KBucket; | ||
right: KBucket; | ||
}; | ||
import { default as Queue } from "p-queue"; | ||
//# sourceMappingURL=index.d.ts.map |
@@ -1,5 +0,33 @@ | ||
declare function _exports(dht: import('../../index')): (peerId: PeerId, msg: Message) => Promise<void>; | ||
export = _exports; | ||
export type PeerId = import('peer-id'); | ||
export type Message = import('../../message'); | ||
export type Message = import('../../message').Message; | ||
export type DHTMessageHandler = import('../types').DHTMessageHandler; | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../../message').Message} Message | ||
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler | ||
*/ | ||
/** | ||
* @implements {DHTMessageHandler} | ||
*/ | ||
export class AddProviderHandler implements DHTMessageHandler { | ||
/** | ||
* @param {object} params | ||
* @param {PeerId} params.peerId | ||
* @param {import('../../providers').Providers} params.providers | ||
* @param {import('../../types').PeerStore} params.peerStore | ||
*/ | ||
constructor({ peerId, providers, peerStore }: { | ||
peerId: PeerId; | ||
providers: import('../../providers').Providers; | ||
peerStore: import('../../types').PeerStore; | ||
}); | ||
_peerId: import("peer-id"); | ||
_providers: import("../../providers").Providers; | ||
_peerStore: import("../../types").PeerStore; | ||
/** | ||
* @param {PeerId} peerId | ||
* @param {Message} msg | ||
*/ | ||
handle(peerId: PeerId, msg: Message): Promise<undefined>; | ||
} | ||
//# sourceMappingURL=add-provider.d.ts.map |
@@ -1,5 +0,37 @@ | ||
declare function _exports(dht: import('../../index')): (peerId: PeerId, msg: Message) => Promise<Message>; | ||
export = _exports; | ||
export type PeerId = import('peer-id'); | ||
import Message = require("../../message"); | ||
export type DHTMessageHandler = import('../types').DHTMessageHandler; | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler | ||
*/ | ||
/** | ||
* @implements {DHTMessageHandler} | ||
*/ | ||
export class FindNodeHandler implements DHTMessageHandler { | ||
/** | ||
* @param {object} params | ||
* @param {PeerId} params.peerId | ||
* @param {import('../../types').Addressable} params.addressable | ||
* @param {import('../../peer-routing').PeerRouting} params.peerRouting | ||
* @param {boolean} [params.lan] | ||
*/ | ||
constructor({ peerId, addressable, peerRouting, lan }: { | ||
peerId: PeerId; | ||
addressable: import('../../types').Addressable; | ||
peerRouting: import('../../peer-routing').PeerRouting; | ||
lan?: boolean | undefined; | ||
}); | ||
_peerId: import("peer-id"); | ||
_addressable: import("../../types").Addressable; | ||
_peerRouting: import("../../peer-routing").PeerRouting; | ||
_lan: boolean; | ||
/** | ||
* Process `FindNode` DHT messages | ||
* | ||
* @param {PeerId} peerId | ||
* @param {Message} msg | ||
*/ | ||
handle(peerId: PeerId, msg: Message): Promise<Message>; | ||
} | ||
import { Message } from "../../message"; | ||
//# sourceMappingURL=find-node.d.ts.map |
@@ -1,5 +0,46 @@ | ||
declare function _exports(dht: import('../../index')): (peerId: PeerId, msg: Message) => Promise<Message>; | ||
export = _exports; | ||
export type PeerId = import('peer-id'); | ||
import Message = require("../../message"); | ||
export type DHTMessageHandler = import('../types').DHTMessageHandler; | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler | ||
*/ | ||
/** | ||
* @implements {DHTMessageHandler} | ||
*/ | ||
export class GetProvidersHandler implements DHTMessageHandler { | ||
/** | ||
* @param {object} params | ||
* @param {PeerId} params.peerId | ||
* @param {import('../../peer-routing').PeerRouting} params.peerRouting | ||
* @param {import('../../providers').Providers} params.providers | ||
* @param {import('interface-datastore').Datastore} params.datastore | ||
* @param {import('../../types').PeerStore} params.peerStore | ||
* @param {import('../../types').Addressable} params.addressable | ||
* @param {boolean} [params.lan] | ||
*/ | ||
constructor({ peerId, peerRouting, providers, datastore, peerStore, addressable, lan }: { | ||
peerId: PeerId; | ||
peerRouting: import('../../peer-routing').PeerRouting; | ||
providers: import('../../providers').Providers; | ||
datastore: import('interface-datastore').Datastore; | ||
peerStore: import('../../types').PeerStore; | ||
addressable: import('../../types').Addressable; | ||
lan?: boolean | undefined; | ||
}); | ||
_peerId: import("peer-id"); | ||
_peerRouting: import("../../peer-routing").PeerRouting; | ||
_providers: import("../../providers").Providers; | ||
_datastore: import("interface-datastore").Datastore; | ||
_peerStore: import("../../types").PeerStore; | ||
_addressable: import("../../types").Addressable; | ||
_lan: boolean; | ||
/** | ||
* Process `GetProviders` DHT messages. | ||
* | ||
* @param {PeerId} peerId | ||
* @param {Message} msg | ||
*/ | ||
handle(peerId: PeerId, msg: Message): Promise<Message>; | ||
} | ||
import { Message } from "../../message"; | ||
//# sourceMappingURL=get-providers.d.ts.map |
@@ -1,5 +0,46 @@ | ||
declare function _exports(dht: import('../../index')): (peerId: PeerId, msg: Message) => Promise<Message>; | ||
export = _exports; | ||
export type PeerId = import('peer-id'); | ||
import Message = require("../../message"); | ||
export type DHTMessageHandler = import('../types').DHTMessageHandler; | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler | ||
*/ | ||
/** | ||
* @implements {DHTMessageHandler} | ||
*/ | ||
export class GetValueHandler implements DHTMessageHandler { | ||
/** | ||
* @param {object} params | ||
* @param {PeerId} params.peerId | ||
* @param {import('../../types').PeerStore} params.peerStore | ||
* @param {import('../../peer-routing').PeerRouting} params.peerRouting | ||
* @param {import('interface-datastore').Datastore} params.datastore | ||
*/ | ||
constructor({ peerId, peerStore, peerRouting, datastore }: { | ||
peerId: PeerId; | ||
peerStore: import('../../types').PeerStore; | ||
peerRouting: import('../../peer-routing').PeerRouting; | ||
datastore: import('interface-datastore').Datastore; | ||
}); | ||
_peerId: import("peer-id"); | ||
_peerStore: import("../../types").PeerStore; | ||
_peerRouting: import("../../peer-routing").PeerRouting; | ||
_datastore: import("interface-datastore").Datastore; | ||
/** | ||
* Process `GetValue` DHT messages. | ||
* | ||
* @param {PeerId} peerId | ||
* @param {Message} msg | ||
*/ | ||
handle(peerId: PeerId, msg: Message): Promise<Message>; | ||
/** | ||
* Try to fetch a given record by from the local datastore. | ||
* Returns the record iff it is still valid, meaning | ||
* - it was either authored by this node, or | ||
* - it was received less than `MAX_RECORD_AGE` ago. | ||
* | ||
* @param {Uint8Array} key | ||
*/ | ||
_checkLocalDatastore(key: Uint8Array): Promise<import("libp2p-record/dist/src/record") | undefined>; | ||
} | ||
import { Message } from "../../message"; | ||
//# sourceMappingURL=get-value.d.ts.map |
@@ -1,3 +0,13 @@ | ||
declare function _exports(dht: import('../../index')): (type: number) => any; | ||
declare function _exports({ peerId, providers, peerStore, addressable, peerRouting, datastore, validators, lan }: { | ||
peerId: import('peer-id'); | ||
providers: import('../../providers').Providers; | ||
peerStore: import('../../types').PeerStore; | ||
addressable: import('../../types').Addressable; | ||
peerRouting: import('../../peer-routing').PeerRouting; | ||
datastore: import('interface-datastore').Datastore; | ||
validators: import('libp2p-interfaces/src/types').DhtValidators; | ||
lan?: boolean | undefined; | ||
}): (type: number) => import("../types").DHTMessageHandler; | ||
export = _exports; | ||
export type DHTMessageHandler = import('../types').DHTMessageHandler; | ||
//# sourceMappingURL=index.d.ts.map |
@@ -1,5 +0,21 @@ | ||
declare function _exports(dht: import('../../index')): (peerId: PeerId, msg: Message) => import("../../message"); | ||
export = _exports; | ||
export type PeerId = import('peer-id'); | ||
export type Message = import('../../message'); | ||
export type Message = import('../../message').Message; | ||
export type DHTMessageHandler = import('../types').DHTMessageHandler; | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../../message').Message} Message | ||
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler | ||
*/ | ||
/** | ||
* @implements {DHTMessageHandler} | ||
*/ | ||
export class PingHandler implements DHTMessageHandler { | ||
/** | ||
* Process `Ping` DHT messages. | ||
* | ||
* @param {PeerId} peerId | ||
* @param {Message} msg | ||
*/ | ||
handle(peerId: PeerId, msg: Message): Promise<import("../../message").Message>; | ||
} | ||
//# sourceMappingURL=ping.d.ts.map |
@@ -1,5 +0,32 @@ | ||
declare function _exports(dht: import('../../index')): (peerId: PeerId, msg: Message) => Promise<import("../../message")>; | ||
export = _exports; | ||
export type PeerId = import('peer-id'); | ||
export type Message = import('../../message'); | ||
export type Message = import('../../message').Message; | ||
export type DHTMessageHandler = import('../types').DHTMessageHandler; | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../../message').Message} Message | ||
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler | ||
*/ | ||
/** | ||
* @implements {DHTMessageHandler} | ||
*/ | ||
export class PutValueHandler implements DHTMessageHandler { | ||
/** | ||
* @param {object} params | ||
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators | ||
* @param {import('interface-datastore').Datastore} params.datastore | ||
*/ | ||
constructor({ validators, datastore }: { | ||
validators: import('libp2p-interfaces/src/types').DhtValidators; | ||
datastore: import('interface-datastore').Datastore; | ||
}); | ||
_validators: import("libp2p-interfaces/src/types").DhtValidators; | ||
_datastore: import("interface-datastore").Datastore; | ||
/** | ||
* Process `PutValue` DHT messages. | ||
* | ||
* @param {PeerId} peerId | ||
* @param {Message} msg | ||
*/ | ||
handle(peerId: PeerId, msg: Message): Promise<import("../../message").Message>; | ||
} | ||
//# sourceMappingURL=put-value.d.ts.map |
@@ -1,8 +0,56 @@ | ||
declare function _exports(dht: import('../index')): ({ stream, connection }: { | ||
stream: MuxedStream; | ||
connection: import("libp2p-interfaces/src/connection/connection"); | ||
}) => Promise<void>; | ||
export = _exports; | ||
export type PeerId = import('peer-id'); | ||
export type MuxedStream = import('libp2p-interfaces/src/stream-muxer/types').MuxedStream; | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream | ||
*/ | ||
/** | ||
* @param {import('../types').DHT} dht | ||
*/ | ||
export class RPC { | ||
/** | ||
* @param {object} params | ||
* @param {import('../routing-table').RoutingTable} params.routingTable | ||
* @param {import('peer-id')} params.peerId | ||
* @param {import('../providers').Providers} params.providers | ||
* @param {import('../types').PeerStore} params.peerStore | ||
* @param {import('../types').Addressable} params.addressable | ||
* @param {import('../peer-routing').PeerRouting} params.peerRouting | ||
* @param {import('interface-datastore').Datastore} params.datastore | ||
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators | ||
* @param {boolean} [params.lan] | ||
*/ | ||
constructor(params: { | ||
routingTable: import('../routing-table').RoutingTable; | ||
peerId: import('peer-id'); | ||
providers: import('../providers').Providers; | ||
peerStore: import('../types').PeerStore; | ||
addressable: import('../types').Addressable; | ||
peerRouting: import('../peer-routing').PeerRouting; | ||
datastore: import('interface-datastore').Datastore; | ||
validators: import('libp2p-interfaces/src/types').DhtValidators; | ||
lan?: boolean | undefined; | ||
}); | ||
_messageHandler: (type: number) => import("./types").DHTMessageHandler; | ||
_routingTable: import("../routing-table").RoutingTable; | ||
/** | ||
* Process incoming DHT messages. | ||
* | ||
* @param {PeerId} peerId | ||
* @param {Message} msg | ||
*/ | ||
handleMessage(peerId: PeerId, msg: Message): Promise<Message | undefined>; | ||
/** | ||
* Handle incoming streams on the dht protocol | ||
* | ||
* @param {object} props | ||
* @param {MuxedStream} props.stream | ||
* @param {import('libp2p-interfaces/src/connection').Connection} props.connection | ||
*/ | ||
onIncomingStream({ stream, connection }: { | ||
stream: MuxedStream; | ||
connection: import("libp2p-interfaces/src/connection/connection"); | ||
}): Promise<void>; | ||
} | ||
import { Message } from "../message"; | ||
//# sourceMappingURL=index.d.ts.map |
@@ -0,29 +1,74 @@ | ||
/** | ||
* @param {import('./types').PeerData} peer | ||
*/ | ||
export function removePrivateAddresses({ id, multiaddrs }: import('./types').PeerData): { | ||
id: PeerId; | ||
multiaddrs: import("multiaddr").Multiaddr[]; | ||
}; | ||
/** | ||
* @param {import('./types').PeerData} peer | ||
*/ | ||
export function removePublicAddresses({ id, multiaddrs }: import('./types').PeerData): { | ||
id: PeerId; | ||
multiaddrs: import("multiaddr").Multiaddr[]; | ||
}; | ||
/** | ||
* Creates a DHT ID by hashing a given Uint8Array. | ||
* | ||
* @param {Uint8Array} buf | ||
* @returns {Promise<Uint8Array>} | ||
*/ | ||
export function convertBuffer(buf: Uint8Array): Promise<Uint8Array>; | ||
/** | ||
* Creates a DHT ID by hashing a Peer ID | ||
* | ||
* @param {PeerId} peer | ||
* @returns {Promise<Uint8Array>} | ||
*/ | ||
export function convertPeerId(peer: PeerId): Promise<Uint8Array>; | ||
/** | ||
* Convert a Uint8Array to their SHA2-256 hash. | ||
* | ||
* @param {Uint8Array} buf | ||
* @returns {Key} | ||
*/ | ||
export function bufferToKey(buf: Uint8Array): Key; | ||
/** | ||
* Generate the key for a public key. | ||
* | ||
* @param {PeerId} peer | ||
* @returns {Uint8Array} | ||
*/ | ||
export function keyForPublicKey(peer: PeerId): Uint8Array; | ||
/** | ||
* @param {Uint8Array} key | ||
*/ | ||
export function isPublicKeyKey(key: Uint8Array): boolean; | ||
/** | ||
* @param {Uint8Array} key | ||
*/ | ||
export function isIPNSKey(key: Uint8Array): boolean; | ||
/** | ||
* @param {Uint8Array} key | ||
*/ | ||
export function fromPublicKeyKey(key: Uint8Array): PeerId; | ||
export function now(): number; | ||
export function encodeBase32(buf: Uint8Array): string; | ||
export function decodeBase32(raw: string): Uint8Array; | ||
export function sortClosestPeers(peers: Array<PeerId>, target: Uint8Array): Promise<PeerId[]>; | ||
export function xorCompare(a: { | ||
distance: Uint8Array; | ||
}, b: { | ||
distance: Uint8Array; | ||
}): 0 | 1 | -1; | ||
export function pathSize(resultsWanted: number, numPaths: number): number; | ||
/** | ||
* Create a new put record, encodes and signs it if enabled. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} value | ||
* @returns {Uint8Array} | ||
*/ | ||
export function createPutRecord(key: Uint8Array, value: Uint8Array): Uint8Array; | ||
export function logger(id?: PeerId | undefined, subsystem?: string | undefined): debug.Debugger & { | ||
/** | ||
* Creates a logger for the given subsystem | ||
* | ||
* @param {string} name | ||
*/ | ||
export function logger(name: string): debug.Debugger & { | ||
error: debug.Debugger; | ||
}; | ||
export function withTimeout<T>(asyncFn: (...args: any[]) => Promise<T>, time?: number | undefined): (...args: any[]) => Promise<T>; | ||
export function mapParallel<T, O>(asyncIterator: AsyncIterable<T>, asyncFn: (arg0: T) => Promise<O>): Promise<O[]>; | ||
import PeerId = require("peer-id"); | ||
import { Key } from "interface-datastore/key"; | ||
import debug = require("debug"); | ||
export class TimeoutError extends Error { | ||
get code(): string; | ||
} | ||
//# sourceMappingURL=utils.d.ts.map |
{ | ||
"name": "libp2p-kad-dht", | ||
"version": "0.25.0", | ||
"version": "0.26.0", | ||
"description": "JavaScript implementation of the Kad-DHT for libp2p", | ||
@@ -8,4 +8,4 @@ "leadMaintainer": "Vasco Santos <vasco.santos@moxy.studio>", | ||
"scripts": { | ||
"prepare": "npm run build", | ||
"lint": "aegir ts -p check && aegir lint", | ||
"prepare": "npm run build", | ||
"build": "npm run build:proto && npm run build:proto-types && aegir build", | ||
@@ -22,3 +22,4 @@ "build:proto": "pbjs -t static-module -w commonjs -r libp2p-dht-message --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/message/dht.js ./src/message/dht.proto", | ||
"coverage-publish": "aegir-coverage publish", | ||
"sim": "node test/simulation/index.js" | ||
"sim": "node test/simulation/index.js", | ||
"dep-check": "aegir dep-check" | ||
}, | ||
@@ -52,2 +53,3 @@ "files": [ | ||
"dependencies": { | ||
"any-signal": "^2.1.2", | ||
"datastore-core": "^6.0.7", | ||
@@ -57,8 +59,13 @@ "debug": "^4.3.1", | ||
"hashlru": "^2.3.0", | ||
"heap": "~0.2.6", | ||
"interface-datastore": "^6.0.2", | ||
"it-all": "^1.0.5", | ||
"it-drain": "^1.0.4", | ||
"it-first": "^1.0.4", | ||
"it-length": "^1.0.3", | ||
"it-length-prefixed": "^5.0.2", | ||
"it-map": "^1.0.5", | ||
"it-merge": "^1.0.3", | ||
"it-parallel": "^2.0.1", | ||
"it-pipe": "^1.1.0", | ||
"it-take": "^1.0.2", | ||
"k-bucket": "^5.1.0", | ||
@@ -70,8 +77,11 @@ "libp2p-crypto": "^0.19.5", | ||
"multiformats": "^9.4.5", | ||
"native-abort-controller": "^1.0.4", | ||
"p-defer": "^3.0.0", | ||
"p-map": "^4.0.0", | ||
"p-queue": "^6.6.2", | ||
"p-timeout": "^4.1.0", | ||
"peer-id": "^0.15.0", | ||
"private-ip": "^2.3.3", | ||
"protobufjs": "^6.10.2", | ||
"streaming-iterables": "^6.0.0", | ||
"timeout-abort-controller": "^2.0.0", | ||
"uint8arrays": "^3.0.0", | ||
@@ -84,13 +94,11 @@ "varint": "^6.0.0" | ||
"async-iterator-all": "^1.0.0", | ||
"crypto-browserify": "^3.12.0", | ||
"datastore-level": "^7.0.1", | ||
"delay": "^5.0.0", | ||
"execa": "^5.1.1", | ||
"it-filter": "^1.0.3", | ||
"it-last": "^1.0.6", | ||
"it-pair": "^1.0.0", | ||
"libp2p": "^0.32.3", | ||
"libp2p": "^0.33.0", | ||
"lodash.random": "^3.2.0", | ||
"lodash.range": "^3.2.0", | ||
"p-defer": "^3.0.0", | ||
"p-each-series": "^2.1.0", | ||
"p-map-series": "^2.1.0", | ||
"p-retry": "^4.2.0", | ||
@@ -97,0 +105,0 @@ "sinon": "^11.1.1", |
@@ -1,2 +0,2 @@ | ||
# js-libp2p-kad-dht | ||
# js-libp2p-kad-dht <!-- omit in toc --> | ||
@@ -7,3 +7,3 @@ [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) | ||
[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io) | ||
[![Build Status](https://travis-ci.org/libp2p/js-libp2p-kad-dht.svg?style=flat-square)](https://travis-ci.org/libp2p/js-libp2p-kad-dht) | ||
[![Build status](https://github.com/libp2p/js-libp2p-kad-dht/actions/workflows/test.yml/badge.svg?branch=master)](https://github.com/libp2p/js-libp2p-kad-dht/actions/workflows/test.yml) | ||
[![Coverage Status](https://coveralls.io/repos/github/libp2p/js-libp2p-kad-dht/badge.svg?branch=master)](https://coveralls.io/github/libp2p/js-libp2p-kad-dht?branch=master) | ||
@@ -19,7 +19,7 @@ [![Dependency Status](https://david-dm.org/libp2p/js-libp2p-kad-dht.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-kad-dht) | ||
## Lead Maintainer | ||
## Lead Maintainer <!-- omit in toc --> | ||
[Vasco Santos](https://github.com/vasco-santos). | ||
## Table of Contents | ||
## Table of Contents <!-- omit in toc --> | ||
@@ -30,5 +30,9 @@ - [Install](#install) | ||
- [API](#api) | ||
- [Custom secondary DHT in libp2p](#custom-secondary-dht-in-libp2p) | ||
- [Peer Routing](#peer-routing) | ||
- [Content Routing](#content-routing) | ||
- [Peer Discovery](#peer-discovery) | ||
- [Implementation Summary](#implementation-summary) | ||
- [Contribute](#contribute) | ||
- [License](#license) | ||
## Install | ||
@@ -45,3 +49,3 @@ | ||
```js | ||
const KadDHT = require('libp2p-kad-dht') | ||
import { create } from 'libp2p-kad-dht' | ||
``` | ||
@@ -58,2 +62,4 @@ | ||
```js | ||
import { create } from 'libp2p-kad-dht' | ||
/** | ||
@@ -63,12 +69,8 @@ * @param {Libp2p} libp2p | ||
function addDHT(libp2p) { | ||
const customDHT = new KadDHT({ | ||
const customDHT = create({ | ||
libp2p, | ||
dialer: libp2p.dialer, | ||
peerId: libp2p.peerId, | ||
peerStore: libp2p.peerStore, | ||
registrar: libp2p.registrar, | ||
protocolPrefix: '/custom' | ||
}) | ||
customDHT.start() | ||
customDHT.on('peer', libp2p._onDiscoveryPeer) | ||
return customDHT | ||
@@ -75,0 +77,0 @@ } |
@@ -36,1 +36,4 @@ 'use strict' | ||
exports.ALPHA = 3 | ||
// How often we look for our closest DHT neighbours | ||
exports.QUERY_SELF_INTERVAL = Number(5 * minute) |
'use strict' | ||
const errcode = require('err-code') | ||
const pTimeout = require('p-timeout') | ||
const { equals: uint8ArrayEquals } = require('uint8arrays/equals') | ||
const { toString: uint8ArrayToString } = require('uint8arrays/to-string') | ||
const libp2pRecord = require('libp2p-record') | ||
const c = require('../constants') | ||
const Query = require('../query') | ||
const Libp2pRecord = require('libp2p-record') | ||
const { | ||
ALPHA | ||
} = require('../constants') | ||
const utils = require('../utils') | ||
const Record = libp2pRecord.Record | ||
const Record = Libp2pRecord.Record | ||
const parallel = require('it-parallel') | ||
const map = require('it-map') | ||
const { | ||
valueEvent, | ||
queryErrorEvent | ||
} = require('../query/events') | ||
const { Message } = require('../message') | ||
const { pipe } = require('it-pipe') | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../query').DHTQueryResult} DHTQueryResult | ||
* @typedef {import('../types').ValueEvent} ValueEvent | ||
*/ | ||
/** | ||
* @param {import('../')} dht | ||
*/ | ||
module.exports = (dht) => { | ||
class ContentFetching { | ||
/** | ||
* @param {object} params | ||
* @param {import('peer-id')} params.peerId | ||
* @param {import('interface-datastore').Datastore} params.datastore | ||
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators | ||
* @param {import('libp2p-interfaces/src/types').DhtSelectors} params.selectors | ||
* @param {import('../peer-routing').PeerRouting} params.peerRouting | ||
* @param {import('../query/manager').QueryManager} params.queryManager | ||
* @param {import('../routing-table').RoutingTable} params.routingTable | ||
* @param {import('../network').Network} params.network | ||
* @param {boolean} params.lan | ||
*/ | ||
constructor ({ peerId, datastore, validators, selectors, peerRouting, queryManager, routingTable, network, lan }) { | ||
this._log = utils.logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:content-fetching`) | ||
this._peerId = peerId | ||
this._datastore = datastore | ||
this._validators = validators | ||
this._selectors = selectors | ||
this._peerRouting = peerRouting | ||
this._queryManager = queryManager | ||
this._routingTable = routingTable | ||
this._network = network | ||
} | ||
/** | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} rec | ||
*/ | ||
const putLocal = async (key, rec) => { // eslint-disable-line require-await | ||
return dht.datastore.put(utils.bufferToKey(key), rec) | ||
async putLocal (key, rec) { // eslint-disable-line require-await | ||
return this._datastore.put(utils.bufferToKey(key), rec) | ||
} | ||
@@ -36,11 +65,14 @@ | ||
*/ | ||
const getLocal = async (key) => { | ||
dht._log(`getLocal ${uint8ArrayToString(key, 'base32')}`) | ||
async getLocal (key) { | ||
this._log(`getLocal ${uint8ArrayToString(key, 'base32')}`) | ||
const raw = await dht.datastore.get(utils.bufferToKey(key)) | ||
dht._log(`found ${uint8ArrayToString(key, 'base32')} in local datastore`) | ||
const dsKey = utils.bufferToKey(key) | ||
this._log(`fetching record for key ${dsKey}`) | ||
const raw = await this._datastore.get(dsKey) | ||
this._log(`found ${dsKey} in local datastore`) | ||
const rec = Record.deserialize(raw) | ||
await dht._verifyRecordLocally(rec) | ||
await Libp2pRecord.validator.verifyRecord(this._validators, rec) | ||
@@ -54,251 +86,210 @@ return rec | ||
* @param {Uint8Array} key | ||
* @param {import('../query').DHTQueryValue[]} vals - values retrieved from the DHT | ||
* @param {ValueEvent[]} vals - values retrieved from the DHT | ||
* @param {Uint8Array} best - the best record that was found | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
const sendCorrectionRecord = async (key, vals, best) => { | ||
async * sendCorrectionRecord (key, vals, best, options = {}) { | ||
this._log('sendCorrection for %b', key) | ||
const fixupRec = await utils.createPutRecord(key, best) | ||
return Promise.all(vals.map(async (v) => { | ||
for (const { value, from } of vals) { | ||
// no need to do anything | ||
if (uint8ArrayEquals(v.val, best)) { | ||
return | ||
if (uint8ArrayEquals(value, best)) { | ||
this._log('record was ok') | ||
continue | ||
} | ||
// correct ourself | ||
if (dht._isSelf(v.from)) { | ||
if (this._peerId.equals(from)) { | ||
try { | ||
await dht._putLocal(key, fixupRec) | ||
} catch (err) { | ||
dht._log.error('Failed error correcting self', err) | ||
const dsKey = utils.bufferToKey(key) | ||
this._log(`Storing corrected record for key ${dsKey}`) | ||
await this._datastore.put(dsKey, fixupRec) | ||
} catch (/** @type {any} */ err) { | ||
this._log.error('Failed error correcting self', err) | ||
} | ||
return | ||
continue | ||
} | ||
// send correction | ||
try { | ||
await dht._putValueToPeer(key, fixupRec, v.from) | ||
} catch (err) { | ||
dht._log.error('Failed error correcting entry', err) | ||
} | ||
})) | ||
} | ||
let sentCorrection = false | ||
const request = new Message(Message.TYPES.PUT_VALUE, key, 0) | ||
request.record = Record.deserialize(fixupRec) | ||
return { | ||
/** | ||
* Store the given key/value pair locally, in the datastore. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} rec - encoded record | ||
*/ | ||
async _putLocal (key, rec) { // eslint-disable-line require-await | ||
return putLocal(key, rec) | ||
}, | ||
/** | ||
* Store the given key/value pair in the DHT. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} value | ||
* @param {object} [options] - put options | ||
* @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length) | ||
*/ | ||
async put (key, value, options = {}) { | ||
dht._log('PutValue %b', key) | ||
// create record in the dht format | ||
const record = await utils.createPutRecord(key, value) | ||
// store the record locally | ||
await putLocal(key, record) | ||
// put record to the closest peers | ||
let counterAll = 0 | ||
let counterSuccess = 0 | ||
await utils.mapParallel(dht.getClosestPeers(key, { shallow: true }), async (peer) => { | ||
try { | ||
counterAll += 1 | ||
await dht._putValueToPeer(key, record, peer) | ||
counterSuccess += 1 | ||
} catch (err) { | ||
dht._log.error('Failed to put to peer (%b): %s', peer.id, err) | ||
for await (const event of this._network.sendRequest(from, request, options)) { | ||
if (event.name === 'PEER_RESPONSE' && event.record && uint8ArrayEquals(event.record.value, Record.deserialize(fixupRec).value)) { | ||
sentCorrection = true | ||
} | ||
}) | ||
// verify if we were able to put to enough peers | ||
const minPeers = options.minPeers || counterAll // Ensure we have a default `minPeers` | ||
yield event | ||
} | ||
if (minPeers > counterSuccess) { | ||
const error = errcode(new Error(`Failed to put value to enough peers: ${counterSuccess}/${minPeers}`), 'ERR_NOT_ENOUGH_PUT_PEERS') | ||
dht._log.error(error) | ||
throw error | ||
if (!sentCorrection) { | ||
yield queryErrorEvent({ from, error: errcode(new Error('value not put correctly'), 'ERR_PUT_VALUE_INVALID') }) | ||
} | ||
}, | ||
/** | ||
* Get the value to the given key. | ||
* Times out after 1 minute by default. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {object} [options] - get options | ||
* @param {number} [options.timeout] - optional timeout (default: 60000) | ||
*/ | ||
async get (key, options = {}) { | ||
options.timeout = options.timeout || c.minute | ||
this._log.error('Failed error correcting entry') | ||
} | ||
} | ||
dht._log('_get %b', key) | ||
/** | ||
* Store the given key/value pair in the DHT | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} value | ||
* @param {object} [options] - put options | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
async * put (key, value, options = {}) { | ||
this._log('put key %b value %b', key, value) | ||
const vals = await dht.getMany(key, c.GET_MANY_RECORD_COUNT, options) | ||
const recs = vals.map((v) => v.val) | ||
let i = 0 | ||
// create record in the dht format | ||
const record = await utils.createPutRecord(key, value) | ||
try { | ||
i = libp2pRecord.selection.bestRecord(dht.selectors, key, recs) | ||
} catch (err) { | ||
// Assume the first record if no selector available | ||
if (err.code !== 'ERR_NO_SELECTOR_FUNCTION_FOR_RECORD_KEY') { | ||
throw err | ||
} | ||
} | ||
// store the record locally | ||
const dsKey = utils.bufferToKey(key) | ||
this._log(`storing record for key ${dsKey}`) | ||
await this._datastore.put(dsKey, record) | ||
const best = recs[i] | ||
dht._log('GetValue %b %s', key, best) | ||
// put record to the closest peers | ||
yield * pipe( | ||
this._peerRouting.getClosestPeers(key, { signal: options.signal }), | ||
(source) => map(source, (event) => { | ||
return async () => { | ||
if (event.name !== 'FINAL_PEER') { | ||
return [event] | ||
} | ||
if (!best) { | ||
throw errcode(new Error('best value was not found'), 'ERR_NOT_FOUND') | ||
} | ||
const events = [] | ||
await sendCorrectionRecord(key, vals, best) | ||
const msg = new Message(Message.TYPES.PUT_VALUE, key, 0) | ||
msg.record = Record.deserialize(record) | ||
return best | ||
}, | ||
for await (const putEvent of this._network.sendRequest(event.peer.id, msg, options)) { | ||
events.push(putEvent) | ||
/** | ||
* Get the `n` values to the given key without sorting. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {number} nvals | ||
* @param {object} [options] - get options | ||
* @param {number} [options.timeout] - optional timeout (default: 60000) | ||
*/ | ||
async getMany (key, nvals, options = {}) { | ||
options.timeout = options.timeout || c.minute | ||
if (putEvent.name !== 'PEER_RESPONSE') { | ||
continue | ||
} | ||
dht._log('getMany %b (%s)', key, nvals) | ||
if (putEvent.record && uint8ArrayEquals(putEvent.record.value, Record.deserialize(record).value)) { | ||
} else { | ||
events.push(queryErrorEvent({ from: event.peer.id, error: errcode(new Error('value not put correctly'), 'ERR_PUT_VALUE_INVALID') })) | ||
} | ||
} | ||
const vals = [] | ||
let localRec | ||
try { | ||
localRec = await getLocal(key) | ||
} catch (err) { | ||
if (nvals === 0) { | ||
throw err | ||
return events | ||
} | ||
}), | ||
(source) => parallel(source, { | ||
ordered: false, | ||
concurrency: ALPHA | ||
}), | ||
async function * (source) { | ||
for await (const events of source) { | ||
yield * events | ||
} | ||
} | ||
) | ||
} | ||
if (localRec) { | ||
vals.push({ | ||
val: localRec.value, | ||
from: dht.peerId | ||
}) | ||
} | ||
/** | ||
* Get the value to the given key | ||
* | ||
* @param {Uint8Array} key | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
* @param {number} [options.queryFuncTimeout] | ||
*/ | ||
async * get (key, options = {}) { | ||
this._log('get %b', key) | ||
if (vals.length >= nvals) { | ||
return vals | ||
/** @type {ValueEvent[]} */ | ||
const vals = [] | ||
for await (const event of this.getMany(key, options)) { | ||
if (event.name === 'VALUE') { | ||
vals.push(event) | ||
} | ||
const id = await utils.convertBuffer(key) | ||
const rtp = dht.routingTable.closestPeers(id, dht.kBucketSize) | ||
yield event | ||
} | ||
dht._log('peers in rt: %d', rtp.length) | ||
if (!vals.length) { | ||
return | ||
} | ||
if (rtp.length === 0) { | ||
const errMsg = 'Failed to lookup key! No peers from routing table!' | ||
const records = vals.map((v) => v.value) | ||
let i = 0 | ||
dht._log.error(errMsg) | ||
if (vals.length === 0) { | ||
throw errcode(new Error(errMsg), 'ERR_NO_PEERS_IN_ROUTING_TABLE') | ||
} | ||
return vals | ||
try { | ||
i = Libp2pRecord.selection.bestRecord(this._selectors, key, records) | ||
} catch (/** @type {any} */ err) { | ||
// Assume the first record if no selector available | ||
if (err.code !== 'ERR_NO_SELECTOR_FUNCTION_FOR_RECORD_KEY') { | ||
throw err | ||
} | ||
} | ||
const valsLength = vals.length | ||
const best = records[i] | ||
this._log('GetValue %b %b', key, best) | ||
/** | ||
* @param {number} pathIndex | ||
* @param {number} numPaths | ||
*/ | ||
function createQuery (pathIndex, numPaths) { | ||
// This function body runs once per disjoint path | ||
const pathSize = utils.pathSize(nvals - valsLength, numPaths) | ||
let queryResults = 0 | ||
if (!best) { | ||
throw errcode(new Error('best value was not found'), 'ERR_NOT_FOUND') | ||
} | ||
/** | ||
* Here we return the query function to use on this particular disjoint path | ||
* | ||
* @param {PeerId} peer | ||
*/ | ||
async function disjointPathQuery (peer) { | ||
let rec, peers, lookupErr | ||
try { | ||
const results = await dht._getValueOrPeers(peer, key) | ||
rec = results.record | ||
peers = results.peers | ||
} catch (err) { | ||
// If we have an invalid record we just want to continue and fetch a new one. | ||
if (err.code !== 'ERR_INVALID_RECORD') { | ||
throw err | ||
} | ||
lookupErr = err | ||
} | ||
yield * this.sendCorrectionRecord(key, vals, best, options) | ||
/** @type {import('../query').QueryResult} */ | ||
const res = { | ||
closerPeers: peers | ||
} | ||
yield vals[i] | ||
} | ||
if (rec && rec.value) { | ||
vals.push({ | ||
val: rec.value, | ||
from: peer | ||
}) | ||
/** | ||
* Get the `n` values to the given key without sorting. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
* @param {number} [options.queryFuncTimeout] | ||
*/ | ||
async * getMany (key, options = {}) { | ||
this._log('getMany values for %t', key) | ||
queryResults++ | ||
} else if (lookupErr) { | ||
vals.push({ | ||
err: lookupErr, | ||
from: peer | ||
}) | ||
try { | ||
const localRec = await this.getLocal(key) | ||
queryResults++ | ||
} | ||
yield valueEvent({ | ||
value: localRec.value, | ||
from: this._peerId | ||
}) | ||
} catch (/** @type {any} */ err) { | ||
this._log('error getting local value for %b', key, err) | ||
} | ||
// enough is enough | ||
if (queryResults >= pathSize) { | ||
res.pathComplete = true | ||
} | ||
const id = await utils.convertBuffer(key) | ||
const rtp = this._routingTable.closestPeers(id) | ||
return res | ||
} | ||
this._log('found %d peers in routing table', rtp.length) | ||
return disjointPathQuery | ||
} | ||
const self = this | ||
// we have peers, lets send the actual query to them | ||
const query = new Query(dht, key, createQuery) | ||
/** | ||
* @type {import('../query/types').QueryFunc} | ||
*/ | ||
const getValueQuery = async function * ({ peer, signal }) { | ||
for await (const event of self._peerRouting.getValueOrPeers(peer, key, { signal })) { | ||
yield event | ||
try { | ||
await pTimeout(query.run(rtp), options.timeout) | ||
} catch (err) { | ||
if (vals.length === 0) { | ||
throw err | ||
if (event.name === 'PEER_RESPONSE' && event.record) { | ||
yield valueEvent({ from: peer, value: event.record.value }) | ||
} | ||
} finally { | ||
query.stop() | ||
} | ||
} | ||
return vals | ||
} | ||
// we have peers, lets send the actual query to them | ||
yield * this._queryManager.run(key, rtp, getValueQuery, options) | ||
} | ||
} | ||
module.exports.ContentFetching = ContentFetching |
'use strict' | ||
const errcode = require('err-code') | ||
const pTimeout = require('p-timeout') | ||
const { Message } = require('../message') | ||
const parallel = require('it-parallel') | ||
const map = require('it-map') | ||
const { convertBuffer, logger } = require('../utils') | ||
const { ALPHA } = require('../constants') | ||
const { pipe } = require('it-pipe') | ||
const { | ||
queryErrorEvent, | ||
peerResponseEvent, | ||
providerEvent | ||
} = require('../query/events') | ||
const { Message: { MessageType } } = require('../message/dht') | ||
const c = require('../constants') | ||
const LimitedPeerList = require('../peer-list/limited-peer-list') | ||
const Message = require('../message') | ||
const Query = require('../query') | ||
const utils = require('../utils') | ||
/** | ||
@@ -18,179 +22,171 @@ * @typedef {import('multiformats/cid').CID} CID | ||
/** | ||
* @param {import('../')} dht | ||
*/ | ||
module.exports = (dht) => { | ||
class ContentRouting { | ||
/** | ||
* Check for providers from a single node. | ||
* @param {object} params | ||
* @param {import('peer-id')} params.peerId | ||
* @param {import('../network').Network} params.network | ||
* @param {import('../peer-routing').PeerRouting} params.peerRouting | ||
* @param {import('../query/manager').QueryManager} params.queryManager | ||
* @param {import('../routing-table').RoutingTable} params.routingTable | ||
* @param {import('../providers').Providers} params.providers | ||
* @param {import('../types').PeerStore} params.peerStore | ||
* @param {boolean} params.lan | ||
*/ | ||
constructor ({ peerId, network, peerRouting, queryManager, routingTable, providers, peerStore, lan }) { | ||
this._log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:content-routing`) | ||
this._peerId = peerId | ||
this._network = network | ||
this._peerRouting = peerRouting | ||
this._queryManager = queryManager | ||
this._routingTable = routingTable | ||
this._providers = providers | ||
this._peerStore = peerStore | ||
} | ||
/** | ||
* Announce to the network that we can provide the value for a given key and | ||
* are contactable on the given multiaddrs | ||
* | ||
* @param {PeerId} peer | ||
* @param {CID} key | ||
* | ||
* @private | ||
* @param {Multiaddr[]} multiaddrs | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
const findProvidersSingle = async (peer, key) => { // eslint-disable-line require-await | ||
const msg = new Message(Message.TYPES.GET_PROVIDERS, key.bytes, 0) | ||
return dht.network.sendRequest(peer, msg) | ||
} | ||
async * provide (key, multiaddrs, options = {}) { | ||
this._log('provide %s', key) | ||
return { | ||
// Add peer as provider | ||
await this._providers.addProvider(key, this._peerId) | ||
const msg = new Message(Message.TYPES.ADD_PROVIDER, key.bytes, 0) | ||
msg.providerPeers = [{ | ||
id: this._peerId, | ||
multiaddrs | ||
}] | ||
let sent = 0 | ||
/** | ||
* Announce to the network that we can provide the value for a given key | ||
* | ||
* @param {CID} key | ||
* @param {import('../types').QueryEvent} event | ||
*/ | ||
async provide (key) { | ||
dht._log(`provide: ${key}`) | ||
const maybeNotifyPeer = (event) => { | ||
return async () => { | ||
if (event.name !== 'FINAL_PEER') { | ||
return [event] | ||
} | ||
/** @type {Error[]} */ | ||
const errors = [] | ||
const events = [] | ||
// Add peer as provider | ||
await dht.providers.addProvider(key, dht.peerId) | ||
this._log('putProvider %s to %p', key, event.peer.id) | ||
const multiaddrs = dht.libp2p ? dht.libp2p.multiaddrs : [] | ||
const msg = new Message(Message.TYPES.ADD_PROVIDER, key.bytes, 0) | ||
msg.providerPeers = [{ | ||
id: dht.peerId, | ||
multiaddrs | ||
}] | ||
try { | ||
this._log('sending provider record for %s to %p', key, event.peer.id) | ||
/** | ||
* @param {PeerId} peer | ||
*/ | ||
async function mapPeer (peer) { | ||
dht._log(`putProvider ${key} to ${peer.toB58String()}`) | ||
try { | ||
await dht.network.sendMessage(peer, msg) | ||
} catch (err) { | ||
errors.push(err) | ||
for await (const sendEvent of this._network.sendMessage(event.peer.id, msg, options)) { | ||
if (sendEvent.name === 'PEER_RESPONSE') { | ||
this._log('sent provider record for %s to %p', key, event.peer.id) | ||
sent++ | ||
} | ||
events.push(sendEvent) | ||
} | ||
} catch (/** @type {any} */ err) { | ||
this._log.error('error sending provide record to peer %p', event.peer.id, err) | ||
events.push(queryErrorEvent({ from: event.peer.id, error: err })) | ||
} | ||
return events | ||
} | ||
} | ||
// Notify closest peers | ||
await utils.mapParallel(dht.getClosestPeers(key.bytes), mapPeer) | ||
if (errors.length) { | ||
// TODO: | ||
// This should be infrequent. This means a peer we previously connected | ||
// to failed to exchange the provide message. If getClosestPeers was an | ||
// iterator, we could continue to pull until we announce to kBucketSize peers. | ||
throw errcode(new Error(`Failed to provide to ${errors.length} of ${dht.kBucketSize} peers`), 'ERR_SOME_PROVIDES_FAILED', { errors }) | ||
// Notify closest peers | ||
yield * pipe( | ||
this._peerRouting.getClosestPeers(key.multihash.bytes, options), | ||
(source) => map(source, (event) => maybeNotifyPeer(event)), | ||
(source) => parallel(source, { | ||
ordered: false, | ||
concurrency: ALPHA | ||
}), | ||
async function * (source) { | ||
for await (const events of source) { | ||
yield * events | ||
} | ||
} | ||
}, | ||
) | ||
/** | ||
* Search the dht for up to `K` providers of the given CID. | ||
* | ||
* @param {CID} key | ||
* @param {Object} [options] - findProviders options | ||
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds | ||
* @param {number} [options.maxNumProviders=5] - maximum number of providers to find | ||
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} | ||
*/ | ||
async * findProviders (key, options = { timeout: 60000, maxNumProviders: 5 }) { | ||
const providerTimeout = options.timeout || c.minute | ||
const n = options.maxNumProviders || c.K | ||
this._log('sent provider records to %d peers', sent) | ||
} | ||
dht._log(`findProviders ${key}`) | ||
/** | ||
* Search the dht for up to `K` providers of the given CID. | ||
* | ||
* @param {CID} key | ||
* @param {object} [options] - findProviders options | ||
* @param {number} [options.maxNumProviders=5] - maximum number of providers to find | ||
* @param {AbortSignal} [options.signal] | ||
* @param {number} [options.queryFuncTimeout] | ||
*/ | ||
async * findProviders (key, options = { maxNumProviders: 5 }) { | ||
const toFind = options.maxNumProviders || this._routingTable._kBucketSize | ||
const target = key.multihash.bytes | ||
const id = await convertBuffer(target) | ||
const self = this | ||
const out = new LimitedPeerList(n) | ||
const provs = await dht.providers.getProviders(key) | ||
this._log(`findProviders ${key}`) | ||
provs | ||
.forEach(id => { | ||
/** @type {{ id: PeerId, addresses: { multiaddr: Multiaddr }[] }} */ | ||
const peerData = dht.peerStore.get(id) | ||
const provs = await this._providers.getProviders(key) | ||
if (peerData) { | ||
out.push({ | ||
id: peerData.id, | ||
multiaddrs: peerData.addresses | ||
.map((address) => address.multiaddr) | ||
}) | ||
} else { | ||
out.push({ | ||
id, | ||
multiaddrs: [] | ||
}) | ||
} | ||
}) | ||
// yield values if we have some, also slice because maybe we got lucky and already have too many? | ||
if (provs.length) { | ||
const providers = provs.slice(0, toFind).map(peerId => ({ | ||
id: peerId, | ||
multiaddrs: (this._peerStore.addressBook.get(peerId) || []).map(address => address.multiaddr) | ||
})) | ||
// All done | ||
if (out.length >= n) { | ||
// yield values | ||
for (const pData of out.toArray()) { | ||
yield pData | ||
} | ||
return | ||
} | ||
yield peerResponseEvent({ from: this._peerId, messageType: MessageType.GET_PROVIDERS, providers }) | ||
yield providerEvent({ from: this._peerId, providers: providers }) | ||
} | ||
// need more, query the network | ||
/** @type {LimitedPeerList[]} */ | ||
const paths = [] | ||
// All done | ||
if (provs.length >= toFind) { | ||
return | ||
} | ||
/** | ||
* | ||
* @param {number} pathIndex | ||
* @param {number} numPaths | ||
*/ | ||
function makePath (pathIndex, numPaths) { | ||
// This function body runs once per disjoint path | ||
const pathSize = utils.pathSize(n - out.length, numPaths) | ||
const pathProviders = new LimitedPeerList(pathSize) | ||
paths.push(pathProviders) | ||
/** | ||
* The query function to use on this particular disjoint path | ||
* | ||
* @type {import('../query/types').QueryFunc} | ||
*/ | ||
const findProvidersQuery = async function * ({ peer, signal }) { | ||
const request = new Message(Message.TYPES.GET_PROVIDERS, target, 0) | ||
/** | ||
* The query function to use on this particular disjoint path | ||
* | ||
* @param {PeerId} peer | ||
*/ | ||
async function queryDisjointPath (peer) { | ||
const msg = await findProvidersSingle(peer, key) | ||
const provs = msg.providerPeers | ||
dht._log(`Found ${provs.length} provider entries for ${key}`) | ||
yield * self._network.sendRequest(peer, request, { signal }) | ||
} | ||
provs.forEach((prov) => { | ||
pathProviders.push({ | ||
...prov | ||
}) | ||
}) | ||
const providers = new Set(provs.map(p => p.toB58String())) | ||
// hooray we have all that we want | ||
if (pathProviders.length >= pathSize) { | ||
return { pathComplete: true } | ||
} | ||
for await (const event of this._queryManager.run(target, this._routingTable.closestPeers(id), findProvidersQuery, options)) { | ||
yield event | ||
// it looks like we want some more | ||
return { closerPeers: msg.closerPeers } | ||
} | ||
if (event.name === 'PEER_RESPONSE') { | ||
this._log(`Found ${event.providers.length} provider entries for ${key} and ${event.closer.length} closer peers`) | ||
return queryDisjointPath | ||
} | ||
const newProviders = [] | ||
const query = new Query(dht, key.bytes, makePath) | ||
const peers = dht.routingTable.closestPeers(key.bytes, dht.kBucketSize) | ||
for (const peer of event.providers) { | ||
if (providers.has(peer.id.toB58String())) { | ||
continue | ||
} | ||
try { | ||
await pTimeout( | ||
query.run(peers), | ||
providerTimeout | ||
) | ||
} catch (err) { | ||
if (err.name !== pTimeout.TimeoutError.name) { | ||
throw err | ||
providers.add(peer.id.toB58String()) | ||
newProviders.push(peer) | ||
} | ||
} finally { | ||
query.stop() | ||
} | ||
// combine peers from each path | ||
paths.forEach((path) => { | ||
path.toArray().forEach((peer) => { | ||
out.push(peer) | ||
}) | ||
}) | ||
if (newProviders.length) { | ||
yield providerEvent({ from: event.from, providers: newProviders }) | ||
} | ||
for (const pData of out.toArray()) { | ||
yield pData | ||
if (providers.size === toFind) { | ||
return | ||
} | ||
} | ||
@@ -200,1 +196,3 @@ } | ||
} | ||
module.exports.ContentRouting = ContentRouting |
591
src/index.js
'use strict' | ||
const { EventEmitter } = require('events') | ||
const errcode = require('err-code') | ||
const { KadDHT } = require('./kad-dht') | ||
const { DualKadDHT } = require('./dual-kad-dht') | ||
const libp2pRecord = require('libp2p-record') | ||
const { MemoryDatastore } = require('datastore-core/memory') | ||
const { equals: uint8ArrayEquals } = require('uint8arrays/equals') | ||
const { toString: uint8ArrayToString } = require('uint8arrays/to-string') | ||
const RoutingTable = require('./routing-table') | ||
const utils = require('./utils') | ||
const c = require('./constants') | ||
const Network = require('./network') | ||
const contentFetching = require('./content-fetching') | ||
const contentRouting = require('./content-routing') | ||
const peerRouting = require('./peer-routing') | ||
const Message = require('./message') | ||
const Providers = require('./providers') | ||
const QueryManager = require('./query-manager') | ||
const Record = libp2pRecord.Record | ||
/** | ||
* @typedef {*} Libp2p | ||
* @typedef {*} PeerStore | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('interface-datastore').Datastore} Datastore | ||
* @typedef {*} Dialer | ||
* @typedef {*} Registrar | ||
* @typedef {import('multiformats/cid').CID} CID | ||
* @typedef {import('multiaddr').Multiaddr} Multiaddr | ||
* @typedef {object} PeerData | ||
* @property {PeerId} id | ||
* @property {Multiaddr[]} multiaddrs | ||
* @typedef {import('./types').DHT} DHT | ||
* @typedef {import('./kad-dht').KadDHTOps} KadDHTOps | ||
*/ | ||
/** | ||
* A DHT implementation modeled after Kademlia with S/Kademlia modifications. | ||
* Original implementation in go: https://github.com/libp2p/go-libp2p-kad-dht. | ||
*/ | ||
class KadDHT extends EventEmitter { | ||
module.exports = { | ||
/** | ||
* Create a new KadDHT. | ||
* | ||
* @param {Object} props | ||
* @param {Libp2p} props.libp2p - the libp2p instance | ||
* @param {Dialer} props.dialer - libp2p dialer instance | ||
* @param {PeerId} props.peerId - peer's peerId | ||
* @param {PeerStore} props.peerStore - libp2p peerStore | ||
* @param {Registrar} props.registrar - libp2p registrar instance | ||
* @param {string} [props.protocolPrefix = '/ipfs'] - libp2p registrar handle protocol | ||
* @param {boolean} [props.forceProtocolLegacy = false] - WARNING: this is not recommended and should only be used for legacy purposes | ||
* @param {number} props.kBucketSize - k-bucket size (default 20) | ||
* @param {boolean} props.clientMode - If true, the DHT will not respond to queries. This should be true if your node will not be dialable. (default: false) | ||
* @param {number} props.concurrency - alpha concurrency of queries (default 3) | ||
* @param {Datastore} props.datastore - datastore (default MemoryDatastore) | ||
* @param {object} props.validators - validators object with namespace as keys and function(key, record, callback) | ||
* @param {object} props.selectors - selectors object with namespace as keys and function(key, records) | ||
* @param {function(import('libp2p-record').Record, PeerId): void} [props.onPut] - Called when an entry is added to or changed in the datastore | ||
* @param {function(import('libp2p-record').Record): void} [props.onRemove] - Called when an entry is removed from the datastore | ||
* @param {KadDHTOps} opts | ||
* @returns {DHT} | ||
*/ | ||
constructor ({ | ||
libp2p, | ||
dialer, | ||
peerId, | ||
peerStore, | ||
registrar, | ||
protocolPrefix = '/ipfs', | ||
forceProtocolLegacy = false, | ||
datastore = new MemoryDatastore(), | ||
kBucketSize = c.K, | ||
clientMode = false, | ||
concurrency = c.ALPHA, | ||
validators = {}, | ||
selectors = {}, | ||
onPut = () => {}, | ||
onRemove = () => {} | ||
}) { | ||
super() | ||
if (!dialer) { | ||
throw new Error('libp2p-kad-dht requires an instance of Dialer') | ||
} | ||
/** | ||
* Local reference to the libp2p instance. May be undefined. | ||
* | ||
* @type {Libp2p} | ||
*/ | ||
this.libp2p = libp2p | ||
/** | ||
* Local reference to the libp2p dialer instance | ||
* | ||
* @type {Dialer} | ||
*/ | ||
this.dialer = dialer | ||
/** | ||
* Local peer-id | ||
* | ||
* @type {PeerId} | ||
*/ | ||
this.peerId = peerId | ||
/** | ||
* Local PeerStore | ||
* | ||
* @type {PeerStore} | ||
*/ | ||
this.peerStore = peerStore | ||
/** | ||
* Local peer info | ||
* | ||
* @type {Registrar} | ||
*/ | ||
this.registrar = registrar | ||
/** | ||
* Registrar protocol | ||
* | ||
* @type {string} | ||
*/ | ||
this.protocol = protocolPrefix + (forceProtocolLegacy ? '' : c.PROTOCOL_DHT) | ||
/** | ||
* k-bucket size | ||
* | ||
* @type {number} | ||
*/ | ||
this.kBucketSize = kBucketSize | ||
this._clientMode = clientMode | ||
/** | ||
* ALPHA concurrency at which each query path with run, defaults to 3 | ||
* | ||
* @type {number} | ||
*/ | ||
this.concurrency = concurrency | ||
/** | ||
* Number of disjoint query paths to use | ||
* This is set to `kBucketSize`/2 per the S/Kademlia paper | ||
* | ||
* @type {number} | ||
*/ | ||
this.disjointPaths = Math.ceil(this.kBucketSize / 2) | ||
/** | ||
* The routing table. | ||
* | ||
* @type {RoutingTable} | ||
*/ | ||
this.routingTable = new RoutingTable(this, { kBucketSize: this.kBucketSize }) | ||
/** | ||
* Reference to the datastore, uses an in-memory store if none given. | ||
* | ||
* @type {Datastore} | ||
*/ | ||
this.datastore = datastore | ||
/** | ||
* Provider management | ||
* | ||
* @type {Providers} | ||
*/ | ||
this.providers = new Providers(this.datastore, this.peerId) | ||
this.validators = { | ||
pk: libp2pRecord.validator.validators.pk, | ||
...validators | ||
} | ||
this.selectors = { | ||
pk: libp2pRecord.selection.selectors.pk, | ||
...selectors | ||
} | ||
this.network = new Network(this) | ||
this._log = utils.logger(this.peerId) | ||
/** | ||
* Keeps track of running queries | ||
* | ||
* @type {QueryManager} | ||
*/ | ||
this._queryManager = new QueryManager() | ||
this._running = false | ||
// DHT components | ||
this.contentFetching = contentFetching(this) | ||
this.contentRouting = contentRouting(this) | ||
this.peerRouting = peerRouting(this) | ||
// datastore events | ||
this.onPut = onPut | ||
this.onRemove = onRemove | ||
create: (opts) => { | ||
return new DualKadDHT( | ||
new KadDHT({ | ||
...opts, | ||
protocol: '/ipfs/kad/1.0.0', | ||
lan: false | ||
}), | ||
new KadDHT({ | ||
...opts, | ||
protocol: '/ipfs/lan/kad/1.0.0', | ||
clientMode: false, | ||
lan: true | ||
}), | ||
opts.libp2p | ||
) | ||
} | ||
/** | ||
* Is this DHT running. | ||
*/ | ||
get isStarted () { | ||
return this._running | ||
} | ||
/** | ||
* Start listening to incoming connections. | ||
*/ | ||
start () { | ||
this._running = true | ||
return Promise.all([ | ||
this.providers.start(), | ||
this._queryManager.start(), | ||
this.network.start(), | ||
this.routingTable.start() | ||
]) | ||
} | ||
/** | ||
* Stop accepting incoming connections and sending outgoing | ||
* messages. | ||
*/ | ||
stop () { | ||
this._running = false | ||
return Promise.all([ | ||
this.providers.stop(), | ||
this._queryManager.stop(), | ||
this.network.stop(), | ||
this.routingTable.stop() | ||
]) | ||
} | ||
/** | ||
* Store the given key/value pair in the DHT. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} value | ||
* @param {Object} [options] - put options | ||
* @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length) | ||
* @returns {Promise<void>} | ||
*/ | ||
async put (key, value, options = {}) { // eslint-disable-line require-await | ||
return this.contentFetching.put(key, value, options) | ||
} | ||
/** | ||
* Get the value to the given key. | ||
* Times out after 1 minute by default. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Object} [options] - get options | ||
* @param {number} [options.timeout] - optional timeout (default: 60000) | ||
* @returns {Promise<Uint8Array>} | ||
*/ | ||
async get (key, options = {}) { // eslint-disable-line require-await | ||
return this.contentFetching.get(key, options) | ||
} | ||
/** | ||
* Get the `n` values to the given key without sorting. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {number} nvals | ||
* @param {Object} [options] - get options | ||
* @param {number} [options.timeout] - optional timeout (default: 60000) | ||
*/ | ||
async getMany (key, nvals, options = {}) { // eslint-disable-line require-await | ||
return this.contentFetching.getMany(key, nvals, options) | ||
} | ||
/** | ||
* Remove the given key from the local datastore. | ||
* | ||
* @param {Uint8Array} key | ||
*/ | ||
async removeLocal (key) { | ||
this._log(`removeLocal: ${uint8ArrayToString(key, 'base32')}`) | ||
const dsKey = utils.bufferToKey(key) | ||
try { | ||
await this.datastore.delete(dsKey) | ||
} catch (err) { | ||
if (err.code === 'ERR_NOT_FOUND') { | ||
return undefined | ||
} | ||
throw err | ||
} | ||
} | ||
/** | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} value | ||
*/ | ||
async _putLocal (key, value) { | ||
this._log(`_putLocal: ${uint8ArrayToString(key, 'base32')}`) | ||
const dsKey = utils.bufferToKey(key) | ||
await this.datastore.put(dsKey, value) | ||
} | ||
// ----------- Content Routing | ||
/** | ||
* Announce to the network that we can provide given key's value. | ||
* | ||
* @param {CID} key | ||
* @returns {Promise<void>} | ||
*/ | ||
async provide (key) { // eslint-disable-line require-await | ||
return this.contentRouting.provide(key) | ||
} | ||
/** | ||
* Search the dht for up to `K` providers of the given CID. | ||
* | ||
* @param {CID} key | ||
* @param {Object} [options] - findProviders options | ||
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds (default: 60000) | ||
* @param {number} [options.maxNumProviders=5] - maximum number of providers to find | ||
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} | ||
*/ | ||
async * findProviders (key, options = { timeout: 6000, maxNumProviders: 5 }) { | ||
for await (const peerData of this.contentRouting.findProviders(key, options)) { | ||
yield peerData | ||
} | ||
} | ||
// ----------- Peer Routing ----------- | ||
/** | ||
* Search for a peer with the given ID. | ||
* | ||
* @param {PeerId} id | ||
* @param {Object} [options] - findPeer options | ||
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds (default: 60000) | ||
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>} | ||
*/ | ||
async findPeer (id, options = { timeout: 60000 }) { // eslint-disable-line require-await | ||
return this.peerRouting.findPeer(id, options) | ||
} | ||
/** | ||
* Kademlia 'node lookup' operation. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Object} [options] | ||
* @param {boolean} [options.shallow = false] - shallow query | ||
*/ | ||
async * getClosestPeers (key, options = { shallow: false }) { | ||
yield * this.peerRouting.getClosestPeers(key, options) | ||
} | ||
/** | ||
* Get the public key for the given peer id. | ||
* | ||
* @param {PeerId} peer | ||
*/ | ||
getPublicKey (peer) { | ||
return this.peerRouting.getPublicKey(peer) | ||
} | ||
// ----------- Discovery ----------- | ||
/** | ||
* @param {PeerId} peerId | ||
* @param {Multiaddr[]} multiaddrs | ||
*/ | ||
_peerDiscovered (peerId, multiaddrs) { | ||
this.emit('peer', { | ||
id: peerId, | ||
multiaddrs | ||
}) | ||
} | ||
// ----------- Internals ----------- | ||
/** | ||
* Returns the routing tables closest peers, for the key of | ||
* the message. | ||
* | ||
* @param {Message} msg | ||
*/ | ||
async _nearestPeersToQuery (msg) { | ||
const key = await utils.convertBuffer(msg.key) | ||
const ids = this.routingTable.closestPeers(key, this.kBucketSize) | ||
return ids.map((p) => { | ||
/** @type {{ id: PeerId, addresses: { multiaddr: Multiaddr }[] }} */ | ||
const peer = this.peerStore.get(p) | ||
return { | ||
id: p, | ||
multiaddrs: peer ? peer.addresses.map((address) => address.multiaddr) : [] | ||
} | ||
}) | ||
} | ||
/** | ||
* Get the nearest peers to the given query, but iff closer | ||
* than self. | ||
* | ||
* @param {Message} msg | ||
* @param {PeerId} peerId | ||
*/ | ||
async _betterPeersToQuery (msg, peerId) { | ||
this._log('betterPeersToQuery') | ||
const closer = await this._nearestPeersToQuery(msg) | ||
return closer.filter((closer) => { | ||
if (this._isSelf(closer.id)) { | ||
// Should bail, not sure | ||
this._log.error('trying to return self as closer') | ||
return false | ||
} | ||
return !closer.id.isEqual(peerId) | ||
}) | ||
} | ||
/** | ||
* Try to fetch a given record by from the local datastore. | ||
* Returns the record iff it is still valid, meaning | ||
* - it was either authored by this node, or | ||
* - it was received less than `MAX_RECORD_AGE` ago. | ||
* | ||
* @param {Uint8Array} key | ||
*/ | ||
async _checkLocalDatastore (key) { | ||
this._log(`checkLocalDatastore: ${uint8ArrayToString(key)} %b`, key) | ||
const dsKey = utils.bufferToKey(key) | ||
// Fetch value from ds | ||
let rawRecord | ||
try { | ||
rawRecord = await this.datastore.get(dsKey) | ||
} catch (err) { | ||
if (err.code === 'ERR_NOT_FOUND') { | ||
return undefined | ||
} | ||
throw err | ||
} | ||
// Create record from the returned bytes | ||
const record = Record.deserialize(rawRecord) | ||
if (!record) { | ||
throw errcode(new Error('Invalid record'), 'ERR_INVALID_RECORD') | ||
} | ||
// Check validity: compare time received with max record age | ||
if (record.timeReceived == null || | ||
utils.now() - record.timeReceived.getTime() > c.MAX_RECORD_AGE) { | ||
// If record is bad delete it and return | ||
await this.datastore.delete(dsKey) | ||
this.onRemove(record) | ||
return undefined | ||
} | ||
// Record is valid | ||
return record | ||
} | ||
/** | ||
* Add the peer to the routing table and update it in the peerStore. | ||
* | ||
* @param {PeerId} peerId | ||
*/ | ||
async _add (peerId) { | ||
await this.routingTable.add(peerId) | ||
} | ||
/** | ||
* Verify a record without searching the DHT. | ||
* | ||
* @param {import('libp2p-record').Record} record | ||
*/ | ||
async _verifyRecordLocally (record) { | ||
this._log('verifyRecordLocally') | ||
await libp2pRecord.validator.verifyRecord(this.validators, record) | ||
} | ||
/** | ||
* Is the given peer id our PeerId? | ||
* | ||
* @param {PeerId} other | ||
*/ | ||
_isSelf (other) { | ||
return other && uint8ArrayEquals(this.peerId.id, other.id) | ||
} | ||
/** | ||
* Store the given key/value pair at the peer `target`. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Uint8Array} rec - encoded record | ||
* @param {PeerId} target | ||
*/ | ||
async _putValueToPeer (key, rec, target) { | ||
const msg = new Message(Message.TYPES.PUT_VALUE, key, 0) | ||
msg.record = Record.deserialize(rec) | ||
const resp = await this.network.sendRequest(target, msg) | ||
if (resp.record && !uint8ArrayEquals(resp.record.value, Record.deserialize(rec).value)) { | ||
throw errcode(new Error('value not put correctly'), 'ERR_PUT_VALUE_INVALID') | ||
} | ||
} | ||
/** | ||
* Query a particular peer for the value for the given key. | ||
* It will either return the value or a list of closer peers. | ||
* | ||
* Note: The peerStore is updated with new addresses found for the given peer. | ||
* | ||
* @param {PeerId} peer | ||
* @param {Uint8Array} key | ||
*/ | ||
async _getValueOrPeers (peer, key) { | ||
const msg = await this._getValueSingle(peer, key) | ||
const peers = msg.closerPeers | ||
const record = msg.record | ||
if (record) { | ||
// We have a record | ||
try { | ||
await this._verifyRecordOnline(record) | ||
} catch (err) { | ||
const errMsg = 'invalid record received, discarded' | ||
this._log(errMsg) | ||
throw errcode(new Error(errMsg), 'ERR_INVALID_RECORD') | ||
} | ||
return { record, peers } | ||
} | ||
if (peers.length > 0) { | ||
return { peers } | ||
} | ||
throw errcode(new Error('Not found'), 'ERR_NOT_FOUND') | ||
} | ||
/** | ||
* Get a value via rpc call for the given parameters. | ||
* | ||
* @param {PeerId} peer | ||
* @param {Uint8Array} key | ||
*/ | ||
async _getValueSingle (peer, key) { // eslint-disable-line require-await | ||
const msg = new Message(Message.TYPES.GET_VALUE, key, 0) | ||
return this.network.sendRequest(peer, msg) | ||
} | ||
/** | ||
* Verify a record, fetching missing public keys from the network. | ||
* Calls back with an error if the record is invalid. | ||
* | ||
* @param {import('libp2p-record').Record} record | ||
* @returns {Promise<void>} | ||
*/ | ||
async _verifyRecordOnline (record) { | ||
await libp2pRecord.validator.verifyRecord(this.validators, record) | ||
} | ||
} | ||
module.exports = KadDHT | ||
module.exports.multicodec = '/ipfs' + c.PROTOCOL_DHT |
@@ -10,2 +10,3 @@ 'use strict' | ||
const CONNECTION_TYPE = Proto.Message.ConnectionType | ||
const MESSAGE_TYPE_LOOKUP = Object.keys(MESSAGE_TYPE) | ||
@@ -20,3 +21,3 @@ /** | ||
* | ||
* @typedef {import('../index').PeerData} PeerData | ||
* @typedef {import('../types').PeerData} PeerData | ||
*/ | ||
@@ -141,2 +142,4 @@ | ||
module.exports = Message | ||
module.exports.Message = Message | ||
module.exports.MESSAGE_TYPE = MESSAGE_TYPE | ||
module.exports.MESSAGE_TYPE_LOOKUP = MESSAGE_TYPE_LOOKUP |
'use strict' | ||
const errcode = require('err-code') | ||
const { pipe } = require('it-pipe') | ||
const lp = require('it-length-prefixed') | ||
const pTimeout = require('p-timeout') | ||
const { consume } = require('streaming-iterables') | ||
const drain = require('it-drain') | ||
const first = require('it-first') | ||
const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology') | ||
const rpc = require('./rpc') | ||
const c = require('./constants') | ||
const Message = require('./message') | ||
const { Message, MESSAGE_TYPE_LOOKUP } = require('./message') | ||
const utils = require('./utils') | ||
const { EventEmitter } = require('events') | ||
const { | ||
dialingPeerEvent, | ||
sendingQueryEvent, | ||
peerResponseEvent, | ||
queryErrorEvent | ||
} = require('./query/events') | ||
@@ -21,2 +21,4 @@ /** | ||
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream | ||
* @typedef {import('./types').QueryEvent} QueryEvent | ||
* @typedef {import('./types').PeerData} PeerData | ||
*/ | ||
@@ -27,15 +29,18 @@ | ||
*/ | ||
class Network { | ||
class Network extends EventEmitter { | ||
/** | ||
* Create a new network | ||
* | ||
* @param {import('./index')} dht | ||
* @param {object} params | ||
* @param {import('./types').Dialer} params.dialer | ||
* @param {string} params.protocol | ||
* @param {boolean} params.lan | ||
*/ | ||
constructor (dht) { | ||
this.dht = dht | ||
this.readMessageTimeout = c.READ_MESSAGE_TIMEOUT | ||
this._log = utils.logger(this.dht.peerId, 'net') | ||
this._rpc = rpc(this.dht) | ||
this._onPeerConnected = this._onPeerConnected.bind(this) | ||
constructor ({ dialer, protocol, lan }) { | ||
super() | ||
this._log = utils.logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:network`) | ||
this._running = false | ||
this._dialer = dialer | ||
this._protocol = protocol | ||
} | ||
@@ -51,23 +56,3 @@ | ||
if (!this.dht.isStarted) { | ||
throw errcode(new Error('Can not start network'), 'ERR_CANNOT_START_NETWORK') | ||
} | ||
this._running = true | ||
// Only respond to queries when not in client mode | ||
if (this.dht._clientMode === false) { | ||
// Incoming streams | ||
this.dht.registrar.handle(this.dht.protocol, this._rpc) | ||
} | ||
// register protocol with topology | ||
const topology = new MulticodecTopology({ | ||
multicodecs: [this.dht.protocol], | ||
handlers: { | ||
onConnect: this._onPeerConnected, | ||
onDisconnect: () => {} | ||
} | ||
}) | ||
this._registrarId = this.dht.registrar.register(topology) | ||
} | ||
@@ -79,11 +64,3 @@ | ||
stop () { | ||
if (!this.dht.isStarted && !this.isStarted) { | ||
return | ||
} | ||
this._running = false | ||
// unregister protocol and handlers | ||
if (this._registrarId) { | ||
this.dht.registrar.unregister(this._registrarId) | ||
} | ||
} | ||
@@ -101,45 +78,31 @@ | ||
/** | ||
* Are all network components there? | ||
* Send a request and record RTT for latency measurements | ||
* | ||
* @type {boolean} | ||
* @param {PeerId} to - The peer that should receive a message | ||
* @param {Message} msg - The message to send | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
get isConnected () { | ||
// TODO add a way to check if switch has started or not | ||
return this.dht.isStarted && this.isStarted | ||
} | ||
async * sendRequest (to, msg, options = {}) { | ||
this._log('sending %s to %p', MESSAGE_TYPE_LOOKUP[msg.type], to) | ||
/** | ||
* Registrar notifies a connection successfully with dht protocol. | ||
* | ||
* @param {PeerId} peerId - remote peer id | ||
*/ | ||
async _onPeerConnected (peerId) { | ||
await this.dht._add(peerId) | ||
this._log('added to the routing table: %s', peerId.toB58String()) | ||
} | ||
try { | ||
yield dialingPeerEvent({ peer: to }) | ||
/** | ||
* Send a request and record RTT for latency measurements. | ||
* | ||
* @async | ||
* @param {PeerId} to - The peer that should receive a message | ||
* @param {Message} msg - The message to send. | ||
*/ | ||
async sendRequest (to, msg) { | ||
// TODO: record latency | ||
if (!this.isConnected) { | ||
throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE') | ||
} | ||
const { stream } = await this._dialer.dialProtocol(to, this._protocol, options) | ||
const id = to.toB58String() | ||
this._log('sending to: %s', id) | ||
yield sendingQueryEvent({ to, type: msg.type }) | ||
let conn = this.dht.registrar.connectionManager.get(to) | ||
if (!conn) { | ||
conn = await this.dht.dialer.connectToPeer(to) | ||
const response = await this._writeReadMessage(stream, msg.serialize(), options) | ||
yield peerResponseEvent({ | ||
from: to, | ||
messageType: response.type, | ||
closer: response.closerPeers, | ||
providers: response.providerPeers, | ||
record: response.record | ||
}) | ||
} catch (/** @type {any} */ err) { | ||
yield queryErrorEvent({ from: to, error: err }) | ||
} | ||
const { stream } = await conn.newStream(this.dht.protocol) | ||
return this._writeReadMessage(stream, msg.serialize()) | ||
} | ||
@@ -152,32 +115,37 @@ | ||
* @param {Message} msg | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
async sendMessage (to, msg) { | ||
if (!this.isConnected) { | ||
throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE') | ||
} | ||
async * sendMessage (to, msg, options = {}) { | ||
this._log('sending %s to %p', MESSAGE_TYPE_LOOKUP[msg.type], to) | ||
const id = to.toB58String() | ||
this._log('sending to: %s', id) | ||
yield dialingPeerEvent({ peer: to }) | ||
let conn = this.dht.registrar.connectionManager.get(to) | ||
if (!conn) { | ||
conn = await this.dht.dialer.connectToPeer(to) | ||
const { stream } = await this._dialer.dialProtocol(to, this._protocol, options) | ||
yield sendingQueryEvent({ to, type: msg.type }) | ||
try { | ||
await this._writeMessage(stream, msg.serialize(), options) | ||
yield peerResponseEvent({ from: to, messageType: msg.type }) | ||
} catch (/** @type {any} */ err) { | ||
yield queryErrorEvent({ from: to, error: err }) | ||
} | ||
const { stream } = await conn.newStream(this.dht.protocol) | ||
return this._writeMessage(stream, msg.serialize()) | ||
} | ||
/** | ||
* Write a message and read its response. | ||
* If no response is received after the specified timeout | ||
* this will error out. | ||
* Write a message to the given stream | ||
* | ||
* @param {MuxedStream} stream - the stream to use | ||
* @param {Uint8Array} msg - the message to send | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
async _writeReadMessage (stream, msg) { // eslint-disable-line require-await | ||
return pTimeout( | ||
writeReadMessage(stream, msg), | ||
this.readMessageTimeout | ||
async _writeMessage (stream, msg, options = {}) { | ||
await pipe( | ||
[msg], | ||
lp.encode(), | ||
stream, | ||
drain | ||
) | ||
@@ -187,46 +155,47 @@ } | ||
/** | ||
* Write a message to the given stream. | ||
* Write a message and read its response. | ||
* If no response is received after the specified timeout | ||
* this will error out. | ||
* | ||
* @param {MuxedStream} stream - the stream to use | ||
* @param {Uint8Array} msg - the message to send | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
_writeMessage (stream, msg) { | ||
return pipe( | ||
async _writeReadMessage (stream, msg, options = {}) { | ||
const res = await pipe( | ||
[msg], | ||
lp.encode(), | ||
stream, | ||
consume | ||
lp.decode(), | ||
/** | ||
* @param {AsyncIterable<Uint8Array>} source | ||
*/ | ||
async source => { | ||
const buf = await first(source) | ||
if (buf) { | ||
return buf.slice() | ||
} | ||
} | ||
) | ||
} | ||
} | ||
/** | ||
* @param {MuxedStream} stream | ||
* @param {Uint8Array} msg | ||
*/ | ||
async function writeReadMessage (stream, msg) { | ||
const res = await pipe( | ||
[msg], | ||
lp.encode(), | ||
stream, | ||
lp.decode(), | ||
/** | ||
* @param {AsyncIterable<Uint8Array>} source | ||
*/ | ||
async source => { | ||
const buf = await first(source) | ||
if (buf) { | ||
return buf.slice() | ||
} | ||
if (res.length === 0) { | ||
throw errcode(new Error('No message received'), 'ERR_NO_MESSAGE_RECEIVED') | ||
} | ||
) | ||
if (res.length === 0) { | ||
throw errcode(new Error('No message received'), 'ERR_NO_MESSAGE_RECEIVED') | ||
const message = Message.deserialize(res) | ||
// tell any listeners about new peers we've seen | ||
message.closerPeers.forEach(peerData => { | ||
this.emit('peer', peerData) | ||
}) | ||
message.providerPeers.forEach(peerData => { | ||
this.emit('peer', peerData) | ||
}) | ||
return message | ||
} | ||
return Message.deserialize(res) | ||
} | ||
module.exports = Network | ||
module.exports.Network = Network |
@@ -5,3 +5,2 @@ 'use strict' | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../').PeerData} PeerData | ||
*/ | ||
@@ -14,3 +13,3 @@ | ||
constructor () { | ||
/** @type {PeerData[]} */ | ||
/** @type {PeerId[]} */ | ||
this.list = [] | ||
@@ -22,7 +21,7 @@ } | ||
* | ||
* @param {PeerData} peerData | ||
* @param {PeerId} peerId | ||
*/ | ||
push (peerData) { | ||
if (!this.has(peerData.id)) { | ||
this.list.push(peerData) | ||
push (peerId) { | ||
if (!this.has(peerId)) { | ||
this.list.push(peerId) | ||
@@ -41,3 +40,3 @@ return true | ||
has (peerId) { | ||
const match = this.list.find((i) => i.id.equals(peerId)) | ||
const match = this.list.find((i) => i.equals(peerId)) | ||
return Boolean(match) | ||
@@ -44,0 +43,0 @@ } |
@@ -5,3 +5,2 @@ 'use strict' | ||
const pMap = require('p-map') | ||
const { equals: uint8ArrayEquals } = require('uint8arrays/equals') | ||
const { compare: uint8ArrayCompare } = require('uint8arrays/compare') | ||
@@ -12,3 +11,2 @@ const { xor: uint8ArrayXor } = require('uint8arrays/xor') | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../').PeerData} PeerData | ||
*/ | ||
@@ -54,3 +52,3 @@ | ||
async add (peerId) { | ||
if (this.peerDistances.find(pd => uint8ArrayEquals(pd.peerId.id, peerId.id))) { | ||
if (this.peerDistances.find(pd => pd.peerId.equals(peerId))) { | ||
return | ||
@@ -57,0 +55,0 @@ } |
'use strict' | ||
const errcode = require('err-code') | ||
const pTimeout = require('p-timeout') | ||
const { validator } = require('libp2p-record') | ||
const PeerId = require('peer-id') | ||
const crypto = require('libp2p-crypto') | ||
const { toString: uint8ArrayToString } = require('uint8arrays/to-string') | ||
const c = require('../constants') | ||
const Message = require('../message') | ||
const Query = require('../query') | ||
const { Message } = require('../message') | ||
const utils = require('../utils') | ||
const { | ||
queryErrorEvent, | ||
finalPeerEvent, | ||
valueEvent | ||
} = require('../query/events') | ||
const PeerDistanceList = require('../peer-list/peer-distance-list') | ||
const { Record } = require('libp2p-record') | ||
/** | ||
* @typedef {import('multiaddr').Multiaddr} Multiaddr | ||
* @typedef {import('../types').PeerData} PeerData | ||
*/ | ||
/** | ||
* @param {import('../index')} dht | ||
*/ | ||
module.exports = (dht) => { | ||
class PeerRouting { | ||
/** | ||
* @param {object} params | ||
* @param {import('peer-id')} params.peerId | ||
* @param {import('../routing-table').RoutingTable} params.routingTable | ||
* @param {import('../types').PeerStore} params.peerStore | ||
* @param {import('../network').Network} params.network | ||
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators | ||
* @param {import('../query/manager').QueryManager} params.queryManager | ||
* @param {boolean} params.lan | ||
*/ | ||
constructor ({ peerId, routingTable, peerStore, network, validators, queryManager, lan }) { | ||
this._peerId = peerId | ||
this._routingTable = routingTable | ||
this._peerStore = peerStore | ||
this._network = network | ||
this._validators = validators | ||
this._queryManager = queryManager | ||
this._log = utils.logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:peer-routing`) | ||
} | ||
/** | ||
* Look if we are connected to a peer with the given id. | ||
@@ -30,10 +49,18 @@ * Returns its id and addresses, if found, otherwise `undefined`. | ||
*/ | ||
const findPeerLocal = async (peer) => { | ||
dht._log(`findPeerLocal ${peer.toB58String()}`) | ||
const p = await dht.routingTable.find(peer) | ||
async findPeerLocal (peer) { | ||
let peerData | ||
const p = await this._routingTable.find(peer) | ||
/** @type {{ id: PeerId, addresses: { multiaddr: Multiaddr }[] }} */ | ||
const peerData = p && dht.peerStore.get(p) | ||
if (p) { | ||
this._log('findPeerLocal found %p in routing table', peer) | ||
peerData = this._peerStore.get(p) | ||
} | ||
if (!peerData) { | ||
peerData = this._peerStore.get(peer) | ||
} | ||
if (peerData) { | ||
this._log('findPeerLocal found %p in peer store', peer) | ||
return { | ||
@@ -51,252 +78,243 @@ id: peerData.id, | ||
* @param {Uint8Array} key | ||
* @returns {Promise<Message>} | ||
* @private | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
const getValueSingle = async (peer, key) => { // eslint-disable-line require-await | ||
async * _getValueSingle (peer, key, options = {}) { // eslint-disable-line require-await | ||
const msg = new Message(Message.TYPES.GET_VALUE, key, 0) | ||
return dht.network.sendRequest(peer, msg) | ||
yield * this._network.sendRequest(peer, msg, options) | ||
} | ||
/** | ||
* Find close peers for a given peer | ||
* | ||
* @param {Uint8Array} key | ||
* @param {PeerId} peer | ||
* @returns {Promise<Array<{ id: PeerId, multiaddrs: Multiaddr[] }>>} | ||
* @private | ||
*/ | ||
const closerPeersSingle = async (key, peer) => { | ||
dht._log(`closerPeersSingle ${uint8ArrayToString(key, 'base32')} from ${peer.toB58String()}`) | ||
const msg = await dht.peerRouting._findPeerSingle(peer, new PeerId(key)) | ||
return msg.closerPeers | ||
.filter((peerData) => !dht._isSelf(peerData.id)) | ||
.map((peerData) => { | ||
dht.peerStore.addressBook.add(peerData.id, peerData.multiaddrs) | ||
return peerData | ||
}) | ||
} | ||
/** | ||
* Get the public key directly from a node. | ||
* | ||
* @param {PeerId} peer | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
const getPublicKeyFromNode = async (peer) => { | ||
async * getPublicKeyFromNode (peer, options) { | ||
const pkKey = utils.keyForPublicKey(peer) | ||
const msg = await getValueSingle(peer, pkKey) | ||
if (!msg.record || !msg.record.value) { | ||
throw errcode(new Error(`Node not responding with its public key: ${peer.toB58String()}`), 'ERR_INVALID_RECORD') | ||
} | ||
for await (const event of this._getValueSingle(peer, pkKey, options)) { | ||
yield event | ||
const recPeer = await PeerId.createFromPubKey(msg.record.value) | ||
if (event.name === 'PEER_RESPONSE' && event.record) { | ||
const recPeer = await PeerId.createFromPubKey(event.record.value) | ||
// compare hashes of the pub key | ||
if (!recPeer.equals(peer)) { | ||
throw errcode(new Error('public key does not match id'), 'ERR_PUBLIC_KEY_DOES_NOT_MATCH_ID') | ||
// compare hashes of the pub key | ||
if (!recPeer.equals(peer)) { | ||
throw errcode(new Error('public key does not match id'), 'ERR_PUBLIC_KEY_DOES_NOT_MATCH_ID') | ||
} | ||
yield valueEvent({ from: peer, value: recPeer.pubKey.bytes }) | ||
} | ||
} | ||
return recPeer.pubKey | ||
throw errcode(new Error(`Node not responding with its public key: ${peer.toB58String()}`), 'ERR_INVALID_RECORD') | ||
} | ||
return { | ||
/** | ||
* Ask peer `peer` if they know where the peer with id `target` is. | ||
* Search for a peer with the given ID. | ||
* | ||
* @param {PeerId} peer | ||
* @param {PeerId} target | ||
* @returns {Promise<Message>} | ||
* @private | ||
* @param {PeerId} id | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
* @param {number} [options.queryFuncTimeout] | ||
*/ | ||
async _findPeerSingle (peer, target) { // eslint-disable-line require-await | ||
dht._log('findPeerSingle %s', peer.toB58String()) | ||
const msg = new Message(Message.TYPES.FIND_NODE, target.id, 0) | ||
async * findPeer (id, options = {}) { | ||
this._log('findPeer %p', id) | ||
return dht.network.sendRequest(peer, msg) | ||
}, | ||
// Try to find locally | ||
const pi = await this.findPeerLocal(id) | ||
/** | ||
* Search for a peer with the given ID. | ||
* | ||
* @param {PeerId} id | ||
* @param {Object} [options] - findPeer options | ||
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds | ||
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>} | ||
*/ | ||
async findPeer (id, options = { timeout: 60000 }) { | ||
options.timeout = options.timeout || c.minute | ||
dht._log('findPeer %s', id.toB58String()) | ||
// already got it | ||
if (pi != null) { | ||
this._log('found local') | ||
yield finalPeerEvent({ | ||
from: this._peerId, | ||
peer: pi | ||
}) | ||
return | ||
} | ||
// Try to find locally | ||
const pi = await findPeerLocal(id) | ||
const key = await utils.convertPeerId(id) | ||
const peers = this._routingTable.closestPeers(key) | ||
// already got it | ||
if (pi != null) { | ||
dht._log('found local') | ||
return pi | ||
} | ||
// sanity check | ||
const match = peers.find((p) => p.equals(id)) | ||
const key = await utils.convertPeerId(id) | ||
const peers = dht.routingTable.closestPeers(key, dht.kBucketSize) | ||
if (match) { | ||
const peer = this._peerStore.get(id) | ||
if (peers.length === 0) { | ||
throw errcode(new Error('Peer lookup failed'), 'ERR_LOOKUP_FAILED') | ||
} | ||
// sanity check | ||
const match = peers.find((p) => p.isEqual(id)) | ||
if (match) { | ||
/** @type {{ id: PeerId, addresses: { multiaddr: Multiaddr }[] }} */ | ||
const peer = dht.peerStore.get(id) | ||
if (peer) { | ||
dht._log('found in peerStore') | ||
return { | ||
if (peer) { | ||
this._log('found in peerStore') | ||
yield finalPeerEvent({ | ||
from: this._peerId, | ||
peer: { | ||
id: peer.id, | ||
multiaddrs: peer.addresses.map((address) => address.multiaddr) | ||
} | ||
} | ||
}) | ||
return | ||
} | ||
} | ||
// query the network | ||
const query = new Query(dht, id.id, () => { | ||
/** | ||
* There is no distinction between the disjoint paths, so there are no per-path | ||
* variables in dht scope. Just return the actual query function. | ||
* | ||
* @param {PeerId} peer | ||
*/ | ||
const queryFn = async (peer) => { | ||
const msg = await this._findPeerSingle(peer, id) | ||
const match = msg.closerPeers.find((p) => p.id.isEqual(id)) | ||
const self = this | ||
// found it | ||
/** | ||
* @type {import('../query/types').QueryFunc} | ||
*/ | ||
const findPeerQuery = async function * ({ peer, signal }) { | ||
const request = new Message(Message.TYPES.FIND_NODE, id.toBytes(), 0) | ||
for await (const event of self._network.sendRequest(peer, request, { signal })) { | ||
yield event | ||
if (event.name === 'PEER_RESPONSE') { | ||
const match = event.closer.find((p) => p.id.equals(id)) | ||
// found the peer | ||
if (match) { | ||
return { | ||
peer: match, | ||
queryComplete: true | ||
} | ||
yield finalPeerEvent({ from: event.from, peer: match }) | ||
} | ||
return { | ||
closerPeers: msg.closerPeers | ||
} | ||
} | ||
} | ||
} | ||
return queryFn | ||
}) | ||
let foundPeer = false | ||
let result | ||
try { | ||
result = await pTimeout(query.run(peers), options.timeout) | ||
} finally { | ||
query.stop() | ||
for await (const event of this._queryManager.run(id.id, peers, findPeerQuery, options)) { | ||
if (event.name === 'FINAL_PEER') { | ||
foundPeer = true | ||
} | ||
let success = false | ||
result.paths.forEach((result) => { | ||
if (result.success && result.peer) { | ||
success = true | ||
dht.peerStore.addressBook.add(result.peer.id, result.peer.multiaddrs) | ||
} | ||
}) | ||
dht._log('findPeer %s: %s', id.toB58String(), success) | ||
yield event | ||
} | ||
if (!success) { | ||
throw errcode(new Error('No peer found'), 'ERR_NOT_FOUND') | ||
} | ||
if (!foundPeer) { | ||
yield queryErrorEvent({ from: this._peerId, error: errcode(new Error('Not found'), 'ERR_NOT_FOUND') }) | ||
} | ||
} | ||
/** @type {{ id: PeerId, addresses: { multiaddr: Multiaddr }[] }} */ | ||
const peerData = dht.peerStore.get(id) | ||
/** | ||
* Kademlia 'node lookup' operation | ||
* | ||
* @param {Uint8Array} key - the key to look up, could be a the bytes from a multihash or a peer ID | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
* @param {number} [options.queryFuncTimeout] | ||
*/ | ||
async * getClosestPeers (key, options = {}) { | ||
this._log('getClosestPeers to %b', key) | ||
const id = await utils.convertBuffer(key) | ||
const tablePeers = this._routingTable.closestPeers(id) | ||
const self = this | ||
if (!peerData) { | ||
throw errcode(new Error('No peer found in peer store'), 'ERR_NOT_FOUND') | ||
} | ||
const peers = new PeerDistanceList(id, this._routingTable._kBucketSize) | ||
tablePeers.forEach(peer => peers.add(peer)) | ||
return { | ||
id: peerData.id, | ||
multiaddrs: peerData.addresses.map((address) => address.multiaddr) | ||
} | ||
}, | ||
/** | ||
* Kademlia 'node lookup' operation. | ||
* | ||
* @param {Uint8Array} key | ||
* @param {Object} [options] | ||
* @param {boolean} [options.shallow=false] - shallow query | ||
* @returns {AsyncIterable<PeerId>} | ||
* @type {import('../query/types').QueryFunc} | ||
*/ | ||
async * getClosestPeers (key, options = { shallow: false }) { | ||
dht._log('getClosestPeers to %b', key) | ||
const getCloserPeersQuery = async function * ({ peer, signal }) { | ||
self._log('closerPeersSingle %s from %p', uint8ArrayToString(key, 'base32'), peer) | ||
const request = new Message(Message.TYPES.FIND_NODE, key, 0) | ||
const id = await utils.convertBuffer(key) | ||
const tablePeers = dht.routingTable.closestPeers(id, dht.kBucketSize) | ||
yield * self._network.sendRequest(peer, request, { signal }) | ||
} | ||
const q = new Query(dht, key, () => { | ||
// There is no distinction between the disjoint paths, | ||
// so there are no per-path variables in dht scope. | ||
// Just return the actual query function. | ||
return async (peer) => { | ||
const closer = await closerPeersSingle(key, peer) | ||
for await (const event of this._queryManager.run(key, tablePeers, getCloserPeersQuery, options)) { | ||
yield event | ||
return { | ||
closerPeers: closer, | ||
pathComplete: options.shallow ? true : undefined | ||
} | ||
} | ||
}) | ||
const res = await q.run(tablePeers) | ||
if (!res || !res.finalSet) { | ||
return [] | ||
if (event.name === 'PEER_RESPONSE') { | ||
event.closer.forEach(peerData => { | ||
peers.add(peerData.id) | ||
}) | ||
} | ||
} | ||
const sorted = await utils.sortClosestPeers(Array.from(res.finalSet), id) | ||
this._log('found %d peers close to %b', peers.length, key) | ||
for (const pId of sorted.slice(0, dht.kBucketSize)) { | ||
yield pId | ||
yield * peers.peers.map(peer => finalPeerEvent({ | ||
from: this._peerId, | ||
peer: { | ||
id: peer, | ||
multiaddrs: (this._peerStore.addressBook.get(peer) || []).map(addr => addr.multiaddr) | ||
} | ||
}, | ||
})) | ||
} | ||
/** | ||
* Get the public key for the given peer id. | ||
* | ||
* @param {PeerId} peer | ||
*/ | ||
async getPublicKey (peer) { | ||
dht._log('getPublicKey %s', peer.toB58String()) | ||
/** | ||
* Query a particular peer for the value for the given key. | ||
* It will either return the value or a list of closer peers. | ||
* | ||
* Note: The peerStore is updated with new addresses found for the given peer. | ||
* | ||
* @param {PeerId} peer | ||
* @param {Uint8Array} key | ||
* @param {object} [options] | ||
* @param {AbortSignal} [options.signal] | ||
*/ | ||
async * getValueOrPeers (peer, key, options = {}) { | ||
for await (const event of this._getValueSingle(peer, key, options)) { | ||
if (event.name === 'PEER_RESPONSE') { | ||
if (event.record) { | ||
// We have a record | ||
try { | ||
await this._verifyRecordOnline(event.record) | ||
} catch (/** @type {any} */ err) { | ||
const errMsg = 'invalid record received, discarded' | ||
this._log(errMsg) | ||
// local check | ||
/** @type {{ id: PeerId, addresses: { multiaddr: Multiaddr }[] }} */ | ||
const peerData = dht.peerStore.get(peer) | ||
if (peerData && peerData.id.pubKey) { | ||
dht._log('getPublicKey: found local copy') | ||
return peerData.id.pubKey | ||
yield queryErrorEvent({ from: event.from, error: errcode(new Error(errMsg), 'ERR_INVALID_RECORD') }) | ||
continue | ||
} | ||
} | ||
} | ||
// try the node directly | ||
let pk | ||
yield event | ||
} | ||
} | ||
try { | ||
pk = await getPublicKeyFromNode(peer) | ||
} catch (err) { | ||
// try dht directly | ||
const pkKey = utils.keyForPublicKey(peer) | ||
const value = await dht.get(pkKey) | ||
pk = crypto.keys.unmarshalPublicKey(value) | ||
} | ||
/** | ||
* Verify a record, fetching missing public keys from the network. | ||
* Calls back with an error if the record is invalid. | ||
* | ||
* @param {import('../types').DHTRecord} record | ||
* @returns {Promise<void>} | ||
*/ | ||
async _verifyRecordOnline ({ key, value, timeReceived }) { | ||
await validator.verifyRecord(this._validators, new Record(key, value, timeReceived)) | ||
} | ||
const peerId = new PeerId(peer.id, undefined, pk) | ||
const addrs = ((peerData && peerData.addresses) || []).map((address) => address.multiaddr) | ||
dht.peerStore.addressBook.add(peerId, addrs) | ||
dht.peerStore.keyBook.set(peerId, pk) | ||
/** | ||
* Get the nearest peers to the given query, but if closer | ||
* than self | ||
* | ||
* @param {Uint8Array} key | ||
* @param {PeerId} closerThan | ||
*/ | ||
async getCloserPeersOffline (key, closerThan) { | ||
const id = await utils.convertBuffer(key) | ||
const ids = this._routingTable.closestPeers(id) | ||
const output = ids | ||
.map((p) => { | ||
const peer = this._peerStore.get(p) | ||
return pk | ||
return { | ||
id: p, | ||
multiaddrs: peer ? peer.addresses.map((address) => address.multiaddr) : [] | ||
} | ||
}) | ||
.filter((closer) => !closer.id.equals(closerThan)) | ||
if (output.length) { | ||
this._log('getCloserPeersOffline found %d peer(s) closer to %b than %p', output.length, key, closerThan) | ||
} else { | ||
this._log('getCloserPeersOffline could not find peer closer to %b than %p', key, closerThan) | ||
} | ||
return output | ||
} | ||
} | ||
module.exports.PeerRouting = PeerRouting |
@@ -9,5 +9,13 @@ 'use strict' | ||
const { default: Queue } = require('p-queue') | ||
const c = require('./constants') | ||
const { | ||
PROVIDERS_CLEANUP_INTERVAL, | ||
PROVIDERS_VALIDITY, | ||
PROVIDERS_LRU_CACHE_SIZE, | ||
PROVIDERS_KEY_PREFIX | ||
} = require('./constants') | ||
const utils = require('./utils') | ||
const { toString: uint8ArrayToString } = require('uint8arrays/to-string') | ||
const log = utils.logger('libp2p:kad-dht:providers') | ||
/** | ||
@@ -33,10 +41,7 @@ * @typedef {import('multiformats/cid').CID} CID | ||
* @param {Datastore} datastore | ||
* @param {PeerId} [self] | ||
* @param {number} [cacheSize=256] | ||
*/ | ||
constructor (datastore, self, cacheSize) { | ||
constructor (datastore, cacheSize) { | ||
this.datastore = datastore | ||
this._log = utils.logger(self, 'providers') | ||
/** | ||
@@ -47,3 +52,3 @@ * How often invalid records are cleaned. (in seconds) | ||
*/ | ||
this.cleanupInterval = c.PROVIDERS_CLEANUP_INTERVAL | ||
this.cleanupInterval = PROVIDERS_CLEANUP_INTERVAL | ||
@@ -55,3 +60,3 @@ /** | ||
*/ | ||
this.provideValidity = c.PROVIDERS_VALIDITY | ||
this.provideValidity = PROVIDERS_VALIDITY | ||
@@ -63,3 +68,3 @@ /** | ||
*/ | ||
this.lruCacheSize = cacheSize || c.PROVIDERS_LRU_CACHE_SIZE | ||
this.lruCacheSize = cacheSize || PROVIDERS_LRU_CACHE_SIZE | ||
@@ -108,3 +113,2 @@ // @ts-ignore hashlru types are wrong | ||
return this.syncQueue.add(async () => { | ||
this._log('start cleanup') | ||
const start = Date.now() | ||
@@ -118,3 +122,4 @@ | ||
// Get all provider entries from the datastore | ||
const query = this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX }) | ||
const query = this.datastore.query({ prefix: PROVIDERS_KEY_PREFIX }) | ||
for await (const entry of query) { | ||
@@ -128,4 +133,5 @@ try { | ||
const expired = delta > this.provideValidity | ||
this._log('comparing: %d - %d = %d > %d %s', | ||
now, time, delta, this.provideValidity, expired ? '(expired)' : '') | ||
log('comparing: %d - %d = %d > %d %s', now, time, delta, this.provideValidity, expired ? '(expired)' : '') | ||
if (expired) { | ||
@@ -139,11 +145,13 @@ deleteCount++ | ||
count++ | ||
} catch (err) { | ||
this._log.error(err.message) | ||
} catch (/** @type {any} */ err) { | ||
log.error(err.message) | ||
} | ||
} | ||
this._log('deleting %d / %d entries', deleteCount, count) | ||
// Commit the deletes to the datastore | ||
if (deleted.size) { | ||
log('deleting %d / %d entries', deleteCount, count) | ||
await batch.commit() | ||
} else { | ||
log('nothing to delete') | ||
} | ||
@@ -155,2 +163,3 @@ | ||
const provs = this.providers.get(key) | ||
if (provs) { | ||
@@ -160,2 +169,3 @@ for (const peerId of peers) { | ||
} | ||
if (provs.size === 0) { | ||
@@ -169,3 +179,3 @@ this.providers.remove(key) | ||
this._log('Cleanup successful (%dms)', Date.now() - start) | ||
log('Cleanup successful (%dms)', Date.now() - start) | ||
}) | ||
@@ -185,2 +195,3 @@ } | ||
let provs = this.providers.get(cacheKey) | ||
if (!provs) { | ||
@@ -190,2 +201,3 @@ provs = await loadProviders(this.datastore, cid) | ||
} | ||
return provs | ||
@@ -203,11 +215,12 @@ } | ||
return this.syncQueue.add(async () => { | ||
this._log('addProvider %s', cid.toString()) | ||
log('addProvider %s', cid.toString()) | ||
const provs = await this._getProvidersMap(cid) | ||
this._log('loaded %s provs', provs.size) | ||
log('loaded %s provs', provs.size) | ||
const now = new Date() | ||
provs.set(utils.encodeBase32(provider.id), now) | ||
provs.set(provider.toString(), now) | ||
const dsKey = makeProviderKey(cid) | ||
this.providers.set(dsKey, provs) | ||
return writeProviderEntry(this.datastore, cid, provider, now) | ||
@@ -225,6 +238,7 @@ }) | ||
return this.syncQueue.add(async () => { | ||
this._log('getProviders %s', cid.toString()) | ||
log('getProviders %s', cid.toString()) | ||
const provs = await this._getProvidersMap(cid) | ||
return [...provs.keys()].map((base32PeerId) => { | ||
return new PeerId(utils.decodeBase32(base32PeerId)) | ||
return [...provs.keys()].map(peerIdStr => { | ||
return PeerId.parse(peerIdStr) | ||
}) | ||
@@ -244,4 +258,5 @@ }) | ||
function makeProviderKey (cid) { | ||
cid = typeof cid === 'string' ? cid : utils.encodeBase32(cid.bytes) | ||
return c.PROVIDERS_KEY_PREFIX + cid | ||
cid = typeof cid === 'string' ? cid : uint8ArrayToString(cid.multihash.bytes, 'base32') | ||
return PROVIDERS_KEY_PREFIX + cid | ||
} | ||
@@ -261,3 +276,3 @@ | ||
'/', | ||
utils.encodeBase32(peer.id) | ||
peer.toString() | ||
].join('') | ||
@@ -267,2 +282,3 @@ | ||
const buffer = Uint8Array.from(varint.encode(time.getTime())) | ||
return store.put(key, buffer) | ||
@@ -278,2 +294,3 @@ } | ||
const parts = key.toString().split('/') | ||
if (parts.length !== 4) { | ||
@@ -301,2 +318,3 @@ throw new Error('incorrectly formatted provider entry key in datastore: ' + key) | ||
const query = store.query({ prefix: makeProviderKey(cid) }) | ||
for await (const entry of query) { | ||
@@ -306,2 +324,3 @@ const { peerId } = parseProviderKey(entry.key) | ||
} | ||
return providers | ||
@@ -317,2 +336,2 @@ } | ||
module.exports = Providers | ||
module.exports.Providers = Providers |
'use strict' | ||
// @ts-ignore | ||
// @ts-expect-error no types | ||
const KBuck = require('k-bucket') | ||
const { xor: uint8ArrayXor } = require('uint8arrays/xor') | ||
const GENERATED_PREFIXES = require('./generated-prefix-list.json') | ||
const { sha256 } = require('multiformats/hashes/sha2') | ||
const crypto = require('libp2p-crypto') | ||
const PeerId = require('peer-id') | ||
const utils = require('../utils') | ||
const debug = require('debug') | ||
const log = Object.assign(debug('libp2p:dht:routing-table'), { | ||
error: debug('libp2p:dht:routing-table:error') | ||
}) | ||
// @ts-ignore | ||
const length = require('it-length') | ||
const { default: Queue } = require('p-queue') | ||
const { PROTOCOL_DHT } = require('../constants') | ||
const { TimeoutController } = require('timeout-abort-controller') | ||
/** | ||
* @typedef {object} KBucketPeer | ||
* @property {Uint8Array} id | ||
* @property {PeerId} peer | ||
* | ||
* @typedef {object} KBucket | ||
* @property {Uint8Array} id | ||
* @property {KBucketPeer[]} contacts | ||
* @property {boolean} dontSplit | ||
* @property {KBucket} left | ||
* @property {KBucket} right | ||
* | ||
* @typedef {object} KBucketTree | ||
* @property {KBucket} root | ||
* @property {Uint8Array} localNodeId | ||
* @property {(event: string, callback: Function) => void} on | ||
* @property {(key: Uint8Array, count: number) => KBucketPeer[]} closest | ||
* @property {(key: Uint8Array) => KBucketPeer} closestPeer | ||
* @property {(key: Uint8Array) => void} remove | ||
* @property {(peer: KBucketPeer) => void} add | ||
* @property {() => number} count | ||
* @property {() => Iterable<KBucket>} toIterable | ||
* @typedef {import('./types').KBucketPeer} KBucketPeer | ||
* @typedef {import('./types').KBucket} KBucket | ||
* @typedef {import('./types').KBucketTree} KBucketTree | ||
* @typedef {import('peer-id')} PeerId | ||
*/ | ||
/** | ||
* Cannot generate random KadIds longer than this + 1 | ||
*/ | ||
const MAX_COMMON_PREFIX_LENGTH = 15 | ||
/** | ||
* A wrapper around `k-bucket`, to provide easy store and | ||
@@ -53,12 +23,15 @@ * retrieval for peers. | ||
/** | ||
* @param {import('../')} dht | ||
* @param {object} [options] | ||
* @param {number} [options.kBucketSize=20] | ||
* @param {number} [options.refreshInterval=30000] | ||
* @param {object} params | ||
* @param {import('peer-id')} params.peerId | ||
* @param {import('../types').Dialer} params.dialer | ||
* @param {boolean} params.lan | ||
* @param {number} [params.kBucketSize=20] | ||
* @param {number} [params.pingTimeout=10000] | ||
*/ | ||
constructor (dht, { kBucketSize, refreshInterval } = {}) { | ||
this.peerId = dht.peerId | ||
this.dht = dht | ||
constructor ({ peerId, dialer, kBucketSize, pingTimeout, lan }) { | ||
this._log = utils.logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:routing-table`) | ||
this._peerId = peerId | ||
this._dialer = dialer | ||
this._kBucketSize = kBucketSize || 20 | ||
this._refreshInterval = refreshInterval || 30000 | ||
this._pingTimeout = pingTimeout || 10000 | ||
@@ -74,249 +47,70 @@ /** @type {KBucketTree} */ | ||
this._refreshTable = this._refreshTable.bind(this) | ||
this._onPing = this._onPing.bind(this) | ||
this._pingQueue = new Queue({ concurrency: 1 }) | ||
} | ||
async start () { | ||
this.kb.localNodeId = await utils.convertPeerId(this.peerId) | ||
this.kb.localNodeId = await utils.convertPeerId(this._peerId) | ||
this.kb.on('ping', this._onPing) | ||
await this._refreshTable(true) | ||
} | ||
async stop () { | ||
if (this._refreshTimeoutId) { | ||
clearTimeout(this._refreshTimeoutId) | ||
} | ||
this._pingQueue.clear() | ||
} | ||
/** | ||
* To speed lookups, we seed the table with random PeerIds. This means | ||
* when we are asked to locate a peer on the network, we can find a KadId | ||
* that is close to the requested peer ID and query that, then network | ||
* peers will tell us who they know who is close to the fake ID | ||
* Called on the `ping` event from `k-bucket` when a bucket is full | ||
* and cannot split. | ||
* | ||
* @param {boolean} [force=false] | ||
* `oldContacts.length` is defined by the `numberOfNodesToPing` param | ||
* passed to the `k-bucket` constructor. | ||
* | ||
* `oldContacts` will not be empty and is the list of contacts that | ||
* have not been contacted for the longest. | ||
* | ||
* @param {KBucketPeer[]} oldContacts | ||
* @param {KBucketPeer} newContact | ||
*/ | ||
async _refreshTable (force) { | ||
log('refreshing routing table') | ||
_onPing (oldContacts, newContact) { | ||
// add to a queue so multiple ping requests do not overlap and we don't | ||
// flood the network with ping requests if lots of newContact requests | ||
// are received | ||
this._pingQueue.add(async () => { | ||
let responded = 0 | ||
const prefixLength = this._maxCommonPrefix() | ||
const refreshCpls = this._getTrackedCommonPrefixLengthsForRefresh(prefixLength) | ||
try { | ||
await Promise.all( | ||
oldContacts.map(async oldContact => { | ||
let timeoutController | ||
log(`max common prefix length ${prefixLength}`) | ||
log(`tracked CPLs [ ${refreshCpls.map(date => `${date.getFullYear()}-${(date.getMonth() + 1).toString().padStart(2, '0')}-${date.getDate().toString().padStart(2, '0')} ${date.getHours().toString().padStart(2, '0')}:${date.getMinutes().toString().padStart(2, '0')}:${date.getSeconds().toString().padStart(2, '0')}`).join(', ')} ]`) | ||
/** | ||
* If we see a gap at a common prefix length in the Routing table, we ONLY refresh up until | ||
* the maximum cpl we have in the Routing Table OR (2 * (Cpl+ 1) with the gap), whichever | ||
* is smaller. | ||
* | ||
* This is to prevent refreshes for Cpls that have no peers in the network but happen to be | ||
* before a very high max Cpl for which we do have peers in the network. | ||
* | ||
* The number of 2 * (Cpl + 1) can be proved and a proof would have been written here if | ||
* the programmer had paid more attention in the Math classes at university. | ||
* | ||
* So, please be patient and a doc explaining it will be published soon. | ||
* | ||
* https://github.com/libp2p/go-libp2p-kad-dht/commit/2851c88acb0a3f86bcfe3cfd0f4604a03db801d8#diff-ad45f4ba97ffbc4083c2eb87a4420c1157057b233f048030d67c6b551855ccf6R219 | ||
*/ | ||
await Promise.all( | ||
refreshCpls.map(async (lastRefresh, index) => { | ||
try { | ||
await this._refreshCommonPrefixLength(index, lastRefresh, force === true) | ||
if (this._numPeersForCpl(prefixLength) === 0) { | ||
const lastCpl = Math.min(2 * (index + 1), refreshCpls.length - 1) | ||
for (let n = index + 1; n < lastCpl + 1; n++) { | ||
try { | ||
await this._refreshCommonPrefixLength(n, lastRefresh, force === true) | ||
} catch (err) { | ||
log.error(err) | ||
try { | ||
timeoutController = new TimeoutController(this._pingTimeout) | ||
this._log(`Pinging old contact ${oldContact.peer}`) | ||
const { stream } = await this._dialer.dialProtocol(oldContact.peer, PROTOCOL_DHT, { | ||
signal: timeoutController.signal | ||
}) | ||
await stream.close() | ||
responded++ | ||
} catch (err) { | ||
this._log.error('Could not ping peer %p', oldContact.peer, err) | ||
this._log(`Evicting old contact after ping failed ${oldContact.peer}`) | ||
this.kb.remove(oldContact.id) | ||
} finally { | ||
if (timeoutController) { | ||
timeoutController.clear() | ||
} | ||
} | ||
} | ||
} catch (err) { | ||
log.error(err) | ||
} | ||
}) | ||
) | ||
}) | ||
) | ||
this._refreshTimeoutId = setTimeout(this._refreshTable, this._refreshInterval) | ||
// @ts-ignore | ||
this._refreshTimeoutId.unref() | ||
} | ||
/** | ||
* @param {number} cpl | ||
* @param {Date} lastRefresh | ||
* @param {boolean} force | ||
*/ | ||
async _refreshCommonPrefixLength (cpl, lastRefresh, force) { | ||
if (!force && lastRefresh.getTime() > (Date.now() - this._refreshInterval)) { | ||
log(`not running refresh for cpl ${cpl} as time since last refresh not above interval`) | ||
return | ||
} | ||
// gen a key for the query to refresh the cpl | ||
const peerId = await this._generateRandomPeerId(cpl) | ||
log(`starting refreshing cpl ${cpl} with key ${peerId.toB58String()} (routing table size was ${this.kb.count()})`) | ||
const peers = await length(this.dht.getClosestPeers(peerId.toBytes(), {})) | ||
log(`found ${peers} peers that were close to imaginary peer ${peerId.toB58String()}`) | ||
log(`finished refreshing cpl ${cpl} with key ${peerId.toB58String()} (routing table size was ${this.kb.count()})`) | ||
} | ||
/** | ||
* @param {number} maxCommonPrefix | ||
*/ | ||
_getTrackedCommonPrefixLengthsForRefresh (maxCommonPrefix) { | ||
if (maxCommonPrefix > MAX_COMMON_PREFIX_LENGTH) { | ||
maxCommonPrefix = MAX_COMMON_PREFIX_LENGTH | ||
} | ||
const dates = [] | ||
for (let i = 0; i <= maxCommonPrefix; i++) { | ||
// defaults to the zero value if we haven't refreshed it yet. | ||
dates[i] = this.commonPrefixLengthRefreshedAt[i] || new Date() | ||
} | ||
return dates | ||
} | ||
/** | ||
* | ||
* @param {number} targetCommonPrefixLength | ||
*/ | ||
async _generateRandomPeerId (targetCommonPrefixLength) { | ||
const randomBytes = crypto.randomBytes(2) | ||
const randomUint16 = (randomBytes[1] << 8) + randomBytes[0] | ||
const key = await this._makePeerId(this.kb.localNodeId, randomUint16, targetCommonPrefixLength) | ||
return PeerId.createFromBytes(key) | ||
} | ||
/** | ||
* @param {Uint8Array} localKadId | ||
* @param {number} randomPrefix | ||
* @param {number} targetCommonPrefixLength | ||
*/ | ||
async _makePeerId (localKadId, randomPrefix, targetCommonPrefixLength) { | ||
if (targetCommonPrefixLength > MAX_COMMON_PREFIX_LENGTH) { | ||
throw new Error(`Cannot generate peer ID for common prefix length greater than ${MAX_COMMON_PREFIX_LENGTH}`) | ||
} | ||
const view = new DataView(localKadId.buffer, localKadId.byteOffset, localKadId.byteLength) | ||
const localPrefix = view.getUint16(0, false) | ||
// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B. | ||
// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L | ||
// to our randomly generated prefix. | ||
const toggledLocalPrefix = localPrefix ^ (0x8000 >> targetCommonPrefixLength) | ||
// Combine the toggled local prefix and the random bits at the correct offset | ||
// such that ONLY the first `targetCommonPrefixLength` bits match the local ID. | ||
const mask = 65535 << (16 - (targetCommonPrefixLength + 1)) | ||
const targetPrefix = (toggledLocalPrefix & mask) | (randomPrefix & ~mask) | ||
// Convert to a known peer ID. | ||
const keyPrefix = GENERATED_PREFIXES[targetPrefix] | ||
const keyBuffer = new ArrayBuffer(34) | ||
const keyView = new DataView(keyBuffer, 0, keyBuffer.byteLength) | ||
keyView.setUint8(0, sha256.code) | ||
keyView.setUint8(1, 32) | ||
keyView.setUint32(2, keyPrefix, false) | ||
return new Uint8Array(keyView.buffer, keyView.byteOffset, keyView.byteLength) | ||
} | ||
/** | ||
* returns the maximum common prefix length between any peer in the table | ||
* and the current peer | ||
*/ | ||
_maxCommonPrefix () { | ||
if (!this.kb.localNodeId) { | ||
return 0 | ||
} | ||
// xor our KadId with every KadId in the k-bucket tree, | ||
// return the longest id prefix that is the same | ||
let prefixLength = 0 | ||
for (const length of this._prefixLengths()) { | ||
if (length > prefixLength) { | ||
prefixLength = length | ||
} | ||
} | ||
return prefixLength | ||
} | ||
/** | ||
* Returns the number of peers in the table with a given prefix length | ||
* | ||
* @param {number} prefixLength | ||
*/ | ||
_numPeersForCpl (prefixLength) { | ||
let count = 0 | ||
for (const length of this._prefixLengths()) { | ||
if (length === prefixLength) { | ||
count++ | ||
} | ||
} | ||
return count | ||
} | ||
/** | ||
* Yields the common prefix length of every peer in the table | ||
*/ | ||
* _prefixLengths () { | ||
for (const { id } of this.kb.toIterable()) { | ||
const distance = uint8ArrayXor(this.kb.localNodeId, id) | ||
let leadingZeros = 0 | ||
for (const byte of distance) { | ||
if (byte === 0) { | ||
leadingZeros++ | ||
} else { | ||
break | ||
if (responded < oldContacts.length) { | ||
this._log(`Adding new contact ${newContact.peer}`) | ||
this.kb.add(newContact) | ||
} | ||
} catch (err) { | ||
this._log.error('Could not process k-bucket ping event', err) | ||
} | ||
yield leadingZeros | ||
} | ||
}) | ||
} | ||
/** | ||
* Called on the `ping` event from `k-bucket`. | ||
* Currently this just removes the oldest contact from | ||
* the list, without actually pinging the individual peers. | ||
* This is the same as go does, but should probably | ||
* be upgraded to actually ping the individual peers. | ||
* | ||
* @param {KBucketPeer[]} oldContacts | ||
* @param {KBucketPeer} newContact | ||
*/ | ||
_onPing (oldContacts, newContact) { | ||
// just use the first one (k-bucket sorts from oldest to newest) | ||
const oldest = oldContacts[0] | ||
if (oldest) { | ||
// remove the oldest one | ||
this.kb.remove(oldest.id) | ||
} | ||
// add the new one | ||
this.kb.add(newContact) | ||
} | ||
// -- Public Interface | ||
@@ -362,5 +156,5 @@ | ||
* @param {Uint8Array} key | ||
* @param {number} count | ||
* @param {number} [count] - defaults to kBucketSize | ||
*/ | ||
closestPeers (key, count) { | ||
closestPeers (key, count = this._kBucketSize) { | ||
const closest = this.kb.closest(key, count) | ||
@@ -394,2 +188,2 @@ | ||
module.exports = RoutingTable | ||
module.exports.RoutingTable = RoutingTable |
@@ -5,22 +5,32 @@ 'use strict' | ||
const errcode = require('err-code') | ||
const utils = require('../../utils') | ||
const log = utils.logger('libp2p:kad-dht:rpc:handlers:add-provider') | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../../message')} Message | ||
* @typedef {import('../../message').Message} Message | ||
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler | ||
*/ | ||
/** | ||
* @param {import('../../index')} dht | ||
* @implements {DHTMessageHandler} | ||
*/ | ||
module.exports = (dht) => { | ||
const log = utils.logger(dht.peerId, 'rpc:add-provider') | ||
class AddProviderHandler { | ||
/** | ||
* Process `AddProvider` DHT messages. | ||
* | ||
* @param {object} params | ||
* @param {PeerId} params.peerId | ||
* @param {import('../../providers').Providers} params.providers | ||
* @param {import('../../types').PeerStore} params.peerStore | ||
*/ | ||
constructor ({ peerId, providers, peerStore }) { | ||
this._peerId = peerId | ||
this._providers = providers | ||
this._peerStore = peerStore | ||
} | ||
/** | ||
* @param {PeerId} peerId | ||
* @param {Message} msg | ||
*/ | ||
async function addProvider (peerId, msg) { // eslint-disable-line require-await | ||
async handle (peerId, msg) { | ||
log('start') | ||
@@ -35,4 +45,5 @@ | ||
try { | ||
// this is actually just the multihash, not the whole CID | ||
cid = CID.decode(msg.key) | ||
} catch (err) { | ||
} catch (/** @type {any} */ err) { | ||
const errMsg = `Invalid CID: ${err.message}` | ||
@@ -42,33 +53,34 @@ throw errcode(new Error(errMsg), 'ERR_INVALID_CID') | ||
msg.providerPeers.forEach((pi) => { | ||
// Ignore providers not from the originator | ||
if (!pi.id.isEqual(peerId)) { | ||
log('invalid provider peer %s from %s', pi.id.toB58String(), peerId.toB58String()) | ||
return | ||
} | ||
if (!msg.providerPeers || !msg.providerPeers.length) { | ||
log.error('no providers found in message') | ||
} | ||
if (pi.multiaddrs.length < 1) { | ||
log('no valid addresses for provider %s. Ignore', peerId.toB58String()) | ||
return | ||
} | ||
await Promise.all( | ||
msg.providerPeers.map(async (pi) => { | ||
// Ignore providers not from the originator | ||
if (!pi.id.equals(peerId)) { | ||
log('invalid provider peer %p from %p', pi.id, peerId) | ||
return | ||
} | ||
log('received provider %s for %s (addrs %s)', peerId.toB58String(), cid.toString(), pi.multiaddrs.map((m) => m.toString())) | ||
if (pi.multiaddrs.length < 1) { | ||
log('no valid addresses for provider %p. Ignore', peerId) | ||
return | ||
} | ||
if (!dht._isSelf(pi.id)) { | ||
// Add known address to peer store | ||
dht.peerStore.addressBook.add(pi.id, pi.multiaddrs) | ||
return dht.providers.addProvider(cid, pi.id) | ||
} | ||
}) | ||
log('received provider %p for %s (addrs %s)', peerId, cid, pi.multiaddrs.map((m) => m.toString())) | ||
// Previous versions of the JS DHT sent erroneous providers in the | ||
// `providerPeers` field. In order to accommodate older clients that have | ||
// this bug, we fall back to assuming the originator is the provider if | ||
// we can't find any valid providers in the payload. | ||
// https://github.com/libp2p/js-libp2p-kad-dht/pull/127 | ||
// https://github.com/libp2p/js-libp2p-kad-dht/issues/128 | ||
return dht.providers.addProvider(cid, peerId) | ||
if (!this._peerId.equals(pi.id)) { | ||
// Add known address to peer store | ||
this._peerStore.addressBook.add(pi.id, pi.multiaddrs) | ||
await this._providers.addProvider(cid, pi.id) | ||
} | ||
}) | ||
) | ||
// typescript requires a return value | ||
return undefined | ||
} | ||
} | ||
return addProvider | ||
} | ||
module.exports.AddProviderHandler = AddProviderHandler |
'use strict' | ||
const { equals: uint8ArrayEquals } = require('uint8arrays/equals') | ||
const Message = require('../../message') | ||
const { Message } = require('../../message') | ||
const utils = require('../../utils') | ||
const log = utils.logger('libp2p:kad-dht:rpc:handlers:find-node') | ||
const { | ||
removePrivateAddresses, | ||
removePublicAddresses | ||
} = require('../../utils') | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler | ||
*/ | ||
/** | ||
* @param {import('../../index')} dht | ||
* @implements {DHTMessageHandler} | ||
*/ | ||
module.exports = (dht) => { | ||
const log = utils.logger(dht.peerId, 'rpc:find-node') | ||
class FindNodeHandler { | ||
/** | ||
* @param {object} params | ||
* @param {PeerId} params.peerId | ||
* @param {import('../../types').Addressable} params.addressable | ||
* @param {import('../../peer-routing').PeerRouting} params.peerRouting | ||
* @param {boolean} [params.lan] | ||
*/ | ||
constructor ({ peerId, addressable, peerRouting, lan }) { | ||
this._peerId = peerId | ||
this._addressable = addressable | ||
this._peerRouting = peerRouting | ||
this._lan = Boolean(lan) | ||
} | ||
/** | ||
* Process `FindNode` DHT messages. | ||
* Process `FindNode` DHT messages | ||
* | ||
@@ -24,15 +40,19 @@ * @param {PeerId} peerId | ||
*/ | ||
async function findNode (peerId, msg) { | ||
log('start') | ||
async handle (peerId, msg) { | ||
log('incoming request from %p for peers closer to %b', peerId, msg.key) | ||
let closer | ||
if (uint8ArrayEquals(msg.key, dht.peerId.id)) { | ||
if (this._peerId.equals(msg.key)) { | ||
closer = [{ | ||
id: dht.peerId, | ||
multiaddrs: dht.libp2p.multiaddrs | ||
id: this._peerId, | ||
multiaddrs: this._addressable.multiaddrs | ||
}] | ||
} else { | ||
closer = await dht._betterPeersToQuery(msg, peerId) | ||
closer = await this._peerRouting.getCloserPeersOffline(msg.key, peerId) | ||
} | ||
closer = closer | ||
.map(this._lan ? removePublicAddresses : removePrivateAddresses) | ||
.filter(({ multiaddrs }) => multiaddrs.length) | ||
const response = new Message(msg.type, new Uint8Array(0), msg.clusterLevel) | ||
@@ -43,3 +63,3 @@ | ||
} else { | ||
log('handle FindNode %s: could not find anything', peerId.toB58String()) | ||
log('could not find any peers closer to %p', peerId) | ||
} | ||
@@ -49,4 +69,4 @@ | ||
} | ||
} | ||
return findNode | ||
} | ||
module.exports.FindNodeHandler = FindNodeHandler |
@@ -5,15 +5,38 @@ 'use strict' | ||
const errcode = require('err-code') | ||
const Message = require('../../message') | ||
const { Message } = require('../../message') | ||
const utils = require('../../utils') | ||
const log = utils.logger('libp2p:kad-dht:rpc:handlers:get-providers') | ||
const { | ||
removePrivateAddresses, | ||
removePublicAddresses | ||
} = require('../../utils') | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler | ||
*/ | ||
/** | ||
* @param {import('../../index')} dht | ||
* @implements {DHTMessageHandler} | ||
*/ | ||
module.exports = (dht) => { | ||
const log = utils.logger(dht.peerId, 'rpc:get-providers') | ||
class GetProvidersHandler { | ||
/** | ||
* @param {object} params | ||
* @param {PeerId} params.peerId | ||
* @param {import('../../peer-routing').PeerRouting} params.peerRouting | ||
* @param {import('../../providers').Providers} params.providers | ||
* @param {import('interface-datastore').Datastore} params.datastore | ||
* @param {import('../../types').PeerStore} params.peerStore | ||
* @param {import('../../types').Addressable} params.addressable | ||
* @param {boolean} [params.lan] | ||
*/ | ||
constructor ({ peerId, peerRouting, providers, datastore, peerStore, addressable, lan }) { | ||
this._peerId = peerId | ||
this._peerRouting = peerRouting | ||
this._providers = providers | ||
this._datastore = datastore | ||
this._peerStore = peerStore | ||
this._addressable = addressable | ||
this._lan = Boolean(lan) | ||
} | ||
@@ -26,33 +49,46 @@ /** | ||
*/ | ||
async function getProviders (peerId, msg) { | ||
async handle (peerId, msg) { | ||
let cid | ||
try { | ||
cid = CID.decode(msg.key) | ||
} catch (err) { | ||
} catch (/** @type {any} */ err) { | ||
throw errcode(new Error(`Invalid CID: ${err.message}`), 'ERR_INVALID_CID') | ||
} | ||
log('%s', cid.toString()) | ||
log('%p asking for providers for %s', peerId, cid.toString()) | ||
const dsKey = utils.bufferToKey(cid.bytes) | ||
const [has, peers, closer] = await Promise.all([ | ||
dht.datastore.has(dsKey), | ||
dht.providers.getProviders(cid), | ||
dht._betterPeersToQuery(msg, peerId) | ||
this._datastore.has(dsKey), | ||
this._providers.getProviders(cid), | ||
this._peerRouting.getCloserPeersOffline(msg.key, peerId) | ||
]) | ||
const providerPeers = peers.map((peerId) => ({ | ||
id: peerId, | ||
multiaddrs: [] | ||
})) | ||
const closerPeers = closer.map((c) => ({ | ||
id: c.id, | ||
multiaddrs: [] | ||
})) | ||
const providerPeers = peers | ||
.map((provider) => ({ | ||
id: provider, | ||
multiaddrs: (this._peerStore.addressBook.get(provider) || []).map(address => address.multiaddr) | ||
})) | ||
.map(this._lan ? removePublicAddresses : removePrivateAddresses) | ||
.filter(({ multiaddrs }) => multiaddrs.length) | ||
const closerPeers = closer | ||
.map((closer) => ({ | ||
id: closer.id, | ||
multiaddrs: (this._peerStore.addressBook.get(closer.id) || []).map(address => address.multiaddr) | ||
})) | ||
.map(this._lan ? removePublicAddresses : removePrivateAddresses) | ||
.filter(({ multiaddrs }) => multiaddrs.length) | ||
if (has) { | ||
providerPeers.push({ | ||
id: dht.peerId, | ||
multiaddrs: [] | ||
const mapper = this._lan ? removePublicAddresses : removePrivateAddresses | ||
const ourRecord = mapper({ | ||
id: this._peerId, | ||
multiaddrs: this._addressable.multiaddrs | ||
}) | ||
if (ourRecord.multiaddrs.length) { | ||
providerPeers.push(ourRecord) | ||
} | ||
} | ||
@@ -73,4 +109,4 @@ | ||
} | ||
} | ||
return getProviders | ||
} | ||
module.exports.GetProvidersHandler = GetProvidersHandler |
'use strict' | ||
const { Record } = require('libp2p-record') | ||
const errcode = require('err-code') | ||
const Message = require('../../message') | ||
const { Message } = require('../../message') | ||
const { | ||
MAX_RECORD_AGE | ||
} = require('../../constants') | ||
const utils = require('../../utils') | ||
const log = utils.logger('libp2p:kad-dht:rpc:handlers:get-value') | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler | ||
*/ | ||
/** | ||
* @param {import('../../index')} dht | ||
* @implements {DHTMessageHandler} | ||
*/ | ||
module.exports = (dht) => { | ||
const log = utils.logger(dht.peerId, 'rpc:get-value') | ||
class GetValueHandler { | ||
/** | ||
* @param {object} params | ||
* @param {PeerId} params.peerId | ||
* @param {import('../../types').PeerStore} params.peerStore | ||
* @param {import('../../peer-routing').PeerRouting} params.peerRouting | ||
* @param {import('interface-datastore').Datastore} params.datastore | ||
*/ | ||
constructor ({ peerId, peerStore, peerRouting, datastore }) { | ||
this._peerId = peerId | ||
this._peerStore = peerStore | ||
this._peerRouting = peerRouting | ||
this._datastore = datastore | ||
} | ||
@@ -25,8 +41,7 @@ /** | ||
* @param {Message} msg | ||
* @returns {Promise<Message>} | ||
*/ | ||
async function getValue (peerId, msg) { | ||
async handle (peerId, msg) { | ||
const key = msg.key | ||
log('key: %b', key) | ||
log('%p asked for key %b', peerId, key) | ||
@@ -44,6 +59,6 @@ if (!key || key.length === 0) { | ||
if (dht._isSelf(idFromKey)) { | ||
id = dht.peerId | ||
if (this._peerId.equals(idFromKey)) { | ||
id = this._peerId | ||
} else { | ||
const peerData = dht.peerStore.get(idFromKey) | ||
const peerData = this._peerStore.get(idFromKey) | ||
id = peerData && peerData.id | ||
@@ -60,8 +75,8 @@ } | ||
const [record, closer] = await Promise.all([ | ||
dht._checkLocalDatastore(key), | ||
dht._betterPeersToQuery(msg, peerId) | ||
this._checkLocalDatastore(key), | ||
this._peerRouting.getCloserPeersOffline(msg.key, peerId) | ||
]) | ||
if (record) { | ||
log('got record') | ||
log('had record for %b in local datastore', key) | ||
response.record = record | ||
@@ -71,3 +86,3 @@ } | ||
if (closer.length > 0) { | ||
log('got closer %s', closer.length) | ||
log('had %s closer peers in routing table', closer.length) | ||
response.closerPeers = closer | ||
@@ -79,3 +94,45 @@ } | ||
return getValue | ||
/** | ||
* Try to fetch a given record by from the local datastore. | ||
* Returns the record iff it is still valid, meaning | ||
* - it was either authored by this node, or | ||
* - it was received less than `MAX_RECORD_AGE` ago. | ||
* | ||
* @param {Uint8Array} key | ||
*/ | ||
async _checkLocalDatastore (key) { | ||
log('checkLocalDatastore looking for %b', key) | ||
const dsKey = utils.bufferToKey(key) | ||
// Fetch value from ds | ||
let rawRecord | ||
try { | ||
rawRecord = await this._datastore.get(dsKey) | ||
} catch (/** @type {any} */ err) { | ||
if (err.code === 'ERR_NOT_FOUND') { | ||
return undefined | ||
} | ||
throw err | ||
} | ||
// Create record from the returned bytes | ||
const record = Record.deserialize(rawRecord) | ||
if (!record) { | ||
throw errcode(new Error('Invalid record'), 'ERR_INVALID_RECORD') | ||
} | ||
// Check validity: compare time received with max record age | ||
if (record.timeReceived == null || | ||
Date.now() - record.timeReceived.getTime() > MAX_RECORD_AGE) { | ||
// If record is bad delete it and return | ||
await this._datastore.delete(dsKey) | ||
return undefined | ||
} | ||
// Record is valid | ||
return record | ||
} | ||
} | ||
module.exports.GetValueHandler = GetValueHandler |
'use strict' | ||
const T = require('../../message').TYPES | ||
const { Message } = require('../../message') | ||
const { AddProviderHandler } = require('./add-provider') | ||
const { FindNodeHandler } = require('./find-node') | ||
const { GetProvidersHandler } = require('./get-providers') | ||
const { GetValueHandler } = require('./get-value') | ||
const { PingHandler } = require('./ping') | ||
const { PutValueHandler } = require('./put-value') | ||
/** | ||
* | ||
* @param {import('../../index')} dht | ||
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler | ||
*/ | ||
module.exports = (dht) => { | ||
/** | ||
* @param {object} params | ||
* @param {import('peer-id')} params.peerId | ||
* @param {import('../../providers').Providers} params.providers | ||
* @param {import('../../types').PeerStore} params.peerStore | ||
* @param {import('../../types').Addressable} params.addressable | ||
* @param {import('../../peer-routing').PeerRouting} params.peerRouting | ||
* @param {import('interface-datastore').Datastore} params.datastore | ||
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators | ||
* @param {boolean} [params.lan] | ||
*/ | ||
module.exports = ({ peerId, providers, peerStore, addressable, peerRouting, datastore, validators, lan }) => { | ||
/** @type {Record<number, DHTMessageHandler>} */ | ||
const handlers = { | ||
[T.GET_VALUE]: require('./get-value')(dht), | ||
[T.PUT_VALUE]: require('./put-value')(dht), | ||
[T.FIND_NODE]: require('./find-node')(dht), | ||
[T.ADD_PROVIDER]: require('./add-provider')(dht), | ||
[T.GET_PROVIDERS]: require('./get-providers')(dht), | ||
[T.PING]: require('./ping')(dht) | ||
[Message.TYPES.GET_VALUE]: new GetValueHandler({ peerId, peerStore, peerRouting, datastore }), | ||
[Message.TYPES.PUT_VALUE]: new PutValueHandler({ validators, datastore }), | ||
[Message.TYPES.FIND_NODE]: new FindNodeHandler({ peerId, addressable, peerRouting, lan }), | ||
[Message.TYPES.ADD_PROVIDER]: new AddProviderHandler({ peerId, providers, peerStore }), | ||
[Message.TYPES.GET_PROVIDERS]: new GetProvidersHandler({ peerId, peerRouting, providers, datastore, peerStore, addressable, lan }), | ||
[Message.TYPES.PING]: new PingHandler() | ||
} | ||
@@ -25,3 +43,2 @@ | ||
function getMessageHandler (type) { | ||
// @ts-ignore ts does not aknowledge number as an index type | ||
return handlers[type] | ||
@@ -28,0 +45,0 @@ } |
'use strict' | ||
const utils = require('../../utils') | ||
const log = utils.logger('libp2p:kad-dht:rpc:handlers:ping') | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../../message')} Message | ||
* @typedef {import('../../message').Message} Message | ||
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler | ||
*/ | ||
/** | ||
* @param {import('../../index')} dht | ||
* @implements {DHTMessageHandler} | ||
*/ | ||
module.exports = (dht) => { | ||
const log = utils.logger(dht.peerId, 'rpc:ping') | ||
class PingHandler { | ||
/** | ||
@@ -22,8 +22,8 @@ * Process `Ping` DHT messages. | ||
*/ | ||
function ping (peerId, msg) { | ||
log('from %s', peerId.toB58String()) | ||
async handle (peerId, msg) { | ||
log(`ping from ${peerId}`) | ||
return msg | ||
} | ||
} | ||
return ping | ||
} | ||
module.exports.PingHandler = PingHandler |
@@ -5,13 +5,24 @@ 'use strict' | ||
const errcode = require('err-code') | ||
const Libp2pRecord = require('libp2p-record') | ||
const log = utils.logger('libp2p:kad-dht:rpc:handlers:put-value') | ||
/** | ||
* @typedef {import('peer-id')} PeerId | ||
* @typedef {import('../../message')} Message | ||
* @typedef {import('../../message').Message} Message | ||
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler | ||
*/ | ||
/** | ||
* @param {import('../../index')} dht | ||
* @implements {DHTMessageHandler} | ||
*/ | ||
module.exports = (dht) => { | ||
const log = utils.logger(dht.peerId, 'rpc:put-value') | ||
class PutValueHandler { | ||
/** | ||
* @param {object} params | ||
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators | ||
* @param {import('interface-datastore').Datastore} params.datastore | ||
*/ | ||
constructor ({ validators, datastore }) { | ||
this._validators = validators | ||
this._datastore = datastore | ||
} | ||
@@ -24,5 +35,5 @@ /** | ||
*/ | ||
async function putValue (peerId, msg) { | ||
async handle (peerId, msg) { | ||
const key = msg.key | ||
log('key: %b', key) | ||
log('%p asked to store value for key %b', peerId, key) | ||
@@ -38,14 +49,12 @@ const record = msg.record | ||
await dht._verifyRecordLocally(record) | ||
await Libp2pRecord.validator.verifyRecord(this._validators, record) | ||
record.timeReceived = new Date() | ||
const recordKey = utils.bufferToKey(record.key) | ||
await dht.datastore.put(recordKey, record.serialize()) | ||
await this._datastore.put(recordKey, record.serialize()) | ||
dht.onPut(record, peerId) | ||
return msg | ||
} | ||
} | ||
return putValue | ||
} | ||
module.exports.PutValueHandler = PutValueHandler |
@@ -6,6 +6,8 @@ 'use strict' | ||
const Message = require('../message') | ||
const { Message, MESSAGE_TYPE_LOOKUP } = require('../message') | ||
const handlers = require('./handlers') | ||
const utils = require('../utils') | ||
const log = utils.logger('libp2p:kad-dht:rpc') | ||
/** | ||
@@ -17,7 +19,21 @@ * @typedef {import('peer-id')} PeerId | ||
/** | ||
* @param {import('../index')} dht | ||
* @param {import('../types').DHT} dht | ||
*/ | ||
module.exports = (dht) => { | ||
const log = utils.logger(dht.peerId, 'rpc') | ||
const getMessageHandler = handlers(dht) | ||
class RPC { | ||
/** | ||
* @param {object} params | ||
* @param {import('../routing-table').RoutingTable} params.routingTable | ||
* @param {import('peer-id')} params.peerId | ||
* @param {import('../providers').Providers} params.providers | ||
* @param {import('../types').PeerStore} params.peerStore | ||
* @param {import('../types').Addressable} params.addressable | ||
* @param {import('../peer-routing').PeerRouting} params.peerRouting | ||
* @param {import('interface-datastore').Datastore} params.datastore | ||
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators | ||
* @param {boolean} [params.lan] | ||
*/ | ||
constructor (params) { | ||
this._messageHandler = handlers(params) | ||
this._routingTable = params.routingTable | ||
} | ||
@@ -30,9 +46,9 @@ /** | ||
*/ | ||
async function handleMessage (peerId, msg) { | ||
async handleMessage (peerId, msg) { | ||
// get handler & execute it | ||
const handler = getMessageHandler(msg.type) | ||
const handler = this._messageHandler(msg.type) | ||
try { | ||
await dht._add(peerId) | ||
} catch (err) { | ||
await this._routingTable.add(peerId) | ||
} catch (/** @type {any} */ err) { | ||
log.error('Failed to update the kbucket store', err) | ||
@@ -46,3 +62,3 @@ } | ||
return handler(peerId, msg) | ||
return handler.handle(peerId, msg) | ||
} | ||
@@ -57,13 +73,12 @@ | ||
*/ | ||
async function onIncomingStream ({ stream, connection }) { | ||
async onIncomingStream ({ stream, connection }) { | ||
const peerId = connection.remotePeer | ||
try { | ||
await dht._add(peerId) | ||
} catch (err) { | ||
await this._routingTable.add(peerId) | ||
} catch (/** @type {any} */ err) { | ||
log.error(err) | ||
} | ||
const idB58Str = peerId.toB58String() | ||
log('from: %s', idB58Str) | ||
const self = this | ||
@@ -80,3 +95,4 @@ await pipe( | ||
const desMessage = Message.deserialize(msg.slice()) | ||
const res = await handleMessage(peerId, desMessage) | ||
log('incoming %s from %p', MESSAGE_TYPE_LOOKUP[desMessage.type], peerId) | ||
const res = await self.handleMessage(peerId, desMessage) | ||
@@ -93,4 +109,4 @@ // Not all handlers will return a response | ||
} | ||
} | ||
return onIncomingStream | ||
} | ||
module.exports.RPC = RPC |
232
src/utils.js
@@ -6,15 +6,53 @@ 'use strict' | ||
const { base58btc } = require('multiformats/bases/base58') | ||
const { base32 } = require('multiformats/bases/base32') | ||
const { Key } = require('interface-datastore/key') | ||
const { xor: uint8ArrayXor } = require('uint8arrays/xor') | ||
const { compare: uint8ArrayCompare } = require('uint8arrays/compare') | ||
const pMap = require('p-map') | ||
const { Record } = require('libp2p-record') | ||
const PeerId = require('peer-id') | ||
const errcode = require('err-code') | ||
const { fromString: uint8ArrayFromString } = require('uint8arrays/from-string') | ||
const { toString: uint8ArrayToString } = require('uint8arrays/to-string') | ||
const { concat: uint8ArrayConcat } = require('uint8arrays/concat') | ||
const pTimeout = require('p-timeout') | ||
const isPrivateIp = require('private-ip') | ||
// const IPNS_PREFIX = uint8ArrayFromString('/ipns/') | ||
const PK_PREFIX = uint8ArrayFromString('/pk/') | ||
/** | ||
* @param {import('./types').PeerData} peer | ||
*/ | ||
function removePrivateAddresses ({ id, multiaddrs }) { | ||
return { | ||
id, | ||
multiaddrs: multiaddrs.filter(multiaddr => { | ||
const [[type, addr]] = multiaddr.stringTuples() | ||
if (type !== 4 && type !== 6) { | ||
return false | ||
} | ||
// @ts-expect-error types are wrong https://github.com/frenchbread/private-ip/issues/18 | ||
return !isPrivateIp(addr) | ||
}) | ||
} | ||
} | ||
/** | ||
* @param {import('./types').PeerData} peer | ||
*/ | ||
function removePublicAddresses ({ id, multiaddrs }) { | ||
return { | ||
id, | ||
multiaddrs: multiaddrs.filter(multiaddr => { | ||
const [[type, addr]] = multiaddr.stringTuples() | ||
if (type !== 4 && type !== 6) { | ||
return false | ||
} | ||
// @ts-expect-error types are wrong https://github.com/frenchbread/private-ip/issues/18 | ||
return isPrivateIp(addr) | ||
}) | ||
} | ||
} | ||
/** | ||
* Creates a DHT ID by hashing a given Uint8Array. | ||
@@ -25,3 +63,3 @@ * | ||
*/ | ||
exports.convertBuffer = async (buf) => { | ||
const convertBuffer = async (buf) => { | ||
return (await sha256.digest(buf)).digest | ||
@@ -36,3 +74,3 @@ } | ||
*/ | ||
exports.convertPeerId = async (peer) => { | ||
const convertPeerId = async (peer) => { | ||
return (await sha256.digest(peer.id)).digest | ||
@@ -47,4 +85,4 @@ } | ||
*/ | ||
exports.bufferToKey = (buf) => { | ||
return new Key('/' + exports.encodeBase32(buf), false) | ||
const bufferToKey = (buf) => { | ||
return new Key('/' + uint8ArrayToString(buf, 'base32'), false) | ||
} | ||
@@ -58,5 +96,5 @@ | ||
*/ | ||
exports.keyForPublicKey = (peer) => { | ||
const keyForPublicKey = (peer) => { | ||
return uint8ArrayConcat([ | ||
uint8ArrayFromString('/pk/'), | ||
PK_PREFIX, | ||
peer.id | ||
@@ -69,3 +107,3 @@ ]) | ||
*/ | ||
exports.isPublicKeyKey = (key) => { | ||
const isPublicKeyKey = (key) => { | ||
return uint8ArrayToString(key.slice(0, 4)) === '/pk/' | ||
@@ -77,76 +115,14 @@ } | ||
*/ | ||
exports.fromPublicKeyKey = (key) => { | ||
return new PeerId(key.slice(4)) | ||
const isIPNSKey = (key) => { | ||
return uint8ArrayToString(key.slice(0, 4)) === '/ipns/' | ||
} | ||
/** | ||
* Get the current time as timestamp. | ||
* | ||
* @returns {number} | ||
* @param {Uint8Array} key | ||
*/ | ||
exports.now = () => { | ||
return Date.now() | ||
const fromPublicKeyKey = (key) => { | ||
return new PeerId(key.slice(4)) | ||
} | ||
/** | ||
* Encode a given Uint8Array into a base32 string. | ||
* | ||
* @param {Uint8Array} buf | ||
* @returns {string} | ||
*/ | ||
exports.encodeBase32 = (buf) => { | ||
return uint8ArrayToString(buf, 'base32') | ||
} | ||
/** | ||
* Decode a given base32 string into a Uint8Array. | ||
* | ||
* @param {string} raw | ||
* @returns {Uint8Array} | ||
*/ | ||
exports.decodeBase32 = (raw) => { | ||
return uint8ArrayFromString(raw, 'base32') | ||
} | ||
/** | ||
* Sort peers by distance to the given `target`. | ||
* | ||
* @param {Array<PeerId>} peers | ||
* @param {Uint8Array} target | ||
*/ | ||
exports.sortClosestPeers = async (peers, target) => { | ||
const distances = await pMap(peers, async (peer) => { | ||
const id = await exports.convertPeerId(peer) | ||
return { | ||
peer: peer, | ||
distance: uint8ArrayXor(id, target) | ||
} | ||
}) | ||
return distances.sort(exports.xorCompare).map((d) => d.peer) | ||
} | ||
/** | ||
* Compare function to sort an array of elements which have a distance property which is the xor distance to a given element. | ||
* | ||
* @param {{ distance: Uint8Array }} a | ||
* @param {{ distance: Uint8Array }} b | ||
*/ | ||
exports.xorCompare = (a, b) => { | ||
return uint8ArrayCompare(a.distance, b.distance) | ||
} | ||
/** | ||
* Computes how many results to collect on each disjoint path, rounding up. | ||
* This ensures that we look for at least one result per path. | ||
* | ||
* @param {number} resultsWanted | ||
* @param {number} numPaths - total number of paths | ||
*/ | ||
exports.pathSize = (resultsWanted, numPaths) => { | ||
return Math.ceil(resultsWanted / numPaths) | ||
} | ||
/** | ||
* Create a new put record, encodes and signs it if enabled. | ||
@@ -158,3 +134,3 @@ * | ||
*/ | ||
exports.createPutRecord = (key, value) => { | ||
const createPutRecord = (key, value) => { | ||
const timeReceived = new Date() | ||
@@ -169,14 +145,5 @@ const rec = new Record(key, value, timeReceived) | ||
* | ||
* @param {PeerId} [id] | ||
* @param {string} [subsystem] | ||
* @param {string} name | ||
*/ | ||
exports.logger = (id, subsystem) => { | ||
const name = ['libp2p', 'dht'] | ||
if (subsystem) { | ||
name.push(subsystem) | ||
} | ||
if (id) { | ||
name.push(`${id.toB58String().slice(0, 8)}`) | ||
} | ||
const logger = (name) => { | ||
// Add a formatter for converting to a base58 string | ||
@@ -187,68 +154,31 @@ debug.formatters.b = (v) => { | ||
const logger = Object.assign(debug(name.join(':')), { | ||
error: debug(name.concat(['error']).join(':')) | ||
}) | ||
// Add a formatter for converting to a base58 string | ||
debug.formatters.t = (v) => { | ||
return base32.baseEncode(v) | ||
} | ||
return logger | ||
} | ||
exports.TimeoutError = class TimeoutError extends Error { | ||
get code () { | ||
return 'ETIMEDOUT' | ||
// Add a formatter for stringifying peer ids | ||
debug.formatters.p = (p) => { | ||
return p.toB58String() | ||
} | ||
} | ||
/** | ||
* Creates an async function that calls the given `asyncFn` and Errors | ||
* if it does not resolve within `time` ms | ||
* | ||
* @template T | ||
* @param {(...args: any[]) => Promise<T>} asyncFn | ||
* @param {number} [time] | ||
*/ | ||
exports.withTimeout = (asyncFn, time) => { | ||
/** | ||
* @param {...any} args | ||
* @returns {Promise<T>} | ||
*/ | ||
async function timeoutFn (...args) { | ||
if (!time) { | ||
return asyncFn(...args) | ||
} | ||
const logger = Object.assign(debug(name), { | ||
error: debug(`${name}:error`) | ||
}) | ||
let res | ||
try { | ||
res = await pTimeout(asyncFn(...args), time) | ||
} catch (err) { | ||
if (err instanceof pTimeout.TimeoutError) { | ||
throw errcode(err, 'ETIMEDOUT') | ||
} | ||
throw err | ||
} | ||
return res | ||
} | ||
return timeoutFn | ||
return logger | ||
} | ||
/** | ||
* Iterates the given `asyncIterator` and runs each item through the given `asyncFn` in parallel. | ||
* Returns a promise that resolves when all items of the `asyncIterator` have been passed | ||
* through `asyncFn`. | ||
* | ||
* @template T | ||
* @template O | ||
* | ||
* @param {AsyncIterable<T>} asyncIterator | ||
* @param {(arg0: T) => Promise<O>} asyncFn | ||
*/ | ||
exports.mapParallel = async function (asyncIterator, asyncFn) { | ||
const tasks = [] | ||
for await (const item of asyncIterator) { | ||
tasks.push(asyncFn(item)) | ||
} | ||
return Promise.all(tasks) | ||
module.exports = { | ||
removePrivateAddresses, | ||
removePublicAddresses, | ||
convertBuffer, | ||
convertPeerId, | ||
bufferToKey, | ||
keyForPublicKey, | ||
isPublicKeyKey, | ||
isIPNSKey, | ||
fromPublicKeyKey, | ||
createPutRecord, | ||
logger | ||
} |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
1517497
15
106
15104
102
33
+ Addedany-signal@^2.1.2
+ Addedit-all@^1.0.5
+ Addedit-drain@^1.0.4
+ Addedit-map@^1.0.5
+ Addedit-merge@^1.0.3
+ Addedit-parallel@^2.0.1
+ Addedit-take@^1.0.2
+ Addedp-defer@^3.0.0
+ Addedprivate-ip@^2.3.3
+ Addedany-signal@2.1.2(transitive)
+ Addedipaddr.js@2.2.0(transitive)
+ Addedit-all@1.0.6(transitive)
+ Addedit-parallel@2.0.2(transitive)
+ Addednative-abort-controller@1.0.4(transitive)
+ Addednetmask@2.0.2(transitive)
+ Addedp-defer@3.0.0(transitive)
+ Addedprivate-ip@2.3.4(transitive)
+ Addedretimer@3.0.0(transitive)
+ Addedtimeout-abort-controller@2.0.0(transitive)
- Removedheap@~0.2.6
- Removedp-timeout@^4.1.0
- Removedheap@0.2.7(transitive)
- Removedp-timeout@4.1.0(transitive)