Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

autobase

Package Overview
Dependencies
Maintainers
2
Versions
90
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

autobase - npm Package Compare versions

Comparing version 1.0.0-alpha.3 to 1.0.0-alpha.4

26

index.js

@@ -104,3 +104,3 @@ const streamx = require('streamx')

async _getInputNode (input, seq, opts = {}) {
async _getInputNode (input, seq, opts) {
if (seq < 0) return null

@@ -113,3 +113,3 @@ if (Buffer.isBuffer(input)) input = input.toString('hex')

const block = await input.get(seq)
const block = await input.get(seq, opts)
if (!block) return null

@@ -279,6 +279,7 @@

const positionsByKey = new Map()
const positionsByKey = opts.checkpoint || new Map()
const nodesByKey = new Map()
const snapshotLengthsByKey = new Map()
const wait = opts.wait !== false
let running = false

@@ -295,2 +296,3 @@ let bumped = false

}
stream.checkpoint = positionsByKey

@@ -332,4 +334,4 @@ this._readStreams.push(stream)

const unsatisfied = hasUnsatisfiedInputs(oldest, self._inputsByKey)
if (unsatisfied && opts.resolve) {
const resolved = await opts.resolve(oldest)
if (unsatisfied && opts.onresolve) {
const resolved = await opts.onresolve(oldest)
if (resolved !== false) continue

@@ -346,3 +348,3 @@ // If resolved is false, yield the unresolved node as usual

if (opts.wait) await opts.wait(mapped)
if (opts.onwait) await opts.onwait(mapped)
}

@@ -362,4 +364,6 @@ } finally {

async function updateAll () {
const allowUpdates = opts.live || opts.resolve || opts.wait // TODO: Make this behavior more customizable
const allowUpdates = opts.live || opts.onresolve || opts.onwait // TODO: Make this behavior more customizable
const inputKeys = allowUpdates ? self._inputsByKey.keys() : snapshotLengthsByKey.keys()
const loadPromises = []
for (const key of inputKeys) {

@@ -378,8 +382,12 @@ const input = self._inputsByKey.get(key)

} else {
if (pos >= input.length) await input.update()
if (pos >= input.length && wait) await input.update()
if (pos >= input.length) continue
}
nodesByKey.set(key, await self._getInputNode(input, pos))
loadPromises.push(self._getInputNode(input, pos, { wait }).then(node => [key, node]))
}
for (const [key, node] of await Promise.all(loadPromises)) {
nodesByKey.set(key, node)
}
}

@@ -386,0 +394,0 @@

{
"name": "autobase",
"version": "1.0.0-alpha.3",
"version": "1.0.0-alpha.4",
"description": "Autobase lets you write concise multiwriter data structures with Hypercore",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -62,3 +62,3 @@ # Autobase

outputs: null, // A list of output Hypercores
autocommit: true // Automatically persist changes to rebased indexes after updates
autocommit: true // Automatically persist changes to output Hypercores after updates
}

@@ -79,3 +79,3 @@ ```

#### `await Autobase.isAutobase(core)`
Returns `true` if `core` is either an Autobase input or a rebased index.
Returns `true` if `core` is an Autobase input or an output.

@@ -156,7 +156,15 @@ #### `await base.append(value, [clock], [input])`

Read streams have a public property `checkpoint`, which can be used to create new read streams that resume from the checkpoint's position:
```js
const stream1 = base.createReadStream()
// Do something with stream1 here
const stream2 = base.createReadStream({ checkpoint: stream1.checkpoint }) // Resume from stream1.checkpoint
```
`createReadStream` can be passed two custom async hooks:
* `resolve`: Called when an unsatisfied node (a node that links to an unknown input) is encountered. Can be used to dynamically add inputs to the Autobase.
* `onresolve`: Called when an unsatisfied node (a node that links to an unknown input) is encountered. Can be used to dynamically add inputs to the Autobase.
* Returning `true` indicates that you added new inputs to the Autobase, and so the read stream should begin processing those inputs.
* Returning `false` indicates that you did not resolve the missing links, and so the node should be yielded immediately as is.
* `wait`: Called after each node is yielded. Can be used to dynamically add inputs to the Autobase.
* `onwait`: Called after each node is yielded. Can be used to dynamically add inputs to the Autobase.

@@ -167,5 +175,7 @@ Options include:

live: false, // Enable live mode (the stream will continuously yield new nodes)
map: (node) => node // A sync map function
resolve: async (node) => true | false, // A resolve hook (described above)
wait: async (node) => undefined // A wait hook (described above)
map: (node) => node // A sync map function,
checkpoint: null, // Resume from where a previous read stream left off (`readStream.checkpoint`)
wait: true, // If false, the read stream will only yield previously-downloaded blocks.
onresolve: async (node) => true | false, // A resolve hook (described above)
onwait: async (node) => undefined // A wait hook (described above)
}

@@ -172,0 +182,0 @@ ```

@@ -117,3 +117,3 @@ const test = require('tape')

test('read stream - resolve hook, resolvable', async t => {
test('read stream - onresolve hook, resolvable', async t => {
const writerA = new Hypercore(ram)

@@ -135,3 +135,3 @@ const writerB = new Hypercore(ram)

{
// Without the resolve hook, the read stream should consider A to be purged
// Without the onresolve hook, the read stream should consider A to be purged
const output = await collect(base2.createReadStream())

@@ -143,5 +143,5 @@ t.same(output.length, 2)

{
// With the resolve hook, the read stream can be passed missing writers
// With the onresolve hook, the read stream can be passed missing writers
const output = await collect(base2.createReadStream({
async resolve (node) {
async onresolve (node) {
t.same(node.id, writerB.key.toString('hex'))

@@ -160,3 +160,3 @@ t.same(node.clock.get(writerA.key.toString('hex')), 0)

test('read stream - resolve hook, not resolvable', async t => {
test('read stream - onresolve hook, not resolvable', async t => {
const writerA = new Hypercore(ram)

@@ -178,3 +178,3 @@ const writerB = new Hypercore(ram)

{
// Without the resolve hook, the read stream should consider A to be purged
// Without the onresolve hook, the read stream should consider A to be purged
const output = await collect(base2.createReadStream())

@@ -186,5 +186,5 @@ t.same(output.length, 2)

{
// With the resolve hook, returning false should emit the unresolved nodes (same behavior as { resolve: undefined } option)
// With the onresolve hook, returning false should emit the unresolved nodes (same behavior as { onresolve: undefined } option)
const output = await collect(base2.createReadStream({
async resolve (node) {
async onresolve (node) {
t.same(node.id, writerB.key.toString('hex'))

@@ -202,3 +202,3 @@ t.same(node.clock.get(writerA.key.toString('hex')), 0)

test('read stream - wait hook', async t => {
test('read stream - onwait hook', async t => {
const writerA = new Hypercore(ram)

@@ -220,3 +220,3 @@ const writerB = new Hypercore(ram)

{
// Without the wait hook, the read stream should consider A to be purged
// Without the onwait hook, the read stream should consider A to be purged
const output = await collect(base2.createReadStream())

@@ -228,5 +228,5 @@ t.same(output.length, 2)

{
// With the wait hook, inputs can be added before the stream ends
// With the onwait hook, inputs can be added before the stream ends
const output = await collect(base2.createReadStream({
async wait (node) {
async onwait (node) {
if (node.value.toString() !== 'b1') return

@@ -243,2 +243,80 @@ await base2.addInput(writerA)

test('read stream - resume from checkpoint', async t => {
const writerA = new Hypercore(ram)
const writerB = new Hypercore(ram)
const writerC = new Hypercore(ram)
const base = new Autobase([writerA, writerB, writerC])
await base.ready()
// Create three dependent branches
for (let i = 0; i < 1; i++) {
await base.append(`a${i}`, await base.latest(writerA), writerA)
}
for (let i = 0; i < 2; i++) {
await base.append(`b${i}`, await base.latest(writerA), writerB)
}
for (let i = 0; i < 3; i++) {
await base.append(`c${i}`, await base.latest(writerC), writerC)
}
const firstStream = base.createReadStream()
{
const output = await collect(firstStream)
t.same(output.length, 6)
validateReadOrder(t, output)
}
// Add 3 more records to A -- not causally linked to B or C
for (let i = 1; i < 4; i++) {
await base.append(`a${i}`, await base.latest(writerA), writerA)
}
{
const output = await collect(base.createReadStream({ checkpoint: firstStream.checkpoint }))
t.same(output.length, 3)
validateReadOrder(t, output)
}
t.end()
})
test('read stream - { wait: false } will not download remote blocks', async t => {
const writerA = new Hypercore(ram)
await writerA.ready()
const writerB = new Hypercore(ram)
const remoteWriterA = new Hypercore(ram, writerA.key)
const base1 = new Autobase([writerA], { input: writerA })
const base2 = new Autobase([remoteWriterA, writerB], { input: writerB })
const s1 = writerA.replicate(true, { live: true })
const s2 = remoteWriterA.replicate(false, { live: true })
s1.pipe(s2).pipe(s1)
await base1.append('a0')
await base2.append('b0')
await base1.append('a1')
await base2.append('b1')
await remoteWriterA.get(0) // Download the first block
{
// With wait: false, the read stream should only yield locally-available nodes
const output = await collect(base2.createReadStream({ wait: false }))
t.same(output.length, 3)
validateReadOrder(t, output)
}
{
// The normal read stream should download all blocks.
const output = await collect(base2.createReadStream())
t.same(output.length, 4)
validateReadOrder(t, output)
}
t.end()
})
function validateReadOrder (t, nodes) {

@@ -245,0 +323,0 @@ for (let i = 0; i < nodes.length - 2; i++) {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc