Huge News! Announcing our $40M Series B led by Abstract Ventures. Learn More
Socket
Sign in · Demo · Install
Socket

@alcalzone/jsonl-db

Package Overview
Dependencies
Maintainers
1
Versions
35
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@alcalzone/jsonl-db - npm Package Compare versions

Comparing version 1.0.1 to 1.1.0

26

build/lib/db.d.ts

@@ -39,2 +39,16 @@ import * as fs from "fs-extra";

}>;
/**
* Can be used to throttle write accesses to the filesystem. By default,
* every change is immediately written to the FS
*/
throttleFS?: {
/**
* Minimum wait time between two consecutive write accesses. Default: 0
*/
intervalMs: number;
/**
* Maximum commands to be buffered before forcing a write access. Default: +Infinity
*/
maxBufferedCommands?: number;
};
}

@@ -66,2 +80,4 @@ export declare class JsonlDB<V extends unknown = unknown> {

private _writeBacklog;
private _writeCorkCount;
private _writeCorkTimeout;
private _dumpBacklog;

@@ -82,2 +98,5 @@ private compressInterval;

private needToCompress;
private cork;
private uncork;
private autoCork;
/**

@@ -94,6 +113,9 @@ * Writes a line into the correct backlog

private compressPromise;
private compressInternal;
/** Compresses the db by dumping it and overwriting the aof file. */
compress(): Promise<void>;
private _closeDBPromise;
private _closeDumpPromise;
/** Resolves when the `writeThread()` is finished */
private _writePromise;
/** Resolves when the `dump()` method is finished */
private _dumpPromise;
/** Closes the DB and waits for all data to be written */

@@ -100,0 +122,0 @@ close(): Promise<void>;

125

build/lib/db.js

@@ -22,2 +22,3 @@ "use strict";

this._isOpen = false;
this._writeCorkCount = 0;
this.validateOptions(options);

@@ -62,2 +63,11 @@ this.filename = filename;

}
if (options.throttleFS) {
const { intervalMs, maxBufferedCommands } = options.throttleFS;
if (intervalMs < 0) {
throw new Error("intervalMs must be >= 0");
}
if (maxBufferedCommands != undefined && maxBufferedCommands < 0) {
throw new Error("maxBufferedCommands must be >= 0");
}
}
}

@@ -195,3 +205,2 @@ get size() {

}
// TODO: use cork() and uncork() to throttle filesystem accesses
updateStatistics(command) {

@@ -218,2 +227,44 @@ if (command === "") {

}
cork() {
/* istanbul ignore else - this is impossible to test */
if (this._writeBacklog && this._writeCorkCount === 0) {
this._writeBacklog.cork();
this._writeCorkCount++;
}
}
uncork() {
if (this._writeCorkCount > 0 && this._writeCorkTimeout) {
clearTimeout(this._writeCorkTimeout);
this._writeCorkTimeout = undefined;
}
while (this._writeBacklog && this._writeCorkCount > 0) {
this._writeBacklog.uncork();
this._writeCorkCount--;
}
}
// Corks the write backlog stream and schedules a delayed uncork, so that
// filesystem writes happen at most every `throttleFS.intervalMs` ms.
// No-op unless a positive throttleFS.intervalMs option was configured.
// NOTE: this is compiled TypeScript output; `_a` is the downleveled
// optional-chaining helper for `this.options.throttleFS?.intervalMs`.
autoCork() {
var _a;
if (!((_a = this.options.throttleFS) === null || _a === void 0 ? void 0 : _a.intervalMs))
return;
// (Re-)arm the uncork timer, replacing any previously scheduled one.
const schedule = () => {
if (this._writeCorkTimeout) {
clearTimeout(this._writeCorkTimeout);
}
this._writeCorkTimeout = setTimeout(() => maybeUncork.bind(this)(), this.options.throttleFS.intervalMs);
};
// Runs on the timer: uncork only if data is actually buffered,
// otherwise keep the cork and re-arm the timer.
// Declared as a `function` (not arrow) so it can be bound to `this` above.
function maybeUncork() {
if (this._writeBacklog && this._writeBacklog.writableLength > 0) {
// This gets the stream flowing again. The write thread will call
// autoCork when it is done
this.uncork();
}
else {
// Nothing to uncork, schedule the next timeout
schedule();
}
}
// Cork once and schedule the uncork
this.cork();
schedule();
}
/**

@@ -224,2 +275,3 @@ * Writes a line into the correct backlog

write(line, noAutoCompress = false) {
var _a;
/* istanbul ignore else */

@@ -239,2 +291,9 @@ if (this._compressBacklog && !this._compressBacklog.destroyed) {

this._writeBacklog.write(line);
// If this is a throttled stream, uncork it as soon as the write
// buffer is larger than configured
if (((_a = this.options.throttleFS) === null || _a === void 0 ? void 0 : _a.maxBufferedCommands) != undefined &&
this._writeBacklog.writableLength >
this.options.throttleFS.maxBufferedCommands) {
this.uncork();
}
}

@@ -260,3 +319,3 @@ }

async dump() {
this._closeDumpPromise = deferred_promise_1.createDeferredPromise();
this._dumpPromise = deferred_promise_1.createDeferredPromise();
// Open the file for writing (or truncate if it exists)

@@ -284,3 +343,3 @@ this._dumpFd = await fs.open(this.dumpFilename, "w+");

this._dumpFd = undefined;
this._closeDumpPromise.resolve();
this._dumpPromise.resolve();
}

@@ -290,5 +349,7 @@ /** Asynchronously performs all write actions */

var e_1, _a;
var _b, _c;
var _b;
// This must be called before any awaits
this._writeBacklog = new stream.PassThrough({ objectMode: true });
this.autoCork();
this._writePromise = deferred_promise_1.createDeferredPromise();
// Open the file for appending and reading

@@ -298,4 +359,4 @@ this._fd = await fs.open(this.filename, "a+");

try {
for (var _d = __asyncValues(this._writeBacklog), _e; _e = await _d.next(), !_e.done;) {
const action = _e.value;
for (var _c = __asyncValues(this._writeBacklog), _d; _d = await _c.next(), !_d.done;) {
const action = _d.value;
if (action === "") {

@@ -310,2 +371,6 @@ // Since we opened the file in append mode, we cannot truncate

}
// When this is a throttled stream, auto-cork it when it was drained
if (this._writeBacklog.readableLength === 0) {
this.autoCork();
}
}

@@ -316,13 +381,13 @@ }

try {
if (_e && !_e.done && (_a = _d.return)) await _a.call(_d);
if (_d && !_d.done && (_a = _c.return)) await _a.call(_c);
}
finally { if (e_1) throw e_1.error; }
}
this._writeBacklog.destroy();
// The write backlog was closed, this means that the DB is being closed
// close the file and resolve the close promise
await fs.close(this._fd);
(_c = this._closeDBPromise) === null || _c === void 0 ? void 0 : _c.resolve();
this._writePromise.resolve();
}
/** Compresses the db by dumping it and overwriting the aof file. */
async compress() {
async compressInternal() {
if (!this._writeBacklog || this.compressPromise)

@@ -337,9 +402,9 @@ return;

// After dumping, restart the write thread so no duplicate entries get written
this._closeDBPromise = deferred_promise_1.createDeferredPromise();
// Disable writing into the backlog stream and buffer all writes
// in the compress backlog in the meantime
this._compressBacklog = new stream.PassThrough({ objectMode: true });
this.uncork();
this._writeBacklog.end();
await this._writePromise;
this._writeBacklog = undefined;
await this._closeDBPromise;
// Replace the aof file

@@ -365,8 +430,16 @@ await fs.move(this.filename, this.filename + ".bak");

}
/** Compresses the db by dumping it and overwriting the aof file. */
async compress() {
if (!this._isOpen)
return;
return this.compressInternal();
}
/** Closes the DB and waits for all data to be written */
async close() {
var _a, _b;
var _a;
this._isOpen = false;
if (this.compressInterval)
clearInterval(this.compressInterval);
if (this._writeCorkTimeout)
clearTimeout(this._writeCorkTimeout);
if (this.compressPromise) {

@@ -378,24 +451,26 @@ // Wait until any pending compress processes are complete

// Compress if required
await this.compress();
await this.compressInternal();
}
// Disable writing into the backlog stream and wait for the write process to finish
if (this._writeBacklog) {
this._closeDBPromise = deferred_promise_1.createDeferredPromise();
// Disable writing into the backlog stream
this.uncork();
this._writeBacklog.end();
this._writeBacklog = undefined;
// Disable writing into the dump backlog stream
(_b = this._dumpBacklog) === null || _b === void 0 ? void 0 : _b.end();
this._dumpBacklog = undefined;
await this._closeDBPromise;
await this._writePromise;
}
// Also wait for a potential dump process to finish
if (this._closeDumpPromise) {
await this._closeDumpPromise;
/* istanbul ignore next - this is impossible to test since it requires exact timing */
if (this._dumpBacklog) {
// Disable writing into the dump backlog stream
this._dumpBacklog.end();
await this._dumpPromise;
}
// Reset all variables
this._closeDBPromise = undefined;
this._closeDumpPromise = undefined;
this._writePromise = undefined;
this._dumpPromise = undefined;
this._db.clear();
this._fd = undefined;
this._dumpFd = undefined;
this._changesSinceLastCompress = 0;
this._uncompressedSize = Number.NaN;
this._writeCorkCount = 0;
}

@@ -402,0 +477,0 @@ }

{
"name": "@alcalzone/jsonl-db",
"version": "1.0.1",
"version": "1.1.0",
"description": "Simple JSONL-based key-value store",

@@ -45,3 +45,3 @@ "main": "./build/index.js",

"@babel/plugin-proposal-optional-chaining": "^7.9.0",
"@babel/preset-env": "^7.9.5",
"@babel/preset-env": "^7.9.6",
"@babel/preset-typescript": "^7.9.0",

@@ -56,4 +56,4 @@ "@commitlint/cli": "^8.3.5",

"@types/node": "^13.13.4",
"@typescript-eslint/eslint-plugin": "^2.29.0",
"@typescript-eslint/parser": "^2.29.0",
"@typescript-eslint/eslint-plugin": "^2.30.0",
"@typescript-eslint/parser": "^2.30.0",
"commitizen": "^4.0.4",

@@ -60,0 +60,0 @@ "coveralls": "^3.1.0",

@@ -36,2 +36,5 @@ # jsonl-db

```
### Handling invalid data
If corrupt data is encountered while opening the DB, the call to `open()` will be rejected. If this is to be expected, use the options parameter on the constructor to turn on forgiving behavior:

@@ -44,2 +47,4 @@ ```ts

### Support custom objects/values
You can optionally transform the parsed values by passing a reviver function. This allows storing non-primitive objects in the database if those can be transformed to JSON (e.g. by overwriting the `toJSON` method).

@@ -55,2 +60,4 @@ ```ts

### Closing the database
Data written to the DB is persisted asynchronously. Be sure to call `close()` when you no longer need the database in order to flush all pending writes and close all files:

@@ -63,4 +70,18 @@

### Controlling file system access
By default, the database immediately writes to the database file. You can throttle the write accesses using the `throttleFS` constructor option. Be aware that buffered data will be lost in case the process crashes.
```ts
const db = new DB("/path/to/file", { throttleFS: { /* throttle options */ } });
```
The following options exist:
| Option | Default | Description |
|-----------------|---------|-------------|
| intervalMs | 0 | Write to the database file no more than every `intervalMs` milliseconds. |
| maxBufferedCommands | +Infinity | Force a write after `maxBufferedCommands` have been buffered. This reduces memory consumption and data loss in case of a crash. |
To create a compressed copy of the database in `/path/to/file.dump`, use the `dump()` method. If any data is written to the db during the dump, it is appended to the dump but most likely compressed.
### Copying and compressing the database
```ts

@@ -92,2 +113,4 @@ await db.dump();

### Import / Export
Importing JSON files can be done this way:

@@ -115,2 +138,5 @@ ```ts

### 1.1.0 (2020-05-02)
Added functionality to throttle write accesses
### 1.0.1 (2020-04-29)

@@ -117,0 +143,0 @@ Export `JsonlDBOptions` from the main entry point

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Socket · SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc