lmdb-store
Advanced tools
Comparing version 0.2.2 to 0.3.0
201
index.js
@@ -12,2 +12,3 @@ const fs = require('fs-extra') | ||
const VALUE_OVERFLOW_THRESHOLD = 2048 | ||
const DEFAULT_SYNC_BATCH_THRESHOLD = 3000 | ||
const AS_STRING = { | ||
@@ -37,6 +38,6 @@ asBuffer: false | ||
let env = new Env() | ||
let db | ||
let committingWrites | ||
let scheduledWrites | ||
let scheduledOperations | ||
let readTxn, writeTxn | ||
let extension = pathModule.extname(path) | ||
@@ -58,31 +59,39 @@ let name = pathModule.basename(path, extension) | ||
env.open(options) | ||
let events = new EventEmitter() | ||
function openDB() { | ||
try { | ||
db = env.openDbi({ | ||
name: 'data', | ||
create: true, | ||
keyIsBuffer: true, | ||
}) | ||
} catch(error) { | ||
handleError(error, null, null, openDB) | ||
readTxn = env.beginTxn(READING_TNX) | ||
readTxn.reset() | ||
let stores = [] | ||
class LMDBStore extends EventEmitter { | ||
constructor(dbName) { | ||
super() | ||
const openDB = () => { | ||
try { | ||
this.db = env.openDbi({ | ||
name: dbName || 'data', | ||
create: true, | ||
keyIsBuffer: true, | ||
}) | ||
} catch(error) { | ||
handleError(error, null, null, openDB) | ||
} | ||
} | ||
openDB() | ||
this.dbName = dbName | ||
this.env = env | ||
this.bytesRead = 0 | ||
this.bytesWritten = 0 | ||
this.reads = 0 | ||
this.writes = 0 | ||
this.transactions = 0 | ||
this.averageTransactionTime = 5 | ||
this.syncBatchThreshold = DEFAULT_SYNC_BATCH_THRESHOLD | ||
Object.assign(this, options) | ||
allDbs.set(dbName ? name + '-' + dbName : name, this) | ||
stores.push(this) | ||
} | ||
} | ||
openDB() | ||
const store = { | ||
db, | ||
env, | ||
path, | ||
name, | ||
events, | ||
bytesRead: 0, | ||
bytesWritten: 0, | ||
reads: 0, | ||
writes: 0, | ||
transactions: 0, | ||
readTxn: env.beginTxn(READING_TNX), | ||
openDB(dbName) { | ||
return new LMDBStore(dbName) | ||
} | ||
transaction(execute, noSync, abort) { | ||
let result | ||
if (this.writeTxn) { | ||
if (writeTxn) { | ||
// already nested in a transaction, just execute and return | ||
@@ -99,3 +108,3 @@ result = execute() | ||
this.transactions++ | ||
txn = this.writeTxn = env.beginTxn() | ||
txn = writeTxn = env.beginTxn() | ||
let startCpu = process.cpuUsage() | ||
@@ -124,16 +133,15 @@ let start = Date.now() | ||
} | ||
this.writeTxn = null | ||
writeTxn = null | ||
} | ||
}, | ||
} | ||
get(id, copy) { | ||
let txn | ||
try { | ||
const writeTxn = this.writeTxn | ||
if (writeTxn) { | ||
txn = writeTxn | ||
} else { | ||
txn = this.readTxn | ||
txn = readTxn | ||
txn.renew() | ||
} | ||
let result = copy ? txn.getBinaryUnsafe(db, id, AS_BINARY) : txn.getBinary(db, id, AS_BINARY) | ||
let result = copy ? txn.getBinaryUnsafe(this.db, id, AS_BINARY) : txn.getBinary(this.db, id, AS_BINARY) | ||
if (result === null) // missing entry, really should be undefined | ||
@@ -156,3 +164,3 @@ result = undefined | ||
} | ||
}, | ||
} | ||
put(id, value, ifValue) { | ||
@@ -162,7 +170,7 @@ if (!scheduledOperations) { | ||
} | ||
let index = scheduledOperations.push([db, id, value, ifValue]) - 1 | ||
let index = scheduledOperations.push([this.db, id, value, ifValue]) - 1 | ||
let commit = this.scheduleCommit() | ||
return ifValue === undefined ? commit.unconditionalResults : | ||
commit.results.then((writeResults) => writeResults[index] === 0) | ||
}, | ||
} | ||
putSync(id, value) { | ||
@@ -180,7 +188,7 @@ let txn | ||
txn = this.writeTxn || env.beginTxn() | ||
txn.putBinary(db, id, value, AS_BINARY) | ||
txn = writeTxn || env.beginTxn() | ||
txn.putBinary(this.db, id, value, AS_BINARY) | ||
/*if (Date.now() - start > 20) | ||
console.log('after put', Date.now() - start, process.cpuUsage(startCpu))*/ | ||
if (!this.writeTxn) { | ||
if (!writeTxn) { | ||
txn.commit() | ||
@@ -190,14 +198,14 @@ //console.log('after commit', Date.now() - start, process.cpuUsage(startCpu)) | ||
} catch(error) { | ||
if (this.writeTxn) | ||
if (writeTxn) | ||
throw error // if we are in a transaction, the whole transaction probably needs to restart | ||
return handleError(error, this, txn, () => this.putSync(id, value)) | ||
} | ||
}, | ||
} | ||
removeSync(id) { | ||
let txn | ||
try { | ||
txn = this.writeTxn || env.beginTxn() | ||
txn = writeTxn || env.beginTxn() | ||
this.writes++ | ||
txn.del(db, id) | ||
if (!this.writeTxn) { | ||
txn.del(this.db, id) | ||
if (!writeTxn) { | ||
txn.commit() | ||
@@ -208,11 +216,11 @@ } | ||
if (error.message.startsWith('MDB_NOTFOUND')) { | ||
if (!this.writeTxn) | ||
if (!writeTxn) | ||
txn.abort() | ||
return false // calling remove on non-existent property is fine, but we will indicate its lack of existence with the return value | ||
} | ||
if (this.writeTxn) | ||
if (writeTxn) | ||
throw error // if we are in a transaction, the whole transaction probably needs to restart | ||
return handleError(error, this, txn, () => this.removeSync(id)) | ||
} | ||
}, | ||
} | ||
remove(id, ifValue) { | ||
@@ -222,11 +230,11 @@ if (!scheduledOperations) { | ||
} | ||
let index = scheduledOperations.push([db, id, undefined, ifValue]) - 1 | ||
let index = scheduledOperations.push([this.db, id, undefined, ifValue]) - 1 | ||
let commit = this.scheduleCommit() | ||
return ifValue === undefined ? commit.unconditionalResults : | ||
commit.results.then((writeResults) => writeResults[index] === 0) | ||
}, | ||
} | ||
iterable(options) { | ||
console.warn('iterable is deprecated') | ||
return this.getRange(options) | ||
}, | ||
} | ||
getRange(options) { | ||
@@ -242,6 +250,6 @@ let iterable = new ArrayLikeIterable() | ||
array = [] | ||
let cursor, txn = store.readTxn | ||
let cursor | ||
try { | ||
txn.renew() | ||
cursor = new Cursor(txn, db, AS_BINARY) | ||
readTxn.renew() | ||
cursor = new Cursor(readTxn, this.db, AS_BINARY) | ||
if (reverse) { | ||
@@ -279,3 +287,3 @@ // for reverse retrieval, goToRange is backwards because it positions at the key equal or *greater than* the provided key | ||
cursor.close() | ||
txn.reset() | ||
readTxn.reset() | ||
} catch(error) { | ||
@@ -287,3 +295,3 @@ if (cursor) { | ||
} | ||
return handleError(error, this, txn, getNextBlock) | ||
return handleError(error, this, readTxn, getNextBlock) | ||
} | ||
@@ -295,2 +303,3 @@ } | ||
getNextBlock() | ||
let store = this | ||
return { | ||
@@ -327,4 +336,3 @@ next() { | ||
return iterable | ||
}, | ||
averageTransactionTime: 5, | ||
} | ||
scheduleCommit() { | ||
@@ -337,3 +345,5 @@ if (!this.pendingBatch) { | ||
this.runNextBatch = null | ||
events.emit('beforecommit', { scheduledOperations }) | ||
for (const store of stores) { | ||
store.emit('beforecommit', { scheduledOperations }) | ||
} | ||
clearTimeout(timeout) | ||
@@ -381,3 +391,3 @@ this.currentCommit = whenCommitted | ||
} | ||
if (scheduledOperations && scheduledOperations.length > 3000 && this.runNextBatch) { | ||
if (scheduledOperations && scheduledOperations.length >= this.syncBatchThreshold && this.runNextBatch) { | ||
// past a certain threshold, run it immediately and synchronously | ||
@@ -394,3 +404,3 @@ let batch = this.pendingBatch | ||
return this.pendingBatch | ||
}, | ||
} | ||
batchSync() { | ||
@@ -430,3 +440,3 @@ let value | ||
return results | ||
}, | ||
} | ||
batch(operations) { | ||
@@ -443,3 +453,3 @@ this.writes += operations.length | ||
} | ||
scheduledOperations.push([db, operation.key, value]) | ||
scheduledOperations.push([this.db, operation.key, value]) | ||
} catch (error) { | ||
@@ -454,16 +464,7 @@ if (error.message.startsWith('MDB_NOTFOUND')) { | ||
return this.scheduleCommit().unconditionalResults | ||
}, | ||
} | ||
close() { | ||
db.close() | ||
this.db.close() | ||
env.close() | ||
}, | ||
on(event, callback) { | ||
return events.on(event, callback) | ||
}, | ||
once(event, callback) { | ||
return events.once(event, callback) | ||
}, | ||
emit(event) { | ||
return events.emit.apply(events, arguments) | ||
}, | ||
} | ||
sync(callback) { | ||
@@ -475,9 +476,9 @@ return env.sync(callback || function(error) { | ||
}) | ||
}, | ||
} | ||
clear() { | ||
//console.log('clearing db', name) | ||
try { | ||
db.drop({ | ||
this.db.drop({ | ||
justFreePages: true, | ||
txn: this.writeTxn, | ||
txn: writeTxn, | ||
}) | ||
@@ -487,3 +488,3 @@ } catch(error) { | ||
} | ||
}, | ||
} | ||
testResize() { | ||
@@ -495,9 +496,7 @@ handleError(new Error('MDB_MAP_FULL'), this, null, () => { | ||
} | ||
store.readTxn.reset() | ||
allDbs.set(name, store) | ||
return store | ||
function handleError(error, db, txn, retry) { | ||
return new LMDBStore() | ||
function handleError(error, store, txn, retry) { | ||
try { | ||
if (db && db.readTxn) { | ||
db.readTxn.abort() | ||
if (readTxn) { | ||
readTxn.abort() | ||
} | ||
@@ -508,4 +507,4 @@ } catch(error) { | ||
try { | ||
if (db && db.writeTxn) | ||
db.writeTxn.abort() | ||
if (writeTxn) | ||
writeTxn.abort() | ||
} catch(error) { | ||
@@ -515,3 +514,3 @@ // console.warn('txn already aborted') | ||
try { | ||
if (txn && txn !== (db && db.readTxn) && txn !== (db && db.writeTxn)) | ||
if (txn && txn !== readTxn && txn !== writeTxn) | ||
txn.abort() | ||
@@ -522,9 +521,9 @@ } catch(error) { | ||
if (db && db.writeTxn) | ||
db.writeTxn = null | ||
if (writeTxn) | ||
writeTxn = null | ||
if (error.message == 'The transaction is already closed.') { | ||
try { | ||
db.readTxn = env.beginTxn(READING_TNX) | ||
readTxn = env.beginTxn(READING_TNX) | ||
} catch(error) { | ||
return handleError(error, db, null, retry) | ||
return handleError(error, store, null, retry) | ||
} | ||
@@ -535,4 +534,4 @@ return retry() | ||
const newSize = Math.ceil(env.info().mapSize * 1.3 / 0x200000 + 1) * 0x200000 | ||
if (db) { | ||
db.emit('remap') | ||
for (const store of stores) { | ||
store.emit('remap') | ||
} | ||
@@ -542,6 +541,4 @@ | ||
console.log('Resized database', name, 'to', newSize) | ||
if (db) { | ||
db.readTxn = env.beginTxn(READING_TNX) | ||
db.readTxn.reset() | ||
} | ||
readTxn = env.beginTxn(READING_TNX) | ||
readTxn.reset() | ||
let result = retry() | ||
@@ -551,7 +548,9 @@ return result | ||
// the noSync setting means that we can have partial corruption and we need to be able to recover | ||
db.emit('remap') | ||
for (const store of stores) { | ||
store.emit('remap') | ||
} | ||
try { | ||
env.close() | ||
} catch (error) {} | ||
console.warn('Corrupted database,', location, 'attempting to delete the db file and restart', error) | ||
console.warn('Corrupted database,', location, 'attempting to delete the store file and restart', error) | ||
fs.removeSync(location + '.mdb') | ||
@@ -563,4 +562,4 @@ env = new Env() | ||
} | ||
db.readTxn = env.beginTxn(READING_TNX) | ||
db.readTxn.reset() | ||
readTxn = env.beginTxn(READING_TNX) | ||
readTxn.reset() | ||
error.message = 'In database ' + name + ': ' + error.message | ||
@@ -567,0 +566,0 @@ throw error |
{ | ||
"name": "lmdb-store", | ||
"author": "Kris Zyp", | ||
"version": "0.2.2", | ||
"version": "0.3.0", | ||
"description": "Simple, effiecent, scalable data store wrapper for LDMB", | ||
@@ -6,0 +6,0 @@ "license": "MIT", |
@@ -64,3 +64,3 @@ <a href="https://dev.doctorevidence.com/"><img src="./assets/powers-dre.png" width="203" /></a> | ||
The `lmdb-store` instance includes `on` and `once` methods for listening to database events. There are two events: | ||
The `lmdb-store` instance is an <a href="https://nodejs.org/dist/latest-v11.x/docs/api/events.html#events_class_eventemitter">EventEmitter</a>, allowing application to listen to database events. There are two events: | ||
@@ -67,0 +67,0 @@ `remap` - This event is fired before a database is resized, and the memory-map is remapped. If any data has been read using `get` with `noCopy` prior to this event, that data/buffer _must_ not be accessed after this event, or it will cause a segmentation fault and your program will exit (this is non-recoverable). |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
100962