Comparing version 6.0.6 to 6.0.7
82
index.js
@@ -346,2 +346,3 @@ const b4a = require('b4a') | ||
this.emit('reindexing') | ||
this._onreindexing(record).catch(safetyCatch) | ||
} | ||
@@ -355,11 +356,58 @@ | ||
if (this.fastForwardTo !== null) { | ||
const { key, length, timeout } = this.fastForwardTo | ||
const { key, timeout } = this.fastForwardTo | ||
this.fastForwardTo = null // will get reset once ready | ||
this.initialFastForward(key, timeout || DEFAULT_FF_TIMEOUT) | ||
} | ||
if (length !== 0) { | ||
this.initialFastForward(key, length, timeout || DEFAULT_FF_TIMEOUT) | ||
if (this.localWriter && this._ackInterval) this._startAckTimer() | ||
} | ||
async _onreindexing (record) { | ||
const { key, length } = messages.Checkout.decode({ buffer: record, start: 0, end: record.byteLength }) | ||
const encryptionKey = this._viewStore.getBlockKey(this._viewStore.getSystemCore().name) | ||
const core = this.store.get({ key, encryptionKey, isBlockKey: true }).batch({ checkout: length, session: false }) | ||
const base = this | ||
const system = new SystemView(core, length) | ||
await system.ready() | ||
const indexerCores = [] | ||
for (const { key } of system.indexers) { | ||
const core = this.store.get({ key, compat: false, valueEncoding: messages.OplogMessage, encryptionKey: this.encryptionKey }) | ||
indexerCores.push(core) | ||
} | ||
await system.close() | ||
for (const core of indexerCores) tail(core).catch(safetyCatch) | ||
async function onsyskey (key) { | ||
for (const core of indexerCores) await core.close() | ||
if (key === null || !base.reindexing) return | ||
base.initialFastForward(key, DEFAULT_FF_TIMEOUT) | ||
} | ||
async function tail (core) { | ||
await core.ready() | ||
while (base.reindexing) { | ||
const seq = core.length - 1 | ||
const blk = seq >= 0 ? await core.get(seq) : null | ||
if (blk && blk.version >= 1) { | ||
const sysKey = await getSystemKey(core, seq, blk) | ||
if (sysKey) return onsyskey(sysKey) | ||
} | ||
await core.get(core.length) // force get next blk | ||
} | ||
return onsyskey(null) | ||
} | ||
if (this.localWriter && this._ackInterval) this._startAckTimer() | ||
async function getSystemKey (core, seq, blk) { | ||
if (!blk.digest) return null | ||
if (blk.digest.key) return blk.digest.key | ||
const p = await core.get(seq - blk.digest.pointer) | ||
return p.digest && p.digest.key | ||
} | ||
} | ||
@@ -1128,6 +1176,30 @@ | ||
async initialFastForward (key, length, timeout) { | ||
async initialFastForward (key, timeout) { | ||
const encryptionKey = this._viewStore.getBlockKey(this._viewStore.getSystemCore().name) | ||
const core = this.store.get({ key, encryptionKey, isBlockKey: true }) | ||
await core.ready() | ||
// get length from network | ||
const length = await new Promise((resolve, reject) => { | ||
if (core.length) return resolve(core.length) | ||
const timer = setTimeout(() => { | ||
core.off('append', resolveLength) | ||
resolve(0) | ||
}, timeout) | ||
core.once('append', resolveLength) | ||
function resolveLength () { | ||
clearTimeout(timer) | ||
resolve(core.length) | ||
} | ||
}) | ||
if (!length) { | ||
await core.close() | ||
return | ||
} | ||
const target = await this._preFastForward(core, length, timeout) | ||
@@ -1134,0 +1206,0 @@ await core.close() |
{ | ||
"name": "autobase", | ||
"version": "6.0.6", | ||
"version": "6.0.7", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
166255
4442