
@peerbit/shared-log - npm Package Compare versions

Comparing version 5.0.1 to 6.0.0

lib/esm/exchange-heads.d.ts

@@ -12,6 +12,6 @@ import { Entry } from "@peerbit/log";
 	entry: Entry<T>;
-	references: Entry<T>[];
+	gidRefrences: string[];
 	constructor(properties: {
 		entry: Entry<T>;
-		references: Entry<T>[];
+		gidRefrences: string[];
 	});

@@ -44,3 +44,3 @@ }
 }
-export declare const createExchangeHeadsMessage: (log: Log<any>, heads: Entry<any>[], gidParentCache: Cache<Entry<any>[]>) => Promise<ExchangeHeadsMessage<any>>;
+export declare const createExchangeHeadsMessages: (log: Log<any>, heads: Entry<any>[], gidParentCache: Cache<Entry<any>[]>) => Promise<ExchangeHeadsMessage<any>[]>;
 export declare const allEntriesWithUniqueGids: (log: Log<any>, entry: Entry<any>, gidParentCache: Cache<Entry<any>[]>) => Promise<Entry<any>[]>;
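This is the visible breaking change of the major release: `EntryWithRefs` no longer carries full parent entries (`references: Entry<T>[]`) but only their gid strings (`gidRefrences: string[]` — the misspelling is in the published API), which shrinks head-exchange messages considerably. A minimal sketch of constructing the new shape, assuming `EntryWithRefs` is importable from the package and `head` is an existing entry:

```ts
import { Entry } from "@peerbit/log";
// Assumption: EntryWithRefs is re-exported from the package root; adjust the path if not.
import { EntryWithRefs } from "@peerbit/shared-log";

// `head` stands in for an entry that already exists in the log.
declare const head: Entry<any>;

const withRefs = new EntryWithRefs({
	entry: head,
	gidRefrences: [head.meta.gid] // gid strings only; full parent entries no longer travel
});
```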

lib/esm/exchange-heads.js

@@ -22,6 +22,6 @@ var __decorate = (this && this.__decorate) || function (decorators, target, key, desc) {
 	entry;
-	references; // are some parents to the entry
+	gidRefrences; // are some parents to the entry
 	constructor(properties) {
 		this.entry = properties.entry;
-		this.references = properties.references;
+		this.gidRefrences = properties.gidRefrences;
 	}

@@ -34,5 +34,5 @@ };
 __decorate([
-	field({ type: vec(Entry) }),
+	field({ type: vec("string") }),
 	__metadata("design:type", Array)
-], EntryWithRefs.prototype, "references", void 0);
+], EntryWithRefs.prototype, "gidRefrences", void 0);
 EntryWithRefs = __decorate([

@@ -115,16 +115,41 @@ variant(0),
 export { ResponseIPrune };
-export const createExchangeHeadsMessage = async (log, heads, gidParentCache) => {
-	const headsSet = new Set(heads);
-	const headsWithRefs = await Promise.all(heads.map(async (head) => {
-		const refs = (await allEntriesWithUniqueGids(log, head, gidParentCache)) // 1mb total limit split on all heads
-			.filter((r) => !headsSet.has(r));
-		return new EntryWithRefs({
-			entry: head,
-			references: refs
-		});
-	}));
-	logger.debug(`Send latest heads of '${log.id}'`);
-	return new ExchangeHeadsMessage({
-		heads: headsWithRefs
-	});
+const MAX_EXCHANGE_MESSAGE_SIZE = 5e6; // 5mb (since stream limits are 10mb)
+export const createExchangeHeadsMessages = async (log, heads, gidParentCache) => {
+	const messages = [];
+	let size = 0;
+	let current = [];
+	const visitedHeads = new Set();
+	for (const fromHead of heads) {
+		visitedHeads.add(fromHead.hash);
+		// TODO eventually we don't want to load all refs
+		// since majority of the old leader would not be interested in these anymore
+		const refs = (await allEntriesWithUniqueGids(log, fromHead, gidParentCache)).filter((x) => {
+			if (visitedHeads.has(x.hash)) {
+				return false;
+			}
+			visitedHeads.add(x.hash);
+			return true;
+		});
+		if (refs.length > 1000) {
+			logger.warn("Large refs count: ", refs.length);
+		}
+		current.push(new EntryWithRefs({
+			entry: fromHead,
+			gidRefrences: refs.map((x) => x.meta.gid)
+		}));
+		size += fromHead.size;
+		if (size > MAX_EXCHANGE_MESSAGE_SIZE) {
+			messages.push(new ExchangeHeadsMessage({
+				heads: current
+			}));
+			current = [];
+			continue;
+		}
+	}
+	if (current.length > 0) {
+		messages.push(new ExchangeHeadsMessage({
+			heads: current
+		}));
+	}
+	return messages;
 };
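The replacement batches heads into as many messages as needed: each head's `size` is added to a running budget, and the current batch is flushed once the budget passes `MAX_EXCHANGE_MESSAGE_SIZE` (5 MB, half the 10 MB stream limit). Two details worth noting: the check runs after the push, so a batch may overshoot the budget by one head, and the published loop appears never to reset `size` after a flush, so once the threshold is crossed each following head would be flushed in its own message. A generic sketch of the intended pattern (with the reset), using a hypothetical `Item` type in place of `EntryWithRefs`:

```ts
type Item = { size: number };

function batchBySize<T extends Item>(items: T[], maxBytes = 5e6): T[][] {
	const batches: T[][] = [];
	let current: T[] = [];
	let size = 0;
	for (const item of items) {
		current.push(item); // push first, then test: a batch may overshoot by one item
		size += item.size;
		if (size > maxBytes) {
			batches.push(current);
			current = [];
			size = 0; // reset the running budget (the published loop omits this)
		}
	}
	if (current.length > 0) {
		batches.push(current); // flush the remainder
	}
	return batches;
}
```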

@@ -148,3 +173,4 @@ export const allEntriesWithUniqueGids = async (log, entry, gidParentCache) => {
 	if (!indexedEntry) {
-		logger.error("Failed to find indexed entry for hash: " + next);
+		logger.error("Failed to find indexed entry for hash when fetching references: " +
+			next);
 	}

@@ -151,0 +177,0 @@ else {

lib/esm/index.d.ts

@@ -13,3 +13,2 @@ import { RequestContext, RPC } from "@peerbit/rpc";
 export * from "./replication.js";
-import PQueue from "p-queue";
 export { Observer, Replicator, Role };

@@ -44,2 +43,3 @@ export declare const logger: import("pino").Logger<never>;
 	timeUntilRoleMaturity?: number;
+	waitForReplicatorTimeout?: number;
 };

@@ -83,4 +83,6 @@ export declare const DEFAULT_MIN_REPLICAS = 2;
 	private rebalanceParticipationDebounced;
+	private distributeInterval;
 	replicas: ReplicationLimits;
 	timeUntilRoleMaturity: number;
+	waitForReplicatorTimeout: number;
 	constructor(properties?: {

@@ -141,4 +143,4 @@ id?: Uint8Array;
 	}): Promise<any>[];
-	_queue: PQueue;
-	distribute(): Promise<void>;
+	private _queue;
+	distribute(): Promise<false | void>;
 	_distribute(): Promise<false | undefined>;

@@ -154,4 +156,3 @@ _onUnsubscription(evt: CustomEvent<UnsubcriptionEvent>): Promise<void>;
 	calculateTrend(): Promise<number>;
-	xxx: number;
 	rebalanceParticipation(onRoleChange?: boolean): Promise<boolean>;
 }

lib/esm/index.js

@@ -17,3 +17,3 @@ var __decorate = (this && this.__decorate) || function (decorators, target, key, desc) {
 import { logger as loggerFn } from "@peerbit/logger";
-import { ExchangeHeadsMessage, RequestIPrune, ResponseIPrune, createExchangeHeadsMessage } from "./exchange-heads.js";
+import { ExchangeHeadsMessage, RequestIPrune, ResponseIPrune, createExchangeHeadsMessages } from "./exchange-heads.js";
 import { AbortError, waitFor } from "@peerbit/time";

@@ -26,3 +26,3 @@ import { Observer, Replicator, Role } from "./role.js";
 import yallist from "yallist";
-import { AcknowledgeDelivery, SeekDelivery, SilentDelivery } from "@peerbit/stream-interface";
+import { AcknowledgeDelivery, SeekDelivery, SilentDelivery, NotStartedError } from "@peerbit/stream-interface";
 import { AnyBlockStore, RemoteBlocks } from "@peerbit/blocks";

@@ -86,4 +86,6 @@ import { BlocksMessage } from "./blocks.js";
 	rebalanceParticipationDebounced;
+	distributeInterval;
 	replicas;
 	timeUntilRoleMaturity;
+	waitForReplicatorTimeout;
 	constructor(properties) {

@@ -101,3 +103,4 @@ super();
 	setupRebalanceDebounceFunction() {
-		this.rebalanceParticipationDebounced = debounce(() => this.rebalanceParticipation(), Math.max(REBALANCE_DEBOUNCE_INTERVAL, (this.getReplicatorsSorted()?.length || 0) * REBALANCE_DEBOUNCE_INTERVAL));
+		this.rebalanceParticipationDebounced = debounce(() => this.rebalanceParticipation(), Math.max(REBALANCE_DEBOUNCE_INTERVAL, Math.log((this.getReplicatorsSorted()?.length || 0) *
+			REBALANCE_DEBOUNCE_INTERVAL)));
 	}
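Note the effect of wrapping the scaling term in `Math.log` (natural log): for any realistic replicator count the log term stays far below the constant floor, so the debounce interval becomes effectively fixed instead of growing linearly with the network. A worked example, assuming (hypothetically) `REBALANCE_DEBOUNCE_INTERVAL = 500` ms — the constant's actual value is not shown in this diff:

```ts
const REBALANCE_DEBOUNCE_INTERVAL = 500; // hypothetical value, in ms

const oldInterval = (replicators: number) =>
	Math.max(REBALANCE_DEBOUNCE_INTERVAL, replicators * REBALANCE_DEBOUNCE_INTERVAL);

const newInterval = (replicators: number) =>
	Math.max(REBALANCE_DEBOUNCE_INTERVAL, Math.log(replicators * REBALANCE_DEBOUNCE_INTERVAL));

console.log(oldInterval(100)); // 50000 — debounce grew linearly with peer count
console.log(newInterval(100)); // 500 — Math.log(50000) ≈ 10.8, so the constant floor wins
```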

@@ -201,12 +204,24 @@ setupRole(options) {
 	let mode = undefined;
-	if (options?.target === "replicators" || !options?.target) {
-		const leaders = await this.findLeaders(result.entry.meta.gid, decodeReplicas(result.entry).getValue(this));
-		const isLeader = leaders.includes(this.node.identity.publicKey.hashcode());
-		mode = isLeader
-			? new SilentDelivery({ redundancy: 1, to: leaders })
-			: new AcknowledgeDelivery({ redundancy: 1, to: leaders });
-	}
-	await this.rpc.send(await createExchangeHeadsMessage(this.log, [result.entry], this._gidParentCache), {
-		mode
-	});
+	for (const message of await createExchangeHeadsMessages(this.log, [result.entry], this._gidParentCache)) {
+		if (options?.target === "replicators" || !options?.target) {
+			const minReplicas = decodeReplicas(result.entry).getValue(this);
+			let leaders = await this.findLeaders(result.entry.meta.gid, minReplicas);
+			const isLeader = leaders.includes(this.node.identity.publicKey.hashcode());
+			if (message.heads[0].gidRefrences.length > 0) {
+				const newAndOldLeaders = new Set(leaders);
+				for (const ref of message.heads[0].gidRefrences) {
+					for (const hash of await this.findLeaders(ref, minReplicas)) {
+						newAndOldLeaders.add(hash);
+					}
+				}
+				leaders = newAndOldLeaders;
+			}
+			mode = isLeader
+				? new SilentDelivery({ redundancy: 1, to: leaders })
+				: new AcknowledgeDelivery({ redundancy: 1, to: leaders });
+		}
+		await this.rpc.send(message, {
+			mode
+		});
+	}
 	this.rebalanceParticipationDebounced?.();
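The rewritten send path closes a replication gap: when a head supersedes entries under other gids (its `gidRefrences`), the message is now addressed to the union of the head's current leaders and the leaders of every referenced gid, so peers that were responsible for the parent gids see the new head and can hand off or prune. A minimal sketch of that union step, with a hypothetical `findLeaders(gid, minReplicas)` resolving to peer hashes:

```ts
async function leadersIncludingOldGids(
	findLeaders: (gid: string, minReplicas: number) => Promise<string[]>,
	gid: string,
	gidRefrences: string[],
	minReplicas: number
): Promise<Set<string>> {
	const all = new Set(await findLeaders(gid, minReplicas)); // leaders of the head itself
	for (const ref of gidRefrences) {
		for (const hash of await findLeaders(ref, minReplicas)) {
			all.add(hash); // previous leaders of the parent gids get the message too
		}
	}
	return all;
}
```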

@@ -235,2 +250,4 @@ return result;
 		options?.timeUntilRoleMaturity || WAIT_FOR_ROLE_MATURITY;
+	this.waitForReplicatorTimeout =
+		options?.waitForReplicatorTimeout || WAIT_FOR_REPLICATOR_TIMEOUT;
 	this._gidParentCache = new Cache({ max: 1000 });

@@ -316,2 +333,8 @@ this._closeController = new AbortController();
 	await this.log.load();
+	// TODO (do better)
+	// we do this distribution interval to eliminate the sideeffects arriving from updating roles and joining entries continously.
+	// an alternative to this would be to call distribute/maybe prune after every join if our role has changed
+	this.distributeInterval = setInterval(() => {
+		this.distribute();
+	}, 7.5 * 1000);
 }

@@ -338,2 +361,3 @@ async afterOpen() {
 	async _close() {
+		clearInterval(this.distributeInterval);
 		this._closeController.abort();

@@ -435,9 +459,9 @@ this.node.services.pubsub.removeEventListener("subscribe", this._onSubscriptionFn);
 	if (isLeader || this.sync?.(entry.entry)) {
-		toMerge.push(entry);
+		toMerge.push(entry.entry);
 	}
 	else {
-		for (const ref of entry.references) {
-			const map = this.log.headsIndex.gids.get(await ref.getGid());
+		for (const ref of entry.gidRefrences) {
+			const map = this.log.headsIndex.gids.get(ref);
 			if (map && map.size > 0) {
-				toMerge.push(entry);
+				toMerge.push(entry.entry);
 				(toDelete || (toDelete = [])).push(entry.entry);

@@ -556,3 +580,3 @@ continue outer;
 	signal: this._closeController.signal,
-	timeout: WAIT_FOR_REPLICATOR_TIMEOUT
+	timeout: this.waitForReplicatorTimeout
 })

@@ -571,2 +595,5 @@ .then(async () => {
 	}
+	if (e instanceof NotStartedError) {
+		return;
+	}
 	logger.error("Failed to find peer who updated their role: " + e?.message);

@@ -614,3 +641,3 @@ });
 	}
-	async waitForIsLeader(slot, numberOfLeaders, timeout = WAIT_FOR_REPLICATOR_TIMEOUT) {
+	async waitForIsLeader(slot, numberOfLeaders, timeout = this.waitForReplicatorTimeout) {
 		return new Promise((res, rej) => {

@@ -1037,3 +1064,3 @@ const removeListeners = () => {
 	}
-	(this._queue || (this._queue = new PQueue({ concurrency: 1 }))).add(() => this._distribute());
+	return (this._queue || (this._queue = new PQueue({ concurrency: 1 }))).add(() => this._distribute());
 	}
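Returning the `PQueue` promise instead of dropping it makes a distribution pass awaitable, which the new 7.5-second interval (and any caller) can rely on. A hedged usage sketch, assuming `sharedLog` is an open instance:

```ts
// Resolves once the queued _distribute() pass has actually run
// (or once the already-pending pass completes).
await sharedLog.distribute();
```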

@@ -1103,6 +1130,6 @@ async _distribute() {
-	// TODO better choice of step size
-	for (let i = 0; i < entries.length; i += 100) {
-		const message = await createExchangeHeadsMessage(this.log, entries.slice(i, i + 100), this._gidParentCache);
-		// TODO perhaps send less messages to more receivers for performance reasons?
-		// TODO wait for previous send to target before trying to send more?
+	const messages = await createExchangeHeadsMessages(this.log, entries, this._gidParentCache);
+	// TODO perhaps send less messages to more receivers for performance reasons?
+	// TODO wait for previous send to target before trying to send more?
+	for (const message of messages) {
 		this.rpc.send(message, {

@@ -1159,9 +1186,5 @@ mode: new SilentDelivery({ to: [target], redundancy: 1 })
 	}
-	xxx;
 	async rebalanceParticipation(onRoleChange = true) {
 		// update more participation rate to converge to the average expected rate or bounded by
 		// resources such as memory and or cpu
-		const t = +new Date();
-		// console.log(t - this.xxx)
-		this.xxx = t;
 		if (this.closed) {

@@ -1168,0 +1191,0 @@ return false;

lib/esm/replication.js

@@ -16,3 +16,3 @@ export class PIDReplicationController {
 	return memory < 0
-		? memory * 0.9 + balance * 0.06 + coverage * 0.04
+		? memory * 0.9 + balance * 0.07 + coverage * 0.03
 		: balance * 0.6 + coverage * 0.4;

@@ -68,3 +68,3 @@ } } = options;
 	// Beta controls how much of the accumulated error we should forget
-	const beta = 0.8;
+	const beta = 0.3;
 	this.integral = beta * totalError + (1 - beta) * this.integral;
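The integral term is an exponential moving average, `integral ← β · error + (1 − β) · integral`, where β sets how quickly accumulated error is forgotten. Dropping β from 0.8 to 0.3 weights history more heavily, so the I-term reacts more slowly and smoothly to error spikes. A quick comparison of how fast each β tracks a unit step in error:

```ts
// integral ← beta * error + (1 - beta) * integral, starting from 0
function track(beta: number, steps: number, error = 1): number {
	let integral = 0;
	for (let i = 0; i < steps; i++) {
		integral = beta * error + (1 - beta) * integral;
	}
	return integral;
}

console.log(track(0.8, 3)); // ≈ 0.992 — old beta: nearly caught up after 3 updates
console.log(track(0.3, 3)); // ≈ 0.657 — new beta: slower, smoother convergence
```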

@@ -80,29 +80,36 @@ const iTerm = this.ki * this.integral;
 	this.prevError = totalError;
 	if (newFactor < 0 || newFactor > 1) {
 		// reset integral term if we are "way" out of bounds
 		if (newFactor < 0 && this.integral < 0) {
 			this.integral = 0;
 		}
+		else if (newFactor > 1 && this.integral > 0) {
+			this.integral = 0;
+		}
 		// prevent drift when everone wants to do less
 		/* if (newFactor < currentFactor && totalFactorDiff < 0 && totalFactor < 0.5) {
 			newFactor = currentFactor;
 			this.integral = 0;
-		}
-		*/
-		/* console.log({
+		} */
+		/* */
+		/* {
+			console.log({
 				id: this.id,
 				currentFactor,
 				newFactor,
 				factorDiff: newFactor - currentFactor,
 				pTerm,
 				dTerm,
 				iTerm,
 				totalError,
 				errorTarget: errorBalance,
 				errorCoverage,
 				errorMemory,
 				peerCount,
 				totalFactor,
 				totalFactorDiff,
 				targetScaler: balanceErrorScaler,
+				memoryUsage,
 				estimatedTotalSize
-		}); */
+			});
+		} */
 	return Math.max(Math.min(newFactor, 1), 0);

@@ -109,0 +116,0 @@ }

package.json

 {
 	"name": "@peerbit/shared-log",
-	"version": "5.0.1",
+	"version": "6.0.0",
 	"description": "Shared log",

@@ -33,14 +33,14 @@ "type": "module",
 	"dependencies": {
-		"@dao-xyz/borsh": "^5.1.8",
-		"@peerbit/log": "3.0.12",
+		"@dao-xyz/borsh": "^5.2.1",
+		"@peerbit/log": "3.0.13",
 		"@peerbit/logger": "1.0.2",
-		"@peerbit/program": "3.0.10",
-		"@peerbit/rpc": "3.0.12",
-		"@peerbit/time": "2.0.4",
+		"@peerbit/program": "3.0.11",
+		"@peerbit/rpc": "3.0.13",
+		"@peerbit/time": "2.0.5",
 		"p-debounce": "^4.0.0"
 	},
 	"devDependencies": {
-		"@peerbit/test-utils": "^2.0.12"
+		"@peerbit/test-utils": "^2.0.13"
 	},
-	"gitHead": "2539f936bbe572f34a297bc8275fcd059105fc2a"
+	"gitHead": "180a8c4e6ab6f713134c949d2d7ce9fc5648a4ce"
 }

src/exchange-heads.ts

@@ -20,8 +20,8 @@ import { variant, field, vec, fixedArray } from "@dao-xyz/borsh";
-	@field({ type: vec(Entry) })
-	references: Entry<T>[]; // are some parents to the entry
+	@field({ type: vec("string") })
+	gidRefrences: string[]; // are some parents to the entry

-	constructor(properties: { entry: Entry<T>; references: Entry<T>[] }) {
+	constructor(properties: { entry: Entry<T>; gidRefrences: string[] }) {
 		this.entry = properties.entry;
-		this.references = properties.references;
+		this.gidRefrences = properties.gidRefrences;
 	}

@@ -79,23 +79,56 @@ }
 }
+const MAX_EXCHANGE_MESSAGE_SIZE = 5e6; // 5mb (since stream limits are 10mb)
-export const createExchangeHeadsMessage = async (
+export const createExchangeHeadsMessages = async (
 	log: Log<any>,
 	heads: Entry<any>[],
 	gidParentCache: Cache<Entry<any>[]>
-) => {
-	const headsSet = new Set(heads);
-	const headsWithRefs = await Promise.all(
-		heads.map(async (head) => {
-			const refs = (await allEntriesWithUniqueGids(log, head, gidParentCache)) // 1mb total limit split on all heads
-				.filter((r) => !headsSet.has(r));
-			return new EntryWithRefs({
-				entry: head,
-				references: refs
-			});
-		})
-	);
-	logger.debug(`Send latest heads of '${log.id}'`);
-	return new ExchangeHeadsMessage({
-		heads: headsWithRefs
-	});
+): Promise<ExchangeHeadsMessage<any>[]> => {
+	const messages: ExchangeHeadsMessage<any>[] = [];
+	let size = 0;
+	let current: EntryWithRefs<any>[] = [];
+	const visitedHeads = new Set<string>();
+	for (const fromHead of heads) {
+		visitedHeads.add(fromHead.hash);
+		// TODO eventually we don't want to load all refs
+		// since majority of the old leader would not be interested in these anymore
+		const refs = (
+			await allEntriesWithUniqueGids(log, fromHead, gidParentCache)
+		).filter((x) => {
+			if (visitedHeads.has(x.hash)) {
+				return false;
+			}
+			visitedHeads.add(x.hash);
+			return true;
+		});
+		if (refs.length > 1000) {
+			logger.warn("Large refs count: ", refs.length);
+		}
+		current.push(
+			new EntryWithRefs({
+				entry: fromHead,
+				gidRefrences: refs.map((x) => x.meta.gid)
+			})
+		);
+		size += fromHead.size;
+		if (size > MAX_EXCHANGE_MESSAGE_SIZE) {
+			messages.push(
+				new ExchangeHeadsMessage({
+					heads: current
+				})
+			);
+			current = [];
+			continue;
+		}
+	}
+	if (current.length > 0) {
+		messages.push(
+			new ExchangeHeadsMessage({
+				heads: current
+			})
+		);
+	}
+	return messages;
 };
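Callers now receive an array of messages and are expected to send each one, mirroring how the `src/index.ts` changes later in this diff consume the API. A hedged usage sketch — `log`, `entry`, `gidParentCache`, `rpc`, and `mode` are assumed to exist in the surrounding scope:

```ts
const messages = await createExchangeHeadsMessages(log, [entry], gidParentCache);
for (const message of messages) {
	await rpc.send(message, { mode }); // one RPC per ≤5 MB batch of heads
}
```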

@@ -125,3 +158,6 @@
 	if (!indexedEntry) {
-		logger.error("Failed to find indexed entry for hash: " + next);
+		logger.error(
+			"Failed to find indexed entry for hash when fetching references: " +
+				next
+		);
 	} else {

src/index.ts

@@ -25,3 +25,3 @@ import { RequestContext, RPC } from "@peerbit/rpc";
 	ResponseIPrune,
-	createExchangeHeadsMessage
+	createExchangeHeadsMessages
 } from "./exchange-heads.js";

@@ -54,3 +54,4 @@ import {
 	SeekDelivery,
-	SilentDelivery
+	SilentDelivery,
+	NotStartedError
 } from "@peerbit/stream-interface";

@@ -133,2 +134,3 @@ import { AnyBlockStore, RemoteBlocks } from "@peerbit/blocks";
 	timeUntilRoleMaturity?: number;
+	waitForReplicatorTimeout?: number;
 };

@@ -210,5 +212,7 @@
+	private distributeInterval: ReturnType<typeof setInterval>;
 	replicas: ReplicationLimits;
 	timeUntilRoleMaturity: number;
+	waitForReplicatorTimeout: number;

@@ -234,3 +238,6 @@ constructor(properties?: { id?: Uint8Array }) {
 		REBALANCE_DEBOUNCE_INTERVAL,
-		(this.getReplicatorsSorted()?.length || 0) * REBALANCE_DEBOUNCE_INTERVAL
+		Math.log(
+			(this.getReplicatorsSorted()?.length || 0) *
+				REBALANCE_DEBOUNCE_INTERVAL
+		)
 	)

@@ -362,26 +369,34 @@ );
-	if (options?.target === "replicators" || !options?.target) {
-		const leaders = await this.findLeaders(
-			result.entry.meta.gid,
-			decodeReplicas(result.entry).getValue(this)
-		);
-		const isLeader = leaders.includes(
-			this.node.identity.publicKey.hashcode()
-		);
-		mode = isLeader
-			? new SilentDelivery({ redundancy: 1, to: leaders })
-			: new AcknowledgeDelivery({ redundancy: 1, to: leaders });
-	}
-	await this.rpc.send(
-		await createExchangeHeadsMessage(
-			this.log,
-			[result.entry],
-			this._gidParentCache
-		),
-		{
-			mode
-		}
-	);
+	for (const message of await createExchangeHeadsMessages(
+		this.log,
+		[result.entry],
+		this._gidParentCache
+	)) {
+		if (options?.target === "replicators" || !options?.target) {
+			const minReplicas = decodeReplicas(result.entry).getValue(this);
+			let leaders: string[] | Set<string> = await this.findLeaders(
+				result.entry.meta.gid,
+				minReplicas
+			);
+			const isLeader = leaders.includes(
+				this.node.identity.publicKey.hashcode()
+			);
+			if (message.heads[0].gidRefrences.length > 0) {
+				const newAndOldLeaders = new Set(leaders);
+				for (const ref of message.heads[0].gidRefrences) {
+					for (const hash of await this.findLeaders(ref, minReplicas)) {
+						newAndOldLeaders.add(hash);
+					}
+				}
+				leaders = newAndOldLeaders;
+			}
+			mode = isLeader
+				? new SilentDelivery({ redundancy: 1, to: leaders })
+				: new AcknowledgeDelivery({ redundancy: 1, to: leaders });
+		}
+		await this.rpc.send(message, {
+			mode
+		});
+	}
 	this.rebalanceParticipationDebounced?.();

@@ -413,2 +428,4 @@
 		options?.timeUntilRoleMaturity || WAIT_FOR_ROLE_MATURITY;
+	this.waitForReplicatorTimeout =
+		options?.waitForReplicatorTimeout || WAIT_FOR_REPLICATOR_TIMEOUT;
 	this._gidParentCache = new Cache({ max: 1000 });
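Both tunables are now plumbed through the program options, with the package constants as fallbacks. A hedged sketch of supplying them — the exact open/setup call shape is assumed, and the values are illustrative only:

```ts
// Hypothetical values; defaults are WAIT_FOR_ROLE_MATURITY and
// WAIT_FOR_REPLICATOR_TIMEOUT when the options are omitted.
await sharedLog.open({
	timeUntilRoleMaturity: 10_000, // ms before a replicator's role is trusted as mature
	waitForReplicatorTimeout: 30_000 // ms to wait for an expected replicator to appear
});
```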

@@ -518,2 +535,9 @@ this._closeController = new AbortController();
 	await this.log.load();
+	// TODO (do better)
+	// we do this distribution interval to eliminate the sideeffects arriving from updating roles and joining entries continously.
+	// an alternative to this would be to call distribute/maybe prune after every join if our role has changed
+	this.distributeInterval = setInterval(() => {
+		this.distribute();
+	}, 7.5 * 1000);
 }

@@ -549,2 +573,3 @@
 	private async _close() {
+		clearInterval(this.distributeInterval);
 		this._closeController.abort();

@@ -640,3 +665,3 @@
-	const toMerge: EntryWithRefs<any>[] = [];
+	const toMerge: Entry<any>[] = [];
 	let toDelete: Entry<any>[] | undefined = undefined;

@@ -671,3 +696,5 @@ let maybeDelete: EntryWithRefs<any>[][] | undefined = undefined;
 	));
+	const isLeader = !!leaders;
+	if (isLeader) {

@@ -690,10 +717,8 @@ if (leaders.find((x) => x === context.from!.hashcode())) {
 	if (isLeader || this.sync?.(entry.entry)) {
-		toMerge.push(entry);
+		toMerge.push(entry.entry);
 	} else {
-		for (const ref of entry.references) {
-			const map = this.log.headsIndex.gids.get(
-				await ref.getGid()
-			);
+		for (const ref of entry.gidRefrences) {
+			const map = this.log.headsIndex.gids.get(ref);
 			if (map && map.size > 0) {
-				toMerge.push(entry);
+				toMerge.push(entry.entry);
 				(toDelete || (toDelete = [])).push(entry.entry);

@@ -841,3 +866,3 @@ continue outer;
 	signal: this._closeController.signal,
-	timeout: WAIT_FOR_REPLICATOR_TIMEOUT
+	timeout: this.waitForReplicatorTimeout
 })

@@ -860,2 +885,5 @@ .then(async () => {
 	}
+	if (e instanceof NotStartedError) {
+		return;
+	}
 	logger.error(

@@ -931,3 +959,3 @@ "Failed to find peer who updated their role: " + e?.message
 	numberOfLeaders: number,
-	timeout = WAIT_FOR_REPLICATOR_TIMEOUT
+	timeout = this.waitForReplicatorTimeout
 ): Promise<string[] | false> {

@@ -1503,3 +1531,3 @@ return new Promise((res, rej) => {
-	_queue: PQueue;
+	private _queue: PQueue;
 	async distribute() {

@@ -1509,4 +1537,4 @@ if (this._queue?.size > 0) {
 	}
-	(this._queue || (this._queue = new PQueue({ concurrency: 1 }))).add(() =>
-		this._distribute()
+	return (this._queue || (this._queue = new PQueue({ concurrency: 1 }))).add(
+		() => this._distribute()
 	);

@@ -1593,10 +1621,10 @@ }
-	// TODO better choice of step size
-	for (let i = 0; i < entries.length; i += 100) {
-		const message = await createExchangeHeadsMessage(
-			this.log,
-			entries.slice(i, i + 100),
-			this._gidParentCache
-		);
-		// TODO perhaps send less messages to more receivers for performance reasons?
-		// TODO wait for previous send to target before trying to send more?
+	const messages = await createExchangeHeadsMessages(
+		this.log,
+		entries,
+		this._gidParentCache
+	);
+	// TODO perhaps send less messages to more receivers for performance reasons?
+	// TODO wait for previous send to target before trying to send more?
+	for (const message of messages) {
 		this.rpc.send(message, {

@@ -1685,3 +1713,2 @@ mode: new SilentDelivery({ to: [target], redundancy: 1 })
-	xxx: number;
 	async rebalanceParticipation(onRoleChange = true) {

@@ -1691,5 +1718,2 @@ // update more participation rate to converge to the average expected rate or bounded by
-	const t = +new Date();
-	// console.log(t - this.xxx)
-	this.xxx = t;
 	if (this.closed) {

@@ -1696,0 +1720,0 @@ return false;

src/replication.ts

@@ -34,3 +34,3 @@ export type ReplicationErrorFunction = (objectives: {
 	return memory < 0
-		? memory * 0.9 + balance * 0.06 + coverage * 0.04
+		? memory * 0.9 + balance * 0.07 + coverage * 0.03
 		: balance * 0.6 + coverage * 0.4;
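Under memory pressure (`memory < 0`) the error mix shifts one percentage point from coverage to balance, while memory still dominates at 0.9; the non-pressured branch is unchanged. With illustrative objective values the shift is small but consistently favors balancing:

```ts
// Illustrative objective values (in the controller's normalized units)
const memory = -0.5, balance = 0.2, coverage = 0.1;

const oldError = memory * 0.9 + balance * 0.06 + coverage * 0.04; // -0.434
const newError = memory * 0.9 + balance * 0.07 + coverage * 0.03; // -0.433
```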

@@ -108,3 +108,3 @@ }
 	// Beta controls how much of the accumulated error we should forget
-	const beta = 0.8;
+	const beta = 0.3;
 	this.integral = beta * totalError + (1 - beta) * this.integral;

@@ -125,32 +125,38 @@
 	if (newFactor < 0 || newFactor > 1) {
 		// reset integral term if we are "way" out of bounds
 		if (newFactor < 0 && this.integral < 0) {
 			this.integral = 0;
-		}
+		} else if (newFactor > 1 && this.integral > 0) {
+			this.integral = 0;
+		}
 		// prevent drift when everone wants to do less
 		/* if (newFactor < currentFactor && totalFactorDiff < 0 && totalFactor < 0.5) {
 			newFactor = currentFactor;
 			this.integral = 0;
-		}
-		*/
-		/* console.log({
+		} */
+		/* */
+		/* {
+			console.log({
 				id: this.id,
 				currentFactor,
 				newFactor,
 				factorDiff: newFactor - currentFactor,
 				pTerm,
 				dTerm,
 				iTerm,
 				totalError,
 				errorTarget: errorBalance,
 				errorCoverage,
 				errorMemory,
 				peerCount,
 				totalFactor,
 				totalFactorDiff,
 				targetScaler: balanceErrorScaler,
+				memoryUsage,
 				estimatedTotalSize
-			}); */
+			});
+		} */
 	return Math.max(Math.min(newFactor, 1), 0);

@@ -157,0 +163,0 @@ }

Sorry, the diffs of the 3 remaining files are not supported yet