applesauce-core
Advanced tools
Comparing version 0.0.0-next-20250212160639 to 0.0.0-next-20250213170222
@@ -22,4 +22,4 @@ import { Filter, NostrEvent } from "nostr-tools"; | ||
updated: Subject<import("nostr-tools").Event>; | ||
/** A stream of events removed of the database */ | ||
deleted: Subject<import("nostr-tools").Event>; | ||
/** A stream of events removed from the database */ | ||
removed: Subject<import("nostr-tools").Event>; | ||
/** A method thats called before a new event is inserted */ | ||
@@ -47,4 +47,4 @@ onBeforeInsert?: (event: NostrEvent) => void; | ||
updateEvent(event: NostrEvent): NostrEvent; | ||
/** Deletes an event from the database and notifies all subscriptions */ | ||
deleteEvent(eventOrId: string | NostrEvent): boolean; | ||
/** Removes an event from the database and notifies all subscriptions */ | ||
removeEvent(eventOrId: string | NostrEvent): boolean; | ||
/** Sets the claim on the event and touches it */ | ||
@@ -51,0 +51,0 @@ claimEvent(event: NostrEvent, claim: any): void; |
@@ -25,4 +25,4 @@ import { binarySearch, insertEventIntoDescendingList } from "nostr-tools/utils"; | ||
updated = new Subject(); | ||
/** A stream of events removed of the database */ | ||
deleted = new Subject(); | ||
/** A stream of events removed from the database */ | ||
removed = new Subject(); | ||
/** A method thats called before a new event is inserted */ | ||
@@ -123,4 +123,4 @@ onBeforeInsert; | ||
} | ||
/** Deletes an event from the database and notifies all subscriptions */ | ||
deleteEvent(eventOrId) { | ||
/** Removes an event from the database and notifies all subscriptions */ | ||
removeEvent(eventOrId) { | ||
let event = typeof eventOrId === "string" ? this.events.get(eventOrId) : eventOrId; | ||
@@ -153,3 +153,6 @@ if (!event) | ||
} | ||
this.deleted.next(event); | ||
// remove any claims this event has | ||
this.claims.delete(event); | ||
// notify subscribers this event was removed | ||
this.removed.next(event); | ||
return true; | ||
@@ -308,3 +311,3 @@ } | ||
if (!this.isClaimed(event)) { | ||
this.deleteEvent(event); | ||
this.removeEvent(event); | ||
removed++; | ||
@@ -311,0 +314,0 @@ if (removed >= limit) |
@@ -11,2 +11,8 @@ import { Filter, NostrEvent } from "nostr-tools"; | ||
constructor(); | ||
protected deletedIds: Set<string>; | ||
protected deletedCoords: Map<string, number>; | ||
protected checkDeleted(event: string | NostrEvent): boolean; | ||
protected handleDeleteEvent(deleteEvent: NostrEvent): void; | ||
/** Copies important metadata from and identical event to another */ | ||
static mergeDuplicateEvent(source: NostrEvent, dest: NostrEvent): void; | ||
/** | ||
@@ -19,6 +25,2 @@ * Adds an event to the database and update subscriptions | ||
remove(event: string | NostrEvent): boolean; | ||
protected deletedIds: Set<string>; | ||
protected deletedCoords: Map<string, number>; | ||
protected handleDeleteEvent(deleteEvent: NostrEvent): void; | ||
protected checkDeleted(event: NostrEvent): boolean; | ||
/** Removes any event that is not being used by a subscription */ | ||
@@ -28,5 +30,8 @@ prune(max?: number): number; | ||
update(event: NostrEvent): NostrEvent; | ||
/** Get all events matching a filter */ | ||
getAll(filters: Filter[]): Set<NostrEvent>; | ||
/** Check if the store has an event */ | ||
hasEvent(uid: string): boolean; | ||
getEvent(uid: string): NostrEvent | undefined; | ||
/** Check if the store has a replaceable event */ | ||
hasReplaceable(kind: number, pubkey: string, d?: string): boolean; | ||
@@ -37,2 +42,12 @@ /** Gets the latest version of a replaceable event */ | ||
getReplaceableHistory(kind: number, pubkey: string, d?: string): NostrEvent[] | undefined; | ||
/** | ||
* Creates an observable that streams all events that match the filter and remains open | ||
* @param filters | ||
* @param [onlyNew=false] Only subscribe to new events | ||
*/ | ||
filters(filters: Filter | Filter[], onlyNew?: boolean): Observable<NostrEvent>; | ||
/** Returns an observable that completes when an event is removed */ | ||
removed(id: string): Observable<never>; | ||
/** Creates an observable that emits when event is updated */ | ||
updated(id: string): Observable<NostrEvent>; | ||
/** Creates an observable that subscribes to a single event */ | ||
@@ -50,10 +65,4 @@ event(id: string): Observable<NostrEvent | undefined>; | ||
}[]): Observable<Record<string, NostrEvent>>; | ||
/** | ||
* Creates an observable that streams all events that match the filter | ||
* @param filters | ||
* @param [onlyNew=false] Only subscribe to new events | ||
*/ | ||
stream(filters: Filter | Filter[], onlyNew?: boolean): Observable<NostrEvent>; | ||
/** Creates an observable that updates with an array of sorted events */ | ||
timeline(filters: Filter | Filter[], keepOldVersions?: boolean): Observable<NostrEvent[]>; | ||
} |
import { kinds } from "nostr-tools"; | ||
import { insertEventIntoDescendingList } from "nostr-tools/utils"; | ||
import { isParameterizedReplaceableKind } from "nostr-tools/kinds"; | ||
import { Observable } from "rxjs"; | ||
import { defer, distinctUntilChanged, distinctUntilKeyChanged, EMPTY, endWith, filter, finalize, from, map, merge, mergeMap, mergeWith, of, repeat, scan, take, takeUntil, tap, } from "rxjs"; | ||
import { Database } from "./database.js"; | ||
import { getEventUID, getReplaceableUID, getTagValue, isReplaceable } from "../helpers/event.js"; | ||
import { getEventUID, getReplaceableIdentifier, getReplaceableUID, getTagValue, isReplaceable, } from "../helpers/event.js"; | ||
import { matchFilters } from "../helpers/filter.js"; | ||
import { addSeenRelay, getSeenRelays } from "../helpers/relays.js"; | ||
import { getDeleteCoordinates, getDeleteIds } from "../helpers/delete.js"; | ||
import { claimEvents } from "../observable/claim-events.js"; | ||
import { claimLatest } from "../observable/claim-latest.js"; | ||
export class EventStore { | ||
@@ -24,2 +26,46 @@ database; | ||
} | ||
// delete state | ||
deletedIds = new Set(); | ||
deletedCoords = new Map(); | ||
checkDeleted(event) { | ||
if (typeof event === "string") | ||
return this.deletedIds.has(event); | ||
else { | ||
if (this.deletedIds.has(event.id)) | ||
return true; | ||
if (isParameterizedReplaceableKind(event.kind)) { | ||
const deleted = this.deletedCoords.get(getEventUID(event)); | ||
if (deleted) | ||
return deleted > event.created_at; | ||
} | ||
return false; | ||
} | ||
} | ||
// handling delete events | ||
handleDeleteEvent(deleteEvent) { | ||
const ids = getDeleteIds(deleteEvent); | ||
for (const id of ids) { | ||
this.deletedIds.add(id); | ||
// remove deleted events in the database | ||
const event = this.database.getEvent(id); | ||
if (event) | ||
this.database.removeEvent(event); | ||
} | ||
const coords = getDeleteCoordinates(deleteEvent); | ||
for (const coord of coords) { | ||
this.deletedCoords.set(coord, Math.max(this.deletedCoords.get(coord) ?? 0, deleteEvent.created_at)); | ||
// remove deleted events in the database | ||
const event = this.database.getEvent(coord); | ||
if (event && event.created_at < deleteEvent.created_at) | ||
this.database.removeEvent(event); | ||
} | ||
} | ||
/** Copies important metadata from and identical event to another */ | ||
static mergeDuplicateEvent(source, dest) { | ||
const relays = getSeenRelays(source); | ||
if (relays) { | ||
for (const relay of relays) | ||
addSeenRelay(dest, relay); | ||
} | ||
} | ||
/** | ||
@@ -32,13 +78,10 @@ * Adds an event to the database and update subscriptions | ||
this.handleDeleteEvent(event); | ||
// ignore if the event was deleted | ||
// Ignore if the event was deleted | ||
if (this.checkDeleted(event)) | ||
return event; | ||
// insert event into database | ||
// Insert event into database | ||
const inserted = this.database.addEvent(event); | ||
// copy seen relays to new event | ||
const relays = getSeenRelays(event); | ||
if (relays) { | ||
for (const relay of relays) | ||
addSeenRelay(inserted, relay); | ||
} | ||
// Copy cached data if its a duplicate event | ||
if (event !== inserted) | ||
EventStore.mergeDuplicateEvent(event, inserted); | ||
// attach relay this event was from | ||
@@ -53,3 +96,3 @@ if (fromRelay) | ||
for (const old of older) | ||
this.database.deleteEvent(old); | ||
this.database.removeEvent(old); | ||
// return the newest version of the replaceable event | ||
@@ -65,40 +108,4 @@ // most of the time this will be === event, but not always | ||
remove(event) { | ||
if (typeof event === "string") | ||
return this.database.deleteEvent(event); | ||
else if (this.database.hasEvent(event.id)) { | ||
return this.database.deleteEvent(event.id); | ||
} | ||
else | ||
return false; | ||
return this.database.removeEvent(event); | ||
} | ||
deletedIds = new Set(); | ||
deletedCoords = new Map(); | ||
handleDeleteEvent(deleteEvent) { | ||
const ids = getDeleteIds(deleteEvent); | ||
for (const id of ids) { | ||
this.deletedIds.add(id); | ||
// remove deleted events in the database | ||
const event = this.database.getEvent(id); | ||
if (event) | ||
this.database.deleteEvent(event); | ||
} | ||
const coords = getDeleteCoordinates(deleteEvent); | ||
for (const coord of coords) { | ||
this.deletedCoords.set(coord, Math.max(this.deletedCoords.get(coord) ?? 0, deleteEvent.created_at)); | ||
// remove deleted events in the database | ||
const event = this.database.getEvent(coord); | ||
if (event && event.created_at < deleteEvent.created_at) | ||
this.database.deleteEvent(event); | ||
} | ||
} | ||
checkDeleted(event) { | ||
if (this.deletedIds.has(event.id)) | ||
return true; | ||
if (isParameterizedReplaceableKind(event.kind)) { | ||
const deleted = this.deletedCoords.get(getEventUID(event)); | ||
if (deleted) | ||
return deleted > event.created_at; | ||
} | ||
return false; | ||
} | ||
/** Removes any event that is not being used by a subscription */ | ||
@@ -112,5 +119,7 @@ prune(max) { | ||
} | ||
/** Get all events matching a filter */ | ||
getAll(filters) { | ||
return this.database.getForFilters(filters); | ||
} | ||
/** Check if the store has an event */ | ||
hasEvent(uid) { | ||
@@ -122,2 +131,3 @@ return this.database.hasEvent(uid); | ||
} | ||
/** Check if the store has a replaceable event */ | ||
hasReplaceable(kind, pubkey, d) { | ||
@@ -134,214 +144,136 @@ return this.database.hasReplaceable(kind, pubkey, d); | ||
} | ||
/** | ||
* Creates an observable that streams all events that match the filter and remains open | ||
* @param filters | ||
* @param [onlyNew=false] Only subscribe to new events | ||
*/ | ||
filters(filters, onlyNew = false) { | ||
filters = Array.isArray(filters) ? filters : [filters]; | ||
return merge( | ||
// merge existing events | ||
onlyNew ? EMPTY : from(this.getAll(filters)), | ||
// subscribe to future events | ||
this.database.inserted.pipe(filter((e) => matchFilters(filters, e)))); | ||
} | ||
/** Returns an observable that completes when an event is removed */ | ||
removed(id) { | ||
const deleted = this.checkDeleted(id); | ||
if (deleted) | ||
return EMPTY; | ||
return this.database.removed.pipe( | ||
// listen for removed events | ||
filter((e) => e.id === id), | ||
// complete as soon as we find a matching removed event | ||
take(1), | ||
// switch to empty | ||
mergeMap(() => EMPTY)); | ||
} | ||
/** Creates an observable that emits when event is updated */ | ||
updated(id) { | ||
return this.database.updated.pipe(filter((e) => e.id === id)); | ||
} | ||
/** Creates an observable that subscribes to a single event */ | ||
event(id) { | ||
return new Observable((observer) => { | ||
let current = this.database.getEvent(id); | ||
if (current) { | ||
observer.next(current); | ||
this.database.claimEvent(current, observer); | ||
} | ||
// subscribe to future events | ||
const inserted = this.database.inserted.subscribe((event) => { | ||
if (event.id === id) { | ||
current = event; | ||
observer.next(event); | ||
this.database.claimEvent(event, observer); | ||
} | ||
}); | ||
// subscribe to updated events | ||
const updated = this.database.updated.subscribe((event) => { | ||
if (event.id === id) | ||
observer.next(current); | ||
}); | ||
// subscribe to deleted events | ||
const deleted = this.database.deleted.subscribe((event) => { | ||
if (current?.id === event.id) { | ||
this.database.removeClaim(current, observer); | ||
current = undefined; | ||
observer.next(undefined); | ||
} | ||
}); | ||
return () => { | ||
deleted.unsubscribe(); | ||
updated.unsubscribe(); | ||
inserted.unsubscribe(); | ||
if (current) | ||
this.database.removeClaim(current, observer); | ||
}; | ||
}); | ||
return merge( | ||
// get current event and ignore if there is none | ||
defer(() => { | ||
let event = this.getEvent(id); | ||
return event ? of(event) : EMPTY; | ||
}), | ||
// subscribe to updates | ||
this.database.inserted.pipe(filter((e) => e.id === id)), | ||
// subscribe to updates | ||
this.updated(id), | ||
// emit undefined when deleted | ||
this.removed(id).pipe(endWith(undefined))).pipe( | ||
// claim all events | ||
claimEvents(this.database)); | ||
} | ||
/** Creates an observable that subscribes to multiple events */ | ||
events(ids) { | ||
return new Observable((observer) => { | ||
let events = {}; | ||
for (const id of ids) { | ||
const event = this.getEvent(id); | ||
if (event) { | ||
events = { ...events, [id]: event }; | ||
this.database.claimEvent(event, observer); | ||
} | ||
return merge( | ||
// lazily get existing events | ||
defer(() => from(ids.map((id) => this.getEvent(id)))), | ||
// subscribe to new events | ||
this.database.inserted.pipe(filter((e) => ids.includes(e.id))), | ||
// subscribe to updates | ||
this.database.updated.pipe(filter((e) => ids.includes(e.id)))).pipe( | ||
// ignore empty messages | ||
filter((e) => !!e), | ||
// claim all events until cleanup | ||
claimEvents(this.database), | ||
// watch for removed events | ||
mergeWith(this.database.removed.pipe(filter((e) => ids.includes(e.id)), map((e) => e.id))), | ||
// merge all events into a directory | ||
scan((dir, event) => { | ||
if (typeof event === "string") { | ||
// delete event by id | ||
const clone = { ...dir }; | ||
delete clone[event]; | ||
return clone; | ||
} | ||
observer.next(events); | ||
// subscribe to future events | ||
const inserted = this.database.inserted.subscribe((event) => { | ||
const id = event.id; | ||
if (ids.includes(id) && !events[id]) { | ||
events = { ...events, [id]: event }; | ||
observer.next(events); | ||
// claim new event | ||
this.database.claimEvent(event, observer); | ||
} | ||
}); | ||
// subscribe to updated events | ||
const updated = this.database.updated.subscribe((event) => { | ||
if (ids.includes(event.id)) | ||
observer.next(events); | ||
}); | ||
// subscribe to deleted events | ||
const deleted = this.database.deleted.subscribe((event) => { | ||
const id = event.id; | ||
if (ids.includes(id)) { | ||
const current = events[id]; | ||
if (current) { | ||
this.database.removeClaim(current, observer); | ||
delete events[id]; | ||
observer.next(events); | ||
} | ||
} | ||
}); | ||
return () => { | ||
inserted.unsubscribe(); | ||
deleted.unsubscribe(); | ||
updated.unsubscribe(); | ||
for (const [_uid, event] of Object.entries(events)) { | ||
this.database.removeClaim(event, observer); | ||
} | ||
}; | ||
}); | ||
else { | ||
// add even to directory | ||
return { ...dir, [event.id]: event }; | ||
} | ||
}, {})); | ||
} | ||
/** Creates an observable that subscribes to the latest version of a replaceable event */ | ||
replaceable(kind, pubkey, d) { | ||
return new Observable((observer) => { | ||
const uid = getReplaceableUID(kind, pubkey, d); | ||
// get latest version | ||
let current = this.database.getReplaceable(kind, pubkey, d)?.[0]; | ||
if (current) { | ||
observer.next(current); | ||
this.database.claimEvent(current, observer); | ||
} | ||
// subscribe to future events | ||
const inserted = this.database.inserted.subscribe((event) => { | ||
if (isReplaceable(event.kind) && | ||
getEventUID(event) === uid && | ||
(!current || event.created_at > current.created_at)) { | ||
// remove old claim | ||
if (current) | ||
this.database.removeClaim(current, observer); | ||
current = event; | ||
observer.next(event); | ||
// claim new event | ||
this.database.claimEvent(current, observer); | ||
} | ||
}); | ||
// subscribe to updated events | ||
const updated = this.database.updated.subscribe((event) => { | ||
if (event === current) | ||
observer.next(event); | ||
}); | ||
// subscribe to deleted events | ||
const deleted = this.database.deleted.subscribe((event) => { | ||
if (getEventUID(event) === uid && event === current) { | ||
this.database.removeClaim(current, observer); | ||
current = undefined; | ||
observer.next(undefined); | ||
} | ||
}); | ||
return () => { | ||
inserted.unsubscribe(); | ||
deleted.unsubscribe(); | ||
updated.unsubscribe(); | ||
if (current) | ||
this.database.removeClaim(current, observer); | ||
}; | ||
}); | ||
let current = undefined; | ||
return merge( | ||
// lazily get current event | ||
defer(() => { | ||
let event = this.getReplaceable(kind, pubkey, d); | ||
return event ? of(event) : EMPTY; | ||
}), | ||
// subscribe to new events | ||
this.database.inserted.pipe(filter((e) => e.pubkey == pubkey && e.kind === kind && (d !== undefined ? getReplaceableIdentifier(e) === d : true)))).pipe( | ||
// only update if event is newer | ||
distinctUntilKeyChanged("created_at"), | ||
// Hacky way to extract the current event so takeUntil can access it | ||
tap((event) => (current = event)), | ||
// complete when event is removed | ||
takeUntil(this.database.removed.pipe(filter((e) => e.id === current?.id))), | ||
// emit undefined when removed | ||
endWith(undefined), | ||
// keep the observable hot | ||
repeat(), | ||
// claim latest event | ||
claimLatest(this.database)); | ||
} | ||
/** Creates an observable that subscribes to the latest version of an array of replaceable events*/ | ||
replaceableSet(pointers) { | ||
return new Observable((observer) => { | ||
const coords = pointers.map((p) => getReplaceableUID(p.kind, p.pubkey, p.identifier)); | ||
let events = {}; | ||
const handleEvent = (event) => { | ||
const uid = getEventUID(event); | ||
const current = events[uid]; | ||
if (current) { | ||
if (event.created_at > current.created_at) { | ||
this.database.removeClaim(current, observer); | ||
} | ||
else | ||
return; | ||
} | ||
events = { ...events, [uid]: event }; | ||
this.database.claimEvent(event, observer); | ||
}; | ||
// get latest version | ||
for (const pointer of pointers) { | ||
const events = this.database.getReplaceable(pointer.kind, pointer.pubkey, pointer.identifier); | ||
if (events) | ||
handleEvent(events[0]); | ||
const uids = new Set(pointers.map((p) => getReplaceableUID(p.kind, p.pubkey, p.identifier))); | ||
return merge( | ||
// start with existing events | ||
defer(() => from(pointers.map((p) => this.getReplaceable(p.kind, p.pubkey, p.identifier)))), | ||
// subscribe to new events | ||
this.database.inserted.pipe(filter((e) => isReplaceable(e.kind) && uids.has(getEventUID(e))))).pipe( | ||
// filter out undefined | ||
filter((e) => !!e), | ||
// claim all events | ||
claimEvents(this.database), | ||
// convert events to add commands | ||
map((e) => ["add", e]), | ||
// watch for removed events | ||
mergeWith(this.database.removed.pipe(filter((e) => isReplaceable(e.kind) && uids.has(getEventUID(e))), map((e) => ["remove", e]))), | ||
// reduce events into directory | ||
scan((dir, [action, event]) => { | ||
const uid = getEventUID(event); | ||
if (action === "add") { | ||
// add event to dir if its newer | ||
if (!dir[uid] || event.created_at > dir[uid].created_at) | ||
return { ...dir, [uid]: event }; | ||
} | ||
observer.next(events); | ||
// subscribe to future events | ||
const inserted = this.database.inserted.subscribe((event) => { | ||
if (isReplaceable(event.kind) && coords.includes(getEventUID(event))) { | ||
handleEvent(event); | ||
observer.next(events); | ||
} | ||
}); | ||
// subscribe to updated events | ||
const updated = this.database.updated.subscribe((event) => { | ||
if (isReplaceable(event.kind) && coords.includes(getEventUID(event))) { | ||
observer.next(events); | ||
} | ||
}); | ||
// subscribe to deleted events | ||
const deleted = this.database.deleted.subscribe((event) => { | ||
const uid = getEventUID(event); | ||
if (events[uid]) { | ||
// clone object and delete event | ||
events = { ...events }; | ||
delete events[uid]; | ||
this.database.removeClaim(event, observer); | ||
observer.next(events); | ||
} | ||
}); | ||
return () => { | ||
inserted.unsubscribe(); | ||
deleted.unsubscribe(); | ||
updated.unsubscribe(); | ||
for (const [_id, event] of Object.entries(events)) { | ||
this.database.removeClaim(event, observer); | ||
} | ||
}; | ||
}); | ||
} | ||
/** | ||
* Creates an observable that streams all events that match the filter | ||
* @param filters | ||
* @param [onlyNew=false] Only subscribe to new events | ||
*/ | ||
stream(filters, onlyNew = false) { | ||
filters = Array.isArray(filters) ? filters : [filters]; | ||
return new Observable((observer) => { | ||
if (!onlyNew) { | ||
let events = this.database.getForFilters(filters); | ||
for (const event of events) | ||
observer.next(event); | ||
else if (action === "remove" && dir[uid] === event) { | ||
// remove event from dir | ||
let newDir = { ...dir }; | ||
delete newDir[uid]; | ||
return newDir; | ||
} | ||
// subscribe to future events | ||
const sub = this.database.inserted.subscribe((event) => { | ||
if (matchFilters(filters, event)) | ||
observer.next(event); | ||
}); | ||
return () => sub.unsubscribe(); | ||
}); | ||
return dir; | ||
}, {}), | ||
// ignore changes that do not modify the dir | ||
distinctUntilChanged()); | ||
} | ||
@@ -351,68 +283,36 @@ /** Creates an observable that updates with an array of sorted events */ | ||
filters = Array.isArray(filters) ? filters : [filters]; | ||
return new Observable((observer) => { | ||
const seen = new Map(); | ||
const timeline = []; | ||
// NOTE: only call this if we know the event is in timeline | ||
const removeFromTimeline = (event) => { | ||
timeline.splice(timeline.indexOf(event), 1); | ||
if (!keepOldVersions && isReplaceable(event.kind)) | ||
seen.delete(getEventUID(event)); | ||
this.database.removeClaim(event, observer); | ||
}; | ||
// inserts an event into the timeline and handles replaceable events | ||
const insertIntoTimeline = (event) => { | ||
// remove old versions | ||
if (!keepOldVersions && isReplaceable(event.kind)) { | ||
const uid = getEventUID(event); | ||
const old = seen.get(uid); | ||
if (old) { | ||
if (event.created_at > old.created_at) | ||
removeFromTimeline(old); | ||
else | ||
return; | ||
} | ||
seen.set(uid, event); | ||
} | ||
// insert into timeline | ||
insertEventIntoDescendingList(timeline, event); | ||
this.database.claimEvent(event, observer); | ||
}; | ||
// build initial timeline | ||
const events = this.database.getForFilters(filters); | ||
for (const event of events) | ||
insertIntoTimeline(event); | ||
observer.next([...timeline]); | ||
// subscribe to future events | ||
const inserted = this.database.inserted.subscribe((event) => { | ||
if (matchFilters(filters, event)) { | ||
insertIntoTimeline(event); | ||
observer.next([...timeline]); | ||
} | ||
}); | ||
// subscribe to updated events | ||
const updated = this.database.updated.subscribe((event) => { | ||
if (timeline.includes(event)) { | ||
observer.next([...timeline]); | ||
} | ||
}); | ||
// subscribe to deleted events | ||
const deleted = this.database.deleted.subscribe((event) => { | ||
if (timeline.includes(event)) { | ||
removeFromTimeline(event); | ||
observer.next([...timeline]); | ||
} | ||
}); | ||
return () => { | ||
inserted.unsubscribe(); | ||
deleted.unsubscribe(); | ||
updated.unsubscribe(); | ||
// remove all claims | ||
for (const event of timeline) { | ||
this.database.removeClaim(event, observer); | ||
} | ||
// forget seen replaceable events | ||
seen.clear(); | ||
}; | ||
}); | ||
const seen = new Map(); | ||
return merge( | ||
// get current events | ||
defer(() => from(this.getAll(filters))), | ||
// subscribe to newer events | ||
this.database.inserted.pipe(filter((e) => matchFilters(filters, e)))).pipe( | ||
// remove duplicate events | ||
distinctUntilKeyChanged("id"), | ||
// claim all seen events | ||
claimEvents(this.database), | ||
// subscribe to delete events | ||
mergeWith(this.database.removed.pipe(filter((e) => matchFilters(filters, e)), map((e) => e.id))), | ||
// build a timeline | ||
scan((timeline, event) => { | ||
// filter out removed events from timeline | ||
if (typeof event === "string") | ||
return timeline.filter((e) => e.id !== event); | ||
// add event into timeline | ||
const arr = insertEventIntoDescendingList([...timeline], event); | ||
// remove old replaceable events if enabled | ||
if (keepOldVersions && isReplaceable(event.kind)) { | ||
const uid = getEventUID(event); | ||
const old = seen.get(uid); | ||
// remove old event from timeline | ||
if (old) | ||
arr.slice(arr.indexOf(old), 1); | ||
// update latest version | ||
seen.set(uid, event); | ||
} | ||
return arr; | ||
}, []), | ||
// hacky hack to clear seen on unsubscribe | ||
finalize(() => seen.clear())); | ||
} | ||
} |
@@ -0,1 +1,2 @@ | ||
import { normalizeURL } from "./url.js"; | ||
export const SeenRelaysSymbol = Symbol.for("seen-relays"); | ||
@@ -23,3 +24,3 @@ // Seen relays | ||
try { | ||
return validateRelayURL(relayUrl).toString(); | ||
return normalizeURL(validateRelayURL(relayUrl)).toString(); | ||
} | ||
@@ -26,0 +27,0 @@ catch (e) { |
@@ -1,2 +0,2 @@ | ||
import { Observable, ShareConfig } from "rxjs"; | ||
import { OperatorFunction } from "rxjs"; | ||
/** | ||
@@ -6,4 +6,2 @@ * Creates an operator that adds a 'value' property and multiplexes the source | ||
*/ | ||
export declare function shareLatestValue<T>(config?: ShareConfig<T>): (source: Observable<T>) => Observable<T> & { | ||
value: T | undefined; | ||
}; | ||
export declare function shareLatestValue<T>(): OperatorFunction<T, T | undefined>; |
@@ -1,3 +0,2 @@ | ||
import { share } from "rxjs"; | ||
import { tap } from "rxjs/operators"; | ||
import { BehaviorSubject, share } from "rxjs"; | ||
/** | ||
@@ -7,16 +6,20 @@ * Creates an operator that adds a 'value' property and multiplexes the source | ||
*/ | ||
export function shareLatestValue(config = {}) { | ||
return (source) => { | ||
// Create storage for latest value | ||
let latestValue = undefined; | ||
// Create shared source with value tracking | ||
const shared$ = source.pipe(tap((value) => { | ||
latestValue = value; | ||
}), share(config)); | ||
// Add value property | ||
Object.defineProperty(shared$, "value", { | ||
get: () => latestValue, | ||
}); | ||
return shared$; | ||
}; | ||
export function shareLatestValue() { | ||
return (source$) => source$.pipe(share({ connector: () => new BehaviorSubject(undefined) })); | ||
// return (source: Observable<T>): Observable<T> & { value: T | undefined } => { | ||
// // Create storage for latest value | ||
// let latestValue: T | undefined = undefined; | ||
// // Create shared source with value tracking | ||
// const shared$ = source.pipe( | ||
// tap((value) => { | ||
// latestValue = value; | ||
// }), | ||
// share(config), | ||
// ); | ||
// // Add value property | ||
// Object.defineProperty(shared$, "value", { | ||
// get: () => latestValue, | ||
// }); | ||
// return shared$ as Observable<T> & { value: T | undefined }; | ||
// }; | ||
} |
@@ -12,3 +12,3 @@ import { safeParse } from "applesauce-core/helpers/json"; | ||
return events | ||
.stream([{ kinds: [kinds.ChannelHideMessage], "#e": [channel.id], authors: [channel.pubkey, ...authors] }]) | ||
.filters([{ kinds: [kinds.ChannelHideMessage], "#e": [channel.id], authors: [channel.pubkey, ...authors] }]) | ||
.pipe(map((event) => { | ||
@@ -42,3 +42,3 @@ const reason = safeParse(event.content)?.reason; | ||
let latest = channel; | ||
return events.stream(filters).pipe(map((event) => { | ||
return events.filters(filters).pipe(map((event) => { | ||
try { | ||
@@ -64,3 +64,3 @@ if (event.pubkey === latest.pubkey && event.created_at > latest.created_at) { | ||
return events | ||
.stream([{ kinds: [kinds.ChannelMuteUser], "#e": [channel.id], authors: [channel.pubkey, ...authors] }]) | ||
.filters([{ kinds: [kinds.ChannelMuteUser], "#e": [channel.id], authors: [channel.pubkey, ...authors] }]) | ||
.pipe(map((event) => { | ||
@@ -67,0 +67,0 @@ const reason = safeParse(event.content)?.reason; |
@@ -37,3 +37,3 @@ import { kinds } from "nostr-tools"; | ||
key: `${rootUID}-${kinds.join(",")}`, | ||
run: (events) => events.stream([rootFilter, replyFilter]).pipe(map((event) => { | ||
run: (events) => events.filters([rootFilter, replyFilter]).pipe(map((event) => { | ||
if (!items.has(getEventUID(event))) { | ||
@@ -40,0 +40,0 @@ const refs = getNip10References(event); |
@@ -12,4 +12,2 @@ import { BehaviorSubject, Observable } from "rxjs"; | ||
key: string; | ||
/** The args array this query was created with. This is mostly for debugging */ | ||
args?: Array<any>; | ||
/** | ||
@@ -16,0 +14,0 @@ * The meat of the query, this should return an Observables that subscribes to the eventStore in some way |
@@ -1,5 +0,4 @@ | ||
import { filter } from "rxjs"; | ||
import { filter, shareReplay } from "rxjs"; | ||
import { LRU } from "../helpers/lru.js"; | ||
import * as Queries from "../queries/index.js"; | ||
import { shareLatestValue } from "../observable/share-latest-value.js"; | ||
import { getObservableValue } from "../observable/get-observable-value.js"; | ||
@@ -19,3 +18,3 @@ export class QueryStore { | ||
const tempQuery = queryConstructor(...args); | ||
const key = `${queryConstructor.name}|${tempQuery.key}`; | ||
const key = queryConstructor.name + "|" + tempQuery.key; | ||
let query = this.queries.get(key); | ||
@@ -27,4 +26,5 @@ if (!query) { | ||
if (!this.observables.has(query)) { | ||
query.args = args; | ||
const observable = query.run(this.store, this).pipe(shareLatestValue()); | ||
const observable = query | ||
.run(this.store, this) | ||
.pipe(shareReplay({ refCount: true, bufferSize: 1 })); | ||
this.observables.set(query, observable); | ||
@@ -31,0 +31,0 @@ return observable; |
{ | ||
"name": "applesauce-core", | ||
"version": "0.0.0-next-20250212160639", | ||
"version": "0.0.0-next-20250213170222", | ||
"description": "", | ||
@@ -75,2 +75,3 @@ "type": "module", | ||
"devDependencies": { | ||
"@hirez_io/observer-spy": "^2.2.0", | ||
"@types/debug": "^4.1.12", | ||
@@ -77,0 +78,0 @@ "@types/hash-sum": "^1.0.2", |
196605
161
4750
5