@libp2p/peer-store
Advanced tools
Comparing version 7.0.2 to 8.0.0
export declare const codes: { | ||
ERR_INVALID_PARAMETERS: string; | ||
ERR_NOT_FOUND: string; | ||
}; | ||
//# sourceMappingURL=errors.d.ts.map |
export const codes = { | ||
ERR_INVALID_PARAMETERS: 'ERR_INVALID_PARAMETERS', | ||
ERR_NOT_FOUND: 'ERR_NOT_FOUND' | ||
ERR_INVALID_PARAMETERS: 'ERR_INVALID_PARAMETERS' | ||
}; | ||
//# sourceMappingURL=errors.js.map |
@@ -1,41 +0,39 @@ | ||
import { EventEmitter } from '@libp2p/interfaces/events'; | ||
import type { PeerStore, AddressBook, KeyBook, MetadataBook, ProtoBook, PeerStoreEvents, PeerStoreInit, Peer, TagOptions } from '@libp2p/interface-peer-store'; | ||
import type { EventEmitter } from '@libp2p/interfaces/events'; | ||
import type { PeerStore, Peer, PeerData } from '@libp2p/interface-peer-store'; | ||
import type { PeerId } from '@libp2p/interface-peer-id'; | ||
import type { Datastore } from 'interface-datastore'; | ||
import type { Multiaddr } from '@multiformats/multiaddr'; | ||
import type { Libp2pEvents } from '@libp2p/interface-libp2p'; | ||
export interface PersistentPeerStoreComponents { | ||
peerId: PeerId; | ||
datastore: Datastore; | ||
events: EventEmitter<Libp2pEvents>; | ||
} | ||
/** | ||
* Return true to allow storing the passed multiaddr for the passed peer | ||
*/ | ||
export interface AddressFilter { | ||
(peerId: PeerId, multiaddr: Multiaddr): Promise<boolean>; | ||
} | ||
export interface PersistentPeerStoreInit { | ||
addressFilter?: AddressFilter; | ||
} | ||
/** | ||
* An implementation of PeerStore that stores data in a Datastore | ||
*/ | ||
export declare class PersistentPeerStore extends EventEmitter<PeerStoreEvents> implements PeerStore { | ||
addressBook: AddressBook; | ||
keyBook: KeyBook; | ||
metadataBook: MetadataBook; | ||
protoBook: ProtoBook; | ||
private readonly components; | ||
export declare class PersistentPeerStore implements PeerStore { | ||
#private; | ||
private readonly store; | ||
constructor(components: PersistentPeerStoreComponents, init?: PeerStoreInit); | ||
private readonly events; | ||
private readonly peerId; | ||
constructor(components: PersistentPeerStoreComponents, init?: PersistentPeerStoreInit); | ||
forEach(fn: (peer: Peer) => void): Promise<void>; | ||
all(): Promise<Peer[]>; | ||
/** | ||
* Delete the information of the given peer in every book | ||
*/ | ||
delete(peerId: PeerId): Promise<void>; | ||
/** | ||
* Get the stored information of a given peer | ||
*/ | ||
has(peerId: PeerId): Promise<boolean>; | ||
get(peerId: PeerId): Promise<Peer>; | ||
/** | ||
* Returns true if we have a record of the peer | ||
*/ | ||
has(peerId: PeerId): Promise<boolean>; | ||
tagPeer(peerId: PeerId, tag: string, options?: TagOptions): Promise<void>; | ||
unTagPeer(peerId: PeerId, tag: string): Promise<void>; | ||
getTags(peerId: PeerId): Promise<Array<{ | ||
name: string; | ||
value: number; | ||
}>>; | ||
save(id: PeerId, data: PeerData): Promise<Peer>; | ||
patch(id: PeerId, data: PeerData): Promise<Peer>; | ||
merge(id: PeerId, data: PeerData): Promise<Peer>; | ||
} | ||
//# sourceMappingURL=index.d.ts.map |
@@ -0,10 +1,9 @@ | ||
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) { | ||
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter"); | ||
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it"); | ||
return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver); | ||
}; | ||
var _PersistentPeerStore_instances, _PersistentPeerStore_emitIfUpdated; | ||
import { PersistentStore } from './store.js'; | ||
import { logger } from '@libp2p/logger'; | ||
import { EventEmitter } from '@libp2p/interfaces/events'; | ||
import { PeerStoreAddressBook } from './address-book.js'; | ||
import { PeerStoreKeyBook } from './key-book.js'; | ||
import { PeerStoreMetadataBook } from './metadata-book.js'; | ||
import { PeerStoreProtoBook } from './proto-book.js'; | ||
import { PersistentStore } from './store.js'; | ||
import { CodeError } from '@libp2p/interfaces/errors'; | ||
import { Tags } from './pb/tags.js'; | ||
const log = logger('libp2p:peer-store'); | ||
@@ -14,22 +13,15 @@ /** | ||
*/ | ||
export class PersistentPeerStore extends EventEmitter { | ||
export class PersistentPeerStore { | ||
constructor(components, init = {}) { | ||
super(); | ||
this.components = components; | ||
this.store = new PersistentStore(components); | ||
this.addressBook = new PeerStoreAddressBook(this.dispatchEvent.bind(this), this.store, init.addressFilter); | ||
this.keyBook = new PeerStoreKeyBook(this.dispatchEvent.bind(this), this.store); | ||
this.metadataBook = new PeerStoreMetadataBook(this.dispatchEvent.bind(this), this.store); | ||
this.protoBook = new PeerStoreProtoBook(this.dispatchEvent.bind(this), this.store); | ||
_PersistentPeerStore_instances.add(this); | ||
this.events = components.events; | ||
this.peerId = components.peerId; | ||
this.store = new PersistentStore(components, init); | ||
} | ||
async forEach(fn) { | ||
log.trace('getPeers await read lock'); | ||
log.trace('forEach await read lock'); | ||
const release = await this.store.lock.readLock(); | ||
log.trace('getPeers got read lock'); | ||
log.trace('forEach got read lock'); | ||
try { | ||
for await (const peer of this.store.all()) { | ||
if (peer.id.equals(this.components.peerId)) { | ||
// Skip self peer if present | ||
continue; | ||
} | ||
fn(peer); | ||
@@ -39,3 +31,3 @@ } | ||
finally { | ||
log.trace('getPeers release read lock'); | ||
log.trace('forEach release read lock'); | ||
release(); | ||
@@ -45,11 +37,17 @@ } | ||
async all() { | ||
const output = []; | ||
await this.forEach(peer => { | ||
output.push(peer); | ||
}); | ||
return output; | ||
log.trace('all await read lock'); | ||
const release = await this.store.lock.readLock(); | ||
log.trace('all got read lock'); | ||
try { | ||
const output = []; | ||
for await (const peer of this.store.all()) { | ||
output.push(peer); | ||
} | ||
return output; | ||
} | ||
finally { | ||
log.trace('all release read lock'); | ||
release(); | ||
} | ||
} | ||
/** | ||
* Delete the information of the given peer in every book | ||
*/ | ||
async delete(peerId) { | ||
@@ -67,5 +65,14 @@ log.trace('delete await write lock'); | ||
} | ||
/** | ||
* Get the stored information of a given peer | ||
*/ | ||
async has(peerId) { | ||
log.trace('has await read lock'); | ||
const release = await this.store.lock.readLock(); | ||
log.trace('has got read lock'); | ||
try { | ||
return await this.store.has(peerId); | ||
} | ||
finally { | ||
log.trace('has release read lock'); | ||
release(); | ||
} | ||
} | ||
async get(peerId) { | ||
@@ -83,65 +90,56 @@ log.trace('get await read lock'); | ||
} | ||
/** | ||
* Returns true if we have a record of the peer | ||
*/ | ||
async has(peerId) { | ||
log.trace('has await read lock'); | ||
const release = await this.store.lock.readLock(); | ||
log.trace('has got read lock'); | ||
async save(id, data) { | ||
log.trace('save await write lock'); | ||
const release = await this.store.lock.writeLock(); | ||
log.trace('save got write lock'); | ||
try { | ||
return await this.store.has(peerId); | ||
const result = await this.store.save(id, data); | ||
__classPrivateFieldGet(this, _PersistentPeerStore_instances, "m", _PersistentPeerStore_emitIfUpdated).call(this, id, result); | ||
return result.peer; | ||
} | ||
finally { | ||
log.trace('has release read lock'); | ||
log.trace('save release write lock'); | ||
release(); | ||
} | ||
} | ||
async tagPeer(peerId, tag, options = {}) { | ||
const providedValue = options.value ?? 0; | ||
const value = Math.round(providedValue); | ||
const ttl = options.ttl ?? undefined; | ||
if (value !== providedValue || value < 0 || value > 100) { | ||
throw new CodeError('Tag value must be between 0-100', 'ERR_TAG_VALUE_OUT_OF_BOUNDS'); | ||
async patch(id, data) { | ||
log.trace('patch await write lock'); | ||
const release = await this.store.lock.writeLock(); | ||
log.trace('patch got write lock'); | ||
try { | ||
const result = await this.store.patch(id, data); | ||
__classPrivateFieldGet(this, _PersistentPeerStore_instances, "m", _PersistentPeerStore_emitIfUpdated).call(this, id, result); | ||
return result.peer; | ||
} | ||
const buf = await this.metadataBook.getValue(peerId, 'tags'); | ||
let tags = []; | ||
if (buf != null) { | ||
tags = Tags.decode(buf).tags; | ||
finally { | ||
log.trace('patch release write lock'); | ||
release(); | ||
} | ||
// do not allow duplicate tags | ||
tags = tags.filter(t => t.name !== tag); | ||
tags.push({ | ||
name: tag, | ||
value, | ||
expiry: ttl == null ? undefined : BigInt(Date.now() + ttl) | ||
}); | ||
await this.metadataBook.setValue(peerId, 'tags', Tags.encode({ tags }).subarray()); | ||
} | ||
async unTagPeer(peerId, tag) { | ||
const buf = await this.metadataBook.getValue(peerId, 'tags'); | ||
let tags = []; | ||
if (buf != null) { | ||
tags = Tags.decode(buf).tags; | ||
async merge(id, data) { | ||
log.trace('merge await write lock'); | ||
const release = await this.store.lock.writeLock(); | ||
log.trace('merge got write lock'); | ||
try { | ||
const result = await this.store.merge(id, data); | ||
__classPrivateFieldGet(this, _PersistentPeerStore_instances, "m", _PersistentPeerStore_emitIfUpdated).call(this, id, result); | ||
return result.peer; | ||
} | ||
tags = tags.filter(t => t.name !== tag); | ||
await this.metadataBook.setValue(peerId, 'tags', Tags.encode({ tags }).subarray()); | ||
} | ||
async getTags(peerId) { | ||
const buf = await this.metadataBook.getValue(peerId, 'tags'); | ||
let tags = []; | ||
if (buf != null) { | ||
tags = Tags.decode(buf).tags; | ||
finally { | ||
log.trace('merge release write lock'); | ||
release(); | ||
} | ||
const now = BigInt(Date.now()); | ||
const unexpiredTags = tags.filter(tag => tag.expiry == null || tag.expiry > now); | ||
if (unexpiredTags.length !== tags.length) { | ||
// remove any expired tags | ||
await this.metadataBook.setValue(peerId, 'tags', Tags.encode({ tags: unexpiredTags }).subarray()); | ||
} | ||
return unexpiredTags.map(t => ({ | ||
name: t.name, | ||
value: t.value ?? 0 | ||
})); | ||
} | ||
} | ||
_PersistentPeerStore_instances = new WeakSet(), _PersistentPeerStore_emitIfUpdated = function _PersistentPeerStore_emitIfUpdated(id, result) { | ||
if (!result.updated) { | ||
return; | ||
} | ||
if (this.peerId.equals(id)) { | ||
this.events.safeDispatchEvent('self:peer:update', { detail: result }); | ||
} | ||
else { | ||
this.events.safeDispatchEvent('peer:update', { detail: result }); | ||
} | ||
}; | ||
//# sourceMappingURL=index.js.map |
@@ -6,7 +6,26 @@ import type { Codec } from 'protons-runtime'; | ||
protocols: string[]; | ||
metadata: Metadata[]; | ||
pubKey?: Uint8Array; | ||
publicKey?: Uint8Array; | ||
peerRecordEnvelope?: Uint8Array; | ||
metadata: Map<string, Uint8Array>; | ||
tags: Map<string, Tag>; | ||
} | ||
export declare namespace Peer { | ||
interface Peer$metadataEntry { | ||
key: string; | ||
value: Uint8Array; | ||
} | ||
namespace Peer$metadataEntry { | ||
const codec: () => Codec<Peer$metadataEntry>; | ||
const encode: (obj: Partial<Peer$metadataEntry>) => Uint8Array; | ||
const decode: (buf: Uint8Array | Uint8ArrayList) => Peer$metadataEntry; | ||
} | ||
interface Peer$tagsEntry { | ||
key: string; | ||
value?: Tag; | ||
} | ||
namespace Peer$tagsEntry { | ||
const codec: () => Codec<Peer$tagsEntry>; | ||
const encode: (obj: Partial<Peer$tagsEntry>) => Uint8Array; | ||
const decode: (buf: Uint8Array | Uint8ArrayList) => Peer$tagsEntry; | ||
} | ||
const codec: () => Codec<Peer>; | ||
@@ -25,11 +44,11 @@ const encode: (obj: Partial<Peer>) => Uint8Array; | ||
} | ||
export interface Metadata { | ||
key: string; | ||
value: Uint8Array; | ||
export interface Tag { | ||
value: number; | ||
expiry?: bigint; | ||
} | ||
export declare namespace Metadata { | ||
const codec: () => Codec<Metadata>; | ||
const encode: (obj: Partial<Metadata>) => Uint8Array; | ||
const decode: (buf: Uint8Array | Uint8ArrayList) => Metadata; | ||
export declare namespace Tag { | ||
const codec: () => Codec<Tag>; | ||
const encode: (obj: Partial<Tag>) => Uint8Array; | ||
const decode: (buf: Uint8Array | Uint8ArrayList) => Tag; | ||
} | ||
//# sourceMappingURL=peer.d.ts.map |
@@ -9,2 +9,105 @@ /* eslint-disable import/export */ | ||
(function (Peer) { | ||
let Peer$metadataEntry; | ||
(function (Peer$metadataEntry) { | ||
let _codec; | ||
Peer$metadataEntry.codec = () => { | ||
if (_codec == null) { | ||
_codec = message((obj, w, opts = {}) => { | ||
if (opts.lengthDelimited !== false) { | ||
w.fork(); | ||
} | ||
if ((obj.key != null && obj.key !== '')) { | ||
w.uint32(10); | ||
w.string(obj.key); | ||
} | ||
if ((obj.value != null && obj.value.byteLength > 0)) { | ||
w.uint32(18); | ||
w.bytes(obj.value); | ||
} | ||
if (opts.lengthDelimited !== false) { | ||
w.ldelim(); | ||
} | ||
}, (reader, length) => { | ||
const obj = { | ||
key: '', | ||
value: new Uint8Array(0) | ||
}; | ||
const end = length == null ? reader.len : reader.pos + length; | ||
while (reader.pos < end) { | ||
const tag = reader.uint32(); | ||
switch (tag >>> 3) { | ||
case 1: | ||
obj.key = reader.string(); | ||
break; | ||
case 2: | ||
obj.value = reader.bytes(); | ||
break; | ||
default: | ||
reader.skipType(tag & 7); | ||
break; | ||
} | ||
} | ||
return obj; | ||
}); | ||
} | ||
return _codec; | ||
}; | ||
Peer$metadataEntry.encode = (obj) => { | ||
return encodeMessage(obj, Peer$metadataEntry.codec()); | ||
}; | ||
Peer$metadataEntry.decode = (buf) => { | ||
return decodeMessage(buf, Peer$metadataEntry.codec()); | ||
}; | ||
})(Peer$metadataEntry = Peer.Peer$metadataEntry || (Peer.Peer$metadataEntry = {})); | ||
let Peer$tagsEntry; | ||
(function (Peer$tagsEntry) { | ||
let _codec; | ||
Peer$tagsEntry.codec = () => { | ||
if (_codec == null) { | ||
_codec = message((obj, w, opts = {}) => { | ||
if (opts.lengthDelimited !== false) { | ||
w.fork(); | ||
} | ||
if ((obj.key != null && obj.key !== '')) { | ||
w.uint32(10); | ||
w.string(obj.key); | ||
} | ||
if (obj.value != null) { | ||
w.uint32(18); | ||
Tag.codec().encode(obj.value, w); | ||
} | ||
if (opts.lengthDelimited !== false) { | ||
w.ldelim(); | ||
} | ||
}, (reader, length) => { | ||
const obj = { | ||
key: '' | ||
}; | ||
const end = length == null ? reader.len : reader.pos + length; | ||
while (reader.pos < end) { | ||
const tag = reader.uint32(); | ||
switch (tag >>> 3) { | ||
case 1: | ||
obj.key = reader.string(); | ||
break; | ||
case 2: | ||
obj.value = Tag.codec().decode(reader, reader.uint32()); | ||
break; | ||
default: | ||
reader.skipType(tag & 7); | ||
break; | ||
} | ||
} | ||
return obj; | ||
}); | ||
} | ||
return _codec; | ||
}; | ||
Peer$tagsEntry.encode = (obj) => { | ||
return encodeMessage(obj, Peer$tagsEntry.codec()); | ||
}; | ||
Peer$tagsEntry.decode = (buf) => { | ||
return decodeMessage(buf, Peer$tagsEntry.codec()); | ||
}; | ||
})(Peer$tagsEntry = Peer.Peer$tagsEntry || (Peer.Peer$tagsEntry = {})); | ||
let _codec; | ||
@@ -29,11 +132,5 @@ Peer.codec = () => { | ||
} | ||
if (obj.metadata != null) { | ||
for (const value of obj.metadata) { | ||
w.uint32(26); | ||
Metadata.codec().encode(value, w); | ||
} | ||
} | ||
if (obj.pubKey != null) { | ||
if (obj.publicKey != null) { | ||
w.uint32(34); | ||
w.bytes(obj.pubKey); | ||
w.bytes(obj.publicKey); | ||
} | ||
@@ -44,2 +141,14 @@ if (obj.peerRecordEnvelope != null) { | ||
} | ||
if (obj.metadata != null && obj.metadata.size !== 0) { | ||
for (const [key, value] of obj.metadata.entries()) { | ||
w.uint32(50); | ||
Peer.Peer$metadataEntry.codec().encode({ key, value }, w); | ||
} | ||
} | ||
if (obj.tags != null && obj.tags.size !== 0) { | ||
for (const [key, value] of obj.tags.entries()) { | ||
w.uint32(58); | ||
Peer.Peer$tagsEntry.codec().encode({ key, value }, w); | ||
} | ||
} | ||
if (opts.lengthDelimited !== false) { | ||
@@ -52,3 +161,4 @@ w.ldelim(); | ||
protocols: [], | ||
metadata: [] | ||
metadata: new Map(), | ||
tags: new Map() | ||
}; | ||
@@ -65,7 +175,4 @@ const end = length == null ? reader.len : reader.pos + length; | ||
break; | ||
case 3: | ||
obj.metadata.push(Metadata.codec().decode(reader, reader.uint32())); | ||
break; | ||
case 4: | ||
obj.pubKey = reader.bytes(); | ||
obj.publicKey = reader.bytes(); | ||
break; | ||
@@ -75,2 +182,12 @@ case 5: | ||
break; | ||
case 6: { | ||
const entry = Peer.Peer$metadataEntry.codec().decode(reader, reader.uint32()); | ||
obj.metadata.set(entry.key, entry.value); | ||
break; | ||
} | ||
case 7: { | ||
const entry = Peer.Peer$tagsEntry.codec().decode(reader, reader.uint32()); | ||
obj.tags.set(entry.key, entry.value); | ||
break; | ||
} | ||
default: | ||
@@ -144,6 +261,6 @@ reader.skipType(tag & 7); | ||
})(Address || (Address = {})); | ||
export var Metadata; | ||
(function (Metadata) { | ||
export var Tag; | ||
(function (Tag) { | ||
let _codec; | ||
Metadata.codec = () => { | ||
Tag.codec = () => { | ||
if (_codec == null) { | ||
@@ -154,9 +271,9 @@ _codec = message((obj, w, opts = {}) => { | ||
} | ||
if ((obj.key != null && obj.key !== '')) { | ||
w.uint32(10); | ||
w.string(obj.key); | ||
if ((obj.value != null && obj.value !== 0)) { | ||
w.uint32(8); | ||
w.uint32(obj.value); | ||
} | ||
if ((obj.value != null && obj.value.byteLength > 0)) { | ||
w.uint32(18); | ||
w.bytes(obj.value); | ||
if (obj.expiry != null) { | ||
w.uint32(16); | ||
w.uint64(obj.expiry); | ||
} | ||
@@ -168,4 +285,3 @@ if (opts.lengthDelimited !== false) { | ||
const obj = { | ||
key: '', | ||
value: new Uint8Array(0) | ||
value: 0 | ||
}; | ||
@@ -177,6 +293,6 @@ const end = length == null ? reader.len : reader.pos + length; | ||
case 1: | ||
obj.key = reader.string(); | ||
obj.value = reader.uint32(); | ||
break; | ||
case 2: | ||
obj.value = reader.bytes(); | ||
obj.expiry = reader.uint64(); | ||
break; | ||
@@ -193,9 +309,9 @@ default: | ||
}; | ||
Metadata.encode = (obj) => { | ||
return encodeMessage(obj, Metadata.codec()); | ||
Tag.encode = (obj) => { | ||
return encodeMessage(obj, Tag.codec()); | ||
}; | ||
Metadata.decode = (buf) => { | ||
return decodeMessage(buf, Metadata.codec()); | ||
Tag.decode = (buf) => { | ||
return decodeMessage(buf, Tag.codec()); | ||
}; | ||
})(Metadata || (Metadata = {})); | ||
})(Tag || (Tag = {})); | ||
//# sourceMappingURL=peer.js.map |
@@ -1,37 +0,27 @@ | ||
import { Key } from 'interface-datastore/key'; | ||
import type { Peer } from '@libp2p/interface-peer-store'; | ||
import type { Peer, PeerData } from '@libp2p/interface-peer-store'; | ||
import type { PeerId } from '@libp2p/interface-peer-id'; | ||
import type { PersistentPeerStoreComponents } from './index.js'; | ||
export interface Store { | ||
has: (peerId: PeerId) => Promise<boolean>; | ||
save: (peer: Peer) => Promise<Peer>; | ||
load: (peerId: PeerId) => Promise<Peer>; | ||
delete: (peerId: PeerId) => Promise<void>; | ||
merge: (peerId: PeerId, data: Partial<Peer>) => Promise<Peer>; | ||
mergeOrCreate: (peerId: PeerId, data: Partial<Peer>) => Promise<Peer>; | ||
patch: (peerId: PeerId, data: Partial<Peer>) => Promise<Peer>; | ||
patchOrCreate: (peerId: PeerId, data: Partial<Peer>) => Promise<Peer>; | ||
all: () => AsyncIterable<Peer>; | ||
lock: { | ||
readLock: () => Promise<() => void>; | ||
writeLock: () => Promise<() => void>; | ||
}; | ||
import type { PersistentPeerStoreComponents, PersistentPeerStoreInit } from './index.js'; | ||
import type { PeerUpdate as PeerUpdateExternal } from '@libp2p/interface-libp2p'; | ||
import { Mortice } from 'mortice'; | ||
/** | ||
* Event detail emitted when peer data changes | ||
*/ | ||
export interface PeerUpdate extends PeerUpdateExternal { | ||
updated: boolean; | ||
} | ||
export declare class PersistentStore { | ||
private readonly components; | ||
lock: any; | ||
constructor(components: PersistentPeerStoreComponents); | ||
_peerIdToDatastoreKey(peerId: PeerId): Key; | ||
#private; | ||
private readonly peerId; | ||
private readonly datastore; | ||
readonly lock: Mortice; | ||
private readonly addressFilter?; | ||
constructor(components: PersistentPeerStoreComponents, init?: PersistentPeerStoreInit); | ||
has(peerId: PeerId): Promise<boolean>; | ||
delete(peerId: PeerId): Promise<void>; | ||
load(peerId: PeerId): Promise<Peer>; | ||
save(peer: Peer): Promise<Peer>; | ||
patch(peerId: PeerId, data: Partial<Peer>): Promise<Peer>; | ||
patchOrCreate(peerId: PeerId, data: Partial<Peer>): Promise<Peer>; | ||
_patch(peerId: PeerId, data: Partial<Peer>, peer: Peer): Promise<Peer>; | ||
merge(peerId: PeerId, data: Partial<Peer>): Promise<Peer>; | ||
mergeOrCreate(peerId: PeerId, data: Partial<Peer>): Promise<Peer>; | ||
_merge(peerId: PeerId, data: Partial<Peer>, peer: Peer): Promise<Peer>; | ||
save(peerId: PeerId, data: PeerData): Promise<PeerUpdate>; | ||
patch(peerId: PeerId, data: Partial<PeerData>): Promise<PeerUpdate>; | ||
merge(peerId: PeerId, data: PeerData): Promise<PeerUpdate>; | ||
all(): AsyncGenerator<Peer, void, unknown>; | ||
} | ||
//# sourceMappingURL=store.d.ts.map |
@@ -1,16 +0,23 @@ | ||
import { logger } from '@libp2p/logger'; | ||
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) { | ||
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter"); | ||
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it"); | ||
return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver); | ||
}; | ||
var _PersistentStore_instances, _PersistentStore_findExistingPeer, _PersistentStore_saveIfDifferent; | ||
import { peerIdFromBytes } from '@libp2p/peer-id'; | ||
import { base32 } from 'multiformats/bases/base32'; | ||
import { Peer as PeerPB } from './pb/peer.js'; | ||
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'; | ||
import { NAMESPACE_COMMON, peerIdToDatastoreKey } from './utils/peer-id-to-datastore-key.js'; | ||
import { bytesToPeer } from './utils/bytes-to-peer.js'; | ||
import { CodeError } from '@libp2p/interfaces/errors'; | ||
import { codes } from './errors.js'; | ||
import { Key } from 'interface-datastore/key'; | ||
import { base32 } from 'multiformats/bases/base32'; | ||
import { multiaddr } from '@multiformats/multiaddr'; | ||
import { Peer as PeerPB } from './pb/peer.js'; | ||
import mortice from 'mortice'; | ||
import { equals as uint8arrayEquals } from 'uint8arrays/equals'; | ||
const log = logger('libp2p:peer-store:store'); | ||
const NAMESPACE_COMMON = '/peers/'; | ||
import { toPeerPB } from './utils/to-peer-pb.js'; | ||
export class PersistentStore { | ||
constructor(components) { | ||
this.components = components; | ||
constructor(components, init = {}) { | ||
_PersistentStore_instances.add(this); | ||
this.peerId = components.peerId; | ||
this.datastore = components.datastore; | ||
this.addressFilter = init.addressFilter; | ||
this.lock = mortice({ | ||
@@ -21,153 +28,40 @@ name: 'peer-store', | ||
} | ||
_peerIdToDatastoreKey(peerId) { | ||
if (peerId.type == null) { | ||
log.error('peerId must be an instance of peer-id to store data'); | ||
throw new CodeError('peerId must be an instance of peer-id', codes.ERR_INVALID_PARAMETERS); | ||
} | ||
const b32key = peerId.toCID().toString(); | ||
return new Key(`${NAMESPACE_COMMON}${b32key}`); | ||
} | ||
async has(peerId) { | ||
return await this.components.datastore.has(this._peerIdToDatastoreKey(peerId)); | ||
return await this.datastore.has(peerIdToDatastoreKey(peerId)); | ||
} | ||
async delete(peerId) { | ||
await this.components.datastore.delete(this._peerIdToDatastoreKey(peerId)); | ||
if (this.peerId.equals(peerId)) { | ||
throw new CodeError('Cannot delete self peer', codes.ERR_INVALID_PARAMETERS); | ||
} | ||
await this.datastore.delete(peerIdToDatastoreKey(peerId)); | ||
} | ||
async load(peerId) { | ||
const buf = await this.components.datastore.get(this._peerIdToDatastoreKey(peerId)); | ||
const peer = PeerPB.decode(buf); | ||
const metadata = new Map(); | ||
for (const meta of peer.metadata) { | ||
metadata.set(meta.key, meta.value); | ||
} | ||
return { | ||
...peer, | ||
id: peerId, | ||
addresses: peer.addresses.map(({ multiaddr: ma, isCertified }) => { | ||
return { | ||
multiaddr: multiaddr(ma), | ||
isCertified: isCertified ?? false | ||
}; | ||
}), | ||
metadata, | ||
pubKey: peer.pubKey ?? undefined, | ||
peerRecordEnvelope: peer.peerRecordEnvelope ?? undefined | ||
}; | ||
const buf = await this.datastore.get(peerIdToDatastoreKey(peerId)); | ||
return await bytesToPeer(peerId, buf); | ||
} | ||
async save(peer) { | ||
if (peer.pubKey != null && peer.id.publicKey != null && !uint8arrayEquals(peer.pubKey, peer.id.publicKey)) { | ||
log.error('peer publicKey bytes do not match peer id publicKey bytes'); | ||
throw new CodeError('publicKey bytes do not match peer id publicKey bytes', codes.ERR_INVALID_PARAMETERS); | ||
} | ||
// dedupe addresses | ||
const addressSet = new Set(); | ||
const addresses = peer.addresses | ||
.filter(address => { | ||
if (addressSet.has(address.multiaddr.toString())) { | ||
return false; | ||
} | ||
addressSet.add(address.multiaddr.toString()); | ||
return true; | ||
}) | ||
.sort((a, b) => { | ||
return a.multiaddr.toString().localeCompare(b.multiaddr.toString()); | ||
}) | ||
.map(({ multiaddr, isCertified }) => ({ | ||
multiaddr: multiaddr.bytes, | ||
isCertified | ||
})); | ||
const metadata = []; | ||
[...peer.metadata.keys()].sort().forEach(key => { | ||
const value = peer.metadata.get(key); | ||
if (value != null) { | ||
metadata.push({ key, value }); | ||
} | ||
async save(peerId, data) { | ||
const { existingBuf, existingPeer } = await __classPrivateFieldGet(this, _PersistentStore_instances, "m", _PersistentStore_findExistingPeer).call(this, peerId); | ||
const peerPb = await toPeerPB(peerId, data, 'patch', { | ||
addressFilter: this.addressFilter | ||
}); | ||
const buf = PeerPB.encode({ | ||
addresses, | ||
protocols: peer.protocols.sort(), | ||
pubKey: peer.pubKey, | ||
metadata, | ||
peerRecordEnvelope: peer.peerRecordEnvelope | ||
}); | ||
await this.components.datastore.put(this._peerIdToDatastoreKey(peer.id), buf.subarray()); | ||
return await this.load(peer.id); | ||
return await __classPrivateFieldGet(this, _PersistentStore_instances, "m", _PersistentStore_saveIfDifferent).call(this, peerId, peerPb, existingBuf, existingPeer); | ||
} | ||
async patch(peerId, data) { | ||
const peer = await this.load(peerId); | ||
return await this._patch(peerId, data, peer); | ||
} | ||
async patchOrCreate(peerId, data) { | ||
let peer; | ||
try { | ||
peer = await this.load(peerId); | ||
} | ||
catch (err) { | ||
if (err.code !== codes.ERR_NOT_FOUND) { | ||
throw err; | ||
} | ||
peer = { id: peerId, addresses: [], protocols: [], metadata: new Map() }; | ||
} | ||
return await this._patch(peerId, data, peer); | ||
} | ||
async _patch(peerId, data, peer) { | ||
return await this.save({ | ||
...peer, | ||
...data, | ||
id: peerId | ||
const { existingBuf, existingPeer } = await __classPrivateFieldGet(this, _PersistentStore_instances, "m", _PersistentStore_findExistingPeer).call(this, peerId); | ||
const peerPb = await toPeerPB(peerId, data, 'patch', { | ||
addressFilter: this.addressFilter, | ||
existingPeer | ||
}); | ||
return await __classPrivateFieldGet(this, _PersistentStore_instances, "m", _PersistentStore_saveIfDifferent).call(this, peerId, peerPb, existingBuf, existingPeer); | ||
} | ||
async merge(peerId, data) { | ||
const peer = await this.load(peerId); | ||
return await this._merge(peerId, data, peer); | ||
} | ||
async mergeOrCreate(peerId, data) { | ||
/** @type {Peer} */ | ||
let peer; | ||
try { | ||
peer = await this.load(peerId); | ||
} | ||
catch (err) { | ||
if (err.code !== codes.ERR_NOT_FOUND) { | ||
throw err; | ||
} | ||
peer = { id: peerId, addresses: [], protocols: [], metadata: new Map() }; | ||
} | ||
return await this._merge(peerId, data, peer); | ||
} | ||
async _merge(peerId, data, peer) { | ||
// if the peer has certified addresses, use those in | ||
// favour of the supplied versions | ||
const addresses = new Map(); | ||
peer.addresses.forEach((addr) => { | ||
addresses.set(addr.multiaddr.toString(), addr.isCertified); | ||
const { existingBuf, existingPeer } = await __classPrivateFieldGet(this, _PersistentStore_instances, "m", _PersistentStore_findExistingPeer).call(this, peerId); | ||
const peerPb = await toPeerPB(peerId, data, 'merge', { | ||
addressFilter: this.addressFilter, | ||
existingPeer | ||
}); | ||
(data.addresses ?? []).forEach(addr => { | ||
const addrString = addr.multiaddr.toString(); | ||
const isAlreadyCertified = Boolean(addresses.get(addrString)); | ||
const isCertified = isAlreadyCertified || addr.isCertified; | ||
addresses.set(addrString, isCertified); | ||
}); | ||
return await this.save({ | ||
id: peerId, | ||
addresses: Array.from(addresses.entries()).map(([addrStr, isCertified]) => { | ||
return { | ||
multiaddr: multiaddr(addrStr), | ||
isCertified | ||
}; | ||
}), | ||
protocols: Array.from(new Set([ | ||
...(peer.protocols ?? []), | ||
...(data.protocols ?? []) | ||
])), | ||
metadata: new Map([ | ||
...(peer.metadata?.entries() ?? []), | ||
...(data.metadata?.entries() ?? []) | ||
]), | ||
pubKey: data.pubKey ?? (peer != null ? peer.pubKey : undefined), | ||
peerRecordEnvelope: data.peerRecordEnvelope ?? (peer != null ? peer.peerRecordEnvelope : undefined) | ||
}); | ||
return await __classPrivateFieldGet(this, _PersistentStore_instances, "m", _PersistentStore_saveIfDifferent).call(this, peerId, peerPb, existingBuf, existingPeer); | ||
} | ||
async *all() { | ||
for await (const key of this.components.datastore.queryKeys({ | ||
for await (const { key, value } of this.datastore.query({ | ||
prefix: NAMESPACE_COMMON | ||
@@ -178,6 +72,42 @@ })) { | ||
const buf = base32.decode(base32Str); | ||
yield this.load(peerIdFromBytes(buf)); | ||
const peerId = peerIdFromBytes(buf); | ||
if (peerId.equals(this.peerId)) { | ||
// Skip self peer if present | ||
continue; | ||
} | ||
yield bytesToPeer(peerId, value); | ||
} | ||
} | ||
} | ||
_PersistentStore_instances = new WeakSet(), _PersistentStore_findExistingPeer = async function _PersistentStore_findExistingPeer(peerId) { | ||
try { | ||
const existingBuf = await this.datastore.get(peerIdToDatastoreKey(peerId)); | ||
const existingPeer = await bytesToPeer(peerId, existingBuf); | ||
return { | ||
existingBuf, | ||
existingPeer | ||
}; | ||
} | ||
catch (err) { | ||
if (err.code !== 'ERR_NOT_FOUND') { | ||
throw err; | ||
} | ||
} | ||
return {}; | ||
}, _PersistentStore_saveIfDifferent = async function _PersistentStore_saveIfDifferent(peerId, peer, existingBuf, existingPeer) { | ||
const buf = PeerPB.encode(peer); | ||
if (existingBuf != null && uint8ArrayEquals(buf, existingBuf)) { | ||
return { | ||
peer: await bytesToPeer(peerId, buf), | ||
previous: existingPeer, | ||
updated: false | ||
}; | ||
} | ||
await this.datastore.put(peerIdToDatastoreKey(peerId), buf); | ||
return { | ||
peer: await bytesToPeer(peerId, buf), | ||
previous: existingPeer, | ||
updated: true | ||
}; | ||
}; | ||
//# sourceMappingURL=store.js.map |
{ | ||
"Store": "https://libp2p.github.io/js-libp2p-peer-store/interfaces/_internal_.Store.html", | ||
"codec": "https://libp2p.github.io/js-libp2p-peer-store/functions/_internal_.Address.codec.html", | ||
"decode": "https://libp2p.github.io/js-libp2p-peer-store/functions/_internal_.Address.decode.html", | ||
"encode": "https://libp2p.github.io/js-libp2p-peer-store/functions/_internal_.Address.encode.html", | ||
"Peer$metadataEntry": "https://libp2p.github.io/js-libp2p-peer-store/interfaces/_internal_.Peer.Peer_metadataEntry-1.html", | ||
"Peer$tagsEntry": "https://libp2p.github.io/js-libp2p-peer-store/interfaces/_internal_.Peer.Peer_tagsEntry-1.html", | ||
"PersistentStore": "https://libp2p.github.io/js-libp2p-peer-store/classes/_internal_.PersistentStore.html", | ||
"Address": "https://libp2p.github.io/js-libp2p-peer-store/interfaces/_internal_.Address-1.html", | ||
"Peer": "https://libp2p.github.io/js-libp2p-peer-store/interfaces/_internal_.Peer-1.html", | ||
"PeerUpdate": "https://libp2p.github.io/js-libp2p-peer-store/interfaces/_internal_.PeerUpdate.html", | ||
"Tag": "https://libp2p.github.io/js-libp2p-peer-store/interfaces/_internal_.Tag-1.html", | ||
"PersistentPeerStore": "https://libp2p.github.io/js-libp2p-peer-store/classes/PersistentPeerStore.html", | ||
"PersistentPeerStoreComponents": "https://libp2p.github.io/js-libp2p-peer-store/interfaces/PersistentPeerStoreComponents.html" | ||
"AddressFilter": "https://libp2p.github.io/js-libp2p-peer-store/interfaces/AddressFilter.html", | ||
"PersistentPeerStoreComponents": "https://libp2p.github.io/js-libp2p-peer-store/interfaces/PersistentPeerStoreComponents.html", | ||
"PersistentPeerStoreInit": "https://libp2p.github.io/js-libp2p-peer-store/interfaces/PersistentPeerStoreInit.html" | ||
} |
{ | ||
"name": "@libp2p/peer-store", | ||
"version": "7.0.2", | ||
"version": "8.0.0", | ||
"description": "Stores information about peers libp2p knows on the network", | ||
@@ -134,3 +134,3 @@ "license": "Apache-2.0 OR MIT", | ||
"dep-check": "aegir dep-check -i protons", | ||
"generate": "protons src/pb/peer.proto src/pb/tags.proto", | ||
"generate": "protons src/pb/*.proto", | ||
"build": "aegir build", | ||
@@ -148,13 +148,12 @@ "test": "aegir test", | ||
"dependencies": { | ||
"@libp2p/crypto": "^1.0.15", | ||
"@libp2p/interface-libp2p": "^2.0.0", | ||
"@libp2p/interface-peer-id": "^2.0.0", | ||
"@libp2p/interface-peer-info": "^1.0.3", | ||
"@libp2p/interface-peer-store": "^1.2.2", | ||
"@libp2p/interface-record": "^2.0.1", | ||
"@libp2p/interface-peer-store": "^2.0.1", | ||
"@libp2p/interfaces": "^3.2.0", | ||
"@libp2p/logger": "^2.0.0", | ||
"@libp2p/logger": "^2.0.7", | ||
"@libp2p/peer-id": "^2.0.0", | ||
"@libp2p/peer-record": "^5.0.0", | ||
"@multiformats/multiaddr": "^12.0.0", | ||
"interface-datastore": "^8.0.0", | ||
"mortice": "^3.0.0", | ||
"mortice": "^3.0.1", | ||
"multiformats": "^11.0.0", | ||
@@ -167,3 +166,2 @@ "protons-runtime": "^5.0.0", | ||
"@libp2p/peer-id-factory": "^2.0.0", | ||
"@libp2p/utils": "^3.0.2", | ||
"aegir": "^38.1.6", | ||
@@ -173,3 +171,3 @@ "datastore-core": "^9.0.1", | ||
"p-defer": "^4.0.0", | ||
"p-wait-for": "^5.0.0", | ||
"p-event": "^5.0.1", | ||
"protons": "^7.0.2", | ||
@@ -176,0 +174,0 @@ "sinon": "^15.0.1" |
170
README.md
@@ -14,22 +14,2 @@ # @libp2p/peer-store <!-- omit in toc --> | ||
- [Browser `<script>` tag](#browser-script-tag) | ||
- [Description](#description) | ||
- [Submitting records to the PeerStore](#submitting-records-to-the-peerstore) | ||
- [Identify](#identify) | ||
- [Peer Discovery](#peer-discovery) | ||
- [Dialer](#dialer) | ||
- [DHT](#dht) | ||
- [Retrieving records from the PeerStore](#retrieving-records-from-the-peerstore) | ||
- [Peer](#peer) | ||
- [Protocols](#protocols) | ||
- [Multiaddrs](#multiaddrs) | ||
- [PeerStore implementation](#peerstore-implementation) | ||
- [Components](#components) | ||
- [Address Book](#address-book) | ||
- [Key Book](#key-book) | ||
- [Protocol Book](#protocol-book) | ||
- [Metadata Book](#metadata-book) | ||
- [API](#api) | ||
- [Events](#events) | ||
- [Data Persistence](#data-persistence) | ||
- [Future Considerations](#future-considerations) | ||
- [API Docs](#api-docs) | ||
@@ -53,152 +33,2 @@ - [License](#license) | ||
## Description | ||
Libp2p's PeerStore is responsible for keeping an updated register with the relevant information of the known peers. It should be the single source of truth for all peer data, where a subsystem can learn about peers' data and where someone can listen for updates. The PeerStore comprises four main components: `addressBook`, `keyBook`, `protocolBook` and `metadataBook`. | ||
The PeerStore manages the high level operations on its inner books. Moreover, the PeerStore should be responsible for notifying interested parties of relevant events, through its Event Emitter. | ||
### Submitting records to the PeerStore | ||
Several libp2p subsystems will perform operations that might gather relevant information about peers. | ||
#### Identify | ||
- The Identify protocol automatically runs on every connection when multiplexing is enabled. The protocol will put the multiaddrs and protocols provided by the peer to the PeerStore. | ||
- In the background, the Identify Service is also waiting for protocol change notifications of peers via the IdentifyPush protocol. Peers may leverage the `identify-push` message to communicate protocol changes to all connected peers, so that their PeerStore can be updated with the updated protocols. | ||
- While it is currently not supported in js-libp2p, future iterations may also support the [IdentifyDelta protocol](https://github.com/libp2p/specs/pull/176). | ||
- Taking into account that the Identify protocol records are directly from the peer, they should be considered the source of truth and weighted accordingly. | ||
#### Peer Discovery | ||
- Libp2p discovery protocols aim to discover new peers in the network. In a typical discovery protocol, addresses of the peer are discovered along with its peer id. Once this happens, a libp2p discovery protocol should emit a `peer` event with the information of the discovered peer and this information will be added to the PeerStore by libp2p. | ||
#### Dialer | ||
- Libp2p API supports dialing a peer given a `multiaddr`, and no prior knowledge of the peer. If the node is able to establish a connection with the peer, it and its multiaddr is added to the PeerStore. | ||
- When a connection is being upgraded, more precisely after its encryption, or even in a discovery protocol, a libp2p node can get to know other parties public keys. In this scenario, libp2p will add the peer's public key to its `KeyBook`. | ||
#### DHT | ||
- On some DHT operations, such as finding providers for a given CID, nodes may exchange peer data as part of the query. This passive peer discovery should result in the DHT emitting the `peer` event in the same way [Peer Discovery](#peerdiscovery) does. | ||
### Retrieving records from the PeerStore | ||
When data in the PeerStore is updated the PeerStore will emit events based on the changes, to allow applications and other subsystems to take action on those changes. Any subsystem interested in these notifications should subscribe the [`PeerStore events`][peer-store-events]. | ||
#### Peer | ||
- Each time a new peer is discovered, the PeerStore should emit a [`peer` event][peer-store-events], so that interested parties can leverage this peer and establish a connection with it. | ||
#### Protocols | ||
- When the known protocols of a peer change, the PeerStore emits a [`change:protocols` event][peer-store-events]. | ||
#### Multiaddrs | ||
- When the known listening `multiaddrs` of a peer change, the PeerStore emits a [`change:multiaddrs` event][peer-store-events]. | ||
### PeerStore implementation | ||
The PeerStore wraps four main components: `addressBook`, `keyBook`, `protocolBook` and `metadataBook`. Moreover, it provides a high level API for those components, as well as data events. | ||
### Components | ||
#### Address Book | ||
The `addressBook` keeps the known multiaddrs of a peer. The multiaddrs of each peer may change over time and the Address Book must account for this. | ||
`Map<string, Address>` | ||
A `peerId.toString()` identifier mapping to a `Address` object, which should have the following structure: | ||
```js | ||
{ | ||
multiaddr: <Multiaddr> | ||
} | ||
``` | ||
#### Key Book | ||
The `keyBook` tracks the public keys of the peers by keeping their [`PeerId`][peer-id]. | ||
`Map<string, PeerId` | ||
A `peerId.toString()` identifier mapping to a `PeerId` of the peer. This instance contains the peer public key. | ||
#### Protocol Book | ||
The `protoBook` holds the identifiers of the protocols supported by each peer. The protocols supported by each peer are dynamic and will change over time. | ||
`Map<string, Set<string>>` | ||
A `peerId.toString()` identifier mapping to a `Set` of protocol identifier strings. | ||
#### Metadata Book | ||
The `metadataBook` keeps track of the known metadata of a peer. Its metadata is stored in a key value fashion, where a key identifier (`string`) represents a metadata value (`Uint8Array`). | ||
`Map<string, Map<string, Uint8Array>>` | ||
A `peerId.toString()` identifier mapping to the peer metadata Map. | ||
### API | ||
For the complete API documentation, you should check the [API.md](https://libp2p.github.io/js-libp2p-peer-store). | ||
Access to its underlying books: | ||
- `peerStore.addressBook.*` | ||
- `peerStore.keyBook.*` | ||
- `peerStore.metadataBook.*` | ||
- `peerStore.protoBook.*` | ||
### Events | ||
- `peer` - emitted when a new peer is added. | ||
- `change:multiaddrs` - emitted when a known peer has a different set of multiaddrs. | ||
- `change:protocols` - emitted when a known peer supports a different set of protocols. | ||
- `change:pubkey` - emitted when a peer's public key is known. | ||
- `change:metadata` - emitted when known metadata of a peer changes. | ||
## Data Persistence | ||
The data stored in the PeerStore can be persisted if configured appropriately. Keeping a record of the peers already discovered by the peer, as well as their known data aims to improve the efficiency of peers joining the network after being offline. | ||
The libp2p node will need to receive a [datastore](https://github.com/ipfs/interface-datastore), in order to persist this data across restarts. A [datastore](https://github.com/ipfs/interface-datastore) stores its data in a key-value fashion. As a result, we need coherent keys so that we do not overwrite data. | ||
The PeerStore should not continuously update the datastore whenever data is changed. Instead, it should only store new data after reaching a certain threshold of "dirty" peers, as well as when the node is stopped, in order to batch writes to the datastore. | ||
The peer id will be appended to the datastore key for each data namespace. The namespaces were defined as follows: | ||
**AddressBook** | ||
All the known peer addresses are stored with a key pattern as follows: | ||
`/peers/addrs/<b32 peer id no padding>` | ||
**ProtoBook** | ||
All the known peer protocols are stored with a key pattern as follows: | ||
`/peers/protos/<b32 peer id no padding>` | ||
**KeyBook** | ||
All public keys are stored under the following pattern: | ||
` /peers/keys/<b32 peer id no padding>` | ||
**MetadataBook** | ||
Metadata is stored under the following key pattern: | ||
`/peers/metadata/<b32 peer id no padding>/<key>` | ||
## Future Considerations | ||
- If multiaddr TTLs are added, the PeerStore may schedule jobs to delete all addresses that exceed the TTL to prevent AddressBook bloating | ||
- Further API methods will probably need to be added in the context of multiaddr validity and confidence. | ||
- When improving libp2p configuration for specific runtimes, we should take into account the PeerStore recommended datastore. | ||
- When improving libp2p configuration, we should think about a possible way of allowing the configuration of Bootstrap to be influenced by the persisted peers, as a way to decrease the load on Bootstrap nodes. | ||
## API Docs | ||
@@ -205,0 +35,0 @@ |
export const codes = { | ||
ERR_INVALID_PARAMETERS: 'ERR_INVALID_PARAMETERS', | ||
ERR_NOT_FOUND: 'ERR_NOT_FOUND' | ||
ERR_INVALID_PARAMETERS: 'ERR_INVALID_PARAMETERS' | ||
} |
196
src/index.ts
@@ -1,13 +0,9 @@ | ||
import { logger } from '@libp2p/logger' | ||
import { EventEmitter } from '@libp2p/interfaces/events' | ||
import { PeerStoreAddressBook } from './address-book.js' | ||
import { PeerStoreKeyBook } from './key-book.js' | ||
import { PeerStoreMetadataBook } from './metadata-book.js' | ||
import { PeerStoreProtoBook } from './proto-book.js' | ||
import { PersistentStore, Store } from './store.js' | ||
import type { PeerStore, AddressBook, KeyBook, MetadataBook, ProtoBook, PeerStoreEvents, PeerStoreInit, Peer, TagOptions } from '@libp2p/interface-peer-store' | ||
import type { EventEmitter } from '@libp2p/interfaces/events' | ||
import { PersistentStore, PeerUpdate } from './store.js' | ||
import type { PeerStore, Peer, PeerData } from '@libp2p/interface-peer-store' | ||
import type { PeerId } from '@libp2p/interface-peer-id' | ||
import { CodeError } from '@libp2p/interfaces/errors' | ||
import { Tag, Tags } from './pb/tags.js' | ||
import type { Datastore } from 'interface-datastore' | ||
import type { Multiaddr } from '@multiformats/multiaddr' | ||
import type { Libp2pEvents } from '@libp2p/interface-libp2p' | ||
import { logger } from '@libp2p/logger' | ||
@@ -19,43 +15,41 @@ const log = logger('libp2p:peer-store') | ||
datastore: Datastore | ||
events: EventEmitter<Libp2pEvents> | ||
} | ||
/** | ||
* An implementation of PeerStore that stores data in a Datastore | ||
* Return true to allow storing the passed multiaddr for the passed peer | ||
*/ | ||
export class PersistentPeerStore extends EventEmitter<PeerStoreEvents> implements PeerStore { | ||
public addressBook: AddressBook | ||
public keyBook: KeyBook | ||
public metadataBook: MetadataBook | ||
public protoBook: ProtoBook | ||
export interface AddressFilter { | ||
(peerId: PeerId, multiaddr: Multiaddr): Promise<boolean> | ||
} | ||
private readonly components: PersistentPeerStoreComponents | ||
private readonly store: Store | ||
export interface PersistentPeerStoreInit { | ||
addressFilter?: AddressFilter | ||
} | ||
constructor (components: PersistentPeerStoreComponents, init: PeerStoreInit = {}) { | ||
super() | ||
/** | ||
* An implementation of PeerStore that stores data in a Datastore | ||
*/ | ||
export class PersistentPeerStore implements PeerStore { | ||
private readonly store: PersistentStore | ||
private readonly events: EventEmitter<Libp2pEvents> | ||
private readonly peerId: PeerId | ||
this.components = components | ||
this.store = new PersistentStore(components) | ||
this.addressBook = new PeerStoreAddressBook(this.dispatchEvent.bind(this), this.store, init.addressFilter) | ||
this.keyBook = new PeerStoreKeyBook(this.dispatchEvent.bind(this), this.store) | ||
this.metadataBook = new PeerStoreMetadataBook(this.dispatchEvent.bind(this), this.store) | ||
this.protoBook = new PeerStoreProtoBook(this.dispatchEvent.bind(this), this.store) | ||
constructor (components: PersistentPeerStoreComponents, init: PersistentPeerStoreInit = {}) { | ||
this.events = components.events | ||
this.peerId = components.peerId | ||
this.store = new PersistentStore(components, init) | ||
} | ||
async forEach (fn: (peer: Peer) => void): Promise<void> { | ||
log.trace('getPeers await read lock') | ||
log.trace('forEach await read lock') | ||
const release = await this.store.lock.readLock() | ||
log.trace('getPeers got read lock') | ||
log.trace('forEach got read lock') | ||
try { | ||
for await (const peer of this.store.all()) { | ||
if (peer.id.equals(this.components.peerId)) { | ||
// Skip self peer if present | ||
continue | ||
} | ||
fn(peer) | ||
} | ||
} finally { | ||
log.trace('getPeers release read lock') | ||
log.trace('forEach release read lock') | ||
release() | ||
@@ -66,14 +60,20 @@ } | ||
async all (): Promise<Peer[]> { | ||
const output: Peer[] = [] | ||
log.trace('all await read lock') | ||
const release = await this.store.lock.readLock() | ||
log.trace('all got read lock') | ||
await this.forEach(peer => { | ||
output.push(peer) | ||
}) | ||
try { | ||
const output: Peer[] = [] | ||
return output | ||
for await (const peer of this.store.all()) { | ||
output.push(peer) | ||
} | ||
return output | ||
} finally { | ||
log.trace('all release read lock') | ||
release() | ||
} | ||
} | ||
/** | ||
* Delete the information of the given peer in every book | ||
*/ | ||
async delete (peerId: PeerId): Promise<void> { | ||
@@ -92,5 +92,15 @@ log.trace('delete await write lock') | ||
/** | ||
* Get the stored information of a given peer | ||
*/ | ||
async has (peerId: PeerId): Promise<boolean> { | ||
log.trace('has await read lock') | ||
const release = await this.store.lock.readLock() | ||
log.trace('has got read lock') | ||
try { | ||
return await this.store.has(peerId) | ||
} finally { | ||
log.trace('has release read lock') | ||
release() | ||
} | ||
} | ||
async get (peerId: PeerId): Promise<Peer> { | ||
@@ -109,14 +119,15 @@ log.trace('get await read lock') | ||
/** | ||
* Returns true if we have a record of the peer | ||
*/ | ||
async has (peerId: PeerId): Promise<boolean> { | ||
log.trace('has await read lock') | ||
const release = await this.store.lock.readLock() | ||
log.trace('has got read lock') | ||
async save (id: PeerId, data: PeerData): Promise<Peer> { | ||
log.trace('save await write lock') | ||
const release = await this.store.lock.writeLock() | ||
log.trace('save got write lock') | ||
try { | ||
return await this.store.has(peerId) | ||
const result = await this.store.save(id, data) | ||
this.#emitIfUpdated(id, result) | ||
return result.peer | ||
} finally { | ||
log.trace('has release read lock') | ||
log.trace('save release write lock') | ||
release() | ||
@@ -126,64 +137,47 @@ } | ||
async tagPeer (peerId: PeerId, tag: string, options: TagOptions = {}): Promise<void> { | ||
const providedValue = options.value ?? 0 | ||
const value = Math.round(providedValue) | ||
const ttl = options.ttl ?? undefined | ||
async patch (id: PeerId, data: PeerData): Promise<Peer> { | ||
log.trace('patch await write lock') | ||
const release = await this.store.lock.writeLock() | ||
log.trace('patch got write lock') | ||
if (value !== providedValue || value < 0 || value > 100) { | ||
throw new CodeError('Tag value must be between 0-100', 'ERR_TAG_VALUE_OUT_OF_BOUNDS') | ||
} | ||
try { | ||
const result = await this.store.patch(id, data) | ||
const buf = await this.metadataBook.getValue(peerId, 'tags') | ||
let tags: Tag[] = [] | ||
this.#emitIfUpdated(id, result) | ||
if (buf != null) { | ||
tags = Tags.decode(buf).tags | ||
return result.peer | ||
} finally { | ||
log.trace('patch release write lock') | ||
release() | ||
} | ||
} | ||
// do not allow duplicate tags | ||
tags = tags.filter(t => t.name !== tag) | ||
async merge (id: PeerId, data: PeerData): Promise<Peer> { | ||
log.trace('merge await write lock') | ||
const release = await this.store.lock.writeLock() | ||
log.trace('merge got write lock') | ||
tags.push({ | ||
name: tag, | ||
value, | ||
expiry: ttl == null ? undefined : BigInt(Date.now() + ttl) | ||
}) | ||
try { | ||
const result = await this.store.merge(id, data) | ||
await this.metadataBook.setValue(peerId, 'tags', Tags.encode({ tags }).subarray()) | ||
} | ||
this.#emitIfUpdated(id, result) | ||
async unTagPeer (peerId: PeerId, tag: string): Promise<void> { | ||
const buf = await this.metadataBook.getValue(peerId, 'tags') | ||
let tags: Tag[] = [] | ||
if (buf != null) { | ||
tags = Tags.decode(buf).tags | ||
return result.peer | ||
} finally { | ||
log.trace('merge release write lock') | ||
release() | ||
} | ||
tags = tags.filter(t => t.name !== tag) | ||
await this.metadataBook.setValue(peerId, 'tags', Tags.encode({ tags }).subarray()) | ||
} | ||
async getTags (peerId: PeerId): Promise<Array<{ name: string, value: number }>> { | ||
const buf = await this.metadataBook.getValue(peerId, 'tags') | ||
let tags: Tag[] = [] | ||
if (buf != null) { | ||
tags = Tags.decode(buf).tags | ||
#emitIfUpdated (id: PeerId, result: PeerUpdate): void { | ||
if (!result.updated) { | ||
return | ||
} | ||
const now = BigInt(Date.now()) | ||
const unexpiredTags = tags.filter(tag => tag.expiry == null || tag.expiry > now) | ||
if (unexpiredTags.length !== tags.length) { | ||
// remove any expired tags | ||
await this.metadataBook.setValue(peerId, 'tags', Tags.encode({ tags: unexpiredTags }).subarray()) | ||
if (this.peerId.equals(id)) { | ||
this.events.safeDispatchEvent('self:peer:update', { detail: result }) | ||
} else { | ||
this.events.safeDispatchEvent('peer:update', { detail: result }) | ||
} | ||
return unexpiredTags.map(t => ({ | ||
name: t.name, | ||
value: t.value ?? 0 | ||
})) | ||
} | ||
} |
@@ -14,8 +14,144 @@ /* eslint-disable import/export */ | ||
protocols: string[] | ||
metadata: Metadata[] | ||
pubKey?: Uint8Array | ||
publicKey?: Uint8Array | ||
peerRecordEnvelope?: Uint8Array | ||
metadata: Map<string, Uint8Array> | ||
tags: Map<string, Tag> | ||
} | ||
export namespace Peer { | ||
export interface Peer$metadataEntry { | ||
key: string | ||
value: Uint8Array | ||
} | ||
export namespace Peer$metadataEntry { | ||
let _codec: Codec<Peer$metadataEntry> | ||
export const codec = (): Codec<Peer$metadataEntry> => { | ||
if (_codec == null) { | ||
_codec = message<Peer$metadataEntry>((obj, w, opts = {}) => { | ||
if (opts.lengthDelimited !== false) { | ||
w.fork() | ||
} | ||
if ((obj.key != null && obj.key !== '')) { | ||
w.uint32(10) | ||
w.string(obj.key) | ||
} | ||
if ((obj.value != null && obj.value.byteLength > 0)) { | ||
w.uint32(18) | ||
w.bytes(obj.value) | ||
} | ||
if (opts.lengthDelimited !== false) { | ||
w.ldelim() | ||
} | ||
}, (reader, length) => { | ||
const obj: any = { | ||
key: '', | ||
value: new Uint8Array(0) | ||
} | ||
const end = length == null ? reader.len : reader.pos + length | ||
while (reader.pos < end) { | ||
const tag = reader.uint32() | ||
switch (tag >>> 3) { | ||
case 1: | ||
obj.key = reader.string() | ||
break | ||
case 2: | ||
obj.value = reader.bytes() | ||
break | ||
default: | ||
reader.skipType(tag & 7) | ||
break | ||
} | ||
} | ||
return obj | ||
}) | ||
} | ||
return _codec | ||
} | ||
export const encode = (obj: Partial<Peer$metadataEntry>): Uint8Array => { | ||
return encodeMessage(obj, Peer$metadataEntry.codec()) | ||
} | ||
export const decode = (buf: Uint8Array | Uint8ArrayList): Peer$metadataEntry => { | ||
return decodeMessage(buf, Peer$metadataEntry.codec()) | ||
} | ||
} | ||
export interface Peer$tagsEntry { | ||
key: string | ||
value?: Tag | ||
} | ||
export namespace Peer$tagsEntry { | ||
let _codec: Codec<Peer$tagsEntry> | ||
export const codec = (): Codec<Peer$tagsEntry> => { | ||
if (_codec == null) { | ||
_codec = message<Peer$tagsEntry>((obj, w, opts = {}) => { | ||
if (opts.lengthDelimited !== false) { | ||
w.fork() | ||
} | ||
if ((obj.key != null && obj.key !== '')) { | ||
w.uint32(10) | ||
w.string(obj.key) | ||
} | ||
if (obj.value != null) { | ||
w.uint32(18) | ||
Tag.codec().encode(obj.value, w) | ||
} | ||
if (opts.lengthDelimited !== false) { | ||
w.ldelim() | ||
} | ||
}, (reader, length) => { | ||
const obj: any = { | ||
key: '' | ||
} | ||
const end = length == null ? reader.len : reader.pos + length | ||
while (reader.pos < end) { | ||
const tag = reader.uint32() | ||
switch (tag >>> 3) { | ||
case 1: | ||
obj.key = reader.string() | ||
break | ||
case 2: | ||
obj.value = Tag.codec().decode(reader, reader.uint32()) | ||
break | ||
default: | ||
reader.skipType(tag & 7) | ||
break | ||
} | ||
} | ||
return obj | ||
}) | ||
} | ||
return _codec | ||
} | ||
export const encode = (obj: Partial<Peer$tagsEntry>): Uint8Array => { | ||
return encodeMessage(obj, Peer$tagsEntry.codec()) | ||
} | ||
export const decode = (buf: Uint8Array | Uint8ArrayList): Peer$tagsEntry => { | ||
return decodeMessage(buf, Peer$tagsEntry.codec()) | ||
} | ||
} | ||
let _codec: Codec<Peer> | ||
@@ -44,12 +180,5 @@ | ||
if (obj.metadata != null) { | ||
for (const value of obj.metadata) { | ||
w.uint32(26) | ||
Metadata.codec().encode(value, w) | ||
} | ||
} | ||
if (obj.pubKey != null) { | ||
if (obj.publicKey != null) { | ||
w.uint32(34) | ||
w.bytes(obj.pubKey) | ||
w.bytes(obj.publicKey) | ||
} | ||
@@ -62,2 +191,16 @@ | ||
if (obj.metadata != null && obj.metadata.size !== 0) { | ||
for (const [key, value] of obj.metadata.entries()) { | ||
w.uint32(50) | ||
Peer.Peer$metadataEntry.codec().encode({ key, value }, w) | ||
} | ||
} | ||
if (obj.tags != null && obj.tags.size !== 0) { | ||
for (const [key, value] of obj.tags.entries()) { | ||
w.uint32(58) | ||
Peer.Peer$tagsEntry.codec().encode({ key, value }, w) | ||
} | ||
} | ||
if (opts.lengthDelimited !== false) { | ||
@@ -70,3 +213,4 @@ w.ldelim() | ||
protocols: [], | ||
metadata: [] | ||
metadata: new Map<string, Uint8Array>(), | ||
tags: new Map<string, undefined>() | ||
} | ||
@@ -86,7 +230,4 @@ | ||
break | ||
case 3: | ||
obj.metadata.push(Metadata.codec().decode(reader, reader.uint32())) | ||
break | ||
case 4: | ||
obj.pubKey = reader.bytes() | ||
obj.publicKey = reader.bytes() | ||
break | ||
@@ -96,2 +237,12 @@ case 5: | ||
break | ||
case 6: { | ||
const entry = Peer.Peer$metadataEntry.codec().decode(reader, reader.uint32()) | ||
obj.metadata.set(entry.key, entry.value) | ||
break | ||
} | ||
case 7: { | ||
const entry = Peer.Peer$tagsEntry.codec().decode(reader, reader.uint32()) | ||
obj.tags.set(entry.key, entry.value) | ||
break | ||
} | ||
default: | ||
@@ -186,13 +337,13 @@ reader.skipType(tag & 7) | ||
export interface Metadata { | ||
key: string | ||
value: Uint8Array | ||
export interface Tag { | ||
value: number | ||
expiry?: bigint | ||
} | ||
export namespace Metadata { | ||
let _codec: Codec<Metadata> | ||
export namespace Tag { | ||
let _codec: Codec<Tag> | ||
export const codec = (): Codec<Metadata> => { | ||
export const codec = (): Codec<Tag> => { | ||
if (_codec == null) { | ||
_codec = message<Metadata>((obj, w, opts = {}) => { | ||
_codec = message<Tag>((obj, w, opts = {}) => { | ||
if (opts.lengthDelimited !== false) { | ||
@@ -202,10 +353,10 @@ w.fork() | ||
if ((obj.key != null && obj.key !== '')) { | ||
w.uint32(10) | ||
w.string(obj.key) | ||
if ((obj.value != null && obj.value !== 0)) { | ||
w.uint32(8) | ||
w.uint32(obj.value) | ||
} | ||
if ((obj.value != null && obj.value.byteLength > 0)) { | ||
w.uint32(18) | ||
w.bytes(obj.value) | ||
if (obj.expiry != null) { | ||
w.uint32(16) | ||
w.uint64(obj.expiry) | ||
} | ||
@@ -218,4 +369,3 @@ | ||
const obj: any = { | ||
key: '', | ||
value: new Uint8Array(0) | ||
value: 0 | ||
} | ||
@@ -230,6 +380,6 @@ | ||
case 1: | ||
obj.key = reader.string() | ||
obj.value = reader.uint32() | ||
break | ||
case 2: | ||
obj.value = reader.bytes() | ||
obj.expiry = reader.uint64() | ||
break | ||
@@ -249,9 +399,9 @@ default: | ||
export const encode = (obj: Partial<Metadata>): Uint8Array => { | ||
return encodeMessage(obj, Metadata.codec()) | ||
export const encode = (obj: Partial<Tag>): Uint8Array => { | ||
return encodeMessage(obj, Tag.codec()) | ||
} | ||
export const decode = (buf: Uint8Array | Uint8ArrayList): Metadata => { | ||
return decodeMessage(buf, Metadata.codec()) | ||
export const decode = (buf: Uint8Array | Uint8ArrayList): Tag => { | ||
return decodeMessage(buf, Tag.codec()) | ||
} | ||
} |
285
src/store.ts
@@ -1,42 +0,34 @@ | ||
import { logger } from '@libp2p/logger' | ||
import { peerIdFromBytes } from '@libp2p/peer-id' | ||
import { base32 } from 'multiformats/bases/base32' | ||
import { Peer as PeerPB } from './pb/peer.js' | ||
import type { Peer, PeerData } from '@libp2p/interface-peer-store' | ||
import type { PeerId } from '@libp2p/interface-peer-id' | ||
import type { AddressFilter, PersistentPeerStoreComponents, PersistentPeerStoreInit } from './index.js' | ||
import { equals as uint8ArrayEquals } from 'uint8arrays/equals' | ||
import { NAMESPACE_COMMON, peerIdToDatastoreKey } from './utils/peer-id-to-datastore-key.js' | ||
import { bytesToPeer } from './utils/bytes-to-peer.js' | ||
import { CodeError } from '@libp2p/interfaces/errors' | ||
import { codes } from './errors.js' | ||
import { Key } from 'interface-datastore/key' | ||
import { base32 } from 'multiformats/bases/base32' | ||
import { multiaddr } from '@multiformats/multiaddr' | ||
import { Metadata, Peer as PeerPB } from './pb/peer.js' | ||
import mortice from 'mortice' | ||
import { equals as uint8arrayEquals } from 'uint8arrays/equals' | ||
import type { Peer } from '@libp2p/interface-peer-store' | ||
import type { PeerId } from '@libp2p/interface-peer-id' | ||
import type { PersistentPeerStoreComponents } from './index.js' | ||
import type { Datastore } from 'interface-datastore' | ||
import type { PeerUpdate as PeerUpdateExternal } from '@libp2p/interface-libp2p' | ||
import mortice, { Mortice } from 'mortice' | ||
import { toPeerPB } from './utils/to-peer-pb.js' | ||
const log = logger('libp2p:peer-store:store') | ||
const NAMESPACE_COMMON = '/peers/' | ||
export interface Store { | ||
has: (peerId: PeerId) => Promise<boolean> | ||
save: (peer: Peer) => Promise<Peer> | ||
load: (peerId: PeerId) => Promise<Peer> | ||
delete: (peerId: PeerId) => Promise<void> | ||
merge: (peerId: PeerId, data: Partial<Peer>) => Promise<Peer> | ||
mergeOrCreate: (peerId: PeerId, data: Partial<Peer>) => Promise<Peer> | ||
patch: (peerId: PeerId, data: Partial<Peer>) => Promise<Peer> | ||
patchOrCreate: (peerId: PeerId, data: Partial<Peer>) => Promise<Peer> | ||
all: () => AsyncIterable<Peer> | ||
lock: { | ||
readLock: () => Promise<() => void> | ||
writeLock: () => Promise<() => void> | ||
} | ||
/** | ||
* Event detail emitted when peer data changes | ||
*/ | ||
export interface PeerUpdate extends PeerUpdateExternal { | ||
updated: boolean | ||
} | ||
export class PersistentStore { | ||
private readonly components: PersistentPeerStoreComponents | ||
public lock: any | ||
private readonly peerId: PeerId | ||
private readonly datastore: Datastore | ||
public readonly lock: Mortice | ||
private readonly addressFilter?: AddressFilter | ||
constructor (components: PersistentPeerStoreComponents) { | ||
this.components = components | ||
constructor (components: PersistentPeerStoreComponents, init: PersistentPeerStoreInit = {}) { | ||
this.peerId = components.peerId | ||
this.datastore = components.datastore | ||
this.addressFilter = init.addressFilter | ||
this.lock = mortice({ | ||
@@ -48,195 +40,116 @@ name: 'peer-store', | ||
_peerIdToDatastoreKey (peerId: PeerId): Key { | ||
if (peerId.type == null) { | ||
log.error('peerId must be an instance of peer-id to store data') | ||
throw new CodeError('peerId must be an instance of peer-id', codes.ERR_INVALID_PARAMETERS) | ||
} | ||
const b32key = peerId.toCID().toString() | ||
return new Key(`${NAMESPACE_COMMON}${b32key}`) | ||
} | ||
async has (peerId: PeerId): Promise<boolean> { | ||
return await this.components.datastore.has(this._peerIdToDatastoreKey(peerId)) | ||
return await this.datastore.has(peerIdToDatastoreKey(peerId)) | ||
} | ||
async delete (peerId: PeerId): Promise<void> { | ||
await this.components.datastore.delete(this._peerIdToDatastoreKey(peerId)) | ||
if (this.peerId.equals(peerId)) { | ||
throw new CodeError('Cannot delete self peer', codes.ERR_INVALID_PARAMETERS) | ||
} | ||
await this.datastore.delete(peerIdToDatastoreKey(peerId)) | ||
} | ||
async load (peerId: PeerId): Promise<Peer> { | ||
const buf = await this.components.datastore.get(this._peerIdToDatastoreKey(peerId)) | ||
const peer = PeerPB.decode(buf) | ||
const metadata = new Map() | ||
const buf = await this.datastore.get(peerIdToDatastoreKey(peerId)) | ||
for (const meta of peer.metadata) { | ||
metadata.set(meta.key, meta.value) | ||
} | ||
return { | ||
...peer, | ||
id: peerId, | ||
addresses: peer.addresses.map(({ multiaddr: ma, isCertified }) => { | ||
return { | ||
multiaddr: multiaddr(ma), | ||
isCertified: isCertified ?? false | ||
} | ||
}), | ||
metadata, | ||
pubKey: peer.pubKey ?? undefined, | ||
peerRecordEnvelope: peer.peerRecordEnvelope ?? undefined | ||
} | ||
return await bytesToPeer(peerId, buf) | ||
} | ||
async save (peer: Peer): Promise<Peer> { | ||
if (peer.pubKey != null && peer.id.publicKey != null && !uint8arrayEquals(peer.pubKey, peer.id.publicKey)) { | ||
log.error('peer publicKey bytes do not match peer id publicKey bytes') | ||
throw new CodeError('publicKey bytes do not match peer id publicKey bytes', codes.ERR_INVALID_PARAMETERS) | ||
} | ||
async save (peerId: PeerId, data: PeerData): Promise<PeerUpdate> { | ||
const { | ||
existingBuf, | ||
existingPeer | ||
} = await this.#findExistingPeer(peerId) | ||
// dedupe addresses | ||
const addressSet = new Set() | ||
const addresses = peer.addresses | ||
.filter(address => { | ||
if (addressSet.has(address.multiaddr.toString())) { | ||
return false | ||
} | ||
const peerPb: PeerPB = await toPeerPB(peerId, data, 'patch', { | ||
addressFilter: this.addressFilter | ||
}) | ||
addressSet.add(address.multiaddr.toString()) | ||
return true | ||
}) | ||
.sort((a, b) => { | ||
return a.multiaddr.toString().localeCompare(b.multiaddr.toString()) | ||
}) | ||
.map(({ multiaddr, isCertified }) => ({ | ||
multiaddr: multiaddr.bytes, | ||
isCertified | ||
})) | ||
return await this.#saveIfDifferent(peerId, peerPb, existingBuf, existingPeer) | ||
} | ||
const metadata: Metadata[] = [] | ||
async patch (peerId: PeerId, data: Partial<PeerData>): Promise<PeerUpdate> { | ||
const { | ||
existingBuf, | ||
existingPeer | ||
} = await this.#findExistingPeer(peerId) | ||
;[...peer.metadata.keys()].sort().forEach(key => { | ||
const value = peer.metadata.get(key) | ||
if (value != null) { | ||
metadata.push({ key, value }) | ||
} | ||
const peerPb: PeerPB = await toPeerPB(peerId, data, 'patch', { | ||
addressFilter: this.addressFilter, | ||
existingPeer | ||
}) | ||
const buf = PeerPB.encode({ | ||
addresses, | ||
protocols: peer.protocols.sort(), | ||
pubKey: peer.pubKey, | ||
metadata, | ||
peerRecordEnvelope: peer.peerRecordEnvelope | ||
}) | ||
return await this.#saveIfDifferent(peerId, peerPb, existingBuf, existingPeer) | ||
} | ||
await this.components.datastore.put(this._peerIdToDatastoreKey(peer.id), buf.subarray()) | ||
async merge (peerId: PeerId, data: PeerData): Promise<PeerUpdate> { | ||
const { | ||
existingBuf, | ||
existingPeer | ||
} = await this.#findExistingPeer(peerId) | ||
return await this.load(peer.id) | ||
} | ||
const peerPb: PeerPB = await toPeerPB(peerId, data, 'merge', { | ||
addressFilter: this.addressFilter, | ||
existingPeer | ||
}) | ||
async patch (peerId: PeerId, data: Partial<Peer>): Promise<Peer> { | ||
const peer = await this.load(peerId) | ||
return await this._patch(peerId, data, peer) | ||
return await this.#saveIfDifferent(peerId, peerPb, existingBuf, existingPeer) | ||
} | ||
async patchOrCreate (peerId: PeerId, data: Partial<Peer>): Promise<Peer> { | ||
let peer: Peer | ||
async * all (): AsyncGenerator<Peer, void, unknown> { | ||
for await (const { key, value } of this.datastore.query({ | ||
prefix: NAMESPACE_COMMON | ||
})) { | ||
// /peers/${peer-id-as-libp2p-key-cid-string-in-base-32} | ||
const base32Str = key.toString().split('/')[2] | ||
const buf = base32.decode(base32Str) | ||
const peerId = peerIdFromBytes(buf) | ||
try { | ||
peer = await this.load(peerId) | ||
} catch (err: any) { | ||
if (err.code !== codes.ERR_NOT_FOUND) { | ||
throw err | ||
if (peerId.equals(this.peerId)) { | ||
// Skip self peer if present | ||
continue | ||
} | ||
peer = { id: peerId, addresses: [], protocols: [], metadata: new Map() } | ||
yield bytesToPeer(peerId, value) | ||
} | ||
return await this._patch(peerId, data, peer) | ||
} | ||
async _patch (peerId: PeerId, data: Partial<Peer>, peer: Peer): Promise<Peer> { | ||
return await this.save({ | ||
...peer, | ||
...data, | ||
id: peerId | ||
}) | ||
} | ||
async #findExistingPeer (peerId: PeerId): Promise<{ existingBuf?: Uint8Array, existingPeer?: Peer }> { | ||
try { | ||
const existingBuf = await this.datastore.get(peerIdToDatastoreKey(peerId)) | ||
const existingPeer = await bytesToPeer(peerId, existingBuf) | ||
async merge (peerId: PeerId, data: Partial<Peer>): Promise<Peer> { | ||
const peer = await this.load(peerId) | ||
return await this._merge(peerId, data, peer) | ||
} | ||
async mergeOrCreate (peerId: PeerId, data: Partial<Peer>): Promise<Peer> { | ||
/** @type {Peer} */ | ||
let peer | ||
try { | ||
peer = await this.load(peerId) | ||
return { | ||
existingBuf, | ||
existingPeer | ||
} | ||
} catch (err: any) { | ||
if (err.code !== codes.ERR_NOT_FOUND) { | ||
if (err.code !== 'ERR_NOT_FOUND') { | ||
throw err | ||
} | ||
peer = { id: peerId, addresses: [], protocols: [], metadata: new Map() } | ||
} | ||
return await this._merge(peerId, data, peer) | ||
return {} | ||
} | ||
async _merge (peerId: PeerId, data: Partial<Peer>, peer: Peer): Promise<Peer> { | ||
// if the peer has certified addresses, use those in | ||
// favour of the supplied versions | ||
const addresses = new Map<string, boolean>() | ||
async #saveIfDifferent (peerId: PeerId, peer: PeerPB, existingBuf?: Uint8Array, existingPeer?: Peer): Promise<PeerUpdate> { | ||
const buf = PeerPB.encode(peer) | ||
peer.addresses.forEach((addr) => { | ||
addresses.set(addr.multiaddr.toString(), addr.isCertified) | ||
}) | ||
if (existingBuf != null && uint8ArrayEquals(buf, existingBuf)) { | ||
return { | ||
peer: await bytesToPeer(peerId, buf), | ||
previous: existingPeer, | ||
updated: false | ||
} | ||
} | ||
;(data.addresses ?? []).forEach(addr => { | ||
const addrString = addr.multiaddr.toString() | ||
const isAlreadyCertified = Boolean(addresses.get(addrString)) | ||
await this.datastore.put(peerIdToDatastoreKey(peerId), buf) | ||
const isCertified = isAlreadyCertified || addr.isCertified | ||
addresses.set(addrString, isCertified) | ||
}) | ||
return await this.save({ | ||
id: peerId, | ||
addresses: Array.from(addresses.entries()).map(([addrStr, isCertified]) => { | ||
return { | ||
multiaddr: multiaddr(addrStr), | ||
isCertified | ||
} | ||
}), | ||
protocols: Array.from(new Set([ | ||
...(peer.protocols ?? []), | ||
...(data.protocols ?? []) | ||
])), | ||
metadata: new Map([ | ||
...(peer.metadata?.entries() ?? []), | ||
...(data.metadata?.entries() ?? []) | ||
]), | ||
pubKey: data.pubKey ?? (peer != null ? peer.pubKey : undefined), | ||
peerRecordEnvelope: data.peerRecordEnvelope ?? (peer != null ? peer.peerRecordEnvelope : undefined) | ||
}) | ||
} | ||
async * all (): AsyncGenerator<Peer, void, unknown> { | ||
for await (const key of this.components.datastore.queryKeys({ | ||
prefix: NAMESPACE_COMMON | ||
})) { | ||
// /peers/${peer-id-as-libp2p-key-cid-string-in-base-32} | ||
const base32Str = key.toString().split('/')[2] | ||
const buf = base32.decode(base32Str) | ||
yield this.load(peerIdFromBytes(buf)) | ||
return { | ||
peer: await bytesToPeer(peerId, buf), | ||
previous: existingPeer, | ||
updated: true | ||
} | ||
} | ||
} |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
14
8
408889
51
3153
50
1
+ Added@libp2p/crypto@^1.0.15
+ Added@libp2p/interface-content-routing@2.1.1(transitive)
+ Added@libp2p/interface-dht@2.0.3(transitive)
+ Added@libp2p/interface-keychain@2.0.5(transitive)
+ Added@libp2p/interface-libp2p@2.0.0(transitive)
+ Added@libp2p/interface-metrics@4.0.8(transitive)
+ Added@libp2p/interface-peer-discovery@2.0.0(transitive)
+ Added@libp2p/interface-peer-routing@1.1.1(transitive)
+ Added@libp2p/interface-pubsub@4.0.1(transitive)
+ Added@libp2p/interface-registrar@2.0.12(transitive)
+ Added@libp2p/interface-stream-muxer@4.1.2(transitive)
+ Added@libp2p/interface-transport@4.0.3(transitive)
+ Addedany-signal@4.1.1(transitive)
+ Addedit-pushable@3.2.3(transitive)
+ Addedp-defer@4.0.1(transitive)
- Removed@libp2p/interface-peer-info@^1.0.3
- Removed@libp2p/interface-record@^2.0.1
- Removed@libp2p/peer-record@^5.0.0
- Removed@achingbrain/ip-address@8.1.0(transitive)
- Removed@libp2p/interface-peer-store@1.2.9(transitive)
- Removed@libp2p/interface-record@2.0.7(transitive)
- Removed@libp2p/peer-record@5.0.4(transitive)
- Removed@libp2p/utils@3.0.13(transitive)
- Removedbyte-access@1.0.1(transitive)
- Removedip-regex@5.0.0(transitive)
- Removedipaddr.js@2.2.0(transitive)
- Removedis-loopback-addr@2.0.2(transitive)
- Removedjsbn@1.1.0(transitive)
- Removedlongbits@1.1.0(transitive)
- Removednetmask@2.0.2(transitive)
- Removedprivate-ip@3.0.2(transitive)
- Removedsprintf-js@1.1.2(transitive)
- Removeduint8-varint@1.0.8(transitive)
Updated@libp2p/logger@^2.0.7
Updatedmortice@^3.0.1