Comparing version 6.0.0-rc4 to 6.0.0-rc5
194
index.js
@@ -23,6 +23,7 @@ const b4a = require('b4a') | ||
// default is to automatically ack | ||
const DEFAULT_ACK_INTERVAL = 10 * 1000 | ||
const DEFAULT_ACK_INTERVAL = 10_000 | ||
const DEFAULT_ACK_THRESHOLD = 4 | ||
const FF_THRESHOLD = 16 | ||
const DEFAULT_FF_TIMEOUT = 60_000 | ||
@@ -59,5 +60,10 @@ const REMOTE_ADD_BATCH = 64 | ||
this.fastForwardingTo = handlers.fastForward !== false ? 0 : -1 | ||
this.fastForwardEnabled = handlers.fastForward !== false | ||
this.fastForwarding = false | ||
this.fastForwardTo = null | ||
if (this.fastForwardEnabled && isObject(handlers.fastForward)) { | ||
this.fastForwardTo = handlers.fastForward | ||
} | ||
this._checkWriters = [] | ||
@@ -308,2 +314,9 @@ this._appending = null | ||
if (this.fastForwardTo !== null) { | ||
const { key, length, timeout } = this.fastForwardTo | ||
this.fastForwardTo = null // will get reset once ready | ||
this.initialFastForward(key, length, timeout || DEFAULT_FF_TIMEOUT) | ||
} | ||
if (this.localWriter && this._ackInterval) this._startAckTimer() | ||
@@ -442,3 +455,3 @@ } | ||
if (this.fastForwardTo !== null) return true | ||
return this.fastForwardingTo > 0 && this.fastForwardingTo > this.system.core.getBackingCore().length | ||
return this.fastForwardEnabled && this.fastForwarding | ||
} | ||
@@ -1027,6 +1040,22 @@ | ||
// NOTE: runs in parallel with everything, can never fail | ||
async initialFastForward (key, length, timeout) { | ||
const core = this.store.get(key) | ||
const target = await this._preFastForward(core, length, timeout) | ||
await core.close() | ||
// initial fast-forward failed | ||
if (target === null) return | ||
for (const w of this.activeWriters) w.pause() | ||
this.fastForwarding = false | ||
this.fastForwardTo = target | ||
this._bumpAckTimer() | ||
this._queueBump() | ||
} | ||
async queueFastForward () { | ||
// if already FFing, let the finish. TODO: auto kill the attempt after a while and move to latest? | ||
if (this.fastForwardingTo !== 0) return | ||
if (!this.fastForwardEnabled || this.fastForwarding) return | ||
@@ -1038,18 +1067,31 @@ const core = this.system.core.getBackingCore() | ||
this.fastForwardingTo = core.session.length | ||
const target = await this._preFastForward(core.session, core.session.length, null) | ||
// fast-forward failed | ||
if (target === null) return | ||
// if it migrated underneath us, ignore for now | ||
if (core !== this.system.core.getBackingCore()) return | ||
for (const w of this.activeWriters) w.pause() | ||
this.fastForwardTo = target | ||
this._bumpAckTimer() | ||
this._queueBump() | ||
} | ||
const fastForwardTo = { | ||
key: core.session.key, | ||
length: this.fastForwardingTo, | ||
migrate: null | ||
} | ||
// NOTE: runs in parallel with everything, can never fail | ||
async _preFastForward (core, length, timeout) { | ||
this.fastForwarding = true | ||
const info = { key: core.key, length } | ||
try { | ||
// sys runs open with wait false, so get head block first for low complexity | ||
if (!(await core.session.has(this.fastForwardingTo - 1))) { | ||
await core.session.get(this.fastForwardingTo - 1) | ||
if (!(await core.has(length - 1))) { | ||
await core.get(length - 1, { timeout }) | ||
} | ||
const system = new SystemView(core.session.session(), this.fastForwardingTo) | ||
const system = new SystemView(core.session(), length) | ||
await system.ready() | ||
@@ -1060,11 +1102,12 @@ | ||
version: system.version, | ||
length: this.fastForwardingTo | ||
length | ||
} | ||
this.fastForwardingTo = 0 | ||
this.fastForwarding = false | ||
this.emit('upgrade-available', upgrade) | ||
return | ||
return null | ||
} | ||
const migrated = !system.sameIndexers(this.linearizer.indexers) | ||
const systemShouldMigrate = b4a.equals(core.key, this.system.core.key) && | ||
!system.sameIndexers(this.linearizer.indexers) | ||
@@ -1081,22 +1124,2 @@ const indexers = [] | ||
let sysCore = system.core | ||
// handle system migration | ||
if (migrated) { | ||
const idx = [] | ||
for (const { key } of system.indexers) idx.push(await this._getWriterByKey(key)) | ||
const hash = sysCore.core.tree.hash() | ||
const name = this.system.core._source.name | ||
const prologue = { hash, length: this.fastForwardingTo } | ||
const key = this.deriveKey(name, idx, prologue) | ||
await sysCore.close() // close unused session | ||
fastForwardTo.migrate = { key } | ||
sysCore = this.store.get(key) | ||
} | ||
pendingViews.push({ core: sysCore, length: this.fastForwardingTo }) | ||
// handle rest of views | ||
@@ -1115,5 +1138,4 @@ for (const v of system.views) { | ||
const promises = [] | ||
for (const { core, length } of indexers) { | ||
if (core.length === 0 && length > 0) promises.push(core.get(length - 1)) | ||
if (core.length === 0 && length > 0) promises.push(core.get(length - 1, { timeout })) | ||
} | ||
@@ -1123,3 +1145,3 @@ | ||
// we could just get the hash here, but likely user wants the block so yolo | ||
promises.push(core.get(length - 1)) | ||
promises.push(core.get(length - 1, { timeout })) | ||
} | ||
@@ -1129,26 +1151,33 @@ | ||
const closing = [] | ||
// handle system migration | ||
if (systemShouldMigrate) { | ||
const hash = system.core.core.tree.hash() | ||
const name = this.system.core._source.name | ||
const prologue = { hash, length } | ||
info.key = this.deriveKey(name, indexers, prologue) | ||
const core = this.store.get(info.key) | ||
await core.get(length - 1) | ||
closing.push(core.close()) | ||
} | ||
for (const { core } of pendingViews) { | ||
await core.close() | ||
closing.push(core.close()) | ||
} | ||
await system.close() | ||
closing.push(system.close()) | ||
await Promise.allSettled(closing) | ||
} catch (err) { | ||
this.fastForwardingTo = 0 | ||
safetyCatch(err) | ||
return | ||
return null | ||
} finally { | ||
this.fastForwarding = false | ||
} | ||
// if it migrated underneath us, ignore for now | ||
if (core !== this.system.core.getBackingCore()) { | ||
this.fastForwardingTo = 0 | ||
return | ||
} | ||
for (const w of this.activeWriters) w.pause() | ||
this.fastForwardingTo = 0 | ||
this.fastForwardTo = fastForwardTo | ||
this._bumpAckTimer() | ||
this._queueBump() | ||
return info | ||
} | ||
@@ -1163,12 +1192,16 @@ | ||
async _applyFastForward () { | ||
const core = this.system.core.getBackingCore() | ||
const from = core.length | ||
// remember these in case another fast forward gets queued | ||
const { key, length, migrate } = this.fastForwardTo | ||
const { key, length } = this.fastForwardTo | ||
const migrated = !b4a.equals(key, this.system.core.key) | ||
const core = this.store.get(key) | ||
await core.ready() | ||
const from = this.system.core.getBackingCore().length | ||
// just extra sanity check | ||
// TODO: if we simply load the core from the corestore the key check isn't needed | ||
// getting rid of that is essential for dbl ff, but for now its ok with some safety from migrations | ||
if (!b4a.equals(core.key, key) || length <= from) { | ||
if (length <= from) { | ||
this._clearFastForward() | ||
@@ -1178,10 +1211,23 @@ return | ||
const system = new SystemView(core.session.session(), length) | ||
const system = new SystemView(core.session(), length) | ||
await system.ready() | ||
const indexers = [] // only used in migrate branch | ||
if (migrate) { | ||
const prologues = [] // only used in migrate branch | ||
// preload async state | ||
if (migrated) { | ||
for (const { key } of system.indexers) { | ||
indexers.push(await this._getWriterByKey(key)) | ||
const core = this.store.get(key) | ||
await core.ready() | ||
indexers.push({ core }) | ||
await core.close() | ||
} | ||
for (const { key } of system.views) { | ||
const core = this.store.get(key) | ||
await core.ready() | ||
prologues.push(core.manifest.prologue) | ||
await core.close() | ||
} | ||
} | ||
@@ -1192,3 +1238,3 @@ | ||
const sysView = this.system.core._source | ||
const sysInfo = { key: migrate ? migrate.key : key, length } | ||
const sysInfo = { key, length } | ||
@@ -1204,7 +1250,5 @@ views.set(sysView, sysInfo) | ||
// search for corresponding view | ||
if (!view && migrate) { | ||
if (!view) { | ||
for (view of this._viewStore.opened.values()) { | ||
const hash = view.core.session.core.tree.hash() | ||
const key = this.deriveKey(view.name, indexers, { hash, length: v.length }) | ||
const key = this.deriveKey(view.name, indexers, prologues[i]) | ||
if (b4a.equals(key, v.key)) break | ||
@@ -1215,3 +1259,3 @@ view = null | ||
if (!view || view.core.session.length < v.length) { | ||
if (!view) { | ||
this._clearFastForward() // something wrong somewhere, likely a bug, just safety | ||
@@ -1245,3 +1289,3 @@ return | ||
// manually set the digest | ||
if (migrate) this._setDigest(migrate.key) | ||
if (migrated) this._setDigest(key) | ||
@@ -1751,1 +1795,5 @@ const to = length | ||
} | ||
function isObject (obj) { | ||
return typeof obj === 'object' && obj !== null | ||
} |
{ | ||
"name": "autobase", | ||
"version": "6.0.0-rc4", | ||
"version": "6.0.0-rc5", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
160251
4290