@peerbit/shared-log
Advanced tools
Comparing version
@@ -12,6 +12,6 @@ import { Entry } from "@peerbit/log"; | ||
entry: Entry<T>; | ||
references: Entry<T>[]; | ||
gidRefrences: string[]; | ||
constructor(properties: { | ||
entry: Entry<T>; | ||
references: Entry<T>[]; | ||
gidRefrences: string[]; | ||
}); | ||
@@ -44,3 +44,3 @@ } | ||
} | ||
export declare const createExchangeHeadsMessage: (log: Log<any>, heads: Entry<any>[], gidParentCache: Cache<Entry<any>[]>) => Promise<ExchangeHeadsMessage<any>>; | ||
export declare const createExchangeHeadsMessages: (log: Log<any>, heads: Entry<any>[], gidParentCache: Cache<Entry<any>[]>) => Promise<ExchangeHeadsMessage<any>[]>; | ||
export declare const allEntriesWithUniqueGids: (log: Log<any>, entry: Entry<any>, gidParentCache: Cache<Entry<any>[]>) => Promise<Entry<any>[]>; |
@@ -22,6 +22,6 @@ var __decorate = (this && this.__decorate) || function (decorators, target, key, desc) { | ||
entry; | ||
references; // are some parents to the entry | ||
gidRefrences; // are some parents to the entry | ||
constructor(properties) { | ||
this.entry = properties.entry; | ||
this.references = properties.references; | ||
this.gidRefrences = properties.gidRefrences; | ||
} | ||
@@ -34,5 +34,5 @@ }; | ||
__decorate([ | ||
field({ type: vec(Entry) }), | ||
field({ type: vec("string") }), | ||
__metadata("design:type", Array) | ||
], EntryWithRefs.prototype, "references", void 0); | ||
], EntryWithRefs.prototype, "gidRefrences", void 0); | ||
EntryWithRefs = __decorate([ | ||
@@ -115,16 +115,41 @@ variant(0), | ||
export { ResponseIPrune }; | ||
export const createExchangeHeadsMessage = async (log, heads, gidParentCache) => { | ||
const headsSet = new Set(heads); | ||
const headsWithRefs = await Promise.all(heads.map(async (head) => { | ||
const refs = (await allEntriesWithUniqueGids(log, head, gidParentCache)) // 1mb total limit split on all heads | ||
.filter((r) => !headsSet.has(r)); | ||
return new EntryWithRefs({ | ||
entry: head, | ||
references: refs | ||
const MAX_EXCHANGE_MESSAGE_SIZE = 5e6; // 5mb (since stream limits are 10mb) | ||
export const createExchangeHeadsMessages = async (log, heads, gidParentCache) => { | ||
const messages = []; | ||
let size = 0; | ||
let current = []; | ||
const visitedHeads = new Set(); | ||
for (const fromHead of heads) { | ||
visitedHeads.add(fromHead.hash); | ||
// TODO eventually we don't want to load all refs | ||
// since majority of the old leader would not be interested in these anymore | ||
const refs = (await allEntriesWithUniqueGids(log, fromHead, gidParentCache)).filter((x) => { | ||
if (visitedHeads.has(x.hash)) { | ||
return false; | ||
} | ||
visitedHeads.add(x.hash); | ||
return true; | ||
}); | ||
})); | ||
logger.debug(`Send latest heads of '${log.id}'`); | ||
return new ExchangeHeadsMessage({ | ||
heads: headsWithRefs | ||
}); | ||
if (refs.length > 1000) { | ||
logger.warn("Large refs count: ", refs.length); | ||
} | ||
current.push(new EntryWithRefs({ | ||
entry: fromHead, | ||
gidRefrences: refs.map((x) => x.meta.gid) | ||
})); | ||
size += fromHead.size; | ||
if (size > MAX_EXCHANGE_MESSAGE_SIZE) { | ||
messages.push(new ExchangeHeadsMessage({ | ||
heads: current | ||
})); | ||
current = []; | ||
continue; | ||
} | ||
} | ||
if (current.length > 0) { | ||
messages.push(new ExchangeHeadsMessage({ | ||
heads: current | ||
})); | ||
} | ||
return messages; | ||
}; | ||
@@ -148,3 +173,4 @@ export const allEntriesWithUniqueGids = async (log, entry, gidParentCache) => { | ||
if (!indexedEntry) { | ||
logger.error("Failed to find indexed entry for hash: " + next); | ||
logger.error("Failed to find indexed entry for hash when fetching references: " + | ||
next); | ||
} | ||
@@ -151,0 +177,0 @@ else { |
@@ -13,3 +13,2 @@ import { RequestContext, RPC } from "@peerbit/rpc"; | ||
export * from "./replication.js"; | ||
import PQueue from "p-queue"; | ||
export { Observer, Replicator, Role }; | ||
@@ -44,2 +43,3 @@ export declare const logger: import("pino").Logger<never>; | ||
timeUntilRoleMaturity?: number; | ||
waitForReplicatorTimeout?: number; | ||
}; | ||
@@ -83,4 +83,6 @@ export declare const DEFAULT_MIN_REPLICAS = 2; | ||
private rebalanceParticipationDebounced; | ||
private distributeInterval; | ||
replicas: ReplicationLimits; | ||
timeUntilRoleMaturity: number; | ||
waitForReplicatorTimeout: number; | ||
constructor(properties?: { | ||
@@ -141,4 +143,4 @@ id?: Uint8Array; | ||
}): Promise<any>[]; | ||
_queue: PQueue; | ||
distribute(): Promise<void>; | ||
private _queue; | ||
distribute(): Promise<false | void>; | ||
_distribute(): Promise<false | undefined>; | ||
@@ -154,4 +156,3 @@ _onUnsubscription(evt: CustomEvent<UnsubcriptionEvent>): Promise<void>; | ||
calculateTrend(): Promise<number>; | ||
xxx: number; | ||
rebalanceParticipation(onRoleChange?: boolean): Promise<boolean>; | ||
} |
@@ -17,3 +17,3 @@ var __decorate = (this && this.__decorate) || function (decorators, target, key, desc) { | ||
import { logger as loggerFn } from "@peerbit/logger"; | ||
import { ExchangeHeadsMessage, RequestIPrune, ResponseIPrune, createExchangeHeadsMessage } from "./exchange-heads.js"; | ||
import { ExchangeHeadsMessage, RequestIPrune, ResponseIPrune, createExchangeHeadsMessages } from "./exchange-heads.js"; | ||
import { AbortError, waitFor } from "@peerbit/time"; | ||
@@ -26,3 +26,3 @@ import { Observer, Replicator, Role } from "./role.js"; | ||
import yallist from "yallist"; | ||
import { AcknowledgeDelivery, SeekDelivery, SilentDelivery } from "@peerbit/stream-interface"; | ||
import { AcknowledgeDelivery, SeekDelivery, SilentDelivery, NotStartedError } from "@peerbit/stream-interface"; | ||
import { AnyBlockStore, RemoteBlocks } from "@peerbit/blocks"; | ||
@@ -86,4 +86,6 @@ import { BlocksMessage } from "./blocks.js"; | ||
rebalanceParticipationDebounced; | ||
distributeInterval; | ||
replicas; | ||
timeUntilRoleMaturity; | ||
waitForReplicatorTimeout; | ||
constructor(properties) { | ||
@@ -101,3 +103,4 @@ super(); | ||
setupRebalanceDebounceFunction() { | ||
this.rebalanceParticipationDebounced = debounce(() => this.rebalanceParticipation(), Math.max(REBALANCE_DEBOUNCE_INTERVAL, (this.getReplicatorsSorted()?.length || 0) * REBALANCE_DEBOUNCE_INTERVAL)); | ||
this.rebalanceParticipationDebounced = debounce(() => this.rebalanceParticipation(), Math.max(REBALANCE_DEBOUNCE_INTERVAL, Math.log((this.getReplicatorsSorted()?.length || 0) * | ||
REBALANCE_DEBOUNCE_INTERVAL))); | ||
} | ||
@@ -201,12 +204,24 @@ setupRole(options) { | ||
let mode = undefined; | ||
if (options?.target === "replicators" || !options?.target) { | ||
const leaders = await this.findLeaders(result.entry.meta.gid, decodeReplicas(result.entry).getValue(this)); | ||
const isLeader = leaders.includes(this.node.identity.publicKey.hashcode()); | ||
mode = isLeader | ||
? new SilentDelivery({ redundancy: 1, to: leaders }) | ||
: new AcknowledgeDelivery({ redundancy: 1, to: leaders }); | ||
for (const message of await createExchangeHeadsMessages(this.log, [result.entry], this._gidParentCache)) { | ||
if (options?.target === "replicators" || !options?.target) { | ||
const minReplicas = decodeReplicas(result.entry).getValue(this); | ||
let leaders = await this.findLeaders(result.entry.meta.gid, minReplicas); | ||
const isLeader = leaders.includes(this.node.identity.publicKey.hashcode()); | ||
if (message.heads[0].gidRefrences.length > 0) { | ||
const newAndOldLeaders = new Set(leaders); | ||
for (const ref of message.heads[0].gidRefrences) { | ||
for (const hash of await this.findLeaders(ref, minReplicas)) { | ||
newAndOldLeaders.add(hash); | ||
} | ||
} | ||
leaders = newAndOldLeaders; | ||
} | ||
mode = isLeader | ||
? new SilentDelivery({ redundancy: 1, to: leaders }) | ||
: new AcknowledgeDelivery({ redundancy: 1, to: leaders }); | ||
} | ||
await this.rpc.send(message, { | ||
mode | ||
}); | ||
} | ||
await this.rpc.send(await createExchangeHeadsMessage(this.log, [result.entry], this._gidParentCache), { | ||
mode | ||
}); | ||
this.rebalanceParticipationDebounced?.(); | ||
@@ -235,2 +250,4 @@ return result; | ||
options?.timeUntilRoleMaturity || WAIT_FOR_ROLE_MATURITY; | ||
this.waitForReplicatorTimeout = | ||
options?.waitForReplicatorTimeout || WAIT_FOR_REPLICATOR_TIMEOUT; | ||
this._gidParentCache = new Cache({ max: 1000 }); | ||
@@ -316,2 +333,8 @@ this._closeController = new AbortController(); | ||
await this.log.load(); | ||
// TODO (do better) | ||
// we do this distribution interval to eliminate the sideeffects arriving from updating roles and joining entries continously. | ||
// an alternative to this would be to call distribute/maybe prune after every join if our role has changed | ||
this.distributeInterval = setInterval(() => { | ||
this.distribute(); | ||
}, 7.5 * 1000); | ||
} | ||
@@ -338,2 +361,3 @@ async afterOpen() { | ||
async _close() { | ||
clearInterval(this.distributeInterval); | ||
this._closeController.abort(); | ||
@@ -435,9 +459,9 @@ this.node.services.pubsub.removeEventListener("subscribe", this._onSubscriptionFn); | ||
if (isLeader || this.sync?.(entry.entry)) { | ||
toMerge.push(entry); | ||
toMerge.push(entry.entry); | ||
} | ||
else { | ||
for (const ref of entry.references) { | ||
const map = this.log.headsIndex.gids.get(await ref.getGid()); | ||
for (const ref of entry.gidRefrences) { | ||
const map = this.log.headsIndex.gids.get(ref); | ||
if (map && map.size > 0) { | ||
toMerge.push(entry); | ||
toMerge.push(entry.entry); | ||
(toDelete || (toDelete = [])).push(entry.entry); | ||
@@ -556,3 +580,3 @@ continue outer; | ||
signal: this._closeController.signal, | ||
timeout: WAIT_FOR_REPLICATOR_TIMEOUT | ||
timeout: this.waitForReplicatorTimeout | ||
}) | ||
@@ -571,2 +595,5 @@ .then(async () => { | ||
} | ||
if (e instanceof NotStartedError) { | ||
return; | ||
} | ||
logger.error("Failed to find peer who updated their role: " + e?.message); | ||
@@ -614,3 +641,3 @@ }); | ||
} | ||
async waitForIsLeader(slot, numberOfLeaders, timeout = WAIT_FOR_REPLICATOR_TIMEOUT) { | ||
async waitForIsLeader(slot, numberOfLeaders, timeout = this.waitForReplicatorTimeout) { | ||
return new Promise((res, rej) => { | ||
@@ -1037,3 +1064,3 @@ const removeListeners = () => { | ||
} | ||
(this._queue || (this._queue = new PQueue({ concurrency: 1 }))).add(() => this._distribute()); | ||
return (this._queue || (this._queue = new PQueue({ concurrency: 1 }))).add(() => this._distribute()); | ||
} | ||
@@ -1103,6 +1130,6 @@ async _distribute() { | ||
// TODO better choice of step size | ||
for (let i = 0; i < entries.length; i += 100) { | ||
const message = await createExchangeHeadsMessage(this.log, entries.slice(i, i + 100), this._gidParentCache); | ||
// TODO perhaps send less messages to more receivers for performance reasons? | ||
// TODO wait for previous send to target before trying to send more? | ||
const messages = await createExchangeHeadsMessages(this.log, entries, this._gidParentCache); | ||
// TODO perhaps send less messages to more receivers for performance reasons? | ||
// TODO wait for previous send to target before trying to send more? | ||
for (const message of messages) { | ||
this.rpc.send(message, { | ||
@@ -1159,9 +1186,5 @@ mode: new SilentDelivery({ to: [target], redundancy: 1 }) | ||
} | ||
xxx; | ||
async rebalanceParticipation(onRoleChange = true) { | ||
// update more participation rate to converge to the average expected rate or bounded by | ||
// resources such as memory and or cpu | ||
const t = +new Date(); | ||
// console.log(t - this.xxx) | ||
this.xxx = t; | ||
if (this.closed) { | ||
@@ -1168,0 +1191,0 @@ return false; |
@@ -16,3 +16,3 @@ export class PIDReplicationController { | ||
return memory < 0 | ||
? memory * 0.9 + balance * 0.06 + coverage * 0.04 | ||
? memory * 0.9 + balance * 0.07 + coverage * 0.03 | ||
: balance * 0.6 + coverage * 0.4; | ||
@@ -68,3 +68,3 @@ } } = options; | ||
// Beta controls how much of the accumulated error we should forget | ||
const beta = 0.8; | ||
const beta = 0.3; | ||
this.integral = beta * totalError + (1 - beta) * this.integral; | ||
@@ -80,29 +80,36 @@ const iTerm = this.ki * this.integral; | ||
this.prevError = totalError; | ||
if (newFactor < 0 || newFactor > 1) { | ||
// reset integral term if we are "way" out of bounds | ||
if (newFactor < 0 && this.integral < 0) { | ||
this.integral = 0; | ||
} | ||
// prevent drift when everone wants to do less | ||
/* if (newFactor < currentFactor && totalFactorDiff < 0 && totalFactor < 0.5) { | ||
newFactor = currentFactor; | ||
else if (newFactor > 1 && this.integral > 0) { | ||
this.integral = 0; | ||
} | ||
*/ | ||
/* console.log({ | ||
id: this.id, | ||
currentFactor, | ||
newFactor, | ||
factorDiff: newFactor - currentFactor, | ||
pTerm, | ||
dTerm, | ||
iTerm, | ||
totalError, | ||
errorTarget: errorBalance, | ||
errorCoverage, | ||
errorMemory, | ||
peerCount, | ||
totalFactor, | ||
totalFactorDiff, | ||
targetScaler: balanceErrorScaler, | ||
estimatedTotalSize | ||
}); */ | ||
// prevent drift when everone wants to do less | ||
/* if (newFactor < currentFactor && totalFactorDiff < 0 && totalFactor < 0.5) { | ||
newFactor = currentFactor; | ||
this.integral = 0; | ||
} */ | ||
/* */ | ||
/* { | ||
console.log({ | ||
id: this.id, | ||
currentFactor, | ||
newFactor, | ||
factorDiff: newFactor - currentFactor, | ||
pTerm, | ||
dTerm, | ||
iTerm, | ||
totalError, | ||
errorTarget: errorBalance, | ||
errorCoverage, | ||
errorMemory, | ||
peerCount, | ||
totalFactor, | ||
totalFactorDiff, | ||
targetScaler: balanceErrorScaler, | ||
memoryUsage, | ||
estimatedTotalSize | ||
}); | ||
} */ | ||
return Math.max(Math.min(newFactor, 1), 0); | ||
@@ -109,0 +116,0 @@ } |
{ | ||
"name": "@peerbit/shared-log", | ||
"version": "5.0.1", | ||
"version": "6.0.0", | ||
"description": "Shared log", | ||
@@ -33,14 +33,14 @@ "type": "module", | ||
"dependencies": { | ||
"@dao-xyz/borsh": "^5.1.8", | ||
"@peerbit/log": "3.0.12", | ||
"@dao-xyz/borsh": "^5.2.1", | ||
"@peerbit/log": "3.0.13", | ||
"@peerbit/logger": "1.0.2", | ||
"@peerbit/program": "3.0.10", | ||
"@peerbit/rpc": "3.0.12", | ||
"@peerbit/time": "2.0.4", | ||
"@peerbit/program": "3.0.11", | ||
"@peerbit/rpc": "3.0.13", | ||
"@peerbit/time": "2.0.5", | ||
"p-debounce": "^4.0.0" | ||
}, | ||
"devDependencies": { | ||
"@peerbit/test-utils": "^2.0.12" | ||
"@peerbit/test-utils": "^2.0.13" | ||
}, | ||
"gitHead": "2539f936bbe572f34a297bc8275fcd059105fc2a" | ||
"gitHead": "180a8c4e6ab6f713134c949d2d7ce9fc5648a4ce" | ||
} |
@@ -20,8 +20,8 @@ import { variant, field, vec, fixedArray } from "@dao-xyz/borsh"; | ||
@field({ type: vec(Entry) }) | ||
references: Entry<T>[]; // are some parents to the entry | ||
@field({ type: vec("string") }) | ||
gidRefrences: string[]; // are some parents to the entry | ||
constructor(properties: { entry: Entry<T>; references: Entry<T>[] }) { | ||
constructor(properties: { entry: Entry<T>; gidRefrences: string[] }) { | ||
this.entry = properties.entry; | ||
this.references = properties.references; | ||
this.gidRefrences = properties.gidRefrences; | ||
} | ||
@@ -79,23 +79,56 @@ } | ||
} | ||
const MAX_EXCHANGE_MESSAGE_SIZE = 5e6; // 5mb (since stream limits are 10mb) | ||
export const createExchangeHeadsMessage = async ( | ||
export const createExchangeHeadsMessages = async ( | ||
log: Log<any>, | ||
heads: Entry<any>[], | ||
gidParentCache: Cache<Entry<any>[]> | ||
) => { | ||
const headsSet = new Set(heads); | ||
const headsWithRefs = await Promise.all( | ||
heads.map(async (head) => { | ||
const refs = (await allEntriesWithUniqueGids(log, head, gidParentCache)) // 1mb total limit split on all heads | ||
.filter((r) => !headsSet.has(r)); | ||
return new EntryWithRefs({ | ||
entry: head, | ||
references: refs | ||
}); | ||
}) | ||
); | ||
logger.debug(`Send latest heads of '${log.id}'`); | ||
return new ExchangeHeadsMessage({ | ||
heads: headsWithRefs | ||
}); | ||
): Promise<ExchangeHeadsMessage<any>[]> => { | ||
const messages: ExchangeHeadsMessage<any>[] = []; | ||
let size = 0; | ||
let current: EntryWithRefs<any>[] = []; | ||
const visitedHeads = new Set<string>(); | ||
for (const fromHead of heads) { | ||
visitedHeads.add(fromHead.hash); | ||
// TODO eventually we don't want to load all refs | ||
// since majority of the old leader would not be interested in these anymore | ||
const refs = ( | ||
await allEntriesWithUniqueGids(log, fromHead, gidParentCache) | ||
).filter((x) => { | ||
if (visitedHeads.has(x.hash)) { | ||
return false; | ||
} | ||
visitedHeads.add(x.hash); | ||
return true; | ||
}); | ||
if (refs.length > 1000) { | ||
logger.warn("Large refs count: ", refs.length); | ||
} | ||
current.push( | ||
new EntryWithRefs({ | ||
entry: fromHead, | ||
gidRefrences: refs.map((x) => x.meta.gid) | ||
}) | ||
); | ||
size += fromHead.size; | ||
if (size > MAX_EXCHANGE_MESSAGE_SIZE) { | ||
messages.push( | ||
new ExchangeHeadsMessage({ | ||
heads: current | ||
}) | ||
); | ||
current = []; | ||
continue; | ||
} | ||
} | ||
if (current.length > 0) { | ||
messages.push( | ||
new ExchangeHeadsMessage({ | ||
heads: current | ||
}) | ||
); | ||
} | ||
return messages; | ||
}; | ||
@@ -125,3 +158,6 @@ | ||
if (!indexedEntry) { | ||
logger.error("Failed to find indexed entry for hash: " + next); | ||
logger.error( | ||
"Failed to find indexed entry for hash when fetching references: " + | ||
next | ||
); | ||
} else { | ||
@@ -128,0 +164,0 @@ nexts.push(indexedEntry); |
122
src/index.ts
@@ -25,3 +25,3 @@ import { RequestContext, RPC } from "@peerbit/rpc"; | ||
ResponseIPrune, | ||
createExchangeHeadsMessage | ||
createExchangeHeadsMessages | ||
} from "./exchange-heads.js"; | ||
@@ -54,3 +54,4 @@ import { | ||
SeekDelivery, | ||
SilentDelivery | ||
SilentDelivery, | ||
NotStartedError | ||
} from "@peerbit/stream-interface"; | ||
@@ -133,2 +134,3 @@ import { AnyBlockStore, RemoteBlocks } from "@peerbit/blocks"; | ||
timeUntilRoleMaturity?: number; | ||
waitForReplicatorTimeout?: number; | ||
}; | ||
@@ -210,5 +212,7 @@ | ||
private distributeInterval: ReturnType<typeof setInterval>; | ||
replicas: ReplicationLimits; | ||
timeUntilRoleMaturity: number; | ||
waitForReplicatorTimeout: number; | ||
@@ -234,3 +238,6 @@ constructor(properties?: { id?: Uint8Array }) { | ||
REBALANCE_DEBOUNCE_INTERVAL, | ||
(this.getReplicatorsSorted()?.length || 0) * REBALANCE_DEBOUNCE_INTERVAL | ||
Math.log( | ||
(this.getReplicatorsSorted()?.length || 0) * | ||
REBALANCE_DEBOUNCE_INTERVAL | ||
) | ||
) | ||
@@ -362,26 +369,34 @@ ); | ||
if (options?.target === "replicators" || !options?.target) { | ||
const leaders = await this.findLeaders( | ||
result.entry.meta.gid, | ||
decodeReplicas(result.entry).getValue(this) | ||
); | ||
const isLeader = leaders.includes( | ||
this.node.identity.publicKey.hashcode() | ||
); | ||
mode = isLeader | ||
? new SilentDelivery({ redundancy: 1, to: leaders }) | ||
: new AcknowledgeDelivery({ redundancy: 1, to: leaders }); | ||
} | ||
for (const message of await createExchangeHeadsMessages( | ||
this.log, | ||
[result.entry], | ||
this._gidParentCache | ||
)) { | ||
if (options?.target === "replicators" || !options?.target) { | ||
const minReplicas = decodeReplicas(result.entry).getValue(this); | ||
let leaders: string[] | Set<string> = await this.findLeaders( | ||
result.entry.meta.gid, | ||
minReplicas | ||
); | ||
const isLeader = leaders.includes( | ||
this.node.identity.publicKey.hashcode() | ||
); | ||
if (message.heads[0].gidRefrences.length > 0) { | ||
const newAndOldLeaders = new Set(leaders); | ||
for (const ref of message.heads[0].gidRefrences) { | ||
for (const hash of await this.findLeaders(ref, minReplicas)) { | ||
newAndOldLeaders.add(hash); | ||
} | ||
} | ||
leaders = newAndOldLeaders; | ||
} | ||
mode = isLeader | ||
? new SilentDelivery({ redundancy: 1, to: leaders }) | ||
: new AcknowledgeDelivery({ redundancy: 1, to: leaders }); | ||
} | ||
await this.rpc.send( | ||
await createExchangeHeadsMessage( | ||
this.log, | ||
[result.entry], | ||
this._gidParentCache | ||
), | ||
{ | ||
await this.rpc.send(message, { | ||
mode | ||
} | ||
); | ||
}); | ||
} | ||
this.rebalanceParticipationDebounced?.(); | ||
@@ -413,2 +428,4 @@ | ||
options?.timeUntilRoleMaturity || WAIT_FOR_ROLE_MATURITY; | ||
this.waitForReplicatorTimeout = | ||
options?.waitForReplicatorTimeout || WAIT_FOR_REPLICATOR_TIMEOUT; | ||
this._gidParentCache = new Cache({ max: 1000 }); | ||
@@ -518,2 +535,9 @@ this._closeController = new AbortController(); | ||
await this.log.load(); | ||
// TODO (do better) | ||
// we do this distribution interval to eliminate the sideeffects arriving from updating roles and joining entries continously. | ||
// an alternative to this would be to call distribute/maybe prune after every join if our role has changed | ||
this.distributeInterval = setInterval(() => { | ||
this.distribute(); | ||
}, 7.5 * 1000); | ||
} | ||
@@ -549,2 +573,3 @@ | ||
private async _close() { | ||
clearInterval(this.distributeInterval); | ||
this._closeController.abort(); | ||
@@ -640,3 +665,3 @@ | ||
const toMerge: EntryWithRefs<any>[] = []; | ||
const toMerge: Entry<any>[] = []; | ||
let toDelete: Entry<any>[] | undefined = undefined; | ||
@@ -671,3 +696,5 @@ let maybeDelete: EntryWithRefs<any>[][] | undefined = undefined; | ||
)); | ||
const isLeader = !!leaders; | ||
if (isLeader) { | ||
@@ -690,10 +717,8 @@ if (leaders.find((x) => x === context.from!.hashcode())) { | ||
if (isLeader || this.sync?.(entry.entry)) { | ||
toMerge.push(entry); | ||
toMerge.push(entry.entry); | ||
} else { | ||
for (const ref of entry.references) { | ||
const map = this.log.headsIndex.gids.get( | ||
await ref.getGid() | ||
); | ||
for (const ref of entry.gidRefrences) { | ||
const map = this.log.headsIndex.gids.get(ref); | ||
if (map && map.size > 0) { | ||
toMerge.push(entry); | ||
toMerge.push(entry.entry); | ||
(toDelete || (toDelete = [])).push(entry.entry); | ||
@@ -841,3 +866,3 @@ continue outer; | ||
signal: this._closeController.signal, | ||
timeout: WAIT_FOR_REPLICATOR_TIMEOUT | ||
timeout: this.waitForReplicatorTimeout | ||
}) | ||
@@ -860,2 +885,5 @@ .then(async () => { | ||
} | ||
if (e instanceof NotStartedError) { | ||
return; | ||
} | ||
logger.error( | ||
@@ -931,3 +959,3 @@ "Failed to find peer who updated their role: " + e?.message | ||
numberOfLeaders: number, | ||
timeout = WAIT_FOR_REPLICATOR_TIMEOUT | ||
timeout = this.waitForReplicatorTimeout | ||
): Promise<string[] | false> { | ||
@@ -1503,3 +1531,3 @@ return new Promise((res, rej) => { | ||
_queue: PQueue; | ||
private _queue: PQueue; | ||
async distribute() { | ||
@@ -1509,4 +1537,4 @@ if (this._queue?.size > 0) { | ||
} | ||
(this._queue || (this._queue = new PQueue({ concurrency: 1 }))).add(() => | ||
this._distribute() | ||
return (this._queue || (this._queue = new PQueue({ concurrency: 1 }))).add( | ||
() => this._distribute() | ||
); | ||
@@ -1593,10 +1621,10 @@ } | ||
// TODO better choice of step size | ||
for (let i = 0; i < entries.length; i += 100) { | ||
const message = await createExchangeHeadsMessage( | ||
this.log, | ||
entries.slice(i, i + 100), | ||
this._gidParentCache | ||
); | ||
// TODO perhaps send less messages to more receivers for performance reasons? | ||
// TODO wait for previous send to target before trying to send more? | ||
const messages = await createExchangeHeadsMessages( | ||
this.log, | ||
entries, | ||
this._gidParentCache | ||
); | ||
// TODO perhaps send less messages to more receivers for performance reasons? | ||
// TODO wait for previous send to target before trying to send more? | ||
for (const message of messages) { | ||
this.rpc.send(message, { | ||
@@ -1685,3 +1713,2 @@ mode: new SilentDelivery({ to: [target], redundancy: 1 }) | ||
xxx: number; | ||
async rebalanceParticipation(onRoleChange = true) { | ||
@@ -1691,5 +1718,2 @@ // update more participation rate to converge to the average expected rate or bounded by | ||
const t = +new Date(); | ||
// console.log(t - this.xxx) | ||
this.xxx = t; | ||
if (this.closed) { | ||
@@ -1696,0 +1720,0 @@ return false; |
@@ -34,3 +34,3 @@ export type ReplicationErrorFunction = (objectives: { | ||
return memory < 0 | ||
? memory * 0.9 + balance * 0.06 + coverage * 0.04 | ||
? memory * 0.9 + balance * 0.07 + coverage * 0.03 | ||
: balance * 0.6 + coverage * 0.4; | ||
@@ -108,3 +108,3 @@ } | ||
// Beta controls how much of the accumulated error we should forget | ||
const beta = 0.8; | ||
const beta = 0.3; | ||
this.integral = beta * totalError + (1 - beta) * this.integral; | ||
@@ -125,32 +125,38 @@ | ||
if (newFactor < 0 || newFactor > 1) { | ||
// reset integral term if we are "way" out of bounds | ||
if (newFactor < 0 && this.integral < 0) { | ||
this.integral = 0; | ||
} else if (newFactor > 1 && this.integral > 0) { | ||
this.integral = 0; | ||
} | ||
// prevent drift when everone wants to do less | ||
/* if (newFactor < currentFactor && totalFactorDiff < 0 && totalFactor < 0.5) { | ||
newFactor = currentFactor; | ||
this.integral = 0; | ||
} | ||
*/ | ||
/* if (newFactor < currentFactor && totalFactorDiff < 0 && totalFactor < 0.5) { | ||
newFactor = currentFactor; | ||
this.integral = 0; | ||
} */ | ||
/* console.log({ | ||
id: this.id, | ||
currentFactor, | ||
newFactor, | ||
factorDiff: newFactor - currentFactor, | ||
pTerm, | ||
dTerm, | ||
iTerm, | ||
totalError, | ||
errorTarget: errorBalance, | ||
errorCoverage, | ||
errorMemory, | ||
peerCount, | ||
totalFactor, | ||
totalFactorDiff, | ||
targetScaler: balanceErrorScaler, | ||
estimatedTotalSize | ||
}); */ | ||
/* */ | ||
/* { | ||
console.log({ | ||
id: this.id, | ||
currentFactor, | ||
newFactor, | ||
factorDiff: newFactor - currentFactor, | ||
pTerm, | ||
dTerm, | ||
iTerm, | ||
totalError, | ||
errorTarget: errorBalance, | ||
errorCoverage, | ||
errorMemory, | ||
peerCount, | ||
totalFactor, | ||
totalFactorDiff, | ||
targetScaler: balanceErrorScaler, | ||
memoryUsage, | ||
estimatedTotalSize | ||
}); | ||
} */ | ||
return Math.max(Math.min(newFactor, 1), 0); | ||
@@ -157,0 +163,0 @@ } |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
231450
2.65%4433
2.76%+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
Updated
Updated
Updated
Updated
Updated