happn-db-provider-loki
Advanced tools
Comparing version 1.0.2 to 1.0.3
96
index.js
@@ -9,3 +9,4 @@ const db = require('lokijs'), | ||
_ = commons._, | ||
path = require('path'); | ||
path = require('path'), | ||
pathSep = commons.path.sep; | ||
@@ -28,2 +29,3 @@ module.exports = class LokiDataProvider extends commons.BaseDataProvider { | ||
} | ||
initialize(callback) { | ||
@@ -36,3 +38,9 @@ this.dirty = true; | ||
}); | ||
this.persistenceOn = this.settings.filename != null; | ||
if (this.persistenceOn) { | ||
let pathArray = this.settings.filename.split(pathSep); | ||
pathArray[pathArray.length - 1] = 'temp_' + pathArray[pathArray.length - 1]; | ||
this.settings.tempDataFilename = pathArray.join(pathSep); | ||
} | ||
this.operationQueue = async.queue((operation, cb) => { | ||
@@ -55,2 +63,3 @@ this.processOperation(operation, cb); | ||
} | ||
reconstruct(callback) { | ||
@@ -60,4 +69,22 @@ if (!fs.existsSync(this.settings.filename)) { | ||
} | ||
this.readDataFile(this.settings.filename, (error1) => { | ||
if (!error1) return this.snapshot(callback); | ||
this.logger.warn( | ||
`Could not resconstruct loki db from ${ | ||
this.settings.filename | ||
}, attempting to reconstruct from temp file. ${error1.toString()}` | ||
); | ||
this.readDataFile(this.settings.tempDataFilename, (error2) => { | ||
if (!error2) return this.snapshot(callback); | ||
this.logger.error( | ||
`Could not rescpnstruct loki db from file or temp file. Error on tempfile: ${error2.toString()}` | ||
); | ||
callback(error1); //Rather callback with the error on the main file (?) | ||
}); | ||
}); | ||
} | ||
readDataFile(filename, callback) { | ||
const reader = readline.createInterface({ | ||
input: fs.createReadStream(this.settings.filename), | ||
input: fs.createReadStream(filename), | ||
crlfDelay: Infinity, | ||
@@ -70,3 +97,7 @@ }); | ||
if (lineIndex === 0) { | ||
this.db.loadJSON(JSON.parse(line).snapshot, { retainDirtyFlags: false }); | ||
try { | ||
this.db.loadJSON(JSON.parse(line).snapshot, { retainDirtyFlags: false }); | ||
} catch (e) { | ||
callback(e); | ||
} | ||
this.collection = this.db.collections[0]; | ||
@@ -88,3 +119,3 @@ } else { | ||
if (!errorHappened) { | ||
this.snapshot(callback); | ||
callback(null); | ||
} | ||
@@ -101,2 +132,3 @@ }); | ||
} | ||
parsePersistedOperation(line) { | ||
@@ -173,2 +205,3 @@ let operation = JSON.parse(line).operation; | ||
options = options || {}; | ||
let document = this.collection.findOne({ path }); | ||
@@ -207,23 +240,25 @@ let result, | ||
this.operationCount = 0; | ||
this.persistSnapshotData({ snapshot: this.db.serialize() }, callback); | ||
this.persistSnapshotData({ snapshot: this.db.serialize() }, (e) => { | ||
if (e) return callback(e); | ||
this.copyTempDataToMain(callback); | ||
}); | ||
} | ||
copyTempDataToMain(callback) { | ||
if (fs.existsSync(this.settings.filename)) fs.unlinkSync(this.settings.filename); | ||
fs.copy(this.settings.tempDataFilename, this.settings.filename, callback); | ||
} | ||
storePlayback(operation, callback) { | ||
return (e, result) => { | ||
if (e) { | ||
callback(e); | ||
return; | ||
} | ||
if (!this.persistenceOn) { | ||
callback(null, result); | ||
return; | ||
} | ||
if (e) return callback(e); | ||
if (!this.persistenceOn) return callback(null, result); | ||
this.appendOperationData({ operation }, (appendFailure) => { | ||
if (appendFailure) { | ||
this.logger.error('failed persisting operation data', appendFailure); | ||
callback(appendFailure); | ||
return; | ||
return callback(appendFailure); | ||
} | ||
if (this.operationCount < this.settings.snapshotRollOverThreshold) { | ||
callback(null, result); | ||
return; | ||
return callback(null, result); | ||
} | ||
@@ -233,4 +268,3 @@ this.snapshot((e) => { | ||
this.logger.error('snapshot rollover failed', e); | ||
callback(e); | ||
return; | ||
return callback(e); | ||
} | ||
@@ -243,5 +277,5 @@ callback(null, result); | ||
getFileStream() { | ||
getFileStream(filename) { | ||
if (this.fileStream == null) { | ||
let realPath = fs.realpathSync(this.settings.filename); | ||
let realPath = fs.realpathSync(filename); | ||
fs.ensureDirSync(path.dirname(realPath)); | ||
@@ -255,4 +289,7 @@ this.fileStream = fs.createWriteStream(realPath, { flags: 'a' }); | ||
this.operationCount++; | ||
const fileStream = this.getFileStream(); | ||
fileStream.write(`${JSON.stringify(operationData)}\r\n`, this.fsync(callback)); | ||
const fileStream = this.getFileStream(this.settings.filename); | ||
fileStream.write( | ||
`${JSON.stringify(operationData)}\r\n`, | ||
this.fsync(this.settings.filename, callback) | ||
); | ||
} | ||
@@ -262,3 +299,3 @@ | ||
fs.writeFile( | ||
this.settings.filename, | ||
this.settings.tempDataFilename, | ||
`${JSON.stringify(snapshotData)}\r\n`, | ||
@@ -268,6 +305,7 @@ { | ||
}, | ||
this.fsync(callback) | ||
this.fsync(this.settings.tempDataFilename, callback) | ||
); | ||
} | ||
fsync(callback) { | ||
fsync(filename, callback) { | ||
return (e) => { | ||
@@ -282,3 +320,3 @@ if (e) { | ||
} | ||
fs.open(this.settings.filename, 'r+', (errorOpening, fd) => { | ||
fs.open(filename, 'r+', (errorOpening, fd) => { | ||
if (errorOpening) { | ||
@@ -369,2 +407,3 @@ callback(new Error(`failed syncing to storage device: ${errorOpening.message}`)); | ||
let pathCriteria = this.getPathCriteria(path); | ||
if (!parameters) parameters = {}; | ||
@@ -374,3 +413,2 @@ if (parameters.criteria) pathCriteria = this.addCriteria(pathCriteria, parameters.criteria); | ||
let options = parameters.options || {}; | ||
if (results.count() === 0) { | ||
@@ -388,3 +426,2 @@ return options.count ? { data: { value: 0 } } : finalResult; | ||
finalResult = results.compoundsort(sortOptions).data({ forceClones: true, removeMeta: true }); | ||
if (options.skip) { | ||
@@ -457,2 +494,3 @@ finalResult = finalResult.slice(options.skip); | ||
} | ||
this.operationQueue.push( | ||
@@ -459,0 +497,0 @@ { |
{ | ||
"name": "happn-db-provider-loki", | ||
"version": "1.0.2", | ||
"version": "1.0.3", | ||
"description": "Loki data provider for happn", | ||
@@ -14,7 +14,7 @@ "scripts": { | ||
"lokijs": "^1.5.12", | ||
"happn-commons": "^1.0.1" | ||
"happn-commons": "^1.0.2" | ||
}, | ||
"devDependencies": { | ||
"happn-commons-test": "^1.0.2" | ||
"happn-commons-test": "^1.0.3" | ||
} | ||
} |
@@ -11,2 +11,8 @@ 1.0.0 2022-03-01 | ||
----------------- | ||
- SMC-4198: updated dependencies | ||
- SMC-4198: updated dependencies | ||
1.0.3 2022-03-29 | ||
----------------- | ||
- TEN-92, TEN-93: logging levels and events | ||
- TEN-102: loki snapshot file redundancy | ||
- TEN-92: made all dependencies test dependencies |
15529
468
Updatedhappn-commons@^1.0.2