Socket
Socket
Sign inDemoInstall

libp2p-kad-dht

Package Overview
Dependencies
33
Maintainers
4
Versions
109
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.25.0 to 0.26.0

dist/src/dual-kad-dht.d.ts

1

dist/src/constants.d.ts

@@ -14,3 +14,4 @@ export var second: number;

export var ALPHA: number;
export var QUERY_SELF_INTERVAL: number;
declare const hour: number;
//# sourceMappingURL=constants.d.ts.map

107

dist/src/content-fetching/index.d.ts

@@ -1,31 +0,89 @@

declare function _exports(dht: import('../')): {
export type PeerId = import('peer-id');
export type ValueEvent = import('../types').ValueEvent;
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../types').ValueEvent} ValueEvent
*/
export class ContentFetching {
/**
* Store the given key/value pair locally, in the datastore.
* @param {object} params
* @param {import('peer-id')} params.peerId
* @param {import('interface-datastore').Datastore} params.datastore
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators
* @param {import('libp2p-interfaces/src/types').DhtSelectors} params.selectors
* @param {import('../peer-routing').PeerRouting} params.peerRouting
* @param {import('../query/manager').QueryManager} params.queryManager
* @param {import('../routing-table').RoutingTable} params.routingTable
* @param {import('../network').Network} params.network
* @param {boolean} params.lan
*/
constructor({ peerId, datastore, validators, selectors, peerRouting, queryManager, routingTable, network, lan }: {
peerId: import('peer-id');
datastore: import('interface-datastore').Datastore;
validators: import('libp2p-interfaces/src/types').DhtValidators;
selectors: import('libp2p-interfaces/src/types').DhtSelectors;
peerRouting: import('../peer-routing').PeerRouting;
queryManager: import('../query/manager').QueryManager;
routingTable: import('../routing-table').RoutingTable;
network: import('../network').Network;
lan: boolean;
});
_log: debug.Debugger & {
error: debug.Debugger;
};
_peerId: import("peer-id");
_datastore: import("interface-datastore").Datastore;
_validators: import("libp2p-interfaces/src/types").DhtValidators;
_selectors: import("libp2p-interfaces/src/types").DhtSelectors;
_peerRouting: import("../peer-routing").PeerRouting;
_queryManager: import("../query/manager").QueryManager;
_routingTable: import("../routing-table").RoutingTable;
_network: import("../network").Network;
/**
* @param {Uint8Array} key
* @param {Uint8Array} rec
*/
putLocal(key: Uint8Array, rec: Uint8Array): Promise<void>;
/**
* Attempt to retrieve the value for the given key from
* the local datastore.
*
* @param {Uint8Array} key
* @param {Uint8Array} rec - encoded record
*/
_putLocal(key: Uint8Array, rec: Uint8Array): Promise<void>;
getLocal(key: Uint8Array): Promise<import("libp2p-record/dist/src/record")>;
/**
* Store the given key/value pair in the DHT.
* Send the best record found to any peers that have an out of date record.
*
* @param {Uint8Array} key
* @param {ValueEvent[]} vals - values retrieved from the DHT
* @param {Uint8Array} best - the best record that was found
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
sendCorrectionRecord(key: Uint8Array, vals: ValueEvent[], best: Uint8Array, options?: {
signal?: AbortSignal | undefined;
} | undefined): AsyncGenerator<import("../types").SendingQueryEvent | import("../types").PeerResponseEvent | import("../types").QueryErrorEvent | import("../types").DialingPeerEvent, void, unknown>;
/**
* Store the given key/value pair in the DHT
*
* @param {Uint8Array} key
* @param {Uint8Array} value
* @param {object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length)
* @param {AbortSignal} [options.signal]
*/
put(key: Uint8Array, value: Uint8Array, options?: {
minPeers?: number | undefined;
} | undefined): Promise<void>;
signal?: AbortSignal | undefined;
} | undefined): AsyncGenerator<import("../types").SendingQueryEvent | import("../types").PeerResponseEvent | import("../types").QueryErrorEvent | import("../types").ProviderEvent | import("../types").ValueEvent | import("../types").AddingPeerEvent | import("../types").DialingPeerEvent, void, undefined>;
/**
* Get the value to the given key.
* Times out after 1 minute by default.
* Get the value to the given key
*
* @param {Uint8Array} key
* @param {object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @param {object} [options]
* @param {AbortSignal} [options.signal]
* @param {number} [options.queryFuncTimeout]
*/
get(key: Uint8Array, options?: {
timeout?: number | undefined;
} | undefined): Promise<Uint8Array>;
signal?: AbortSignal | undefined;
queryFuncTimeout?: number | undefined;
} | undefined): AsyncGenerator<import("../types").QueryEvent, void, unknown>;
/**

@@ -35,16 +93,11 @@ * Get the `n` values to the given key without sorting.

* @param {Uint8Array} key
* @param {number} nvals
* @param {object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @param {object} [options]
* @param {AbortSignal} [options.signal]
* @param {number} [options.queryFuncTimeout]
*/
getMany(key: Uint8Array, nvals: number, options?: {
timeout?: number | undefined;
} | undefined): Promise<{
val: Uint8Array;
from: import("peer-id");
}[]>;
};
export = _exports;
export type PeerId = import('peer-id');
export type DHTQueryResult = import('../query').DHTQueryResult;
getMany(key: Uint8Array, options?: {
signal?: AbortSignal | undefined;
queryFuncTimeout?: number | undefined;
} | undefined): AsyncGenerator<import("../types").QueryEvent, void, undefined>;
}
//# sourceMappingURL=index.d.ts.map

@@ -1,8 +0,53 @@

declare function _exports(dht: import('../')): {
export type CID = import('multiformats/cid').CID;
export type PeerId = import('peer-id');
export type Multiaddr = import('multiaddr').Multiaddr;
/**
* @typedef {import('multiformats/cid').CID} CID
* @typedef {import('peer-id')} PeerId
* @typedef {import('multiaddr').Multiaddr} Multiaddr
*/
export class ContentRouting {
/**
* Announce to the network that we can provide the value for a given key
* @param {object} params
* @param {import('peer-id')} params.peerId
* @param {import('../network').Network} params.network
* @param {import('../peer-routing').PeerRouting} params.peerRouting
* @param {import('../query/manager').QueryManager} params.queryManager
* @param {import('../routing-table').RoutingTable} params.routingTable
* @param {import('../providers').Providers} params.providers
* @param {import('../types').PeerStore} params.peerStore
* @param {boolean} params.lan
*/
constructor({ peerId, network, peerRouting, queryManager, routingTable, providers, peerStore, lan }: {
peerId: import('peer-id');
network: import('../network').Network;
peerRouting: import('../peer-routing').PeerRouting;
queryManager: import('../query/manager').QueryManager;
routingTable: import('../routing-table').RoutingTable;
providers: import('../providers').Providers;
peerStore: import('../types').PeerStore;
lan: boolean;
});
_log: debug.Debugger & {
error: debug.Debugger;
};
_peerId: import("peer-id");
_network: import("../network").Network;
_peerRouting: import("../peer-routing").PeerRouting;
_queryManager: import("../query/manager").QueryManager;
_routingTable: import("../routing-table").RoutingTable;
_providers: import("../providers").Providers;
_peerStore: import("../types").PeerStore;
/**
* Announce to the network that we can provide the value for a given key and
* are contactable on the given multiaddrs
*
* @param {CID} key
* @param {Multiaddr[]} multiaddrs
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
provide(key: CID): Promise<void>;
provide(key: CID, multiaddrs: Multiaddr[], options?: {
signal?: AbortSignal | undefined;
} | undefined): AsyncGenerator<import("../types").SendingQueryEvent | import("../types").PeerResponseEvent | import("../types").QueryErrorEvent | import("../types").ProviderEvent | import("../types").ValueEvent | import("../types").AddingPeerEvent | import("../types").DialingPeerEvent, void, undefined>;
/**

@@ -12,19 +57,13 @@ * Search the dht for up to `K` providers of the given CID.

* @param {CID} key
* @param {Object} [options] - findProviders options
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds
* @param {object} [options] - findProviders options
* @param {number} [options.maxNumProviders=5] - maximum number of providers to find
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
* @param {AbortSignal} [options.signal]
* @param {number} [options.queryFuncTimeout]
*/
findProviders(key: CID, options?: {
timeout?: number | undefined;
maxNumProviders?: number | undefined;
} | undefined): AsyncIterable<{
id: PeerId;
multiaddrs: Multiaddr[];
}>;
};
export = _exports;
export type CID = import('multiformats/cid').CID;
export type PeerId = import('peer-id');
export type Multiaddr = import('multiaddr').Multiaddr;
signal?: AbortSignal | undefined;
queryFuncTimeout?: number | undefined;
} | undefined): AsyncGenerator<import("../types").QueryEvent, void, unknown>;
}
//# sourceMappingURL=index.d.ts.map

@@ -1,421 +0,5 @@

export = KadDHT;
/**
* @typedef {*} Libp2p
* @typedef {*} PeerStore
* @typedef {import('peer-id')} PeerId
* @typedef {import('interface-datastore').Datastore} Datastore
* @typedef {*} Dialer
* @typedef {*} Registrar
* @typedef {import('multiformats/cid').CID} CID
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {object} PeerData
* @property {PeerId} id
* @property {Multiaddr[]} multiaddrs
*/
/**
* A DHT implementation modeled after Kademlia with S/Kademlia modifications.
* Original implementation in go: https://github.com/libp2p/go-libp2p-kad-dht.
*/
declare class KadDHT extends EventEmitter {
/**
* Create a new KadDHT.
*
* @param {Object} props
* @param {Libp2p} props.libp2p - the libp2p instance
* @param {Dialer} props.dialer - libp2p dialer instance
* @param {PeerId} props.peerId - peer's peerId
* @param {PeerStore} props.peerStore - libp2p peerStore
* @param {Registrar} props.registrar - libp2p registrar instance
* @param {string} [props.protocolPrefix = '/ipfs'] - libp2p registrar handle protocol
* @param {boolean} [props.forceProtocolLegacy = false] - WARNING: this is not recommended and should only be used for legacy purposes
* @param {number} props.kBucketSize - k-bucket size (default 20)
* @param {boolean} props.clientMode - If true, the DHT will not respond to queries. This should be true if your node will not be dialable. (default: false)
* @param {number} props.concurrency - alpha concurrency of queries (default 3)
* @param {Datastore} props.datastore - datastore (default MemoryDatastore)
* @param {object} props.validators - validators object with namespace as keys and function(key, record, callback)
* @param {object} props.selectors - selectors object with namespace as keys and function(key, records)
* @param {function(import('libp2p-record').Record, PeerId): void} [props.onPut] - Called when an entry is added to or changed in the datastore
* @param {function(import('libp2p-record').Record): void} [props.onRemove] - Called when an entry is removed from the datastore
*/
constructor({ libp2p, dialer, peerId, peerStore, registrar, protocolPrefix, forceProtocolLegacy, datastore, kBucketSize, clientMode, concurrency, validators, selectors, onPut, onRemove }: {
libp2p: Libp2p;
dialer: Dialer;
peerId: PeerId;
peerStore: PeerStore;
registrar: Registrar;
protocolPrefix?: string | undefined;
forceProtocolLegacy?: boolean | undefined;
kBucketSize: number;
clientMode: boolean;
concurrency: number;
datastore: Datastore;
validators: object;
selectors: object;
onPut?: ((arg0: import("libp2p-record/dist/src/record"), arg1: PeerId) => void) | undefined;
onRemove?: ((arg0: import("libp2p-record/dist/src/record")) => void) | undefined;
});
/**
* Local reference to the libp2p instance. May be undefined.
*
* @type {Libp2p}
*/
libp2p: Libp2p;
/**
* Local reference to the libp2p dialer instance
*
* @type {Dialer}
*/
dialer: Dialer;
/**
* Local peer-id
*
* @type {PeerId}
*/
peerId: PeerId;
/**
* Local PeerStore
*
* @type {PeerStore}
*/
peerStore: PeerStore;
/**
* Local peer info
*
* @type {Registrar}
*/
registrar: Registrar;
/**
* Registrar protocol
*
* @type {string}
*/
protocol: string;
/**
* k-bucket size
*
* @type {number}
*/
kBucketSize: number;
_clientMode: boolean;
/**
* ALPHA concurrency at which each query path with run, defaults to 3
*
* @type {number}
*/
concurrency: number;
/**
* Number of disjoint query paths to use
* This is set to `kBucketSize`/2 per the S/Kademlia paper
*
* @type {number}
*/
disjointPaths: number;
/**
* The routing table.
*
* @type {RoutingTable}
*/
routingTable: RoutingTable;
/**
* Reference to the datastore, uses an in-memory store if none given.
*
* @type {Datastore}
*/
datastore: Datastore;
/**
* Provider management
*
* @type {Providers}
*/
providers: Providers;
validators: {
pk: {
func: (key: Uint8Array, publicKey: Uint8Array) => Promise<void>;
sign: boolean;
};
};
selectors: {
pk: (k: Uint8Array, records: Uint8Array[]) => number;
};
network: Network;
_log: debug.Debugger & {
error: debug.Debugger;
};
/**
* Keeps track of running queries
*
* @type {QueryManager}
*/
_queryManager: QueryManager;
_running: boolean;
contentFetching: {
_putLocal(key: Uint8Array, rec: Uint8Array): Promise<void>;
put(key: Uint8Array, value: Uint8Array, options?: {
minPeers?: number | undefined;
} | undefined): Promise<void>;
get(key: Uint8Array, options?: {
timeout?: number | undefined;
} | undefined): Promise<Uint8Array>;
getMany(key: Uint8Array, nvals: number, options?: {
timeout?: number | undefined;
} | undefined): Promise<{
val: Uint8Array;
from: import("peer-id");
}[]>;
};
contentRouting: {
provide(key: import("multiformats/cid").CID): Promise<void>;
findProviders(key: import("multiformats/cid").CID, options?: {
timeout?: number | undefined;
maxNumProviders?: number | undefined;
} | undefined): AsyncIterable<{
id: import("peer-id");
multiaddrs: import("multiaddr").Multiaddr[];
}>;
};
peerRouting: {
_findPeerSingle(peer: import("peer-id"), target: import("peer-id")): Promise<Message>;
findPeer(id: import("peer-id"), options?: {
timeout?: number | undefined;
} | undefined): Promise<{
id: import("peer-id");
multiaddrs: import("multiaddr").Multiaddr[];
}>;
getClosestPeers(key: Uint8Array, options?: {
shallow?: boolean | undefined;
} | undefined): AsyncIterable<import("peer-id")>;
getPublicKey(peer: import("peer-id")): Promise<import("libp2p-crypto").PublicKey>;
};
onPut: (arg0: import("libp2p-record/dist/src/record"), arg1: PeerId) => void;
onRemove: (arg0: import("libp2p-record/dist/src/record")) => void;
/**
* Is this DHT running.
*/
get isStarted(): boolean;
/**
* Start listening to incoming connections.
*/
start(): Promise<[void, void, void, void]>;
/**
* Stop accepting incoming connections and sending outgoing
* messages.
*/
stop(): Promise<[void, void, void, void]>;
/**
* Store the given key/value pair in the DHT.
*
* @param {Uint8Array} key
* @param {Uint8Array} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length)
* @returns {Promise<void>}
*/
put(key: Uint8Array, value: Uint8Array, options?: {
minPeers?: number | undefined;
} | undefined): Promise<void>;
/**
* Get the value to the given key.
* Times out after 1 minute by default.
*
* @param {Uint8Array} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Uint8Array>}
*/
get(key: Uint8Array, options?: {
timeout?: number | undefined;
} | undefined): Promise<Uint8Array>;
/**
* Get the `n` values to the given key without sorting.
*
* @param {Uint8Array} key
* @param {number} nvals
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
*/
getMany(key: Uint8Array, nvals: number, options?: {
timeout?: number | undefined;
} | undefined): Promise<{
val: Uint8Array;
from: import("peer-id");
}[]>;
/**
* Remove the given key from the local datastore.
*
* @param {Uint8Array} key
*/
removeLocal(key: Uint8Array): Promise<undefined>;
/**
* @param {Uint8Array} key
* @param {Uint8Array} value
*/
_putLocal(key: Uint8Array, value: Uint8Array): Promise<void>;
/**
* Announce to the network that we can provide given key's value.
*
* @param {CID} key
* @returns {Promise<void>}
*/
provide(key: CID): Promise<void>;
/**
* Search the dht for up to `K` providers of the given CID.
*
* @param {CID} key
* @param {Object} [options] - findProviders options
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds (default: 60000)
* @param {number} [options.maxNumProviders=5] - maximum number of providers to find
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
findProviders(key: CID, options?: {
timeout?: number | undefined;
maxNumProviders?: number | undefined;
} | undefined): AsyncIterable<{
id: PeerId;
multiaddrs: Multiaddr[];
}>;
/**
* Search for a peer with the given ID.
*
* @param {PeerId} id
* @param {Object} [options] - findPeer options
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds (default: 60000)
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
findPeer(id: PeerId, options?: {
timeout?: number | undefined;
} | undefined): Promise<{
id: PeerId;
multiaddrs: Multiaddr[];
}>;
/**
* Kademlia 'node lookup' operation.
*
* @param {Uint8Array} key
* @param {Object} [options]
* @param {boolean} [options.shallow = false] - shallow query
*/
getClosestPeers(key: Uint8Array, options?: {
shallow?: boolean | undefined;
} | undefined): AsyncGenerator<import("peer-id"), void, undefined>;
/**
* Get the public key for the given peer id.
*
* @param {PeerId} peer
*/
getPublicKey(peer: PeerId): Promise<import("libp2p-crypto").PublicKey>;
/**
* @param {PeerId} peerId
* @param {Multiaddr[]} multiaddrs
*/
_peerDiscovered(peerId: PeerId, multiaddrs: Multiaddr[]): void;
/**
* Returns the routing tables closest peers, for the key of
* the message.
*
* @param {Message} msg
*/
_nearestPeersToQuery(msg: Message): Promise<{
id: import("peer-id");
multiaddrs: import("multiaddr").Multiaddr[];
}[]>;
/**
* Get the nearest peers to the given query, but iff closer
* than self.
*
* @param {Message} msg
* @param {PeerId} peerId
*/
_betterPeersToQuery(msg: Message, peerId: PeerId): Promise<{
id: import("peer-id");
multiaddrs: import("multiaddr").Multiaddr[];
}[]>;
/**
* Try to fetch a given record by from the local datastore.
* Returns the record iff it is still valid, meaning
* - it was either authored by this node, or
* - it was received less than `MAX_RECORD_AGE` ago.
*
* @param {Uint8Array} key
*/
_checkLocalDatastore(key: Uint8Array): Promise<import("libp2p-record/dist/src/record") | undefined>;
/**
* Add the peer to the routing table and update it in the peerStore.
*
* @param {PeerId} peerId
*/
_add(peerId: PeerId): Promise<void>;
/**
* Verify a record without searching the DHT.
*
* @param {import('libp2p-record').Record} record
*/
_verifyRecordLocally(record: import("libp2p-record/dist/src/record")): Promise<void>;
/**
* Is the given peer id our PeerId?
*
* @param {PeerId} other
*/
_isSelf(other: PeerId): boolean;
/**
* Store the given key/value pair at the peer `target`.
*
* @param {Uint8Array} key
* @param {Uint8Array} rec - encoded record
* @param {PeerId} target
*/
_putValueToPeer(key: Uint8Array, rec: Uint8Array, target: PeerId): Promise<void>;
/**
* Query a particular peer for the value for the given key.
* It will either return the value or a list of closer peers.
*
* Note: The peerStore is updated with new addresses found for the given peer.
*
* @param {PeerId} peer
* @param {Uint8Array} key
*/
_getValueOrPeers(peer: PeerId, key: Uint8Array): Promise<{
record: import("libp2p-record/dist/src/record");
peers: PeerData[];
} | {
peers: PeerData[];
record?: undefined;
}>;
/**
* Get a value via rpc call for the given parameters.
*
* @param {PeerId} peer
* @param {Uint8Array} key
*/
_getValueSingle(peer: PeerId, key: Uint8Array): Promise<Message>;
/**
* Verify a record, fetching missing public keys from the network.
* Calls back with an error if the record is invalid.
*
* @param {import('libp2p-record').Record} record
* @returns {Promise<void>}
*/
_verifyRecordOnline(record: import("libp2p-record/dist/src/record")): Promise<void>;
}
declare namespace KadDHT {
export { multicodec, Libp2p, PeerStore, PeerId, Datastore, Dialer, Registrar, CID, Multiaddr, PeerData };
}
import { EventEmitter } from "events";
type Libp2p = any;
type Dialer = any;
type PeerId = import('peer-id');
type PeerStore = any;
type Registrar = any;
import RoutingTable = require("./routing-table");
type Datastore = import('interface-datastore').Datastore;
import Providers = require("./providers");
import Network = require("./network");
import QueryManager = require("./query-manager");
import Message = require("./message");
type CID = import('multiformats/cid').CID;
type Multiaddr = import('multiaddr').Multiaddr;
type PeerData = {
id: PeerId;
multiaddrs: Multiaddr[];
};
declare var multicodec: string;
export type DHT = import('./types').DHT;
export type KadDHTOps = import('./kad-dht').KadDHTOps;
declare function create(opts: import("./kad-dht").KadDHTOps): import("./types").DHT;
export {};
//# sourceMappingURL=index.d.ts.map

@@ -1,2 +0,8 @@

export = Message;
export type ConnectionType = 0 | 1 | 2 | 3 | 4;
export type PBPeer = {
id: Uint8Array;
addrs: Uint8Array[];
connection: ConnectionType;
};
export type PeerData = import('../types').PeerData;
/**

@@ -10,3 +16,3 @@ * @typedef {0|1|2|3|4} ConnectionType

*
* @typedef {import('../index').PeerData} PeerData
* @typedef {import('../types').PeerData} PeerData
*/

@@ -16,3 +22,3 @@ /**

*/
declare class Message {
export class Message {
/**

@@ -49,15 +55,11 @@ * Decode from protobuf

}
declare namespace Message {
export { MESSAGE_TYPE as TYPES, CONNECTION_TYPE as CONNECTION_TYPES, ConnectionType, PBPeer, PeerData };
export namespace Message {
export { MESSAGE_TYPE as TYPES };
export { CONNECTION_TYPE as CONNECTION_TYPES };
}
export const MESSAGE_TYPE: typeof Proto.Message.MessageType;
export const MESSAGE_TYPE_LOOKUP: string[];
import Proto = require("./dht");
type PeerData = import('../index').PeerData;
declare const MESSAGE_TYPE: typeof Proto.Message.MessageType;
declare const CONNECTION_TYPE: typeof Proto.Message.ConnectionType;
type ConnectionType = 0 | 1 | 2 | 3 | 4;
type PBPeer = {
id: Uint8Array;
addrs: Uint8Array[];
connection: ConnectionType;
};
export {};
//# sourceMappingURL=index.d.ts.map

@@ -1,5 +0,10 @@

export = Network;
export type PeerId = import('peer-id');
export type MuxedStream = import('libp2p-interfaces/src/stream-muxer/types').MuxedStream;
export type QueryEvent = import('./types').QueryEvent;
export type PeerData = import('./types').PeerData;
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('./types').QueryEvent} QueryEvent
* @typedef {import('./types').PeerData} PeerData
*/

@@ -9,25 +14,31 @@ /**

*/
declare class Network {
export class Network extends EventEmitter {
/**
* Create a new network
*
* @param {import('./index')} dht
* @param {object} params
* @param {import('./types').Dialer} params.dialer
* @param {string} params.protocol
* @param {boolean} params.lan
*/
constructor(dht: import('./index'));
dht: import("./index");
readMessageTimeout: number;
constructor({ dialer, protocol, lan }: {
dialer: import('./types').Dialer;
protocol: string;
lan: boolean;
});
_log: debug.Debugger & {
error: debug.Debugger;
error: debug.Debugger; /**
* Write a message and read its response.
* If no response is received after the specified timeout
* this will error out.
*
* @param {MuxedStream} stream - the stream to use
* @param {Uint8Array} msg - the message to send
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
};
_rpc: ({ stream, connection }: {
stream: import("libp2p-interfaces/src/stream-muxer/types").MuxedStream;
connection: import("libp2p-interfaces/src/connection/connection");
}) => Promise<void>;
/**
* Registrar notifies a connection successfully with dht protocol.
*
* @param {PeerId} peerId - remote peer id
*/
_onPeerConnected(peerId: PeerId): Promise<void>;
_running: boolean;
_dialer: import("./types").Dialer;
_protocol: string;
/**

@@ -37,3 +48,2 @@ * Start the network

start(): void;
_registrarId: any;
/**

@@ -50,15 +60,12 @@ * Stop all network activity

/**
* Are all network components there?
* Send a request and record RTT for latency measurements
*
* @type {boolean}
*/
get isConnected(): boolean;
/**
* Send a request and record RTT for latency measurements.
*
* @async
* @param {PeerId} to - The peer that should receive a message
* @param {Message} msg - The message to send.
* @param {Message} msg - The message to send
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
sendRequest(to: PeerId, msg: Message): Promise<Message>;
sendRequest(to: PeerId, msg: Message, options?: {
signal?: AbortSignal | undefined;
} | undefined): AsyncGenerator<import("./types").SendingQueryEvent | import("./types").PeerResponseEvent | import("./types").QueryErrorEvent | import("./types").DialingPeerEvent, void, unknown>;
/**

@@ -69,27 +76,35 @@ * Sends a message without expecting an answer.

* @param {Message} msg
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
sendMessage(to: PeerId, msg: Message): Promise<any>;
sendMessage(to: PeerId, msg: Message, options?: {
signal?: AbortSignal | undefined;
} | undefined): AsyncGenerator<import("./types").SendingQueryEvent | import("./types").PeerResponseEvent | import("./types").QueryErrorEvent | import("./types").DialingPeerEvent, void, unknown>;
/**
* Write a message and read its response.
* If no response is received after the specified timeout
* this will error out.
* Write a message to the given stream
*
* @param {MuxedStream} stream - the stream to use
* @param {Uint8Array} msg - the message to send
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
_writeReadMessage(stream: MuxedStream, msg: Uint8Array): Promise<Message>;
_writeMessage(stream: MuxedStream, msg: Uint8Array, options?: {
signal?: AbortSignal | undefined;
} | undefined): Promise<void>;
/**
* Write a message to the given stream.
* Write a message and read its response.
* If no response is received after the specified timeout
* this will error out.
*
* @param {MuxedStream} stream - the stream to use
* @param {Uint8Array} msg - the message to send
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
_writeMessage(stream: MuxedStream, msg: Uint8Array): any;
_writeReadMessage(stream: MuxedStream, msg: Uint8Array, options?: {
signal?: AbortSignal | undefined;
} | undefined): Promise<Message>;
}
declare namespace Network {
export { PeerId, MuxedStream };
}
type PeerId = import('peer-id');
import Message = require("./message");
type MuxedStream = import('libp2p-interfaces/src/stream-muxer/types').MuxedStream;
import { EventEmitter } from "events";
import { Message } from "./message";
//# sourceMappingURL=network.d.ts.map
export = PeerList;
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../').PeerData} PeerData
*/

@@ -10,10 +9,10 @@ /**

declare class PeerList {
/** @type {PeerData[]} */
list: PeerData[];
/** @type {PeerId[]} */
list: PeerId[];
/**
* Add a new peer. Returns `true` if it was a new one
*
* @param {PeerData} peerData
* @param {PeerId} peerId
*/
push(peerData: PeerData): boolean;
push(peerId: PeerId): boolean;
/**

@@ -28,7 +27,7 @@ * Check if this PeerData is already in here.

*/
toArray(): import("../").PeerData[];
toArray(): import("peer-id")[];
/**
* Remove the last element
*/
pop(): import("../").PeerData | undefined;
pop(): import("peer-id") | undefined;
/**

@@ -40,6 +39,5 @@ * The length of the list

declare namespace PeerList {
export { PeerId, PeerData };
export { PeerId };
}
type PeerData = import('../').PeerData;
type PeerId = import('peer-id');
//# sourceMappingURL=index.d.ts.map
export = PeerDistanceList;
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../').PeerData} PeerData
*/

@@ -47,6 +46,5 @@ /**

declare namespace PeerDistanceList {
export { PeerId, PeerData };
export { PeerId };
}
type PeerId = import('peer-id');
type PeerData = import('../').PeerData;
//# sourceMappingURL=peer-distance-list.d.ts.map

@@ -1,48 +0,126 @@

declare function _exports(dht: import('../index')): {
export type Multiaddr = import('multiaddr').Multiaddr;
export type PeerData = import('../types').PeerData;
/**
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('../types').PeerData} PeerData
*/
export class PeerRouting {
/**
* Ask peer `peer` if they know where the peer with id `target` is.
* @param {object} params
* @param {import('peer-id')} params.peerId
* @param {import('../routing-table').RoutingTable} params.routingTable
* @param {import('../types').PeerStore} params.peerStore
* @param {import('../network').Network} params.network
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators
* @param {import('../query/manager').QueryManager} params.queryManager
* @param {boolean} params.lan
*/
constructor({ peerId, routingTable, peerStore, network, validators, queryManager, lan }: {
peerId: import('peer-id');
routingTable: import('../routing-table').RoutingTable;
peerStore: import('../types').PeerStore;
network: import('../network').Network;
validators: import('libp2p-interfaces/src/types').DhtValidators;
queryManager: import('../query/manager').QueryManager;
lan: boolean;
});
_peerId: PeerId;
_routingTable: import("../routing-table").RoutingTable;
_peerStore: import("../types").PeerStore;
_network: import("../network").Network;
_validators: import("libp2p-interfaces/src/types").DhtValidators;
_queryManager: import("../query/manager").QueryManager;
_log: debug.Debugger & {
error: debug.Debugger;
};
/**
* Look if we are connected to a peer with the given id.
* Returns its id and addresses, if found, otherwise `undefined`.
*
* @param {PeerId} peer
* @param {PeerId} target
* @returns {Promise<Message>}
* @private
*/
_findPeerSingle(peer: PeerId, target: PeerId): Promise<Message>;
findPeerLocal(peer: PeerId): Promise<{
id: PeerId;
multiaddrs: import("multiaddr").Multiaddr[];
} | undefined>;
/**
* Get a value via rpc call for the given parameters.
*
* @param {PeerId} peer
* @param {Uint8Array} key
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
_getValueSingle(peer: PeerId, key: Uint8Array, options?: {
signal?: AbortSignal | undefined;
} | undefined): AsyncGenerator<import("../types").SendingQueryEvent | import("../types").PeerResponseEvent | import("../types").QueryErrorEvent | import("../types").DialingPeerEvent, void, unknown>;
/**
* Get the public key directly from a node.
*
* @param {PeerId} peer
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
getPublicKeyFromNode(peer: PeerId, options?: {
signal?: AbortSignal | undefined;
} | undefined): AsyncGenerator<import("../types").SendingQueryEvent | import("../types").PeerResponseEvent | import("../types").QueryErrorEvent | import("../types").ValueEvent | import("../types").DialingPeerEvent, void, unknown>;
/**
* Search for a peer with the given ID.
*
* @param {PeerId} id
* @param {Object} [options] - findPeer options
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
* @param {object} [options]
* @param {AbortSignal} [options.signal]
* @param {number} [options.queryFuncTimeout]
*/
findPeer(id: PeerId, options?: {
timeout?: number | undefined;
} | undefined): Promise<{
id: PeerId;
multiaddrs: Multiaddr[];
}>;
signal?: AbortSignal | undefined;
queryFuncTimeout?: number | undefined;
} | undefined): AsyncGenerator<import("../types").QueryEvent, void, unknown>;
/**
* Kademlia 'node lookup' operation.
* Kademlia 'node lookup' operation
*
* @param {Uint8Array} key
* @param {Object} [options]
* @param {boolean} [options.shallow=false] - shallow query
* @returns {AsyncIterable<PeerId>}
* @param {Uint8Array} key - the key to look up, could be a the bytes from a multihash or a peer ID
* @param {object} [options]
* @param {AbortSignal} [options.signal]
* @param {number} [options.queryFuncTimeout]
*/
getClosestPeers(key: Uint8Array, options?: {
shallow?: boolean | undefined;
} | undefined): AsyncIterable<PeerId>;
signal?: AbortSignal | undefined;
queryFuncTimeout?: number | undefined;
} | undefined): AsyncGenerator<import("../types").QueryEvent, void, undefined>;
/**
* Get the public key for the given peer id.
* Query a particular peer for the value for the given key.
* It will either return the value or a list of closer peers.
*
* Note: The peerStore is updated with new addresses found for the given peer.
*
* @param {PeerId} peer
* @param {Uint8Array} key
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
getPublicKey(peer: PeerId): Promise<crypto.PublicKey>;
};
export = _exports;
export type Multiaddr = import('multiaddr').Multiaddr;
getValueOrPeers(peer: PeerId, key: Uint8Array, options?: {
signal?: AbortSignal | undefined;
} | undefined): AsyncGenerator<import("../types").SendingQueryEvent | import("../types").PeerResponseEvent | import("../types").QueryErrorEvent | import("../types").DialingPeerEvent, void, unknown>;
/**
* Verify a record, fetching missing public keys from the network.
* Calls back with an error if the record is invalid.
*
* @param {import('../types').DHTRecord} record
* @returns {Promise<void>}
*/
_verifyRecordOnline({ key, value, timeReceived }: import('../types').DHTRecord): Promise<void>;
/**
* Get the nearest peers to the given query, but if closer
* than self
*
* @param {Uint8Array} key
* @param {PeerId} closerThan
*/
getCloserPeersOffline(key: Uint8Array, closerThan: PeerId): Promise<{
id: PeerId;
multiaddrs: import("multiaddr").Multiaddr[];
}[]>;
}
import PeerId = require("peer-id");
import Message = require("../message");
import crypto = require("libp2p-crypto");
//# sourceMappingURL=index.d.ts.map

@@ -1,2 +0,3 @@

export = Providers;
export type CID = import('multiformats/cid').CID;
export type Datastore = import('interface-datastore').Datastore;
/**

@@ -18,13 +19,9 @@ * @typedef {import('multiformats/cid').CID} CID

*/
declare class Providers {
export class Providers {
/**
* @param {Datastore} datastore
* @param {PeerId} [self]
* @param {number} [cacheSize=256]
*/
constructor(datastore: Datastore, self?: PeerId | undefined, cacheSize?: number | undefined);
constructor(datastore: Datastore, cacheSize?: number | undefined);
datastore: import("interface-datastore").Datastore;
_log: debug.Debugger & {
error: debug.Debugger;
};
/**

@@ -92,9 +89,4 @@ * How often invalid records are cleaned. (in seconds)

}
declare namespace Providers {
export { CID, Datastore };
}
import { default as Queue } from "p-queue";
type CID = import('multiformats/cid').CID;
import PeerId = require("peer-id");
type Datastore = import('interface-datastore').Datastore;
//# sourceMappingURL=providers.d.ts.map

@@ -1,21 +0,38 @@

export = RoutingTable;
export type KBucketPeer = import('./types').KBucketPeer;
export type KBucket = import('./types').KBucket;
export type KBucketTree = import('./types').KBucketTree;
export type PeerId = import('peer-id');
/**
* @typedef {import('./types').KBucketPeer} KBucketPeer
* @typedef {import('./types').KBucket} KBucket
* @typedef {import('./types').KBucketTree} KBucketTree
* @typedef {import('peer-id')} PeerId
*/
/**
* A wrapper around `k-bucket`, to provide easy store and
* retrieval for peers.
*/
declare class RoutingTable {
export class RoutingTable {
/**
* @param {import('../')} dht
* @param {object} [options]
* @param {number} [options.kBucketSize=20]
* @param {number} [options.refreshInterval=30000]
* @param {object} params
* @param {import('peer-id')} params.peerId
* @param {import('../types').Dialer} params.dialer
* @param {boolean} params.lan
* @param {number} [params.kBucketSize=20]
* @param {number} [params.pingTimeout=10000]
*/
constructor(dht: import('../'), { kBucketSize, refreshInterval }?: {
constructor({ peerId, dialer, kBucketSize, pingTimeout, lan }: {
peerId: import('peer-id');
dialer: import('../types').Dialer;
lan: boolean;
kBucketSize?: number | undefined;
refreshInterval?: number | undefined;
} | undefined);
peerId: PeerId;
dht: import("../");
pingTimeout?: number | undefined;
});
_log: debug.Debugger & {
error: debug.Debugger;
};
_peerId: import("peer-id");
_dialer: import("../types").Dialer;
_kBucketSize: number;
_refreshInterval: number;
_pingTimeout: number;
/** @type {KBucketTree} */

@@ -26,17 +43,11 @@ kb: KBucketTree;

/**
* To speed lookups, we seed the table with random PeerIds. This means
* when we are asked to locate a peer on the network, we can find a KadId
* that is close to the requested peer ID and query that, then network
* peers will tell us who they know who is close to the fake ID
* Called on the `ping` event from `k-bucket` when a bucket is full
* and cannot split.
*
* @param {boolean} [force=false]
*/
_refreshTable(force?: boolean | undefined): Promise<void>;
/**
* Called on the `ping` event from `k-bucket`.
* Currently this just removes the oldest contact from
* the list, without actually pinging the individual peers.
* This is the same as go does, but should probably
* be upgraded to actually ping the individual peers.
* `oldContacts.length` is defined by the `numberOfNodesToPing` param
* passed to the `k-bucket` constructor.
*
* `oldContacts` will not be empty and is the list of contacts that
* have not been contacted for the longest.
*
* @param {KBucketPeer[]} oldContacts

@@ -46,42 +57,6 @@ * @param {KBucketPeer} newContact

_onPing(oldContacts: KBucketPeer[], newContact: KBucketPeer): void;
_pingQueue: Queue<import("p-queue/dist/priority-queue").default, import("p-queue").DefaultAddOptions>;
start(): Promise<void>;
stop(): Promise<void>;
_refreshTimeoutId: NodeJS.Timeout | undefined;
/**
* @param {number} cpl
* @param {Date} lastRefresh
* @param {boolean} force
*/
_refreshCommonPrefixLength(cpl: number, lastRefresh: Date, force: boolean): Promise<void>;
/**
* @param {number} maxCommonPrefix
*/
_getTrackedCommonPrefixLengthsForRefresh(maxCommonPrefix: number): Date[];
/**
*
* @param {number} targetCommonPrefixLength
*/
_generateRandomPeerId(targetCommonPrefixLength: number): Promise<PeerId>;
/**
* @param {Uint8Array} localKadId
* @param {number} randomPrefix
* @param {number} targetCommonPrefixLength
*/
_makePeerId(localKadId: Uint8Array, randomPrefix: number, targetCommonPrefixLength: number): Promise<Uint8Array>;
/**
* returns the maximum common prefix length between any peer in the table
* and the current peer
*/
_maxCommonPrefix(): number;
/**
* Returns the number of peers in the table with a given prefix length
*
* @param {number} prefixLength
*/
_numPeersForCpl(prefixLength: number): number;
/**
* Yields the common prefix length of every peer in the table
*/
_prefixLengths(): Generator<number, void, unknown>;
/**
* Amount of currently stored peers.

@@ -95,3 +70,3 @@ */

*/
find(peer: PeerId): Promise<PeerId | undefined>;
find(peer: PeerId): Promise<import("peer-id") | undefined>;
/**

@@ -102,3 +77,3 @@ * Retrieve the closest peers to the given key.

*/
closestPeer(key: Uint8Array): PeerId | undefined;
closestPeer(key: Uint8Array): import("peer-id") | undefined;
/**

@@ -108,5 +83,5 @@ * Retrieve the `count`-closest peers to the given key.

* @param {Uint8Array} key
* @param {number} count
* @param {number} [count] - defaults to kBucketSize
*/
closestPeers(key: Uint8Array, count: number): PeerId[];
closestPeers(key: Uint8Array, count?: number | undefined): import("peer-id")[];
/**

@@ -125,28 +100,3 @@ * Add or update the routing table with the given peer.

}
declare namespace RoutingTable {
export { KBucketPeer, KBucket, KBucketTree };
}
import PeerId = require("peer-id");
type KBucketTree = {
root: KBucket;
localNodeId: Uint8Array;
on: (event: string, callback: Function) => void;
closest: (key: Uint8Array, count: number) => KBucketPeer[];
closestPeer: (key: Uint8Array) => KBucketPeer;
remove: (key: Uint8Array) => void;
add: (peer: KBucketPeer) => void;
count: () => number;
toIterable: () => Iterable<KBucket>;
};
type KBucketPeer = {
id: Uint8Array;
peer: PeerId;
};
type KBucket = {
id: Uint8Array;
contacts: KBucketPeer[];
dontSplit: boolean;
left: KBucket;
right: KBucket;
};
import { default as Queue } from "p-queue";
//# sourceMappingURL=index.d.ts.map

@@ -1,5 +0,33 @@

declare function _exports(dht: import('../../index')): (peerId: PeerId, msg: Message) => Promise<void>;
export = _exports;
export type PeerId = import('peer-id');
export type Message = import('../../message');
export type Message = import('../../message').Message;
export type DHTMessageHandler = import('../types').DHTMessageHandler;
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../../message').Message} Message
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
*/
/**
* @implements {DHTMessageHandler}
*/
export class AddProviderHandler implements DHTMessageHandler {
/**
* @param {object} params
* @param {PeerId} params.peerId
* @param {import('../../providers').Providers} params.providers
* @param {import('../../types').PeerStore} params.peerStore
*/
constructor({ peerId, providers, peerStore }: {
peerId: PeerId;
providers: import('../../providers').Providers;
peerStore: import('../../types').PeerStore;
});
_peerId: import("peer-id");
_providers: import("../../providers").Providers;
_peerStore: import("../../types").PeerStore;
/**
* @param {PeerId} peerId
* @param {Message} msg
*/
handle(peerId: PeerId, msg: Message): Promise<undefined>;
}
//# sourceMappingURL=add-provider.d.ts.map

@@ -1,5 +0,37 @@

declare function _exports(dht: import('../../index')): (peerId: PeerId, msg: Message) => Promise<Message>;
export = _exports;
export type PeerId = import('peer-id');
import Message = require("../../message");
export type DHTMessageHandler = import('../types').DHTMessageHandler;
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
*/
/**
* @implements {DHTMessageHandler}
*/
export class FindNodeHandler implements DHTMessageHandler {
/**
* @param {object} params
* @param {PeerId} params.peerId
* @param {import('../../types').Addressable} params.addressable
* @param {import('../../peer-routing').PeerRouting} params.peerRouting
* @param {boolean} [params.lan]
*/
constructor({ peerId, addressable, peerRouting, lan }: {
peerId: PeerId;
addressable: import('../../types').Addressable;
peerRouting: import('../../peer-routing').PeerRouting;
lan?: boolean | undefined;
});
_peerId: import("peer-id");
_addressable: import("../../types").Addressable;
_peerRouting: import("../../peer-routing").PeerRouting;
_lan: boolean;
/**
* Process `FindNode` DHT messages
*
* @param {PeerId} peerId
* @param {Message} msg
*/
handle(peerId: PeerId, msg: Message): Promise<Message>;
}
import { Message } from "../../message";
//# sourceMappingURL=find-node.d.ts.map

@@ -1,5 +0,46 @@

declare function _exports(dht: import('../../index')): (peerId: PeerId, msg: Message) => Promise<Message>;
export = _exports;
export type PeerId = import('peer-id');
import Message = require("../../message");
export type DHTMessageHandler = import('../types').DHTMessageHandler;
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
*/
/**
* @implements {DHTMessageHandler}
*/
export class GetProvidersHandler implements DHTMessageHandler {
/**
* @param {object} params
* @param {PeerId} params.peerId
* @param {import('../../peer-routing').PeerRouting} params.peerRouting
* @param {import('../../providers').Providers} params.providers
* @param {import('interface-datastore').Datastore} params.datastore
* @param {import('../../types').PeerStore} params.peerStore
* @param {import('../../types').Addressable} params.addressable
* @param {boolean} [params.lan]
*/
constructor({ peerId, peerRouting, providers, datastore, peerStore, addressable, lan }: {
peerId: PeerId;
peerRouting: import('../../peer-routing').PeerRouting;
providers: import('../../providers').Providers;
datastore: import('interface-datastore').Datastore;
peerStore: import('../../types').PeerStore;
addressable: import('../../types').Addressable;
lan?: boolean | undefined;
});
_peerId: import("peer-id");
_peerRouting: import("../../peer-routing").PeerRouting;
_providers: import("../../providers").Providers;
_datastore: import("interface-datastore").Datastore;
_peerStore: import("../../types").PeerStore;
_addressable: import("../../types").Addressable;
_lan: boolean;
/**
* Process `GetProviders` DHT messages.
*
* @param {PeerId} peerId
* @param {Message} msg
*/
handle(peerId: PeerId, msg: Message): Promise<Message>;
}
import { Message } from "../../message";
//# sourceMappingURL=get-providers.d.ts.map

@@ -1,5 +0,46 @@

declare function _exports(dht: import('../../index')): (peerId: PeerId, msg: Message) => Promise<Message>;
export = _exports;
export type PeerId = import('peer-id');
import Message = require("../../message");
export type DHTMessageHandler = import('../types').DHTMessageHandler;
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
*/
/**
* @implements {DHTMessageHandler}
*/
export class GetValueHandler implements DHTMessageHandler {
/**
* @param {object} params
* @param {PeerId} params.peerId
* @param {import('../../types').PeerStore} params.peerStore
* @param {import('../../peer-routing').PeerRouting} params.peerRouting
* @param {import('interface-datastore').Datastore} params.datastore
*/
constructor({ peerId, peerStore, peerRouting, datastore }: {
peerId: PeerId;
peerStore: import('../../types').PeerStore;
peerRouting: import('../../peer-routing').PeerRouting;
datastore: import('interface-datastore').Datastore;
});
_peerId: import("peer-id");
_peerStore: import("../../types").PeerStore;
_peerRouting: import("../../peer-routing").PeerRouting;
_datastore: import("interface-datastore").Datastore;
/**
* Process `GetValue` DHT messages.
*
* @param {PeerId} peerId
* @param {Message} msg
*/
handle(peerId: PeerId, msg: Message): Promise<Message>;
/**
* Try to fetch a given record by from the local datastore.
* Returns the record iff it is still valid, meaning
* - it was either authored by this node, or
* - it was received less than `MAX_RECORD_AGE` ago.
*
* @param {Uint8Array} key
*/
_checkLocalDatastore(key: Uint8Array): Promise<import("libp2p-record/dist/src/record") | undefined>;
}
import { Message } from "../../message";
//# sourceMappingURL=get-value.d.ts.map

@@ -1,3 +0,13 @@

declare function _exports(dht: import('../../index')): (type: number) => any;
declare function _exports({ peerId, providers, peerStore, addressable, peerRouting, datastore, validators, lan }: {
peerId: import('peer-id');
providers: import('../../providers').Providers;
peerStore: import('../../types').PeerStore;
addressable: import('../../types').Addressable;
peerRouting: import('../../peer-routing').PeerRouting;
datastore: import('interface-datastore').Datastore;
validators: import('libp2p-interfaces/src/types').DhtValidators;
lan?: boolean | undefined;
}): (type: number) => import("../types").DHTMessageHandler;
export = _exports;
export type DHTMessageHandler = import('../types').DHTMessageHandler;
//# sourceMappingURL=index.d.ts.map

@@ -1,5 +0,21 @@

declare function _exports(dht: import('../../index')): (peerId: PeerId, msg: Message) => import("../../message");
export = _exports;
export type PeerId = import('peer-id');
export type Message = import('../../message');
export type Message = import('../../message').Message;
export type DHTMessageHandler = import('../types').DHTMessageHandler;
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../../message').Message} Message
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
*/
/**
* @implements {DHTMessageHandler}
*/
export class PingHandler implements DHTMessageHandler {
/**
* Process `Ping` DHT messages.
*
* @param {PeerId} peerId
* @param {Message} msg
*/
handle(peerId: PeerId, msg: Message): Promise<import("../../message").Message>;
}
//# sourceMappingURL=ping.d.ts.map

@@ -1,5 +0,32 @@

declare function _exports(dht: import('../../index')): (peerId: PeerId, msg: Message) => Promise<import("../../message")>;
export = _exports;
export type PeerId = import('peer-id');
export type Message = import('../../message');
export type Message = import('../../message').Message;
export type DHTMessageHandler = import('../types').DHTMessageHandler;
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../../message').Message} Message
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
*/
/**
* @implements {DHTMessageHandler}
*/
export class PutValueHandler implements DHTMessageHandler {
/**
* @param {object} params
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators
* @param {import('interface-datastore').Datastore} params.datastore
*/
constructor({ validators, datastore }: {
validators: import('libp2p-interfaces/src/types').DhtValidators;
datastore: import('interface-datastore').Datastore;
});
_validators: import("libp2p-interfaces/src/types").DhtValidators;
_datastore: import("interface-datastore").Datastore;
/**
* Process `PutValue` DHT messages.
*
* @param {PeerId} peerId
* @param {Message} msg
*/
handle(peerId: PeerId, msg: Message): Promise<import("../../message").Message>;
}
//# sourceMappingURL=put-value.d.ts.map

@@ -1,8 +0,56 @@

declare function _exports(dht: import('../index')): ({ stream, connection }: {
stream: MuxedStream;
connection: import("libp2p-interfaces/src/connection/connection");
}) => Promise<void>;
export = _exports;
export type PeerId = import('peer-id');
export type MuxedStream = import('libp2p-interfaces/src/stream-muxer/types').MuxedStream;
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
*/
/**
* @param {import('../types').DHT} dht
*/
export class RPC {
/**
* @param {object} params
* @param {import('../routing-table').RoutingTable} params.routingTable
* @param {import('peer-id')} params.peerId
* @param {import('../providers').Providers} params.providers
* @param {import('../types').PeerStore} params.peerStore
* @param {import('../types').Addressable} params.addressable
* @param {import('../peer-routing').PeerRouting} params.peerRouting
* @param {import('interface-datastore').Datastore} params.datastore
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators
* @param {boolean} [params.lan]
*/
constructor(params: {
routingTable: import('../routing-table').RoutingTable;
peerId: import('peer-id');
providers: import('../providers').Providers;
peerStore: import('../types').PeerStore;
addressable: import('../types').Addressable;
peerRouting: import('../peer-routing').PeerRouting;
datastore: import('interface-datastore').Datastore;
validators: import('libp2p-interfaces/src/types').DhtValidators;
lan?: boolean | undefined;
});
_messageHandler: (type: number) => import("./types").DHTMessageHandler;
_routingTable: import("../routing-table").RoutingTable;
/**
* Process incoming DHT messages.
*
* @param {PeerId} peerId
* @param {Message} msg
*/
handleMessage(peerId: PeerId, msg: Message): Promise<Message | undefined>;
/**
* Handle incoming streams on the dht protocol
*
* @param {object} props
* @param {MuxedStream} props.stream
* @param {import('libp2p-interfaces/src/connection').Connection} props.connection
*/
onIncomingStream({ stream, connection }: {
stream: MuxedStream;
connection: import("libp2p-interfaces/src/connection/connection");
}): Promise<void>;
}
import { Message } from "../message";
//# sourceMappingURL=index.d.ts.map

@@ -0,29 +1,74 @@

/**
* @param {import('./types').PeerData} peer
*/
export function removePrivateAddresses({ id, multiaddrs }: import('./types').PeerData): {
id: PeerId;
multiaddrs: import("multiaddr").Multiaddr[];
};
/**
* @param {import('./types').PeerData} peer
*/
export function removePublicAddresses({ id, multiaddrs }: import('./types').PeerData): {
id: PeerId;
multiaddrs: import("multiaddr").Multiaddr[];
};
/**
* Creates a DHT ID by hashing a given Uint8Array.
*
* @param {Uint8Array} buf
* @returns {Promise<Uint8Array>}
*/
export function convertBuffer(buf: Uint8Array): Promise<Uint8Array>;
/**
* Creates a DHT ID by hashing a Peer ID
*
* @param {PeerId} peer
* @returns {Promise<Uint8Array>}
*/
export function convertPeerId(peer: PeerId): Promise<Uint8Array>;
/**
* Convert a Uint8Array to their SHA2-256 hash.
*
* @param {Uint8Array} buf
* @returns {Key}
*/
export function bufferToKey(buf: Uint8Array): Key;
/**
* Generate the key for a public key.
*
* @param {PeerId} peer
* @returns {Uint8Array}
*/
export function keyForPublicKey(peer: PeerId): Uint8Array;
/**
* @param {Uint8Array} key
*/
export function isPublicKeyKey(key: Uint8Array): boolean;
/**
* @param {Uint8Array} key
*/
export function isIPNSKey(key: Uint8Array): boolean;
/**
* @param {Uint8Array} key
*/
export function fromPublicKeyKey(key: Uint8Array): PeerId;
export function now(): number;
export function encodeBase32(buf: Uint8Array): string;
export function decodeBase32(raw: string): Uint8Array;
export function sortClosestPeers(peers: Array<PeerId>, target: Uint8Array): Promise<PeerId[]>;
export function xorCompare(a: {
distance: Uint8Array;
}, b: {
distance: Uint8Array;
}): 0 | 1 | -1;
export function pathSize(resultsWanted: number, numPaths: number): number;
/**
* Create a new put record, encodes and signs it if enabled.
*
* @param {Uint8Array} key
* @param {Uint8Array} value
* @returns {Uint8Array}
*/
export function createPutRecord(key: Uint8Array, value: Uint8Array): Uint8Array;
export function logger(id?: PeerId | undefined, subsystem?: string | undefined): debug.Debugger & {
/**
* Creates a logger for the given subsystem
*
* @param {string} name
*/
export function logger(name: string): debug.Debugger & {
error: debug.Debugger;
};
export function withTimeout<T>(asyncFn: (...args: any[]) => Promise<T>, time?: number | undefined): (...args: any[]) => Promise<T>;
export function mapParallel<T, O>(asyncIterator: AsyncIterable<T>, asyncFn: (arg0: T) => Promise<O>): Promise<O[]>;
import PeerId = require("peer-id");
import { Key } from "interface-datastore/key";
import debug = require("debug");
export class TimeoutError extends Error {
get code(): string;
}
//# sourceMappingURL=utils.d.ts.map
{
"name": "libp2p-kad-dht",
"version": "0.25.0",
"version": "0.26.0",
"description": "JavaScript implementation of the Kad-DHT for libp2p",

@@ -8,4 +8,4 @@ "leadMaintainer": "Vasco Santos <vasco.santos@moxy.studio>",

"scripts": {
"prepare": "npm run build",
"lint": "aegir ts -p check && aegir lint",
"prepare": "npm run build",
"build": "npm run build:proto && npm run build:proto-types && aegir build",

@@ -22,3 +22,4 @@ "build:proto": "pbjs -t static-module -w commonjs -r libp2p-dht-message --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/message/dht.js ./src/message/dht.proto",

"coverage-publish": "aegir-coverage publish",
"sim": "node test/simulation/index.js"
"sim": "node test/simulation/index.js",
"dep-check": "aegir dep-check"
},

@@ -52,2 +53,3 @@ "files": [

"dependencies": {
"any-signal": "^2.1.2",
"datastore-core": "^6.0.7",

@@ -57,8 +59,13 @@ "debug": "^4.3.1",

"hashlru": "^2.3.0",
"heap": "~0.2.6",
"interface-datastore": "^6.0.2",
"it-all": "^1.0.5",
"it-drain": "^1.0.4",
"it-first": "^1.0.4",
"it-length": "^1.0.3",
"it-length-prefixed": "^5.0.2",
"it-map": "^1.0.5",
"it-merge": "^1.0.3",
"it-parallel": "^2.0.1",
"it-pipe": "^1.1.0",
"it-take": "^1.0.2",
"k-bucket": "^5.1.0",

@@ -70,8 +77,11 @@ "libp2p-crypto": "^0.19.5",

"multiformats": "^9.4.5",
"native-abort-controller": "^1.0.4",
"p-defer": "^3.0.0",
"p-map": "^4.0.0",
"p-queue": "^6.6.2",
"p-timeout": "^4.1.0",
"peer-id": "^0.15.0",
"private-ip": "^2.3.3",
"protobufjs": "^6.10.2",
"streaming-iterables": "^6.0.0",
"timeout-abort-controller": "^2.0.0",
"uint8arrays": "^3.0.0",

@@ -84,13 +94,11 @@ "varint": "^6.0.0"

"async-iterator-all": "^1.0.0",
"crypto-browserify": "^3.12.0",
"datastore-level": "^7.0.1",
"delay": "^5.0.0",
"execa": "^5.1.1",
"it-filter": "^1.0.3",
"it-last": "^1.0.6",
"it-pair": "^1.0.0",
"libp2p": "^0.32.3",
"libp2p": "^0.33.0",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
"p-defer": "^3.0.0",
"p-each-series": "^2.1.0",
"p-map-series": "^2.1.0",
"p-retry": "^4.2.0",

@@ -97,0 +105,0 @@ "sinon": "^11.1.1",

@@ -1,2 +0,2 @@

# js-libp2p-kad-dht
# js-libp2p-kad-dht <!-- omit in toc -->

@@ -7,3 +7,3 @@ [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)

[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
[![Build Status](https://travis-ci.org/libp2p/js-libp2p-kad-dht.svg?style=flat-square)](https://travis-ci.org/libp2p/js-libp2p-kad-dht)
[![Build status](https://github.com/libp2p/js-libp2p-kad-dht/actions/workflows/test.yml/badge.svg?branch=master)](https://github.com/libp2p/js-libp2p-kad-dht/actions/workflows/test.yml)
[![Coverage Status](https://coveralls.io/repos/github/libp2p/js-libp2p-kad-dht/badge.svg?branch=master)](https://coveralls.io/github/libp2p/js-libp2p-kad-dht?branch=master)

@@ -19,7 +19,7 @@ [![Dependency Status](https://david-dm.org/libp2p/js-libp2p-kad-dht.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-kad-dht)

## Lead Maintainer
## Lead Maintainer <!-- omit in toc -->
[Vasco Santos](https://github.com/vasco-santos).
## Table of Contents
## Table of Contents <!-- omit in toc -->

@@ -30,5 +30,9 @@ - [Install](#install)

- [API](#api)
- [Custom secondary DHT in libp2p](#custom-secondary-dht-in-libp2p)
- [Peer Routing](#peer-routing)
- [Content Routing](#content-routing)
- [Peer Discovery](#peer-discovery)
- [Implementation Summary](#implementation-summary)
- [Contribute](#contribute)
- [License](#license)
## Install

@@ -45,3 +49,3 @@

```js
const KadDHT = require('libp2p-kad-dht')
import { create } from 'libp2p-kad-dht'
```

@@ -58,2 +62,4 @@

```js
import { create } from 'libp2p-kad-dht'
/**

@@ -63,12 +69,8 @@ * @param {Libp2p} libp2p

function addDHT(libp2p) {
const customDHT = new KadDHT({
const customDHT = create({
libp2p,
dialer: libp2p.dialer,
peerId: libp2p.peerId,
peerStore: libp2p.peerStore,
registrar: libp2p.registrar,
protocolPrefix: '/custom'
})
customDHT.start()
customDHT.on('peer', libp2p._onDiscoveryPeer)
return customDHT

@@ -75,0 +77,0 @@ }

@@ -36,1 +36,4 @@ 'use strict'

exports.ALPHA = 3
// How often we look for our closest DHT neighbours
exports.QUERY_SELF_INTERVAL = Number(5 * minute)
'use strict'
const errcode = require('err-code')
const pTimeout = require('p-timeout')
const { equals: uint8ArrayEquals } = require('uint8arrays/equals')
const { toString: uint8ArrayToString } = require('uint8arrays/to-string')
const libp2pRecord = require('libp2p-record')
const c = require('../constants')
const Query = require('../query')
const Libp2pRecord = require('libp2p-record')
const {
ALPHA
} = require('../constants')
const utils = require('../utils')
const Record = libp2pRecord.Record
const Record = Libp2pRecord.Record
const parallel = require('it-parallel')
const map = require('it-map')
const {
valueEvent,
queryErrorEvent
} = require('../query/events')
const { Message } = require('../message')
const { pipe } = require('it-pipe')
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../query').DHTQueryResult} DHTQueryResult
* @typedef {import('../types').ValueEvent} ValueEvent
*/
/**
* @param {import('../')} dht
*/
module.exports = (dht) => {
class ContentFetching {
/**
* @param {object} params
* @param {import('peer-id')} params.peerId
* @param {import('interface-datastore').Datastore} params.datastore
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators
* @param {import('libp2p-interfaces/src/types').DhtSelectors} params.selectors
* @param {import('../peer-routing').PeerRouting} params.peerRouting
* @param {import('../query/manager').QueryManager} params.queryManager
* @param {import('../routing-table').RoutingTable} params.routingTable
* @param {import('../network').Network} params.network
* @param {boolean} params.lan
*/
constructor ({ peerId, datastore, validators, selectors, peerRouting, queryManager, routingTable, network, lan }) {
this._log = utils.logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:content-fetching`)
this._peerId = peerId
this._datastore = datastore
this._validators = validators
this._selectors = selectors
this._peerRouting = peerRouting
this._queryManager = queryManager
this._routingTable = routingTable
this._network = network
}
/**
* @param {Uint8Array} key
* @param {Uint8Array} rec
*/
const putLocal = async (key, rec) => { // eslint-disable-line require-await
return dht.datastore.put(utils.bufferToKey(key), rec)
async putLocal (key, rec) { // eslint-disable-line require-await
return this._datastore.put(utils.bufferToKey(key), rec)
}

@@ -36,11 +65,14 @@

*/
const getLocal = async (key) => {
dht._log(`getLocal ${uint8ArrayToString(key, 'base32')}`)
async getLocal (key) {
this._log(`getLocal ${uint8ArrayToString(key, 'base32')}`)
const raw = await dht.datastore.get(utils.bufferToKey(key))
dht._log(`found ${uint8ArrayToString(key, 'base32')} in local datastore`)
const dsKey = utils.bufferToKey(key)
this._log(`fetching record for key ${dsKey}`)
const raw = await this._datastore.get(dsKey)
this._log(`found ${dsKey} in local datastore`)
const rec = Record.deserialize(raw)
await dht._verifyRecordLocally(rec)
await Libp2pRecord.validator.verifyRecord(this._validators, rec)

@@ -54,251 +86,210 @@ return rec

* @param {Uint8Array} key
* @param {import('../query').DHTQueryValue[]} vals - values retrieved from the DHT
* @param {ValueEvent[]} vals - values retrieved from the DHT
* @param {Uint8Array} best - the best record that was found
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
const sendCorrectionRecord = async (key, vals, best) => {
async * sendCorrectionRecord (key, vals, best, options = {}) {
this._log('sendCorrection for %b', key)
const fixupRec = await utils.createPutRecord(key, best)
return Promise.all(vals.map(async (v) => {
for (const { value, from } of vals) {
// no need to do anything
if (uint8ArrayEquals(v.val, best)) {
return
if (uint8ArrayEquals(value, best)) {
this._log('record was ok')
continue
}
// correct ourself
if (dht._isSelf(v.from)) {
if (this._peerId.equals(from)) {
try {
await dht._putLocal(key, fixupRec)
} catch (err) {
dht._log.error('Failed error correcting self', err)
const dsKey = utils.bufferToKey(key)
this._log(`Storing corrected record for key ${dsKey}`)
await this._datastore.put(dsKey, fixupRec)
} catch (/** @type {any} */ err) {
this._log.error('Failed error correcting self', err)
}
return
continue
}
// send correction
try {
await dht._putValueToPeer(key, fixupRec, v.from)
} catch (err) {
dht._log.error('Failed error correcting entry', err)
}
}))
}
let sentCorrection = false
const request = new Message(Message.TYPES.PUT_VALUE, key, 0)
request.record = Record.deserialize(fixupRec)
return {
/**
* Store the given key/value pair locally, in the datastore.
*
* @param {Uint8Array} key
* @param {Uint8Array} rec - encoded record
*/
async _putLocal (key, rec) { // eslint-disable-line require-await
return putLocal(key, rec)
},
/**
* Store the given key/value pair in the DHT.
*
* @param {Uint8Array} key
* @param {Uint8Array} value
* @param {object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length)
*/
async put (key, value, options = {}) {
dht._log('PutValue %b', key)
// create record in the dht format
const record = await utils.createPutRecord(key, value)
// store the record locally
await putLocal(key, record)
// put record to the closest peers
let counterAll = 0
let counterSuccess = 0
await utils.mapParallel(dht.getClosestPeers(key, { shallow: true }), async (peer) => {
try {
counterAll += 1
await dht._putValueToPeer(key, record, peer)
counterSuccess += 1
} catch (err) {
dht._log.error('Failed to put to peer (%b): %s', peer.id, err)
for await (const event of this._network.sendRequest(from, request, options)) {
if (event.name === 'PEER_RESPONSE' && event.record && uint8ArrayEquals(event.record.value, Record.deserialize(fixupRec).value)) {
sentCorrection = true
}
})
// verify if we were able to put to enough peers
const minPeers = options.minPeers || counterAll // Ensure we have a default `minPeers`
yield event
}
if (minPeers > counterSuccess) {
const error = errcode(new Error(`Failed to put value to enough peers: ${counterSuccess}/${minPeers}`), 'ERR_NOT_ENOUGH_PUT_PEERS')
dht._log.error(error)
throw error
if (!sentCorrection) {
yield queryErrorEvent({ from, error: errcode(new Error('value not put correctly'), 'ERR_PUT_VALUE_INVALID') })
}
},
/**
* Get the value to the given key.
* Times out after 1 minute by default.
*
* @param {Uint8Array} key
* @param {object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
*/
async get (key, options = {}) {
options.timeout = options.timeout || c.minute
this._log.error('Failed error correcting entry')
}
}
dht._log('_get %b', key)
/**
* Store the given key/value pair in the DHT
*
* @param {Uint8Array} key
* @param {Uint8Array} value
* @param {object} [options] - put options
* @param {AbortSignal} [options.signal]
*/
async * put (key, value, options = {}) {
this._log('put key %b value %b', key, value)
const vals = await dht.getMany(key, c.GET_MANY_RECORD_COUNT, options)
const recs = vals.map((v) => v.val)
let i = 0
// create record in the dht format
const record = await utils.createPutRecord(key, value)
try {
i = libp2pRecord.selection.bestRecord(dht.selectors, key, recs)
} catch (err) {
// Assume the first record if no selector available
if (err.code !== 'ERR_NO_SELECTOR_FUNCTION_FOR_RECORD_KEY') {
throw err
}
}
// store the record locally
const dsKey = utils.bufferToKey(key)
this._log(`storing record for key ${dsKey}`)
await this._datastore.put(dsKey, record)
const best = recs[i]
dht._log('GetValue %b %s', key, best)
// put record to the closest peers
yield * pipe(
this._peerRouting.getClosestPeers(key, { signal: options.signal }),
(source) => map(source, (event) => {
return async () => {
if (event.name !== 'FINAL_PEER') {
return [event]
}
if (!best) {
throw errcode(new Error('best value was not found'), 'ERR_NOT_FOUND')
}
const events = []
await sendCorrectionRecord(key, vals, best)
const msg = new Message(Message.TYPES.PUT_VALUE, key, 0)
msg.record = Record.deserialize(record)
return best
},
for await (const putEvent of this._network.sendRequest(event.peer.id, msg, options)) {
events.push(putEvent)
/**
* Get the `n` values to the given key without sorting.
*
* @param {Uint8Array} key
* @param {number} nvals
* @param {object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
*/
async getMany (key, nvals, options = {}) {
options.timeout = options.timeout || c.minute
if (putEvent.name !== 'PEER_RESPONSE') {
continue
}
dht._log('getMany %b (%s)', key, nvals)
if (putEvent.record && uint8ArrayEquals(putEvent.record.value, Record.deserialize(record).value)) {
} else {
events.push(queryErrorEvent({ from: event.peer.id, error: errcode(new Error('value not put correctly'), 'ERR_PUT_VALUE_INVALID') }))
}
}
const vals = []
let localRec
try {
localRec = await getLocal(key)
} catch (err) {
if (nvals === 0) {
throw err
return events
}
}),
(source) => parallel(source, {
ordered: false,
concurrency: ALPHA
}),
async function * (source) {
for await (const events of source) {
yield * events
}
}
)
}
if (localRec) {
vals.push({
val: localRec.value,
from: dht.peerId
})
}
/**
* Get the value to the given key
*
* @param {Uint8Array} key
* @param {object} [options]
* @param {AbortSignal} [options.signal]
* @param {number} [options.queryFuncTimeout]
*/
async * get (key, options = {}) {
this._log('get %b', key)
if (vals.length >= nvals) {
return vals
/** @type {ValueEvent[]} */
const vals = []
for await (const event of this.getMany(key, options)) {
if (event.name === 'VALUE') {
vals.push(event)
}
const id = await utils.convertBuffer(key)
const rtp = dht.routingTable.closestPeers(id, dht.kBucketSize)
yield event
}
dht._log('peers in rt: %d', rtp.length)
if (!vals.length) {
return
}
if (rtp.length === 0) {
const errMsg = 'Failed to lookup key! No peers from routing table!'
const records = vals.map((v) => v.value)
let i = 0
dht._log.error(errMsg)
if (vals.length === 0) {
throw errcode(new Error(errMsg), 'ERR_NO_PEERS_IN_ROUTING_TABLE')
}
return vals
try {
i = Libp2pRecord.selection.bestRecord(this._selectors, key, records)
} catch (/** @type {any} */ err) {
// Assume the first record if no selector available
if (err.code !== 'ERR_NO_SELECTOR_FUNCTION_FOR_RECORD_KEY') {
throw err
}
}
const valsLength = vals.length
const best = records[i]
this._log('GetValue %b %b', key, best)
/**
* @param {number} pathIndex
* @param {number} numPaths
*/
function createQuery (pathIndex, numPaths) {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(nvals - valsLength, numPaths)
let queryResults = 0
if (!best) {
throw errcode(new Error('best value was not found'), 'ERR_NOT_FOUND')
}
/**
* Here we return the query function to use on this particular disjoint path
*
* @param {PeerId} peer
*/
async function disjointPathQuery (peer) {
let rec, peers, lookupErr
try {
const results = await dht._getValueOrPeers(peer, key)
rec = results.record
peers = results.peers
} catch (err) {
// If we have an invalid record we just want to continue and fetch a new one.
if (err.code !== 'ERR_INVALID_RECORD') {
throw err
}
lookupErr = err
}
yield * this.sendCorrectionRecord(key, vals, best, options)
/** @type {import('../query').QueryResult} */
const res = {
closerPeers: peers
}
yield vals[i]
}
if (rec && rec.value) {
vals.push({
val: rec.value,
from: peer
})
/**
* Get the `n` values to the given key without sorting.
*
* @param {Uint8Array} key
* @param {object} [options]
* @param {AbortSignal} [options.signal]
* @param {number} [options.queryFuncTimeout]
*/
async * getMany (key, options = {}) {
this._log('getMany values for %t', key)
queryResults++
} else if (lookupErr) {
vals.push({
err: lookupErr,
from: peer
})
try {
const localRec = await this.getLocal(key)
queryResults++
}
yield valueEvent({
value: localRec.value,
from: this._peerId
})
} catch (/** @type {any} */ err) {
this._log('error getting local value for %b', key, err)
}
// enough is enough
if (queryResults >= pathSize) {
res.pathComplete = true
}
const id = await utils.convertBuffer(key)
const rtp = this._routingTable.closestPeers(id)
return res
}
this._log('found %d peers in routing table', rtp.length)
return disjointPathQuery
}
const self = this
// we have peers, lets send the actual query to them
const query = new Query(dht, key, createQuery)
/**
* @type {import('../query/types').QueryFunc}
*/
const getValueQuery = async function * ({ peer, signal }) {
for await (const event of self._peerRouting.getValueOrPeers(peer, key, { signal })) {
yield event
try {
await pTimeout(query.run(rtp), options.timeout)
} catch (err) {
if (vals.length === 0) {
throw err
if (event.name === 'PEER_RESPONSE' && event.record) {
yield valueEvent({ from: peer, value: event.record.value })
}
} finally {
query.stop()
}
}
return vals
}
// we have peers, lets send the actual query to them
yield * this._queryManager.run(key, rtp, getValueQuery, options)
}
}
module.exports.ContentFetching = ContentFetching
'use strict'
const errcode = require('err-code')
const pTimeout = require('p-timeout')
const { Message } = require('../message')
const parallel = require('it-parallel')
const map = require('it-map')
const { convertBuffer, logger } = require('../utils')
const { ALPHA } = require('../constants')
const { pipe } = require('it-pipe')
const {
queryErrorEvent,
peerResponseEvent,
providerEvent
} = require('../query/events')
const { Message: { MessageType } } = require('../message/dht')
const c = require('../constants')
const LimitedPeerList = require('../peer-list/limited-peer-list')
const Message = require('../message')
const Query = require('../query')
const utils = require('../utils')
/**

@@ -18,179 +22,171 @@ * @typedef {import('multiformats/cid').CID} CID

/**
* @param {import('../')} dht
*/
module.exports = (dht) => {
class ContentRouting {
/**
* Check for providers from a single node.
* @param {object} params
* @param {import('peer-id')} params.peerId
* @param {import('../network').Network} params.network
* @param {import('../peer-routing').PeerRouting} params.peerRouting
* @param {import('../query/manager').QueryManager} params.queryManager
* @param {import('../routing-table').RoutingTable} params.routingTable
* @param {import('../providers').Providers} params.providers
* @param {import('../types').PeerStore} params.peerStore
* @param {boolean} params.lan
*/
constructor ({ peerId, network, peerRouting, queryManager, routingTable, providers, peerStore, lan }) {
this._log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:content-routing`)
this._peerId = peerId
this._network = network
this._peerRouting = peerRouting
this._queryManager = queryManager
this._routingTable = routingTable
this._providers = providers
this._peerStore = peerStore
}
/**
* Announce to the network that we can provide the value for a given key and
* are contactable on the given multiaddrs
*
* @param {PeerId} peer
* @param {CID} key
*
* @private
* @param {Multiaddr[]} multiaddrs
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
const findProvidersSingle = async (peer, key) => { // eslint-disable-line require-await
const msg = new Message(Message.TYPES.GET_PROVIDERS, key.bytes, 0)
return dht.network.sendRequest(peer, msg)
}
async * provide (key, multiaddrs, options = {}) {
this._log('provide %s', key)
return {
// Add peer as provider
await this._providers.addProvider(key, this._peerId)
const msg = new Message(Message.TYPES.ADD_PROVIDER, key.bytes, 0)
msg.providerPeers = [{
id: this._peerId,
multiaddrs
}]
let sent = 0
/**
* Announce to the network that we can provide the value for a given key
*
* @param {CID} key
* @param {import('../types').QueryEvent} event
*/
async provide (key) {
dht._log(`provide: ${key}`)
const maybeNotifyPeer = (event) => {
return async () => {
if (event.name !== 'FINAL_PEER') {
return [event]
}
/** @type {Error[]} */
const errors = []
const events = []
// Add peer as provider
await dht.providers.addProvider(key, dht.peerId)
this._log('putProvider %s to %p', key, event.peer.id)
const multiaddrs = dht.libp2p ? dht.libp2p.multiaddrs : []
const msg = new Message(Message.TYPES.ADD_PROVIDER, key.bytes, 0)
msg.providerPeers = [{
id: dht.peerId,
multiaddrs
}]
try {
this._log('sending provider record for %s to %p', key, event.peer.id)
/**
* @param {PeerId} peer
*/
async function mapPeer (peer) {
dht._log(`putProvider ${key} to ${peer.toB58String()}`)
try {
await dht.network.sendMessage(peer, msg)
} catch (err) {
errors.push(err)
for await (const sendEvent of this._network.sendMessage(event.peer.id, msg, options)) {
if (sendEvent.name === 'PEER_RESPONSE') {
this._log('sent provider record for %s to %p', key, event.peer.id)
sent++
}
events.push(sendEvent)
}
} catch (/** @type {any} */ err) {
this._log.error('error sending provide record to peer %p', event.peer.id, err)
events.push(queryErrorEvent({ from: event.peer.id, error: err }))
}
return events
}
}
// Notify closest peers
await utils.mapParallel(dht.getClosestPeers(key.bytes), mapPeer)
if (errors.length) {
// TODO:
// This should be infrequent. This means a peer we previously connected
// to failed to exchange the provide message. If getClosestPeers was an
// iterator, we could continue to pull until we announce to kBucketSize peers.
throw errcode(new Error(`Failed to provide to ${errors.length} of ${dht.kBucketSize} peers`), 'ERR_SOME_PROVIDES_FAILED', { errors })
// Notify closest peers
yield * pipe(
this._peerRouting.getClosestPeers(key.multihash.bytes, options),
(source) => map(source, (event) => maybeNotifyPeer(event)),
(source) => parallel(source, {
ordered: false,
concurrency: ALPHA
}),
async function * (source) {
for await (const events of source) {
yield * events
}
}
},
)
/**
* Search the dht for up to `K` providers of the given CID.
*
* @param {CID} key
* @param {Object} [options] - findProviders options
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds
* @param {number} [options.maxNumProviders=5] - maximum number of providers to find
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * findProviders (key, options = { timeout: 60000, maxNumProviders: 5 }) {
const providerTimeout = options.timeout || c.minute
const n = options.maxNumProviders || c.K
this._log('sent provider records to %d peers', sent)
}
dht._log(`findProviders ${key}`)
/**
* Search the dht for up to `K` providers of the given CID.
*
* @param {CID} key
* @param {object} [options] - findProviders options
* @param {number} [options.maxNumProviders=5] - maximum number of providers to find
* @param {AbortSignal} [options.signal]
* @param {number} [options.queryFuncTimeout]
*/
async * findProviders (key, options = { maxNumProviders: 5 }) {
const toFind = options.maxNumProviders || this._routingTable._kBucketSize
const target = key.multihash.bytes
const id = await convertBuffer(target)
const self = this
const out = new LimitedPeerList(n)
const provs = await dht.providers.getProviders(key)
this._log(`findProviders ${key}`)
provs
.forEach(id => {
/** @type {{ id: PeerId, addresses: { multiaddr: Multiaddr }[] }} */
const peerData = dht.peerStore.get(id)
const provs = await this._providers.getProviders(key)
if (peerData) {
out.push({
id: peerData.id,
multiaddrs: peerData.addresses
.map((address) => address.multiaddr)
})
} else {
out.push({
id,
multiaddrs: []
})
}
})
// yield values if we have some, also slice because maybe we got lucky and already have too many?
if (provs.length) {
const providers = provs.slice(0, toFind).map(peerId => ({
id: peerId,
multiaddrs: (this._peerStore.addressBook.get(peerId) || []).map(address => address.multiaddr)
}))
// All done
if (out.length >= n) {
// yield values
for (const pData of out.toArray()) {
yield pData
}
return
}
yield peerResponseEvent({ from: this._peerId, messageType: MessageType.GET_PROVIDERS, providers })
yield providerEvent({ from: this._peerId, providers: providers })
}
// need more, query the network
/** @type {LimitedPeerList[]} */
const paths = []
// All done
if (provs.length >= toFind) {
return
}
/**
*
* @param {number} pathIndex
* @param {number} numPaths
*/
function makePath (pathIndex, numPaths) {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(n - out.length, numPaths)
const pathProviders = new LimitedPeerList(pathSize)
paths.push(pathProviders)
/**
* The query function to use on this particular disjoint path
*
* @type {import('../query/types').QueryFunc}
*/
const findProvidersQuery = async function * ({ peer, signal }) {
const request = new Message(Message.TYPES.GET_PROVIDERS, target, 0)
/**
* The query function to use on this particular disjoint path
*
* @param {PeerId} peer
*/
async function queryDisjointPath (peer) {
const msg = await findProvidersSingle(peer, key)
const provs = msg.providerPeers
dht._log(`Found ${provs.length} provider entries for ${key}`)
yield * self._network.sendRequest(peer, request, { signal })
}
provs.forEach((prov) => {
pathProviders.push({
...prov
})
})
const providers = new Set(provs.map(p => p.toB58String()))
// hooray we have all that we want
if (pathProviders.length >= pathSize) {
return { pathComplete: true }
}
for await (const event of this._queryManager.run(target, this._routingTable.closestPeers(id), findProvidersQuery, options)) {
yield event
// it looks like we want some more
return { closerPeers: msg.closerPeers }
}
if (event.name === 'PEER_RESPONSE') {
this._log(`Found ${event.providers.length} provider entries for ${key} and ${event.closer.length} closer peers`)
return queryDisjointPath
}
const newProviders = []
const query = new Query(dht, key.bytes, makePath)
const peers = dht.routingTable.closestPeers(key.bytes, dht.kBucketSize)
for (const peer of event.providers) {
if (providers.has(peer.id.toB58String())) {
continue
}
try {
await pTimeout(
query.run(peers),
providerTimeout
)
} catch (err) {
if (err.name !== pTimeout.TimeoutError.name) {
throw err
providers.add(peer.id.toB58String())
newProviders.push(peer)
}
} finally {
query.stop()
}
// combine peers from each path
paths.forEach((path) => {
path.toArray().forEach((peer) => {
out.push(peer)
})
})
if (newProviders.length) {
yield providerEvent({ from: event.from, providers: newProviders })
}
for (const pData of out.toArray()) {
yield pData
if (providers.size === toFind) {
return
}
}

@@ -200,1 +196,3 @@ }

}
module.exports.ContentRouting = ContentRouting
'use strict'
const { EventEmitter } = require('events')
const errcode = require('err-code')
const { KadDHT } = require('./kad-dht')
const { DualKadDHT } = require('./dual-kad-dht')
const libp2pRecord = require('libp2p-record')
const { MemoryDatastore } = require('datastore-core/memory')
const { equals: uint8ArrayEquals } = require('uint8arrays/equals')
const { toString: uint8ArrayToString } = require('uint8arrays/to-string')
const RoutingTable = require('./routing-table')
const utils = require('./utils')
const c = require('./constants')
const Network = require('./network')
const contentFetching = require('./content-fetching')
const contentRouting = require('./content-routing')
const peerRouting = require('./peer-routing')
const Message = require('./message')
const Providers = require('./providers')
const QueryManager = require('./query-manager')
const Record = libp2pRecord.Record
/**
* @typedef {*} Libp2p
* @typedef {*} PeerStore
* @typedef {import('peer-id')} PeerId
* @typedef {import('interface-datastore').Datastore} Datastore
* @typedef {*} Dialer
* @typedef {*} Registrar
* @typedef {import('multiformats/cid').CID} CID
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {object} PeerData
* @property {PeerId} id
* @property {Multiaddr[]} multiaddrs
* @typedef {import('./types').DHT} DHT
* @typedef {import('./kad-dht').KadDHTOps} KadDHTOps
*/
/**
* A DHT implementation modeled after Kademlia with S/Kademlia modifications.
* Original implementation in go: https://github.com/libp2p/go-libp2p-kad-dht.
*/
class KadDHT extends EventEmitter {
module.exports = {
/**
* Create a new KadDHT.
*
* @param {Object} props
* @param {Libp2p} props.libp2p - the libp2p instance
* @param {Dialer} props.dialer - libp2p dialer instance
* @param {PeerId} props.peerId - peer's peerId
* @param {PeerStore} props.peerStore - libp2p peerStore
* @param {Registrar} props.registrar - libp2p registrar instance
* @param {string} [props.protocolPrefix = '/ipfs'] - libp2p registrar handle protocol
* @param {boolean} [props.forceProtocolLegacy = false] - WARNING: this is not recommended and should only be used for legacy purposes
* @param {number} props.kBucketSize - k-bucket size (default 20)
* @param {boolean} props.clientMode - If true, the DHT will not respond to queries. This should be true if your node will not be dialable. (default: false)
* @param {number} props.concurrency - alpha concurrency of queries (default 3)
* @param {Datastore} props.datastore - datastore (default MemoryDatastore)
* @param {object} props.validators - validators object with namespace as keys and function(key, record, callback)
* @param {object} props.selectors - selectors object with namespace as keys and function(key, records)
* @param {function(import('libp2p-record').Record, PeerId): void} [props.onPut] - Called when an entry is added to or changed in the datastore
* @param {function(import('libp2p-record').Record): void} [props.onRemove] - Called when an entry is removed from the datastore
* @param {KadDHTOps} opts
* @returns {DHT}
*/
constructor ({
libp2p,
dialer,
peerId,
peerStore,
registrar,
protocolPrefix = '/ipfs',
forceProtocolLegacy = false,
datastore = new MemoryDatastore(),
kBucketSize = c.K,
clientMode = false,
concurrency = c.ALPHA,
validators = {},
selectors = {},
onPut = () => {},
onRemove = () => {}
}) {
super()
if (!dialer) {
throw new Error('libp2p-kad-dht requires an instance of Dialer')
}
/**
* Local reference to the libp2p instance. May be undefined.
*
* @type {Libp2p}
*/
this.libp2p = libp2p
/**
* Local reference to the libp2p dialer instance
*
* @type {Dialer}
*/
this.dialer = dialer
/**
* Local peer-id
*
* @type {PeerId}
*/
this.peerId = peerId
/**
* Local PeerStore
*
* @type {PeerStore}
*/
this.peerStore = peerStore
/**
* Local peer info
*
* @type {Registrar}
*/
this.registrar = registrar
/**
* Registrar protocol
*
* @type {string}
*/
this.protocol = protocolPrefix + (forceProtocolLegacy ? '' : c.PROTOCOL_DHT)
/**
* k-bucket size
*
* @type {number}
*/
this.kBucketSize = kBucketSize
this._clientMode = clientMode
/**
* ALPHA concurrency at which each query path with run, defaults to 3
*
* @type {number}
*/
this.concurrency = concurrency
/**
* Number of disjoint query paths to use
* This is set to `kBucketSize`/2 per the S/Kademlia paper
*
* @type {number}
*/
this.disjointPaths = Math.ceil(this.kBucketSize / 2)
/**
* The routing table.
*
* @type {RoutingTable}
*/
this.routingTable = new RoutingTable(this, { kBucketSize: this.kBucketSize })
/**
* Reference to the datastore, uses an in-memory store if none given.
*
* @type {Datastore}
*/
this.datastore = datastore
/**
* Provider management
*
* @type {Providers}
*/
this.providers = new Providers(this.datastore, this.peerId)
this.validators = {
pk: libp2pRecord.validator.validators.pk,
...validators
}
this.selectors = {
pk: libp2pRecord.selection.selectors.pk,
...selectors
}
this.network = new Network(this)
this._log = utils.logger(this.peerId)
/**
* Keeps track of running queries
*
* @type {QueryManager}
*/
this._queryManager = new QueryManager()
this._running = false
// DHT components
this.contentFetching = contentFetching(this)
this.contentRouting = contentRouting(this)
this.peerRouting = peerRouting(this)
// datastore events
this.onPut = onPut
this.onRemove = onRemove
create: (opts) => {
return new DualKadDHT(
new KadDHT({
...opts,
protocol: '/ipfs/kad/1.0.0',
lan: false
}),
new KadDHT({
...opts,
protocol: '/ipfs/lan/kad/1.0.0',
clientMode: false,
lan: true
}),
opts.libp2p
)
}
/**
* Is this DHT running.
*/
get isStarted () {
return this._running
}
/**
* Start listening to incoming connections.
*/
start () {
this._running = true
return Promise.all([
this.providers.start(),
this._queryManager.start(),
this.network.start(),
this.routingTable.start()
])
}
/**
* Stop accepting incoming connections and sending outgoing
* messages.
*/
stop () {
this._running = false
return Promise.all([
this.providers.stop(),
this._queryManager.stop(),
this.network.stop(),
this.routingTable.stop()
])
}
/**
* Store the given key/value pair in the DHT.
*
* @param {Uint8Array} key
* @param {Uint8Array} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length)
* @returns {Promise<void>}
*/
async put (key, value, options = {}) { // eslint-disable-line require-await
return this.contentFetching.put(key, value, options)
}
/**
* Get the value to the given key.
* Times out after 1 minute by default.
*
* @param {Uint8Array} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Uint8Array>}
*/
async get (key, options = {}) { // eslint-disable-line require-await
return this.contentFetching.get(key, options)
}
/**
* Get the `n` values to the given key without sorting.
*
* @param {Uint8Array} key
* @param {number} nvals
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
*/
async getMany (key, nvals, options = {}) { // eslint-disable-line require-await
return this.contentFetching.getMany(key, nvals, options)
}
/**
* Remove the given key from the local datastore.
*
* @param {Uint8Array} key
*/
async removeLocal (key) {
this._log(`removeLocal: ${uint8ArrayToString(key, 'base32')}`)
const dsKey = utils.bufferToKey(key)
try {
await this.datastore.delete(dsKey)
} catch (err) {
if (err.code === 'ERR_NOT_FOUND') {
return undefined
}
throw err
}
}
/**
* @param {Uint8Array} key
* @param {Uint8Array} value
*/
async _putLocal (key, value) {
this._log(`_putLocal: ${uint8ArrayToString(key, 'base32')}`)
const dsKey = utils.bufferToKey(key)
await this.datastore.put(dsKey, value)
}
// ----------- Content Routing
/**
* Announce to the network that we can provide given key's value.
*
* @param {CID} key
* @returns {Promise<void>}
*/
async provide (key) { // eslint-disable-line require-await
return this.contentRouting.provide(key)
}
/**
* Search the dht for up to `K` providers of the given CID.
*
* @param {CID} key
* @param {Object} [options] - findProviders options
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds (default: 60000)
* @param {number} [options.maxNumProviders=5] - maximum number of providers to find
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * findProviders (key, options = { timeout: 6000, maxNumProviders: 5 }) {
for await (const peerData of this.contentRouting.findProviders(key, options)) {
yield peerData
}
}
// ----------- Peer Routing -----------
/**
* Search for a peer with the given ID.
*
* @param {PeerId} id
* @param {Object} [options] - findPeer options
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds (default: 60000)
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async findPeer (id, options = { timeout: 60000 }) { // eslint-disable-line require-await
return this.peerRouting.findPeer(id, options)
}
/**
* Kademlia 'node lookup' operation.
*
* @param {Uint8Array} key
* @param {Object} [options]
* @param {boolean} [options.shallow = false] - shallow query
*/
async * getClosestPeers (key, options = { shallow: false }) {
yield * this.peerRouting.getClosestPeers(key, options)
}
/**
* Get the public key for the given peer id.
*
* @param {PeerId} peer
*/
getPublicKey (peer) {
return this.peerRouting.getPublicKey(peer)
}
// ----------- Discovery -----------
/**
* @param {PeerId} peerId
* @param {Multiaddr[]} multiaddrs
*/
_peerDiscovered (peerId, multiaddrs) {
this.emit('peer', {
id: peerId,
multiaddrs
})
}
// ----------- Internals -----------
/**
* Returns the routing tables closest peers, for the key of
* the message.
*
* @param {Message} msg
*/
async _nearestPeersToQuery (msg) {
const key = await utils.convertBuffer(msg.key)
const ids = this.routingTable.closestPeers(key, this.kBucketSize)
return ids.map((p) => {
/** @type {{ id: PeerId, addresses: { multiaddr: Multiaddr }[] }} */
const peer = this.peerStore.get(p)
return {
id: p,
multiaddrs: peer ? peer.addresses.map((address) => address.multiaddr) : []
}
})
}
/**
* Get the nearest peers to the given query, but iff closer
* than self.
*
* @param {Message} msg
* @param {PeerId} peerId
*/
async _betterPeersToQuery (msg, peerId) {
this._log('betterPeersToQuery')
const closer = await this._nearestPeersToQuery(msg)
return closer.filter((closer) => {
if (this._isSelf(closer.id)) {
// Should bail, not sure
this._log.error('trying to return self as closer')
return false
}
return !closer.id.isEqual(peerId)
})
}
/**
* Try to fetch a given record by from the local datastore.
* Returns the record iff it is still valid, meaning
* - it was either authored by this node, or
* - it was received less than `MAX_RECORD_AGE` ago.
*
* @param {Uint8Array} key
*/
async _checkLocalDatastore (key) {
this._log(`checkLocalDatastore: ${uint8ArrayToString(key)} %b`, key)
const dsKey = utils.bufferToKey(key)
// Fetch value from ds
let rawRecord
try {
rawRecord = await this.datastore.get(dsKey)
} catch (err) {
if (err.code === 'ERR_NOT_FOUND') {
return undefined
}
throw err
}
// Create record from the returned bytes
const record = Record.deserialize(rawRecord)
if (!record) {
throw errcode(new Error('Invalid record'), 'ERR_INVALID_RECORD')
}
// Check validity: compare time received with max record age
if (record.timeReceived == null ||
utils.now() - record.timeReceived.getTime() > c.MAX_RECORD_AGE) {
// If record is bad delete it and return
await this.datastore.delete(dsKey)
this.onRemove(record)
return undefined
}
// Record is valid
return record
}
/**
* Add the peer to the routing table and update it in the peerStore.
*
* @param {PeerId} peerId
*/
async _add (peerId) {
await this.routingTable.add(peerId)
}
/**
* Verify a record without searching the DHT.
*
* @param {import('libp2p-record').Record} record
*/
async _verifyRecordLocally (record) {
this._log('verifyRecordLocally')
await libp2pRecord.validator.verifyRecord(this.validators, record)
}
/**
* Is the given peer id our PeerId?
*
* @param {PeerId} other
*/
_isSelf (other) {
return other && uint8ArrayEquals(this.peerId.id, other.id)
}
/**
* Store the given key/value pair at the peer `target`.
*
* @param {Uint8Array} key
* @param {Uint8Array} rec - encoded record
* @param {PeerId} target
*/
async _putValueToPeer (key, rec, target) {
const msg = new Message(Message.TYPES.PUT_VALUE, key, 0)
msg.record = Record.deserialize(rec)
const resp = await this.network.sendRequest(target, msg)
if (resp.record && !uint8ArrayEquals(resp.record.value, Record.deserialize(rec).value)) {
throw errcode(new Error('value not put correctly'), 'ERR_PUT_VALUE_INVALID')
}
}
/**
* Query a particular peer for the value for the given key.
* It will either return the value or a list of closer peers.
*
* Note: The peerStore is updated with new addresses found for the given peer.
*
* @param {PeerId} peer
* @param {Uint8Array} key
*/
async _getValueOrPeers (peer, key) {
const msg = await this._getValueSingle(peer, key)
const peers = msg.closerPeers
const record = msg.record
if (record) {
// We have a record
try {
await this._verifyRecordOnline(record)
} catch (err) {
const errMsg = 'invalid record received, discarded'
this._log(errMsg)
throw errcode(new Error(errMsg), 'ERR_INVALID_RECORD')
}
return { record, peers }
}
if (peers.length > 0) {
return { peers }
}
throw errcode(new Error('Not found'), 'ERR_NOT_FOUND')
}
/**
* Get a value via rpc call for the given parameters.
*
* @param {PeerId} peer
* @param {Uint8Array} key
*/
async _getValueSingle (peer, key) { // eslint-disable-line require-await
const msg = new Message(Message.TYPES.GET_VALUE, key, 0)
return this.network.sendRequest(peer, msg)
}
/**
* Verify a record, fetching missing public keys from the network.
* Calls back with an error if the record is invalid.
*
* @param {import('libp2p-record').Record} record
* @returns {Promise<void>}
*/
async _verifyRecordOnline (record) {
await libp2pRecord.validator.verifyRecord(this.validators, record)
}
}
module.exports = KadDHT
module.exports.multicodec = '/ipfs' + c.PROTOCOL_DHT

@@ -10,2 +10,3 @@ 'use strict'

const CONNECTION_TYPE = Proto.Message.ConnectionType
const MESSAGE_TYPE_LOOKUP = Object.keys(MESSAGE_TYPE)

@@ -20,3 +21,3 @@ /**

*
* @typedef {import('../index').PeerData} PeerData
* @typedef {import('../types').PeerData} PeerData
*/

@@ -141,2 +142,4 @@

module.exports = Message
module.exports.Message = Message
module.exports.MESSAGE_TYPE = MESSAGE_TYPE
module.exports.MESSAGE_TYPE_LOOKUP = MESSAGE_TYPE_LOOKUP
'use strict'
const errcode = require('err-code')
const { pipe } = require('it-pipe')
const lp = require('it-length-prefixed')
const pTimeout = require('p-timeout')
const { consume } = require('streaming-iterables')
const drain = require('it-drain')
const first = require('it-first')
const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology')
const rpc = require('./rpc')
const c = require('./constants')
const Message = require('./message')
const { Message, MESSAGE_TYPE_LOOKUP } = require('./message')
const utils = require('./utils')
const { EventEmitter } = require('events')
const {
dialingPeerEvent,
sendingQueryEvent,
peerResponseEvent,
queryErrorEvent
} = require('./query/events')

@@ -21,2 +21,4 @@ /**

* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('./types').QueryEvent} QueryEvent
* @typedef {import('./types').PeerData} PeerData
*/

@@ -27,15 +29,18 @@

*/
class Network {
class Network extends EventEmitter {
/**
* Create a new network
*
* @param {import('./index')} dht
* @param {object} params
* @param {import('./types').Dialer} params.dialer
* @param {string} params.protocol
* @param {boolean} params.lan
*/
constructor (dht) {
this.dht = dht
this.readMessageTimeout = c.READ_MESSAGE_TIMEOUT
this._log = utils.logger(this.dht.peerId, 'net')
this._rpc = rpc(this.dht)
this._onPeerConnected = this._onPeerConnected.bind(this)
constructor ({ dialer, protocol, lan }) {
super()
this._log = utils.logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:network`)
this._running = false
this._dialer = dialer
this._protocol = protocol
}

@@ -51,23 +56,3 @@

if (!this.dht.isStarted) {
throw errcode(new Error('Can not start network'), 'ERR_CANNOT_START_NETWORK')
}
this._running = true
// Only respond to queries when not in client mode
if (this.dht._clientMode === false) {
// Incoming streams
this.dht.registrar.handle(this.dht.protocol, this._rpc)
}
// register protocol with topology
const topology = new MulticodecTopology({
multicodecs: [this.dht.protocol],
handlers: {
onConnect: this._onPeerConnected,
onDisconnect: () => {}
}
})
this._registrarId = this.dht.registrar.register(topology)
}

@@ -79,11 +64,3 @@

stop () {
if (!this.dht.isStarted && !this.isStarted) {
return
}
this._running = false
// unregister protocol and handlers
if (this._registrarId) {
this.dht.registrar.unregister(this._registrarId)
}
}

@@ -101,45 +78,31 @@

/**
* Are all network components there?
* Send a request and record RTT for latency measurements
*
* @type {boolean}
* @param {PeerId} to - The peer that should receive a message
* @param {Message} msg - The message to send
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
get isConnected () {
// TODO add a way to check if switch has started or not
return this.dht.isStarted && this.isStarted
}
async * sendRequest (to, msg, options = {}) {
this._log('sending %s to %p', MESSAGE_TYPE_LOOKUP[msg.type], to)
/**
* Registrar notifies a connection successfully with dht protocol.
*
* @param {PeerId} peerId - remote peer id
*/
async _onPeerConnected (peerId) {
await this.dht._add(peerId)
this._log('added to the routing table: %s', peerId.toB58String())
}
try {
yield dialingPeerEvent({ peer: to })
/**
* Send a request and record RTT for latency measurements.
*
* @async
* @param {PeerId} to - The peer that should receive a message
* @param {Message} msg - The message to send.
*/
async sendRequest (to, msg) {
// TODO: record latency
if (!this.isConnected) {
throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE')
}
const { stream } = await this._dialer.dialProtocol(to, this._protocol, options)
const id = to.toB58String()
this._log('sending to: %s', id)
yield sendingQueryEvent({ to, type: msg.type })
let conn = this.dht.registrar.connectionManager.get(to)
if (!conn) {
conn = await this.dht.dialer.connectToPeer(to)
const response = await this._writeReadMessage(stream, msg.serialize(), options)
yield peerResponseEvent({
from: to,
messageType: response.type,
closer: response.closerPeers,
providers: response.providerPeers,
record: response.record
})
} catch (/** @type {any} */ err) {
yield queryErrorEvent({ from: to, error: err })
}
const { stream } = await conn.newStream(this.dht.protocol)
return this._writeReadMessage(stream, msg.serialize())
}

@@ -152,32 +115,37 @@

* @param {Message} msg
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
async sendMessage (to, msg) {
if (!this.isConnected) {
throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE')
}
async * sendMessage (to, msg, options = {}) {
this._log('sending %s to %p', MESSAGE_TYPE_LOOKUP[msg.type], to)
const id = to.toB58String()
this._log('sending to: %s', id)
yield dialingPeerEvent({ peer: to })
let conn = this.dht.registrar.connectionManager.get(to)
if (!conn) {
conn = await this.dht.dialer.connectToPeer(to)
const { stream } = await this._dialer.dialProtocol(to, this._protocol, options)
yield sendingQueryEvent({ to, type: msg.type })
try {
await this._writeMessage(stream, msg.serialize(), options)
yield peerResponseEvent({ from: to, messageType: msg.type })
} catch (/** @type {any} */ err) {
yield queryErrorEvent({ from: to, error: err })
}
const { stream } = await conn.newStream(this.dht.protocol)
return this._writeMessage(stream, msg.serialize())
}
/**
* Write a message and read its response.
* If no response is received after the specified timeout
* this will error out.
* Write a message to the given stream
*
* @param {MuxedStream} stream - the stream to use
* @param {Uint8Array} msg - the message to send
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
async _writeReadMessage (stream, msg) { // eslint-disable-line require-await
return pTimeout(
writeReadMessage(stream, msg),
this.readMessageTimeout
async _writeMessage (stream, msg, options = {}) {
await pipe(
[msg],
lp.encode(),
stream,
drain
)

@@ -187,46 +155,47 @@ }

/**
* Write a message to the given stream.
* Write a message and read its response.
* If no response is received after the specified timeout
* this will error out.
*
* @param {MuxedStream} stream - the stream to use
* @param {Uint8Array} msg - the message to send
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
_writeMessage (stream, msg) {
return pipe(
async _writeReadMessage (stream, msg, options = {}) {
const res = await pipe(
[msg],
lp.encode(),
stream,
consume
lp.decode(),
/**
* @param {AsyncIterable<Uint8Array>} source
*/
async source => {
const buf = await first(source)
if (buf) {
return buf.slice()
}
}
)
}
}
/**
* @param {MuxedStream} stream
* @param {Uint8Array} msg
*/
async function writeReadMessage (stream, msg) {
const res = await pipe(
[msg],
lp.encode(),
stream,
lp.decode(),
/**
* @param {AsyncIterable<Uint8Array>} source
*/
async source => {
const buf = await first(source)
if (buf) {
return buf.slice()
}
if (res.length === 0) {
throw errcode(new Error('No message received'), 'ERR_NO_MESSAGE_RECEIVED')
}
)
if (res.length === 0) {
throw errcode(new Error('No message received'), 'ERR_NO_MESSAGE_RECEIVED')
const message = Message.deserialize(res)
// tell any listeners about new peers we've seen
message.closerPeers.forEach(peerData => {
this.emit('peer', peerData)
})
message.providerPeers.forEach(peerData => {
this.emit('peer', peerData)
})
return message
}
return Message.deserialize(res)
}
module.exports = Network
module.exports.Network = Network

@@ -5,3 +5,2 @@ 'use strict'

* @typedef {import('peer-id')} PeerId
* @typedef {import('../').PeerData} PeerData
*/

@@ -14,3 +13,3 @@

constructor () {
/** @type {PeerData[]} */
/** @type {PeerId[]} */
this.list = []

@@ -22,7 +21,7 @@ }

*
* @param {PeerData} peerData
* @param {PeerId} peerId
*/
push (peerData) {
if (!this.has(peerData.id)) {
this.list.push(peerData)
push (peerId) {
if (!this.has(peerId)) {
this.list.push(peerId)

@@ -41,3 +40,3 @@ return true

has (peerId) {
const match = this.list.find((i) => i.id.equals(peerId))
const match = this.list.find((i) => i.equals(peerId))
return Boolean(match)

@@ -44,0 +43,0 @@ }

@@ -5,3 +5,2 @@ 'use strict'

const pMap = require('p-map')
const { equals: uint8ArrayEquals } = require('uint8arrays/equals')
const { compare: uint8ArrayCompare } = require('uint8arrays/compare')

@@ -12,3 +11,2 @@ const { xor: uint8ArrayXor } = require('uint8arrays/xor')

* @typedef {import('peer-id')} PeerId
* @typedef {import('../').PeerData} PeerData
*/

@@ -54,3 +52,3 @@

async add (peerId) {
if (this.peerDistances.find(pd => uint8ArrayEquals(pd.peerId.id, peerId.id))) {
if (this.peerDistances.find(pd => pd.peerId.equals(peerId))) {
return

@@ -57,0 +55,0 @@ }

'use strict'
const errcode = require('err-code')
const pTimeout = require('p-timeout')
const { validator } = require('libp2p-record')
const PeerId = require('peer-id')
const crypto = require('libp2p-crypto')
const { toString: uint8ArrayToString } = require('uint8arrays/to-string')
const c = require('../constants')
const Message = require('../message')
const Query = require('../query')
const { Message } = require('../message')
const utils = require('../utils')
const {
queryErrorEvent,
finalPeerEvent,
valueEvent
} = require('../query/events')
const PeerDistanceList = require('../peer-list/peer-distance-list')
const { Record } = require('libp2p-record')
/**
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('../types').PeerData} PeerData
*/
/**
* @param {import('../index')} dht
*/
module.exports = (dht) => {
class PeerRouting {
/**
* @param {object} params
* @param {import('peer-id')} params.peerId
* @param {import('../routing-table').RoutingTable} params.routingTable
* @param {import('../types').PeerStore} params.peerStore
* @param {import('../network').Network} params.network
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators
* @param {import('../query/manager').QueryManager} params.queryManager
* @param {boolean} params.lan
*/
constructor ({ peerId, routingTable, peerStore, network, validators, queryManager, lan }) {
this._peerId = peerId
this._routingTable = routingTable
this._peerStore = peerStore
this._network = network
this._validators = validators
this._queryManager = queryManager
this._log = utils.logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:peer-routing`)
}
/**
* Look if we are connected to a peer with the given id.

@@ -30,10 +49,18 @@ * Returns its id and addresses, if found, otherwise `undefined`.

*/
const findPeerLocal = async (peer) => {
dht._log(`findPeerLocal ${peer.toB58String()}`)
const p = await dht.routingTable.find(peer)
async findPeerLocal (peer) {
let peerData
const p = await this._routingTable.find(peer)
/** @type {{ id: PeerId, addresses: { multiaddr: Multiaddr }[] }} */
const peerData = p && dht.peerStore.get(p)
if (p) {
this._log('findPeerLocal found %p in routing table', peer)
peerData = this._peerStore.get(p)
}
if (!peerData) {
peerData = this._peerStore.get(peer)
}
if (peerData) {
this._log('findPeerLocal found %p in peer store', peer)
return {

@@ -51,252 +78,243 @@ id: peerData.id,

* @param {Uint8Array} key
* @returns {Promise<Message>}
* @private
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
const getValueSingle = async (peer, key) => { // eslint-disable-line require-await
async * _getValueSingle (peer, key, options = {}) { // eslint-disable-line require-await
const msg = new Message(Message.TYPES.GET_VALUE, key, 0)
return dht.network.sendRequest(peer, msg)
yield * this._network.sendRequest(peer, msg, options)
}
/**
* Find close peers for a given peer
*
* @param {Uint8Array} key
* @param {PeerId} peer
* @returns {Promise<Array<{ id: PeerId, multiaddrs: Multiaddr[] }>>}
* @private
*/
const closerPeersSingle = async (key, peer) => {
dht._log(`closerPeersSingle ${uint8ArrayToString(key, 'base32')} from ${peer.toB58String()}`)
const msg = await dht.peerRouting._findPeerSingle(peer, new PeerId(key))
return msg.closerPeers
.filter((peerData) => !dht._isSelf(peerData.id))
.map((peerData) => {
dht.peerStore.addressBook.add(peerData.id, peerData.multiaddrs)
return peerData
})
}
/**
* Get the public key directly from a node.
*
* @param {PeerId} peer
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
const getPublicKeyFromNode = async (peer) => {
async * getPublicKeyFromNode (peer, options) {
const pkKey = utils.keyForPublicKey(peer)
const msg = await getValueSingle(peer, pkKey)
if (!msg.record || !msg.record.value) {
throw errcode(new Error(`Node not responding with its public key: ${peer.toB58String()}`), 'ERR_INVALID_RECORD')
}
for await (const event of this._getValueSingle(peer, pkKey, options)) {
yield event
const recPeer = await PeerId.createFromPubKey(msg.record.value)
if (event.name === 'PEER_RESPONSE' && event.record) {
const recPeer = await PeerId.createFromPubKey(event.record.value)
// compare hashes of the pub key
if (!recPeer.equals(peer)) {
throw errcode(new Error('public key does not match id'), 'ERR_PUBLIC_KEY_DOES_NOT_MATCH_ID')
// compare hashes of the pub key
if (!recPeer.equals(peer)) {
throw errcode(new Error('public key does not match id'), 'ERR_PUBLIC_KEY_DOES_NOT_MATCH_ID')
}
yield valueEvent({ from: peer, value: recPeer.pubKey.bytes })
}
}
return recPeer.pubKey
throw errcode(new Error(`Node not responding with its public key: ${peer.toB58String()}`), 'ERR_INVALID_RECORD')
}
return {
/**
* Ask peer `peer` if they know where the peer with id `target` is.
* Search for a peer with the given ID.
*
* @param {PeerId} peer
* @param {PeerId} target
* @returns {Promise<Message>}
* @private
* @param {PeerId} id
* @param {object} [options]
* @param {AbortSignal} [options.signal]
* @param {number} [options.queryFuncTimeout]
*/
async _findPeerSingle (peer, target) { // eslint-disable-line require-await
dht._log('findPeerSingle %s', peer.toB58String())
const msg = new Message(Message.TYPES.FIND_NODE, target.id, 0)
async * findPeer (id, options = {}) {
this._log('findPeer %p', id)
return dht.network.sendRequest(peer, msg)
},
// Try to find locally
const pi = await this.findPeerLocal(id)
/**
* Search for a peer with the given ID.
*
* @param {PeerId} id
* @param {Object} [options] - findPeer options
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async findPeer (id, options = { timeout: 60000 }) {
options.timeout = options.timeout || c.minute
dht._log('findPeer %s', id.toB58String())
// already got it
if (pi != null) {
this._log('found local')
yield finalPeerEvent({
from: this._peerId,
peer: pi
})
return
}
// Try to find locally
const pi = await findPeerLocal(id)
const key = await utils.convertPeerId(id)
const peers = this._routingTable.closestPeers(key)
// already got it
if (pi != null) {
dht._log('found local')
return pi
}
// sanity check
const match = peers.find((p) => p.equals(id))
const key = await utils.convertPeerId(id)
const peers = dht.routingTable.closestPeers(key, dht.kBucketSize)
if (match) {
const peer = this._peerStore.get(id)
if (peers.length === 0) {
throw errcode(new Error('Peer lookup failed'), 'ERR_LOOKUP_FAILED')
}
// sanity check
const match = peers.find((p) => p.isEqual(id))
if (match) {
/** @type {{ id: PeerId, addresses: { multiaddr: Multiaddr }[] }} */
const peer = dht.peerStore.get(id)
if (peer) {
dht._log('found in peerStore')
return {
if (peer) {
this._log('found in peerStore')
yield finalPeerEvent({
from: this._peerId,
peer: {
id: peer.id,
multiaddrs: peer.addresses.map((address) => address.multiaddr)
}
}
})
return
}
}
// query the network
const query = new Query(dht, id.id, () => {
/**
* There is no distinction between the disjoint paths, so there are no per-path
* variables in dht scope. Just return the actual query function.
*
* @param {PeerId} peer
*/
const queryFn = async (peer) => {
const msg = await this._findPeerSingle(peer, id)
const match = msg.closerPeers.find((p) => p.id.isEqual(id))
const self = this
// found it
/**
* @type {import('../query/types').QueryFunc}
*/
const findPeerQuery = async function * ({ peer, signal }) {
const request = new Message(Message.TYPES.FIND_NODE, id.toBytes(), 0)
for await (const event of self._network.sendRequest(peer, request, { signal })) {
yield event
if (event.name === 'PEER_RESPONSE') {
const match = event.closer.find((p) => p.id.equals(id))
// found the peer
if (match) {
return {
peer: match,
queryComplete: true
}
yield finalPeerEvent({ from: event.from, peer: match })
}
return {
closerPeers: msg.closerPeers
}
}
}
}
return queryFn
})
let foundPeer = false
let result
try {
result = await pTimeout(query.run(peers), options.timeout)
} finally {
query.stop()
for await (const event of this._queryManager.run(id.id, peers, findPeerQuery, options)) {
if (event.name === 'FINAL_PEER') {
foundPeer = true
}
let success = false
result.paths.forEach((result) => {
if (result.success && result.peer) {
success = true
dht.peerStore.addressBook.add(result.peer.id, result.peer.multiaddrs)
}
})
dht._log('findPeer %s: %s', id.toB58String(), success)
yield event
}
if (!success) {
throw errcode(new Error('No peer found'), 'ERR_NOT_FOUND')
}
if (!foundPeer) {
yield queryErrorEvent({ from: this._peerId, error: errcode(new Error('Not found'), 'ERR_NOT_FOUND') })
}
}
/** @type {{ id: PeerId, addresses: { multiaddr: Multiaddr }[] }} */
const peerData = dht.peerStore.get(id)
/**
* Kademlia 'node lookup' operation
*
* @param {Uint8Array} key - the key to look up, could be a the bytes from a multihash or a peer ID
* @param {object} [options]
* @param {AbortSignal} [options.signal]
* @param {number} [options.queryFuncTimeout]
*/
async * getClosestPeers (key, options = {}) {
this._log('getClosestPeers to %b', key)
const id = await utils.convertBuffer(key)
const tablePeers = this._routingTable.closestPeers(id)
const self = this
if (!peerData) {
throw errcode(new Error('No peer found in peer store'), 'ERR_NOT_FOUND')
}
const peers = new PeerDistanceList(id, this._routingTable._kBucketSize)
tablePeers.forEach(peer => peers.add(peer))
return {
id: peerData.id,
multiaddrs: peerData.addresses.map((address) => address.multiaddr)
}
},
/**
* Kademlia 'node lookup' operation.
*
* @param {Uint8Array} key
* @param {Object} [options]
* @param {boolean} [options.shallow=false] - shallow query
* @returns {AsyncIterable<PeerId>}
* @type {import('../query/types').QueryFunc}
*/
async * getClosestPeers (key, options = { shallow: false }) {
dht._log('getClosestPeers to %b', key)
const getCloserPeersQuery = async function * ({ peer, signal }) {
self._log('closerPeersSingle %s from %p', uint8ArrayToString(key, 'base32'), peer)
const request = new Message(Message.TYPES.FIND_NODE, key, 0)
const id = await utils.convertBuffer(key)
const tablePeers = dht.routingTable.closestPeers(id, dht.kBucketSize)
yield * self._network.sendRequest(peer, request, { signal })
}
const q = new Query(dht, key, () => {
// There is no distinction between the disjoint paths,
// so there are no per-path variables in dht scope.
// Just return the actual query function.
return async (peer) => {
const closer = await closerPeersSingle(key, peer)
for await (const event of this._queryManager.run(key, tablePeers, getCloserPeersQuery, options)) {
yield event
return {
closerPeers: closer,
pathComplete: options.shallow ? true : undefined
}
}
})
const res = await q.run(tablePeers)
if (!res || !res.finalSet) {
return []
if (event.name === 'PEER_RESPONSE') {
event.closer.forEach(peerData => {
peers.add(peerData.id)
})
}
}
const sorted = await utils.sortClosestPeers(Array.from(res.finalSet), id)
this._log('found %d peers close to %b', peers.length, key)
for (const pId of sorted.slice(0, dht.kBucketSize)) {
yield pId
yield * peers.peers.map(peer => finalPeerEvent({
from: this._peerId,
peer: {
id: peer,
multiaddrs: (this._peerStore.addressBook.get(peer) || []).map(addr => addr.multiaddr)
}
},
}))
}
/**
* Get the public key for the given peer id.
*
* @param {PeerId} peer
*/
async getPublicKey (peer) {
dht._log('getPublicKey %s', peer.toB58String())
/**
* Query a particular peer for the value for the given key.
* It will either return the value or a list of closer peers.
*
* Note: The peerStore is updated with new addresses found for the given peer.
*
* @param {PeerId} peer
* @param {Uint8Array} key
* @param {object} [options]
* @param {AbortSignal} [options.signal]
*/
async * getValueOrPeers (peer, key, options = {}) {
for await (const event of this._getValueSingle(peer, key, options)) {
if (event.name === 'PEER_RESPONSE') {
if (event.record) {
// We have a record
try {
await this._verifyRecordOnline(event.record)
} catch (/** @type {any} */ err) {
const errMsg = 'invalid record received, discarded'
this._log(errMsg)
// local check
/** @type {{ id: PeerId, addresses: { multiaddr: Multiaddr }[] }} */
const peerData = dht.peerStore.get(peer)
if (peerData && peerData.id.pubKey) {
dht._log('getPublicKey: found local copy')
return peerData.id.pubKey
yield queryErrorEvent({ from: event.from, error: errcode(new Error(errMsg), 'ERR_INVALID_RECORD') })
continue
}
}
}
// try the node directly
let pk
yield event
}
}
try {
pk = await getPublicKeyFromNode(peer)
} catch (err) {
// try dht directly
const pkKey = utils.keyForPublicKey(peer)
const value = await dht.get(pkKey)
pk = crypto.keys.unmarshalPublicKey(value)
}
/**
* Verify a record, fetching missing public keys from the network.
* Calls back with an error if the record is invalid.
*
* @param {import('../types').DHTRecord} record
* @returns {Promise<void>}
*/
async _verifyRecordOnline ({ key, value, timeReceived }) {
await validator.verifyRecord(this._validators, new Record(key, value, timeReceived))
}
const peerId = new PeerId(peer.id, undefined, pk)
const addrs = ((peerData && peerData.addresses) || []).map((address) => address.multiaddr)
dht.peerStore.addressBook.add(peerId, addrs)
dht.peerStore.keyBook.set(peerId, pk)
/**
* Get the nearest peers to the given query, but if closer
* than self
*
* @param {Uint8Array} key
* @param {PeerId} closerThan
*/
async getCloserPeersOffline (key, closerThan) {
const id = await utils.convertBuffer(key)
const ids = this._routingTable.closestPeers(id)
const output = ids
.map((p) => {
const peer = this._peerStore.get(p)
return pk
return {
id: p,
multiaddrs: peer ? peer.addresses.map((address) => address.multiaddr) : []
}
})
.filter((closer) => !closer.id.equals(closerThan))
if (output.length) {
this._log('getCloserPeersOffline found %d peer(s) closer to %b than %p', output.length, key, closerThan)
} else {
this._log('getCloserPeersOffline could not find peer closer to %b than %p', key, closerThan)
}
return output
}
}
module.exports.PeerRouting = PeerRouting

@@ -9,5 +9,13 @@ 'use strict'

const { default: Queue } = require('p-queue')
const c = require('./constants')
const {
PROVIDERS_CLEANUP_INTERVAL,
PROVIDERS_VALIDITY,
PROVIDERS_LRU_CACHE_SIZE,
PROVIDERS_KEY_PREFIX
} = require('./constants')
const utils = require('./utils')
const { toString: uint8ArrayToString } = require('uint8arrays/to-string')
const log = utils.logger('libp2p:kad-dht:providers')
/**

@@ -33,10 +41,7 @@ * @typedef {import('multiformats/cid').CID} CID

* @param {Datastore} datastore
* @param {PeerId} [self]
* @param {number} [cacheSize=256]
*/
constructor (datastore, self, cacheSize) {
constructor (datastore, cacheSize) {
this.datastore = datastore
this._log = utils.logger(self, 'providers')
/**

@@ -47,3 +52,3 @@ * How often invalid records are cleaned. (in seconds)

*/
this.cleanupInterval = c.PROVIDERS_CLEANUP_INTERVAL
this.cleanupInterval = PROVIDERS_CLEANUP_INTERVAL

@@ -55,3 +60,3 @@ /**

*/
this.provideValidity = c.PROVIDERS_VALIDITY
this.provideValidity = PROVIDERS_VALIDITY

@@ -63,3 +68,3 @@ /**

*/
this.lruCacheSize = cacheSize || c.PROVIDERS_LRU_CACHE_SIZE
this.lruCacheSize = cacheSize || PROVIDERS_LRU_CACHE_SIZE

@@ -108,3 +113,2 @@ // @ts-ignore hashlru types are wrong

return this.syncQueue.add(async () => {
this._log('start cleanup')
const start = Date.now()

@@ -118,3 +122,4 @@

// Get all provider entries from the datastore
const query = this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX })
const query = this.datastore.query({ prefix: PROVIDERS_KEY_PREFIX })
for await (const entry of query) {

@@ -128,4 +133,5 @@ try {

const expired = delta > this.provideValidity
this._log('comparing: %d - %d = %d > %d %s',
now, time, delta, this.provideValidity, expired ? '(expired)' : '')
log('comparing: %d - %d = %d > %d %s', now, time, delta, this.provideValidity, expired ? '(expired)' : '')
if (expired) {

@@ -139,11 +145,13 @@ deleteCount++

count++
} catch (err) {
this._log.error(err.message)
} catch (/** @type {any} */ err) {
log.error(err.message)
}
}
this._log('deleting %d / %d entries', deleteCount, count)
// Commit the deletes to the datastore
if (deleted.size) {
log('deleting %d / %d entries', deleteCount, count)
await batch.commit()
} else {
log('nothing to delete')
}

@@ -155,2 +163,3 @@

const provs = this.providers.get(key)
if (provs) {

@@ -160,2 +169,3 @@ for (const peerId of peers) {

}
if (provs.size === 0) {

@@ -169,3 +179,3 @@ this.providers.remove(key)

this._log('Cleanup successful (%dms)', Date.now() - start)
log('Cleanup successful (%dms)', Date.now() - start)
})

@@ -185,2 +195,3 @@ }

let provs = this.providers.get(cacheKey)
if (!provs) {

@@ -190,2 +201,3 @@ provs = await loadProviders(this.datastore, cid)

}
return provs

@@ -203,11 +215,12 @@ }

return this.syncQueue.add(async () => {
this._log('addProvider %s', cid.toString())
log('addProvider %s', cid.toString())
const provs = await this._getProvidersMap(cid)
this._log('loaded %s provs', provs.size)
log('loaded %s provs', provs.size)
const now = new Date()
provs.set(utils.encodeBase32(provider.id), now)
provs.set(provider.toString(), now)
const dsKey = makeProviderKey(cid)
this.providers.set(dsKey, provs)
return writeProviderEntry(this.datastore, cid, provider, now)

@@ -225,6 +238,7 @@ })

return this.syncQueue.add(async () => {
this._log('getProviders %s', cid.toString())
log('getProviders %s', cid.toString())
const provs = await this._getProvidersMap(cid)
return [...provs.keys()].map((base32PeerId) => {
return new PeerId(utils.decodeBase32(base32PeerId))
return [...provs.keys()].map(peerIdStr => {
return PeerId.parse(peerIdStr)
})

@@ -244,4 +258,5 @@ })

function makeProviderKey (cid) {
cid = typeof cid === 'string' ? cid : utils.encodeBase32(cid.bytes)
return c.PROVIDERS_KEY_PREFIX + cid
cid = typeof cid === 'string' ? cid : uint8ArrayToString(cid.multihash.bytes, 'base32')
return PROVIDERS_KEY_PREFIX + cid
}

@@ -261,3 +276,3 @@

'/',
utils.encodeBase32(peer.id)
peer.toString()
].join('')

@@ -267,2 +282,3 @@

const buffer = Uint8Array.from(varint.encode(time.getTime()))
return store.put(key, buffer)

@@ -278,2 +294,3 @@ }

const parts = key.toString().split('/')
if (parts.length !== 4) {

@@ -301,2 +318,3 @@ throw new Error('incorrectly formatted provider entry key in datastore: ' + key)

const query = store.query({ prefix: makeProviderKey(cid) })
for await (const entry of query) {

@@ -306,2 +324,3 @@ const { peerId } = parseProviderKey(entry.key)

}
return providers

@@ -317,2 +336,2 @@ }

module.exports = Providers
module.exports.Providers = Providers
'use strict'
// @ts-ignore
// @ts-expect-error no types
const KBuck = require('k-bucket')
const { xor: uint8ArrayXor } = require('uint8arrays/xor')
const GENERATED_PREFIXES = require('./generated-prefix-list.json')
const { sha256 } = require('multiformats/hashes/sha2')
const crypto = require('libp2p-crypto')
const PeerId = require('peer-id')
const utils = require('../utils')
const debug = require('debug')
const log = Object.assign(debug('libp2p:dht:routing-table'), {
error: debug('libp2p:dht:routing-table:error')
})
// @ts-ignore
const length = require('it-length')
const { default: Queue } = require('p-queue')
const { PROTOCOL_DHT } = require('../constants')
const { TimeoutController } = require('timeout-abort-controller')
/**
* @typedef {object} KBucketPeer
* @property {Uint8Array} id
* @property {PeerId} peer
*
* @typedef {object} KBucket
* @property {Uint8Array} id
* @property {KBucketPeer[]} contacts
* @property {boolean} dontSplit
* @property {KBucket} left
* @property {KBucket} right
*
* @typedef {object} KBucketTree
* @property {KBucket} root
* @property {Uint8Array} localNodeId
* @property {(event: string, callback: Function) => void} on
* @property {(key: Uint8Array, count: number) => KBucketPeer[]} closest
* @property {(key: Uint8Array) => KBucketPeer} closestPeer
* @property {(key: Uint8Array) => void} remove
* @property {(peer: KBucketPeer) => void} add
* @property {() => number} count
* @property {() => Iterable<KBucket>} toIterable
* @typedef {import('./types').KBucketPeer} KBucketPeer
* @typedef {import('./types').KBucket} KBucket
* @typedef {import('./types').KBucketTree} KBucketTree
* @typedef {import('peer-id')} PeerId
*/
/**
* Cannot generate random KadIds longer than this + 1
*/
const MAX_COMMON_PREFIX_LENGTH = 15
/**
* A wrapper around `k-bucket`, to provide easy store and

@@ -53,12 +23,15 @@ * retrieval for peers.

/**
* @param {import('../')} dht
* @param {object} [options]
* @param {number} [options.kBucketSize=20]
* @param {number} [options.refreshInterval=30000]
* @param {object} params
* @param {import('peer-id')} params.peerId
* @param {import('../types').Dialer} params.dialer
* @param {boolean} params.lan
* @param {number} [params.kBucketSize=20]
* @param {number} [params.pingTimeout=10000]
*/
constructor (dht, { kBucketSize, refreshInterval } = {}) {
this.peerId = dht.peerId
this.dht = dht
constructor ({ peerId, dialer, kBucketSize, pingTimeout, lan }) {
this._log = utils.logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:routing-table`)
this._peerId = peerId
this._dialer = dialer
this._kBucketSize = kBucketSize || 20
this._refreshInterval = refreshInterval || 30000
this._pingTimeout = pingTimeout || 10000

@@ -74,249 +47,70 @@ /** @type {KBucketTree} */

this._refreshTable = this._refreshTable.bind(this)
this._onPing = this._onPing.bind(this)
this._pingQueue = new Queue({ concurrency: 1 })
}
async start () {
this.kb.localNodeId = await utils.convertPeerId(this.peerId)
this.kb.localNodeId = await utils.convertPeerId(this._peerId)
this.kb.on('ping', this._onPing)
await this._refreshTable(true)
}
async stop () {
if (this._refreshTimeoutId) {
clearTimeout(this._refreshTimeoutId)
}
this._pingQueue.clear()
}
/**
* To speed lookups, we seed the table with random PeerIds. This means
* when we are asked to locate a peer on the network, we can find a KadId
* that is close to the requested peer ID and query that, then network
* peers will tell us who they know who is close to the fake ID
* Called on the `ping` event from `k-bucket` when a bucket is full
* and cannot split.
*
* @param {boolean} [force=false]
* `oldContacts.length` is defined by the `numberOfNodesToPing` param
* passed to the `k-bucket` constructor.
*
* `oldContacts` will not be empty and is the list of contacts that
* have not been contacted for the longest.
*
* @param {KBucketPeer[]} oldContacts
* @param {KBucketPeer} newContact
*/
async _refreshTable (force) {
log('refreshing routing table')
_onPing (oldContacts, newContact) {
// add to a queue so multiple ping requests do not overlap and we don't
// flood the network with ping requests if lots of newContact requests
// are received
this._pingQueue.add(async () => {
let responded = 0
const prefixLength = this._maxCommonPrefix()
const refreshCpls = this._getTrackedCommonPrefixLengthsForRefresh(prefixLength)
try {
await Promise.all(
oldContacts.map(async oldContact => {
let timeoutController
log(`max common prefix length ${prefixLength}`)
log(`tracked CPLs [ ${refreshCpls.map(date => `${date.getFullYear()}-${(date.getMonth() + 1).toString().padStart(2, '0')}-${date.getDate().toString().padStart(2, '0')} ${date.getHours().toString().padStart(2, '0')}:${date.getMinutes().toString().padStart(2, '0')}:${date.getSeconds().toString().padStart(2, '0')}`).join(', ')} ]`)
/**
* If we see a gap at a common prefix length in the Routing table, we ONLY refresh up until
* the maximum cpl we have in the Routing Table OR (2 * (Cpl+ 1) with the gap), whichever
* is smaller.
*
* This is to prevent refreshes for Cpls that have no peers in the network but happen to be
* before a very high max Cpl for which we do have peers in the network.
*
* The number of 2 * (Cpl + 1) can be proved and a proof would have been written here if
* the programmer had paid more attention in the Math classes at university.
*
* So, please be patient and a doc explaining it will be published soon.
*
* https://github.com/libp2p/go-libp2p-kad-dht/commit/2851c88acb0a3f86bcfe3cfd0f4604a03db801d8#diff-ad45f4ba97ffbc4083c2eb87a4420c1157057b233f048030d67c6b551855ccf6R219
*/
await Promise.all(
refreshCpls.map(async (lastRefresh, index) => {
try {
await this._refreshCommonPrefixLength(index, lastRefresh, force === true)
if (this._numPeersForCpl(prefixLength) === 0) {
const lastCpl = Math.min(2 * (index + 1), refreshCpls.length - 1)
for (let n = index + 1; n < lastCpl + 1; n++) {
try {
await this._refreshCommonPrefixLength(n, lastRefresh, force === true)
} catch (err) {
log.error(err)
try {
timeoutController = new TimeoutController(this._pingTimeout)
this._log(`Pinging old contact ${oldContact.peer}`)
const { stream } = await this._dialer.dialProtocol(oldContact.peer, PROTOCOL_DHT, {
signal: timeoutController.signal
})
await stream.close()
responded++
} catch (err) {
this._log.error('Could not ping peer %p', oldContact.peer, err)
this._log(`Evicting old contact after ping failed ${oldContact.peer}`)
this.kb.remove(oldContact.id)
} finally {
if (timeoutController) {
timeoutController.clear()
}
}
}
} catch (err) {
log.error(err)
}
})
)
})
)
this._refreshTimeoutId = setTimeout(this._refreshTable, this._refreshInterval)
// @ts-ignore
this._refreshTimeoutId.unref()
}
/**
* @param {number} cpl
* @param {Date} lastRefresh
* @param {boolean} force
*/
async _refreshCommonPrefixLength (cpl, lastRefresh, force) {
if (!force && lastRefresh.getTime() > (Date.now() - this._refreshInterval)) {
log(`not running refresh for cpl ${cpl} as time since last refresh not above interval`)
return
}
// gen a key for the query to refresh the cpl
const peerId = await this._generateRandomPeerId(cpl)
log(`starting refreshing cpl ${cpl} with key ${peerId.toB58String()} (routing table size was ${this.kb.count()})`)
const peers = await length(this.dht.getClosestPeers(peerId.toBytes(), {}))
log(`found ${peers} peers that were close to imaginary peer ${peerId.toB58String()}`)
log(`finished refreshing cpl ${cpl} with key ${peerId.toB58String()} (routing table size was ${this.kb.count()})`)
}
/**
* @param {number} maxCommonPrefix
*/
_getTrackedCommonPrefixLengthsForRefresh (maxCommonPrefix) {
if (maxCommonPrefix > MAX_COMMON_PREFIX_LENGTH) {
maxCommonPrefix = MAX_COMMON_PREFIX_LENGTH
}
const dates = []
for (let i = 0; i <= maxCommonPrefix; i++) {
// defaults to the zero value if we haven't refreshed it yet.
dates[i] = this.commonPrefixLengthRefreshedAt[i] || new Date()
}
return dates
}
/**
*
* @param {number} targetCommonPrefixLength
*/
async _generateRandomPeerId (targetCommonPrefixLength) {
const randomBytes = crypto.randomBytes(2)
const randomUint16 = (randomBytes[1] << 8) + randomBytes[0]
const key = await this._makePeerId(this.kb.localNodeId, randomUint16, targetCommonPrefixLength)
return PeerId.createFromBytes(key)
}
/**
* @param {Uint8Array} localKadId
* @param {number} randomPrefix
* @param {number} targetCommonPrefixLength
*/
async _makePeerId (localKadId, randomPrefix, targetCommonPrefixLength) {
if (targetCommonPrefixLength > MAX_COMMON_PREFIX_LENGTH) {
throw new Error(`Cannot generate peer ID for common prefix length greater than ${MAX_COMMON_PREFIX_LENGTH}`)
}
const view = new DataView(localKadId.buffer, localKadId.byteOffset, localKadId.byteLength)
const localPrefix = view.getUint16(0, false)
// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
// to our randomly generated prefix.
const toggledLocalPrefix = localPrefix ^ (0x8000 >> targetCommonPrefixLength)
// Combine the toggled local prefix and the random bits at the correct offset
// such that ONLY the first `targetCommonPrefixLength` bits match the local ID.
const mask = 65535 << (16 - (targetCommonPrefixLength + 1))
const targetPrefix = (toggledLocalPrefix & mask) | (randomPrefix & ~mask)
// Convert to a known peer ID.
const keyPrefix = GENERATED_PREFIXES[targetPrefix]
const keyBuffer = new ArrayBuffer(34)
const keyView = new DataView(keyBuffer, 0, keyBuffer.byteLength)
keyView.setUint8(0, sha256.code)
keyView.setUint8(1, 32)
keyView.setUint32(2, keyPrefix, false)
return new Uint8Array(keyView.buffer, keyView.byteOffset, keyView.byteLength)
}
/**
* returns the maximum common prefix length between any peer in the table
* and the current peer
*/
_maxCommonPrefix () {
if (!this.kb.localNodeId) {
return 0
}
// xor our KadId with every KadId in the k-bucket tree,
// return the longest id prefix that is the same
let prefixLength = 0
for (const length of this._prefixLengths()) {
if (length > prefixLength) {
prefixLength = length
}
}
return prefixLength
}
/**
* Returns the number of peers in the table with a given prefix length
*
* @param {number} prefixLength
*/
_numPeersForCpl (prefixLength) {
let count = 0
for (const length of this._prefixLengths()) {
if (length === prefixLength) {
count++
}
}
return count
}
/**
* Yields the common prefix length of every peer in the table
*/
* _prefixLengths () {
for (const { id } of this.kb.toIterable()) {
const distance = uint8ArrayXor(this.kb.localNodeId, id)
let leadingZeros = 0
for (const byte of distance) {
if (byte === 0) {
leadingZeros++
} else {
break
if (responded < oldContacts.length) {
this._log(`Adding new contact ${newContact.peer}`)
this.kb.add(newContact)
}
} catch (err) {
this._log.error('Could not process k-bucket ping event', err)
}
yield leadingZeros
}
})
}
/**
* Called on the `ping` event from `k-bucket`.
* Currently this just removes the oldest contact from
* the list, without actually pinging the individual peers.
* This is the same as go does, but should probably
* be upgraded to actually ping the individual peers.
*
* @param {KBucketPeer[]} oldContacts
* @param {KBucketPeer} newContact
*/
_onPing (oldContacts, newContact) {
// just use the first one (k-bucket sorts from oldest to newest)
const oldest = oldContacts[0]
if (oldest) {
// remove the oldest one
this.kb.remove(oldest.id)
}
// add the new one
this.kb.add(newContact)
}
// -- Public Interface

@@ -362,5 +156,5 @@

* @param {Uint8Array} key
* @param {number} count
* @param {number} [count] - defaults to kBucketSize
*/
closestPeers (key, count) {
closestPeers (key, count = this._kBucketSize) {
const closest = this.kb.closest(key, count)

@@ -394,2 +188,2 @@

module.exports = RoutingTable
module.exports.RoutingTable = RoutingTable

@@ -5,22 +5,32 @@ 'use strict'

const errcode = require('err-code')
const utils = require('../../utils')
const log = utils.logger('libp2p:kad-dht:rpc:handlers:add-provider')
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../../message')} Message
* @typedef {import('../../message').Message} Message
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
*/
/**
* @param {import('../../index')} dht
* @implements {DHTMessageHandler}
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc:add-provider')
class AddProviderHandler {
/**
* Process `AddProvider` DHT messages.
*
* @param {object} params
* @param {PeerId} params.peerId
* @param {import('../../providers').Providers} params.providers
* @param {import('../../types').PeerStore} params.peerStore
*/
constructor ({ peerId, providers, peerStore }) {
this._peerId = peerId
this._providers = providers
this._peerStore = peerStore
}
/**
* @param {PeerId} peerId
* @param {Message} msg
*/
async function addProvider (peerId, msg) { // eslint-disable-line require-await
async handle (peerId, msg) {
log('start')

@@ -35,4 +45,5 @@

try {
// this is actually just the multihash, not the whole CID
cid = CID.decode(msg.key)
} catch (err) {
} catch (/** @type {any} */ err) {
const errMsg = `Invalid CID: ${err.message}`

@@ -42,33 +53,34 @@ throw errcode(new Error(errMsg), 'ERR_INVALID_CID')

msg.providerPeers.forEach((pi) => {
// Ignore providers not from the originator
if (!pi.id.isEqual(peerId)) {
log('invalid provider peer %s from %s', pi.id.toB58String(), peerId.toB58String())
return
}
if (!msg.providerPeers || !msg.providerPeers.length) {
log.error('no providers found in message')
}
if (pi.multiaddrs.length < 1) {
log('no valid addresses for provider %s. Ignore', peerId.toB58String())
return
}
await Promise.all(
msg.providerPeers.map(async (pi) => {
// Ignore providers not from the originator
if (!pi.id.equals(peerId)) {
log('invalid provider peer %p from %p', pi.id, peerId)
return
}
log('received provider %s for %s (addrs %s)', peerId.toB58String(), cid.toString(), pi.multiaddrs.map((m) => m.toString()))
if (pi.multiaddrs.length < 1) {
log('no valid addresses for provider %p. Ignore', peerId)
return
}
if (!dht._isSelf(pi.id)) {
// Add known address to peer store
dht.peerStore.addressBook.add(pi.id, pi.multiaddrs)
return dht.providers.addProvider(cid, pi.id)
}
})
log('received provider %p for %s (addrs %s)', peerId, cid, pi.multiaddrs.map((m) => m.toString()))
// Previous versions of the JS DHT sent erroneous providers in the
// `providerPeers` field. In order to accommodate older clients that have
// this bug, we fall back to assuming the originator is the provider if
// we can't find any valid providers in the payload.
// https://github.com/libp2p/js-libp2p-kad-dht/pull/127
// https://github.com/libp2p/js-libp2p-kad-dht/issues/128
return dht.providers.addProvider(cid, peerId)
if (!this._peerId.equals(pi.id)) {
// Add known address to peer store
this._peerStore.addressBook.add(pi.id, pi.multiaddrs)
await this._providers.addProvider(cid, pi.id)
}
})
)
// typescript requires a return value
return undefined
}
}
return addProvider
}
module.exports.AddProviderHandler = AddProviderHandler
'use strict'
const { equals: uint8ArrayEquals } = require('uint8arrays/equals')
const Message = require('../../message')
const { Message } = require('../../message')
const utils = require('../../utils')
const log = utils.logger('libp2p:kad-dht:rpc:handlers:find-node')
const {
removePrivateAddresses,
removePublicAddresses
} = require('../../utils')
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
*/
/**
* @param {import('../../index')} dht
* @implements {DHTMessageHandler}
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc:find-node')
class FindNodeHandler {
/**
* @param {object} params
* @param {PeerId} params.peerId
* @param {import('../../types').Addressable} params.addressable
* @param {import('../../peer-routing').PeerRouting} params.peerRouting
* @param {boolean} [params.lan]
*/
constructor ({ peerId, addressable, peerRouting, lan }) {
this._peerId = peerId
this._addressable = addressable
this._peerRouting = peerRouting
this._lan = Boolean(lan)
}
/**
* Process `FindNode` DHT messages.
* Process `FindNode` DHT messages
*

@@ -24,15 +40,19 @@ * @param {PeerId} peerId

*/
async function findNode (peerId, msg) {
log('start')
async handle (peerId, msg) {
log('incoming request from %p for peers closer to %b', peerId, msg.key)
let closer
if (uint8ArrayEquals(msg.key, dht.peerId.id)) {
if (this._peerId.equals(msg.key)) {
closer = [{
id: dht.peerId,
multiaddrs: dht.libp2p.multiaddrs
id: this._peerId,
multiaddrs: this._addressable.multiaddrs
}]
} else {
closer = await dht._betterPeersToQuery(msg, peerId)
closer = await this._peerRouting.getCloserPeersOffline(msg.key, peerId)
}
closer = closer
.map(this._lan ? removePublicAddresses : removePrivateAddresses)
.filter(({ multiaddrs }) => multiaddrs.length)
const response = new Message(msg.type, new Uint8Array(0), msg.clusterLevel)

@@ -43,3 +63,3 @@

} else {
log('handle FindNode %s: could not find anything', peerId.toB58String())
log('could not find any peers closer to %p', peerId)
}

@@ -49,4 +69,4 @@

}
}
return findNode
}
module.exports.FindNodeHandler = FindNodeHandler

@@ -5,15 +5,38 @@ 'use strict'

const errcode = require('err-code')
const Message = require('../../message')
const { Message } = require('../../message')
const utils = require('../../utils')
const log = utils.logger('libp2p:kad-dht:rpc:handlers:get-providers')
const {
removePrivateAddresses,
removePublicAddresses
} = require('../../utils')
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
*/
/**
* @param {import('../../index')} dht
* @implements {DHTMessageHandler}
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc:get-providers')
class GetProvidersHandler {
/**
* @param {object} params
* @param {PeerId} params.peerId
* @param {import('../../peer-routing').PeerRouting} params.peerRouting
* @param {import('../../providers').Providers} params.providers
* @param {import('interface-datastore').Datastore} params.datastore
* @param {import('../../types').PeerStore} params.peerStore
* @param {import('../../types').Addressable} params.addressable
* @param {boolean} [params.lan]
*/
constructor ({ peerId, peerRouting, providers, datastore, peerStore, addressable, lan }) {
this._peerId = peerId
this._peerRouting = peerRouting
this._providers = providers
this._datastore = datastore
this._peerStore = peerStore
this._addressable = addressable
this._lan = Boolean(lan)
}

@@ -26,33 +49,46 @@ /**

*/
async function getProviders (peerId, msg) {
async handle (peerId, msg) {
let cid
try {
cid = CID.decode(msg.key)
} catch (err) {
} catch (/** @type {any} */ err) {
throw errcode(new Error(`Invalid CID: ${err.message}`), 'ERR_INVALID_CID')
}
log('%s', cid.toString())
log('%p asking for providers for %s', peerId, cid.toString())
const dsKey = utils.bufferToKey(cid.bytes)
const [has, peers, closer] = await Promise.all([
dht.datastore.has(dsKey),
dht.providers.getProviders(cid),
dht._betterPeersToQuery(msg, peerId)
this._datastore.has(dsKey),
this._providers.getProviders(cid),
this._peerRouting.getCloserPeersOffline(msg.key, peerId)
])
const providerPeers = peers.map((peerId) => ({
id: peerId,
multiaddrs: []
}))
const closerPeers = closer.map((c) => ({
id: c.id,
multiaddrs: []
}))
const providerPeers = peers
.map((provider) => ({
id: provider,
multiaddrs: (this._peerStore.addressBook.get(provider) || []).map(address => address.multiaddr)
}))
.map(this._lan ? removePublicAddresses : removePrivateAddresses)
.filter(({ multiaddrs }) => multiaddrs.length)
const closerPeers = closer
.map((closer) => ({
id: closer.id,
multiaddrs: (this._peerStore.addressBook.get(closer.id) || []).map(address => address.multiaddr)
}))
.map(this._lan ? removePublicAddresses : removePrivateAddresses)
.filter(({ multiaddrs }) => multiaddrs.length)
if (has) {
providerPeers.push({
id: dht.peerId,
multiaddrs: []
const mapper = this._lan ? removePublicAddresses : removePrivateAddresses
const ourRecord = mapper({
id: this._peerId,
multiaddrs: this._addressable.multiaddrs
})
if (ourRecord.multiaddrs.length) {
providerPeers.push(ourRecord)
}
}

@@ -73,4 +109,4 @@

}
}
return getProviders
}
module.exports.GetProvidersHandler = GetProvidersHandler
'use strict'
const { Record } = require('libp2p-record')
const errcode = require('err-code')
const Message = require('../../message')
const { Message } = require('../../message')
const {
MAX_RECORD_AGE
} = require('../../constants')
const utils = require('../../utils')
const log = utils.logger('libp2p:kad-dht:rpc:handlers:get-value')
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
*/
/**
* @param {import('../../index')} dht
* @implements {DHTMessageHandler}
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc:get-value')
class GetValueHandler {
/**
* @param {object} params
* @param {PeerId} params.peerId
* @param {import('../../types').PeerStore} params.peerStore
* @param {import('../../peer-routing').PeerRouting} params.peerRouting
* @param {import('interface-datastore').Datastore} params.datastore
*/
constructor ({ peerId, peerStore, peerRouting, datastore }) {
this._peerId = peerId
this._peerStore = peerStore
this._peerRouting = peerRouting
this._datastore = datastore
}

@@ -25,8 +41,7 @@ /**

* @param {Message} msg
* @returns {Promise<Message>}
*/
async function getValue (peerId, msg) {
async handle (peerId, msg) {
const key = msg.key
log('key: %b', key)
log('%p asked for key %b', peerId, key)

@@ -44,6 +59,6 @@ if (!key || key.length === 0) {

if (dht._isSelf(idFromKey)) {
id = dht.peerId
if (this._peerId.equals(idFromKey)) {
id = this._peerId
} else {
const peerData = dht.peerStore.get(idFromKey)
const peerData = this._peerStore.get(idFromKey)
id = peerData && peerData.id

@@ -60,8 +75,8 @@ }

const [record, closer] = await Promise.all([
dht._checkLocalDatastore(key),
dht._betterPeersToQuery(msg, peerId)
this._checkLocalDatastore(key),
this._peerRouting.getCloserPeersOffline(msg.key, peerId)
])
if (record) {
log('got record')
log('had record for %b in local datastore', key)
response.record = record

@@ -71,3 +86,3 @@ }

if (closer.length > 0) {
log('got closer %s', closer.length)
log('had %s closer peers in routing table', closer.length)
response.closerPeers = closer

@@ -79,3 +94,45 @@ }

return getValue
/**
* Try to fetch a given record by from the local datastore.
* Returns the record iff it is still valid, meaning
* - it was either authored by this node, or
* - it was received less than `MAX_RECORD_AGE` ago.
*
* @param {Uint8Array} key
*/
async _checkLocalDatastore (key) {
log('checkLocalDatastore looking for %b', key)
const dsKey = utils.bufferToKey(key)
// Fetch value from ds
let rawRecord
try {
rawRecord = await this._datastore.get(dsKey)
} catch (/** @type {any} */ err) {
if (err.code === 'ERR_NOT_FOUND') {
return undefined
}
throw err
}
// Create record from the returned bytes
const record = Record.deserialize(rawRecord)
if (!record) {
throw errcode(new Error('Invalid record'), 'ERR_INVALID_RECORD')
}
// Check validity: compare time received with max record age
if (record.timeReceived == null ||
Date.now() - record.timeReceived.getTime() > MAX_RECORD_AGE) {
// If record is bad delete it and return
await this._datastore.delete(dsKey)
return undefined
}
// Record is valid
return record
}
}
module.exports.GetValueHandler = GetValueHandler
'use strict'
const T = require('../../message').TYPES
const { Message } = require('../../message')
const { AddProviderHandler } = require('./add-provider')
const { FindNodeHandler } = require('./find-node')
const { GetProvidersHandler } = require('./get-providers')
const { GetValueHandler } = require('./get-value')
const { PingHandler } = require('./ping')
const { PutValueHandler } = require('./put-value')
/**
*
* @param {import('../../index')} dht
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
*/
module.exports = (dht) => {
/**
* @param {object} params
* @param {import('peer-id')} params.peerId
* @param {import('../../providers').Providers} params.providers
* @param {import('../../types').PeerStore} params.peerStore
* @param {import('../../types').Addressable} params.addressable
* @param {import('../../peer-routing').PeerRouting} params.peerRouting
* @param {import('interface-datastore').Datastore} params.datastore
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators
* @param {boolean} [params.lan]
*/
module.exports = ({ peerId, providers, peerStore, addressable, peerRouting, datastore, validators, lan }) => {
/** @type {Record<number, DHTMessageHandler>} */
const handlers = {
[T.GET_VALUE]: require('./get-value')(dht),
[T.PUT_VALUE]: require('./put-value')(dht),
[T.FIND_NODE]: require('./find-node')(dht),
[T.ADD_PROVIDER]: require('./add-provider')(dht),
[T.GET_PROVIDERS]: require('./get-providers')(dht),
[T.PING]: require('./ping')(dht)
[Message.TYPES.GET_VALUE]: new GetValueHandler({ peerId, peerStore, peerRouting, datastore }),
[Message.TYPES.PUT_VALUE]: new PutValueHandler({ validators, datastore }),
[Message.TYPES.FIND_NODE]: new FindNodeHandler({ peerId, addressable, peerRouting, lan }),
[Message.TYPES.ADD_PROVIDER]: new AddProviderHandler({ peerId, providers, peerStore }),
[Message.TYPES.GET_PROVIDERS]: new GetProvidersHandler({ peerId, peerRouting, providers, datastore, peerStore, addressable, lan }),
[Message.TYPES.PING]: new PingHandler()
}

@@ -25,3 +43,2 @@

function getMessageHandler (type) {
// @ts-ignore ts does not aknowledge number as an index type
return handlers[type]

@@ -28,0 +45,0 @@ }

'use strict'
const utils = require('../../utils')
const log = utils.logger('libp2p:kad-dht:rpc:handlers:ping')
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../../message')} Message
* @typedef {import('../../message').Message} Message
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
*/
/**
* @param {import('../../index')} dht
* @implements {DHTMessageHandler}
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc:ping')
class PingHandler {
/**

@@ -22,8 +22,8 @@ * Process `Ping` DHT messages.

*/
function ping (peerId, msg) {
log('from %s', peerId.toB58String())
async handle (peerId, msg) {
log(`ping from ${peerId}`)
return msg
}
}
return ping
}
module.exports.PingHandler = PingHandler

@@ -5,13 +5,24 @@ 'use strict'

const errcode = require('err-code')
const Libp2pRecord = require('libp2p-record')
const log = utils.logger('libp2p:kad-dht:rpc:handlers:put-value')
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../../message')} Message
* @typedef {import('../../message').Message} Message
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
*/
/**
* @param {import('../../index')} dht
* @implements {DHTMessageHandler}
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc:put-value')
class PutValueHandler {
/**
* @param {object} params
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators
* @param {import('interface-datastore').Datastore} params.datastore
*/
constructor ({ validators, datastore }) {
this._validators = validators
this._datastore = datastore
}

@@ -24,5 +35,5 @@ /**

*/
async function putValue (peerId, msg) {
async handle (peerId, msg) {
const key = msg.key
log('key: %b', key)
log('%p asked to store value for key %b', peerId, key)

@@ -38,14 +49,12 @@ const record = msg.record

await dht._verifyRecordLocally(record)
await Libp2pRecord.validator.verifyRecord(this._validators, record)
record.timeReceived = new Date()
const recordKey = utils.bufferToKey(record.key)
await dht.datastore.put(recordKey, record.serialize())
await this._datastore.put(recordKey, record.serialize())
dht.onPut(record, peerId)
return msg
}
}
return putValue
}
module.exports.PutValueHandler = PutValueHandler

@@ -6,6 +6,8 @@ 'use strict'

const Message = require('../message')
const { Message, MESSAGE_TYPE_LOOKUP } = require('../message')
const handlers = require('./handlers')
const utils = require('../utils')
const log = utils.logger('libp2p:kad-dht:rpc')
/**

@@ -17,7 +19,21 @@ * @typedef {import('peer-id')} PeerId

/**
* @param {import('../index')} dht
* @param {import('../types').DHT} dht
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc')
const getMessageHandler = handlers(dht)
class RPC {
/**
* @param {object} params
* @param {import('../routing-table').RoutingTable} params.routingTable
* @param {import('peer-id')} params.peerId
* @param {import('../providers').Providers} params.providers
* @param {import('../types').PeerStore} params.peerStore
* @param {import('../types').Addressable} params.addressable
* @param {import('../peer-routing').PeerRouting} params.peerRouting
* @param {import('interface-datastore').Datastore} params.datastore
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators
* @param {boolean} [params.lan]
*/
constructor (params) {
this._messageHandler = handlers(params)
this._routingTable = params.routingTable
}

@@ -30,9 +46,9 @@ /**

*/
async function handleMessage (peerId, msg) {
async handleMessage (peerId, msg) {
// get handler & execute it
const handler = getMessageHandler(msg.type)
const handler = this._messageHandler(msg.type)
try {
await dht._add(peerId)
} catch (err) {
await this._routingTable.add(peerId)
} catch (/** @type {any} */ err) {
log.error('Failed to update the kbucket store', err)

@@ -46,3 +62,3 @@ }

return handler(peerId, msg)
return handler.handle(peerId, msg)
}

@@ -57,13 +73,12 @@

*/
async function onIncomingStream ({ stream, connection }) {
async onIncomingStream ({ stream, connection }) {
const peerId = connection.remotePeer
try {
await dht._add(peerId)
} catch (err) {
await this._routingTable.add(peerId)
} catch (/** @type {any} */ err) {
log.error(err)
}
const idB58Str = peerId.toB58String()
log('from: %s', idB58Str)
const self = this

@@ -80,3 +95,4 @@ await pipe(

const desMessage = Message.deserialize(msg.slice())
const res = await handleMessage(peerId, desMessage)
log('incoming %s from %p', MESSAGE_TYPE_LOOKUP[desMessage.type], peerId)
const res = await self.handleMessage(peerId, desMessage)

@@ -93,4 +109,4 @@ // Not all handlers will return a response

}
}
return onIncomingStream
}
module.exports.RPC = RPC

@@ -6,15 +6,53 @@ 'use strict'

const { base58btc } = require('multiformats/bases/base58')
const { base32 } = require('multiformats/bases/base32')
const { Key } = require('interface-datastore/key')
const { xor: uint8ArrayXor } = require('uint8arrays/xor')
const { compare: uint8ArrayCompare } = require('uint8arrays/compare')
const pMap = require('p-map')
const { Record } = require('libp2p-record')
const PeerId = require('peer-id')
const errcode = require('err-code')
const { fromString: uint8ArrayFromString } = require('uint8arrays/from-string')
const { toString: uint8ArrayToString } = require('uint8arrays/to-string')
const { concat: uint8ArrayConcat } = require('uint8arrays/concat')
const pTimeout = require('p-timeout')
const isPrivateIp = require('private-ip')
// const IPNS_PREFIX = uint8ArrayFromString('/ipns/')
const PK_PREFIX = uint8ArrayFromString('/pk/')
/**
* @param {import('./types').PeerData} peer
*/
function removePrivateAddresses ({ id, multiaddrs }) {
return {
id,
multiaddrs: multiaddrs.filter(multiaddr => {
const [[type, addr]] = multiaddr.stringTuples()
if (type !== 4 && type !== 6) {
return false
}
// @ts-expect-error types are wrong https://github.com/frenchbread/private-ip/issues/18
return !isPrivateIp(addr)
})
}
}
/**
* @param {import('./types').PeerData} peer
*/
function removePublicAddresses ({ id, multiaddrs }) {
return {
id,
multiaddrs: multiaddrs.filter(multiaddr => {
const [[type, addr]] = multiaddr.stringTuples()
if (type !== 4 && type !== 6) {
return false
}
// @ts-expect-error types are wrong https://github.com/frenchbread/private-ip/issues/18
return isPrivateIp(addr)
})
}
}
/**
* Creates a DHT ID by hashing a given Uint8Array.

@@ -25,3 +63,3 @@ *

*/
exports.convertBuffer = async (buf) => {
const convertBuffer = async (buf) => {
return (await sha256.digest(buf)).digest

@@ -36,3 +74,3 @@ }

*/
exports.convertPeerId = async (peer) => {
const convertPeerId = async (peer) => {
return (await sha256.digest(peer.id)).digest

@@ -47,4 +85,4 @@ }

*/
exports.bufferToKey = (buf) => {
return new Key('/' + exports.encodeBase32(buf), false)
const bufferToKey = (buf) => {
return new Key('/' + uint8ArrayToString(buf, 'base32'), false)
}

@@ -58,5 +96,5 @@

*/
exports.keyForPublicKey = (peer) => {
const keyForPublicKey = (peer) => {
return uint8ArrayConcat([
uint8ArrayFromString('/pk/'),
PK_PREFIX,
peer.id

@@ -69,3 +107,3 @@ ])

*/
exports.isPublicKeyKey = (key) => {
const isPublicKeyKey = (key) => {
return uint8ArrayToString(key.slice(0, 4)) === '/pk/'

@@ -77,76 +115,14 @@ }

*/
exports.fromPublicKeyKey = (key) => {
return new PeerId(key.slice(4))
const isIPNSKey = (key) => {
return uint8ArrayToString(key.slice(0, 4)) === '/ipns/'
}
/**
* Get the current time as timestamp.
*
* @returns {number}
* @param {Uint8Array} key
*/
exports.now = () => {
return Date.now()
const fromPublicKeyKey = (key) => {
return new PeerId(key.slice(4))
}
/**
* Encode a given Uint8Array into a base32 string.
*
* @param {Uint8Array} buf
* @returns {string}
*/
exports.encodeBase32 = (buf) => {
return uint8ArrayToString(buf, 'base32')
}
/**
* Decode a given base32 string into a Uint8Array.
*
* @param {string} raw
* @returns {Uint8Array}
*/
exports.decodeBase32 = (raw) => {
return uint8ArrayFromString(raw, 'base32')
}
/**
* Sort peers by distance to the given `target`.
*
* @param {Array<PeerId>} peers
* @param {Uint8Array} target
*/
exports.sortClosestPeers = async (peers, target) => {
const distances = await pMap(peers, async (peer) => {
const id = await exports.convertPeerId(peer)
return {
peer: peer,
distance: uint8ArrayXor(id, target)
}
})
return distances.sort(exports.xorCompare).map((d) => d.peer)
}
/**
* Compare function to sort an array of elements which have a distance property which is the xor distance to a given element.
*
* @param {{ distance: Uint8Array }} a
* @param {{ distance: Uint8Array }} b
*/
exports.xorCompare = (a, b) => {
return uint8ArrayCompare(a.distance, b.distance)
}
/**
* Computes how many results to collect on each disjoint path, rounding up.
* This ensures that we look for at least one result per path.
*
* @param {number} resultsWanted
* @param {number} numPaths - total number of paths
*/
exports.pathSize = (resultsWanted, numPaths) => {
return Math.ceil(resultsWanted / numPaths)
}
/**
* Create a new put record, encodes and signs it if enabled.

@@ -158,3 +134,3 @@ *

*/
exports.createPutRecord = (key, value) => {
const createPutRecord = (key, value) => {
const timeReceived = new Date()

@@ -169,14 +145,5 @@ const rec = new Record(key, value, timeReceived)

*
* @param {PeerId} [id]
* @param {string} [subsystem]
* @param {string} name
*/
exports.logger = (id, subsystem) => {
const name = ['libp2p', 'dht']
if (subsystem) {
name.push(subsystem)
}
if (id) {
name.push(`${id.toB58String().slice(0, 8)}`)
}
const logger = (name) => {
// Add a formatter for converting to a base58 string

@@ -187,68 +154,31 @@ debug.formatters.b = (v) => {

const logger = Object.assign(debug(name.join(':')), {
error: debug(name.concat(['error']).join(':'))
})
// Add a formatter for converting to a base58 string
debug.formatters.t = (v) => {
return base32.baseEncode(v)
}
return logger
}
exports.TimeoutError = class TimeoutError extends Error {
get code () {
return 'ETIMEDOUT'
// Add a formatter for stringifying peer ids
debug.formatters.p = (p) => {
return p.toB58String()
}
}
/**
* Creates an async function that calls the given `asyncFn` and Errors
* if it does not resolve within `time` ms
*
* @template T
* @param {(...args: any[]) => Promise<T>} asyncFn
* @param {number} [time]
*/
exports.withTimeout = (asyncFn, time) => {
/**
* @param {...any} args
* @returns {Promise<T>}
*/
async function timeoutFn (...args) {
if (!time) {
return asyncFn(...args)
}
const logger = Object.assign(debug(name), {
error: debug(`${name}:error`)
})
let res
try {
res = await pTimeout(asyncFn(...args), time)
} catch (err) {
if (err instanceof pTimeout.TimeoutError) {
throw errcode(err, 'ETIMEDOUT')
}
throw err
}
return res
}
return timeoutFn
return logger
}
/**
* Iterates the given `asyncIterator` and runs each item through the given `asyncFn` in parallel.
* Returns a promise that resolves when all items of the `asyncIterator` have been passed
* through `asyncFn`.
*
* @template T
* @template O
*
* @param {AsyncIterable<T>} asyncIterator
* @param {(arg0: T) => Promise<O>} asyncFn
*/
exports.mapParallel = async function (asyncIterator, asyncFn) {
const tasks = []
for await (const item of asyncIterator) {
tasks.push(asyncFn(item))
}
return Promise.all(tasks)
module.exports = {
removePrivateAddresses,
removePublicAddresses,
convertBuffer,
convertPeerId,
bufferToKey,
keyForPublicKey,
isPublicKeyKey,
isIPNSKey,
fromPublicKeyKey,
createPutRecord,
logger
}

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc