@alcalzone/jsonl-db
Comparing version 2.4.1 to 2.4.2
@@ -96,10 +96,7 @@ export interface JsonlDBOptions<V> { | ||
get isOpen(): boolean; | ||
private _persistencePromise; | ||
private _persistenceTasks; | ||
private _journal; | ||
private _fd; | ||
private _dumpFd; | ||
private _compressBacklog; | ||
private _writeBacklog; | ||
private _writeCorkCount; | ||
private _writeCorkTimeout; | ||
private _dumpBacklog; | ||
private _compressInterval; | ||
private drainJournal; | ||
private _openPromise; | ||
@@ -123,12 +120,2 @@ open(): Promise<void>; | ||
exportJson(filename: string, options?: FsWriteOptions): Promise<void>; | ||
private updateStatistics; | ||
private needToCompress; | ||
private cork; | ||
private uncork; | ||
private autoCork; | ||
/** | ||
* Writes a line into the correct backlog | ||
* @param noAutoCompress Whether auto-compression should be disabled | ||
*/ | ||
private write; | ||
private entryToLine; | ||
@@ -140,15 +127,28 @@ private makeLazyClear; | ||
* Saves a compressed copy of the DB into the given path. | ||
* | ||
* **WARNING:** This MUST be called from {@link persistenceThread}! | ||
* @param targetFilename Where the compressed copy should be written. Default: `<filename>.dump` | ||
* @param drainJournal Whether the journal should be drained when writing the compressed copy or simply cloned. | ||
*/ | ||
private dumpInternal; | ||
/** | ||
* Saves a compressed copy of the DB into the given path. | ||
* @param targetFilename Where the compressed copy should be written. Default: `<filename>.dump` | ||
*/ | ||
dump(targetFilename?: string): Promise<void>; | ||
/** Asynchronously performs all write actions */ | ||
private writeThread; | ||
private compressPromise; | ||
private compressInternal; | ||
private needToCompressBySize; | ||
private needToCompressByTime; | ||
private persistenceThread; | ||
/** Writes the given journal to the given file descriptor. Returns the new file descriptor if the file was re-opened during the process */ | ||
private writeJournalToFile; | ||
/** | ||
* Compresses the db by dumping it and overwriting the aof file. | ||
* | ||
* **WARNING:** This MUST be called from {@link persistenceThread}! | ||
*/ | ||
private doCompress; | ||
/** Compresses the db by dumping it and overwriting the aof file. */ | ||
compress(): Promise<void>; | ||
/** Resolves when the `writeThread()` is finished */ | ||
private _writePromise; | ||
/** Resolves when the `dump()` method is finished */ | ||
private _dumpPromise; | ||
/** Compresses the db by dumping it and overwriting the aof file. */ | ||
private compressInternal; | ||
/** Closes the DB and waits for all data to be written */ | ||
@@ -155,0 +155,0 @@ close(): Promise<void>; |
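For orientation, here is a minimal usage sketch of the public surface declared above (`open()`, `set()`, `dump()`, `compress()`, `close()`). The file name, keys, and option values are illustrative only and not taken from the package:

```ts
import { JsonlDB } from "@alcalzone/jsonl-db";

async function main(): Promise<void> {
	// Illustrative file name and options
	const db = new JsonlDB<number>("values.jsonl", {
		autoCompress: { onClose: true },
		throttleFS: { intervalMs: 100 },
	});
	await db.open();

	db.set("answer", 42);
	db.delete("obsolete-key");

	// Write a compressed copy (defaults to `<filename>.dump`)
	await db.dump();
	// Rewrite the append-only file itself in compressed form
	await db.compress();

	await db.close();
}

void main();
```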
@@ -23,2 +23,3 @@ "use strict"; | ||
exports.JsonlDB = void 0; | ||
const async_1 = require("alcalzone-shared/async"); | ||
const deferred_promise_1 = require("alcalzone-shared/deferred-promise"); | ||
@@ -30,3 +31,2 @@ const objects_1 = require("alcalzone-shared/objects"); | ||
const readline = __importStar(require("readline")); | ||
const stream = __importStar(require("stream")); | ||
var Operation; | ||
@@ -51,2 +51,7 @@ (function (Operation) { | ||
} | ||
function getCurrentErrorStack() { | ||
const tmp = { message: "" }; | ||
Error.captureStackTrace(tmp); | ||
return tmp.stack.split("\n").slice(2).join("\n"); | ||
} | ||
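The `getCurrentErrorStack` helper added above captures the caller's stack before work is handed off to the background persistence thread; when the awaited task later rejects, that stack is appended to the error so it still points at the original `dump()`/`compress()` call site. A simplified sketch of the same pattern (the `runTask` wrapper is made up for illustration):

```ts
function getCurrentErrorStack(): string {
	const tmp: { message: string; stack?: string } = { message: "" };
	Error.captureStackTrace(tmp);
	// Drop the error header and the frame of this helper itself
	return tmp.stack!.split("\n").slice(2).join("\n");
}

async function runTask(task: () => Promise<void>): Promise<void> {
	const stack = getCurrentErrorStack(); // captured synchronously at the call site
	try {
		await task();
	} catch (e) {
		// Re-attach the caller's stack so asynchronous failures stay debuggable
		(e as Error).stack += "\n" + stack;
		throw e;
	}
}
```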
class JsonlDB { | ||
@@ -58,3 +63,4 @@ constructor(filename, options = {}) { | ||
this._isOpen = false; | ||
this._writeCorkCount = 0; | ||
this._persistenceTasks = []; | ||
this._journal = []; | ||
this.validateOptions(options); | ||
@@ -117,2 +123,5 @@ this.filename = filename; | ||
} | ||
drainJournal() { | ||
return this._journal.splice(0, this._journal.length); | ||
} | ||
// /** Opens the database file or creates it if it doesn't exist */ | ||
@@ -201,20 +210,9 @@ async open() { | ||
} | ||
const { onOpen, intervalMs, intervalMinChanges = 1, } = (_a = this.options.autoCompress) !== null && _a !== void 0 ? _a : {}; | ||
// If the DB should be compressed while opening, do it before starting the write thread | ||
if (onOpen) { | ||
await this.compressInternal(); | ||
} | ||
// Start the write thread | ||
this._openPromise = (0, deferred_promise_1.createDeferredPromise)(); | ||
void this.writeThread(); | ||
// Start background persistence thread | ||
this._persistencePromise = this.persistenceThread(); | ||
await this._openPromise; | ||
this._isOpen = true; | ||
// Start regular auto-compression | ||
if (intervalMs) { | ||
this._compressInterval = setInterval(() => { | ||
if (this._changesSinceLastCompress >= intervalMinChanges) { | ||
void this.compress(); | ||
} | ||
}, intervalMs); | ||
} | ||
// If the DB should be compressed while opening, do it now | ||
if ((_a = this.options.autoCompress) === null || _a === void 0 ? void 0 : _a.onOpen) | ||
await this.compress(); | ||
} | ||
@@ -268,12 +266,17 @@ /** | ||
// Overwrite the broken db file with it and delete the dump file | ||
await fs.move(this.backupFilename, this.filename, { | ||
overwrite: true, | ||
}); | ||
try { | ||
await fs.remove(this.dumpFilename); | ||
await fs.move(this.backupFilename, this.filename, { | ||
overwrite: true, | ||
}); | ||
try { | ||
await fs.remove(this.dumpFilename); | ||
} | ||
catch { | ||
// ignore | ||
} | ||
return; | ||
} | ||
catch { | ||
// ignore | ||
// Moving failed, try the next possibility | ||
} | ||
return; | ||
} | ||
@@ -290,13 +293,18 @@ // Try the dump file as a last attempt | ||
if (dumpFileIsOK) { | ||
// Overwrite the broken db file with the dump file and delete the backup file | ||
await fs.move(this.dumpFilename, this.filename, { | ||
overwrite: true, | ||
}); | ||
try { | ||
await fs.remove(this.backupFilename); | ||
// Overwrite the broken db file with the dump file and delete the backup file | ||
await fs.move(this.dumpFilename, this.filename, { | ||
overwrite: true, | ||
}); | ||
try { | ||
await fs.remove(this.backupFilename); | ||
} | ||
catch { | ||
// ignore | ||
} | ||
return; | ||
} | ||
catch { | ||
// ignore | ||
// Moving failed | ||
} | ||
return; | ||
} | ||
@@ -340,3 +348,5 @@ } | ||
this._db.clear(); | ||
this.write(this.makeLazyClear()); | ||
// All pending writes are obsolete, remove them from the journal | ||
this.drainJournal(); | ||
this._journal.push(this.makeLazyClear()); | ||
} | ||
@@ -350,3 +360,3 @@ delete(key) { | ||
// Something was deleted | ||
this.write(this.makeLazyDelete(key)); | ||
this._journal.push(this.makeLazyDelete(key)); | ||
} | ||
@@ -360,3 +370,3 @@ return ret; | ||
this._db.set(key, value); | ||
this.write(this.makeLazyWrite(key, value)); | ||
this._journal.push(this.makeLazyWrite(key, value)); | ||
return this; | ||
@@ -382,3 +392,3 @@ } | ||
this._db.set(key, value); | ||
this.write(this.makeLazyWrite(key, value), true); | ||
this._journal.push(this.makeLazyWrite(key, value)); | ||
} | ||
@@ -392,98 +402,2 @@ } | ||
} | ||
updateStatistics(entry) { | ||
if (entry.op === Operation.Clear) { | ||
this._uncompressedSize = 0; | ||
} | ||
else { | ||
this._uncompressedSize++; | ||
} | ||
this._changesSinceLastCompress++; | ||
} | ||
needToCompress() { | ||
var _a; | ||
// compression is busy? | ||
if (this.compressPromise) | ||
return false; | ||
const { sizeFactor = Number.POSITIVE_INFINITY, sizeFactorMinimumSize = 0, } = (_a = this.options.autoCompress) !== null && _a !== void 0 ? _a : {}; | ||
if (this.uncompressedSize >= sizeFactorMinimumSize && | ||
this.uncompressedSize >= sizeFactor * this.size) { | ||
return true; | ||
} | ||
return false; | ||
} | ||
cork() { | ||
/* istanbul ignore else - this is impossible to test */ | ||
if (this._writeBacklog && this._writeCorkCount === 0) { | ||
this._writeBacklog.cork(); | ||
this._writeCorkCount++; | ||
} | ||
} | ||
uncork() { | ||
if (this._writeCorkCount > 0 && this._writeCorkTimeout) { | ||
clearTimeout(this._writeCorkTimeout); | ||
this._writeCorkTimeout = undefined; | ||
} | ||
while (this._writeBacklog && this._writeCorkCount > 0) { | ||
this._writeBacklog.uncork(); | ||
this._writeCorkCount--; | ||
} | ||
} | ||
autoCork() { | ||
var _a, _b, _c; | ||
if (!((_a = this.options.throttleFS) === null || _a === void 0 ? void 0 : _a.intervalMs)) | ||
return; | ||
const maybeUncork = () => { | ||
var _a; | ||
if (this._writeBacklog && this._writeBacklog.writableLength > 0) { | ||
// This gets the stream flowing again. The write thread will call | ||
// autoCork when it is done | ||
this.uncork(); | ||
} | ||
else { | ||
// Nothing to uncork, schedule the next timeout | ||
(_a = this._writeCorkTimeout) === null || _a === void 0 ? void 0 : _a.refresh(); | ||
} | ||
}; | ||
// Cork once and schedule the uncork | ||
this.cork(); | ||
this._writeCorkTimeout = | ||
(_c = (_b = this._writeCorkTimeout) === null || _b === void 0 ? void 0 : _b.refresh()) !== null && _c !== void 0 ? _c : setTimeout(maybeUncork, this.options.throttleFS.intervalMs); | ||
} | ||
/** | ||
* Writes a line into the correct backlog | ||
* @param noAutoCompress Whether auto-compression should be disabled | ||
*/ | ||
write(lazy, noAutoCompress = false) { | ||
var _a; | ||
/* istanbul ignore else */ | ||
if (this._compressBacklog && !this._compressBacklog.destroyed) { | ||
// The compress backlog handling also takes care of the file statistics | ||
this._compressBacklog.write(lazy); | ||
} | ||
else if (this._writeBacklog && !this._writeBacklog.destroyed) { | ||
// Update line statistics | ||
this.updateStatistics(lazy); | ||
// Either compress or write to the main file, never both | ||
if (!noAutoCompress && this.needToCompress()) { | ||
this.compress(); | ||
} | ||
else { | ||
this._writeBacklog.write(lazy); | ||
// If this is a throttled stream, uncork it as soon as the write | ||
// buffer is larger than configured | ||
if (((_a = this.options.throttleFS) === null || _a === void 0 ? void 0 : _a.maxBufferedCommands) != undefined && | ||
this._writeBacklog.writableLength > | ||
this.options.throttleFS.maxBufferedCommands) { | ||
this.uncork(); | ||
} | ||
} | ||
} | ||
else { | ||
throw new Error("Cannot write into the database while no streams are open!"); | ||
} | ||
// If necessary, write to the dump backlog, so the dump doesn't miss any data | ||
if (this._dumpBacklog && !this._dumpBacklog.destroyed) { | ||
this._dumpBacklog.write(lazy); | ||
} | ||
} | ||
entryToLine(key, value) { | ||
@@ -537,13 +451,15 @@ var _a, _b, _c; | ||
* Saves a compressed copy of the DB into the given path. | ||
* | ||
* **WARNING:** This MUST be called from {@link persistenceThread}! | ||
* @param targetFilename Where the compressed copy should be written. Default: `<filename>.dump` | ||
* @param drainJournal Whether the journal should be drained when writing the compressed copy or simply cloned. | ||
*/ | ||
async dump(targetFilename = this.dumpFilename) { | ||
this._dumpPromise = (0, deferred_promise_1.createDeferredPromise)(); | ||
async dumpInternal(targetFilename = this.dumpFilename, drainJournal) { | ||
// Open the file for writing (or truncate if it exists) | ||
this._dumpFd = await fs.open(targetFilename, "w+"); | ||
// And start dumping the DB | ||
// Start by creating a dump backlog, so parallel writes will be remembered | ||
this._dumpBacklog = new stream.PassThrough({ objectMode: true }); | ||
const fd = await fs.open(targetFilename, "w+"); | ||
// Create a copy of the other entries in the DB | ||
// Also, remember how many entries were in the journal. These are already part of | ||
// the map, so we don't need to append them later, which keeps the state consistent | ||
const entries = [...this._db]; | ||
const journalLength = this._journal.length; | ||
// And persist them | ||
@@ -555,31 +471,128 @@ let serialized = ""; | ||
} | ||
await fs.appendFile(this._dumpFd, serialized); | ||
// In case there is any data in the backlog stream, persist that too | ||
let lazy; | ||
serialized = ""; | ||
while (null !== (lazy = this._dumpBacklog.read())) { | ||
serialized += lazy.serialize() + "\n"; | ||
await fs.appendFile(fd, serialized); | ||
// In case there is any new data in the journal, persist that too | ||
let journal = drainJournal | ||
? this._journal.splice(0, this._journal.length) | ||
: this._journal; | ||
journal = journal.slice(journalLength); | ||
await this.writeJournalToFile(fd, journal, false); | ||
await fs.close(fd); | ||
} | ||
/** | ||
* Saves a compressed copy of the DB into the given path. | ||
* @param targetFilename Where the compressed copy should be written. Default: `<filename>.dump` | ||
*/ | ||
async dump(targetFilename = this.dumpFilename) { | ||
// Prevent dumping the DB when it is closed | ||
if (!this._isOpen) | ||
return; | ||
const done = (0, deferred_promise_1.createDeferredPromise)(); | ||
this._persistenceTasks.push({ | ||
type: "dump", | ||
filename: targetFilename, | ||
done, | ||
}); | ||
const stack = getCurrentErrorStack(); | ||
try { | ||
await done; | ||
} | ||
await fs.appendFile(this._dumpFd, serialized); | ||
this._dumpBacklog.destroy(); | ||
this._dumpBacklog = undefined; | ||
// The dump backlog was closed, this means that the dump is finished. | ||
// Close the file and resolve the close promise | ||
await fs.fsync(this._dumpFd); // The dump should be on disk ASAP, so we fsync | ||
await fs.close(this._dumpFd); | ||
this._dumpFd = undefined; | ||
this._dumpPromise.resolve(); | ||
catch (e) { | ||
e.stack += "\n" + stack; | ||
throw e; | ||
} | ||
} | ||
/** Asynchronously performs all write actions */ | ||
async writeThread() { | ||
needToCompressBySize() { | ||
var _a; | ||
// This must be called before any awaits | ||
this._writeBacklog = new stream.PassThrough({ objectMode: true }); | ||
this.autoCork(); | ||
this._writePromise = (0, deferred_promise_1.createDeferredPromise)(); | ||
const { sizeFactor = Number.POSITIVE_INFINITY, sizeFactorMinimumSize = 0, } = (_a = this.options.autoCompress) !== null && _a !== void 0 ? _a : {}; | ||
if (this._uncompressedSize >= sizeFactorMinimumSize && | ||
this._uncompressedSize >= sizeFactor * this.size) { | ||
return true; | ||
} | ||
return false; | ||
} | ||
needToCompressByTime(lastCompress) { | ||
if (!this.options.autoCompress) | ||
return false; | ||
const { intervalMs, intervalMinChanges = 1 } = this.options.autoCompress; | ||
if (!intervalMs) | ||
return false; | ||
return (this._changesSinceLastCompress >= intervalMinChanges && | ||
Date.now() - lastCompress >= intervalMs); | ||
} | ||
async persistenceThread() { | ||
var _a, _b, _c, _d, _e, _f, _g, _h; | ||
// Keep track of the write accesses and compression attempts | ||
let lastWrite = Date.now(); | ||
let lastCompress = Date.now(); | ||
const throttleInterval = (_b = (_a = this.options.throttleFS) === null || _a === void 0 ? void 0 : _a.intervalMs) !== null && _b !== void 0 ? _b : 0; | ||
const maxBufferedCommands = (_d = (_c = this.options.throttleFS) === null || _c === void 0 ? void 0 : _c.maxBufferedCommands) !== null && _d !== void 0 ? _d : Number.POSITIVE_INFINITY; | ||
// Open the file for appending and reading | ||
this._fd = await fs.open(this.filename, "a+"); | ||
(_a = this._openPromise) === null || _a === void 0 ? void 0 : _a.resolve(); | ||
(_e = this._openPromise) === null || _e === void 0 ? void 0 : _e.resolve(); | ||
const sleepDuration = 20; // ms | ||
while (true) { | ||
// Figure out what to do | ||
let task; | ||
if (this.needToCompressBySize() || | ||
this.needToCompressByTime(lastCompress)) { | ||
// Need to compress | ||
task = { type: "compress", done: (0, deferred_promise_1.createDeferredPromise)() }; | ||
} | ||
else { | ||
// Take the first task off the task queue | ||
task = (_f = this._persistenceTasks.shift()) !== null && _f !== void 0 ? _f : { type: "none" }; | ||
} | ||
let isStopCmd = false; | ||
switch (task.type) { | ||
case "stop": | ||
isStopCmd = true; | ||
// fall through | ||
case "none": { | ||
// Write to disk if necessary | ||
const shouldWrite = this._journal.length > 0 && | ||
(isStopCmd || | ||
Date.now() - lastWrite > throttleInterval || | ||
this._journal.length > maxBufferedCommands); | ||
if (shouldWrite) { | ||
// Drain the journal | ||
const journal = this.drainJournal(); | ||
this._fd = await this.writeJournalToFile(this._fd, journal); | ||
lastWrite = Date.now(); | ||
} | ||
if (isStopCmd) { | ||
await fs.close(this._fd); | ||
this._fd = undefined; | ||
return; | ||
} | ||
break; | ||
} | ||
case "dump": { | ||
try { | ||
await this.dumpInternal(task.filename, false); | ||
task.done.resolve(); | ||
} | ||
catch (e) { | ||
task.done.reject(e); | ||
} | ||
break; | ||
} | ||
case "compress": { | ||
try { | ||
await this.doCompress(); | ||
lastCompress = Date.now(); | ||
(_g = task.done) === null || _g === void 0 ? void 0 : _g.resolve(); | ||
} | ||
catch (e) { | ||
(_h = task.done) === null || _h === void 0 ? void 0 : _h.reject(e); | ||
} | ||
break; | ||
} | ||
} | ||
await (0, async_1.wait)(sleepDuration); | ||
} | ||
} | ||
/** Writes the given journal to the given file descriptor. Returns the new file descriptor if the file was re-opened during the process */ | ||
async writeJournalToFile(fd, journal, updateStatistics = true) { | ||
// The chunk map is used to buffer all entries that are currently waiting in line | ||
// so we avoid serializing redundant entries. When the write backlog is throttled, | ||
// so we avoid serializing redundant entries. When the writing is throttled, | ||
// the chunk map will only be used for a short time. | ||
@@ -589,5 +602,4 @@ const chunk = new Map(); | ||
let truncate = false; | ||
for await (const action of this | ||
._writeBacklog) { | ||
if (action.op === Operation.Clear) { | ||
for (const entry of journal) { | ||
if (entry.op === Operation.Clear) { | ||
chunk.clear(); | ||
@@ -598,63 +610,47 @@ truncate = true; | ||
// Only remember the last entry for each key | ||
chunk.set(action.key, action); | ||
chunk.set(entry.key, entry); | ||
} | ||
// When the backlog has been drained, perform the necessary write actions | ||
if (this._writeBacklog.readableLength === 0) { | ||
if (truncate) { | ||
// Since we opened the file in append mode, we cannot truncate | ||
// therefore close and open in write mode again | ||
await fs.close(this._fd); | ||
this._fd = await fs.open(this.filename, "w+"); | ||
truncate = false; | ||
} | ||
// Collect all changes | ||
for (const entry of chunk.values()) { | ||
if (entry.op !== Operation.Clear) { | ||
serialized += entry.serialize() + "\n"; | ||
} | ||
} | ||
// and write once | ||
await fs.appendFile(this._fd, serialized); | ||
serialized = ""; | ||
chunk.clear(); | ||
} | ||
// When the journal has been drained, perform the necessary write actions | ||
if (truncate) { | ||
// Since we opened the file in append mode, we cannot truncate | ||
// therefore close and open in write mode again | ||
await fs.close(fd); | ||
fd = await fs.open(this.filename, "w+"); | ||
truncate = false; | ||
if (updateStatistics) { | ||
// Now the DB size is effectively 0 and we have no "uncompressed" changes pending | ||
this._uncompressedSize = 0; | ||
this._changesSinceLastCompress = 0; | ||
} | ||
// When this is a throttled stream, auto-cork it when it was drained | ||
if (this._writeBacklog.readableLength === 0 && this._isOpen) { | ||
this.autoCork(); | ||
} | ||
// Collect all changes | ||
for (const entry of chunk.values()) { | ||
serialized += entry.serialize() + "\n"; | ||
if (updateStatistics) { | ||
this._uncompressedSize++; | ||
this._changesSinceLastCompress++; | ||
} | ||
} | ||
this._writeBacklog.destroy(); | ||
// The write backlog was closed, this means that the DB is being closed | ||
// Flush the file contents to disk, close the file and resolve the close promise | ||
await fs.fsync(this._fd); | ||
// and write once, making sure everything is written | ||
await fs.appendFile(fd, serialized); | ||
await fs.fsync(fd); | ||
return fd; | ||
} | ||
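The chunk map in `writeJournalToFile` above keeps only the most recent journal entry per key, so a burst of writes to the same key costs a single line on disk. A tiny standalone illustration of that dedup step (the `Entry` shape is simplified and clear/truncate handling is omitted):

```ts
interface Entry {
	op: "write" | "delete";
	key: string;
	serialize(): string;
}

function dedupeAndSerialize(journal: Entry[]): string {
	const chunk = new Map<string, Entry>();
	for (const entry of journal) {
		// Only the last entry for each key survives
		chunk.set(entry.key, entry);
	}
	let serialized = "";
	for (const entry of chunk.values()) {
		serialized += entry.serialize() + "\n";
	}
	// The caller appends this string to the file in a single write
	return serialized;
}
```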
/** | ||
* Compresses the db by dumping it and overwriting the aof file. | ||
* | ||
* **WARNING:** This MUST be called from {@link persistenceThread}! | ||
*/ | ||
async doCompress() { | ||
// 1. Ensure the backup contains everything in the DB and journal | ||
const journal = this.drainJournal(); | ||
this._fd = await this.writeJournalToFile(this._fd, journal); | ||
await fs.close(this._fd); | ||
this._writePromise.resolve(); | ||
} | ||
async compressInternal() { | ||
if (this.compressPromise) | ||
return this.compressPromise; | ||
this.compressPromise = (0, deferred_promise_1.createDeferredPromise)(); | ||
// If someone else is currently dumping the DB, wait for them to finish | ||
if (this._dumpPromise) | ||
await this._dumpPromise; | ||
// Immediately remember the database size or writes while compressing | ||
// will be incorrectly reflected | ||
this._uncompressedSize = this.size; | ||
this._changesSinceLastCompress = 0; | ||
await this.dump(); | ||
// After dumping, restart the write thread so no duplicate entries get written | ||
// Disable writing into the backlog stream and buffer all writes | ||
// in the compress backlog in the meantime | ||
this._compressBacklog = new stream.PassThrough({ objectMode: true }); | ||
this.uncork(); | ||
// Replace the aof file. To make sure that the data fully reaches the storage, we employ the following strategy: | ||
// 1. Ensure there are no pending rename operations or file creations | ||
this._fd = undefined; | ||
// 2. Create a dump, draining the journal to avoid duplicate writes | ||
await this.dumpInternal(this.dumpFilename, true); | ||
// 3. Ensure there are no pending rename operations or file creations | ||
await fsyncDir(path.dirname(this.filename)); | ||
// 2. Ensure the db file is fully written to disk. The write thread will fsync before closing | ||
if (this._writeBacklog) { | ||
this._writeBacklog.end(); | ||
await this._writePromise; | ||
this._writeBacklog = undefined; | ||
} | ||
// 3. Create backup, rename the dump file, then ensure the directory entries are written to disk | ||
// 4. Swap files around, then ensure the directory entries are written to disk | ||
await fs.move(this.filename, this.backupFilename, { | ||
@@ -665,21 +661,9 @@ overwrite: true, | ||
await fsyncDir(path.dirname(this.filename)); | ||
// 4. Delete backup | ||
// 5. Delete backup | ||
await fs.unlink(this.backupFilename); | ||
if (this._isOpen) { | ||
// Start the write thread again | ||
this._openPromise = (0, deferred_promise_1.createDeferredPromise)(); | ||
void this.writeThread(); | ||
await this._openPromise; | ||
} | ||
// In case there is any data in the backlog stream, persist that too | ||
let lazy; | ||
while (null !== (lazy = this._compressBacklog.read())) { | ||
this.updateStatistics(lazy); | ||
this._writeBacklog.write(lazy); | ||
} | ||
this._compressBacklog.destroy(); | ||
this._compressBacklog = undefined; | ||
// If any method is waiting for the compress process, signal it that we're done | ||
this.compressPromise.resolve(); | ||
this.compressPromise = undefined; | ||
// 6. open the main DB file again in append mode | ||
this._fd = await fs.open(this.filename, "a+"); | ||
// Remember the new statistics | ||
this._uncompressedSize = this._db.size; | ||
this._changesSinceLastCompress = 0; | ||
} | ||
@@ -690,43 +674,41 @@ /** Compresses the db by dumping it and overwriting the aof file. */ | ||
return; | ||
return this.compressInternal(); | ||
await this.compressInternal(); | ||
} | ||
/** Compresses the db by dumping it and overwriting the aof file. */ | ||
async compressInternal() { | ||
// Avoid having multiple compress operations running in parallel | ||
const task = this._persistenceTasks.find((t) => t.type === "compress"); | ||
if (task) | ||
return task.done; | ||
const done = (0, deferred_promise_1.createDeferredPromise)(); | ||
this._persistenceTasks.push({ | ||
type: "compress", | ||
done, | ||
}); | ||
const stack = getCurrentErrorStack(); | ||
try { | ||
await done; | ||
} | ||
catch (e) { | ||
e.stack += "\n" + stack; | ||
throw e; | ||
} | ||
} | ||
/** Closes the DB and waits for all data to be written */ | ||
async close() { | ||
var _a; | ||
if (!this._isOpen) | ||
return; | ||
this._isOpen = false; | ||
if (this._compressInterval) | ||
clearInterval(this._compressInterval); | ||
if (this._writeCorkTimeout) | ||
clearTimeout(this._writeCorkTimeout); | ||
if (this.compressPromise) { | ||
// Wait until any pending compress processes are complete | ||
await this.compressPromise; | ||
} | ||
else if ((_a = this.options.autoCompress) === null || _a === void 0 ? void 0 : _a.onClose) { | ||
// Compress if required | ||
// Compress on close if required | ||
if ((_a = this.options.autoCompress) === null || _a === void 0 ? void 0 : _a.onClose) { | ||
await this.compressInternal(); | ||
} | ||
// Disable writing into the backlog stream and wait for the write process to finish | ||
if (this._writeBacklog) { | ||
this.uncork(); | ||
this._writeBacklog.end(); | ||
await this._writePromise; | ||
} | ||
// Also wait for a potential dump process to finish | ||
/* istanbul ignore next - this is impossible to test since it requires exact timing */ | ||
if (this._dumpBacklog) { | ||
// Disable writing into the dump backlog stream | ||
this._dumpBacklog.end(); | ||
} | ||
if (this._dumpPromise) | ||
await this._dumpPromise; | ||
// Stop persistence thread and wait for it to finish | ||
this._persistenceTasks.push({ type: "stop" }); | ||
await this._persistencePromise; | ||
// Reset all variables | ||
this._writePromise = undefined; | ||
this._dumpPromise = undefined; | ||
this._db.clear(); | ||
this._fd = undefined; | ||
this._dumpFd = undefined; | ||
this._changesSinceLastCompress = 0; | ||
this._uncompressedSize = Number.NaN; | ||
this._writeCorkCount = 0; | ||
// Free the lock | ||
@@ -733,0 +715,0 @@ try { |
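To summarize the refactoring visible in the diff above: instead of corking and uncorking backlog streams, the DB now pushes entries into a journal array and queues `dump`/`compress`/`stop` commands for a single persistence loop, which also flushes the journal when throttling allows. A simplified, self-contained sketch of that single-consumer pattern (the task shape and the 20 ms idle interval mirror the compiled code above; everything else is illustrative):

```ts
type Task =
	| { type: "dump"; filename: string; done: () => void }
	| { type: "compress"; done: () => void }
	| { type: "stop" };

const tasks: Task[] = [];
const journal: string[] = []; // pending serialized writes

const wait = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function persistenceLoop(): Promise<void> {
	while (true) {
		const task = tasks.shift() ?? ({ type: "none" } as const);
		switch (task.type) {
			case "none": {
				// Nothing queued: flush pending journal entries in one batch
				const pending = journal.splice(0, journal.length);
				void pending; // would be appended to the append-only file here
				break;
			}
			case "dump":
			case "compress":
				// Long-running work runs here, one task at a time, so dump()
				// and compress() can never step on each other
				task.done();
				break;
			case "stop":
				return;
		}
		await wait(20); // idle briefly between iterations
	}
}
```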
{ | ||
"name": "@alcalzone/jsonl-db", | ||
"version": "2.4.1", | ||
"version": "2.4.2", | ||
"description": "Simple JSONL-based key-value store", | ||
@@ -38,6 +38,6 @@ "main": "./build/index.js", | ||
"@alcalzone/release-script-plugin-license": "~3.4.1", | ||
"@babel/cli": "^7.16.0", | ||
"@babel/core": "^7.16.0", | ||
"@babel/preset-env": "^7.16.0", | ||
"@babel/preset-typescript": "^7.16.0", | ||
"@babel/cli": "^7.16.8", | ||
"@babel/core": "^7.16.12", | ||
"@babel/preset-env": "^7.16.11", | ||
"@babel/preset-typescript": "^7.16.7", | ||
"@commitlint/cli": "^13.2.1", | ||
@@ -47,3 +47,3 @@ "@commitlint/config-conventional": "^13.2.0", | ||
"@types/fs-extra": "^9.0.13", | ||
"@types/jest": "^27.0.2", | ||
"@types/jest": "^27.4.0", | ||
"@types/mock-fs": "^4.13.1", | ||
@@ -60,3 +60,3 @@ "@types/node": "^12.20.39", | ||
"husky": "^7.0.4", | ||
"jest": "^27.3.1", | ||
"jest": "^27.4.5", | ||
"jest-extended": "^0.11.5", | ||
@@ -66,3 +66,3 @@ "prettier": "^2.5.0", | ||
"ts-node": "^10.4.0", | ||
"typescript": "^4.5.2" | ||
"typescript": "^4.5.4" | ||
}, | ||
@@ -69,0 +69,0 @@ "dependencies": { |
@@ -145,2 +145,7 @@ # jsonl-db | ||
--> | ||
### 2.4.2 (2022-02-09) | ||
* Errors while automatically restoring the DB from a backup or dump are now caught | ||
* Simplified and decoupled the persistence code. Individual commands like `dump` and `compress` are now properly sequenced and should no longer conflict with each other. | ||
* Increased throughput for primitive entries by ~2x | ||
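The sequencing note above can be illustrated with a short example (assuming the public API from the typings; the file names are made up): overlapping `dump()` and `compress()` calls are now queued by the persistence thread and executed one after another instead of interfering with each other.

```ts
import { JsonlDB } from "@alcalzone/jsonl-db";

async function demo(): Promise<void> {
	const db = new JsonlDB<string>("example.jsonl");
	await db.open();
	db.set("key", "value");

	// These two could previously step on each other; since 2.4.2 the
	// persistence thread queues them and runs them sequentially.
	await Promise.all([db.dump("example.jsonl.dump"), db.compress()]);

	await db.close();
}

void demo();
```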
### 2.4.1 (2021-12-30) | ||
@@ -147,0 +152,0 @@ * Individual writes are now collected in a string and written at once, increasing throughput for larger entries by ~10x. |