@libp2p/peer-store
Advanced tools
Comparing version 11.0.22 to 11.1.0
@@ -22,5 +22,27 @@ /** | ||
export interface PersistentPeerStoreInit { | ||
/** | ||
* Used to remove multiaddrs of peers before storing them. The default is to | ||
* store all addresses | ||
*/ | ||
addressFilter?: AddressFilter; | ||
/** | ||
* The multiaddrs for a given peer will expire after this number of ms after | ||
* which they must be re-fetched using the peer routing. | ||
* | ||
* Defaults to one hour. | ||
* | ||
* @default 3_600_000 | ||
*/ | ||
maxAddressAge?: number; | ||
/** | ||
* Any peer without multiaddrs that has not been updated after this number of | ||
* ms will be evicted from the peer store. | ||
* | ||
* Defaults to six hours. | ||
* | ||
* @default 21_600_000 | ||
*/ | ||
maxPeerAge?: number; | ||
} | ||
export declare function persistentPeerStore(components: PersistentPeerStoreComponents, init?: PersistentPeerStoreInit): PeerStore; | ||
//# sourceMappingURL=index.d.ts.map |
@@ -10,2 +10,3 @@ import { type Codec, type DecodeOptions } from 'protons-runtime'; | ||
tags: Map<string, Tag>; | ||
updated?: number; | ||
} | ||
@@ -38,2 +39,3 @@ export declare namespace Peer { | ||
isCertified?: boolean; | ||
observed?: number; | ||
} | ||
@@ -40,0 +42,0 @@ export declare namespace Address { |
@@ -160,2 +160,6 @@ /* eslint-disable import/export */ | ||
} | ||
if (obj.updated != null) { | ||
w.uint32(64); | ||
w.uint64Number(obj.updated); | ||
} | ||
if (opts.lengthDelimited !== false) { | ||
@@ -219,2 +223,6 @@ w.ldelim(); | ||
} | ||
case 8: { | ||
obj.updated = reader.uint64Number(); | ||
break; | ||
} | ||
default: { | ||
@@ -255,2 +263,6 @@ reader.skipType(tag & 7); | ||
} | ||
if (obj.observed != null) { | ||
w.uint32(24); | ||
w.uint64Number(obj.observed); | ||
} | ||
if (opts.lengthDelimited !== false) { | ||
@@ -275,2 +287,6 @@ w.ldelim(); | ||
} | ||
case 3: { | ||
obj.observed = reader.uint64Number(); | ||
break; | ||
} | ||
default: { | ||
@@ -277,0 +293,0 @@ reader.skipType(tag & 7); |
import { type Mortice } from 'mortice'; | ||
import { Peer as PeerPB } from './pb/peer.js'; | ||
import type { PersistentPeerStoreComponents, PersistentPeerStoreInit } from './index.js'; | ||
@@ -10,2 +11,6 @@ import type { PeerUpdate as PeerUpdateExternal, PeerId, Peer, PeerData, PeerQuery } from '@libp2p/interface'; | ||
} | ||
export interface ExistingPeer { | ||
peerPB: PeerPB; | ||
peer: Peer; | ||
} | ||
export declare class PersistentStore { | ||
@@ -18,2 +23,4 @@ #private; | ||
private readonly log; | ||
private readonly maxAddressAge; | ||
private readonly maxPeerAge; | ||
constructor(components: PersistentPeerStoreComponents, init?: PersistentPeerStoreInit); | ||
@@ -20,0 +27,0 @@ has(peerId: PeerId): Promise<boolean>; |
@@ -1,2 +0,2 @@ | ||
import { InvalidParametersError } from '@libp2p/interface'; | ||
import { NotFoundError } from '@libp2p/interface'; | ||
import { peerIdFromCID } from '@libp2p/peer-id'; | ||
@@ -6,25 +6,26 @@ import mortice, {} from 'mortice'; | ||
import { CID } from 'multiformats/cid'; | ||
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'; | ||
import { MAX_ADDRESS_AGE, MAX_PEER_AGE } from './constants.js'; | ||
import { Peer as PeerPB } from './pb/peer.js'; | ||
import { bytesToPeer } from './utils/bytes-to-peer.js'; | ||
import { bytesToPeer, pbToPeer } from './utils/bytes-to-peer.js'; | ||
import { peerEquals } from './utils/peer-equals.js'; | ||
import { NAMESPACE_COMMON, peerIdToDatastoreKey } from './utils/peer-id-to-datastore-key.js'; | ||
import { toPeerPB } from './utils/to-peer-pb.js'; | ||
function decodePeer(key, value) { | ||
function keyToPeerId(key) { | ||
// /peers/${peer-id-as-libp2p-key-cid-string-in-base-32} | ||
const base32Str = key.toString().split('/')[2]; | ||
const buf = CID.parse(base32Str, base32); | ||
const peerId = peerIdFromCID(buf); | ||
return bytesToPeer(peerId, value); | ||
return peerIdFromCID(buf); | ||
} | ||
function mapQuery(query) { | ||
if (query == null) { | ||
return {}; | ||
} | ||
function decodePeer(key, value, maxAddressAge) { | ||
const peerId = keyToPeerId(key); | ||
return bytesToPeer(peerId, value, maxAddressAge); | ||
} | ||
function mapQuery(query, maxAddressAge) { | ||
return { | ||
prefix: NAMESPACE_COMMON, | ||
filters: (query.filters ?? []).map(fn => ({ key, value }) => { | ||
return fn(decodePeer(key, value)); | ||
return fn(decodePeer(key, value, maxAddressAge)); | ||
}), | ||
orders: (query.orders ?? []).map(fn => (a, b) => { | ||
return fn(decodePeer(a.key, a.value), decodePeer(b.key, b.value)); | ||
return fn(decodePeer(a.key, a.value, maxAddressAge), decodePeer(b.key, b.value, maxAddressAge)); | ||
}) | ||
@@ -39,2 +40,4 @@ }; | ||
log; | ||
maxAddressAge; | ||
maxPeerAge; | ||
constructor(components, init = {}) { | ||
@@ -49,9 +52,20 @@ this.log = components.logger.forComponent('libp2p:peer-store'); | ||
}); | ||
this.maxAddressAge = init.maxAddressAge ?? MAX_ADDRESS_AGE; | ||
this.maxPeerAge = init.maxPeerAge ?? MAX_PEER_AGE; | ||
} | ||
async has(peerId) { | ||
return this.datastore.has(peerIdToDatastoreKey(peerId)); | ||
try { | ||
await this.load(peerId); | ||
return true; | ||
} | ||
catch (err) { | ||
if (err.name !== 'NotFoundError') { | ||
throw err; | ||
} | ||
} | ||
return false; | ||
} | ||
async delete(peerId) { | ||
if (this.peerId.equals(peerId)) { | ||
throw new InvalidParametersError('Cannot delete self peer'); | ||
return; | ||
} | ||
@@ -61,14 +75,20 @@ await this.datastore.delete(peerIdToDatastoreKey(peerId)); | ||
async load(peerId) { | ||
const buf = await this.datastore.get(peerIdToDatastoreKey(peerId)); | ||
return bytesToPeer(peerId, buf); | ||
const key = peerIdToDatastoreKey(peerId); | ||
const buf = await this.datastore.get(key); | ||
const peer = PeerPB.decode(buf); | ||
if (this.#peerIsExpired(peer)) { | ||
await this.datastore.delete(key); | ||
throw new NotFoundError(); | ||
} | ||
return pbToPeer(peerId, peer, this.maxAddressAge); | ||
} | ||
async save(peerId, data) { | ||
const { existingBuf, existingPeer } = await this.#findExistingPeer(peerId); | ||
const existingPeer = await this.#findExistingPeer(peerId); | ||
const peerPb = await toPeerPB(peerId, data, 'patch', { | ||
addressFilter: this.addressFilter | ||
}); | ||
return this.#saveIfDifferent(peerId, peerPb, existingBuf, existingPeer); | ||
return this.#saveIfDifferent(peerId, peerPb, existingPeer); | ||
} | ||
async patch(peerId, data) { | ||
const { existingBuf, existingPeer } = await this.#findExistingPeer(peerId); | ||
const existingPeer = await this.#findExistingPeer(peerId); | ||
const peerPb = await toPeerPB(peerId, data, 'patch', { | ||
@@ -78,6 +98,6 @@ addressFilter: this.addressFilter, | ||
}); | ||
return this.#saveIfDifferent(peerId, peerPb, existingBuf, existingPeer); | ||
return this.#saveIfDifferent(peerId, peerPb, existingPeer); | ||
} | ||
async merge(peerId, data) { | ||
const { existingBuf, existingPeer } = await this.#findExistingPeer(peerId); | ||
const existingPeer = await this.#findExistingPeer(peerId); | ||
const peerPb = await toPeerPB(peerId, data, 'merge', { | ||
@@ -87,12 +107,18 @@ addressFilter: this.addressFilter, | ||
}); | ||
return this.#saveIfDifferent(peerId, peerPb, existingBuf, existingPeer); | ||
return this.#saveIfDifferent(peerId, peerPb, existingPeer); | ||
} | ||
async *all(query) { | ||
for await (const { key, value } of this.datastore.query(mapQuery(query ?? {}))) { | ||
const peer = decodePeer(key, value); | ||
if (peer.id.equals(this.peerId)) { | ||
// Skip self peer if present | ||
for await (const { key, value } of this.datastore.query(mapQuery(query ?? {}, this.maxAddressAge))) { | ||
const peerId = keyToPeerId(key); | ||
// skip self peer if present | ||
if (peerId.equals(this.peerId)) { | ||
continue; | ||
} | ||
yield peer; | ||
const peer = PeerPB.decode(value); | ||
// remove expired peer | ||
if (this.#peerIsExpired(peer)) { | ||
await this.datastore.delete(key); | ||
continue; | ||
} | ||
yield pbToPeer(peerId, peer, this.maxAddressAge); | ||
} | ||
@@ -102,7 +128,13 @@ } | ||
try { | ||
const existingBuf = await this.datastore.get(peerIdToDatastoreKey(peerId)); | ||
const existingPeer = bytesToPeer(peerId, existingBuf); | ||
const key = peerIdToDatastoreKey(peerId); | ||
const buf = await this.datastore.get(key); | ||
const peerPB = PeerPB.decode(buf); | ||
// remove expired peer | ||
if (this.#peerIsExpired(peerPB)) { | ||
await this.datastore.delete(key); | ||
throw new NotFoundError(); | ||
} | ||
return { | ||
existingBuf, | ||
existingPeer | ||
peerPB, | ||
peer: bytesToPeer(peerId, buf, this.maxAddressAge) | ||
}; | ||
@@ -115,21 +147,26 @@ } | ||
} | ||
return {}; | ||
} | ||
async #saveIfDifferent(peerId, peer, existingBuf, existingPeer) { | ||
async #saveIfDifferent(peerId, peer, existingPeer) { | ||
// record last update | ||
peer.updated = Date.now(); | ||
const buf = PeerPB.encode(peer); | ||
if (existingBuf != null && uint8ArrayEquals(buf, existingBuf)) { | ||
return { | ||
peer: bytesToPeer(peerId, buf), | ||
previous: existingPeer, | ||
updated: false | ||
}; | ||
} | ||
await this.datastore.put(peerIdToDatastoreKey(peerId), buf); | ||
return { | ||
peer: bytesToPeer(peerId, buf), | ||
previous: existingPeer, | ||
updated: true | ||
peer: bytesToPeer(peerId, buf, this.maxAddressAge), | ||
previous: existingPeer?.peer, | ||
updated: existingPeer == null || !peerEquals(peer, existingPeer.peerPB) | ||
}; | ||
} | ||
#peerIsExpired(peer) { | ||
if (peer.updated == null) { | ||
return true; | ||
} | ||
const expired = peer.updated < (Date.now() - this.maxPeerAge); | ||
const minAddressObserved = Date.now() - this.maxAddressAge; | ||
const addrs = peer.addresses.filter(addr => { | ||
return addr.observed != null && addr.observed > minAddressObserved; | ||
}); | ||
return expired && addrs.length === 0; | ||
} | ||
} | ||
//# sourceMappingURL=store.js.map |
@@ -0,3 +1,5 @@ | ||
import { Peer as PeerPB } from '../pb/peer.js'; | ||
import type { PeerId, Peer } from '@libp2p/interface'; | ||
export declare function bytesToPeer(peerId: PeerId, buf: Uint8Array): Peer; | ||
export declare function bytesToPeer(peerId: PeerId, buf: Uint8Array, maxAddressAge: number): Peer; | ||
export declare function pbToPeer(peerId: PeerId, peer: PeerPB, maxAddressAge: number): Peer; | ||
//# sourceMappingURL=bytes-to-peer.d.ts.map |
@@ -20,4 +20,7 @@ import { publicKeyFromProtobuf } from '@libp2p/crypto/keys'; | ||
} | ||
export function bytesToPeer(peerId, buf) { | ||
export function bytesToPeer(peerId, buf, maxAddressAge) { | ||
const peer = PeerPB.decode(buf); | ||
return pbToPeer(peerId, peer, maxAddressAge); | ||
} | ||
export function pbToPeer(peerId, peer, maxAddressAge) { | ||
const tags = new Map(); | ||
@@ -35,3 +38,6 @@ // remove any expired tags | ||
id: populatePublicKey(peerId, peer), | ||
addresses: peer.addresses.map(({ multiaddr: ma, isCertified }) => { | ||
addresses: peer.addresses | ||
// remove any expired multiaddrs | ||
.filter(({ observed }) => observed != null && observed > (Date.now() - maxAddressAge)) | ||
.map(({ multiaddr: ma, isCertified }) => { | ||
return { | ||
@@ -38,0 +44,0 @@ multiaddr: multiaddr(ma), |
import type { AddressFilter } from '../index.js'; | ||
import type { Address as AddressPB } from '../pb/peer.js'; | ||
import type { PeerId, Address } from '@libp2p/interface'; | ||
export declare function dedupeFilterAndSortAddresses(peerId: PeerId, filter: AddressFilter, addresses: Array<Address | AddressPB | undefined>): Promise<AddressPB[]>; | ||
export declare function dedupeFilterAndSortAddresses(peerId: PeerId, filter: AddressFilter, addresses: Array<Address | AddressPB | undefined>, existingAddresses?: AddressPB[]): Promise<AddressPB[]>; | ||
//# sourceMappingURL=dedupe-addresses.d.ts.map |
import { InvalidParametersError } from '@libp2p/interface'; | ||
import { isMultiaddr, multiaddr } from '@multiformats/multiaddr'; | ||
export async function dedupeFilterAndSortAddresses(peerId, filter, addresses) { | ||
export async function dedupeFilterAndSortAddresses(peerId, filter, addresses, existingAddresses) { | ||
const addressMap = new Map(); | ||
@@ -5,0 +5,0 @@ for (const addr of addresses) { |
import type { AddressFilter } from '../index.js'; | ||
import type { Peer as PeerPB } from '../pb/peer.js'; | ||
import type { PeerId, Peer, PeerData } from '@libp2p/interface'; | ||
import type { ExistingPeer } from '../store.js'; | ||
import type { PeerId, PeerData } from '@libp2p/interface'; | ||
export interface ToPBPeerOptions { | ||
addressFilter?: AddressFilter; | ||
existingPeer?: Peer; | ||
existingPeer?: ExistingPeer; | ||
} | ||
export declare function toPeerPB(peerId: PeerId, data: Partial<PeerData>, strategy: 'merge' | 'patch', options: ToPBPeerOptions): Promise<PeerPB>; | ||
//# sourceMappingURL=to-peer-pb.d.ts.map |
@@ -0,3 +1,5 @@ | ||
/* eslint-disable complexity */ | ||
import { publicKeyToProtobuf } from '@libp2p/crypto/keys'; | ||
import { InvalidParametersError } from '@libp2p/interface'; | ||
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'; | ||
import { dedupeFilterAndSortAddresses } from './dedupe-addresses.js'; | ||
@@ -11,3 +13,3 @@ export async function toPeerPB(peerId, data, strategy, options) { | ||
} | ||
const existingPeer = options.existingPeer; | ||
const existingPeer = options.existingPeer?.peer; | ||
if (existingPeer != null && !peerId.equals(existingPeer.id)) { | ||
@@ -114,3 +116,3 @@ throw new InvalidParametersError('peer id did not match existing peer id'); | ||
const output = { | ||
addresses: await dedupeFilterAndSortAddresses(peerId, options.addressFilter ?? (async () => true), addresses), | ||
addresses: await dedupeFilterAndSortAddresses(peerId, options.addressFilter ?? (async () => true), addresses, options.existingPeer?.peerPB.addresses), | ||
protocols: [...protocols.values()].sort((a, b) => { | ||
@@ -124,2 +126,6 @@ return a.localeCompare(b); | ||
}; | ||
// add observed addresses to multiaddrs | ||
output.addresses.forEach(addr => { | ||
addr.observed = options.existingPeer?.peerPB.addresses?.find(addr => uint8ArrayEquals(addr.multiaddr, addr.multiaddr))?.observed ?? Date.now(); | ||
}); | ||
// Ed25519 and secp256k1 have their public key embedded in them so no need to duplicate it | ||
@@ -126,0 +132,0 @@ if (peerId.type !== 'RSA') { |
{ | ||
"name": "@libp2p/peer-store", | ||
"version": "11.0.22", | ||
"version": "11.1.0", | ||
"description": "Stores information about peers libp2p knows on the network", | ||
@@ -62,6 +62,6 @@ "license": "Apache-2.0 OR MIT", | ||
"dependencies": { | ||
"@libp2p/crypto": "^5.0.14", | ||
"@libp2p/interface": "^2.6.1", | ||
"@libp2p/peer-id": "^5.0.15", | ||
"@libp2p/peer-record": "^8.0.22", | ||
"@libp2p/crypto": "^5.0.15", | ||
"@libp2p/interface": "^2.7.0", | ||
"@libp2p/peer-id": "^5.0.16", | ||
"@libp2p/peer-record": "^8.0.23", | ||
"@multiformats/multiaddr": "^12.3.3", | ||
@@ -77,3 +77,3 @@ "interface-datastore": "^8.3.1", | ||
"devDependencies": { | ||
"@libp2p/logger": "^5.1.11", | ||
"@libp2p/logger": "^5.1.12", | ||
"@types/sinon": "^17.0.3", | ||
@@ -80,0 +80,0 @@ "aegir": "^45.1.1", |
@@ -30,3 +30,27 @@ /** | ||
export interface PersistentPeerStoreInit { | ||
/** | ||
* Used to remove multiaddrs of peers before storing them. The default is to | ||
* store all addresses | ||
*/ | ||
addressFilter?: AddressFilter | ||
/** | ||
* The multiaddrs for a given peer will expire after this number of ms after | ||
* which they must be re-fetched using the peer routing. | ||
* | ||
* Defaults to one hour. | ||
* | ||
* @default 3_600_000 | ||
*/ | ||
maxAddressAge?: number | ||
/** | ||
* Any peer without multiaddrs that has not been updated after this number of | ||
* ms will be evicted from the peer store. | ||
* | ||
* Defaults to six hours. | ||
* | ||
* @default 21_600_000 | ||
*/ | ||
maxPeerAge?: number | ||
} | ||
@@ -33,0 +57,0 @@ |
@@ -18,2 +18,3 @@ /* eslint-disable import/export */ | ||
tags: Map<string, Tag> | ||
updated?: number | ||
} | ||
@@ -212,2 +213,7 @@ | ||
if (obj.updated != null) { | ||
w.uint32(64) | ||
w.uint64Number(obj.updated) | ||
} | ||
if (opts.lengthDelimited !== false) { | ||
@@ -278,2 +284,6 @@ w.ldelim() | ||
} | ||
case 8: { | ||
obj.updated = reader.uint64Number() | ||
break | ||
} | ||
default: { | ||
@@ -305,2 +315,3 @@ reader.skipType(tag & 7) | ||
isCertified?: boolean | ||
observed?: number | ||
} | ||
@@ -328,2 +339,7 @@ | ||
if (obj.observed != null) { | ||
w.uint32(24) | ||
w.uint64Number(obj.observed) | ||
} | ||
if (opts.lengthDelimited !== false) { | ||
@@ -351,2 +367,6 @@ w.ldelim() | ||
} | ||
case 3: { | ||
obj.observed = reader.uint64Number() | ||
break | ||
} | ||
default: { | ||
@@ -353,0 +373,0 @@ reader.skipType(tag & 7) |
150
src/store.ts
@@ -1,2 +0,2 @@ | ||
import { InvalidParametersError } from '@libp2p/interface' | ||
import { NotFoundError } from '@libp2p/interface' | ||
import { peerIdFromCID } from '@libp2p/peer-id' | ||
@@ -6,5 +6,6 @@ import mortice, { type Mortice } from 'mortice' | ||
import { CID } from 'multiformats/cid' | ||
import { equals as uint8ArrayEquals } from 'uint8arrays/equals' | ||
import { MAX_ADDRESS_AGE, MAX_PEER_AGE } from './constants.js' | ||
import { Peer as PeerPB } from './pb/peer.js' | ||
import { bytesToPeer } from './utils/bytes-to-peer.js' | ||
import { bytesToPeer, pbToPeer } from './utils/bytes-to-peer.js' | ||
import { peerEquals } from './utils/peer-equals.js' | ||
import { NAMESPACE_COMMON, peerIdToDatastoreKey } from './utils/peer-id-to-datastore-key.js' | ||
@@ -23,23 +24,29 @@ import { toPeerPB } from './utils/to-peer-pb.js' | ||
function decodePeer (key: Key, value: Uint8Array): Peer { | ||
export interface ExistingPeer { | ||
peerPB: PeerPB | ||
peer: Peer | ||
} | ||
function keyToPeerId (key: Key): PeerId { | ||
// /peers/${peer-id-as-libp2p-key-cid-string-in-base-32} | ||
const base32Str = key.toString().split('/')[2] | ||
const buf = CID.parse(base32Str, base32) | ||
const peerId = peerIdFromCID(buf) | ||
return bytesToPeer(peerId, value) | ||
return peerIdFromCID(buf) | ||
} | ||
function mapQuery (query: PeerQuery): Query { | ||
if (query == null) { | ||
return {} | ||
} | ||
function decodePeer (key: Key, value: Uint8Array, maxAddressAge: number): Peer { | ||
const peerId = keyToPeerId(key) | ||
return bytesToPeer(peerId, value, maxAddressAge) | ||
} | ||
function mapQuery (query: PeerQuery, maxAddressAge: number): Query { | ||
return { | ||
prefix: NAMESPACE_COMMON, | ||
filters: (query.filters ?? []).map(fn => ({ key, value }) => { | ||
return fn(decodePeer(key, value)) | ||
return fn(decodePeer(key, value, maxAddressAge)) | ||
}), | ||
orders: (query.orders ?? []).map(fn => (a, b) => { | ||
return fn(decodePeer(a.key, a.value), decodePeer(b.key, b.value)) | ||
return fn(decodePeer(a.key, a.value, maxAddressAge), decodePeer(b.key, b.value, maxAddressAge)) | ||
}) | ||
@@ -55,2 +62,4 @@ } | ||
private readonly log: Logger | ||
private readonly maxAddressAge: number | ||
private readonly maxPeerAge: number | ||
@@ -66,6 +75,18 @@ constructor (components: PersistentPeerStoreComponents, init: PersistentPeerStoreInit = {}) { | ||
}) | ||
this.maxAddressAge = init.maxAddressAge ?? MAX_ADDRESS_AGE | ||
this.maxPeerAge = init.maxPeerAge ?? MAX_PEER_AGE | ||
} | ||
async has (peerId: PeerId): Promise<boolean> { | ||
return this.datastore.has(peerIdToDatastoreKey(peerId)) | ||
try { | ||
await this.load(peerId) | ||
return true | ||
} catch (err: any) { | ||
if (err.name !== 'NotFoundError') { | ||
throw err | ||
} | ||
} | ||
return false | ||
} | ||
@@ -75,3 +96,3 @@ | ||
if (this.peerId.equals(peerId)) { | ||
throw new InvalidParametersError('Cannot delete self peer') | ||
return | ||
} | ||
@@ -83,12 +104,16 @@ | ||
async load (peerId: PeerId): Promise<Peer> { | ||
const buf = await this.datastore.get(peerIdToDatastoreKey(peerId)) | ||
const key = peerIdToDatastoreKey(peerId) | ||
const buf = await this.datastore.get(key) | ||
const peer = PeerPB.decode(buf) | ||
return bytesToPeer(peerId, buf) | ||
if (this.#peerIsExpired(peer)) { | ||
await this.datastore.delete(key) | ||
throw new NotFoundError() | ||
} | ||
return pbToPeer(peerId, peer, this.maxAddressAge) | ||
} | ||
async save (peerId: PeerId, data: PeerData): Promise<PeerUpdate> { | ||
const { | ||
existingBuf, | ||
existingPeer | ||
} = await this.#findExistingPeer(peerId) | ||
const existingPeer = await this.#findExistingPeer(peerId) | ||
@@ -99,10 +124,7 @@ const peerPb: PeerPB = await toPeerPB(peerId, data, 'patch', { | ||
return this.#saveIfDifferent(peerId, peerPb, existingBuf, existingPeer) | ||
return this.#saveIfDifferent(peerId, peerPb, existingPeer) | ||
} | ||
async patch (peerId: PeerId, data: Partial<PeerData>): Promise<PeerUpdate> { | ||
const { | ||
existingBuf, | ||
existingPeer | ||
} = await this.#findExistingPeer(peerId) | ||
const existingPeer = await this.#findExistingPeer(peerId) | ||
@@ -114,10 +136,7 @@ const peerPb: PeerPB = await toPeerPB(peerId, data, 'patch', { | ||
return this.#saveIfDifferent(peerId, peerPb, existingBuf, existingPeer) | ||
return this.#saveIfDifferent(peerId, peerPb, existingPeer) | ||
} | ||
async merge (peerId: PeerId, data: PeerData): Promise<PeerUpdate> { | ||
const { | ||
existingBuf, | ||
existingPeer | ||
} = await this.#findExistingPeer(peerId) | ||
const existingPeer = await this.#findExistingPeer(peerId) | ||
@@ -129,26 +148,41 @@ const peerPb: PeerPB = await toPeerPB(peerId, data, 'merge', { | ||
return this.#saveIfDifferent(peerId, peerPb, existingBuf, existingPeer) | ||
return this.#saveIfDifferent(peerId, peerPb, existingPeer) | ||
} | ||
async * all (query?: PeerQuery): AsyncGenerator<Peer, void, unknown> { | ||
for await (const { key, value } of this.datastore.query(mapQuery(query ?? {}))) { | ||
const peer = decodePeer(key, value) | ||
for await (const { key, value } of this.datastore.query(mapQuery(query ?? {}, this.maxAddressAge))) { | ||
const peerId = keyToPeerId(key) | ||
if (peer.id.equals(this.peerId)) { | ||
// Skip self peer if present | ||
// skip self peer if present | ||
if (peerId.equals(this.peerId)) { | ||
continue | ||
} | ||
yield peer | ||
const peer = PeerPB.decode(value) | ||
// remove expired peer | ||
if (this.#peerIsExpired(peer)) { | ||
await this.datastore.delete(key) | ||
continue | ||
} | ||
yield pbToPeer(peerId, peer, this.maxAddressAge) | ||
} | ||
} | ||
async #findExistingPeer (peerId: PeerId): Promise<{ existingBuf?: Uint8Array, existingPeer?: Peer }> { | ||
async #findExistingPeer (peerId: PeerId): Promise<ExistingPeer | undefined> { | ||
try { | ||
const existingBuf = await this.datastore.get(peerIdToDatastoreKey(peerId)) | ||
const existingPeer = bytesToPeer(peerId, existingBuf) | ||
const key = peerIdToDatastoreKey(peerId) | ||
const buf = await this.datastore.get(key) | ||
const peerPB = PeerPB.decode(buf) | ||
// remove expired peer | ||
if (this.#peerIsExpired(peerPB)) { | ||
await this.datastore.delete(key) | ||
throw new NotFoundError() | ||
} | ||
return { | ||
existingBuf, | ||
existingPeer | ||
peerPB, | ||
peer: bytesToPeer(peerId, buf, this.maxAddressAge) | ||
} | ||
@@ -160,25 +194,31 @@ } catch (err: any) { | ||
} | ||
return {} | ||
} | ||
async #saveIfDifferent (peerId: PeerId, peer: PeerPB, existingBuf?: Uint8Array, existingPeer?: Peer): Promise<PeerUpdate> { | ||
async #saveIfDifferent (peerId: PeerId, peer: PeerPB, existingPeer?: ExistingPeer): Promise<PeerUpdate> { | ||
// record last update | ||
peer.updated = Date.now() | ||
const buf = PeerPB.encode(peer) | ||
if (existingBuf != null && uint8ArrayEquals(buf, existingBuf)) { | ||
return { | ||
peer: bytesToPeer(peerId, buf), | ||
previous: existingPeer, | ||
updated: false | ||
} | ||
} | ||
await this.datastore.put(peerIdToDatastoreKey(peerId), buf) | ||
return { | ||
peer: bytesToPeer(peerId, buf), | ||
previous: existingPeer, | ||
updated: true | ||
peer: bytesToPeer(peerId, buf, this.maxAddressAge), | ||
previous: existingPeer?.peer, | ||
updated: existingPeer == null || !peerEquals(peer, existingPeer.peerPB) | ||
} | ||
} | ||
#peerIsExpired (peer: PeerPB): boolean { | ||
if (peer.updated == null) { | ||
return true | ||
} | ||
const expired = peer.updated < (Date.now() - this.maxPeerAge) | ||
const minAddressObserved = Date.now() - this.maxAddressAge | ||
const addrs = peer.addresses.filter(addr => { | ||
return addr.observed != null && addr.observed > minAddressObserved | ||
}) | ||
return expired && addrs.length === 0 | ||
} | ||
} |
@@ -26,4 +26,9 @@ import { publicKeyFromProtobuf } from '@libp2p/crypto/keys' | ||
export function bytesToPeer (peerId: PeerId, buf: Uint8Array): Peer { | ||
export function bytesToPeer (peerId: PeerId, buf: Uint8Array, maxAddressAge: number): Peer { | ||
const peer = PeerPB.decode(buf) | ||
return pbToPeer(peerId, peer, maxAddressAge) | ||
} | ||
export function pbToPeer (peerId: PeerId, peer: PeerPB, maxAddressAge: number): Peer { | ||
const tags = new Map<string, Tag>() | ||
@@ -45,8 +50,11 @@ | ||
id: populatePublicKey(peerId, peer), | ||
addresses: peer.addresses.map(({ multiaddr: ma, isCertified }) => { | ||
return { | ||
multiaddr: multiaddr(ma), | ||
isCertified: isCertified ?? false | ||
} | ||
}), | ||
addresses: peer.addresses | ||
// remove any expired multiaddrs | ||
.filter(({ observed }) => observed != null && observed > (Date.now() - maxAddressAge)) | ||
.map(({ multiaddr: ma, isCertified }) => { | ||
return { | ||
multiaddr: multiaddr(ma), | ||
isCertified: isCertified ?? false | ||
} | ||
}), | ||
metadata: peer.metadata, | ||
@@ -53,0 +61,0 @@ peerRecordEnvelope: peer.peerRecordEnvelope ?? undefined, |
@@ -7,3 +7,3 @@ import { InvalidParametersError } from '@libp2p/interface' | ||
export async function dedupeFilterAndSortAddresses (peerId: PeerId, filter: AddressFilter, addresses: Array<Address | AddressPB | undefined>): Promise<AddressPB[]> { | ||
export async function dedupeFilterAndSortAddresses (peerId: PeerId, filter: AddressFilter, addresses: Array<Address | AddressPB | undefined>, existingAddresses?: AddressPB[]): Promise<AddressPB[]> { | ||
const addressMap = new Map<string, Address>() | ||
@@ -10,0 +10,0 @@ |
@@ -0,11 +1,14 @@ | ||
/* eslint-disable complexity */ | ||
import { publicKeyToProtobuf } from '@libp2p/crypto/keys' | ||
import { InvalidParametersError } from '@libp2p/interface' | ||
import { equals as uint8ArrayEquals } from 'uint8arrays/equals' | ||
import { dedupeFilterAndSortAddresses } from './dedupe-addresses.js' | ||
import type { AddressFilter } from '../index.js' | ||
import type { Tag, Peer as PeerPB } from '../pb/peer.js' | ||
import type { PeerId, Address, Peer, PeerData, TagOptions } from '@libp2p/interface' | ||
import type { ExistingPeer } from '../store.js' | ||
import type { PeerId, Address, PeerData, TagOptions } from '@libp2p/interface' | ||
export interface ToPBPeerOptions { | ||
addressFilter?: AddressFilter | ||
existingPeer?: Peer | ||
existingPeer?: ExistingPeer | ||
} | ||
@@ -22,3 +25,3 @@ | ||
const existingPeer = options.existingPeer | ||
const existingPeer = options.existingPeer?.peer | ||
@@ -145,3 +148,8 @@ if (existingPeer != null && !peerId.equals(existingPeer.id)) { | ||
const output: PeerPB = { | ||
addresses: await dedupeFilterAndSortAddresses(peerId, options.addressFilter ?? (async () => true), addresses), | ||
addresses: await dedupeFilterAndSortAddresses( | ||
peerId, | ||
options.addressFilter ?? (async () => true), | ||
addresses, | ||
options.existingPeer?.peerPB.addresses | ||
), | ||
protocols: [...protocols.values()].sort((a, b) => { | ||
@@ -156,2 +164,7 @@ return a.localeCompare(b) | ||
// add observed addresses to multiaddrs | ||
output.addresses.forEach(addr => { | ||
addr.observed = options.existingPeer?.peerPB.addresses?.find(addr => uint8ArrayEquals(addr.multiaddr, addr.multiaddr))?.observed ?? Date.now() | ||
}) | ||
// Ed25519 and secp256k1 have their public key embedded in them so no need to duplicate it | ||
@@ -158,0 +171,0 @@ if (peerId.type !== 'RSA') { |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
279706
50
2934
Updated@libp2p/crypto@^5.0.15
Updated@libp2p/interface@^2.7.0
Updated@libp2p/peer-id@^5.0.16
Updated@libp2p/peer-record@^8.0.23