Comparing version 3.0.12 to 3.0.13
399
caching.js
@@ -9,214 +9,223 @@ import { WeakLRUCache, clearKeptObjects } from './native.js'; | ||
export const CachingStore = (Store, env) => { | ||
let childTxnChanges | ||
let childTxnChanges; | ||
return class LMDBStore extends Store { | ||
constructor(dbName, options) { | ||
super(dbName, options); | ||
if (!env.cacheCommitter) { | ||
env.cacheCommitter = true; | ||
this.on('aftercommit', ({ next, last, txnId }) => { | ||
do { | ||
let meta = next.meta; | ||
let store = meta && meta.store; | ||
if (store) { | ||
if (next.flag & FAILED_CONDITION) | ||
store.cache.delete(meta.key); // just delete it from the map | ||
else { | ||
let expirationPriority = meta.valueSize >> 10; | ||
let cache = store.cache; | ||
let entry = mapGet.call(cache, meta.key); | ||
if (entry && !entry.txnId) { | ||
entry.txnId = txnId; | ||
cache.used(entry, expirationPriority + 4); // this will enter it into the LRFU (with a little lower priority than a read) | ||
constructor(dbName, options) { | ||
super(dbName, options); | ||
if (!env.cacheCommitter) { | ||
env.cacheCommitter = true; | ||
this.on('aftercommit', ({ next, last, txnId }) => { | ||
do { | ||
let meta = next.meta; | ||
let store = meta && meta.store; | ||
if (store) { | ||
if (next.flag & FAILED_CONDITION) | ||
store.cache.delete(meta.key); // just delete it from the map | ||
else { | ||
let expirationPriority = meta.valueSize >> 10; | ||
let cache = store.cache; | ||
let entry = mapGet.call(cache, meta.key); | ||
if (entry && !entry.txnId) { | ||
entry.txnId = txnId; | ||
cache.used(entry, expirationPriority + 4); // this will enter it into the LRFU (with a little lower priority than a read) | ||
} | ||
} | ||
} | ||
} | ||
} while (next != last && (next = next.next)) | ||
}); | ||
} | ||
this.db.cachingDb = this; | ||
if (options.cache.clearKeptInterval) | ||
options.cache.clearKeptObjects = clearKeptObjects; | ||
this.cache = new WeakLRUCache(options.cache); | ||
if (options.cache.validated) | ||
this.cache.validated = true; | ||
} | ||
get isCaching() { | ||
return true | ||
} | ||
get(id, options) { | ||
let value; | ||
if (this.cache.validated) { | ||
let entry = this.cache.get(id); | ||
if (entry) { | ||
let cachedValue = entry.value; | ||
if (entry.txnId != null) { | ||
value = super.get(id, { ifNotTxnId: entry.txnId, transaction: options && options.transaction }); | ||
if (value === UNMODIFIED) | ||
return cachedValue; | ||
} else // with no txn id we do not validate; this is the state of a cached value after a write before it transacts | ||
return cachedValue; | ||
} else | ||
value = super.get(id, options); | ||
} else if (options && options.transaction) { | ||
return super.get(id, options); | ||
} else { | ||
value = this.cache.getValue(id); | ||
if (value !== undefined) { | ||
return value; | ||
} while (next != last && (next = next.next)); | ||
}); | ||
} | ||
value = super.get(id); | ||
this.db.cachingDb = this; | ||
if (options.cache.clearKeptInterval) | ||
options.cache.clearKeptObjects = clearKeptObjects; | ||
this.cache = new WeakLRUCache(options.cache); | ||
if (options.cache.validated) this.cache.validated = true; | ||
} | ||
if (value && typeof value === 'object' && !options && typeof id !== 'object') { | ||
let entry = this.cache.setValue(id, value, this.lastSize >> 10); | ||
if (this.useVersions) { | ||
entry.version = getLastVersion(); | ||
get isCaching() { | ||
return true; | ||
} | ||
get(id, options) { | ||
let value; | ||
if (this.cache.validated) { | ||
let entry = this.cache.get(id); | ||
if (entry) { | ||
let cachedValue = entry.value; | ||
if (entry.txnId != null) { | ||
value = super.get(id, { | ||
ifNotTxnId: entry.txnId, | ||
transaction: options && options.transaction, | ||
}); | ||
if (value === UNMODIFIED) return cachedValue; | ||
} // with no txn id we do not validate; this is the state of a cached value after a write before it transacts | ||
else return cachedValue; | ||
} else value = super.get(id, options); | ||
} else if (options && options.transaction) { | ||
return super.get(id, options); | ||
} else { | ||
value = this.cache.getValue(id); | ||
if (value !== undefined) { | ||
return value; | ||
} | ||
value = super.get(id); | ||
} | ||
if (this.cache.validated) | ||
entry.txnId = getLastTxnId(); | ||
if ( | ||
value && | ||
typeof value === 'object' && | ||
!options && | ||
typeof id !== 'object' | ||
) { | ||
let entry = this.cache.setValue(id, value, this.lastSize >> 10); | ||
if (this.useVersions) { | ||
entry.version = getLastVersion(); | ||
} | ||
if (this.cache.validated) entry.txnId = getLastTxnId(); | ||
} | ||
return value; | ||
} | ||
return value; | ||
} | ||
getEntry(id, options) { | ||
let entry, value; | ||
if (this.cache.validated) { | ||
entry = this.cache.get(id); | ||
if (entry) { | ||
if (entry.txnId != null) { | ||
value = super.get(id, { ifNotTxnId: entry.txnId, transaction: options && options.transaction }); | ||
if (value === UNMODIFIED) | ||
return entry; | ||
} else // with no txn id we do not validate; this is the state of a cached value after a write before it transacts | ||
getEntry(id, options) { | ||
let entry, value; | ||
if (this.cache.validated) { | ||
entry = this.cache.get(id); | ||
if (entry) { | ||
if (entry.txnId != null) { | ||
value = super.get(id, { | ||
ifNotTxnId: entry.txnId, | ||
transaction: options && options.transaction, | ||
}); | ||
if (value === UNMODIFIED) return entry; | ||
} // with no txn id we do not validate; this is the state of a cached value after a write before it transacts | ||
else return entry; | ||
} else value = super.get(id, options); | ||
} else if (options && options.transaction) { | ||
return super.getEntry(id, options); | ||
} else { | ||
entry = this.cache.get(id); | ||
if (entry !== undefined) { | ||
return entry; | ||
} else | ||
value = super.get(id, options); | ||
} else if (options && options.transaction) { | ||
return super.getEntry(id, options); | ||
} else { | ||
entry = this.cache.get(id); | ||
if (entry !== undefined) { | ||
return entry; | ||
} | ||
value = super.get(id); | ||
} | ||
value = super.get(id); | ||
if (value === undefined) return; | ||
if (value && typeof value === 'object' && typeof id !== 'object') { | ||
entry = this.cache.setValue(id, value, this.lastSize >> 10); | ||
} else entry = { value }; | ||
if (this.useVersions) entry.version = getLastVersion(); | ||
if (this.cache.validated) entry.txnId = getLastTxnId(); | ||
return entry; | ||
} | ||
if (value === undefined) | ||
return; | ||
if (value && typeof value === 'object' && !options && typeof id !== 'object') { | ||
entry = this.cache.setValue(id, value, this.lastSize >> 10); | ||
} else | ||
entry = { value }; | ||
if (this.useVersions) | ||
entry.version = getLastVersion(); | ||
if (this.cache.validated) | ||
entry.txnId = getLastTxnId(); | ||
return entry; | ||
} | ||
putEntry(id, entry, ifVersion) { | ||
let result = super.put(id, entry.value, entry.version, ifVersion); | ||
if (typeof id === 'object') | ||
putEntry(id, entry, ifVersion) { | ||
let result = super.put(id, entry.value, entry.version, ifVersion); | ||
if (typeof id === 'object') return result; | ||
if (result && result.then) | ||
this.cache.setManually(id, entry); // set manually so we can keep it pinned in memory until it is committed | ||
// sync operation, immediately add to cache | ||
else this.cache.set(id, entry); | ||
} | ||
put(id, value, version, ifVersion) { | ||
let result = super.put(id, value, version, ifVersion); | ||
if (typeof id !== 'object') { | ||
if (value && value['\x10binary-data\x02']) { | ||
// don't cache binary data, since it will be decoded on get | ||
this.cache.delete(id); | ||
return result; | ||
} | ||
let entry; | ||
if (this.cachePuts === false) { | ||
// we are not caching puts, clear the entry at least | ||
this.cache.delete(id); | ||
} else { | ||
if (result?.isSync) { | ||
// sync operation, immediately add to cache | ||
if (result.result) | ||
// if it succeeds | ||
entry = this.cache.setValue(id, value, 0); | ||
else { | ||
this.cache.delete(id); | ||
return result; | ||
} // sync failure | ||
// otherwise keep it pinned in memory until it is committed | ||
} else entry = this.cache.setValue(id, value, -1); | ||
} | ||
if (childTxnChanges) childTxnChanges.add(id); | ||
if (version !== undefined && entry) | ||
entry.version = | ||
typeof version === 'object' ? version.version : version; | ||
} | ||
return result; | ||
if (result && result.then) | ||
this.cache.setManually(id, entry); // set manually so we can keep it pinned in memory until it is committed | ||
else // sync operation, immediately add to cache | ||
this.cache.set(id, entry); | ||
} | ||
put(id, value, version, ifVersion) { | ||
let result = super.put(id, value, version, ifVersion); | ||
if (typeof id !== 'object') { | ||
if (value && value['\x10binary-data\x02']) { | ||
// don't cache binary data, since it will be decoded on get | ||
this.cache.delete(id); | ||
return result; | ||
} | ||
putSync(id, value, version, ifVersion) { | ||
let result = super.putSync(id, value, version, ifVersion); | ||
if (id !== 'object') { | ||
// sync operation, immediately add to cache, otherwise keep it pinned in memory until it is committed | ||
if ( | ||
value && | ||
this.cachePuts !== false && | ||
typeof value === 'object' && | ||
result | ||
) { | ||
let entry = this.cache.setValue(id, value); | ||
if (childTxnChanges) childTxnChanges.add(id); | ||
if (version !== undefined) { | ||
entry.version = | ||
typeof version === 'object' ? version.version : version; | ||
} | ||
} // it is possible that a value used to exist here | ||
else this.cache.delete(id); | ||
} | ||
let entry; | ||
if (this.cachePuts === false) { // we are not caching puts, clear the entry at least | ||
this.cache.delete(id); | ||
} else { | ||
if (result?.isSync) { | ||
// sync operation, immediately add to cache | ||
if (result.result) // if it succeeds | ||
entry = this.cache.setValue(id, value, 0); | ||
else { | ||
this.cache.delete(id); | ||
return result; | ||
} // sync failure | ||
// otherwise keep it pinned in memory until it is committed | ||
} else entry = this.cache.setValue(id, value, -1); | ||
} | ||
if (childTxnChanges) | ||
childTxnChanges.add(id); | ||
if (version !== undefined && entry) | ||
entry.version = typeof version === 'object' ? version.version : version; | ||
return result; | ||
} | ||
return result; | ||
} | ||
putSync(id, value, version, ifVersion) { | ||
let result = super.putSync(id, value, version, ifVersion); | ||
if (id !== 'object') { | ||
// sync operation, immediately add to cache, otherwise keep it pinned in memory until it is committed | ||
if (value && this.cachePuts !== false && typeof value === 'object' && result) { | ||
let entry = this.cache.setValue(id, value); | ||
if (childTxnChanges) | ||
childTxnChanges.add(id); | ||
if (version !== undefined) { | ||
entry.version = typeof version === 'object' ? version.version : version; | ||
remove(id, ifVersion) { | ||
this.cache.delete(id); | ||
return super.remove(id, ifVersion); | ||
} | ||
removeSync(id, ifVersion) { | ||
this.cache.delete(id); | ||
return super.removeSync(id, ifVersion); | ||
} | ||
clearAsync(callback) { | ||
this.cache.clear(); | ||
return super.clearAsync(callback); | ||
} | ||
clearSync() { | ||
this.cache.clear(); | ||
super.clearSync(); | ||
} | ||
childTransaction(callback) { | ||
return super.childTransaction(() => { | ||
let cache = this.cache; | ||
let previousChanges = childTxnChanges; | ||
try { | ||
childTxnChanges = new Set(); | ||
return when( | ||
callback(), | ||
(result) => { | ||
if (result === ABORT) return abort(); | ||
childTxnChanges = previousChanges; | ||
return result; | ||
}, | ||
abort, | ||
); | ||
} catch (error) { | ||
abort(error); | ||
} | ||
} else // it is possible that a value used to exist here | ||
this.cache.delete(id); | ||
function abort(error) { | ||
// if the transaction was aborted, remove all affected entries from cache | ||
for (let id of childTxnChanges) cache.delete(id); | ||
childTxnChanges = previousChanges; | ||
if (error) throw error; | ||
else return ABORT; | ||
} | ||
}); | ||
} | ||
return result; | ||
} | ||
remove(id, ifVersion) { | ||
this.cache.delete(id); | ||
return super.remove(id, ifVersion); | ||
} | ||
removeSync(id, ifVersion) { | ||
this.cache.delete(id); | ||
return super.removeSync(id, ifVersion); | ||
} | ||
clearAsync(callback) { | ||
this.cache.clear(); | ||
return super.clearAsync(callback); | ||
} | ||
clearSync() { | ||
this.cache.clear(); | ||
super.clearSync(); | ||
} | ||
childTransaction(callback) { | ||
return super.childTransaction(() => { | ||
let cache = this.cache; | ||
let previousChanges = childTxnChanges; | ||
try { | ||
childTxnChanges = new Set(); | ||
return when(callback(), (result) => { | ||
if (result === ABORT) | ||
return abort(); | ||
childTxnChanges = previousChanges; | ||
return result; | ||
}, abort); | ||
} catch(error) { | ||
abort(error); | ||
doesExist(key, versionOrValue) { | ||
let entry = this.cache.get(key); | ||
if (entry) { | ||
if (versionOrValue == null) { | ||
return versionOrValue !== null; | ||
} else if (this.useVersions) { | ||
return ( | ||
versionOrValue === IF_EXISTS || entry.version === versionOrValue | ||
); | ||
} | ||
} | ||
function abort(error) { | ||
// if the transaction was aborted, remove all affected entries from cache | ||
for (let id of childTxnChanges) | ||
cache.delete(id); | ||
childTxnChanges = previousChanges; | ||
if (error) | ||
throw error; | ||
else | ||
return ABORT; | ||
} | ||
}); | ||
} | ||
doesExist(key, versionOrValue) { | ||
let entry = this.cache.get(key); | ||
if (entry) { | ||
if (versionOrValue == null) { | ||
return versionOrValue !== null; | ||
} else if (this.useVersions) { | ||
return versionOrValue === IF_EXISTS || entry.version === versionOrValue; | ||
} | ||
return super.doesExist(key, versionOrValue); | ||
} | ||
return super.doesExist(key, versionOrValue); | ||
} | ||
}; | ||
@@ -223,0 +232,0 @@ }; |
{ | ||
"name": "lmdb", | ||
"author": "Kris Zyp", | ||
"version": "3.0.12", | ||
"version": "3.0.13", | ||
"description": "Simple, efficient, scalable, high-performance LMDB interface", | ||
@@ -114,9 +114,9 @@ "license": "MIT", | ||
"optionalDependencies": { | ||
"@lmdb/lmdb-darwin-arm64": "3.0.12", | ||
"@lmdb/lmdb-darwin-x64": "3.0.12", | ||
"@lmdb/lmdb-linux-arm": "3.0.12", | ||
"@lmdb/lmdb-linux-arm64": "3.0.12", | ||
"@lmdb/lmdb-linux-x64": "3.0.12", | ||
"@lmdb/lmdb-win32-x64": "3.0.12" | ||
"@lmdb/lmdb-darwin-arm64": "3.0.13", | ||
"@lmdb/lmdb-darwin-x64": "3.0.13", | ||
"@lmdb/lmdb-linux-arm": "3.0.13", | ||
"@lmdb/lmdb-linux-arm64": "3.0.13", | ||
"@lmdb/lmdb-linux-x64": "3.0.13", | ||
"@lmdb/lmdb-win32-x64": "3.0.13" | ||
} | ||
} |
12
read.js
@@ -712,2 +712,3 @@ import { RangeIterable } from './util/RangeIterable.js'; | ||
if (cursorRenewId && (cursorRenewId != renewId || txn.isDone)) { | ||
if (flags & 0x10000) flags = flags & ~0x10000; // turn off exclusive start when repositioning | ||
resetCursor(); | ||
@@ -944,4 +945,7 @@ keySize = position(0); | ||
// if it is root, we need to abort and/or wait for transactions to finish | ||
if (readTxn) readTxn.abort(); | ||
else readTxn = {}; | ||
if (readTxn) { | ||
try { | ||
readTxn.abort(); | ||
} catch (error) {} | ||
} else readTxn = {}; | ||
readTxn.isDone = true; | ||
@@ -971,3 +975,5 @@ Object.defineProperty(readTxn, 'renew', { | ||
env.address = 0; | ||
env.close(); | ||
try { | ||
env.close(); | ||
} catch (error) {} | ||
} else this.db.close(); | ||
@@ -974,0 +980,0 @@ this.status = 'closed'; |
@@ -450,3 +450,10 @@ [![license](https://img.shields.io/badge/license-MIT-brightgreen)](LICENSE) | ||
* `compression` - This enables compression. This can be set a truthy value to enable compression with default settings, or it can be an object with compression settings. | ||
* `cache` - Setting this to true enables caching. This can also be set to an object specifying the settings/options for the cache (see [settings for weak-lru-cache](https://github.com/kriszyp/weak-lru-cache#weaklrucacheoptions-constructor)). For long-running synchronous operations, it is recommended that you set the `clearKeptInterval` (a value of 100 is a good choice). | ||
* `cache` - Setting this to true enables caching. This can also be set to an object specifying the settings/options for the cache (see [settings for weak-lru-cache](https://github.com/kriszyp/weak-lru-cache#weaklrucacheoptions-constructor)). For long-running synchronous operations, it is recommended that you set the `clearKeptInterval` (a value of 100 is a good choice). The object cache is stored separately for each process/worker, so if you are running across multiple workers or processes, you will either need to use messaging to invalidate cached entries when they are updated on other threads, or alternately, you can configure the cache to always check that the in-memory object matches the stored object with the flag `validated` flag set to `true`. For example, if you are using the cache with the multiple workers, the easiest way to ensure objects are always up-to-date is: | ||
```js | ||
open({ | ||
cache: { | ||
validated: true | ||
} | ||
}) | ||
``` | ||
* `useVersions` - Set this to true if you will be setting version numbers on the entries in the database. Note that you can not change this flag once a database has entries in it (or they won't be read correctly). | ||
@@ -453,0 +460,0 @@ * `keyEncoding` - This indicates the encoding to use for the database keys, and can be `'uint32'` for unsigned 32-bit integers, `'binary'` for raw buffers/Uint8Arrays, and the default `'ordered-binary'` allows any JS primitive as a keys. |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
2514210
7691
596
51