Comparing version 6.1.3 to 6.1.4
181
index.js
@@ -10,2 +10,3 @@ const b4a = require('b4a') | ||
const CoreCoupler = require('core-coupler') | ||
const mutexify = require('mutexify/promise') | ||
@@ -21,3 +22,2 @@ const Linearizer = require('./lib/linearizer') | ||
const AutoWakeup = require('./lib/wakeup') | ||
const mutexify = require('mutexify/promise') | ||
@@ -97,3 +97,3 @@ const inspect = Symbol.for('nodejs.util.inspect.custom') | ||
this._hasPendingCheckpoint = false | ||
this._pendingRemoval = false | ||
this._pendingLocalRemoval = false | ||
this._completeRemovalAt = null | ||
@@ -802,3 +802,3 @@ this._systemPointer = 0 | ||
async _getWriterByKey (key, len, seen, allowGC, system) { | ||
async _getWriterByKey (key, len, seen, allowGC, isAdded, system) { | ||
assert(this._draining === true || (this.opening && !this.opened)) | ||
@@ -816,2 +816,3 @@ | ||
if (w !== null) { | ||
if (isAdded && w.core.writable && this.localWriter === null) this._setLocalWriter(w) | ||
w.seen(seen) | ||
@@ -821,21 +822,17 @@ return w | ||
let isActive = true | ||
const sys = system || this.system | ||
const writerInfo = await sys.get(key) | ||
if (len === -1) { | ||
const sys = system || this.system | ||
const writerInfo = await sys.get(key) | ||
// TODO: this indirectly disables backwards-dag-walk - we should reenable when FF is enabled | ||
// this is because the remote writer might not have our next heads in mem if it knows | ||
// that following the indexed sets makes that redundant. tmp(?) solution for now is to | ||
// just inflate the writers anyway - if FF is enabled simply jumping ahead is likely a better solution | ||
// if (writerInfo === null) return null | ||
// len = writerInfo.length | ||
if (!allowGC && writerInfo === null) return null | ||
len = writerInfo === null ? 0 : writerInfo.length | ||
isActive = writerInfo !== null && !writerInfo.isRemoved | ||
} | ||
w = this._makeWriter(key, len, isActive) | ||
const isActive = writerInfo !== null && (isAdded || !writerInfo.isRemoved) | ||
const isRemoved = len === 0 | ||
? writerInfo !== null && (!isAdded && writerInfo.isRemoved) | ||
: !isActive // a writer might have referenced a removed writer | ||
w = this._makeWriter(key, len, isActive, isRemoved) | ||
if (!w) return null | ||
@@ -877,3 +874,3 @@ | ||
_makeWriterCore (key, isActive) { | ||
_makeWriterCore (key) { | ||
const pooled = this.corePool.get(key) | ||
@@ -886,3 +883,2 @@ if (pooled) { | ||
const local = b4a.equals(key, this.local.key) | ||
if (local && !isActive) return null | ||
@@ -896,18 +892,15 @@ const core = local | ||
_makeWriter (key, length, isActive) { | ||
const core = this._makeWriterCore(key, isActive) | ||
if (!core) return null | ||
_makeWriter (key, length, isActive, isRemoved) { | ||
const core = this._makeWriterCore(key) | ||
const w = new Writer(this, core, length, isRemoved) | ||
const w = new Writer(this, core, length) | ||
if (core.writable) { | ||
this.localWriter = w | ||
if (this._ackInterval) this._startAckTimer() | ||
this.emit('writable') | ||
} else { | ||
core.on('append', this._onremotewriterchangeBound) | ||
core.on('download', this._onremotewriterchangeBound) | ||
core.on('manifest', this._onremotewriterchangeBound) | ||
if (isActive) this._setLocalWriter(w) // only set active writer | ||
return w | ||
} | ||
core.on('append', this._onremotewriterchangeBound) | ||
core.on('download', this._onremotewriterchangeBound) | ||
core.on('manifest', this._onremotewriterchangeBound) | ||
return w | ||
@@ -928,3 +921,3 @@ } | ||
if (this.localWriter !== null) return | ||
await this._getWriterByKey(this.local.key, -1, 0, false, sys) | ||
await this._getWriterByKey(this.local.key, -1, 0, false, false, sys) | ||
this._tryLoadingLocal = false | ||
@@ -960,3 +953,3 @@ } | ||
for (const head of sys.indexers) { | ||
const writer = await this._getWriterByKey(head.key, head.length, 0, false, sys) | ||
const writer = await this._getWriterByKey(head.key, head.length, 0, false, false, sys) | ||
writer.isIndexer = true | ||
@@ -970,8 +963,8 @@ writer.inflateBackground() | ||
for (const { key, length } of sys.heads) { | ||
await this._getWriterByKey(key, length, 0, false, sys) | ||
await this._getWriterByKey(key, length, 0, false, false, sys) | ||
} | ||
} | ||
async _reindex (nodes) { | ||
if (nodes && nodes.length) { | ||
async _reindex () { | ||
if (this._updates.length) { | ||
this._undoAll() | ||
@@ -983,2 +976,3 @@ await this.system.update() | ||
await this._closeAllActiveWriters(true) | ||
await this._makeLinearizer(this.system) | ||
@@ -991,24 +985,46 @@ if (!sameIndexers) await this._viewStore.migrate() | ||
if (nodes) { | ||
for (const node of nodes) node.reset() | ||
for (const node of nodes) this.linearizer.addHead(node) | ||
if (this.localWriter) { | ||
const value = await this.system.get(this.local.key) | ||
const length = value ? value.length : 0 | ||
this.localWriter.reset(length) | ||
} | ||
if (!this._pendingRemoval) return | ||
if (!this.localWriter || !this.localWriter.isIndexer) return | ||
this._pendingRemoval = false | ||
for (const idx of this.linearizer.indexers) { | ||
if (idx !== this.localWriter) continue | ||
this._pendingRemoval = true | ||
break | ||
if (!hasWriter(this.linearizer.indexers, this.localWriter)) { | ||
this._clearLocalIndexer() | ||
if (this._pendingLocalRemoval) this._unsetLocalWriter() | ||
} | ||
} | ||
if (this._pendingRemoval) return // still pending | ||
_onUpgrade (version) { | ||
if (version > this.maxSupportedVersion) throw new Error('Autobase upgrade required') | ||
} | ||
this._addCheckpoints = false | ||
_setLocalWriter (w) { | ||
this.localWriter = w | ||
if (this._ackInterval) this._startAckTimer() | ||
this.emit('writable') | ||
} | ||
_unsetLocalWriter () { | ||
if (!this.localWriter) return | ||
this._closeWriter(this.localWriter, true) | ||
if (this.localWriter.isIndexer) this._clearLocalIndexer() | ||
this.localWriter = null | ||
this._pendingLocalRemoval = false | ||
this.emit('unwritable') | ||
} | ||
_onUpgrade (version) { | ||
if (version > this.maxSupportedVersion) throw new Error('Autobase upgrade required') | ||
_clearLocalIndexer () { | ||
if (!this.localWriter) return | ||
this.localWriter.isIndexer = false | ||
if (this._ackTimer) this._ackTimer.stop() | ||
this._ackTimer = null | ||
this._addCheckpoints = false | ||
} | ||
@@ -1131,2 +1147,4 @@ | ||
if (this._pendingLocalRemoval && !this.localWriter.isIndexer) this._unsetLocalWriter() | ||
if (this.closing) return | ||
@@ -1159,3 +1177,3 @@ | ||
await this._gcWriters() | ||
await this._reindex(changed) | ||
await this._reindex() | ||
} | ||
@@ -1194,3 +1212,3 @@ } | ||
for (const { key } of this._wakeup) { | ||
await this._getWriterByKey(key, -1, 0, true, null) | ||
await this._getWriterByKey(key, -1, 0, true, false, null) | ||
} | ||
@@ -1202,3 +1220,3 @@ | ||
for (const { key } of await this._getLocallyStoredHeads()) { | ||
await this._getWriterByKey(key, -1, 0, true, null) | ||
await this._getWriterByKey(key, -1, 0, true, false, null) | ||
} | ||
@@ -1215,3 +1233,3 @@ } | ||
await this._getWriterByKey(key, -1, 0, true, null) | ||
await this._getWriterByKey(key, -1, 0, true, false, null) | ||
} | ||
@@ -1253,4 +1271,2 @@ | ||
if (this._pendingRemoval) return true | ||
// flush any pending indexers | ||
@@ -1319,3 +1335,3 @@ if (this.system.pendingIndexers.length > 0) { | ||
await this._closeAllActiveWriters() | ||
await this._closeAllActiveWriters(false) | ||
@@ -1518,2 +1534,3 @@ await this.system.update() | ||
if (value) info.localLength = value.length | ||
if (value.isRemoved) info.localLength = -1 | ||
} | ||
@@ -1647,3 +1664,3 @@ | ||
await system.close() | ||
await this._closeAllActiveWriters() | ||
await this._closeAllActiveWriters(false) | ||
@@ -1655,3 +1672,3 @@ this._undoAll() | ||
if (info) await view.catchup(info) | ||
else if (migrated) view.migrateTo(indexers, 0) | ||
else if (migrated) await view.migrateTo(indexers, 0) | ||
} | ||
@@ -1661,3 +1678,6 @@ | ||
if (this.localWriter) this.localWriter.reset(localLength) | ||
if (this.localWriter) { | ||
if (localLength < 0) this._unsetLocalWriter() | ||
else this.localWriter.reset(localLength) | ||
} | ||
@@ -1668,3 +1688,6 @@ await this._makeLinearizer(this.system) | ||
// manually set the digest | ||
if (migrated) this._setDigest(key) | ||
if (migrated) { | ||
this._setDigest(key) | ||
this.recouple() | ||
} | ||
@@ -1684,3 +1707,3 @@ if (b4a.equals(this.fastForwardTo.key, key) && this.fastForwardTo.length === length) { | ||
async _closeAllActiveWriters () { | ||
async _closeAllActiveWriters (keepPool) { | ||
for (const w of this.activeWriters) { | ||
@@ -1690,3 +1713,3 @@ if (this.localWriter === w) continue | ||
} | ||
await this.corePool.clear() | ||
if (keepPool) await this.corePool.clear() | ||
} | ||
@@ -1729,3 +1752,3 @@ | ||
const writer = (await this._getWriterByKey(key, -1, 0, false, null)) || this._makeWriter(key, 0, true) | ||
const writer = (await this._getWriterByKey(key, -1, 0, false, true, null)) || this._makeWriter(key, 0, true) | ||
await writer.ready() | ||
@@ -1751,7 +1774,7 @@ | ||
if (b4a.equals(key, this.local.key)) { | ||
if (this._addCheckpoints) this._pendingRemoval = true | ||
else this.localWriter = null // immediately remove | ||
} | ||
if (b4a.equals(key, this.local.key)) this._pendingLocalRemoval = true | ||
const w = this.activeWriters.get(key) | ||
if (w) w.isRemoved = true | ||
this._queueBump() | ||
@@ -1825,2 +1848,4 @@ } | ||
// todo: refresh the active writer set in case any were removed | ||
let batch = 0 | ||
@@ -1856,3 +1881,3 @@ let applyBatch = [] | ||
return u.indexed.slice(i).concat(u.tip) | ||
return true | ||
} | ||
@@ -1862,3 +1887,3 @@ | ||
if (this.fastForwardTo !== null && this.fastForwardTo.length > this.system.core.length && b4a.equals(this.fastForwardTo.key, this.system.core.key)) { | ||
return null | ||
return false | ||
} | ||
@@ -1881,3 +1906,3 @@ | ||
if (node.value !== null) { | ||
if (node.value !== null && !node.writer.isRemoved) { | ||
applyBatch.push({ | ||
@@ -1947,3 +1972,3 @@ indexed, | ||
return u.indexed.slice(i + 1).concat(u.tip) | ||
return true | ||
} | ||
@@ -1956,3 +1981,3 @@ | ||
return null | ||
return false | ||
} | ||
@@ -1964,3 +1989,3 @@ | ||
for (const { key, length } of this.system.indexers) { | ||
const indexer = await this._getWriterByKey(key, length, 0, false, null) | ||
const indexer = await this._getWriterByKey(key, length, 0, false, false, null) | ||
indexers.push(indexer) | ||
@@ -1999,3 +2024,3 @@ } | ||
for (const { key, length } of this.system.indexers) { | ||
const w = await this._getWriterByKey(key, length, 0, false, null) | ||
const w = await this._getWriterByKey(key, length, 0, false, false, null) | ||
@@ -2096,3 +2121,3 @@ if (length > w.core.length) localUnflushed = true // local writer has nodes in mem | ||
for (const { key } of info.indexers) { | ||
p.push(await this._getWriterByKey(key, -1, 0, false, null)) | ||
p.push(await this._getWriterByKey(key, -1, 0, false, false, null)) | ||
} | ||
@@ -2165,3 +2190,2 @@ | ||
this.localWriter._addCheckpoints(checkpoint) | ||
this._hasPendingCheckpoint = false | ||
@@ -2222,1 +2246,8 @@ } | ||
} | ||
function hasWriter (writers, target) { | ||
for (const w of writers) { | ||
if (w === target) return true | ||
} | ||
return false | ||
} |
@@ -5,2 +5,4 @@ const c = require('compact-encoding') | ||
const VERSION = 1 | ||
module.exports = class WakeupExtension { | ||
@@ -27,3 +29,5 @@ constructor (base, core) { | ||
sendWakeup (key, target) { | ||
const m = this._encodeWakeup() | ||
const m = this._encodeWakeup(VERSION) | ||
if (!m) return | ||
for (const peer of this.core.peers) { | ||
@@ -42,3 +46,3 @@ if (b4a.equals(peer.remotePublicKey, key)) { | ||
broadcastWakeup () { | ||
const m = this._encodeWakeup() | ||
const m = this._encodeWakeup(VERSION) | ||
if (m) this.extension.broadcast(m) | ||
@@ -45,0 +49,0 @@ } |
@@ -202,3 +202,8 @@ const b4a = require('b4a') | ||
_createManifest (indexers, name, prologue) { | ||
if (!indexers.length) return staticManifest(this._deriveStaticHash(name)) | ||
if (!indexers.length && !(prologue && prologue.length > 0)) { | ||
prologue = { | ||
hash: this._deriveStaticHash(name), | ||
length: 0 | ||
} | ||
} | ||
@@ -226,15 +231,2 @@ for (const idx of indexers) { | ||
function staticManifest (hash) { | ||
return { | ||
version: MANIFEST_VERSION, | ||
hash: 'blake2b', | ||
signers: [], | ||
quorum: 0, | ||
prologue: { | ||
hash, | ||
length: 0 | ||
} | ||
} | ||
} | ||
function getBlockKey (bootstrap, encryptionKey, name) { | ||
@@ -241,0 +233,0 @@ return encryptionKey && crypto.hash([NS_VIEW_BLOCK_KEY, bootstrap, encryptionKey, b4a.from(name)]) |
@@ -162,4 +162,6 @@ const Hyperbee = require('hyperbee') | ||
const key = b4a.from(hex, 'hex') | ||
const value = { isIndexer, isRemoved: false, length } | ||
const info = await this._batch.get(key, { valueEncoding: Member, keyEncoding: MEMBERS }) | ||
const value = { isIndexer, isRemoved: info.value.isRemoved, length } | ||
await this._batch.put(key, value, { valueEncoding: Member, keyEncoding: MEMBERS }) | ||
@@ -333,6 +335,9 @@ } | ||
const node = await this._batch.get(key, { valueEncoding: Member, keyEncoding: MEMBERS }) | ||
const length = node ? node.value.length : 0 | ||
await this._batch.put(key, { | ||
isIndexer: false, | ||
isRemoved: true, | ||
length: this._seenLength(key) | ||
length | ||
}, { | ||
@@ -339,0 +344,0 @@ valueEncoding: Member, |
@@ -13,3 +13,3 @@ const Linearizer = require('./linearizer') | ||
module.exports = class Writer extends ReadyResource { | ||
constructor (base, core, length) { | ||
constructor (base, core, length, isRemoved) { | ||
super() | ||
@@ -19,2 +19,3 @@ | ||
this.core = core | ||
this.isRemoved = isRemoved | ||
this.updated = false | ||
@@ -21,0 +22,0 @@ this.range = null |
{ | ||
"name": "autobase", | ||
"version": "6.1.3", | ||
"version": "6.1.4", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
181991
4897