Comparing version 1.0.0-alpha.3 to 1.0.0-alpha.4
26
index.js
@@ -104,3 +104,3 @@ const streamx = require('streamx') | ||
async _getInputNode (input, seq, opts = {}) { | ||
async _getInputNode (input, seq, opts) { | ||
if (seq < 0) return null | ||
@@ -113,3 +113,3 @@ if (Buffer.isBuffer(input)) input = input.toString('hex') | ||
const block = await input.get(seq) | ||
const block = await input.get(seq, opts) | ||
if (!block) return null | ||
@@ -279,6 +279,7 @@ | ||
const positionsByKey = new Map() | ||
const positionsByKey = opts.checkpoint || new Map() | ||
const nodesByKey = new Map() | ||
const snapshotLengthsByKey = new Map() | ||
const wait = opts.wait !== false | ||
let running = false | ||
@@ -295,2 +296,3 @@ let bumped = false | ||
} | ||
stream.checkpoint = positionsByKey | ||
@@ -332,4 +334,4 @@ this._readStreams.push(stream) | ||
const unsatisfied = hasUnsatisfiedInputs(oldest, self._inputsByKey) | ||
if (unsatisfied && opts.resolve) { | ||
const resolved = await opts.resolve(oldest) | ||
if (unsatisfied && opts.onresolve) { | ||
const resolved = await opts.onresolve(oldest) | ||
if (resolved !== false) continue | ||
@@ -346,3 +348,3 @@ // If resolved is false, yield the unresolved node as usual | ||
if (opts.wait) await opts.wait(mapped) | ||
if (opts.onwait) await opts.onwait(mapped) | ||
} | ||
@@ -362,4 +364,6 @@ } finally { | ||
async function updateAll () { | ||
const allowUpdates = opts.live || opts.resolve || opts.wait // TODO: Make this behavior more customizable | ||
const allowUpdates = opts.live || opts.onresolve || opts.onwait // TODO: Make this behavior more customizable | ||
const inputKeys = allowUpdates ? self._inputsByKey.keys() : snapshotLengthsByKey.keys() | ||
const loadPromises = [] | ||
for (const key of inputKeys) { | ||
@@ -378,8 +382,12 @@ const input = self._inputsByKey.get(key) | ||
} else { | ||
if (pos >= input.length) await input.update() | ||
if (pos >= input.length && wait) await input.update() | ||
if (pos >= input.length) continue | ||
} | ||
nodesByKey.set(key, await self._getInputNode(input, pos)) | ||
loadPromises.push(self._getInputNode(input, pos, { wait }).then(node => [key, node])) | ||
} | ||
for (const [key, node] of await Promise.all(loadPromises)) { | ||
nodesByKey.set(key, node) | ||
} | ||
} | ||
@@ -386,0 +394,0 @@ |
{ | ||
"name": "autobase", | ||
"version": "1.0.0-alpha.3", | ||
"version": "1.0.0-alpha.4", | ||
"description": "Autobase lets you write concise multiwriter data structures with Hypercore", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -62,3 +62,3 @@ # Autobase | ||
outputs: null, // A list of output Hypercores | ||
autocommit: true // Automatically persist changes to rebased indexes after updates | ||
autocommit: true // Automatically persist changes to output Hypercores after updates | ||
} | ||
@@ -79,3 +79,3 @@ ``` | ||
#### `await Autobase.isAutobase(core)` | ||
Returns `true` if `core` is either an Autobase input or a rebased index. | ||
Returns `true` if `core` is an Autobase input or an output. | ||
@@ -156,7 +156,15 @@ #### `await base.append(value, [clock], [input])` | ||
Read streams have a public property `checkpoint`, which can be used to create new read streams that resume from the checkpoint's position: | ||
```js | ||
const stream1 = base.createReadStream() | ||
// Do something with stream1 here | ||
const stream2 = base.createReadStream({ checkpoint: stream1.checkpoint }) // Resume from stream1.checkpoint | ||
``` | ||
`createReadStream` can be passed two custom async hooks: | ||
* `resolve`: Called when an unsatisfied node (a node that links to an unknown input) is encountered. Can be used to dynamically add inputs to the Autobase. | ||
* `onresolve`: Called when an unsatisfied node (a node that links to an unknown input) is encountered. Can be used to dynamically add inputs to the Autobase. | ||
* Returning `true` indicates that you added new inputs to the Autobase, and so the read stream should begin processing those inputs. | ||
* Returning `false` indicates that you did not resolve the missing links, and so the node should be yielded immediately as is. | ||
* `wait`: Called after each node is yielded. Can be used to dynamically add inputs to the Autobase. | ||
* `onwait`: Called after each node is yielded. Can be used to dynamically add inputs to the Autobase. | ||
@@ -167,5 +175,7 @@ Options include: | ||
live: false, // Enable live mode (the stream will continuously yield new nodes) | ||
map: (node) => node // A sync map function | ||
resolve: async (node) => true | false, // A resolve hook (described above) | ||
wait: async (node) => undefined // A wait hook (described above) | ||
map: (node) => node // A sync map function, | ||
checkpoint: null, // Resume from where a previous read stream left off (`readStream.checkpoint`) | ||
wait: true, // If false, the read stream will only yield previously-downloaded blocks. | ||
onresolve: async (node) => true | false, // A resolve hook (described above) | ||
onwait: async (node) => undefined // A wait hook (described above) | ||
} | ||
@@ -172,0 +182,0 @@ ``` |
@@ -117,3 +117,3 @@ const test = require('tape') | ||
test('read stream - resolve hook, resolvable', async t => { | ||
test('read stream - onresolve hook, resolvable', async t => { | ||
const writerA = new Hypercore(ram) | ||
@@ -135,3 +135,3 @@ const writerB = new Hypercore(ram) | ||
{ | ||
// Without the resolve hook, the read stream should consider A to be purged | ||
// Without the onresolve hook, the read stream should consider A to be purged | ||
const output = await collect(base2.createReadStream()) | ||
@@ -143,5 +143,5 @@ t.same(output.length, 2) | ||
{ | ||
// With the resolve hook, the read stream can be passed missing writers | ||
// With the onresolve hook, the read stream can be passed missing writers | ||
const output = await collect(base2.createReadStream({ | ||
async resolve (node) { | ||
async onresolve (node) { | ||
t.same(node.id, writerB.key.toString('hex')) | ||
@@ -160,3 +160,3 @@ t.same(node.clock.get(writerA.key.toString('hex')), 0) | ||
test('read stream - resolve hook, not resolvable', async t => { | ||
test('read stream - onresolve hook, not resolvable', async t => { | ||
const writerA = new Hypercore(ram) | ||
@@ -178,3 +178,3 @@ const writerB = new Hypercore(ram) | ||
{ | ||
// Without the resolve hook, the read stream should consider A to be purged | ||
// Without the onresolve hook, the read stream should consider A to be purged | ||
const output = await collect(base2.createReadStream()) | ||
@@ -186,5 +186,5 @@ t.same(output.length, 2) | ||
{ | ||
// With the resolve hook, returning false should emit the unresolved nodes (same behavior as { resolve: undefined } option) | ||
// With the onresolve hook, returning false should emit the unresolved nodes (same behavior as { onresolve: undefined } option) | ||
const output = await collect(base2.createReadStream({ | ||
async resolve (node) { | ||
async onresolve (node) { | ||
t.same(node.id, writerB.key.toString('hex')) | ||
@@ -202,3 +202,3 @@ t.same(node.clock.get(writerA.key.toString('hex')), 0) | ||
test('read stream - wait hook', async t => { | ||
test('read stream - onwait hook', async t => { | ||
const writerA = new Hypercore(ram) | ||
@@ -220,3 +220,3 @@ const writerB = new Hypercore(ram) | ||
{ | ||
// Without the wait hook, the read stream should consider A to be purged | ||
// Without the onwait hook, the read stream should consider A to be purged | ||
const output = await collect(base2.createReadStream()) | ||
@@ -228,5 +228,5 @@ t.same(output.length, 2) | ||
{ | ||
// With the wait hook, inputs can be added before the stream ends | ||
// With the onwait hook, inputs can be added before the stream ends | ||
const output = await collect(base2.createReadStream({ | ||
async wait (node) { | ||
async onwait (node) { | ||
if (node.value.toString() !== 'b1') return | ||
@@ -243,2 +243,80 @@ await base2.addInput(writerA) | ||
test('read stream - resume from checkpoint', async t => { | ||
const writerA = new Hypercore(ram) | ||
const writerB = new Hypercore(ram) | ||
const writerC = new Hypercore(ram) | ||
const base = new Autobase([writerA, writerB, writerC]) | ||
await base.ready() | ||
// Create three dependent branches | ||
for (let i = 0; i < 1; i++) { | ||
await base.append(`a${i}`, await base.latest(writerA), writerA) | ||
} | ||
for (let i = 0; i < 2; i++) { | ||
await base.append(`b${i}`, await base.latest(writerA), writerB) | ||
} | ||
for (let i = 0; i < 3; i++) { | ||
await base.append(`c${i}`, await base.latest(writerC), writerC) | ||
} | ||
const firstStream = base.createReadStream() | ||
{ | ||
const output = await collect(firstStream) | ||
t.same(output.length, 6) | ||
validateReadOrder(t, output) | ||
} | ||
// Add 3 more records to A -- not causally linked to B or C | ||
for (let i = 1; i < 4; i++) { | ||
await base.append(`a${i}`, await base.latest(writerA), writerA) | ||
} | ||
{ | ||
const output = await collect(base.createReadStream({ checkpoint: firstStream.checkpoint })) | ||
t.same(output.length, 3) | ||
validateReadOrder(t, output) | ||
} | ||
t.end() | ||
}) | ||
test('read stream - { wait: false } will not download remote blocks', async t => { | ||
const writerA = new Hypercore(ram) | ||
await writerA.ready() | ||
const writerB = new Hypercore(ram) | ||
const remoteWriterA = new Hypercore(ram, writerA.key) | ||
const base1 = new Autobase([writerA], { input: writerA }) | ||
const base2 = new Autobase([remoteWriterA, writerB], { input: writerB }) | ||
const s1 = writerA.replicate(true, { live: true }) | ||
const s2 = remoteWriterA.replicate(false, { live: true }) | ||
s1.pipe(s2).pipe(s1) | ||
await base1.append('a0') | ||
await base2.append('b0') | ||
await base1.append('a1') | ||
await base2.append('b1') | ||
await remoteWriterA.get(0) // Download the first block | ||
{ | ||
// With wait: false, the read stream should only yield locally-available nodes | ||
const output = await collect(base2.createReadStream({ wait: false })) | ||
t.same(output.length, 3) | ||
validateReadOrder(t, output) | ||
} | ||
{ | ||
// The normal read stream should download all blocks. | ||
const output = await collect(base2.createReadStream()) | ||
t.same(output.length, 4) | ||
validateReadOrder(t, output) | ||
} | ||
t.end() | ||
}) | ||
function validateReadOrder (t, nodes) { | ||
@@ -245,0 +323,0 @@ for (let i = 0; i < nodes.length - 2; i++) { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
94617
2423
252