Comparing version
@@ -179,3 +179,3 @@ 'use strict'; | ||
//suite.add('syncTxn', syncTxn); | ||
//suite.add('getRange', getRange); | ||
suite.add('getRange', getRange); | ||
suite.add('setData', { | ||
@@ -188,4 +188,4 @@ defer: true, | ||
fn: batchDataAdd | ||
}); | ||
suite.add('get', getData); | ||
});*/ | ||
suite.add('get', getData);/* | ||
suite.add('plainJSON', plainJSON); | ||
@@ -192,0 +192,0 @@ suite.add('getBinary', getBinary);*/ |
@@ -1,78 +0,78 @@ | ||
import { WeakLRUCache } from 'weak-lru-cache/index.js' | ||
let getLastVersion | ||
const mapGet = Map.prototype.get | ||
import { WeakLRUCache } from 'weak-lru-cache/index.js'; | ||
let getLastVersion; | ||
const mapGet = Map.prototype.get; | ||
export const CachingStore = Store => class extends Store { | ||
constructor(dbName, options) { | ||
super(dbName, options) | ||
super(dbName, options); | ||
if (!this.env.cacheCommitter) { | ||
this.env.cacheCommitter = true | ||
this.env.cacheCommitter = true; | ||
this.on('aftercommit', ({ next, last }) => { | ||
do { | ||
let store = next.store | ||
let store = next.store; | ||
if (store) { | ||
if (next.flag & 1) | ||
next.store.cache.delete(next.key) // just delete it from the map | ||
next.store.cache.delete(next.key); // just delete it from the map | ||
else { | ||
let expirationPriority = next.valueSize >> 10 | ||
let cache = next.store.cache | ||
let entry = mapGet.call(cache, next.key) | ||
let expirationPriority = next.valueSize >> 10; | ||
let cache = next.store.cache; | ||
let entry = mapGet.call(cache, next.key); | ||
if (entry) | ||
cache.used(entry, expirationPriority) // this will enter it into the LRFU | ||
cache.used(entry, expirationPriority); // this will enter it into the LRFU | ||
} | ||
} | ||
} while (next != last && (next = next.next)) | ||
}) | ||
}); | ||
} | ||
this.db.cachingDb = this | ||
this.cache = new WeakLRUCache(options.cache) | ||
this.db.cachingDb = this; | ||
this.cache = new WeakLRUCache(options.cache); | ||
} | ||
get(id, cacheMode) { | ||
let value = this.cache.getValue(id) | ||
let value = this.cache.getValue(id); | ||
if (value !== undefined) | ||
return value | ||
value = super.get(id) | ||
return value; | ||
value = super.get(id); | ||
if (value && typeof value === 'object' && !cacheMode && typeof id !== 'object') { | ||
let entry = this.cache.setValue(id, value, this.lastSize >> 10) | ||
let entry = this.cache.setValue(id, value, this.lastSize >> 10); | ||
if (this.useVersions) { | ||
entry.version = getLastVersion() | ||
entry.version = getLastVersion(); | ||
} | ||
} | ||
return value | ||
return value; | ||
} | ||
getEntry(id, cacheMode) { | ||
let entry = this.cache.get(id) | ||
let entry = this.cache.get(id); | ||
if (entry) | ||
return entry | ||
let value = super.get(id) | ||
return entry; | ||
let value = super.get(id); | ||
if (value === undefined) | ||
return | ||
return; | ||
if (value && typeof value === 'object' && !cacheMode && typeof id !== 'object') { | ||
entry = this.cache.setValue(id, value, this.lastSize >> 10) | ||
entry = this.cache.setValue(id, value, this.lastSize >> 10); | ||
} else { | ||
entry = { value } | ||
entry = { value }; | ||
} | ||
if (this.useVersions) { | ||
entry.version = getLastVersion() | ||
entry.version = getLastVersion(); | ||
} | ||
return entry | ||
return entry; | ||
} | ||
putEntry(id, entry, ifVersion) { | ||
let result = super.put(id, entry.value, entry.version, ifVersion) | ||
let result = super.put(id, entry.value, entry.version, ifVersion); | ||
if (typeof id === 'object') | ||
return result | ||
return result; | ||
if (result && result.then) | ||
this.cache.setManually(id, entry) // set manually so we can keep it pinned in memory until it is committed | ||
this.cache.setManually(id, entry); // set manually so we can keep it pinned in memory until it is committed | ||
else // sync operation, immediately add to cache | ||
this.cache.set(id, entry) | ||
this.cache.set(id, entry); | ||
} | ||
put(id, value, version, ifVersion) { | ||
// if (this.cache.get(id)) // if there is a cache entry, remove it from scheduledEntries and | ||
let result = super.put(id, value, version, ifVersion) | ||
let result = super.put(id, value, version, ifVersion); | ||
if (typeof id !== 'object') { | ||
// sync operation, immediately add to cache, otherwise keep it pinned in memory until it is committed | ||
let entry = this.cache.setValue(id, value, !result || result.isSync ? 0 : -1) | ||
let entry = this.cache.setValue(id, value, !result || result.isSync ? 0 : -1); | ||
if (version !== undefined) | ||
entry.version = typeof version === 'object' ? version.version : version | ||
entry.version = typeof version === 'object' ? version.version : version; | ||
} | ||
return result | ||
return result; | ||
} | ||
@@ -83,29 +83,29 @@ putSync(id, value, version, ifVersion) { | ||
if (value && typeof value === 'object') { | ||
let entry = this.cache.setValue(id, value) | ||
let entry = this.cache.setValue(id, value); | ||
if (version !== undefined) { | ||
entry.version = typeof version === 'object' ? version.version : version | ||
entry.version = typeof version === 'object' ? version.version : version; | ||
} | ||
} else // it is possible that a value used to exist here | ||
this.cache.delete(id) | ||
this.cache.delete(id); | ||
} | ||
return super.putSync(id, value, version, ifVersion) | ||
return super.putSync(id, value, version, ifVersion); | ||
} | ||
remove(id, ifVersion) { | ||
this.cache.delete(id) | ||
return super.remove(id, ifVersion) | ||
this.cache.delete(id); | ||
return super.remove(id, ifVersion); | ||
} | ||
removeSync(id, ifVersion) { | ||
this.cache.delete(id) | ||
return super.removeSync(id, ifVersion) | ||
this.cache.delete(id); | ||
return super.removeSync(id, ifVersion); | ||
} | ||
clear() { | ||
this.cache.clear() | ||
super.clear() | ||
this.cache.clear(); | ||
super.clear(); | ||
} | ||
childTransaction(execute) { | ||
throw new Error('Child transactions are not supported in caching stores') | ||
throw new Error('Child transactions are not supported in caching stores'); | ||
} | ||
} | ||
}; | ||
export function setGetLastVersion(get) { | ||
getLastVersion = get | ||
getLastVersion = get; | ||
} |
@@ -0,0 +0,0 @@ /* sample-bdb.txt - BerkeleyDB toy/sample |
@@ -0,0 +0,0 @@ /* sample-mdb.txt - MDB toy/sample |
@@ -0,0 +0,0 @@ LZ4 Windows binary package |
@@ -0,0 +0,0 @@ LZ4 - Library Files |
@@ -259,4 +259,3 @@ import { EventEmitter } from 'events' | ||
useVersions?: boolean | ||
keyIsBuffer?: boolean | ||
keyIsUint32?: boolean | ||
keyEncoding?: 'uint32' | 'binary' | 'ordered-binary' | ||
dupSort?: boolean | ||
@@ -263,0 +262,0 @@ strictAsyncOrder?: boolean |
529
index.js
@@ -1,48 +0,44 @@ | ||
import { extname, basename, dirname} from 'path' | ||
import EventEmitter from 'events' | ||
import { Env, Compression, getAddress, require, arch, fs } from './native.js' | ||
import { CachingStore, setGetLastVersion } from './caching.js' | ||
import { addQueryMethods } from './query.js' | ||
import { addWriteMethods } from './writer.js' | ||
import { applyKeyHandling } from './keys.js' | ||
import { Encoder as MsgpackrEncoder } from 'msgpackr' | ||
const binaryBuffer = Symbol('binaryBuffer') | ||
setGetLastVersion(getLastVersion) | ||
const Uint8ArraySlice = Uint8Array.prototype.slice | ||
let keyBytes, keyBytesView | ||
const buffers = [] | ||
import { extname, basename, dirname} from 'path'; | ||
import EventEmitter from 'events'; | ||
import { Env, Compression, getAddress, require, arch, fs } from './native.js'; | ||
import { CachingStore, setGetLastVersion } from './caching.js'; | ||
import { addQueryMethods, makeReusableBuffer } from './query.js'; | ||
import { addWriteMethods } from './writer.js'; | ||
import { applyKeyHandling } from './keys.js'; | ||
import { Encoder as MsgpackrEncoder } from 'msgpackr'; | ||
setGetLastVersion(getLastVersion); | ||
let keyBytes, keyBytesView; | ||
const buffers = []; | ||
const DEFAULT_SYNC_BATCH_THRESHOLD = 200000000 // 200MB | ||
const DEFAULT_IMMEDIATE_BATCH_THRESHOLD = 10000000 // 10MB | ||
const DEFAULT_COMMIT_DELAY = 0 | ||
const DEFAULT_SYNC_BATCH_THRESHOLD = 200000000; // 200MB | ||
const DEFAULT_IMMEDIATE_BATCH_THRESHOLD = 10000000; // 10MB | ||
const DEFAULT_COMMIT_DELAY = 0; | ||
const READING_TNX = { | ||
readOnly: true | ||
} | ||
}; | ||
export const allDbs = new Map() | ||
let env | ||
let defaultCompression | ||
let lastSize, lastOffset, lastVersion | ||
const MDB_SET_KEY = 0, MDB_SET_RANGE = 1, MDB_GET_BOTH_RANGE = 2, MDB_GET_CURRENT = 3, MDB_FIRST = 4, MDB_LAST = 5, MDB_NEXT = 6, MDB_NEXT_NODUP = 7, MDB_NEXT_DUP = 8, MDB_PREV = 9, MDB_PREV_NODUP = 10, MDB_PREV_DUP = 11 | ||
let abortedNonChildTransactionWarn | ||
export const allDbs = new Map(); | ||
let env; | ||
let defaultCompression; | ||
let lastSize, lastOffset, lastVersion; | ||
let abortedNonChildTransactionWarn; | ||
export function open(path, options) { | ||
if (!keyBytes) | ||
allocateFixedBuffer() | ||
let env = new Env() | ||
let committingWrites | ||
let scheduledTransactions | ||
let scheduledOperations | ||
let asyncTransactionAfter = true, asyncTransactionStrictOrder | ||
let transactionWarned | ||
let readTxn, writeTxn, pendingBatch, currentCommit, runNextBatch, readTxnRenewed | ||
allocateFixedBuffer(); | ||
let env = new Env(); | ||
let committingWrites; | ||
let scheduledTransactions; | ||
let scheduledOperations; | ||
let asyncTransactionAfter = true, asyncTransactionStrictOrder; | ||
let transactionWarned; | ||
if (typeof path == 'object' && !options) { | ||
options = path | ||
path = options.path | ||
options = path; | ||
path = options.path; | ||
} | ||
let extension = extname(path) | ||
let name = basename(path, extension) | ||
let is32Bit = arch().endsWith('32') | ||
let extension = extname(path); | ||
let name = basename(path, extension); | ||
let is32Bit = arch().endsWith('32'); | ||
let remapChunks = (options && options.remapChunks) || ((options && options.mapSize) ? | ||
(is32Bit && options.mapSize > 0x100000000) : // larger than fits in address space, must use dynamic maps | ||
is32Bit) // without a known map size, we default to being able to handle large data correctly/well*/ | ||
is32Bit); // without a known map size, we default to being able to handle large data correctly/well*/ | ||
options = Object.assign({ | ||
@@ -60,23 +56,27 @@ path, | ||
0x20000, // Otherwise we start small with 128KB | ||
}, options) | ||
}, options); | ||
if (options.asyncTransactionOrder == 'before') { | ||
console.warn('asyncTransactionOrder: "before" is deprecated') | ||
asyncTransactionAfter = false | ||
console.warn('asyncTransactionOrder: "before" is deprecated'); | ||
asyncTransactionAfter = false; | ||
} else if (options.asyncTransactionOrder == 'strict') { | ||
asyncTransactionStrictOrder = true | ||
asyncTransactionAfter = false | ||
asyncTransactionStrictOrder = true; | ||
asyncTransactionAfter = false; | ||
} | ||
if (!fs.existsSync(options.noSubdir ? dirname(path) : path)) | ||
fs.mkdirSync(options.noSubdir ? dirname(path) : path, { recursive: true }) | ||
fs.mkdirSync(options.noSubdir ? dirname(path) : path, { recursive: true }); | ||
if (options.compression) { | ||
let setDefault | ||
let setDefault; | ||
if (options.compression == true) { | ||
if (defaultCompression) | ||
options.compression = defaultCompression | ||
else | ||
defaultCompression = options.compression = new Compression({ | ||
options.compression = defaultCompression; | ||
else { | ||
let compressionOptions = { | ||
threshold: 1000, | ||
dictionary: fs.readFileSync(new URL('./dict/dict.txt', import.meta.url.replace(/dist[\\\/]index.cjs$/, ''))), | ||
}) | ||
defaultCompression.threshold = 1000 | ||
dictionary: fs.readFileSync(new URL('./dict/dict.txt', | ||
import.meta.url.replace(/dist[\\\/]index.cjs$/, ''))), | ||
getValueBytes: makeReusableBuffer(0), | ||
}; | ||
defaultCompression = options.compression = new Compression(compressionOptions); | ||
Object.assign(defaultCompression, compressionOptions); | ||
} | ||
} else { | ||
@@ -86,5 +86,6 @@ let compressionOptions = Object.assign({ | ||
dictionary: fs.readFileSync(new URL('./dict/dict.txt', import.meta.url.replace(/dist[\\\/]index.cjs$/, ''))), | ||
}, options.compression) | ||
options.compression = new Compression(compressionOptions) | ||
options.compression.threshold = compressionOptions.threshold | ||
getValueBytes: makeReusableBuffer(0), | ||
}, options.compression); | ||
options.compression = new Compression(compressionOptions); | ||
Object.assign(options.compression, compressionOptions); | ||
} | ||
@@ -94,34 +95,14 @@ } | ||
if (options && options.clearOnStart) { | ||
console.info('Removing', path) | ||
fs.removeSync(path) | ||
console.info('Removed', path) | ||
console.info('Removing', path); | ||
fs.removeSync(path); | ||
console.info('Removed', path); | ||
} | ||
env.open(options) | ||
env.readerCheck() // clear out any stale entries | ||
function renewReadTxn() { | ||
if (readTxn) | ||
readTxn.renew() | ||
else | ||
readTxn = env.beginTxn(0x20000) | ||
readTxnRenewed = setImmediate(resetReadTxn) | ||
return readTxn | ||
} | ||
function resetReadTxn() { | ||
if (readTxnRenewed) { | ||
LMDBStore.onReadReset() | ||
readTxnRenewed = null | ||
if (readTxn.cursorCount - (readTxn.renewingCursorCount || 0) > 0) { | ||
readTxn.onlyCursor = true | ||
readTxn = null | ||
} | ||
else | ||
readTxn.reset() | ||
} | ||
} | ||
let stores = [] | ||
env.open(options); | ||
env.readerCheck(); // clear out any stale entries | ||
let stores = []; | ||
class LMDBStore extends EventEmitter { | ||
constructor(dbName, dbOptions) { | ||
super() | ||
super(); | ||
if (dbName === undefined) | ||
throw new Error('Database name must be supplied in name property (may be null for root database)') | ||
throw new Error('Database name must be supplied in name property (may be null for root database)'); | ||
@@ -132,33 +113,32 @@ const openDB = () => { | ||
create: true, | ||
txn: env.writeTxn, | ||
}, dbOptions)) | ||
this.db.name = dbName || null | ||
} | ||
if (dbOptions.compression && !(dbOptions.compression instanceof Compression)) { | ||
if (dbOptions.compression == true && options.compression) | ||
dbOptions.compression = options.compression // use the parent compression if available | ||
else | ||
dbOptions.compression = new Compression(Object.assign({ | ||
threshold: 1000, | ||
dictionary: fs.readFileSync(require.resolve('./dict/dict.txt')), | ||
}), dbOptions.compression) | ||
} | ||
}, dbOptions)); | ||
this.db.name = dbName || null; | ||
}; | ||
if (dbOptions.compression instanceof Compression) { | ||
// do nothing, already compression object | ||
} else if (dbOptions.compression && typeof dbOptions.compression == 'object') | ||
dbOptions.compression = new Compression(Object.assign({ | ||
threshold: 1000, | ||
dictionary: fs.readFileSync(require.resolve('./dict/dict.txt')), | ||
}), dbOptions.compression); | ||
else if (options.compression && dbOptions.compression !== false) | ||
dbOptions.compression = options.compression; // use the parent compression if available | ||
if (dbOptions.dupSort && (dbOptions.useVersions || dbOptions.cache)) { | ||
throw new Error('The dupSort flag can not be combined with versions or caching') | ||
throw new Error('The dupSort flag can not be combined with versions or caching'); | ||
} | ||
openDB() | ||
resetReadTxn() // a read transaction becomes invalid after opening another db | ||
this.name = dbName | ||
this.status = 'open' | ||
this.env = env | ||
this.reads = 0 | ||
this.writes = 0 | ||
this.transactions = 0 | ||
this.averageTransactionTime = 5 | ||
openDB(); | ||
this.resetReadTxn(); // a read transaction becomes invalid after opening another db | ||
this.name = dbName; | ||
this.status = 'open'; | ||
this.env = env; | ||
this.reads = 0; | ||
this.writes = 0; | ||
this.transactions = 0; | ||
this.averageTransactionTime = 5; | ||
if (dbOptions.syncBatchThreshold) | ||
console.warn('syncBatchThreshold is no longer supported') | ||
console.warn('syncBatchThreshold is no longer supported'); | ||
if (dbOptions.immediateBatchThreshold) | ||
console.warn('immediateBatchThreshold is no longer supported') | ||
this.commitDelay = DEFAULT_COMMIT_DELAY | ||
console.warn('immediateBatchThreshold is no longer supported'); | ||
this.commitDelay = DEFAULT_COMMIT_DELAY; | ||
Object.assign(this, { // these are the options that are inherited | ||
@@ -168,3 +148,3 @@ path: options.path, | ||
strictAsyncOrder: options.strictAsyncOrder, | ||
}, dbOptions) | ||
}, dbOptions); | ||
if (!this.encoding || this.encoding == 'msgpack' || this.encoding == 'cbor') { | ||
@@ -175,212 +155,95 @@ this.encoder = this.decoder = new (this.encoding == 'cbor' ? require('cbor-x').Encoder : MsgpackrEncoder) | ||
copyBuffers: true, // need to copy any embedded buffers that are found since we use unsafe buffers | ||
}, options, dbOptions)) | ||
}, options, dbOptions)); | ||
} else if (this.encoding == 'json') { | ||
this.encoder = { | ||
encode: JSON.stringify, | ||
} | ||
}; | ||
} | ||
applyKeyHandling(this) | ||
allDbs.set(dbName ? name + '-' + dbName : name, this) | ||
stores.push(this) | ||
applyKeyHandling(this); | ||
allDbs.set(dbName ? name + '-' + dbName : name, this); | ||
stores.push(this); | ||
} | ||
openDB(dbName, dbOptions) { | ||
if (typeof dbName == 'object' && !dbOptions) { | ||
dbOptions = dbName | ||
dbName = options.name | ||
dbOptions = dbName; | ||
dbName = options.name; | ||
} else | ||
dbOptions = dbOptions || {} | ||
dbOptions = dbOptions || {}; | ||
try { | ||
return dbOptions.cache ? | ||
new (CachingStore(LMDBStore))(dbName, dbOptions) : | ||
new LMDBStore(dbName, dbOptions) | ||
new LMDBStore(dbName, dbOptions); | ||
} catch(error) { | ||
if (error.message.indexOf('MDB_DBS_FULL') > -1) { | ||
error.message += ' (increase your maxDbs option)' | ||
error.message += ' (increase your maxDbs option)'; | ||
} | ||
throw error | ||
throw error; | ||
} | ||
} | ||
open(dbOptions, callback) { | ||
let db = this.openDB(dbOptions) | ||
let db = this.openDB(dbOptions); | ||
if (callback) | ||
callback(null, db) | ||
return db | ||
callback(null, db); | ||
return db; | ||
} | ||
transactionAsync(callback, asChild) { | ||
let lastOperation | ||
let after, strictOrder | ||
let lastOperation; | ||
let after, strictOrder; | ||
if (scheduledOperations) { | ||
lastOperation = asyncTransactionAfter ? scheduledOperations.appendAsyncTxn : | ||
scheduledOperations[asyncTransactionStrictOrder ? scheduledOperations.length - 1 : 0] | ||
scheduledOperations[asyncTransactionStrictOrder ? scheduledOperations.length - 1 : 0]; | ||
} else { | ||
scheduledOperations = [] | ||
scheduledOperations.bytes = 0 | ||
scheduledOperations = []; | ||
scheduledOperations.bytes = 0; | ||
} | ||
let transactionSet | ||
let transactionSetIndex | ||
let transactionSet; | ||
let transactionSetIndex; | ||
if (lastOperation === true) { // continue last set of transactions | ||
transactionSetIndex = scheduledTransactions.length - 1 | ||
transactionSet = scheduledTransactions[transactionSetIndex] | ||
transactionSetIndex = scheduledTransactions.length - 1; | ||
transactionSet = scheduledTransactions[transactionSetIndex]; | ||
} else { | ||
// for now we signify transactions as a true | ||
if (asyncTransactionAfter) // by default we add a flag to put transactions after other operations | ||
scheduledOperations.appendAsyncTxn = true | ||
scheduledOperations.appendAsyncTxn = true; | ||
else if (asyncTransactionStrictOrder) | ||
scheduledOperations.push(true) | ||
scheduledOperations.push(true); | ||
else // in before mode, we put all the async transaction at the beginning | ||
scheduledOperations.unshift(true) | ||
scheduledOperations.unshift(true); | ||
if (!scheduledTransactions) { | ||
scheduledTransactions = [] | ||
scheduledTransactions = []; | ||
} | ||
transactionSetIndex = scheduledTransactions.push(transactionSet = []) - 1 | ||
transactionSetIndex = scheduledTransactions.push(transactionSet = []) - 1; | ||
} | ||
let index = (transactionSet.push(asChild ? | ||
{asChild, callback } : callback) - 1) << 1 | ||
{asChild, callback } : callback) - 1) << 1; | ||
return this.scheduleCommit().results.then((results) => { | ||
let transactionResults = results.transactionResults[transactionSetIndex] | ||
let error = transactionResults[index] | ||
let transactionResults = results.transactionResults[transactionSetIndex]; | ||
let error = transactionResults[index]; | ||
if (error) | ||
throw error | ||
return transactionResults[index + 1] | ||
}) | ||
throw error; | ||
return transactionResults[index + 1]; | ||
}); | ||
} | ||
getSharedBufferForGet(id) { | ||
let txn = (writeTxn || (readTxnRenewed ? readTxn : renewReadTxn())) | ||
lastSize = this.keyIsCompatibility ? txn.getBinaryShared(id) : this.db.get(this.writeKey(id, keyBytes, 0)) | ||
if (lastSize === 0xffffffff) { // not found code | ||
return //undefined | ||
} | ||
return lastSize | ||
lastSize = keyBytesView.getUint32(0, true) | ||
let bufferIndex = keyBytesView.getUint32(12, true) | ||
lastOffset = keyBytesView.getUint32(8, true) | ||
let buffer = buffers[bufferIndex] | ||
let startOffset | ||
if (!buffer || lastOffset < (startOffset = buffer.startOffset) || (lastOffset + lastSize > startOffset + 0x100000000)) { | ||
if (buffer) | ||
env.detachBuffer(buffer.buffer) | ||
startOffset = (lastOffset >>> 16) * 0x10000 | ||
console.log('make buffer for address', bufferIndex * 0x100000000 + startOffset) | ||
buffer = buffers[bufferIndex] = Buffer.from(getBufferForAddress(bufferIndex * 0x100000000 + startOffset)) | ||
buffer.startOffset = startOffset | ||
} | ||
lastOffset -= startOffset | ||
return buffer | ||
return buffer.slice(lastOffset, lastOffset + lastSize)/*Uint8ArraySlice.call(buffer, lastOffset, lastOffset + lastSize)*/ | ||
} | ||
getSizeBinaryFast(id) { | ||
(env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn())) | ||
lastSize = this.db.getByBinary(this.writeKey(id, keyBytes, 0)) | ||
} | ||
getString(id) { | ||
(env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn())) | ||
let string = this.db.getStringByBinary(this.writeKey(id, keyBytes, 0)) | ||
if (string) | ||
lastSize = string.length | ||
return string | ||
} | ||
getBinaryFast(id) { | ||
this.getSizeBinaryFast(id) | ||
return lastSize === 0xffffffff ? undefined : this.db.unsafeBuffer.subarray(0, lastSize) | ||
} | ||
getBinary(id) { | ||
this.getSizeBinaryFast(id) | ||
return lastSize === 0xffffffff ? undefined : Uint8ArraySlice.call(this.db.unsafeBuffer, 0, lastSize) | ||
} | ||
get(id) { | ||
if (this.decoder) { | ||
this.getSizeBinaryFast(id) | ||
return lastSize === 0xffffffff ? undefined : this.decoder.decode(this.db.unsafeBuffer, lastSize) | ||
} | ||
if (this.encoding == 'binary') | ||
return this.getBinary(id) | ||
let result = this.getString(id) | ||
if (result) { | ||
if (this.encoding == 'json') | ||
return JSON.parse(result) | ||
} | ||
return result | ||
} | ||
getEntry(id) { | ||
let value = this.get(id) | ||
if (value !== undefined) { | ||
if (this.useVersions) | ||
return { | ||
value, | ||
version: getLastVersion(), | ||
//size: lastSize | ||
} | ||
else | ||
return { | ||
value, | ||
//size: lastSize | ||
} | ||
} | ||
} | ||
resetReadTxn() { | ||
resetReadTxn() | ||
} | ||
doesExist(key, versionOrValue) { | ||
if (!env.writeTxn) | ||
readTxnRenewed ? readTxn : renewReadTxn() | ||
if (versionOrValue === undefined) { | ||
this.getSizeBinaryFast(key) | ||
return lastSize !== 0xffffffff | ||
} | ||
else if (this.useVersions) { | ||
this.getSizeBinaryFast(key) | ||
return lastSize !== 0xffffffff && matches(getLastVersion(), versionOrValue) | ||
} | ||
else { | ||
if (versionOrValue && versionOrValue[binaryBuffer]) | ||
versionOrValue = versionOrValue[binaryBuffer] | ||
else if (this.encoder) | ||
versionOrValue = this.encoder.encode(versionOrValue) | ||
if (typeof versionOrValue == 'string') | ||
versionOrValue = Buffer.from(versionOrValue) | ||
return this.getValuesCount(key, { start: versionOrValue, exactMatch: true}) > 0 | ||
} | ||
} | ||
backup(path) { | ||
return new Promise((resolve, reject) => env.copy(path, false, (error) => { | ||
if (error) { | ||
reject(error) | ||
reject(error); | ||
} else { | ||
resolve() | ||
resolve(); | ||
} | ||
})) | ||
})); | ||
} | ||
close(callback) { | ||
this.db.close() | ||
if (this.isRoot) { | ||
if (readTxn) { | ||
try { | ||
readTxn.abort() | ||
} catch(error) {} | ||
} | ||
readTxnRenewed = null | ||
env.close() | ||
} | ||
this.status = 'closed' | ||
if (callback) | ||
callback() | ||
} | ||
isOperational() { | ||
return this.status == 'open' | ||
return this.status == 'open'; | ||
} | ||
getStats() { | ||
return this.db.stat(readTxnRenewed ? readTxn : renewReadTxn()) | ||
} | ||
sync(callback) { | ||
return env.sync(callback || function(error) { | ||
if (error) { | ||
console.error(error) | ||
console.error(error); | ||
} | ||
}) | ||
}); | ||
} | ||
deleteDB() { | ||
console.warn('deleteDB() is deprecated, use drop or dropSync instead') | ||
return this.dropSync() | ||
console.warn('deleteDB() is deprecated, use drop or dropSync instead'); | ||
return this.dropSync(); | ||
} | ||
@@ -392,13 +255,13 @@ dropSync() { | ||
}), | ||
{ abortable: false }) | ||
{ abortable: false }); | ||
} | ||
clear(callback) { | ||
if (typeof callback == 'function') | ||
return this.clearAsync(callback) | ||
console.warn('clear() is deprecated, use clearAsync or clearSync instead') | ||
this.clearSync() | ||
return this.clearAsync(callback); | ||
console.warn('clear() is deprecated, use clearAsync or clearSync instead'); | ||
this.clearSync(); | ||
} | ||
clearSync() { | ||
if (this.encoder && this.encoder.structures) | ||
this.encoder.structures = [] | ||
this.encoder.structures = []; | ||
this.transactionSync(() => | ||
@@ -408,42 +271,41 @@ this.db.drop({ | ||
}), | ||
{ abortable: false }) | ||
{ abortable: false }); | ||
} | ||
readerCheck() { | ||
return env.readerCheck() | ||
return env.readerCheck(); | ||
} | ||
readerList() { | ||
return env.readerList().join('') | ||
return env.readerList().join(''); | ||
} | ||
setupSharedStructures() { | ||
const getStructures = () => { | ||
let lastVersion // because we are doing a read here, we may need to save and restore the lastVersion from the last read | ||
let lastVersion; // because we are doing a read here, we may need to save and restore the lastVersion from the last read | ||
if (this.useVersions) | ||
lastVersion = getLastVersion() | ||
let buffer = this.getBinary(this.sharedStructuresKey) | ||
lastVersion = getLastVersion(); | ||
let buffer = this.getBinary(this.sharedStructuresKey); | ||
if (this.useVersions) | ||
setLastVersion(lastVersion) | ||
return buffer ? this.encoder.decode(buffer) : [] | ||
} | ||
setLastVersion(lastVersion); | ||
return buffer ? this.encoder.decode(buffer) : []; | ||
}; | ||
return { | ||
saveStructures: (structures, previousLength) => { | ||
return this.transactionSyncStart(() => { | ||
let existingStructuresBuffer = this.getBinary(this.sharedStructuresKey) | ||
let existingStructures = existingStructuresBuffer ? this.encoder.decode(existingStructuresBuffer) : [] | ||
let existingStructuresBuffer = this.getBinary(this.sharedStructuresKey); | ||
let existingStructures = existingStructuresBuffer ? this.encoder.decode(existingStructuresBuffer) : []; | ||
if (existingStructures.length != previousLength) | ||
return false // it changed, we need to indicate that we couldn't update | ||
this.put(this.sharedStructuresKey, structures) | ||
}) | ||
return false; // it changed, we need to indicate that we couldn't update | ||
this.put(this.sharedStructuresKey, structures); | ||
}); | ||
}, | ||
getStructures, | ||
copyBuffers: true, // need to copy any embedded buffers that are found since we use unsafe buffers | ||
} | ||
}; | ||
} | ||
} | ||
// if caching class overrides putSync, don't want to double call the caching code | ||
const putSync = LMDBStore.prototype.putSync | ||
const removeSync = LMDBStore.prototype.removeSync | ||
addQueryMethods(LMDBStore, { env, getReadTxn() { | ||
return readTxnRenewed ? readTxn : renewReadTxn() | ||
}, saveKey, keyBytes, keyBytesView, getLastVersion }) | ||
addWriteMethods(LMDBStore, { env, fixedBuffer: keyBytes, resetReadTxn, binaryBuffer, ...options }) | ||
const putSync = LMDBStore.prototype.putSync; | ||
const removeSync = LMDBStore.prototype.removeSync; | ||
addQueryMethods(LMDBStore, { env, saveKey, keyBytes, keyBytesView, getLastVersion }); | ||
addWriteMethods(LMDBStore, { env, fixedBuffer: keyBytes, | ||
resetReadTxn: LMDBStore.prototype.resetReadTxn, ...options }); | ||
LMDBStore.prototype.supports = { | ||
@@ -458,78 +320,47 @@ permanence: true, | ||
openCallback: true, | ||
} | ||
}; | ||
return options.cache ? | ||
new (CachingStore(LMDBStore))(options.name || null, options) : | ||
new LMDBStore(options.name || null, options) | ||
new LMDBStore(options.name || null, options); | ||
} | ||
function matches(previousVersion, ifVersion){ | ||
let matches | ||
if (previousVersion) { | ||
if (ifVersion) { | ||
matches = previousVersion == ifVersion | ||
} else { | ||
matches = false | ||
} | ||
} else { | ||
matches = !ifVersion | ||
} | ||
return matches | ||
} | ||
class Entry { | ||
constructor(value, version, db) { | ||
this.value = value | ||
this.version = version | ||
this.db = db | ||
} | ||
ifSamePut() { | ||
} | ||
ifSameRemove() { | ||
} | ||
} | ||
export function getLastEntrySize() { | ||
return lastSize | ||
return lastSize; | ||
} | ||
export function getLastVersion() { | ||
return keyBytesView.getFloat64(16, true) | ||
return keyBytesView.getFloat64(16, true); | ||
} | ||
export function setLastVersion(version) { | ||
return keyBytesView.setFloat64(16, version, true) | ||
return keyBytesView.setFloat64(16, version, true); | ||
} | ||
export function asBinary(buffer) { | ||
return { | ||
[binaryBuffer]: buffer | ||
} | ||
} | ||
let saveBuffer, saveDataView, saveDataAddress | ||
let savePosition = 8000 | ||
let saveBuffer, saveDataView, saveDataAddress; | ||
let savePosition = 8000; | ||
function allocateSaveBuffer() { | ||
saveBuffer = Buffer.alloc(8192) | ||
saveBuffer.dataView = saveDataView = new DataView(saveBuffer.buffer, saveBuffer.byteOffset, saveBuffer.byteLength) | ||
saveBuffer.buffer.address = getAddress(saveBuffer.buffer) | ||
saveDataAddress = saveBuffer.buffer.address + saveBuffer.byteOffset | ||
savePosition = 0 | ||
saveBuffer = Buffer.alloc(8192); | ||
saveBuffer.dataView = saveDataView = new DataView(saveBuffer.buffer, saveBuffer.byteOffset, saveBuffer.byteLength); | ||
saveBuffer.buffer.address = getAddress(saveBuffer.buffer); | ||
saveDataAddress = saveBuffer.buffer.address + saveBuffer.byteOffset; | ||
savePosition = 0; | ||
} | ||
function allocateFixedBuffer() { | ||
keyBytes = Buffer.allocUnsafeSlow(2048) | ||
const keyBuffer = keyBytes.buffer | ||
keyBytesView = keyBytes.dataView = new DataView(keyBytes.buffer, 0, 2048) // max key size is actually 1978 | ||
keyBytes.uint32 = new Uint32Array(keyBuffer, 0, 512) | ||
keyBytes.float64 = new Float64Array(keyBuffer, 0, 256) | ||
keyBytes.uint32.address = keyBytes.address = keyBuffer.address = getAddress(keyBuffer) | ||
keyBytes = Buffer.allocUnsafeSlow(2048); | ||
const keyBuffer = keyBytes.buffer; | ||
keyBytesView = keyBytes.dataView = new DataView(keyBytes.buffer, 0, 2048); // max key size is actually 1978 | ||
keyBytes.uint32 = new Uint32Array(keyBuffer, 0, 512); | ||
keyBytes.float64 = new Float64Array(keyBuffer, 0, 256); | ||
keyBytes.uint32.address = keyBytes.address = keyBuffer.address = getAddress(keyBuffer); | ||
} | ||
function saveKey(key, writeKey, saveTo) { | ||
if (savePosition > 6200) { | ||
allocateSaveBuffer() | ||
allocateSaveBuffer(); | ||
} | ||
let start = savePosition | ||
savePosition = writeKey(key, saveBuffer, start + 4) | ||
saveDataView.setUint32(start, savePosition - start - 4, true) | ||
saveTo.saveBuffer = saveBuffer | ||
savePosition = (savePosition + 7) & 0xfffff8 | ||
return start + saveDataAddress | ||
let start = savePosition; | ||
savePosition = writeKey(key, saveBuffer, start + 4); | ||
saveDataView.setUint32(start, savePosition - start - 4, true); | ||
saveTo.saveBuffer = saveBuffer; | ||
savePosition = (savePosition + 7) & 0xfffff8; | ||
return start + saveDataAddress; | ||
} |
96
keys.js
@@ -1,21 +0,21 @@ | ||
import { getAddress } from './native.js' | ||
import { writeKey, readKey, enableNullTermination } from 'ordered-binary/index.js' | ||
enableNullTermination() | ||
import { getAddress } from './native.js'; | ||
import { writeKey, readKey, enableNullTermination } from 'ordered-binary/index.js'; | ||
enableNullTermination(); | ||
const writeUint32Key = (key, target, start) => { | ||
(target.dataView || (target.dataView = new DataView(target.buffer, 0, target.length))).setUint32(start, key, true) | ||
return start + 4 | ||
} | ||
(target.dataView || (target.dataView = new DataView(target.buffer, 0, target.length))).setUint32(start, key, true); | ||
return start + 4; | ||
}; | ||
const readUint32Key = (target, start) => { | ||
return (target.dataView || (target.dataView = new DataView(target.buffer, 0, target.length))).getUint32(start, true) | ||
} | ||
return (target.dataView || (target.dataView = new DataView(target.buffer, 0, target.length))).getUint32(start, true); | ||
}; | ||
const writeBufferKey = (key, target, start) => { | ||
if (key.length > 1978) | ||
throw new Error('Key buffer is too long') | ||
target.set(key, start) | ||
return key.length + start | ||
} | ||
throw new Error('Key buffer is too long'); | ||
target.set(key, start); | ||
return key.length + start; | ||
}; | ||
const readBufferKey = (target, start, end) => { | ||
return Uint8ArraySlice.call(target, start, end) | ||
} | ||
return Uint8ArraySlice.call(target, start, end); | ||
}; | ||
@@ -27,3 +27,3 @@ export function applyKeyHandling(store) { | ||
readKey, | ||
} | ||
}; | ||
} | ||
@@ -33,36 +33,36 @@ if (store.encoder && store.encoder.writeKey && !store.encoder.encode) { | ||
if (savePosition > 6200) | ||
allocateSaveBuffer() | ||
let start = savePosition | ||
savePosition = writeKey(value, saveBuffer, start) | ||
saveBuffer.start = start | ||
saveBuffer.end = savePosition | ||
savePosition = (savePosition + 7) & 0xfffff8 | ||
return saveBuffer | ||
} | ||
allocateSaveBuffer(); | ||
let start = savePosition; | ||
savePosition = writeKey(value, saveBuffer, start); | ||
saveBuffer.start = start; | ||
saveBuffer.end = savePosition; | ||
savePosition = (savePosition + 7) & 0xfffff8; | ||
return saveBuffer; | ||
}; | ||
} | ||
if (store.decoder && store.decoder.readKey && !store.decoder.decode) | ||
store.decoder.decode = function(buffer, end) { return this.readKey(buffer, 0, end) } | ||
if (store.keyIsUint32) { | ||
store.writeKey = writeUint32Key | ||
store.readKey = readUint32Key | ||
} else if (store.keyIsBuffer) { | ||
store.writeKey = writeBufferKey | ||
store.readKey = readBufferKey | ||
store.decoder.decode = function(buffer) { return this.readKey(buffer, 0, buffer.length); }; | ||
if (store.keyIsUint32 || store.keyEncoding == 'uint32') { | ||
store.writeKey = writeUint32Key; | ||
store.readKey = readUint32Key; | ||
} else if (store.keyIsBuffer || store.keyEncoding == 'binary') { | ||
store.writeKey = writeBufferKey; | ||
store.readKey = readBufferKey; | ||
} else if (store.keyEncoder) { | ||
store.writeKey = store.keyEncoder.writeKey | ||
store.readKey = store.keyEncoder.readKey | ||
store.writeKey = store.keyEncoder.writeKey; | ||
store.readKey = store.keyEncoder.readKey; | ||
} else { | ||
store.writeKey = writeKey | ||
store.readKey = readKey | ||
store.writeKey = writeKey; | ||
store.readKey = readKey; | ||
} | ||
} | ||
let saveBuffer, saveDataView, saveDataAddress | ||
let savePosition = 8000 | ||
let saveBuffer, saveDataView, saveDataAddress; | ||
let savePosition = 8000; | ||
function allocateSaveBuffer() { | ||
saveBuffer = Buffer.alloc(8192) | ||
saveBuffer.dataView = saveDataView = new DataView(saveBuffer.buffer, saveBuffer.byteOffset, saveBuffer.byteLength) | ||
saveBuffer.buffer.address = getAddress(saveBuffer.buffer) | ||
saveDataAddress = saveBuffer.buffer.address + saveBuffer.byteOffset | ||
savePosition = 0 | ||
saveBuffer = Buffer.alloc(8192); | ||
saveBuffer.dataView = saveDataView = new DataView(saveBuffer.buffer, saveBuffer.byteOffset, saveBuffer.byteLength); | ||
saveBuffer.buffer.address = getAddress(saveBuffer.buffer); | ||
saveDataAddress = saveBuffer.buffer.address + saveBuffer.byteOffset; | ||
savePosition = 0; | ||
@@ -72,10 +72,10 @@ } | ||
if (savePosition > 6200) { | ||
allocateSaveBuffer() | ||
allocateSaveBuffer(); | ||
} | ||
let start = savePosition | ||
savePosition = writeKey(key, saveBuffer, start + 4) | ||
saveDataView.setUint32(start, savePosition - start - 4, true) | ||
saveTo.saveBuffer = saveBuffer | ||
savePosition = (savePosition + 7) & 0xfffff8 | ||
return start + saveDataAddress | ||
let start = savePosition; | ||
savePosition = writeKey(key, saveBuffer, start + 4); | ||
saveDataView.setUint32(start, savePosition - start - 4, true); | ||
saveTo.saveBuffer = saveBuffer; | ||
savePosition = (savePosition + 7) & 0xfffff8; | ||
return start + saveDataAddress; | ||
} |
20
level.js
export function levelup(store) { | ||
return Object.assign(Object.create(store), { | ||
get(key, options, callback) { | ||
let result = store.get(key) | ||
let result = store.get(key); | ||
if (typeof options == 'function') | ||
callback = options | ||
callback = options; | ||
if (callback) { | ||
if (result === undefined) | ||
callback(new NotFoundError()) | ||
callback(new NotFoundError()); | ||
else | ||
callback(null, result) | ||
callback(null, result); | ||
} else { | ||
if (result === undefined) | ||
return Promise.reject(new NotFoundError()) | ||
return Promise.reject(new NotFoundError()); | ||
else | ||
return Promise.resolve(result) | ||
return Promise.resolve(result); | ||
} | ||
}, | ||
}) | ||
}); | ||
} | ||
class NotFoundError extends Error { | ||
constructor(message) { | ||
super(message) | ||
this.name = 'NotFoundError' | ||
this.notFound = true | ||
super(message); | ||
this.name = 'NotFoundError'; | ||
this.notFound = true; | ||
} | ||
} |
@@ -1,11 +0,12 @@ | ||
export let Env, Compression, Cursor, getAddress, getAddressShared, require, arch, fs | ||
export let Env, Compression, Cursor, getAddress, getAddressShared, setGlobalBuffer, require, arch, fs; | ||
export function setNativeFunctions(nativeInterface) { | ||
Env = nativeInterface.Env | ||
Compression = nativeInterface.Compression | ||
getAddress = nativeInterface.getAddress | ||
getAddressShared = nativeInterface.getAddressShared | ||
Cursor = nativeInterface.Cursor | ||
require = nativeInterface.require | ||
arch = nativeInterface.arch | ||
fs = nativeInterface.fs | ||
Env = nativeInterface.Env; | ||
Compression = nativeInterface.Compression; | ||
getAddress = nativeInterface.getAddress; | ||
getAddressShared = nativeInterface.getAddressShared; | ||
setGlobalBuffer = nativeInterface.setGlobalBuffer; | ||
Cursor = nativeInterface.Cursor; | ||
require = nativeInterface.require; | ||
arch = nativeInterface.arch; | ||
fs = nativeInterface.fs; | ||
} |
import { createRequire } from 'module'; | ||
const require = createRequire(import.meta.url) | ||
import { fileURLToPath } from 'url' | ||
import { dirname } from 'path' | ||
import { setNativeFunctions } from './native.js' | ||
import fs from 'fs' | ||
import { arch } from 'os' | ||
let nativeFunctions, dirName = dirname(fileURLToPath(import.meta.url)).replace(/dist$/, '') | ||
const require = createRequire(import.meta.url); | ||
import { fileURLToPath } from 'url'; | ||
import { dirname } from 'path'; | ||
import { setNativeFunctions } from './native.js'; | ||
import fs from 'fs'; | ||
import { arch } from 'os'; | ||
let nativeFunctions, dirName = dirname(fileURLToPath(import.meta.url)).replace(/dist$/, ''); | ||
try { | ||
console.log(dirName) | ||
nativeFunctions = require('node-gyp-build')(dirName) | ||
console.log(dirName); | ||
nativeFunctions = require('node-gyp-build')(dirName); | ||
if (process.versions.modules == 93) | ||
require('v8').setFlagsFromString('--turbo-fast-api-calls') | ||
require('v8').setFlagsFromString('--turbo-fast-api-calls'); | ||
} catch(error) { | ||
if (process.versions.modules == 93) { | ||
// use this abi version as the backup version without turbo-fast-api-calls enabled | ||
Object.defineProperty(process.versions, 'modules', { value: '92' }) | ||
Object.defineProperty(process.versions, 'modules', { value: '92' }); | ||
try { | ||
nativeFunctions = require('node-gyp-build')(dirName) | ||
nativeFunctions = require('node-gyp-build')(dirName); | ||
} catch(secondError) { | ||
throw error | ||
throw error; | ||
} finally { | ||
Object.defineProperty(process.versions, 'modules', { value: '93' }) | ||
Object.defineProperty(process.versions, 'modules', { value: '93' }); | ||
} | ||
} else | ||
throw error | ||
throw error; | ||
} | ||
nativeFunctions.require = require | ||
nativeFunctions.arch = arch | ||
nativeFunctions.fs = fs | ||
setNativeFunctions(nativeFunctions) | ||
export { toBufferKey as keyValueToBuffer, compareKeys, compareKeys as compareKey, fromBufferKey as bufferToKeyValue } from 'ordered-binary/index.js' | ||
export { ABORT } from './writer.js' | ||
export { levelup } from './level.js' | ||
export { open, asBinary, getLastVersion, getLastEntrySize, setLastVersion, allDbs } from './index.js' | ||
import { toBufferKey as keyValueToBuffer, compareKeys as compareKey, fromBufferKey as bufferToKeyValue } from 'ordered-binary/index.js' | ||
import { open, getLastVersion } from './index.js' | ||
nativeFunctions.require = require; | ||
nativeFunctions.arch = arch; | ||
nativeFunctions.fs = fs; | ||
setNativeFunctions(nativeFunctions); | ||
export { toBufferKey as keyValueToBuffer, compareKeys, compareKeys as compareKey, fromBufferKey as bufferToKeyValue } from 'ordered-binary/index.js'; | ||
export { ABORT, asBinary } from './writer.js'; | ||
export { levelup } from './level.js'; | ||
export { open, getLastVersion, getLastEntrySize, setLastVersion, allDbs } from './index.js'; | ||
import { toBufferKey as keyValueToBuffer, compareKeys as compareKey, fromBufferKey as bufferToKeyValue } from 'ordered-binary/index.js'; | ||
import { open, getLastVersion } from './index.js'; | ||
export default { | ||
open, getLastVersion, compareKey, keyValueToBuffer, bufferToKeyValue | ||
} | ||
}; |
{ | ||
"name": "lmdb", | ||
"author": "Kris Zyp", | ||
"version": "2.0.0-beta2-win-ia32", | ||
"version": "2.0.0-beta3", | ||
"description": "Simple, efficient, scalable data store wrapper for LMDB", | ||
@@ -15,3 +15,7 @@ "license": "MIT", | ||
"mdb", | ||
"lightning" | ||
"lightning", | ||
"key-value store", | ||
"storage", | ||
"adapter", | ||
"performance" | ||
], | ||
@@ -38,5 +42,6 @@ "type": "module", | ||
"prepare": "rollup -c", | ||
"before-publish": "rollup -c && prebuildify --target 14.17.0 --arch=ia32 && prebuildify --target 14.17.6 && prebuildify --target 12.17.0 --arch=ia32 && prebuildify --target 12.17.0 && prebuildify --target electron@15.2.0", | ||
"before-publish": "rollup -c && prebuildify-ci download && prebuildify --target 17.0.1 && prebuildify --target 16.9.0 && prebuildify --target 15.5.0 && prebuildify --target 14.17.6 && prebuildify --target 12.17.0 && prebuildify --target electron@15.2.0", | ||
"prebuild": "prebuildify --target 17.0.1 && prebuildify --target 16.9.0 && prebuildify --target 15.5.0 && prebuildify --target 14.17.6 && prebuildify --target 12.18.0 && prebuildify --target electron@15.2.0", | ||
"prebuild-arm64": "prebuildify --arch=arm64 --target 17.0.1 && prebuildify --arch=arm64 --target 16.9.0 && prebuildify --arch=arm64 --target 14.17.6 && prebuildify --arch=arm64 --target electron@15.2.0", | ||
"prebuild-musl": "prebuildify --target 17.0.1 --libc musl --tag-libc && prebuildify --target 16.9.0 --libc musl --tag-libc && prebuildify --target 14.17.6 --libc musl --tag-libc", | ||
"prebuild-arm64": "prebuildify --arch=arm64 --target 17.0.1 --libc musl && prebuildify --arch=arm64 --target 16.9.0 && prebuildify --arch=arm64 --target 14.17.6 && prebuildify --arch=arm64 --target electron@15.2.0", | ||
"recompile": "node-gyp clean && node-gyp configure && node-gyp build", | ||
@@ -66,3 +71,3 @@ "test": "mocha test/**.test.js --recursive && npm run test:types", | ||
"mocha": "^8.3.2", | ||
"prebuildify": "kriszyp/prebuildify#b78c5a9", | ||
"prebuildify": "^5.0.0", | ||
"prebuildify-ci": "^1.0.5", | ||
@@ -69,0 +74,0 @@ "rimraf": "^3.0.2", |
433
query.js
@@ -1,6 +0,10 @@ | ||
import { ArrayLikeIterable } from './util/ArrayLikeIterable.js' | ||
import { getAddress, Cursor } from './native.js' | ||
import { saveKey } from './keys.js' | ||
import { writeKey } from 'ordered-binary/index.js' | ||
const ITERATOR_DONE = { done: true, value: undefined } | ||
import { ArrayLikeIterable } from './util/ArrayLikeIterable.js'; | ||
import { getAddress, Cursor, setGlobalBuffer } from './native.js'; | ||
import { saveKey } from './keys.js'; | ||
import { writeKey } from 'ordered-binary/index.js'; | ||
import { binaryBuffer } from './writer.js'; | ||
const ITERATOR_DONE = { done: true, value: undefined }; | ||
const Uint8ArraySlice = Uint8Array.prototype.slice; | ||
let getValueBytes = makeReusableBuffer(0); | ||
let lastSize; | ||
@@ -10,6 +14,115 @@ export function addQueryMethods(LMDBStore, { | ||
}) { | ||
let renewId = 1 | ||
LMDBStore.onReadReset = () => renewId++ | ||
let get = LMDBStore.prototype.get | ||
let readTxn, readTxnRenewed; | ||
let renewId = 1; | ||
Object.assign(LMDBStore.prototype, { | ||
getSizeBinaryFast(id) { | ||
(env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn())); | ||
lastSize = this.db.getByBinary(this.writeKey(id, keyBytes, 0)); | ||
}, | ||
getString(id) { | ||
(env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn())); | ||
let string = this.db.getStringByBinary(this.writeKey(id, keyBytes, 0)); | ||
if (typeof string === 'number') { // indicates the buffer wasn't large enough | ||
this._allocateGetBuffer(string); | ||
// and then try again | ||
string = this.db.getStringByBinary(this.writeKey(id, keyBytes, 0)); | ||
} | ||
if (string) | ||
lastSize = string.length; | ||
return string; | ||
}, | ||
getBinaryFast(id) { | ||
(env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn())); | ||
lastSize = this.db.getByBinary(this.writeKey(id, keyBytes, 0)); | ||
let compression = this.compression; | ||
let bytes = compression ? compression.getValueBytes : getValueBytes; | ||
if (lastSize > bytes.maxLength) { | ||
if (lastSize === 0xffffffff) | ||
return; | ||
bytes = this._allocateGetBuffer(lastSize); | ||
lastSize = this.db.getByBinary(this.writeKey(id, keyBytes, 0)); | ||
} | ||
bytes.length = lastSize; | ||
return bytes; | ||
}, | ||
_allocateGetBuffer(lastSize) { | ||
let newLength = Math.min(Math.max(lastSize * 2, 0x1000), 0xfffffff8); | ||
let bytes; | ||
if (this.compression) { | ||
let dictionary = this.compression.dictionary || []; | ||
let dictLength = (dictionary.length >> 3) << 3;// make sure it is word-aligned | ||
bytes = makeReusableBuffer(newLength + dictLength); | ||
bytes.set(dictionary) // copy dictionary into start | ||
this.compression.setBuffer(bytes, dictLength); | ||
// the section after the dictionary is the target area for get values | ||
bytes = bytes.subarray(dictLength); | ||
bytes.maxLength = newLength; | ||
Object.defineProperty(bytes, 'length', { value: newLength, writable: true, configurable: true }); | ||
this.compression.getValueBytes = bytes; | ||
} else { | ||
bytes = makeReusableBuffer(newLength); | ||
setGlobalBuffer(bytes); | ||
getValueBytes = bytes; | ||
} | ||
return bytes; | ||
}, | ||
getBinary(id) { | ||
let fastBuffer = this.getBinaryFast(id); | ||
return fastBuffer && Uint8ArraySlice.call(fastBuffer, 0, lastSize); | ||
}, | ||
get(id) { | ||
if (this.decoder) { | ||
let bytes = this.getBinaryFast(id); | ||
return bytes && this.decoder.decode(bytes); | ||
} | ||
if (this.encoding == 'binary') | ||
return this.getBinary(id); | ||
let result = this.getString(id); | ||
if (result) { | ||
if (this.encoding == 'json') | ||
return JSON.parse(result); | ||
} | ||
return result; | ||
}, | ||
getEntry(id) { | ||
let value = this.get(id); | ||
if (value !== undefined) { | ||
if (this.useVersions) | ||
return { | ||
value, | ||
version: getLastVersion(), | ||
//size: lastSize | ||
}; | ||
else | ||
return { | ||
value, | ||
//size: lastSize | ||
}; | ||
} | ||
}, | ||
resetReadTxn() { | ||
resetReadTxn(); | ||
}, | ||
doesExist(key, versionOrValue) { | ||
if (!env.writeTxn) | ||
readTxnRenewed ? readTxn : renewReadTxn(); | ||
if (versionOrValue === undefined) { | ||
this.getSizeBinaryFast(key); | ||
return lastSize !== 0xffffffff; | ||
} | ||
else if (this.useVersions) { | ||
this.getSizeBinaryFast(key); | ||
return lastSize !== 0xffffffff && matches(getLastVersion(), versionOrValue); | ||
} | ||
else { | ||
if (versionOrValue && versionOrValue[binaryBuffer]) | ||
versionOrValue = versionOrValue[binaryBuffer]; | ||
else if (this.encoder) | ||
versionOrValue = this.encoder.encode(versionOrValue); | ||
if (typeof versionOrValue == 'string') | ||
versionOrValue = Buffer.from(versionOrValue); | ||
return this.getValuesCount(key, { start: versionOrValue, exactMatch: true}) > 0; | ||
} | ||
}, | ||
getValues(key, options) { | ||
@@ -19,72 +132,73 @@ let defaultOptions = { | ||
valuesForKey: true | ||
} | ||
}; | ||
if (options && options.snapshot === false) | ||
throw new Error('Can not disable snapshots for getValues') | ||
return this.getRange(options ? Object.assign(defaultOptions, options) : defaultOptions) | ||
throw new Error('Can not disable snapshots for getValues'); | ||
return this.getRange(options ? Object.assign(defaultOptions, options) : defaultOptions); | ||
}, | ||
getKeys(options) { | ||
if (!options) | ||
options = {} | ||
options.values = false | ||
return this.getRange(options) | ||
options = {}; | ||
options.values = false; | ||
return this.getRange(options); | ||
}, | ||
getCount(options) { | ||
if (!options) | ||
options = {} | ||
options.onlyCount = true | ||
return this.getRange(options)[Symbol.iterator]() | ||
options = {}; | ||
options.onlyCount = true; | ||
return this.getRange(options)[Symbol.iterator](); | ||
}, | ||
getKeysCount(options) { | ||
if (!options) | ||
options = {} | ||
options.onlyCount = true | ||
options.values = false | ||
return this.getRange(options)[Symbol.iterator]() | ||
options = {}; | ||
options.onlyCount = true; | ||
options.values = false; | ||
return this.getRange(options)[Symbol.iterator](); | ||
}, | ||
getValuesCount(key, options) { | ||
if (!options) | ||
options = {} | ||
options.key = key | ||
options.valuesForKey = true | ||
options.onlyCount = true | ||
return this.getRange(options)[Symbol.iterator]() | ||
options = {}; | ||
options.key = key; | ||
options.valuesForKey = true; | ||
options.onlyCount = true; | ||
return this.getRange(options)[Symbol.iterator](); | ||
}, | ||
getRange(options) { | ||
let iterable = new ArrayLikeIterable() | ||
let iterable = new ArrayLikeIterable(); | ||
if (!options) | ||
options = {} | ||
let includeValues = options.values !== false | ||
let includeVersions = options.versions | ||
let valuesForKey = options.valuesForKey | ||
let limit = options.limit | ||
let db = this.db | ||
let snapshot = options.snapshot | ||
options = {}; | ||
let includeValues = options.values !== false; | ||
let includeVersions = options.versions; | ||
let valuesForKey = options.valuesForKey; | ||
let limit = options.limit; | ||
let db = this.db; | ||
let snapshot = options.snapshot; | ||
let compression = this.compression; | ||
iterable[Symbol.iterator] = () => { | ||
let currentKey = valuesForKey ? options.key : options.start | ||
const reverse = options.reverse | ||
let count = 0 | ||
let cursor, cursorRenewId | ||
let txn | ||
let currentKey = valuesForKey ? options.key : options.start; | ||
const reverse = options.reverse; | ||
let count = 0; | ||
let cursor, cursorRenewId; | ||
let txn; | ||
let flags = (includeValues ? 0x100 : 0) | (reverse ? 0x400 : 0) | | ||
(valuesForKey ? 0x800 : 0) | (options.exactMatch ? 0x4000 : 0) | ||
(valuesForKey ? 0x800 : 0) | (options.exactMatch ? 0x4000 : 0); | ||
function resetCursor() { | ||
try { | ||
if (cursor) | ||
finishCursor() | ||
let writeTxn = env.writeTxn | ||
txn = writeTxn || getReadTxn() | ||
cursor = !writeTxn && db.availableCursor | ||
finishCursor(); | ||
let writeTxn = env.writeTxn; | ||
txn = writeTxn || (readTxnRenewed ? readTxn : renewReadTxn()); | ||
cursor = !writeTxn && db.availableCursor; | ||
if (cursor) { | ||
db.availableCursor = null | ||
db.availableCursor = null; | ||
if (db.cursorTxn != txn) | ||
cursor.renew() | ||
cursor.renew(); | ||
else// if (db.currentRenewId != renewId) | ||
flags |= 0x2000 | ||
flags |= 0x2000; | ||
} else { | ||
cursor = new Cursor(db) | ||
cursor = new Cursor(db); | ||
} | ||
txn.cursorCount = (txn.cursorCount || 0) + 1 // track transaction so we always use the same one | ||
txn.cursorCount = (txn.cursorCount || 0) + 1; // track transaction so we always use the same one | ||
if (snapshot === false) { | ||
cursorRenewId = renewId // use shared read transaction | ||
txn.renewingCursorCount = (txn.renewingCursorCount || 0) + 1 // need to know how many are renewing cursors | ||
cursorRenewId = renewId; // use shared read transaction | ||
txn.renewingCursorCount = (txn.renewingCursorCount || 0) + 1; // need to know how many are renewing cursors | ||
} | ||
@@ -94,42 +208,42 @@ } catch(error) { | ||
try { | ||
cursor.close() | ||
cursor.close(); | ||
} catch(error) { } | ||
} | ||
throw error | ||
throw error; | ||
} | ||
} | ||
resetCursor() | ||
let store = this | ||
resetCursor(); | ||
let store = this; | ||
if (options.onlyCount) { | ||
flags |= 0x1000 | ||
let count = position(options.offset) | ||
finishCursor() | ||
return count | ||
flags |= 0x1000; | ||
let count = position(options.offset); | ||
finishCursor(); | ||
return count; | ||
} | ||
function position(offset) { | ||
let keySize = store.writeKey(currentKey, keyBytes, 0) | ||
let endAddress | ||
let keySize = store.writeKey(currentKey, keyBytes, 0); | ||
let endAddress; | ||
if (valuesForKey) { | ||
if (options.start === undefined && options.end === undefined) | ||
endAddress = 0 | ||
endAddress = 0; | ||
else { | ||
let startAddress | ||
let startAddress; | ||
if (store.encoder.writeKey) { | ||
startAddress = saveKey(options.start, store.encoder.writeKey, iterable) | ||
keyBytesView.setFloat64(2000, startAddress, true) | ||
endAddress = saveKey(options.end, store.encoder.writeKey, iterable) | ||
startAddress = saveKey(options.start, store.encoder.writeKey, iterable); | ||
keyBytesView.setFloat64(2000, startAddress, true); | ||
endAddress = saveKey(options.end, store.encoder.writeKey, iterable); | ||
} else if ((!options.start || options.start instanceof Uint8Array) && (!options.end || options.end instanceof Uint8Array)) { | ||
startAddress = saveKey(options.start, writeKey, iterable) | ||
keyBytesView.setFloat64(2000, startAddress, true) | ||
endAddress = saveKey(options.end, writeKey, iterable) | ||
startAddress = saveKey(options.start, writeKey, iterable); | ||
keyBytesView.setFloat64(2000, startAddress, true); | ||
endAddress = saveKey(options.end, writeKey, iterable); | ||
} else { | ||
throw new Error('Only key-based encoding is supported for start/end values') | ||
let encoded = store.encoder.encode(options.start) | ||
let bufferAddress = encoded.buffer.address || (encoded.buffer.address = getAddress(encoded) - encoded.byteOffset) | ||
startAddress = bufferAddress + encoded.byteOffset | ||
throw new Error('Only key-based encoding is supported for start/end values'); | ||
let encoded = store.encoder.encode(options.start); | ||
let bufferAddress = encoded.buffer.address || (encoded.buffer.address = getAddress(encoded) - encoded.byteOffset); | ||
startAddress = bufferAddress + encoded.byteOffset; | ||
} | ||
} | ||
} else | ||
endAddress = saveKey(options.end, store.writeKey, iterable) | ||
return cursor.position(flags, offset || 0, keySize, endAddress) | ||
endAddress = saveKey(options.end, store.writeKey, iterable); | ||
return cursor.position(flags, offset || 0, keySize, endAddress); | ||
} | ||
@@ -139,15 +253,15 @@ | ||
if (txn.isAborted) | ||
return | ||
return; | ||
if (cursorRenewId) | ||
txn.renewingCursorCount-- | ||
txn.renewingCursorCount--; | ||
if (--txn.cursorCount <= 0 && txn.onlyCursor) { | ||
cursor.close() | ||
txn.abort() // this is no longer main read txn, abort it now that we are done | ||
txn.isAborted = true | ||
cursor.close(); | ||
txn.abort(); // this is no longer main read txn, abort it now that we are done | ||
txn.isAborted = true; | ||
} else { | ||
if (db.availableCursor || txn != getReadTxn()) | ||
cursor.close() | ||
if (db.availableCursor || txn != readTxn) | ||
cursor.close(); | ||
else { // try to reuse it | ||
db.availableCursor = cursor | ||
db.cursorTxn = txn | ||
db.availableCursor = cursor; | ||
db.cursorTxn = txn; | ||
} | ||
@@ -158,29 +272,35 @@ } | ||
next() { | ||
let keySize, lastSize | ||
let keySize, lastSize; | ||
if (cursorRenewId && cursorRenewId != renewId) { | ||
resetCursor() | ||
keySize = position(0) | ||
resetCursor(); | ||
keySize = position(0); | ||
} | ||
if (count === 0) { // && includeValues) // on first entry, get current value if we need to | ||
keySize = position(options.offset) | ||
keySize = position(options.offset); | ||
} else | ||
keySize = cursor.iterate() | ||
keySize = cursor.iterate(); | ||
if (keySize === 0 || | ||
(count++ >= limit)) { | ||
finishCursor() | ||
return ITERATOR_DONE | ||
finishCursor(); | ||
return ITERATOR_DONE; | ||
} | ||
if (!valuesForKey || snapshot === false) | ||
currentKey = store.readKey(keyBytes, 32, keySize + 32) | ||
currentKey = store.readKey(keyBytes, 32, keySize + 32); | ||
if (includeValues) { | ||
let value | ||
lastSize = keyBytesView.getUint32(0, true) | ||
let value; | ||
lastSize = keyBytesView.getUint32(0, true); | ||
let bytes = compression ? compression.getValueBytes : getValueBytes; | ||
if (lastSize > bytes.maxLength) { | ||
bytes = store._allocateGetBuffer(lastSize); | ||
cursor.getCurrentValue(); | ||
} | ||
bytes.length = lastSize; | ||
if (store.decoder) { | ||
value = store.decoder.decode(db.unsafeBuffer, lastSize) | ||
value = store.decoder.decode(bytes, lastSize); | ||
} else if (store.encoding == 'binary') | ||
value = Uint8ArraySlice.call(db.unsafeBuffer, 0, lastSize) | ||
value = Uint8ArraySlice.call(bytes, 0, lastSize); | ||
else { | ||
value = store.db.unsafeBuffer.toString('utf8', 0, lastSize) | ||
value = bytes.toString('utf8', 0, lastSize); | ||
if (store.encoding == 'json' && value) | ||
value = JSON.parse(value) | ||
value = JSON.parse(value); | ||
} | ||
@@ -194,7 +314,7 @@ if (includeVersions) | ||
} | ||
} | ||
}; | ||
else if (valuesForKey) | ||
return { | ||
value | ||
} | ||
}; | ||
else | ||
@@ -206,3 +326,3 @@ return { | ||
} | ||
} | ||
}; | ||
} else if (includeVersions) { | ||
@@ -214,32 +334,113 @@ return { | ||
} | ||
} | ||
}; | ||
} else { | ||
return { | ||
value: currentKey | ||
} | ||
}; | ||
} | ||
}, | ||
return() { | ||
finishCursor() | ||
return ITERATOR_DONE | ||
finishCursor(); | ||
return ITERATOR_DONE; | ||
}, | ||
throw() { | ||
finishCursor() | ||
return ITERATOR_DONE | ||
finishCursor(); | ||
return ITERATOR_DONE; | ||
} | ||
} | ||
} | ||
return iterable | ||
}; | ||
}; | ||
return iterable; | ||
}, | ||
getMany(keys, callback) { | ||
let results = new Array(keys.length) | ||
let results = new Array(keys.length); | ||
for (let i = 0, l = keys.length; i < l; i++) { | ||
results[i] = get.call(this, keys[i]) | ||
results[i] = get.call(this, keys[i]); | ||
} | ||
if (callback) | ||
callback(null, results) | ||
return Promise.resolve(results) // we may eventually make this a true async operation | ||
callback(null, results); | ||
return Promise.resolve(results); // we may eventually make this a true async operation | ||
}, | ||
getSharedBufferForGet(id) { | ||
let txn = (env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn())); | ||
lastSize = this.keyIsCompatibility ? txn.getBinaryShared(id) : this.db.get(this.writeKey(id, keyBytes, 0)); | ||
if (lastSize === 0xffffffff) { // not found code | ||
return; //undefined | ||
} | ||
return lastSize; | ||
lastSize = keyBytesView.getUint32(0, true); | ||
let bufferIndex = keyBytesView.getUint32(12, true); | ||
lastOffset = keyBytesView.getUint32(8, true); | ||
let buffer = buffers[bufferIndex]; | ||
let startOffset; | ||
if (!buffer || lastOffset < (startOffset = buffer.startOffset) || (lastOffset + lastSize > startOffset + 0x100000000)) { | ||
if (buffer) | ||
env.detachBuffer(buffer.buffer); | ||
startOffset = (lastOffset >>> 16) * 0x10000; | ||
console.log('make buffer for address', bufferIndex * 0x100000000 + startOffset); | ||
buffer = buffers[bufferIndex] = Buffer.from(getBufferForAddress(bufferIndex * 0x100000000 + startOffset)); | ||
buffer.startOffset = startOffset; | ||
} | ||
lastOffset -= startOffset; | ||
return buffer; | ||
return buffer.slice(lastOffset, lastOffset + lastSize);/*Uint8ArraySlice.call(buffer, lastOffset, lastOffset + lastSize)*/ | ||
}, | ||
close(callback) { | ||
this.db.close(); | ||
if (this.isRoot) { | ||
if (readTxn) { | ||
try { | ||
readTxn.abort(); | ||
} catch(error) {} | ||
} | ||
readTxnRenewed = null; | ||
env.close(); | ||
} | ||
this.status = 'closed'; | ||
if (callback) | ||
callback(); | ||
}, | ||
getStats() { | ||
return this.db.stat(readTxnRenewed ? readTxn : renewReadTxn()); | ||
} | ||
}) | ||
}); | ||
let get = LMDBStore.prototype.get; | ||
function renewReadTxn() { | ||
if (readTxn) | ||
readTxn.renew(); | ||
else | ||
readTxn = env.beginTxn(0x20000); | ||
readTxnRenewed = setImmediate(resetReadTxn); | ||
return readTxn; | ||
} | ||
function resetReadTxn() { | ||
if (readTxnRenewed) { | ||
renewId++; | ||
readTxnRenewed = null; | ||
if (readTxn.cursorCount - (readTxn.renewingCursorCount || 0) > 0) { | ||
readTxn.onlyCursor = true; | ||
readTxn = null; | ||
} | ||
else | ||
readTxn.reset(); | ||
} | ||
} | ||
} | ||
function matches(previousVersion, ifVersion){ | ||
let matches; | ||
if (previousVersion) { | ||
if (ifVersion) { | ||
matches = previousVersion == ifVersion; | ||
} else { | ||
matches = false; | ||
} | ||
} else { | ||
matches = !ifVersion; | ||
} | ||
return matches; | ||
} | ||
export function makeReusableBuffer(size) { | ||
let bytes = Buffer.alloc(size) | ||
bytes.maxLength = size; | ||
Object.defineProperty(bytes, 'length', { value: size, writable: true, configurable: true }); | ||
return bytes; | ||
} |
@@ -21,8 +21,8 @@ [](LICENSE) | ||
This library is published to the NPM package `lmdb` (and the 1.x was also published to `lmdb-store`), and can be installed with: | ||
This library is published to the NPM package `lmdb` (the 1.x versions were published to `lmdb-store`), and can be installed with: | ||
```npm install lmdb``` | ||
This library has minimal, tightly-controlled, and maintained dependencies to ensure stability and efficient memory use. It supports both ESM and CJS usage. | ||
This library has minimal, tightly-controlled, and maintained dependencies to ensure stability, security, and efficiency. It supports both native ESM and CJS usage. | ||
This has replaced the previously deprecated (LevelDOWN) `lmdb` package in the NPM package registry, but existing versions of that library are [still available](https://www.npmjs.com/package/lmdb/v/0.2.0). | ||
This library has formerly known as "lmdb-store", but "lmdb-js" better represents the broad purpose of being the cross-platform LMDB JavaScript adapter. This package has replaced the previously deprecated (LevelDOWN) `lmdb` package in the NPM package registry, but existing versions of that library are [still available](https://www.npmjs.com/package/lmdb/v/0.2.0). | ||
@@ -33,3 +33,3 @@ ## Design | ||
`lmdb-store` is designed for synchronous reads, and asynchronous writes. In idiomatic NodeJS code, I/O operations are performed asynchronously. LMDB is a memory-mapped database, reading and writing within a transaction does not use any I/O (other than the slight possibility of a page fault), and can usually be performed faster than Node's event queue callbacks can even execute, and it is easier to write code for instant synchronous values from reads. On the otherhand, commiting transactions does involve I/O, and vastly higher throughput can be achieved by batching operations and executing on a separate thread. Consequently, `lmdb-store` is designed for transactioSSns to go through this asynchronous batching process and return a simple promise that resolves once data is written and flushed to disk. | ||
`lmdb-store` is designed for synchronous reads, and asynchronous writes. In idiomatic NodeJS code, I/O operations are performed asynchronously. LMDB is a memory-mapped database, reading and writing within a transaction does not use any I/O (other than the slight possibility of a page fault), and can usually be performed faster than Node's event queue callbacks can even execute, and it is easier to write code for instant synchronous values from reads. On the otherhand, commiting transactions does involve I/O, and vastly higher throughput can be achieved by batching operations and executing on a separate thread. Consequently, `lmdb-store` is designed for transactions to go through this asynchronous batching process and return a simple promise that resolves once data is written and flushed to disk. | ||
@@ -258,3 +258,3 @@ With the default sync'ing configuration, LMDB has a crash-proof design; a machine can be turned off at any point, and data can not be corrupted unless the written data is actually changed or tampered. Writing data and waiting for confirmation that has been writted to the physical medium is critical for data integrity, but is well known to have latency (although not necessarily less efficient). However, by batching writes, when a database is under load, slower transactions enable more writes per transaction, and this library is able to drive LMDB to achieve the maximum levels of throughput with fully sync'ed operations, preserving both the durability/safety of the transactions and legendary performance. | ||
### `asBinary(buffer): Binary` | ||
This can be used to directly store a buffer or Uint8Array as a value, bypassing any encoding. If you are using a store with an encoding that isn't `binary`, setting a value with a Uint8Array will typically be encoding with encoding (for example MessagePack wraps in a header, preserving its type for `get`). However, if you want to bypass encoding, for example, if you have already encoded a value, you can use `asBinary`: | ||
This can be used to directly store a buffer or Uint8Array as a value, bypassing any encoding. If you are using a store with an encoding that isn't `binary`, setting a value with a Uint8Array will typically be encoded with the store's encoding (for example MessagePack wraps in a header, preserving its type for `get`). However, if you want to bypass encoding, for example, if you have already encoded a value, you can use `asBinary`: | ||
``` | ||
@@ -275,3 +275,3 @@ let buffer = encode(myValue) // if we have already serialized a value, perhaps to compare it or check its size | ||
### `store.getBinaryFast(key): Buffer` | ||
This will retrieve the binary data at the specified key, like `getBinary`, except it uses reusable buffers, which is faster, but means the data in the buffer is only valid until the next get operation (including cursor operations). | ||
This will retrieve the binary data at the specified key, like `getBinary`, except it uses reusable buffers, which is faster, but means the data in the buffer is only valid until the next get operation (including cursor operations). Since this is a reusable buffer it also slightly differs from a typical buffer: the `length` property is set to the length of the value (what you typically want for normal usage), but the `byteLength` will be the size of the full allocated memory area for the buffer (usually much larger). | ||
@@ -356,3 +356,3 @@ ### `resetReadTxn(): void` | ||
* `name` - This is the name of the database. This defaults to null (which is the root database) when opening the database environment (`open`). When an opening a database within an environment (`openDB`), this is required, if not specified in first parameter. | ||
* `encoding` - Sets the encoding for the database, which can be `'msgpack'`, `'json'`, `'cbor'`, `'string'`, `'ordered-binary'`or `'binary'`. | ||
* `encoding` - Sets the encoding for the database values, which can be `'msgpack'`, `'json'`, `'cbor'`, `'string'`, `'ordered-binary'`or `'binary'`. | ||
* `sharedStructuresKey` - Enables shared structures and sets the key where the shared structures will be stored. | ||
@@ -362,4 +362,3 @@ * `compression` - This enables compression. This can be set a truthy value to enable compression with default settings, or it can be an object with compression settings. | ||
* `useVersions` - Set this to true if you will be setting version numbers on the entries in the database. Note that you can not change this flag once a database has entries in it (or they won't be read correctly). | ||
* `keyIsBuffer` - This will cause the database to expect and return keys as node buffers. | ||
* `keyIsUint32` - This will cause the database to expect and return keys as unsigned 32-bit integers. | ||
* `keyEncoding` - This indicates the encoding to use for the database keys, and can be `'uint32'` for unsigned 32-bit integers, `'binary'` for raw buffers/Uint8Arrays, and the default `'ordered-binary'` allows any JS primitive as a keys. | ||
* `keyEncoder` - Provide a custom key encoder. | ||
@@ -407,3 +406,3 @@ * `dupSort` - Enables duplicate entries for keys. You will usually want to retrieve the values for a key with `getValues`. | ||
Enabling `overlappingSync` option is generally not recommended on Windows, as Window's disk flushing operation tends to have poor performance characteristics on larger databases (whereas Windows tends to perform well with standard transactions), but YMMV. This option may be enabled by default in the future, for non-Windows platforms, this is probably a good setting: | ||
Enabling `overlappingSync` option is generally not recommended on Windows, as Window's disk flushing operation tends to have very poor performance characteristics on larger databases (whereas Windows tends to perform well with standard transactions). This option may be enabled by default in the future, for non-Windows platforms. This is probably a good setting: | ||
``` | ||
@@ -430,3 +429,3 @@ overlappingSync: os.platform() != 'win32', | ||
`beforecommit` - This event is fired before a batched operation begins to start a transaction to write all queued writes to the database. The callback function can perform additional (asynchronous) writes (`put` and `remove`) and they will be included in the transaction about to be performed (this can be useful for updating a global version stamp based on all previous writes, for example). Using this event forces `eventTurnBatching` to be enabled. | ||
`beforecommit` - This event is fired before a transaction finishes/commits. The callback function can perform additional (asynchronous) writes (`put` and `remove`) and they will be included in the transaction about to be performed as the last operation(s) before the transaction commits (this can be useful for updating a global version stamp based on all previous writes, for example). Using this event forces `eventTurnBatching` to be enabled. This can be called multiples times in a transaction, but should always be called as the last operation of a transaction. | ||
@@ -433,0 +432,0 @@ ## LevelUp |
@@ -94,2 +94,19 @@ import path from 'path'; | ||
} | ||
it('zero length values', async function() { | ||
db.put(5, asBinary(Buffer.from([]))); | ||
await db2.put('key1', asBinary(Buffer.from([]))); | ||
should.equal(db.getBinary(5).length, 0); | ||
should.equal(db2.getBinary('key1').length, 0); | ||
db.put(5, asBinary(Buffer.from([4]))); | ||
db2.remove('key1'); | ||
await db2.put('key1', asBinary(Buffer.from([4]))); | ||
should.equal(db.getBinary(5).length, 1); | ||
should.equal(db2.getBinary('key1').length, 1); | ||
db.put(5, asBinary(Buffer.from([]))); | ||
db2.remove('key1'); | ||
await db2.put('key1', asBinary(Buffer.from([]))); | ||
should.equal(db.getBinary(5).length, 0); | ||
should.equal(db2.getBinary('key1').length, 0); | ||
await db2.remove('key1'); | ||
}); | ||
it('query of keys', async function() { | ||
@@ -825,3 +842,3 @@ let keys = [ | ||
name: 'mydb6', | ||
keyIsUint32: true, | ||
keyEncoding: 'uint32', | ||
create: true, | ||
@@ -884,3 +901,3 @@ })); | ||
name: 'uint32', | ||
keyIsUint32: true, | ||
keyEncoding: 'uint32', | ||
compression: true, | ||
@@ -932,3 +949,3 @@ }); | ||
path: testDirPath + '/test-mixedkeys.mdb', | ||
keyIsUint32: false, | ||
keyEncoding: 'ordered-binary', | ||
}) | ||
@@ -938,3 +955,3 @@ | ||
name: `intKeys`, | ||
keyIsUint32: true, | ||
keyEncoding: 'uint32', | ||
}) | ||
@@ -944,3 +961,3 @@ | ||
name: `strKeys`, | ||
keyIsUint32: false, | ||
keyEncoding: 'ordered-binary', | ||
}) | ||
@@ -947,0 +964,0 @@ |
@@ -1,4 +0,4 @@ | ||
const SKIP = {} | ||
const SKIP = {}; | ||
if (!Symbol.asyncIterator) { | ||
Symbol.asyncIterator = Symbol.for('Symbol.asyncIterator') | ||
Symbol.asyncIterator = Symbol.for('Symbol.asyncIterator'); | ||
} | ||
@@ -9,29 +9,29 @@ | ||
if (sourceArray) { | ||
this[Symbol.iterator] = sourceArray[Symbol.iterator].bind(sourceArray) | ||
this[Symbol.iterator] = sourceArray[Symbol.iterator].bind(sourceArray); | ||
} | ||
} | ||
map(func) { | ||
let source = this | ||
let result = new ArrayLikeIterable() | ||
let source = this; | ||
let result = new ArrayLikeIterable(); | ||
result[Symbol.iterator] = (async) => { | ||
let iterator = source[Symbol.iterator](async) | ||
let iterator = source[Symbol.iterator](async); | ||
return { | ||
next(resolvedResult) { | ||
let result | ||
let result; | ||
do { | ||
let iteratorResult | ||
let iteratorResult; | ||
if (resolvedResult) { | ||
iteratorResult = resolvedResult | ||
resolvedResult = null // don't go in this branch on next iteration | ||
iteratorResult = resolvedResult; | ||
resolvedResult = null; // don't go in this branch on next iteration | ||
} else { | ||
iteratorResult = iterator.next() | ||
iteratorResult = iterator.next(); | ||
if (iteratorResult.then) { | ||
return iteratorResult.then(iteratorResult => this.next(iteratorResult)) | ||
return iteratorResult.then(iteratorResult => this.next(iteratorResult)); | ||
} | ||
} | ||
if (iteratorResult.done === true) { | ||
this.done = true | ||
return iteratorResult | ||
this.done = true; | ||
return iteratorResult; | ||
} | ||
result = func(iteratorResult.value) | ||
result = func(iteratorResult.value); | ||
if (result && result.then) { | ||
@@ -43,3 +43,3 @@ return result.then(result => | ||
value: result | ||
}) | ||
}); | ||
} | ||
@@ -49,59 +49,59 @@ } while(result == SKIP) | ||
value: result | ||
} | ||
}; | ||
}, | ||
return() { | ||
return iterator.return() | ||
return iterator.return(); | ||
}, | ||
throw() { | ||
return iterator.throw() | ||
return iterator.throw(); | ||
} | ||
} | ||
} | ||
return result | ||
}; | ||
}; | ||
return result; | ||
} | ||
[Symbol.asyncIterator]() { | ||
return this[Symbol.iterator](true) | ||
return this[Symbol.iterator](true); | ||
} | ||
filter(func) { | ||
return this.map(element => func(element) ? element : SKIP) | ||
return this.map(element => func(element) ? element : SKIP); | ||
} | ||
forEach(callback) { | ||
let iterator = this[Symbol.iterator]() | ||
let result | ||
let iterator = this[Symbol.iterator](); | ||
let result; | ||
while ((result = iterator.next()).done !== true) { | ||
callback(result.value) | ||
callback(result.value); | ||
} | ||
} | ||
concat(secondIterable) { | ||
let concatIterable = new ArrayLikeIterable() | ||
let concatIterable = new ArrayLikeIterable(); | ||
concatIterable[Symbol.iterator] = (async) => { | ||
let iterator = this[Symbol.iterator]() | ||
let isFirst = true | ||
let iterator = this[Symbol.iterator](); | ||
let isFirst = true; | ||
let concatIterator = { | ||
next() { | ||
let result = iterator.next() | ||
let result = iterator.next(); | ||
if (isFirst && result.done) { | ||
isFirst = false | ||
iterator = secondIterable[Symbol.iterator](async) | ||
return iterator.next() | ||
isFirst = false; | ||
iterator = secondIterable[Symbol.iterator](async); | ||
return iterator.next(); | ||
} | ||
return result | ||
return result; | ||
}, | ||
return() { | ||
return iterator.return() | ||
return iterator.return(); | ||
}, | ||
throw() { | ||
return iterator.throw() | ||
return iterator.throw(); | ||
} | ||
} | ||
return concatIterator | ||
} | ||
return concatIterable | ||
}; | ||
return concatIterator; | ||
}; | ||
return concatIterable; | ||
} | ||
toJSON() { | ||
if (this.asArray && this.asArray.forEach) { | ||
return this.asArray | ||
return this.asArray; | ||
} | ||
throw new Error('Can not serialize async iteratables without first calling resolveJSON') | ||
throw new Error('Can not serialize async iteratables without first calling resolveJSON'); | ||
//return Array.from(this) | ||
@@ -111,27 +111,27 @@ } | ||
if (this._asArray) | ||
return this._asArray | ||
return this._asArray; | ||
let promise = new Promise((resolve, reject) => { | ||
let iterator = this[Symbol.iterator](true) | ||
let array = [] | ||
let iterable = this | ||
let iterator = this[Symbol.iterator](true); | ||
let array = []; | ||
let iterable = this; | ||
function next(result) { | ||
while (result.done !== true) { | ||
if (result.then) { | ||
return result.then(next) | ||
return result.then(next); | ||
} else { | ||
array.push(result.value) | ||
array.push(result.value); | ||
} | ||
result = iterator.next() | ||
result = iterator.next(); | ||
} | ||
array.iterable = iterable | ||
resolve(iterable._asArray = array) | ||
array.iterable = iterable; | ||
resolve(iterable._asArray = array); | ||
} | ||
next(iterator.next()) | ||
}) | ||
promise.iterable = this | ||
return this._asArray || (this._asArray = promise) | ||
next(iterator.next()); | ||
}); | ||
promise.iterable = this; | ||
return this._asArray || (this._asArray = promise); | ||
} | ||
resolveData() { | ||
return this.asArray | ||
return this.asArray; | ||
} | ||
} |
@@ -5,5 +5,5 @@ export function when(promise, callback, errback) { | ||
promise.then(callback, errback) : | ||
promise.then(callback) | ||
promise.then(callback); | ||
} | ||
return callback(promise) | ||
return callback(promise); | ||
} |
726
writer.js
@@ -1,99 +0,100 @@ | ||
import { getAddressShared as getAddress } from './native.js' | ||
import { when } from './util/when.js' | ||
var backpressureArray | ||
import { getAddressShared as getAddress } from './native.js'; | ||
import { when } from './util/when.js'; | ||
var backpressureArray; | ||
const MAX_KEY_SIZE = 1978 | ||
const WAITING_OPERATION = 0x2000000 | ||
const BACKPRESSURE_THRESHOLD = 50000 | ||
const TXN_DELIMITER = 0x8000000 | ||
const TXN_COMMITTED = 0x10000000 | ||
const TXN_FLUSHED = 0x20000000 | ||
const TXN_FAILED = 0x40000000 | ||
const FAILED_CONDITION = 0x4000000 | ||
const REUSE_BUFFER_MODE = 1000 | ||
const MAX_KEY_SIZE = 1978; | ||
const WAITING_OPERATION = 0x2000000; | ||
const BACKPRESSURE_THRESHOLD = 50000; | ||
const TXN_DELIMITER = 0x8000000; | ||
const TXN_COMMITTED = 0x10000000; | ||
const TXN_FLUSHED = 0x20000000; | ||
const TXN_FAILED = 0x40000000; | ||
const FAILED_CONDITION = 0x4000000; | ||
const REUSE_BUFFER_MODE = 1000; | ||
export const binaryBuffer = Symbol('binaryBuffer'); | ||
const SYNC_PROMISE_SUCCESS = Promise.resolve(true) | ||
const SYNC_PROMISE_FAIL = Promise.resolve(false) | ||
export const ABORT = {} | ||
const CALLBACK_THREW = {} | ||
SYNC_PROMISE_SUCCESS.isSync = true | ||
SYNC_PROMISE_FAIL.isSync = true | ||
const ByteArray = typeof Buffer != 'undefined' ? Buffer.from : Uint8Array | ||
const SYNC_PROMISE_SUCCESS = Promise.resolve(true); | ||
const SYNC_PROMISE_FAIL = Promise.resolve(false); | ||
export const ABORT = {}; | ||
const CALLBACK_THREW = {}; | ||
SYNC_PROMISE_SUCCESS.isSync = true; | ||
SYNC_PROMISE_FAIL.isSync = true; | ||
const ByteArray = typeof Buffer != 'undefined' ? Buffer.from : Uint8Array; | ||
//let debugLog = [] | ||
var log = [] | ||
export function addWriteMethods(LMDBStore, { env, fixedBuffer, resetReadTxn, useWritemap, binaryBuffer, | ||
var log = []; | ||
export function addWriteMethods(LMDBStore, { env, fixedBuffer, resetReadTxn, useWritemap, | ||
eventTurnBatching, txnStartThreshold, batchStartThreshold, overlappingSync, commitDelay, separateFlushed }) { | ||
// stands for write instructions | ||
var dynamicBytes | ||
var dynamicBytes; | ||
function allocateInstructionBuffer() { | ||
let buffer = new SharedArrayBuffer(0x10000) // Must use a shared buffer to ensure GC doesn't move it around | ||
dynamicBytes = new ByteArray(buffer) | ||
let uint32 = dynamicBytes.uint32 = new Uint32Array(buffer, 0, 0x10000 >> 2) | ||
uint32[0] = 0 | ||
dynamicBytes.float64 = new Float64Array(buffer, 0, 0x10000 >> 3) | ||
buffer.address = getAddress(buffer) | ||
uint32.address = buffer.address + uint32.byteOffset | ||
dynamicBytes.position = 0 | ||
return dynamicBytes | ||
let buffer = new SharedArrayBuffer(0x10000); // Must use a shared buffer to ensure GC doesn't move it around | ||
dynamicBytes = new ByteArray(buffer); | ||
let uint32 = dynamicBytes.uint32 = new Uint32Array(buffer, 0, 0x10000 >> 2); | ||
uint32[0] = 0; | ||
dynamicBytes.float64 = new Float64Array(buffer, 0, 0x10000 >> 3); | ||
buffer.address = getAddress(buffer); | ||
uint32.address = buffer.address + uint32.byteOffset; | ||
dynamicBytes.position = 0; | ||
return dynamicBytes; | ||
} | ||
var outstandingWriteCount = 0 | ||
var startAddress = 0 | ||
var writeTxn = null | ||
var abortedNonChildTransactionWarn | ||
var nextTxnCallbacks = [] | ||
var commitPromise, flushPromise, flushResolvers = [] | ||
commitDelay = commitDelay || 0 | ||
eventTurnBatching = eventTurnBatching === false ? false : true | ||
var enqueuedCommit | ||
var afterCommitCallbacks = [] | ||
var beforeCommitCallbacks = [] | ||
var enqueuedEventTurnBatch | ||
var outstandingWriteCount = 0; | ||
var startAddress = 0; | ||
var writeTxn = null; | ||
var abortedNonChildTransactionWarn; | ||
var nextTxnCallbacks = []; | ||
var commitPromise, flushPromise, flushResolvers = []; | ||
commitDelay = commitDelay || 0; | ||
eventTurnBatching = eventTurnBatching === false ? false : true; | ||
var enqueuedCommit; | ||
var afterCommitCallbacks = []; | ||
var beforeCommitCallbacks = []; | ||
var enqueuedEventTurnBatch; | ||
if (separateFlushed === undefined) | ||
separateFlushed = overlappingSync | ||
var batchDepth = 0 | ||
var writeBatchStart, outstandingBatchCount | ||
txnStartThreshold = txnStartThreshold || 5 | ||
batchStartThreshold = batchStartThreshold || 1000 | ||
separateFlushed = overlappingSync; | ||
var batchDepth = 0; | ||
var writeBatchStart, outstandingBatchCount; | ||
txnStartThreshold = txnStartThreshold || 5; | ||
batchStartThreshold = batchStartThreshold || 1000; | ||
allocateInstructionBuffer() | ||
dynamicBytes.uint32[0] = TXN_DELIMITER | TXN_COMMITTED | TXN_FLUSHED | ||
var txnResolution, lastQueuedResolution, nextResolution = { uint32: dynamicBytes.uint32, flagPosition: 0, } | ||
var uncommittedResolution = { next: nextResolution } | ||
var unwrittenResolution = nextResolution | ||
allocateInstructionBuffer(); | ||
dynamicBytes.uint32[0] = TXN_DELIMITER | TXN_COMMITTED | TXN_FLUSHED; | ||
var txnResolution, lastQueuedResolution, nextResolution = { uint32: dynamicBytes.uint32, flagPosition: 0, }; | ||
var uncommittedResolution = { next: nextResolution }; | ||
var unwrittenResolution = nextResolution; | ||
function writeInstructions(flags, store, key, value, version, ifVersion) { | ||
let writeStatus | ||
let targetBytes, position | ||
let valueBuffer, valueSize, valueBufferStart | ||
let writeStatus; | ||
let targetBytes, position; | ||
let valueBuffer, valueSize, valueBufferStart; | ||
if (flags & 2) { | ||
// encode first in case we have to write a shared structure | ||
let encoder = store.encoder | ||
let encoder = store.encoder; | ||
if (value && value[binaryBuffer]) | ||
valueBuffer = value[binaryBuffer] | ||
valueBuffer = value[binaryBuffer]; | ||
else if (encoder) { | ||
if (encoder.copyBuffers) // use this as indicator for support buffer reuse for now | ||
valueBuffer = encoder.encode(value, REUSE_BUFFER_MODE) | ||
valueBuffer = encoder.encode(value, REUSE_BUFFER_MODE); | ||
else { // various other encoders, including JSON.stringify, that might serialize to a string | ||
valueBuffer = encoder.encode(value) | ||
valueBuffer = encoder.encode(value); | ||
if (typeof valueBuffer == 'string') | ||
valueBuffer = Buffer.from(valueBuffer) // TODO: Would be nice to write strings inline in the instructions | ||
valueBuffer = Buffer.from(valueBuffer); // TODO: Would be nice to write strings inline in the instructions | ||
} | ||
} else if (typeof value == 'string') { | ||
valueBuffer = Buffer.from(value) // TODO: Would be nice to write strings inline in the instructions | ||
valueBuffer = Buffer.from(value); // TODO: Would be nice to write strings inline in the instructions | ||
} else if (value instanceof Uint8Array) | ||
valueBuffer = value | ||
valueBuffer = value; | ||
else | ||
throw new Error('Invalid value to put in database ' + value + ' (' + (typeof value) +'), consider using encoder') | ||
valueBufferStart = valueBuffer.start | ||
throw new Error('Invalid value to put in database ' + value + ' (' + (typeof value) +'), consider using encoder'); | ||
valueBufferStart = valueBuffer.start; | ||
if (valueBufferStart > -1) // if we have buffers with start/end position | ||
valueSize = valueBuffer.end - valueBufferStart // size | ||
valueSize = valueBuffer.end - valueBufferStart; // size | ||
else | ||
valueSize = valueBuffer.length | ||
valueSize = valueBuffer.length; | ||
if (store.dupSort && valueSize > MAX_KEY_SIZE) | ||
throw new Error('The value is larger than the maximum size (' + MAX_KEY_SIZE + ') for a value in a dupSort database') | ||
throw new Error('The value is larger than the maximum size (' + MAX_KEY_SIZE + ') for a value in a dupSort database'); | ||
} else | ||
valueSize = 0 | ||
valueSize = 0; | ||
if (writeTxn) { | ||
targetBytes = fixedBuffer | ||
position = 0 | ||
targetBytes = fixedBuffer; | ||
position = 0; | ||
} else { | ||
@@ -104,71 +105,71 @@ if (eventTurnBatching && !enqueuedEventTurnBatch && batchDepth == 0) { | ||
for (let i = 0, l = beforeCommitCallbacks.length; i < l; i++) { | ||
beforeCommitCallbacks[i]() | ||
beforeCommitCallbacks[i](); | ||
} | ||
} catch(error) { | ||
console.error(error) | ||
console.error(error); | ||
} | ||
enqueuedEventTurnBatch = null | ||
enqueuedEventTurnBatch = null; | ||
//console.log('ending event turn') | ||
batchDepth-- | ||
finishBatch() | ||
batchDepth--; | ||
finishBatch(); | ||
if (writeBatchStart) | ||
writeBatchStart() // TODO: When we support delay start of batch, optionally don't delay this | ||
}) | ||
commitPromise = null // reset the commit promise, can't know if it is really a new transaction prior to finishWrite being called | ||
flushPromise = null | ||
writeBatchStart = writeInstructions(1, store) | ||
outstandingBatchCount = 0 | ||
batchDepth++ | ||
writeBatchStart(); // TODO: When we support delay start of batch, optionally don't delay this | ||
}); | ||
commitPromise = null; // reset the commit promise, can't know if it is really a new transaction prior to finishWrite being called | ||
flushPromise = null; | ||
writeBatchStart = writeInstructions(1, store); | ||
outstandingBatchCount = 0; | ||
batchDepth++; | ||
} | ||
targetBytes = dynamicBytes | ||
position = targetBytes.position | ||
targetBytes = dynamicBytes; | ||
position = targetBytes.position; | ||
} | ||
let uint32 = targetBytes.uint32, float64 = targetBytes.float64 | ||
let flagPosition = position << 1 // flagPosition is the 32-bit word starting position | ||
let uint32 = targetBytes.uint32, float64 = targetBytes.float64; | ||
let flagPosition = position << 1; // flagPosition is the 32-bit word starting position | ||
// don't increment position until we are sure we don't have any key writing errors | ||
if (!uint32) { | ||
throw new Error('Internal buffers have been corrupted') | ||
throw new Error('Internal buffers have been corrupted'); | ||
} | ||
uint32[flagPosition + 1] = store.db.dbi | ||
uint32[flagPosition + 1] = store.db.dbi; | ||
if (flags & 4) { | ||
let keyStartPosition = (position << 3) + 12 | ||
let endPosition | ||
let keyStartPosition = (position << 3) + 12; | ||
let endPosition; | ||
try { | ||
endPosition = store.writeKey(key, targetBytes, keyStartPosition) | ||
endPosition = store.writeKey(key, targetBytes, keyStartPosition); | ||
} catch(error) { | ||
targetBytes.fill(0, keyStartPosition) | ||
throw error | ||
targetBytes.fill(0, keyStartPosition); | ||
throw error; | ||
} | ||
let keySize = endPosition - keyStartPosition | ||
let keySize = endPosition - keyStartPosition; | ||
if (keySize > MAX_KEY_SIZE) { | ||
targetBytes.fill(0, keyStartPosition) // restore zeros | ||
throw new Error('Key size is larger than the maximum key size (' + MAX_KEY_SIZE + ')') | ||
targetBytes.fill(0, keyStartPosition); // restore zeros | ||
throw new Error('Key size is larger than the maximum key size (' + MAX_KEY_SIZE + ')'); | ||
} | ||
uint32[flagPosition + 2] = keySize | ||
position = (endPosition + 16) >> 3 | ||
uint32[flagPosition + 2] = keySize; | ||
position = (endPosition + 16) >> 3; | ||
if (flags & 2) { | ||
let mustCompress | ||
let mustCompress; | ||
if (valueBufferStart > -1) { // if we have buffers with start/end position | ||
// record pointer to value buffer | ||
float64[position] = (valueBuffer.address || | ||
(valueBuffer.address = getAddress(valueBuffer.buffer) + valueBuffer.byteOffset)) + valueBufferStart | ||
mustCompress = valueBuffer[valueBufferStart] >= 254 // this is the compression indicator, so we must compress | ||
(valueBuffer.address = getAddress(valueBuffer.buffer) + valueBuffer.byteOffset)) + valueBufferStart; | ||
mustCompress = valueBuffer[valueBufferStart] >= 250; // this is the compression indicator, so we must compress | ||
} else { | ||
let valueArrayBuffer = valueBuffer.buffer | ||
let valueArrayBuffer = valueBuffer.buffer; | ||
// record pointer to value buffer | ||
float64[position] = (valueArrayBuffer.address || | ||
(valueArrayBuffer.address = getAddress(valueArrayBuffer))) + valueBuffer.byteOffset | ||
mustCompress = valueBuffer[0] >= 254 // this is the compression indicator, so we must compress | ||
(valueArrayBuffer.address = getAddress(valueArrayBuffer))) + valueBuffer.byteOffset; | ||
mustCompress = valueBuffer[0] >= 250; // this is the compression indicator, so we must compress | ||
} | ||
uint32[(position++ << 1) - 1] = valueSize | ||
uint32[(position++ << 1) - 1] = valueSize; | ||
if (store.compression && (valueSize >= store.compression.threshold || mustCompress)) { | ||
flags |= 0x100000; | ||
float64[position] = store.compression.address | ||
float64[position] = store.compression.address; | ||
if (!writeTxn) | ||
env.compress(uint32.address + (position << 3), () => { | ||
// this is never actually called, just use to pin the buffer in memory until it is finished | ||
console.log(float64) | ||
}) | ||
position++ | ||
console.log(float64); | ||
}); | ||
position++; | ||
} | ||
@@ -178,36 +179,36 @@ } | ||
if (ifVersion === null) | ||
flags |= 0x10 | ||
flags |= 0x10; | ||
else { | ||
flags |= 0x100 | ||
float64[position++] = ifVersion | ||
flags |= 0x100; | ||
float64[position++] = ifVersion; | ||
} | ||
} | ||
if (version !== undefined) { | ||
flags |= 0x200 | ||
float64[position++] = version || 0 | ||
flags |= 0x200; | ||
float64[position++] = version || 0; | ||
} | ||
} else | ||
position++ | ||
targetBytes.position = position | ||
position++; | ||
targetBytes.position = position; | ||
//console.log('js write', (targetBytes.buffer.address + (flagPosition << 2)).toString(16), flags.toString(16)) | ||
if (writeTxn) { | ||
uint32[0] = flags | ||
env.write(uint32.address) | ||
return () => (uint32[0] & FAILED_CONDITION) ? SYNC_PROMISE_FAIL : SYNC_PROMISE_SUCCESS | ||
uint32[0] = flags; | ||
env.write(uint32.address); | ||
return () => (uint32[0] & FAILED_CONDITION) ? SYNC_PROMISE_FAIL : SYNC_PROMISE_SUCCESS; | ||
} | ||
// if we ever use buffers that haven't been zero'ed, need to clear out the next slot like this: | ||
// uint32[position << 1] = 0 // clear out the next slot | ||
let nextUint32 | ||
let nextUint32; | ||
if (position > 0x1e00) { // 61440 bytes | ||
// make new buffer and make pointer to it | ||
let lastPosition = position | ||
targetBytes = allocateInstructionBuffer() | ||
position = targetBytes.position | ||
float64[lastPosition + 1] = targetBytes.uint32.address + position | ||
uint32[lastPosition << 1] = 3 // pointer instruction | ||
let lastPosition = position; | ||
targetBytes = allocateInstructionBuffer(); | ||
position = targetBytes.position; | ||
float64[lastPosition + 1] = targetBytes.uint32.address + position; | ||
uint32[lastPosition << 1] = 3; // pointer instruction | ||
//console.log('pointer from ', (lastFloat64.buffer.address + (lastPosition << 3)).toString(16), 'to', (targetBytes.buffer.address + position).toString(16), 'flag position', (uint32.buffer.address + (flagPosition << 2)).toString(16)) | ||
nextUint32 = targetBytes.uint32 | ||
nextUint32 = targetBytes.uint32; | ||
} else | ||
nextUint32 = uint32 | ||
let resolution = nextResolution | ||
nextUint32 = uint32; | ||
let resolution = nextResolution; | ||
// create the placeholder next resolution | ||
@@ -231,4 +232,4 @@ nextResolution = resolution.next = store.cache ? | ||
next: null, | ||
} | ||
let writtenBatchDepth = batchDepth | ||
}; | ||
let writtenBatchDepth = batchDepth; | ||
@@ -240,12 +241,12 @@ return (callback) => { | ||
// just poll for the status change if we miss a status update | ||
writeStatus = uint32[flagPosition] | ||
uint32[flagPosition] = flags | ||
writeStatus = uint32[flagPosition]; | ||
uint32[flagPosition] = flags; | ||
//writeStatus = Atomics.or(uint32, flagPosition, flags) | ||
if (writeBatchStart && !writeStatus) { | ||
outstandingBatchCount += 1 + (valueSize >> 12) | ||
outstandingBatchCount += 1 + (valueSize >> 12); | ||
//console.log(outstandingBatchCount, batchStartThreshold) | ||
if (outstandingBatchCount > batchStartThreshold) { | ||
outstandingBatchCount = 0 | ||
writeBatchStart() | ||
writeBatchStart = null | ||
outstandingBatchCount = 0; | ||
writeBatchStart(); | ||
writeBatchStart = null; | ||
} | ||
@@ -256,71 +257,71 @@ } | ||
// so we use the slower atomic operation | ||
writeStatus = Atomics.or(uint32, flagPosition, flags) | ||
writeStatus = Atomics.or(uint32, flagPosition, flags); | ||
outstandingWriteCount++ | ||
outstandingWriteCount++; | ||
if (writeStatus & TXN_DELIMITER) { | ||
//console.warn('Got TXN delimiter', ( uint32.address + (flagPosition << 2)).toString(16)) | ||
commitPromise = null // TODO: Don't reset these if this comes from the batch start operation on an event turn batch | ||
flushPromise = null | ||
queueCommitResolution(resolution) | ||
commitPromise = null; // TODO: Don't reset these if this comes from the batch start operation on an event turn batch | ||
flushPromise = null; | ||
queueCommitResolution(resolution); | ||
if (!startAddress) { | ||
startAddress = uint32.address + (flagPosition << 2) | ||
startAddress = uint32.address + (flagPosition << 2); | ||
} | ||
} | ||
if (!flushPromise && overlappingSync && separateFlushed) | ||
flushPromise = new Promise(resolve => flushResolvers.push(resolve)) | ||
flushPromise = new Promise(resolve => flushResolvers.push(resolve)); | ||
if (writeStatus & WAITING_OPERATION) { // write thread is waiting | ||
//console.log('resume batch thread', uint32.buffer.address + (flagPosition << 2)) | ||
env.write(0) | ||
env.write(0); | ||
} | ||
if (outstandingWriteCount > BACKPRESSURE_THRESHOLD) { | ||
if (!backpressureArray) | ||
backpressureArray = new Int32Array(new SharedArrayBuffer(4), 0, 1) | ||
Atomics.wait(backpressureArray, 0, 0, Math.round(outstandingWriteCount / BACKPRESSURE_THRESHOLD)) | ||
backpressureArray = new Int32Array(new SharedArrayBuffer(4), 0, 1); | ||
Atomics.wait(backpressureArray, 0, 0, Math.round(outstandingWriteCount / BACKPRESSURE_THRESHOLD)); | ||
} | ||
if (startAddress) { | ||
if (eventTurnBatching) | ||
startWriting() // start writing immediately because this has already been batched/queued | ||
startWriting(); // start writing immediately because this has already been batched/queued | ||
else if (!enqueuedCommit && txnStartThreshold) { | ||
enqueuedCommit = commitDelay == 0 ? setImmediate(() => startWriting()) : setTimeout(() => startWriting(), commitDelay) | ||
enqueuedCommit = commitDelay == 0 ? setImmediate(() => startWriting()) : setTimeout(() => startWriting(), commitDelay); | ||
} else if (outstandingWriteCount > txnStartThreshold) | ||
startWriting() | ||
startWriting(); | ||
} | ||
if ((outstandingWriteCount & 7) === 0) | ||
resolveWrites() | ||
resolveWrites(); | ||
if (store.cache) { | ||
resolution.key = key | ||
resolution.store = store | ||
resolution.valueSize = valueBuffer ? valueBuffer.length : 0 | ||
resolution.key = key; | ||
resolution.store = store; | ||
resolution.valueSize = valueBuffer ? valueBuffer.length : 0; | ||
} | ||
resolution.valueBuffer = valueBuffer | ||
lastQueuedResolution = resolution | ||
resolution.valueBuffer = valueBuffer; | ||
lastQueuedResolution = resolution; | ||
if (callback) { | ||
resolution.reject = callback | ||
resolution.resolve = (value) => callback(null, value) | ||
return | ||
resolution.reject = callback; | ||
resolution.resolve = (value) => callback(null, value); | ||
return; | ||
} | ||
if (ifVersion === undefined) { | ||
if (writtenBatchDepth > 1) | ||
return SYNC_PROMISE_SUCCESS // or return undefined? | ||
return SYNC_PROMISE_SUCCESS; // or return undefined? | ||
if (!commitPromise) { | ||
commitPromise = new Promise((resolve, reject) => { | ||
resolution.resolve = resolve | ||
resolution.reject = reject | ||
}) | ||
resolution.resolve = resolve; | ||
resolution.reject = reject; | ||
}); | ||
if (separateFlushed) | ||
commitPromise.flushed = overlappingSync ? flushPromise : commitPromise | ||
commitPromise.flushed = overlappingSync ? flushPromise : commitPromise; | ||
} | ||
return commitPromise | ||
return commitPromise; | ||
} | ||
let promise = new Promise((resolve, reject) => { | ||
resolution.resolve = resolve | ||
resolution.reject = reject | ||
}) | ||
resolution.resolve = resolve; | ||
resolution.reject = reject; | ||
}); | ||
if (separateFlushed) | ||
promise.flushed = overlappingSync ? flushPromise : promise | ||
return promise | ||
} | ||
promise.flushed = overlappingSync ? flushPromise : promise; | ||
return promise; | ||
}; | ||
} | ||
@@ -330,31 +331,31 @@ function startWriting() { | ||
if (enqueuedCommit) { | ||
clearImmediate(enqueuedCommit) | ||
enqueuedCommit = null | ||
clearImmediate(enqueuedCommit); | ||
enqueuedCommit = null; | ||
} | ||
let resolvers = flushResolvers | ||
flushResolvers = [] | ||
let resolvers = flushResolvers; | ||
flushResolvers = []; | ||
env.startWriting(startAddress, (status) => { | ||
//console.log('finished batch', store.name) | ||
if (dynamicBytes.uint32[dynamicBytes.position << 1] & TXN_DELIMITER) | ||
queueCommitResolution(nextResolution) | ||
queueCommitResolution(nextResolution); | ||
resolveWrites(true) | ||
resolveWrites(true); | ||
switch (status) { | ||
case 0: | ||
for (let i = 0; i < resolvers.length; i++) | ||
resolvers[i]() | ||
resolvers[i](); | ||
case 1: | ||
break; | ||
case 2: | ||
executeTxnCallbacks() | ||
break | ||
executeTxnCallbacks(); | ||
break; | ||
default: | ||
console.error(status) | ||
console.error(status); | ||
if (commitRejectPromise) { | ||
commitRejectPromise.reject(status) | ||
commitRejectPromise = null | ||
commitRejectPromise.reject(status); | ||
commitRejectPromise = null; | ||
} | ||
} | ||
}) | ||
startAddress = 0 | ||
}); | ||
startAddress = 0; | ||
} | ||
@@ -364,15 +365,15 @@ | ||
if (!resolution.isTxn) { | ||
resolution.isTxn = true | ||
resolution.isTxn = true; | ||
if (txnResolution) { | ||
txnResolution.nextTxn = resolution | ||
txnResolution.nextTxn = resolution; | ||
//outstandingWriteCount = 0 | ||
} | ||
else | ||
txnResolution = resolution | ||
txnResolution = resolution; | ||
} | ||
} | ||
var TXN_DONE = (separateFlushed ? TXN_COMMITTED : TXN_FLUSHED) | TXN_FAILED | ||
var TXN_DONE = (separateFlushed ? TXN_COMMITTED : TXN_FLUSHED) | TXN_FAILED; | ||
function resolveWrites(async) { | ||
// clean up finished instructions | ||
let instructionStatus | ||
let instructionStatus; | ||
while ((instructionStatus = unwrittenResolution.uint32[unwrittenResolution.flagPosition]) | ||
@@ -382,11 +383,11 @@ & 0x1000000) { | ||
if (unwrittenResolution.callbacks) { | ||
nextTxnCallbacks.push(unwrittenResolution.callbacks) | ||
unwrittenResolution.callbacks = null | ||
nextTxnCallbacks.push(unwrittenResolution.callbacks); | ||
unwrittenResolution.callbacks = null; | ||
} | ||
if (!unwrittenResolution.isTxn) | ||
unwrittenResolution.uint32 = null | ||
unwrittenResolution.valueBuffer = null | ||
unwrittenResolution.flag = instructionStatus | ||
outstandingWriteCount-- | ||
unwrittenResolution = unwrittenResolution.next | ||
unwrittenResolution.uint32 = null; | ||
unwrittenResolution.valueBuffer = null; | ||
unwrittenResolution.flag = instructionStatus; | ||
outstandingWriteCount--; | ||
unwrittenResolution = unwrittenResolution.next; | ||
} | ||
@@ -396,5 +397,5 @@ while (txnResolution && | ||
if (instructionStatus & TXN_FAILED) | ||
rejectCommit() | ||
rejectCommit(); | ||
else | ||
resolveCommit(async) | ||
resolveCommit(async); | ||
} | ||
@@ -404,37 +405,37 @@ } | ||
function resolveCommit(async) { | ||
afterCommit() | ||
afterCommit(); | ||
if (async) | ||
resetReadTxn() | ||
resetReadTxn(); | ||
else | ||
queueMicrotask(resetReadTxn) // TODO: only do this if there are actually committed writes? | ||
queueMicrotask(resetReadTxn); // TODO: only do this if there are actually committed writes? | ||
do { | ||
if (uncommittedResolution.resolve) { | ||
let flag = uncommittedResolution.flag | ||
let flag = uncommittedResolution.flag; | ||
if (flag < 0) | ||
uncommittedResolution.reject(new Error("Error occurred in write")) | ||
uncommittedResolution.reject(new Error("Error occurred in write")); | ||
else if (flag & FAILED_CONDITION) { | ||
uncommittedResolution.resolve(false) | ||
uncommittedResolution.resolve(false); | ||
} else | ||
uncommittedResolution.resolve(true) | ||
uncommittedResolution.resolve(true); | ||
} | ||
} while((uncommittedResolution = uncommittedResolution.next) && uncommittedResolution != txnResolution) | ||
txnResolution = txnResolution.nextTxn | ||
txnResolution = txnResolution.nextTxn; | ||
} | ||
var commitRejectPromise | ||
var commitRejectPromise; | ||
function rejectCommit() { | ||
afterCommit() | ||
afterCommit(); | ||
if (!commitRejectPromise) { | ||
let rejectFunction | ||
commitRejectPromise = new Promise((resolve, reject) => rejectFunction = reject) | ||
commitRejectPromise.reject = rejectFunction | ||
let rejectFunction; | ||
commitRejectPromise = new Promise((resolve, reject) => rejectFunction = reject); | ||
commitRejectPromise.reject = rejectFunction; | ||
} | ||
do { | ||
if (uncommittedResolution.reject) { | ||
let flag = uncommittedResolution.flag & 0xf | ||
let error = new Error("Commit failed (see commitError for details)") | ||
error.commitError = commitRejectPromise | ||
uncommittedResolution.reject(error) | ||
let flag = uncommittedResolution.flag & 0xf; | ||
let error = new Error("Commit failed (see commitError for details)"); | ||
error.commitError = commitRejectPromise; | ||
uncommittedResolution.reject(error); | ||
} | ||
} while((uncommittedResolution = uncommittedResolution.next) && uncommittedResolution != txnResolution) | ||
txnResolution = txnResolution.nextTxn | ||
txnResolution = txnResolution.nextTxn; | ||
} | ||
@@ -446,5 +447,5 @@ function atomicStatus(uint32, flagPosition, newStatus) { | ||
// just poll for the status change if we miss a status update | ||
let writeStatus = uint32[flagPosition] | ||
uint32[flagPosition] = newStatus | ||
return writeStatus | ||
let writeStatus = uint32[flagPosition]; | ||
uint32[flagPosition] = newStatus; | ||
return writeStatus; | ||
//return Atomics.or(uint32, flagPosition, newStatus) | ||
@@ -454,51 +455,51 @@ } else // otherwise the transaction could end at any time and we need to know the | ||
// so we use the slower atomic operation | ||
return Atomics.or(uint32, flagPosition, newStatus) | ||
return Atomics.or(uint32, flagPosition, newStatus); | ||
} | ||
function afterCommit() { | ||
for (let i = 0, l = afterCommitCallbacks.length; i < l; i++) { | ||
afterCommitCallbacks[i]({ next: uncommittedResolution, last: unwrittenResolution}) | ||
afterCommitCallbacks[i]({ next: uncommittedResolution, last: unwrittenResolution}); | ||
} | ||
} | ||
async function executeTxnCallbacks() { | ||
env.writeTxn = writeTxn = {} | ||
let promises | ||
let txnCallbacks | ||
env.writeTxn = writeTxn = {}; | ||
let promises; | ||
let txnCallbacks; | ||
for (let i = 0, l = nextTxnCallbacks.length; i < l; i++) { | ||
txnCallbacks = nextTxnCallbacks[i] | ||
txnCallbacks = nextTxnCallbacks[i]; | ||
for (let i = 0, l = txnCallbacks.length; i < l; i++) { | ||
let userTxnCallback = txnCallbacks[i] | ||
let asChild = userTxnCallback.asChild | ||
let userTxnCallback = txnCallbacks[i]; | ||
let asChild = userTxnCallback.asChild; | ||
if (asChild) { | ||
if (promises) { | ||
// must complete any outstanding transactions before proceeding | ||
await Promise.all(promises) | ||
promises = null | ||
await Promise.all(promises); | ||
promises = null; | ||
} | ||
env.beginTxn(1) // abortable | ||
env.beginTxn(1); // abortable | ||
try { | ||
let result = userTxnCallback.callback() | ||
let result = userTxnCallback.callback(); | ||
if (result && result.then) { | ||
await result | ||
await result; | ||
} | ||
if (result === ABORT) | ||
env.abortTxn() | ||
env.abortTxn(); | ||
else | ||
env.commitTxn() | ||
txnCallbacks[i] = result | ||
env.commitTxn(); | ||
txnCallbacks[i] = result; | ||
} catch(error) { | ||
env.abortTxn() | ||
txnError(error, i) | ||
env.abortTxn(); | ||
txnError(error, i); | ||
} | ||
} else { | ||
try { | ||
let result = userTxnCallback() | ||
txnCallbacks[i] = result | ||
let result = userTxnCallback(); | ||
txnCallbacks[i] = result; | ||
if (result && result.then) { | ||
if (!promises) | ||
promises = [] | ||
promises.push(result.catch(() => {})) | ||
promises = []; | ||
promises.push(result.catch(() => {})); | ||
} | ||
} catch(error) { | ||
txnError(error, i) | ||
txnError(error, i); | ||
} | ||
@@ -508,18 +509,18 @@ } | ||
} | ||
nextTxnCallbacks = [] | ||
nextTxnCallbacks = []; | ||
if (promises) { // finish any outstanding commit functions | ||
await Promise.all(promises) | ||
await Promise.all(promises); | ||
} | ||
env.writeTxn = writeTxn = false | ||
env.writeTxn = writeTxn = false; | ||
function txnError(error, i) { | ||
(txnCallbacks.errors || (txnCallbacks.errors = []))[i] = error | ||
txnCallbacks[i] = CALLBACK_THREW | ||
(txnCallbacks.errors || (txnCallbacks.errors = []))[i] = error; | ||
txnCallbacks[i] = CALLBACK_THREW; | ||
} | ||
} | ||
function finishBatch() { | ||
dynamicBytes.uint32[(dynamicBytes.position + 1) << 1] = 0 // clear out the next slot | ||
let writeStatus = atomicStatus(dynamicBytes.uint32, (dynamicBytes.position++) << 1, 2) // atomically write the end block | ||
nextResolution.flagPosition += 2 | ||
dynamicBytes.uint32[(dynamicBytes.position + 1) << 1] = 0; // clear out the next slot | ||
let writeStatus = atomicStatus(dynamicBytes.uint32, (dynamicBytes.position++) << 1, 2); // atomically write the end block | ||
nextResolution.flagPosition += 2; | ||
if (writeStatus & WAITING_OPERATION) { | ||
env.write(0) | ||
env.write(0); | ||
} | ||
@@ -529,40 +530,40 @@ } | ||
put(key, value, versionOrOptions, ifVersion) { | ||
let callback, flags = 15, type = typeof versionOrOptions | ||
let callback, flags = 15, type = typeof versionOrOptions; | ||
if (type == 'object') { | ||
if (versionOrOptions.noOverwrite) | ||
flags |= 0x10 | ||
flags |= 0x10; | ||
if (versionOrOptions.noDupData) | ||
flags |= 0x20 | ||
flags |= 0x20; | ||
if (versionOrOptions.append) | ||
flags |= 0x20000 | ||
flags |= 0x20000; | ||
if (versionOrOptions.ifVersion != undefined) | ||
ifVersion = versionsOrOptions.ifVersion | ||
versionOrOptions = versionOrOptions.version | ||
ifVersion = versionsOrOptions.ifVersion; | ||
versionOrOptions = versionOrOptions.version; | ||
if (typeof ifVersion == 'function') | ||
callback = ifVersion | ||
callback = ifVersion; | ||
} else if (type == 'function') { | ||
callback = versionOrOptions | ||
callback = versionOrOptions; | ||
} | ||
return writeInstructions(flags, this, key, value, this.useVersions ? versionOrOptions || 0 : undefined, ifVersion)(callback) | ||
return writeInstructions(flags, this, key, value, this.useVersions ? versionOrOptions || 0 : undefined, ifVersion)(callback); | ||
}, | ||
remove(key, ifVersionOrValue, callback) { | ||
let flags = 13 | ||
let ifVersion, value | ||
let flags = 13; | ||
let ifVersion, value; | ||
if (ifVersionOrValue !== undefined) { | ||
if (typeof ifVersionOrValue == 'function') | ||
callback = ifVersionOrValue | ||
callback = ifVersionOrValue; | ||
else if (this.useVersions) | ||
ifVersion = ifVersionOrValue | ||
ifVersion = ifVersionOrValue; | ||
else { | ||
flags = 14 | ||
value = ifVersionOrValue | ||
flags = 14; | ||
value = ifVersionOrValue; | ||
} | ||
} | ||
return writeInstructions(flags, this, key, value, undefined, ifVersion)(callback) | ||
return writeInstructions(flags, this, key, value, undefined, ifVersion)(callback); | ||
}, | ||
del(key, options, callback) { | ||
return this.remove(key, options, callback) | ||
return this.remove(key, options, callback); | ||
}, | ||
ifNoExists(key, callback) { | ||
return this.ifVersion(key, null, callback) | ||
return this.ifVersion(key, null, callback); | ||
}, | ||
@@ -573,25 +574,25 @@ | ||
return new Batch((operations, callback) => { | ||
let promise = this.ifVersion(key, version, operations) | ||
let promise = this.ifVersion(key, version, operations); | ||
if (callback) | ||
promise.then(callback) | ||
return promise | ||
}) | ||
promise.then(callback); | ||
return promise; | ||
}); | ||
} | ||
if (writeTxn) { | ||
if (this.doesExist(key, version)) { | ||
callback() | ||
return SYNC_PROMISE_SUCCESS | ||
callback(); | ||
return SYNC_PROMISE_SUCCESS; | ||
} | ||
return SYNC_PROMISE_FAIL | ||
return SYNC_PROMISE_FAIL; | ||
} | ||
let finishStartWrite = writeInstructions(typeof key === 'undefined' ? 1 : 4, this, key, undefined, undefined, version) | ||
let promise | ||
batchDepth += 2 | ||
let finishStartWrite = writeInstructions(typeof key === 'undefined' ? 1 : 4, this, key, undefined, undefined, version); | ||
let promise; | ||
batchDepth += 2; | ||
if (batchDepth > 2) | ||
promise = finishStartWrite() | ||
promise = finishStartWrite(); | ||
else { | ||
writeBatchStart = () => { | ||
promise = finishStartWrite() | ||
} | ||
outstandingBatchCount = 0 | ||
promise = finishStartWrite(); | ||
}; | ||
outstandingBatchCount = 0; | ||
} | ||
@@ -601,7 +602,7 @@ //console.warn('wrote start of ifVersion', this.path) | ||
if (typeof callback === 'function') { | ||
callback() | ||
callback(); | ||
} else { | ||
for (let i = 0, l = callback.length; i < l; i++) { | ||
let operation = callback[i] | ||
this[operation.type](operation.key, operation.value) | ||
let operation = callback[i]; | ||
this[operation.type](operation.key, operation.value); | ||
} | ||
@@ -612,26 +613,26 @@ } | ||
if (!promise) { | ||
finishBatch() | ||
batchDepth -= 2 | ||
promise = finishStartWrite() // finish write once all the operations have been written (and it hasn't been written prematurely) | ||
writeBatchStart = null | ||
finishBatch(); | ||
batchDepth -= 2; | ||
promise = finishStartWrite(); // finish write once all the operations have been written (and it hasn't been written prematurely) | ||
writeBatchStart = null; | ||
} else { | ||
batchDepth -= 2 | ||
finishBatch() | ||
batchDepth -= 2; | ||
finishBatch(); | ||
} | ||
} | ||
return promise | ||
return promise; | ||
}, | ||
batch(callbackOrOperations) { | ||
return this.ifVersion(undefined, undefined, callbackOrOperations) | ||
return this.ifVersion(undefined, undefined, callbackOrOperations); | ||
}, | ||
drop(callback) { | ||
return writeInstructions(1024 + 12, this, undefined, undefined, undefined, undefined)(callback) | ||
return writeInstructions(1024 + 12, this, undefined, undefined, undefined, undefined)(callback); | ||
}, | ||
clearAsync(callback) { | ||
if (this.encoder && this.encoder.structures) | ||
this.encoder.structures = [] | ||
return writeInstructions(12, this, undefined, undefined, undefined, undefined)(callback) | ||
this.encoder.structures = []; | ||
return writeInstructions(12, this, undefined, undefined, undefined, undefined)(callback); | ||
}, | ||
_triggerError() { | ||
finishBatch() | ||
finishBatch(); | ||
}, | ||
@@ -641,15 +642,15 @@ | ||
if (writeTxn) | ||
return this.put(key, value, versionOrOptions, ifVersion) | ||
return this.put(key, value, versionOrOptions, ifVersion); | ||
else | ||
return this.transactionSync(() => | ||
this.put(key, value, versionOrOptions, ifVersion) == SYNC_PROMISE_SUCCESS, | ||
{ abortable: false }) | ||
{ abortable: false }); | ||
}, | ||
removeSync(key, ifVersionOrValue) { | ||
if (writeTxn) | ||
return this.remove(key, ifVersionOrValue) | ||
return this.remove(key, ifVersionOrValue); | ||
else | ||
return this.transactionSync(() => | ||
this.remove(key, ifVersionOrValue) == SYNC_PROMISE_SUCCESS, | ||
{ abortable: false }) | ||
{ abortable: false }); | ||
}, | ||
@@ -659,47 +660,47 @@ transaction(callback) { | ||
// already nested in a transaction, just execute and return | ||
return callback() | ||
return callback(); | ||
} | ||
return this.transactionAsync(callback) | ||
return this.transactionAsync(callback); | ||
}, | ||
childTransaction(callback) { | ||
if (useWritemap) | ||
throw new Error('Child transactions are not supported in writemap mode') | ||
throw new Error('Child transactions are not supported in writemap mode'); | ||
if (writeTxn) { | ||
env.beginTxn(1) // abortable | ||
env.beginTxn(1); // abortable | ||
try { | ||
return when(callback(), (result) => { | ||
if (result === ABORT) | ||
env.abortTxn() | ||
env.abortTxn(); | ||
else | ||
env.commitTxn() | ||
return result | ||
env.commitTxn(); | ||
return result; | ||
}, (error) => { | ||
env.abortTxn() | ||
throw error | ||
}) | ||
env.abortTxn(); | ||
throw error; | ||
}); | ||
} catch(error) { | ||
env.abortTxn() | ||
throw error | ||
env.abortTxn(); | ||
throw error; | ||
} | ||
} | ||
return this.transactionAsync(callback, true) | ||
return this.transactionAsync(callback, true); | ||
}, | ||
transactionAsync(callback, asChild) { | ||
let txnIndex | ||
let txnCallbacks | ||
let txnIndex; | ||
let txnCallbacks; | ||
if (!nextResolution.callbacks) { | ||
txnCallbacks = [asChild ? { callback, asChild } : callback] | ||
nextResolution.callbacks = txnCallbacks | ||
txnCallbacks.results = writeInstructions(8 | (this.strictAsyncOrder ? 0x100000 : 0), this)() | ||
txnIndex = 0 | ||
txnCallbacks = [asChild ? { callback, asChild } : callback]; | ||
nextResolution.callbacks = txnCallbacks; | ||
txnCallbacks.results = writeInstructions(8 | (this.strictAsyncOrder ? 0x100000 : 0), this)(); | ||
txnIndex = 0; | ||
} else { | ||
txnCallbacks = lastQueuedResolution.callbacks | ||
txnIndex = txnCallbacks.push(asChild ? { callback, asChild } : callback) - 1 | ||
txnCallbacks = lastQueuedResolution.callbacks; | ||
txnIndex = txnCallbacks.push(asChild ? { callback, asChild } : callback) - 1; | ||
} | ||
return txnCallbacks.results.then((results) => { | ||
let result = txnCallbacks[txnIndex] | ||
let result = txnCallbacks[txnIndex]; | ||
if (result === CALLBACK_THREW) | ||
throw txnCallbacks.errors[txnIndex] | ||
return result | ||
}) | ||
throw txnCallbacks.errors[txnIndex]; | ||
return result; | ||
}); | ||
}, | ||
@@ -710,49 +711,49 @@ transactionSync(callback, flags) { | ||
// already nested in a transaction, execute as child transaction (if possible) and return | ||
return this.childTransaction(callback) | ||
let result = callback() // else just run in current transaction | ||
return this.childTransaction(callback); | ||
let result = callback(); // else just run in current transaction | ||
if (result == ABORT && !abortedNonChildTransactionWarn) { | ||
console.warn('Can not abort a transaction inside another transaction with ' + (this.cache ? 'caching enabled' : 'useWritemap enabled')) | ||
abortedNonChildTransactionWarn = true | ||
console.warn('Can not abort a transaction inside another transaction with ' + (this.cache ? 'caching enabled' : 'useWritemap enabled')); | ||
abortedNonChildTransactionWarn = true; | ||
} | ||
return result | ||
return result; | ||
} | ||
try { | ||
this.transactions++ | ||
env.beginTxn(flags == undefined ? 3 : flags) | ||
writeTxn = env.writeTxn = {} | ||
this.transactions++; | ||
env.beginTxn(flags == undefined ? 3 : flags); | ||
writeTxn = env.writeTxn = {}; | ||
return when(callback(), (result) => { | ||
try { | ||
if (result === ABORT) | ||
env.abortTxn() | ||
env.abortTxn(); | ||
else { | ||
env.commitTxn() | ||
resetReadTxn() | ||
env.commitTxn(); | ||
resetReadTxn(); | ||
} | ||
return result | ||
return result; | ||
} finally { | ||
env.writeTxn = writeTxn = null | ||
env.writeTxn = writeTxn = null; | ||
} | ||
}, (error) => { | ||
try { env.abortTxn() } catch(e) {} | ||
env.writeTxn = writeTxn = null | ||
throw error | ||
}) | ||
try { env.abortTxn(); } catch(e) {} | ||
env.writeTxn = writeTxn = null; | ||
throw error; | ||
}); | ||
} catch(error) { | ||
try { env.abortTxn() } catch(e) {} | ||
env.writeTxn = writeTxn = null | ||
throw error | ||
try { env.abortTxn(); } catch(e) {} | ||
env.writeTxn = writeTxn = null; | ||
throw error; | ||
} | ||
}, | ||
transactionSyncStart(callback) { | ||
return this.transactionSync(callback, 0) | ||
return this.transactionSync(callback, 0); | ||
}, | ||
on(event, callback) { | ||
if (event == 'beforecommit') { | ||
eventTurnBatching = true | ||
beforeCommitCallbacks.push(callback) | ||
eventTurnBatching = true; | ||
beforeCommitCallbacks.push(callback); | ||
} else if (event == 'aftercommit') | ||
afterCommitCallbacks.push(callback) | ||
afterCommitCallbacks.push(callback); | ||
} | ||
}) | ||
LMDBStore.prototype.del = LMDBStore.prototype.remove | ||
}); | ||
LMDBStore.prototype.del = LMDBStore.prototype.remove; | ||
} | ||
@@ -762,17 +763,22 @@ | ||
constructor(callback) { | ||
super() | ||
this.callback = callback | ||
super(); | ||
this.callback = callback; | ||
} | ||
put(key, value) { | ||
this.push({ type: 'put', key, value }) | ||
this.push({ type: 'put', key, value }); | ||
} | ||
del(key) { | ||
this.push({ type: 'del', key }) | ||
this.push({ type: 'del', key }); | ||
} | ||
clear() { | ||
this.splice(0, this.length) | ||
this.splice(0, this.length); | ||
} | ||
write(callback) { | ||
return this.callback(this, callback) | ||
return this.callback(this, callback); | ||
} | ||
} | ||
export function asBinary(buffer) { | ||
return { | ||
[binaryBuffer]: buffer | ||
}; | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Native code
Supply chain riskContains native code (e.g., compiled binaries or shared libraries). Including native code can obscure malicious behavior.
Found 1 instance in 1 package
Native code
Supply chain riskContains native code (e.g., compiled binaries or shared libraries). Including native code can obscure malicious behavior.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
15883643
170.48%161
18.38%10965
1.1%467
-0.21%26
333.33%