Socket
Socket
Sign inDemoInstall

hyperbee

Package Overview
Dependencies
Maintainers
2
Versions
109
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

hyperbee - npm Package Compare versions

Comparing version 2.6.2 to 2.7.0

137

index.js

@@ -1,2 +0,1 @@

const { EventEmitter } = require('events')
const codecs = require('codecs')

@@ -7,3 +6,2 @@ const { Readable } = require('streamx')

const safetyCatch = require('safety-catch')
const debounceify = require('debounceify')

@@ -392,12 +390,4 @@ const RangeIterator = require('./iterators/range')

watch (range, onchange) {
if (typeof range === 'function') {
onchange = range
range = undefined
}
const watcher = new Watcher(this, range)
if (onchange) watcher.on('change', onchange)
return watcher
watch (range) {
return new Watcher(this, range)
}

@@ -407,3 +397,3 @@

for (const watcher of this._watchers) {
watcher._onappendBound()
watcher._onappend()
}

@@ -461,3 +451,3 @@ }

for (const watcher of this._watchers) {
watcher.destroy()
await watcher.destroy()
}

@@ -872,6 +862,4 @@

class Watcher extends EventEmitter {
class Watcher {
constructor (bee, range) {
super()
bee._watchers.add(this)

@@ -885,11 +873,14 @@

this.running = false
this.latestDiff = 0
this.range = range
this.current = null
this.previous = null
this.stream = null
this._onappendBound = debounceify(this._onappend.bind(this))
this._lock = mutexify()
this._resolveOnChange = null
this._opening = this._ready().catch(safetyCatch)
this._opening = this._ready()
this._opening.catch(safetyCatch)
}

@@ -899,43 +890,78 @@

await this.bee.ready()
this.latestDiff = this.bee.version
this.current = this.bee.snapshot() // Point from which to start watching
this.opened = true
}
async _onappend () {
this.running = true
[Symbol.asyncIterator] () {
return this
}
_onappend () {
const resolve = this._resolveOnChange
this._resolveOnChange = null
if (resolve) resolve()
}
async _waitForChanges () {
if (this.current.version < this.bee.version || this.closed) return
await new Promise(resolve => {
this._resolveOnChange = resolve
})
}
async next () {
try {
await this._run()
return await this._next()
} catch (err) {
if (!this.closed) this.emit('error', err)
this.destroy()
} finally {
this.running = false
await this.destroy()
throw err
}
}
async _run () {
if (this.closed) return
if (this.opened === false) await this._opening
async _next () {
const release = await this._lock()
const snapshot = this.bee.snapshot()
this.stream = snapshot.createDiffStream(this.latestDiff, this.range)
try {
if (this.closed) return { value: undefined, done: true }
try {
for await (const data of this.stream) { // eslint-disable-line
this.emit('change', snapshot.version, this.latestDiff)
break
if (!this.opened) await this._opening
while (true) {
await this._waitForChanges()
if (this.closed) return { value: undefined, done: true }
if (this.previous) await this.previous.close()
this.previous = this.current.snapshot()
if (this.current) await this.current.close()
this.current = this.bee.snapshot()
this.stream = this.current.createDiffStream(this.previous.version, this.range)
try {
for await (const data of this.stream) { // eslint-disable-line
return { done: false, value: { current: this.current, previous: this.previous } }
}
} finally {
this.stream = null
}
}
} finally {
this.stream = null
this.latestDiff = snapshot.version
await snapshot.close()
release()
}
}
destroy () {
async return () {
await this.destroy()
return { done: true }
}
async destroy () {
if (this.closed) return
this.closed = true
this.bee._watchers.delete(this)
if (this.stream && !this.stream.destroying) {

@@ -945,6 +971,27 @@ this.stream.destroy()

this.bee._watchers.delete(this)
this._onappend() // Continue execution being closed
this.emit('close')
await this._closeSnapshots()
const release = await this._lock()
release()
}
_closeSnapshots () {
const closing = []
if (this.previous) {
const previous = this.previous
this.previous = null
closing.push(previous.close())
}
if (this.current) {
const current = this.current
this.current = null
closing.push(current.close())
}
return Promise.all(closing)
}
}

@@ -951,0 +998,0 @@

{
"name": "hyperbee",
"version": "2.6.2",
"version": "2.7.0",
"description": "An append-only B-tree running on a Hypercore.",

@@ -14,3 +14,2 @@ "main": "index.js",

"codecs": "^3.0.0",
"debounceify": "^1.0.0",
"mutexify": "^1.4.0",

@@ -17,0 +16,0 @@ "protocol-buffers-encodings": "^1.2.0",

@@ -233,3 +233,3 @@ # Hyperbee 🐝

#### `watcher = db.watch([range], [onchange])`
#### `watcher = db.watch([range])`

@@ -240,15 +240,18 @@ Listens to changes that are on the optional `range`.

`watcher.destroy()`\
Closes the watcher.
Usage example:
```js
for await (const { current, previous } of watcher) {
console.log(current.version)
console.log(previous.version)
}
```
`watcher.on('change', (newVersion, oldVersion) => {})`\
Emitted after a feed change.
Returns a new value after a change, `current` and `previous` are snapshots that are auto-closed before next value.
`watcher.on('error', onerror)`\
Critical and unexpected errors will be thrown, but watcher is normally graceful.\
Also, watcher auto closes on errors.
Don't close those snapshots yourself because they're used internally, let them be auto-closed.
`watcher.on('close', onclose)`\
Emitted after the watcher is closed.
`watcher.destroy()`
Stops the watcher. You could also stop it by using `break` in the loop.
#### `dbCheckout = db.checkout(version)`

@@ -255,0 +258,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc