@alcalzone/jsonl-db
Advanced tools
Comparing version 1.0.1 to 1.1.0
@@ -39,2 +39,16 @@ import * as fs from "fs-extra"; | ||
}>; | ||
/** | ||
* Can be used to throttle write accesses to the filesystem. By default, | ||
* every change is immediately written to the FS | ||
*/ | ||
throttleFS?: { | ||
/** | ||
* Minimum wait time between two consecutive write accesses. Default: 0 | ||
*/ | ||
intervalMs: number; | ||
/** | ||
* Maximum commands to be buffered before forcing a write access. Default: +Infinity | ||
*/ | ||
maxBufferedCommands?: number; | ||
}; | ||
} | ||
@@ -66,2 +80,4 @@ export declare class JsonlDB<V extends unknown = unknown> { | ||
private _writeBacklog; | ||
private _writeCorkCount; | ||
private _writeCorkTimeout; | ||
private _dumpBacklog; | ||
@@ -82,2 +98,5 @@ private compressInterval; | ||
private needToCompress; | ||
private cork; | ||
private uncork; | ||
private autoCork; | ||
/** | ||
@@ -94,6 +113,9 @@ * Writes a line into the correct backlog | ||
private compressPromise; | ||
private compressInternal; | ||
/** Compresses the db by dumping it and overwriting the aof file. */ | ||
compress(): Promise<void>; | ||
private _closeDBPromise; | ||
private _closeDumpPromise; | ||
/** Resolves when the `writeThread()` is finished */ | ||
private _writePromise; | ||
/** Resolves when the `dump()` method is finished */ | ||
private _dumpPromise; | ||
/** Closes the DB and waits for all data to be written */ | ||
@@ -100,0 +122,0 @@ close(): Promise<void>; |
@@ -22,2 +22,3 @@ "use strict"; | ||
this._isOpen = false; | ||
this._writeCorkCount = 0; | ||
this.validateOptions(options); | ||
@@ -62,2 +63,11 @@ this.filename = filename; | ||
} | ||
if (options.throttleFS) { | ||
const { intervalMs, maxBufferedCommands } = options.throttleFS; | ||
if (intervalMs < 0) { | ||
throw new Error("intervalMs must be >= 0"); | ||
} | ||
if (maxBufferedCommands != undefined && maxBufferedCommands < 0) { | ||
throw new Error("maxBufferedCommands must be >= 0"); | ||
} | ||
} | ||
} | ||
@@ -195,3 +205,2 @@ get size() { | ||
} | ||
// TODO: use cork() and uncork() to throttle filesystem accesses | ||
updateStatistics(command) { | ||
@@ -218,2 +227,44 @@ if (command === "") { | ||
} | ||
cork() { | ||
/* istanbul ignore else - this is impossible to test */ | ||
if (this._writeBacklog && this._writeCorkCount === 0) { | ||
this._writeBacklog.cork(); | ||
this._writeCorkCount++; | ||
} | ||
} | ||
uncork() { | ||
if (this._writeCorkCount > 0 && this._writeCorkTimeout) { | ||
clearTimeout(this._writeCorkTimeout); | ||
this._writeCorkTimeout = undefined; | ||
} | ||
while (this._writeBacklog && this._writeCorkCount > 0) { | ||
this._writeBacklog.uncork(); | ||
this._writeCorkCount--; | ||
} | ||
} | ||
autoCork() { | ||
var _a; | ||
if (!((_a = this.options.throttleFS) === null || _a === void 0 ? void 0 : _a.intervalMs)) | ||
return; | ||
const schedule = () => { | ||
if (this._writeCorkTimeout) { | ||
clearTimeout(this._writeCorkTimeout); | ||
} | ||
this._writeCorkTimeout = setTimeout(() => maybeUncork.bind(this)(), this.options.throttleFS.intervalMs); | ||
}; | ||
function maybeUncork() { | ||
if (this._writeBacklog && this._writeBacklog.writableLength > 0) { | ||
// This gets the stream flowing again. The write thread will call | ||
// autoCork when it is done | ||
this.uncork(); | ||
} | ||
else { | ||
// Nothing to uncork, schedule the next timeout | ||
schedule(); | ||
} | ||
} | ||
// Cork once and schedule the uncork | ||
this.cork(); | ||
schedule(); | ||
} | ||
/** | ||
@@ -224,2 +275,3 @@ * Writes a line into the correct backlog | ||
write(line, noAutoCompress = false) { | ||
var _a; | ||
/* istanbul ignore else */ | ||
@@ -239,2 +291,9 @@ if (this._compressBacklog && !this._compressBacklog.destroyed) { | ||
this._writeBacklog.write(line); | ||
// If this is a throttled stream, uncork it as soon as the write | ||
// buffer is larger than configured | ||
if (((_a = this.options.throttleFS) === null || _a === void 0 ? void 0 : _a.maxBufferedCommands) != undefined && | ||
this._writeBacklog.writableLength > | ||
this.options.throttleFS.maxBufferedCommands) { | ||
this.uncork(); | ||
} | ||
} | ||
@@ -260,3 +319,3 @@ } | ||
async dump() { | ||
this._closeDumpPromise = deferred_promise_1.createDeferredPromise(); | ||
this._dumpPromise = deferred_promise_1.createDeferredPromise(); | ||
// Open the file for writing (or truncate if it exists) | ||
@@ -284,3 +343,3 @@ this._dumpFd = await fs.open(this.dumpFilename, "w+"); | ||
this._dumpFd = undefined; | ||
this._closeDumpPromise.resolve(); | ||
this._dumpPromise.resolve(); | ||
} | ||
@@ -290,5 +349,7 @@ /** Asynchronously performs all write actions */ | ||
var e_1, _a; | ||
var _b, _c; | ||
var _b; | ||
// This must be called before any awaits | ||
this._writeBacklog = new stream.PassThrough({ objectMode: true }); | ||
this.autoCork(); | ||
this._writePromise = deferred_promise_1.createDeferredPromise(); | ||
// Open the file for appending and reading | ||
@@ -298,4 +359,4 @@ this._fd = await fs.open(this.filename, "a+"); | ||
try { | ||
for (var _d = __asyncValues(this._writeBacklog), _e; _e = await _d.next(), !_e.done;) { | ||
const action = _e.value; | ||
for (var _c = __asyncValues(this._writeBacklog), _d; _d = await _c.next(), !_d.done;) { | ||
const action = _d.value; | ||
if (action === "") { | ||
@@ -310,2 +371,6 @@ // Since we opened the file in append mode, we cannot truncate | ||
} | ||
// When this is a throttled stream, auto-cork it when it was drained | ||
if (this._writeBacklog.readableLength === 0) { | ||
this.autoCork(); | ||
} | ||
} | ||
@@ -316,13 +381,13 @@ } | ||
try { | ||
if (_e && !_e.done && (_a = _d.return)) await _a.call(_d); | ||
if (_d && !_d.done && (_a = _c.return)) await _a.call(_c); | ||
} | ||
finally { if (e_1) throw e_1.error; } | ||
} | ||
this._writeBacklog.destroy(); | ||
// The write backlog was closed, this means that the DB is being closed | ||
// close the file and resolve the close promise | ||
await fs.close(this._fd); | ||
(_c = this._closeDBPromise) === null || _c === void 0 ? void 0 : _c.resolve(); | ||
this._writePromise.resolve(); | ||
} | ||
/** Compresses the db by dumping it and overwriting the aof file. */ | ||
async compress() { | ||
async compressInternal() { | ||
if (!this._writeBacklog || this.compressPromise) | ||
@@ -337,9 +402,9 @@ return; | ||
// After dumping, restart the write thread so no duplicate entries get written | ||
this._closeDBPromise = deferred_promise_1.createDeferredPromise(); | ||
// Disable writing into the backlog stream and buffer all writes | ||
// in the compress backlog in the meantime | ||
this._compressBacklog = new stream.PassThrough({ objectMode: true }); | ||
this.uncork(); | ||
this._writeBacklog.end(); | ||
await this._writePromise; | ||
this._writeBacklog = undefined; | ||
await this._closeDBPromise; | ||
// Replace the aof file | ||
@@ -365,8 +430,16 @@ await fs.move(this.filename, this.filename + ".bak"); | ||
} | ||
/** Compresses the db by dumping it and overwriting the aof file. */ | ||
async compress() { | ||
if (!this._isOpen) | ||
return; | ||
return this.compressInternal(); | ||
} | ||
/** Closes the DB and waits for all data to be written */ | ||
async close() { | ||
var _a, _b; | ||
var _a; | ||
this._isOpen = false; | ||
if (this.compressInterval) | ||
clearInterval(this.compressInterval); | ||
if (this._writeCorkTimeout) | ||
clearTimeout(this._writeCorkTimeout); | ||
if (this.compressPromise) { | ||
@@ -378,24 +451,26 @@ // Wait until any pending compress processes are complete | ||
// Compress if required | ||
await this.compress(); | ||
await this.compressInternal(); | ||
} | ||
// Disable writing into the backlog stream and wait for the write process to finish | ||
if (this._writeBacklog) { | ||
this._closeDBPromise = deferred_promise_1.createDeferredPromise(); | ||
// Disable writing into the backlog stream | ||
this.uncork(); | ||
this._writeBacklog.end(); | ||
this._writeBacklog = undefined; | ||
// Disable writing into the dump backlog stream | ||
(_b = this._dumpBacklog) === null || _b === void 0 ? void 0 : _b.end(); | ||
this._dumpBacklog = undefined; | ||
await this._closeDBPromise; | ||
await this._writePromise; | ||
} | ||
// Also wait for a potential dump process to finish | ||
if (this._closeDumpPromise) { | ||
await this._closeDumpPromise; | ||
/* istanbul ignore next - this is impossible to test since it requires exact timing */ | ||
if (this._dumpBacklog) { | ||
// Disable writing into the dump backlog stream | ||
this._dumpBacklog.end(); | ||
await this._dumpPromise; | ||
} | ||
// Reset all variables | ||
this._closeDBPromise = undefined; | ||
this._closeDumpPromise = undefined; | ||
this._writePromise = undefined; | ||
this._dumpPromise = undefined; | ||
this._db.clear(); | ||
this._fd = undefined; | ||
this._dumpFd = undefined; | ||
this._changesSinceLastCompress = 0; | ||
this._uncompressedSize = Number.NaN; | ||
this._writeCorkCount = 0; | ||
} | ||
@@ -402,0 +477,0 @@ } |
{ | ||
"name": "@alcalzone/jsonl-db", | ||
"version": "1.0.1", | ||
"version": "1.1.0", | ||
"description": "Simple JSONL-based key-value store", | ||
@@ -45,3 +45,3 @@ "main": "./build/index.js", | ||
"@babel/plugin-proposal-optional-chaining": "^7.9.0", | ||
"@babel/preset-env": "^7.9.5", | ||
"@babel/preset-env": "^7.9.6", | ||
"@babel/preset-typescript": "^7.9.0", | ||
@@ -56,4 +56,4 @@ "@commitlint/cli": "^8.3.5", | ||
"@types/node": "^13.13.4", | ||
"@typescript-eslint/eslint-plugin": "^2.29.0", | ||
"@typescript-eslint/parser": "^2.29.0", | ||
"@typescript-eslint/eslint-plugin": "^2.30.0", | ||
"@typescript-eslint/parser": "^2.30.0", | ||
"commitizen": "^4.0.4", | ||
@@ -60,0 +60,0 @@ "coveralls": "^3.1.0", |
@@ -36,2 +36,5 @@ # jsonl-db | ||
``` | ||
### Handling invalid data | ||
If corrupt data is encountered while opening the DB, the call to `open()` will be rejected. If this is to be expected, use the options parameter on the constructor to turn on forgiving behavior: | ||
@@ -44,2 +47,4 @@ ```ts | ||
### Support custom objects/values | ||
You can optionally transform the parsed values by passing a reviver function. This allows storing non-primitive objects in the database if those can be transformed to JSON (e.g. by overwriting the `toJSON` method). | ||
@@ -55,2 +60,4 @@ ```ts | ||
### Closing the database | ||
Data written to the DB is persisted asynchronously. Be sure to call `close()` when you no longer need the database in order to flush all pending writes and close all files: | ||
@@ -63,4 +70,18 @@ | ||
### Controlling file system access | ||
By default, the database immediately writes to the database file. You can throttle the write accesses using the `throttleFS` constructor option. Be aware that buffered data will be lost in case the process crashes. | ||
```ts | ||
const db = new DB("/path/to/file", { throttleFS: { /* throttle options */ } }); | ||
``` | ||
The following options exist: | ||
| Option | Default | Description | | ||
|-----------------|---------|-------------| | ||
| intervalMs | 0 | Write to the database file no more than every `intervalMs` milliseconds. | | ||
| maxBufferedCommands | +Infinity | Force a write after `maxBufferedCommands` have been buffered. This reduces memory consumption and data loss in case of a crash. | | ||
To create a compressed copy of the database in `/path/to/file.dump`, use the `dump()` method. If any data is written to the db during the dump, it is appended to the dump but most likely compressed. | ||
### Copying and compressing the database | ||
```ts | ||
@@ -92,2 +113,4 @@ await db.dump(); | ||
### Import / Export | ||
Importing JSON files can be done this way: | ||
@@ -115,2 +138,5 @@ ```ts | ||
### 1.1.0 (2020-05-02) | ||
Added functionality to throttle write accesses | ||
### 1.0.1 (2020-04-29) | ||
@@ -117,0 +143,0 @@ Export `JsonlDBOptions` from the main entry point |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
50207
587
170