Comparing version 2.6.2 to 2.7.0
137
index.js
@@ -1,2 +0,1 @@ | ||
const { EventEmitter } = require('events') | ||
const codecs = require('codecs') | ||
@@ -7,3 +6,2 @@ const { Readable } = require('streamx') | ||
const safetyCatch = require('safety-catch') | ||
const debounceify = require('debounceify') | ||
@@ -392,12 +390,4 @@ const RangeIterator = require('./iterators/range') | ||
watch (range, onchange) { | ||
if (typeof range === 'function') { | ||
onchange = range | ||
range = undefined | ||
} | ||
const watcher = new Watcher(this, range) | ||
if (onchange) watcher.on('change', onchange) | ||
return watcher | ||
watch (range) { | ||
return new Watcher(this, range) | ||
} | ||
@@ -407,3 +397,3 @@ | ||
for (const watcher of this._watchers) { | ||
watcher._onappendBound() | ||
watcher._onappend() | ||
} | ||
@@ -461,3 +451,3 @@ } | ||
for (const watcher of this._watchers) { | ||
watcher.destroy() | ||
await watcher.destroy() | ||
} | ||
@@ -872,6 +862,4 @@ | ||
class Watcher extends EventEmitter { | ||
class Watcher { | ||
constructor (bee, range) { | ||
super() | ||
bee._watchers.add(this) | ||
@@ -885,11 +873,14 @@ | ||
this.running = false | ||
this.latestDiff = 0 | ||
this.range = range | ||
this.current = null | ||
this.previous = null | ||
this.stream = null | ||
this._onappendBound = debounceify(this._onappend.bind(this)) | ||
this._lock = mutexify() | ||
this._resolveOnChange = null | ||
this._opening = this._ready().catch(safetyCatch) | ||
this._opening = this._ready() | ||
this._opening.catch(safetyCatch) | ||
} | ||
@@ -899,43 +890,78 @@ | ||
await this.bee.ready() | ||
this.latestDiff = this.bee.version | ||
this.current = this.bee.snapshot() // Point from which to start watching | ||
this.opened = true | ||
} | ||
async _onappend () { | ||
this.running = true | ||
[Symbol.asyncIterator] () { | ||
return this | ||
} | ||
_onappend () { | ||
const resolve = this._resolveOnChange | ||
this._resolveOnChange = null | ||
if (resolve) resolve() | ||
} | ||
async _waitForChanges () { | ||
if (this.current.version < this.bee.version || this.closed) return | ||
await new Promise(resolve => { | ||
this._resolveOnChange = resolve | ||
}) | ||
} | ||
async next () { | ||
try { | ||
await this._run() | ||
return await this._next() | ||
} catch (err) { | ||
if (!this.closed) this.emit('error', err) | ||
this.destroy() | ||
} finally { | ||
this.running = false | ||
await this.destroy() | ||
throw err | ||
} | ||
} | ||
async _run () { | ||
if (this.closed) return | ||
if (this.opened === false) await this._opening | ||
async _next () { | ||
const release = await this._lock() | ||
const snapshot = this.bee.snapshot() | ||
this.stream = snapshot.createDiffStream(this.latestDiff, this.range) | ||
try { | ||
if (this.closed) return { value: undefined, done: true } | ||
try { | ||
for await (const data of this.stream) { // eslint-disable-line | ||
this.emit('change', snapshot.version, this.latestDiff) | ||
break | ||
if (!this.opened) await this._opening | ||
while (true) { | ||
await this._waitForChanges() | ||
if (this.closed) return { value: undefined, done: true } | ||
if (this.previous) await this.previous.close() | ||
this.previous = this.current.snapshot() | ||
if (this.current) await this.current.close() | ||
this.current = this.bee.snapshot() | ||
this.stream = this.current.createDiffStream(this.previous.version, this.range) | ||
try { | ||
for await (const data of this.stream) { // eslint-disable-line | ||
return { done: false, value: { current: this.current, previous: this.previous } } | ||
} | ||
} finally { | ||
this.stream = null | ||
} | ||
} | ||
} finally { | ||
this.stream = null | ||
this.latestDiff = snapshot.version | ||
await snapshot.close() | ||
release() | ||
} | ||
} | ||
destroy () { | ||
async return () { | ||
await this.destroy() | ||
return { done: true } | ||
} | ||
async destroy () { | ||
if (this.closed) return | ||
this.closed = true | ||
this.bee._watchers.delete(this) | ||
if (this.stream && !this.stream.destroying) { | ||
@@ -945,6 +971,27 @@ this.stream.destroy() | ||
this.bee._watchers.delete(this) | ||
this._onappend() // Continue execution being closed | ||
this.emit('close') | ||
await this._closeSnapshots() | ||
const release = await this._lock() | ||
release() | ||
} | ||
_closeSnapshots () { | ||
const closing = [] | ||
if (this.previous) { | ||
const previous = this.previous | ||
this.previous = null | ||
closing.push(previous.close()) | ||
} | ||
if (this.current) { | ||
const current = this.current | ||
this.current = null | ||
closing.push(current.close()) | ||
} | ||
return Promise.all(closing) | ||
} | ||
} | ||
@@ -951,0 +998,0 @@ |
{ | ||
"name": "hyperbee", | ||
"version": "2.6.2", | ||
"version": "2.7.0", | ||
"description": "An append-only B-tree running on a Hypercore.", | ||
@@ -14,3 +14,2 @@ "main": "index.js", | ||
"codecs": "^3.0.0", | ||
"debounceify": "^1.0.0", | ||
"mutexify": "^1.4.0", | ||
@@ -17,0 +16,0 @@ "protocol-buffers-encodings": "^1.2.0", |
@@ -233,3 +233,3 @@ # Hyperbee 🐝 | ||
#### `watcher = db.watch([range], [onchange])` | ||
#### `watcher = db.watch([range])` | ||
@@ -240,15 +240,18 @@ Listens to changes that are on the optional `range`. | ||
`watcher.destroy()`\ | ||
Closes the watcher. | ||
Usage example: | ||
```js | ||
for await (const { current, previous } of watcher) { | ||
console.log(current.version) | ||
console.log(previous.version) | ||
} | ||
``` | ||
`watcher.on('change', (newVersion, oldVersion) => {})`\ | ||
Emitted after a feed change. | ||
Returns a new value after a change, `current` and `previous` are snapshots that are auto-closed before next value. | ||
`watcher.on('error', onerror)`\ | ||
Critical and unexpected errors will be thrown, but watcher is normally graceful.\ | ||
Also, watcher auto closes on errors. | ||
Don't close those snapshots yourself because they're used internally, let them be auto-closed. | ||
`watcher.on('close', onclose)`\ | ||
Emitted after the watcher is closed. | ||
`watcher.destroy()` | ||
Stops the watcher. You could also stop it by using `break` in the loop. | ||
#### `dbCheckout = db.checkout(version)` | ||
@@ -255,0 +258,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
82205
6
2312
308
- Removeddebounceify@^1.0.0
- Removeddebounceify@1.1.0(transitive)