lmdb - npm Package Compare versions

Comparing version to 2.0.0-beta3


benchmark/index.js

@@ -179,3 +179,3 @@ 'use strict';

//suite.add('syncTxn', syncTxn);
//suite.add('getRange', getRange);
suite.add('getRange', getRange);
suite.add('setData', {

@@ -188,4 +188,4 @@ defer: true,

fn: batchDataAdd
});
suite.add('get', getData);
});*/
suite.add('get', getData);/*
suite.add('plainJSON', plainJSON);

@@ -192,0 +192,0 @@ suite.add('getBinary', getBinary);*/

@@ -1,78 +0,78 @@

import { WeakLRUCache } from 'weak-lru-cache/index.js'
let getLastVersion
const mapGet = Map.prototype.get
import { WeakLRUCache } from 'weak-lru-cache/index.js';
let getLastVersion;
const mapGet = Map.prototype.get;
export const CachingStore = Store => class extends Store {
constructor(dbName, options) {
super(dbName, options)
super(dbName, options);
if (!this.env.cacheCommitter) {
this.env.cacheCommitter = true
this.env.cacheCommitter = true;
this.on('aftercommit', ({ next, last }) => {
do {
let store = next.store
let store = next.store;
if (store) {
if (next.flag & 1)
next.store.cache.delete(next.key) // just delete it from the map
next.store.cache.delete(next.key); // just delete it from the map
else {
let expirationPriority = next.valueSize >> 10
let cache = next.store.cache
let entry = mapGet.call(cache, next.key)
let expirationPriority = next.valueSize >> 10;
let cache = next.store.cache;
let entry = mapGet.call(cache, next.key);
if (entry)
cache.used(entry, expirationPriority) // this will enter it into the LRFU
cache.used(entry, expirationPriority); // this will enter it into the LRFU
}
}
} while (next != last && (next = next.next))
})
});
}
this.db.cachingDb = this
this.cache = new WeakLRUCache(options.cache)
this.db.cachingDb = this;
this.cache = new WeakLRUCache(options.cache);
}
get(id, cacheMode) {
let value = this.cache.getValue(id)
let value = this.cache.getValue(id);
if (value !== undefined)
return value
value = super.get(id)
return value;
value = super.get(id);
if (value && typeof value === 'object' && !cacheMode && typeof id !== 'object') {
let entry = this.cache.setValue(id, value, this.lastSize >> 10)
let entry = this.cache.setValue(id, value, this.lastSize >> 10);
if (this.useVersions) {
entry.version = getLastVersion()
entry.version = getLastVersion();
}
}
return value
return value;
}
getEntry(id, cacheMode) {
let entry = this.cache.get(id)
let entry = this.cache.get(id);
if (entry)
return entry
let value = super.get(id)
return entry;
let value = super.get(id);
if (value === undefined)
return
return;
if (value && typeof value === 'object' && !cacheMode && typeof id !== 'object') {
entry = this.cache.setValue(id, value, this.lastSize >> 10)
entry = this.cache.setValue(id, value, this.lastSize >> 10);
} else {
entry = { value }
entry = { value };
}
if (this.useVersions) {
entry.version = getLastVersion()
entry.version = getLastVersion();
}
return entry
return entry;
}
putEntry(id, entry, ifVersion) {
let result = super.put(id, entry.value, entry.version, ifVersion)
let result = super.put(id, entry.value, entry.version, ifVersion);
if (typeof id === 'object')
return result
return result;
if (result && result.then)
this.cache.setManually(id, entry) // set manually so we can keep it pinned in memory until it is committed
this.cache.setManually(id, entry); // set manually so we can keep it pinned in memory until it is committed
else // sync operation, immediately add to cache
this.cache.set(id, entry)
this.cache.set(id, entry);
}
put(id, value, version, ifVersion) {
// if (this.cache.get(id)) // if there is a cache entry, remove it from scheduledEntries and
let result = super.put(id, value, version, ifVersion)
let result = super.put(id, value, version, ifVersion);
if (typeof id !== 'object') {
// sync operation, immediately add to cache, otherwise keep it pinned in memory until it is committed
let entry = this.cache.setValue(id, value, !result || result.isSync ? 0 : -1)
let entry = this.cache.setValue(id, value, !result || result.isSync ? 0 : -1);
if (version !== undefined)
entry.version = typeof version === 'object' ? version.version : version
entry.version = typeof version === 'object' ? version.version : version;
}
return result
return result;
}

@@ -83,29 +83,29 @@ putSync(id, value, version, ifVersion) {

if (value && typeof value === 'object') {
let entry = this.cache.setValue(id, value)
let entry = this.cache.setValue(id, value);
if (version !== undefined) {
entry.version = typeof version === 'object' ? version.version : version
entry.version = typeof version === 'object' ? version.version : version;
}
} else // it is possible that a value used to exist here
this.cache.delete(id)
this.cache.delete(id);
}
return super.putSync(id, value, version, ifVersion)
return super.putSync(id, value, version, ifVersion);
}
remove(id, ifVersion) {
this.cache.delete(id)
return super.remove(id, ifVersion)
this.cache.delete(id);
return super.remove(id, ifVersion);
}
removeSync(id, ifVersion) {
this.cache.delete(id)
return super.removeSync(id, ifVersion)
this.cache.delete(id);
return super.removeSync(id, ifVersion);
}
clear() {
this.cache.clear()
super.clear()
this.cache.clear();
super.clear();
}
childTransaction(execute) {
throw new Error('Child transactions are not supported in caching stores')
throw new Error('Child transactions are not supported in caching stores');
}
}
};
export function setGetLastVersion(get) {
getLastVersion = get
getLastVersion = get;
}
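
For orientation, a hedged usage sketch of the caching mixin above: passing a `cache` option wraps the store in `CachingStore`, routing reads through the `WeakLRUCache` shown in this diff (the path and key are illustrative, not from the source):

```js
import { open } from 'lmdb';

const db = open('my-data', { cache: true }); // wraps LMDBStore in CachingStore

await db.put('greeting', { hello: 'world' }); // entry stays pinned in the cache until the commit lands
const value = db.get('greeting');             // served from the WeakLRUCache on a hit
```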

@@ -0,0 +0,0 @@ /* sample-bdb.txt - BerkeleyDB toy/sample

@@ -0,0 +0,0 @@ /* sample-mdb.txt - MDB toy/sample

@@ -0,0 +0,0 @@ LZ4 Windows binary package

@@ -0,0 +0,0 @@ LZ4 - Library Files

@@ -259,4 +259,3 @@ import { EventEmitter } from 'events'

useVersions?: boolean
keyIsBuffer?: boolean
keyIsUint32?: boolean
keyEncoding?: 'uint32' | 'binary' | 'ordered-binary'
dupSort?: boolean

@@ -263,0 +262,0 @@ strictAsyncOrder?: boolean
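
These type changes replace the `keyIsBuffer`/`keyIsUint32` booleans with a single `keyEncoding` union (the key-handling diff below matches either form). A hedged sketch of the new option; the paths and database names are illustrative:

```js
import { open } from 'lmdb';

const env = open('data', {});
// 2.0 style: one keyEncoding option instead of separate boolean flags
const byId = env.openDB('by-id', { keyEncoding: 'uint32' });     // fixed 4-byte integer keys
const byHash = env.openDB('by-hash', { keyEncoding: 'binary' }); // raw buffer keys
```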

@@ -1,48 +0,44 @@

import { extname, basename, dirname} from 'path'
import EventEmitter from 'events'
import { Env, Compression, getAddress, require, arch, fs } from './native.js'
import { CachingStore, setGetLastVersion } from './caching.js'
import { addQueryMethods } from './query.js'
import { addWriteMethods } from './writer.js'
import { applyKeyHandling } from './keys.js'
import { Encoder as MsgpackrEncoder } from 'msgpackr'
const binaryBuffer = Symbol('binaryBuffer')
setGetLastVersion(getLastVersion)
const Uint8ArraySlice = Uint8Array.prototype.slice
let keyBytes, keyBytesView
const buffers = []
import { extname, basename, dirname} from 'path';
import EventEmitter from 'events';
import { Env, Compression, getAddress, require, arch, fs } from './native.js';
import { CachingStore, setGetLastVersion } from './caching.js';
import { addQueryMethods, makeReusableBuffer } from './query.js';
import { addWriteMethods } from './writer.js';
import { applyKeyHandling } from './keys.js';
import { Encoder as MsgpackrEncoder } from 'msgpackr';
setGetLastVersion(getLastVersion);
let keyBytes, keyBytesView;
const buffers = [];
const DEFAULT_SYNC_BATCH_THRESHOLD = 200000000 // 200MB
const DEFAULT_IMMEDIATE_BATCH_THRESHOLD = 10000000 // 10MB
const DEFAULT_COMMIT_DELAY = 0
const DEFAULT_SYNC_BATCH_THRESHOLD = 200000000; // 200MB
const DEFAULT_IMMEDIATE_BATCH_THRESHOLD = 10000000; // 10MB
const DEFAULT_COMMIT_DELAY = 0;
const READING_TNX = {
readOnly: true
}
};
export const allDbs = new Map()
let env
let defaultCompression
let lastSize, lastOffset, lastVersion
const MDB_SET_KEY = 0, MDB_SET_RANGE = 1, MDB_GET_BOTH_RANGE = 2, MDB_GET_CURRENT = 3, MDB_FIRST = 4, MDB_LAST = 5, MDB_NEXT = 6, MDB_NEXT_NODUP = 7, MDB_NEXT_DUP = 8, MDB_PREV = 9, MDB_PREV_NODUP = 10, MDB_PREV_DUP = 11
let abortedNonChildTransactionWarn
export const allDbs = new Map();
let env;
let defaultCompression;
let lastSize, lastOffset, lastVersion;
let abortedNonChildTransactionWarn;
export function open(path, options) {
if (!keyBytes)
allocateFixedBuffer()
let env = new Env()
let committingWrites
let scheduledTransactions
let scheduledOperations
let asyncTransactionAfter = true, asyncTransactionStrictOrder
let transactionWarned
let readTxn, writeTxn, pendingBatch, currentCommit, runNextBatch, readTxnRenewed
allocateFixedBuffer();
let env = new Env();
let committingWrites;
let scheduledTransactions;
let scheduledOperations;
let asyncTransactionAfter = true, asyncTransactionStrictOrder;
let transactionWarned;
if (typeof path == 'object' && !options) {
options = path
path = options.path
options = path;
path = options.path;
}
let extension = extname(path)
let name = basename(path, extension)
let is32Bit = arch().endsWith('32')
let extension = extname(path);
let name = basename(path, extension);
let is32Bit = arch().endsWith('32');
let remapChunks = (options && options.remapChunks) || ((options && options.mapSize) ?
(is32Bit && options.mapSize > 0x100000000) : // larger than fits in address space, must use dynamic maps
is32Bit) // without a known map size, we default to being able to handle large data correctly/well*/
is32Bit); // without a known map size, we default to being able to handle large data correctly/well*/
options = Object.assign({

@@ -60,23 +56,27 @@ path,

0x20000, // Otherwise we start small with 128KB
}, options)
}, options);
if (options.asyncTransactionOrder == 'before') {
console.warn('asyncTransactionOrder: "before" is deprecated')
asyncTransactionAfter = false
console.warn('asyncTransactionOrder: "before" is deprecated');
asyncTransactionAfter = false;
} else if (options.asyncTransactionOrder == 'strict') {
asyncTransactionStrictOrder = true
asyncTransactionAfter = false
asyncTransactionStrictOrder = true;
asyncTransactionAfter = false;
}
if (!fs.existsSync(options.noSubdir ? dirname(path) : path))
fs.mkdirSync(options.noSubdir ? dirname(path) : path, { recursive: true })
fs.mkdirSync(options.noSubdir ? dirname(path) : path, { recursive: true });
if (options.compression) {
let setDefault
let setDefault;
if (options.compression == true) {
if (defaultCompression)
options.compression = defaultCompression
else
defaultCompression = options.compression = new Compression({
options.compression = defaultCompression;
else {
let compressionOptions = {
threshold: 1000,
dictionary: fs.readFileSync(new URL('./dict/dict.txt', import.meta.url.replace(/dist[\\\/]index.cjs$/, ''))),
})
defaultCompression.threshold = 1000
dictionary: fs.readFileSync(new URL('./dict/dict.txt',
import.meta.url.replace(/dist[\\\/]index.cjs$/, ''))),
getValueBytes: makeReusableBuffer(0),
};
defaultCompression = options.compression = new Compression(compressionOptions);
Object.assign(defaultCompression, compressionOptions);
}
} else {

@@ -86,5 +86,6 @@ let compressionOptions = Object.assign({

dictionary: fs.readFileSync(new URL('./dict/dict.txt', import.meta.url.replace(/dist[\\\/]index.cjs$/, ''))),
}, options.compression)
options.compression = new Compression(compressionOptions)
options.compression.threshold = compressionOptions.threshold
getValueBytes: makeReusableBuffer(0),
}, options.compression);
options.compression = new Compression(compressionOptions);
Object.assign(options.compression, compressionOptions);
}
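
For context, a hedged sketch of the compression option this hunk reworks: `compression: true` builds the shared default `Compression` instance (threshold 1000, bundled dictionary, reusable `getValueBytes` buffer), while an object customizes it. The path and threshold below are illustrative:

```js
import { open } from 'lmdb';

const db = open('compressed-data', {
  compression: { threshold: 4096 }, // illustrative: only compress values over ~4KB
});
```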

@@ -94,34 +95,14 @@ }

if (options && options.clearOnStart) {
console.info('Removing', path)
fs.removeSync(path)
console.info('Removed', path)
console.info('Removing', path);
fs.removeSync(path);
console.info('Removed', path);
}
env.open(options)
env.readerCheck() // clear out any stale entries
function renewReadTxn() {
if (readTxn)
readTxn.renew()
else
readTxn = env.beginTxn(0x20000)
readTxnRenewed = setImmediate(resetReadTxn)
return readTxn
}
function resetReadTxn() {
if (readTxnRenewed) {
LMDBStore.onReadReset()
readTxnRenewed = null
if (readTxn.cursorCount - (readTxn.renewingCursorCount || 0) > 0) {
readTxn.onlyCursor = true
readTxn = null
}
else
readTxn.reset()
}
}
let stores = []
env.open(options);
env.readerCheck(); // clear out any stale entries
let stores = [];
class LMDBStore extends EventEmitter {
constructor(dbName, dbOptions) {
super()
super();
if (dbName === undefined)
throw new Error('Database name must be supplied in name property (may be null for root database)')
throw new Error('Database name must be supplied in name property (may be null for root database)');
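
The `renewReadTxn`/`resetReadTxn` pair deleted in this hunk moves into query.js (see further below). A condensed sketch of the pattern, with a stub `env` standing in for the native bindings:

```js
// Condensed sketch of the shared read-transaction lifecycle (stub `env`, not
// the real native binding): reads renew one shared transaction, and a
// setImmediate-scheduled reset releases the snapshot after the current turn.
let readTxn, readTxnRenewed;

function renewReadTxn(env) {
  if (readTxn)
    readTxn.renew();                 // reuse the pooled read transaction
  else
    readTxn = env.beginTxn(0x20000); // read-only transaction flag
  readTxnRenewed = setImmediate(resetReadTxn);
  return readTxn;
}

function resetReadTxn() {
  if (readTxnRenewed) {
    readTxnRenewed = null;
    readTxn.reset();                 // release the snapshot until the next read
  }
}
```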

@@ -132,33 +113,32 @@ const openDB = () => {

create: true,
txn: env.writeTxn,
}, dbOptions))
this.db.name = dbName || null
}
if (dbOptions.compression && !(dbOptions.compression instanceof Compression)) {
if (dbOptions.compression == true && options.compression)
dbOptions.compression = options.compression // use the parent compression if available
else
dbOptions.compression = new Compression(Object.assign({
threshold: 1000,
dictionary: fs.readFileSync(require.resolve('./dict/dict.txt')),
}), dbOptions.compression)
}
}, dbOptions));
this.db.name = dbName || null;
};
if (dbOptions.compression instanceof Compression) {
// do nothing, already compression object
} else if (dbOptions.compression && typeof dbOptions.compression == 'object')
dbOptions.compression = new Compression(Object.assign({
threshold: 1000,
dictionary: fs.readFileSync(require.resolve('./dict/dict.txt')),
}), dbOptions.compression);
else if (options.compression && dbOptions.compression !== false)
dbOptions.compression = options.compression; // use the parent compression if available
if (dbOptions.dupSort && (dbOptions.useVersions || dbOptions.cache)) {
throw new Error('The dupSort flag can not be combined with versions or caching')
throw new Error('The dupSort flag can not be combined with versions or caching');
}
openDB()
resetReadTxn() // a read transaction becomes invalid after opening another db
this.name = dbName
this.status = 'open'
this.env = env
this.reads = 0
this.writes = 0
this.transactions = 0
this.averageTransactionTime = 5
openDB();
this.resetReadTxn(); // a read transaction becomes invalid after opening another db
this.name = dbName;
this.status = 'open';
this.env = env;
this.reads = 0;
this.writes = 0;
this.transactions = 0;
this.averageTransactionTime = 5;
if (dbOptions.syncBatchThreshold)
console.warn('syncBatchThreshold is no longer supported')
console.warn('syncBatchThreshold is no longer supported');
if (dbOptions.immediateBatchThreshold)
console.warn('immediateBatchThreshold is no longer supported')
this.commitDelay = DEFAULT_COMMIT_DELAY
console.warn('immediateBatchThreshold is no longer supported');
this.commitDelay = DEFAULT_COMMIT_DELAY;
Object.assign(this, { // these are the options that are inherited

@@ -168,3 +148,3 @@ path: options.path,

strictAsyncOrder: options.strictAsyncOrder,
}, dbOptions)
}, dbOptions);
if (!this.encoding || this.encoding == 'msgpack' || this.encoding == 'cbor') {

@@ -175,212 +155,95 @@ this.encoder = this.decoder = new (this.encoding == 'cbor' ? require('cbor-x').Encoder : MsgpackrEncoder)

copyBuffers: true, // need to copy any embedded buffers that are found since we use unsafe buffers
}, options, dbOptions))
}, options, dbOptions));
} else if (this.encoding == 'json') {
this.encoder = {
encode: JSON.stringify,
}
};
}
applyKeyHandling(this)
allDbs.set(dbName ? name + '-' + dbName : name, this)
stores.push(this)
applyKeyHandling(this);
allDbs.set(dbName ? name + '-' + dbName : name, this);
stores.push(this);
}
openDB(dbName, dbOptions) {
if (typeof dbName == 'object' && !dbOptions) {
dbOptions = dbName
dbName = options.name
dbOptions = dbName;
dbName = options.name;
} else
dbOptions = dbOptions || {}
dbOptions = dbOptions || {};
try {
return dbOptions.cache ?
new (CachingStore(LMDBStore))(dbName, dbOptions) :
new LMDBStore(dbName, dbOptions)
new LMDBStore(dbName, dbOptions);
} catch(error) {
if (error.message.indexOf('MDB_DBS_FULL') > -1) {
error.message += ' (increase your maxDbs option)'
error.message += ' (increase your maxDbs option)';
}
throw error
throw error;
}
}
open(dbOptions, callback) {
let db = this.openDB(dbOptions)
let db = this.openDB(dbOptions);
if (callback)
callback(null, db)
return db
callback(null, db);
return db;
}
transactionAsync(callback, asChild) {
let lastOperation
let after, strictOrder
let lastOperation;
let after, strictOrder;
if (scheduledOperations) {
lastOperation = asyncTransactionAfter ? scheduledOperations.appendAsyncTxn :
scheduledOperations[asyncTransactionStrictOrder ? scheduledOperations.length - 1 : 0]
scheduledOperations[asyncTransactionStrictOrder ? scheduledOperations.length - 1 : 0];
} else {
scheduledOperations = []
scheduledOperations.bytes = 0
scheduledOperations = [];
scheduledOperations.bytes = 0;
}
let transactionSet
let transactionSetIndex
let transactionSet;
let transactionSetIndex;
if (lastOperation === true) { // continue last set of transactions
transactionSetIndex = scheduledTransactions.length - 1
transactionSet = scheduledTransactions[transactionSetIndex]
transactionSetIndex = scheduledTransactions.length - 1;
transactionSet = scheduledTransactions[transactionSetIndex];
} else {
// for now we signify transactions as a true
if (asyncTransactionAfter) // by default we add a flag to put transactions after other operations
scheduledOperations.appendAsyncTxn = true
scheduledOperations.appendAsyncTxn = true;
else if (asyncTransactionStrictOrder)
scheduledOperations.push(true)
scheduledOperations.push(true);
else // in before mode, we put all the async transaction at the beginning
scheduledOperations.unshift(true)
scheduledOperations.unshift(true);
if (!scheduledTransactions) {
scheduledTransactions = []
scheduledTransactions = [];
}
transactionSetIndex = scheduledTransactions.push(transactionSet = []) - 1
transactionSetIndex = scheduledTransactions.push(transactionSet = []) - 1;
}
let index = (transactionSet.push(asChild ?
{asChild, callback } : callback) - 1) << 1
{asChild, callback } : callback) - 1) << 1;
return this.scheduleCommit().results.then((results) => {
let transactionResults = results.transactionResults[transactionSetIndex]
let error = transactionResults[index]
let transactionResults = results.transactionResults[transactionSetIndex];
let error = transactionResults[index];
if (error)
throw error
return transactionResults[index + 1]
})
throw error;
return transactionResults[index + 1];
});
}
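
A hedged sketch of using the async transaction batching implemented above: the callback is queued with other scheduled operations, committed in one batched write transaction, and the returned promise resolves with the callback's result (path, key, and values are illustrative):

```js
import { open } from 'lmdb';

const db = open('accounts', {});
const newBalance = await db.transactionAsync(() => {
  const balance = db.get('balance') || 0;
  db.put('balance', balance + 100); // grouped into the batched transaction
  return balance + 100;             // becomes the resolved value of the promise
});
```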
getSharedBufferForGet(id) {
let txn = (writeTxn || (readTxnRenewed ? readTxn : renewReadTxn()))
lastSize = this.keyIsCompatibility ? txn.getBinaryShared(id) : this.db.get(this.writeKey(id, keyBytes, 0))
if (lastSize === 0xffffffff) { // not found code
return //undefined
}
return lastSize
lastSize = keyBytesView.getUint32(0, true)
let bufferIndex = keyBytesView.getUint32(12, true)
lastOffset = keyBytesView.getUint32(8, true)
let buffer = buffers[bufferIndex]
let startOffset
if (!buffer || lastOffset < (startOffset = buffer.startOffset) || (lastOffset + lastSize > startOffset + 0x100000000)) {
if (buffer)
env.detachBuffer(buffer.buffer)
startOffset = (lastOffset >>> 16) * 0x10000
console.log('make buffer for address', bufferIndex * 0x100000000 + startOffset)
buffer = buffers[bufferIndex] = Buffer.from(getBufferForAddress(bufferIndex * 0x100000000 + startOffset))
buffer.startOffset = startOffset
}
lastOffset -= startOffset
return buffer
return buffer.slice(lastOffset, lastOffset + lastSize)/*Uint8ArraySlice.call(buffer, lastOffset, lastOffset + lastSize)*/
}
getSizeBinaryFast(id) {
(env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn()))
lastSize = this.db.getByBinary(this.writeKey(id, keyBytes, 0))
}
getString(id) {
(env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn()))
let string = this.db.getStringByBinary(this.writeKey(id, keyBytes, 0))
if (string)
lastSize = string.length
return string
}
getBinaryFast(id) {
this.getSizeBinaryFast(id)
return lastSize === 0xffffffff ? undefined : this.db.unsafeBuffer.subarray(0, lastSize)
}
getBinary(id) {
this.getSizeBinaryFast(id)
return lastSize === 0xffffffff ? undefined : Uint8ArraySlice.call(this.db.unsafeBuffer, 0, lastSize)
}
get(id) {
if (this.decoder) {
this.getSizeBinaryFast(id)
return lastSize === 0xffffffff ? undefined : this.decoder.decode(this.db.unsafeBuffer, lastSize)
}
if (this.encoding == 'binary')
return this.getBinary(id)
let result = this.getString(id)
if (result) {
if (this.encoding == 'json')
return JSON.parse(result)
}
return result
}
getEntry(id) {
let value = this.get(id)
if (value !== undefined) {
if (this.useVersions)
return {
value,
version: getLastVersion(),
//size: lastSize
}
else
return {
value,
//size: lastSize
}
}
}
resetReadTxn() {
resetReadTxn()
}
doesExist(key, versionOrValue) {
if (!env.writeTxn)
readTxnRenewed ? readTxn : renewReadTxn()
if (versionOrValue === undefined) {
this.getSizeBinaryFast(key)
return lastSize !== 0xffffffff
}
else if (this.useVersions) {
this.getSizeBinaryFast(key)
return lastSize !== 0xffffffff && matches(getLastVersion(), versionOrValue)
}
else {
if (versionOrValue && versionOrValue[binaryBuffer])
versionOrValue = versionOrValue[binaryBuffer]
else if (this.encoder)
versionOrValue = this.encoder.encode(versionOrValue)
if (typeof versionOrValue == 'string')
versionOrValue = Buffer.from(versionOrValue)
return this.getValuesCount(key, { start: versionOrValue, exactMatch: true}) > 0
}
}
backup(path) {
return new Promise((resolve, reject) => env.copy(path, false, (error) => {
if (error) {
reject(error)
reject(error);
} else {
resolve()
resolve();
}
}))
}));
}
close(callback) {
this.db.close()
if (this.isRoot) {
if (readTxn) {
try {
readTxn.abort()
} catch(error) {}
}
readTxnRenewed = null
env.close()
}
this.status = 'closed'
if (callback)
callback()
}
isOperational() {
return this.status == 'open'
return this.status == 'open';
}
getStats() {
return this.db.stat(readTxnRenewed ? readTxn : renewReadTxn())
}
sync(callback) {
return env.sync(callback || function(error) {
if (error) {
console.error(error)
console.error(error);
}
})
});
}
deleteDB() {
console.warn('deleteDB() is deprecated, use drop or dropSync instead')
return this.dropSync()
console.warn('deleteDB() is deprecated, use drop or dropSync instead');
return this.dropSync();
}

@@ -392,13 +255,13 @@ dropSync() {

}),
{ abortable: false })
{ abortable: false });
}
clear(callback) {
if (typeof callback == 'function')
return this.clearAsync(callback)
console.warn('clear() is deprecated, use clearAsync or clearSync instead')
this.clearSync()
return this.clearAsync(callback);
console.warn('clear() is deprecated, use clearAsync or clearSync instead');
this.clearSync();
}
clearSync() {
if (this.encoder && this.encoder.structures)
this.encoder.structures = []
this.encoder.structures = [];
this.transactionSync(() =>

@@ -408,42 +271,41 @@ this.db.drop({

}),
{ abortable: false })
{ abortable: false });
}
readerCheck() {
return env.readerCheck()
return env.readerCheck();
}
readerList() {
return env.readerList().join('')
return env.readerList().join('');
}
setupSharedStructures() {
const getStructures = () => {
let lastVersion // because we are doing a read here, we may need to save and restore the lastVersion from the last read
let lastVersion; // because we are doing a read here, we may need to save and restore the lastVersion from the last read
if (this.useVersions)
lastVersion = getLastVersion()
let buffer = this.getBinary(this.sharedStructuresKey)
lastVersion = getLastVersion();
let buffer = this.getBinary(this.sharedStructuresKey);
if (this.useVersions)
setLastVersion(lastVersion)
return buffer ? this.encoder.decode(buffer) : []
}
setLastVersion(lastVersion);
return buffer ? this.encoder.decode(buffer) : [];
};
return {
saveStructures: (structures, previousLength) => {
return this.transactionSyncStart(() => {
let existingStructuresBuffer = this.getBinary(this.sharedStructuresKey)
let existingStructures = existingStructuresBuffer ? this.encoder.decode(existingStructuresBuffer) : []
let existingStructuresBuffer = this.getBinary(this.sharedStructuresKey);
let existingStructures = existingStructuresBuffer ? this.encoder.decode(existingStructuresBuffer) : [];
if (existingStructures.length != previousLength)
return false // it changed, we need to indicate that we couldn't update
this.put(this.sharedStructuresKey, structures)
})
return false; // it changed, we need to indicate that we couldn't update
this.put(this.sharedStructuresKey, structures);
});
},
getStructures,
copyBuffers: true, // need to copy any embedded buffers that are found since we use unsafe buffers
}
};
}
}
// if caching class overrides putSync, don't want to double call the caching code
const putSync = LMDBStore.prototype.putSync
const removeSync = LMDBStore.prototype.removeSync
addQueryMethods(LMDBStore, { env, getReadTxn() {
return readTxnRenewed ? readTxn : renewReadTxn()
}, saveKey, keyBytes, keyBytesView, getLastVersion })
addWriteMethods(LMDBStore, { env, fixedBuffer: keyBytes, resetReadTxn, binaryBuffer, ...options })
const putSync = LMDBStore.prototype.putSync;
const removeSync = LMDBStore.prototype.removeSync;
addQueryMethods(LMDBStore, { env, saveKey, keyBytes, keyBytesView, getLastVersion });
addWriteMethods(LMDBStore, { env, fixedBuffer: keyBytes,
resetReadTxn: LMDBStore.prototype.resetReadTxn, ...options });
LMDBStore.prototype.supports = {

@@ -458,78 +320,47 @@ permanence: true,

openCallback: true,
}
};
return options.cache ?
new (CachingStore(LMDBStore))(options.name || null, options) :
new LMDBStore(options.name || null, options)
new LMDBStore(options.name || null, options);
}
function matches(previousVersion, ifVersion){
let matches
if (previousVersion) {
if (ifVersion) {
matches = previousVersion == ifVersion
} else {
matches = false
}
} else {
matches = !ifVersion
}
return matches
}
class Entry {
constructor(value, version, db) {
this.value = value
this.version = version
this.db = db
}
ifSamePut() {
}
ifSameRemove() {
}
}
export function getLastEntrySize() {
return lastSize
return lastSize;
}
export function getLastVersion() {
return keyBytesView.getFloat64(16, true)
return keyBytesView.getFloat64(16, true);
}
export function setLastVersion(version) {
return keyBytesView.setFloat64(16, version, true)
return keyBytesView.setFloat64(16, version, true);
}
export function asBinary(buffer) {
return {
[binaryBuffer]: buffer
}
}
let saveBuffer, saveDataView, saveDataAddress
let savePosition = 8000
let saveBuffer, saveDataView, saveDataAddress;
let savePosition = 8000;
function allocateSaveBuffer() {
saveBuffer = Buffer.alloc(8192)
saveBuffer.dataView = saveDataView = new DataView(saveBuffer.buffer, saveBuffer.byteOffset, saveBuffer.byteLength)
saveBuffer.buffer.address = getAddress(saveBuffer.buffer)
saveDataAddress = saveBuffer.buffer.address + saveBuffer.byteOffset
savePosition = 0
saveBuffer = Buffer.alloc(8192);
saveBuffer.dataView = saveDataView = new DataView(saveBuffer.buffer, saveBuffer.byteOffset, saveBuffer.byteLength);
saveBuffer.buffer.address = getAddress(saveBuffer.buffer);
saveDataAddress = saveBuffer.buffer.address + saveBuffer.byteOffset;
savePosition = 0;
}
function allocateFixedBuffer() {
keyBytes = Buffer.allocUnsafeSlow(2048)
const keyBuffer = keyBytes.buffer
keyBytesView = keyBytes.dataView = new DataView(keyBytes.buffer, 0, 2048) // max key size is actually 1978
keyBytes.uint32 = new Uint32Array(keyBuffer, 0, 512)
keyBytes.float64 = new Float64Array(keyBuffer, 0, 256)
keyBytes.uint32.address = keyBytes.address = keyBuffer.address = getAddress(keyBuffer)
keyBytes = Buffer.allocUnsafeSlow(2048);
const keyBuffer = keyBytes.buffer;
keyBytesView = keyBytes.dataView = new DataView(keyBytes.buffer, 0, 2048); // max key size is actually 1978
keyBytes.uint32 = new Uint32Array(keyBuffer, 0, 512);
keyBytes.float64 = new Float64Array(keyBuffer, 0, 256);
keyBytes.uint32.address = keyBytes.address = keyBuffer.address = getAddress(keyBuffer);
}
function saveKey(key, writeKey, saveTo) {
if (savePosition > 6200) {
allocateSaveBuffer()
allocateSaveBuffer();
}
let start = savePosition
savePosition = writeKey(key, saveBuffer, start + 4)
saveDataView.setUint32(start, savePosition - start - 4, true)
saveTo.saveBuffer = saveBuffer
savePosition = (savePosition + 7) & 0xfffff8
return start + saveDataAddress
let start = savePosition;
savePosition = writeKey(key, saveBuffer, start + 4);
saveDataView.setUint32(start, savePosition - start - 4, true);
saveTo.saveBuffer = saveBuffer;
savePosition = (savePosition + 7) & 0xfffff8;
return start + saveDataAddress;
}
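
`getLastVersion`/`setLastVersion` above read and write the float64 version slot at byte offset 16 of the shared key buffer. A hedged sketch of version-stamped reads and conditional writes; the path, key, and version numbers are illustrative:

```js
import { open, getLastVersion } from 'lmdb';

const db = open('versioned', { useVersions: true });
await db.put('config', { theme: 'dark' }, 3);     // write with explicit version 3
const value = db.get('config');
const version = getLastVersion();                 // version recorded by the last read: 3
await db.put('config', { theme: 'light' }, 4, 3); // conditional: only applies if version is still 3
```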

@@ -1,21 +0,21 @@

import { getAddress } from './native.js'
import { writeKey, readKey, enableNullTermination } from 'ordered-binary/index.js'
enableNullTermination()
import { getAddress } from './native.js';
import { writeKey, readKey, enableNullTermination } from 'ordered-binary/index.js';
enableNullTermination();
const writeUint32Key = (key, target, start) => {
(target.dataView || (target.dataView = new DataView(target.buffer, 0, target.length))).setUint32(start, key, true)
return start + 4
}
(target.dataView || (target.dataView = new DataView(target.buffer, 0, target.length))).setUint32(start, key, true);
return start + 4;
};
const readUint32Key = (target, start) => {
return (target.dataView || (target.dataView = new DataView(target.buffer, 0, target.length))).getUint32(start, true)
}
return (target.dataView || (target.dataView = new DataView(target.buffer, 0, target.length))).getUint32(start, true);
};
const writeBufferKey = (key, target, start) => {
if (key.length > 1978)
throw new Error('Key buffer is too long')
target.set(key, start)
return key.length + start
}
throw new Error('Key buffer is too long');
target.set(key, start);
return key.length + start;
};
const readBufferKey = (target, start, end) => {
return Uint8ArraySlice.call(target, start, end)
}
return Uint8ArraySlice.call(target, start, end);
};

@@ -27,3 +27,3 @@ export function applyKeyHandling(store) {

readKey,
}
};
}

@@ -33,36 +33,36 @@ if (store.encoder && store.encoder.writeKey && !store.encoder.encode) {

if (savePosition > 6200)
allocateSaveBuffer()
let start = savePosition
savePosition = writeKey(value, saveBuffer, start)
saveBuffer.start = start
saveBuffer.end = savePosition
savePosition = (savePosition + 7) & 0xfffff8
return saveBuffer
}
allocateSaveBuffer();
let start = savePosition;
savePosition = writeKey(value, saveBuffer, start);
saveBuffer.start = start;
saveBuffer.end = savePosition;
savePosition = (savePosition + 7) & 0xfffff8;
return saveBuffer;
};
}
if (store.decoder && store.decoder.readKey && !store.decoder.decode)
store.decoder.decode = function(buffer, end) { return this.readKey(buffer, 0, end) }
if (store.keyIsUint32) {
store.writeKey = writeUint32Key
store.readKey = readUint32Key
} else if (store.keyIsBuffer) {
store.writeKey = writeBufferKey
store.readKey = readBufferKey
store.decoder.decode = function(buffer) { return this.readKey(buffer, 0, buffer.length); };
if (store.keyIsUint32 || store.keyEncoding == 'uint32') {
store.writeKey = writeUint32Key;
store.readKey = readUint32Key;
} else if (store.keyIsBuffer || store.keyEncoding == 'binary') {
store.writeKey = writeBufferKey;
store.readKey = readBufferKey;
} else if (store.keyEncoder) {
store.writeKey = store.keyEncoder.writeKey
store.readKey = store.keyEncoder.readKey
store.writeKey = store.keyEncoder.writeKey;
store.readKey = store.keyEncoder.readKey;
} else {
store.writeKey = writeKey
store.readKey = readKey
store.writeKey = writeKey;
store.readKey = readKey;
}
}
let saveBuffer, saveDataView, saveDataAddress
let savePosition = 8000
let saveBuffer, saveDataView, saveDataAddress;
let savePosition = 8000;
function allocateSaveBuffer() {
saveBuffer = Buffer.alloc(8192)
saveBuffer.dataView = saveDataView = new DataView(saveBuffer.buffer, saveBuffer.byteOffset, saveBuffer.byteLength)
saveBuffer.buffer.address = getAddress(saveBuffer.buffer)
saveDataAddress = saveBuffer.buffer.address + saveBuffer.byteOffset
savePosition = 0
saveBuffer = Buffer.alloc(8192);
saveBuffer.dataView = saveDataView = new DataView(saveBuffer.buffer, saveBuffer.byteOffset, saveBuffer.byteLength);
saveBuffer.buffer.address = getAddress(saveBuffer.buffer);
saveDataAddress = saveBuffer.buffer.address + saveBuffer.byteOffset;
savePosition = 0;

@@ -72,10 +72,10 @@ }

if (savePosition > 6200) {
allocateSaveBuffer()
allocateSaveBuffer();
}
let start = savePosition
savePosition = writeKey(key, saveBuffer, start + 4)
saveDataView.setUint32(start, savePosition - start - 4, true)
saveTo.saveBuffer = saveBuffer
savePosition = (savePosition + 7) & 0xfffff8
return start + saveDataAddress
let start = savePosition;
savePosition = writeKey(key, saveBuffer, start + 4);
saveDataView.setUint32(start, savePosition - start - 4, true);
saveTo.saveBuffer = saveBuffer;
savePosition = (savePosition + 7) & 0xfffff8;
return start + saveDataAddress;
}
export function levelup(store) {
return Object.assign(Object.create(store), {
get(key, options, callback) {
let result = store.get(key)
let result = store.get(key);
if (typeof options == 'function')
callback = options
callback = options;
if (callback) {
if (result === undefined)
callback(new NotFoundError())
callback(new NotFoundError());
else
callback(null, result)
callback(null, result);
} else {
if (result === undefined)
return Promise.reject(new NotFoundError())
return Promise.reject(new NotFoundError());
else
return Promise.resolve(result)
return Promise.resolve(result);
}
},
})
});
}
class NotFoundError extends Error {
constructor(message) {
super(message)
this.name = 'NotFoundError'
this.notFound = true
super(message);
this.name = 'NotFoundError';
this.notFound = true;
}
}
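
A small sketch of the levelup-style adapter above: `get()` gains an optional callback, and in promise mode rejects with `NotFoundError` for missing keys (the store path and keys are illustrative):

```js
import { open, levelup } from 'lmdb';

const store = open('level-compat', {});
const db = levelup(store);

// callback style
db.get('missing-key', (error, value) => {
  if (error && error.notFound) console.log('not found');
});

// promise style
db.get('present-key').then(
  (value) => console.log(value),
  (error) => console.log(error.name) // 'NotFoundError'
);
```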

@@ -1,11 +0,12 @@

export let Env, Compression, Cursor, getAddress, getAddressShared, require, arch, fs
export let Env, Compression, Cursor, getAddress, getAddressShared, setGlobalBuffer, require, arch, fs;
export function setNativeFunctions(nativeInterface) {
Env = nativeInterface.Env
Compression = nativeInterface.Compression
getAddress = nativeInterface.getAddress
getAddressShared = nativeInterface.getAddressShared
Cursor = nativeInterface.Cursor
require = nativeInterface.require
arch = nativeInterface.arch
fs = nativeInterface.fs
Env = nativeInterface.Env;
Compression = nativeInterface.Compression;
getAddress = nativeInterface.getAddress;
getAddressShared = nativeInterface.getAddressShared;
setGlobalBuffer = nativeInterface.setGlobalBuffer;
Cursor = nativeInterface.Cursor;
require = nativeInterface.require;
arch = nativeInterface.arch;
fs = nativeInterface.fs;
}
import { createRequire } from 'module';
const require = createRequire(import.meta.url)
import { fileURLToPath } from 'url'
import { dirname } from 'path'
import { setNativeFunctions } from './native.js'
import fs from 'fs'
import { arch } from 'os'
let nativeFunctions, dirName = dirname(fileURLToPath(import.meta.url)).replace(/dist$/, '')
const require = createRequire(import.meta.url);
import { fileURLToPath } from 'url';
import { dirname } from 'path';
import { setNativeFunctions } from './native.js';
import fs from 'fs';
import { arch } from 'os';
let nativeFunctions, dirName = dirname(fileURLToPath(import.meta.url)).replace(/dist$/, '');
try {
console.log(dirName)
nativeFunctions = require('node-gyp-build')(dirName)
console.log(dirName);
nativeFunctions = require('node-gyp-build')(dirName);
if (process.versions.modules == 93)
require('v8').setFlagsFromString('--turbo-fast-api-calls')
require('v8').setFlagsFromString('--turbo-fast-api-calls');
} catch(error) {
if (process.versions.modules == 93) {
// use this abi version as the backup version without turbo-fast-api-calls enabled
Object.defineProperty(process.versions, 'modules', { value: '92' })
Object.defineProperty(process.versions, 'modules', { value: '92' });
try {
nativeFunctions = require('node-gyp-build')(dirName)
nativeFunctions = require('node-gyp-build')(dirName);
} catch(secondError) {
throw error
throw error;
} finally {
Object.defineProperty(process.versions, 'modules', { value: '93' })
Object.defineProperty(process.versions, 'modules', { value: '93' });
}
} else
throw error
throw error;
}
nativeFunctions.require = require
nativeFunctions.arch = arch
nativeFunctions.fs = fs
setNativeFunctions(nativeFunctions)
export { toBufferKey as keyValueToBuffer, compareKeys, compareKeys as compareKey, fromBufferKey as bufferToKeyValue } from 'ordered-binary/index.js'
export { ABORT } from './writer.js'
export { levelup } from './level.js'
export { open, asBinary, getLastVersion, getLastEntrySize, setLastVersion, allDbs } from './index.js'
import { toBufferKey as keyValueToBuffer, compareKeys as compareKey, fromBufferKey as bufferToKeyValue } from 'ordered-binary/index.js'
import { open, getLastVersion } from './index.js'
nativeFunctions.require = require;
nativeFunctions.arch = arch;
nativeFunctions.fs = fs;
setNativeFunctions(nativeFunctions);
export { toBufferKey as keyValueToBuffer, compareKeys, compareKeys as compareKey, fromBufferKey as bufferToKeyValue } from 'ordered-binary/index.js';
export { ABORT, asBinary } from './writer.js';
export { levelup } from './level.js';
export { open, getLastVersion, getLastEntrySize, setLastVersion, allDbs } from './index.js';
import { toBufferKey as keyValueToBuffer, compareKeys as compareKey, fromBufferKey as bufferToKeyValue } from 'ordered-binary/index.js';
import { open, getLastVersion } from './index.js';
export default {
open, getLastVersion, compareKey, keyValueToBuffer, bufferToKeyValue
}
};
{
"name": "lmdb",
"author": "Kris Zyp",
"version": "2.0.0-beta2-win-ia32",
"version": "2.0.0-beta3",
"description": "Simple, efficient, scalable data store wrapper for LMDB",

@@ -15,3 +15,7 @@ "license": "MIT",

"mdb",
"lightning"
"lightning",
"key-value store",
"storage",
"adapter",
"performance"
],

@@ -38,5 +42,6 @@ "type": "module",

"prepare": "rollup -c",
"before-publish": "rollup -c && prebuildify --target 14.17.0 --arch=ia32 && prebuildify --target 14.17.6 && prebuildify --target 12.17.0 --arch=ia32 && prebuildify --target 12.17.0 && prebuildify --target electron@15.2.0",
"before-publish": "rollup -c && prebuildify-ci download && prebuildify --target 17.0.1 && prebuildify --target 16.9.0 && prebuildify --target 15.5.0 && prebuildify --target 14.17.6 && prebuildify --target 12.17.0 && prebuildify --target electron@15.2.0",
"prebuild": "prebuildify --target 17.0.1 && prebuildify --target 16.9.0 && prebuildify --target 15.5.0 && prebuildify --target 14.17.6 && prebuildify --target 12.18.0 && prebuildify --target electron@15.2.0",
"prebuild-arm64": "prebuildify --arch=arm64 --target 17.0.1 && prebuildify --arch=arm64 --target 16.9.0 && prebuildify --arch=arm64 --target 14.17.6 && prebuildify --arch=arm64 --target electron@15.2.0",
"prebuild-musl": "prebuildify --target 17.0.1 --libc musl --tag-libc && prebuildify --target 16.9.0 --libc musl --tag-libc && prebuildify --target 14.17.6 --libc musl --tag-libc",
"prebuild-arm64": "prebuildify --arch=arm64 --target 17.0.1 --libc musl && prebuildify --arch=arm64 --target 16.9.0 && prebuildify --arch=arm64 --target 14.17.6 && prebuildify --arch=arm64 --target electron@15.2.0",
"recompile": "node-gyp clean && node-gyp configure && node-gyp build",

@@ -66,3 +71,3 @@ "test": "mocha test/**.test.js --recursive && npm run test:types",

"mocha": "^8.3.2",
"prebuildify": "kriszyp/prebuildify#b78c5a9",
"prebuildify": "^5.0.0",
"prebuildify-ci": "^1.0.5",

@@ -69,0 +74,0 @@ "rimraf": "^3.0.2",

@@ -1,6 +0,10 @@

import { ArrayLikeIterable } from './util/ArrayLikeIterable.js'
import { getAddress, Cursor } from './native.js'
import { saveKey } from './keys.js'
import { writeKey } from 'ordered-binary/index.js'
const ITERATOR_DONE = { done: true, value: undefined }
import { ArrayLikeIterable } from './util/ArrayLikeIterable.js';
import { getAddress, Cursor, setGlobalBuffer } from './native.js';
import { saveKey } from './keys.js';
import { writeKey } from 'ordered-binary/index.js';
import { binaryBuffer } from './writer.js';
const ITERATOR_DONE = { done: true, value: undefined };
const Uint8ArraySlice = Uint8Array.prototype.slice;
let getValueBytes = makeReusableBuffer(0);
let lastSize;

@@ -10,6 +14,115 @@ export function addQueryMethods(LMDBStore, {

}) {
let renewId = 1
LMDBStore.onReadReset = () => renewId++
let get = LMDBStore.prototype.get
let readTxn, readTxnRenewed;
let renewId = 1;
Object.assign(LMDBStore.prototype, {
getSizeBinaryFast(id) {
(env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn()));
lastSize = this.db.getByBinary(this.writeKey(id, keyBytes, 0));
},
getString(id) {
(env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn()));
let string = this.db.getStringByBinary(this.writeKey(id, keyBytes, 0));
if (typeof string === 'number') { // indicates the buffer wasn't large enough
this._allocateGetBuffer(string);
// and then try again
string = this.db.getStringByBinary(this.writeKey(id, keyBytes, 0));
}
if (string)
lastSize = string.length;
return string;
},
getBinaryFast(id) {
(env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn()));
lastSize = this.db.getByBinary(this.writeKey(id, keyBytes, 0));
let compression = this.compression;
let bytes = compression ? compression.getValueBytes : getValueBytes;
if (lastSize > bytes.maxLength) {
if (lastSize === 0xffffffff)
return;
bytes = this._allocateGetBuffer(lastSize);
lastSize = this.db.getByBinary(this.writeKey(id, keyBytes, 0));
}
bytes.length = lastSize;
return bytes;
},
_allocateGetBuffer(lastSize) {
let newLength = Math.min(Math.max(lastSize * 2, 0x1000), 0xfffffff8);
let bytes;
if (this.compression) {
let dictionary = this.compression.dictionary || [];
let dictLength = (dictionary.length >> 3) << 3;// make sure it is word-aligned
bytes = makeReusableBuffer(newLength + dictLength);
bytes.set(dictionary) // copy dictionary into start
this.compression.setBuffer(bytes, dictLength);
// the section after the dictionary is the target area for get values
bytes = bytes.subarray(dictLength);
bytes.maxLength = newLength;
Object.defineProperty(bytes, 'length', { value: newLength, writable: true, configurable: true });
this.compression.getValueBytes = bytes;
} else {
bytes = makeReusableBuffer(newLength);
setGlobalBuffer(bytes);
getValueBytes = bytes;
}
return bytes;
},
getBinary(id) {
let fastBuffer = this.getBinaryFast(id);
return fastBuffer && Uint8ArraySlice.call(fastBuffer, 0, lastSize);
},
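
A hedged usage sketch for the fast read path above: `getBinaryFast` hands back the shared reusable buffer, so its contents are only valid until the next read, while `getBinary` slices off a stable copy (path and key illustrative):

```js
import { open } from 'lmdb';

const db = open('blobs', { encoding: 'binary' });
await db.put('chunk:1', Buffer.from('hello'));

const fast = db.getBinaryFast('chunk:1'); // view into the shared reusable buffer
const copy = db.getBinary('chunk:1');     // independent copy, safe to retain
// `fast` must be consumed before the next get or cursor operation reuses it.
```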
get(id) {
if (this.decoder) {
let bytes = this.getBinaryFast(id);
return bytes && this.decoder.decode(bytes);
}
if (this.encoding == 'binary')
return this.getBinary(id);
let result = this.getString(id);
if (result) {
if (this.encoding == 'json')
return JSON.parse(result);
}
return result;
},
getEntry(id) {
let value = this.get(id);
if (value !== undefined) {
if (this.useVersions)
return {
value,
version: getLastVersion(),
//size: lastSize
};
else
return {
value,
//size: lastSize
};
}
},
resetReadTxn() {
resetReadTxn();
},
doesExist(key, versionOrValue) {
if (!env.writeTxn)
readTxnRenewed ? readTxn : renewReadTxn();
if (versionOrValue === undefined) {
this.getSizeBinaryFast(key);
return lastSize !== 0xffffffff;
}
else if (this.useVersions) {
this.getSizeBinaryFast(key);
return lastSize !== 0xffffffff && matches(getLastVersion(), versionOrValue);
}
else {
if (versionOrValue && versionOrValue[binaryBuffer])
versionOrValue = versionOrValue[binaryBuffer];
else if (this.encoder)
versionOrValue = this.encoder.encode(versionOrValue);
if (typeof versionOrValue == 'string')
versionOrValue = Buffer.from(versionOrValue);
return this.getValuesCount(key, { start: versionOrValue, exactMatch: true}) > 0;
}
},
getValues(key, options) {

@@ -19,72 +132,73 @@ let defaultOptions = {

valuesForKey: true
}
};
if (options && options.snapshot === false)
throw new Error('Can not disable snapshots for getValues')
return this.getRange(options ? Object.assign(defaultOptions, options) : defaultOptions)
throw new Error('Can not disable snapshots for getValues');
return this.getRange(options ? Object.assign(defaultOptions, options) : defaultOptions);
},
getKeys(options) {
if (!options)
options = {}
options.values = false
return this.getRange(options)
options = {};
options.values = false;
return this.getRange(options);
},
getCount(options) {
if (!options)
options = {}
options.onlyCount = true
return this.getRange(options)[Symbol.iterator]()
options = {};
options.onlyCount = true;
return this.getRange(options)[Symbol.iterator]();
},
getKeysCount(options) {
if (!options)
options = {}
options.onlyCount = true
options.values = false
return this.getRange(options)[Symbol.iterator]()
options = {};
options.onlyCount = true;
options.values = false;
return this.getRange(options)[Symbol.iterator]();
},
getValuesCount(key, options) {
if (!options)
options = {}
options.key = key
options.valuesForKey = true
options.onlyCount = true
return this.getRange(options)[Symbol.iterator]()
options = {};
options.key = key;
options.valuesForKey = true;
options.onlyCount = true;
return this.getRange(options)[Symbol.iterator]();
},
getRange(options) {
let iterable = new ArrayLikeIterable()
let iterable = new ArrayLikeIterable();
if (!options)
options = {}
let includeValues = options.values !== false
let includeVersions = options.versions
let valuesForKey = options.valuesForKey
let limit = options.limit
let db = this.db
let snapshot = options.snapshot
options = {};
let includeValues = options.values !== false;
let includeVersions = options.versions;
let valuesForKey = options.valuesForKey;
let limit = options.limit;
let db = this.db;
let snapshot = options.snapshot;
let compression = this.compression;
iterable[Symbol.iterator] = () => {
let currentKey = valuesForKey ? options.key : options.start
const reverse = options.reverse
let count = 0
let cursor, cursorRenewId
let txn
let currentKey = valuesForKey ? options.key : options.start;
const reverse = options.reverse;
let count = 0;
let cursor, cursorRenewId;
let txn;
let flags = (includeValues ? 0x100 : 0) | (reverse ? 0x400 : 0) |
(valuesForKey ? 0x800 : 0) | (options.exactMatch ? 0x4000 : 0)
(valuesForKey ? 0x800 : 0) | (options.exactMatch ? 0x4000 : 0);
function resetCursor() {
try {
if (cursor)
finishCursor()
let writeTxn = env.writeTxn
txn = writeTxn || getReadTxn()
cursor = !writeTxn && db.availableCursor
finishCursor();
let writeTxn = env.writeTxn;
txn = writeTxn || (readTxnRenewed ? readTxn : renewReadTxn());
cursor = !writeTxn && db.availableCursor;
if (cursor) {
db.availableCursor = null
db.availableCursor = null;
if (db.cursorTxn != txn)
cursor.renew()
cursor.renew();
else// if (db.currentRenewId != renewId)
flags |= 0x2000
flags |= 0x2000;
} else {
cursor = new Cursor(db)
cursor = new Cursor(db);
}
txn.cursorCount = (txn.cursorCount || 0) + 1 // track transaction so we always use the same one
txn.cursorCount = (txn.cursorCount || 0) + 1; // track transaction so we always use the same one
if (snapshot === false) {
cursorRenewId = renewId // use shared read transaction
txn.renewingCursorCount = (txn.renewingCursorCount || 0) + 1 // need to know how many are renewing cursors
cursorRenewId = renewId; // use shared read transaction
txn.renewingCursorCount = (txn.renewingCursorCount || 0) + 1; // need to know how many are renewing cursors
}

@@ -94,42 +208,42 @@ } catch(error) {

try {
cursor.close()
cursor.close();
} catch(error) { }
}
throw error
throw error;
}
}
resetCursor()
let store = this
resetCursor();
let store = this;
if (options.onlyCount) {
flags |= 0x1000
let count = position(options.offset)
finishCursor()
return count
flags |= 0x1000;
let count = position(options.offset);
finishCursor();
return count;
}
function position(offset) {
let keySize = store.writeKey(currentKey, keyBytes, 0)
let endAddress
let keySize = store.writeKey(currentKey, keyBytes, 0);
let endAddress;
if (valuesForKey) {
if (options.start === undefined && options.end === undefined)
endAddress = 0
endAddress = 0;
else {
let startAddress
let startAddress;
if (store.encoder.writeKey) {
startAddress = saveKey(options.start, store.encoder.writeKey, iterable)
keyBytesView.setFloat64(2000, startAddress, true)
endAddress = saveKey(options.end, store.encoder.writeKey, iterable)
startAddress = saveKey(options.start, store.encoder.writeKey, iterable);
keyBytesView.setFloat64(2000, startAddress, true);
endAddress = saveKey(options.end, store.encoder.writeKey, iterable);
} else if ((!options.start || options.start instanceof Uint8Array) && (!options.end || options.end instanceof Uint8Array)) {
startAddress = saveKey(options.start, writeKey, iterable)
keyBytesView.setFloat64(2000, startAddress, true)
endAddress = saveKey(options.end, writeKey, iterable)
startAddress = saveKey(options.start, writeKey, iterable);
keyBytesView.setFloat64(2000, startAddress, true);
endAddress = saveKey(options.end, writeKey, iterable);
} else {
throw new Error('Only key-based encoding is supported for start/end values')
let encoded = store.encoder.encode(options.start)
let bufferAddress = encoded.buffer.address || (encoded.buffer.address = getAddress(encoded) - encoded.byteOffset)
startAddress = bufferAddress + encoded.byteOffset
throw new Error('Only key-based encoding is supported for start/end values');
let encoded = store.encoder.encode(options.start);
let bufferAddress = encoded.buffer.address || (encoded.buffer.address = getAddress(encoded) - encoded.byteOffset);
startAddress = bufferAddress + encoded.byteOffset;
}
}
} else
endAddress = saveKey(options.end, store.writeKey, iterable)
return cursor.position(flags, offset || 0, keySize, endAddress)
endAddress = saveKey(options.end, store.writeKey, iterable);
return cursor.position(flags, offset || 0, keySize, endAddress);
}

@@ -139,15 +253,15 @@

if (txn.isAborted)
return
return;
if (cursorRenewId)
txn.renewingCursorCount--
txn.renewingCursorCount--;
if (--txn.cursorCount <= 0 && txn.onlyCursor) {
cursor.close()
txn.abort() // this is no longer main read txn, abort it now that we are done
txn.isAborted = true
cursor.close();
txn.abort(); // this is no longer main read txn, abort it now that we are done
txn.isAborted = true;
} else {
if (db.availableCursor || txn != getReadTxn())
cursor.close()
if (db.availableCursor || txn != readTxn)
cursor.close();
else { // try to reuse it
db.availableCursor = cursor
db.cursorTxn = txn
db.availableCursor = cursor;
db.cursorTxn = txn;
}

@@ -158,29 +272,35 @@ }

next() {
let keySize, lastSize
let keySize, lastSize;
if (cursorRenewId && cursorRenewId != renewId) {
resetCursor()
keySize = position(0)
resetCursor();
keySize = position(0);
}
if (count === 0) { // && includeValues) // on first entry, get current value if we need to
keySize = position(options.offset)
keySize = position(options.offset);
} else
keySize = cursor.iterate()
keySize = cursor.iterate();
if (keySize === 0 ||
(count++ >= limit)) {
finishCursor()
return ITERATOR_DONE
finishCursor();
return ITERATOR_DONE;
}
if (!valuesForKey || snapshot === false)
currentKey = store.readKey(keyBytes, 32, keySize + 32)
currentKey = store.readKey(keyBytes, 32, keySize + 32);
if (includeValues) {
let value
lastSize = keyBytesView.getUint32(0, true)
let value;
lastSize = keyBytesView.getUint32(0, true);
let bytes = compression ? compression.getValueBytes : getValueBytes;
if (lastSize > bytes.maxLength) {
bytes = store._allocateGetBuffer(lastSize);
cursor.getCurrentValue();
}
bytes.length = lastSize;
if (store.decoder) {
value = store.decoder.decode(db.unsafeBuffer, lastSize)
value = store.decoder.decode(bytes, lastSize);
} else if (store.encoding == 'binary')
value = Uint8ArraySlice.call(db.unsafeBuffer, 0, lastSize)
value = Uint8ArraySlice.call(bytes, 0, lastSize);
else {
value = store.db.unsafeBuffer.toString('utf8', 0, lastSize)
value = bytes.toString('utf8', 0, lastSize);
if (store.encoding == 'json' && value)
value = JSON.parse(value)
value = JSON.parse(value);
}

@@ -194,7 +314,7 @@ if (includeVersions)

}
}
};
else if (valuesForKey)
return {
value
}
};
else

@@ -206,3 +326,3 @@ return {

}
}
};
} else if (includeVersions) {

@@ -214,32 +334,113 @@ return {

}
}
};
} else {
return {
value: currentKey
}
};
}
},
return() {
finishCursor()
return ITERATOR_DONE
finishCursor();
return ITERATOR_DONE;
},
throw() {
finishCursor()
return ITERATOR_DONE
finishCursor();
return ITERATOR_DONE;
}
}
}
return iterable
};
};
return iterable;
},
getMany(keys, callback) {
let results = new Array(keys.length)
let results = new Array(keys.length);
for (let i = 0, l = keys.length; i < l; i++) {
results[i] = get.call(this, keys[i])
results[i] = get.call(this, keys[i]);
}
if (callback)
callback(null, results)
return Promise.resolve(results) // we may eventually make this a true async operation
callback(null, results);
return Promise.resolve(results); // we may eventually make this a true async operation
},
getSharedBufferForGet(id) {
let txn = (env.writeTxn || (readTxnRenewed ? readTxn : renewReadTxn()));
lastSize = this.keyIsCompatibility ? txn.getBinaryShared(id) : this.db.get(this.writeKey(id, keyBytes, 0));
if (lastSize === 0xffffffff) { // not found code
return; //undefined
}
return lastSize;
lastSize = keyBytesView.getUint32(0, true);
let bufferIndex = keyBytesView.getUint32(12, true);
lastOffset = keyBytesView.getUint32(8, true);
let buffer = buffers[bufferIndex];
let startOffset;
if (!buffer || lastOffset < (startOffset = buffer.startOffset) || (lastOffset + lastSize > startOffset + 0x100000000)) {
if (buffer)
env.detachBuffer(buffer.buffer);
startOffset = (lastOffset >>> 16) * 0x10000;
console.log('make buffer for address', bufferIndex * 0x100000000 + startOffset);
buffer = buffers[bufferIndex] = Buffer.from(getBufferForAddress(bufferIndex * 0x100000000 + startOffset));
buffer.startOffset = startOffset;
}
lastOffset -= startOffset;
return buffer;
return buffer.slice(lastOffset, lastOffset + lastSize);/*Uint8ArraySlice.call(buffer, lastOffset, lastOffset + lastSize)*/
},
close(callback) {
this.db.close();
if (this.isRoot) {
if (readTxn) {
try {
readTxn.abort();
} catch(error) {}
}
readTxnRenewed = null;
env.close();
}
this.status = 'closed';
if (callback)
callback();
},
getStats() {
return this.db.stat(readTxnRenewed ? readTxn : renewReadTxn());
}
})
});
let get = LMDBStore.prototype.get;
function renewReadTxn() {
if (readTxn)
readTxn.renew();
else
readTxn = env.beginTxn(0x20000);
readTxnRenewed = setImmediate(resetReadTxn);
return readTxn;
}
function resetReadTxn() {
if (readTxnRenewed) {
renewId++;
readTxnRenewed = null;
if (readTxn.cursorCount - (readTxn.renewingCursorCount || 0) > 0) {
readTxn.onlyCursor = true;
readTxn = null;
}
else
readTxn.reset();
}
}
}
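
A brief usage sketch of the range queries wired through this cursor machinery: `getRange` returns a lazy iterable driven by a pooled LMDB cursor, and the count methods use the count-only cursor flag (database and keys are illustrative):

```js
import { open } from 'lmdb';

const db = open('events', {});
await db.put('2021-01-01', { count: 1 });
await db.put('2021-06-15', { count: 2 });

for (const { key, value } of db.getRange({ start: '2021-01-01', end: '2022-01-01' })) {
  console.log(key, value.count); // entries iterated in key order via the cursor
}
console.log(db.getCount({})); // count-only scan, no values decoded
```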
function matches(previousVersion, ifVersion){
let matches;
if (previousVersion) {
if (ifVersion) {
matches = previousVersion == ifVersion;
} else {
matches = false;
}
} else {
matches = !ifVersion;
}
return matches;
}
export function makeReusableBuffer(size) {
let bytes = Buffer.alloc(size)
bytes.maxLength = size;
Object.defineProperty(bytes, 'length', { value: size, writable: true, configurable: true });
return bytes;
}
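
A quick sketch of the buffer behavior `makeReusableBuffer` sets up: the own, writable `length` property shadows the `Uint8Array` getter and reports the size of the last value read, while `byteLength` still reports the full allocation (this mirrors the function above; the sizes are illustrative):

```js
// Mirrors makeReusableBuffer above: one allocation reports per-read value sizes.
const bytes = Buffer.alloc(0x1000);
bytes.maxLength = 0x1000;
Object.defineProperty(bytes, 'length', { value: 0x1000, writable: true, configurable: true });

bytes.length = 42;             // a 42-byte value was just read into the buffer
console.log(bytes.length);     // 42
console.log(bytes.byteLength); // 4096: the full allocated region
```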

@@ -21,8 +21,8 @@ [![license](https://img.shields.io/badge/license-MIT-brightgreen)](LICENSE)

This library is published to the NPM package `lmdb` (and the 1.x was also published to `lmdb-store`), and can be installed with:
This library is published to the NPM package `lmdb` (the 1.x versions were published to `lmdb-store`), and can be installed with:
```npm install lmdb```
This library has minimal, tightly-controlled, and maintained dependencies to ensure stability and efficient memory use. It supports both ESM and CJS usage.
This library has minimal, tightly-controlled, and maintained dependencies to ensure stability, security, and efficiency. It supports both native ESM and CJS usage.
This has replaced the previously deprecated (LevelDOWN) `lmdb` package in the NPM package registry, but existing versions of that library are [still available](https://www.npmjs.com/package/lmdb/v/0.2.0).
This library was formerly known as "lmdb-store", but "lmdb-js" better represents the broad purpose of being the cross-platform LMDB JavaScript adapter. This package has replaced the previously deprecated (LevelDOWN) `lmdb` package in the NPM package registry, but existing versions of that library are [still available](https://www.npmjs.com/package/lmdb/v/0.2.0).

@@ -33,3 +33,3 @@ ## Design

`lmdb-store` is designed for synchronous reads, and asynchronous writes. In idiomatic NodeJS code, I/O operations are performed asynchronously. LMDB is a memory-mapped database, reading and writing within a transaction does not use any I/O (other than the slight possibility of a page fault), and can usually be performed faster than Node's event queue callbacks can even execute, and it is easier to write code for instant synchronous values from reads. On the otherhand, commiting transactions does involve I/O, and vastly higher throughput can be achieved by batching operations and executing on a separate thread. Consequently, `lmdb-store` is designed for transactioSSns to go through this asynchronous batching process and return a simple promise that resolves once data is written and flushed to disk.
`lmdb-store` is designed for synchronous reads and asynchronous writes. In idiomatic NodeJS code, I/O operations are performed asynchronously; LMDB, however, is a memory-mapped database, so reading and writing within a transaction does not use any I/O (other than the slight possibility of a page fault), can usually be performed faster than Node's event queue callbacks can even execute, and it is easier to write code around instantly available synchronous reads. On the other hand, committing transactions does involve I/O, and vastly higher throughput can be achieved by batching operations and executing them on a separate thread. Consequently, `lmdb-store` is designed for transactions to go through this asynchronous batching process and return a simple promise that resolves once data is written and flushed to disk.
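As a minimal sketch of this model (assuming a store `db` opened with `open`; the key and value are illustrative):
```
const value = db.get('my-key'); // synchronous read from the memory map
const written = db.put('my-key', { greeting: 'hello' }); // queued into the async batched transaction
await written; // resolves once the data is written and flushed to disk
```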

@@ -258,3 +258,3 @@ With the default sync'ing configuration, LMDB has a crash-proof design; a machine can be turned off at any point, and data cannot be corrupted unless the written data is actually changed or tampered with. Writing data and waiting for confirmation that it has been written to the physical medium is critical for data integrity, but is well known to add latency (although not necessarily to reduce efficiency). However, by batching writes, when a database is under load, slower transactions enable more writes per transaction, and this library is able to drive LMDB to achieve maximum levels of throughput with fully sync'ed operations, preserving both the durability/safety of the transactions and legendary performance.

### `asBinary(buffer): Binary`
This can be used to directly store a buffer or Uint8Array as a value, bypassing any encoding. If you are using a store with an encoding that isn't `binary`, setting a value with a Uint8Array will typically be encoding with encoding (for example MessagePack wraps in a header, preserving its type for `get`). However, if you want to bypass encoding, for example, if you have already encoded a value, you can use `asBinary`:
This can be used to directly store a buffer or Uint8Array as a value, bypassing any encoding. If you are using a store with an encoding that isn't `binary`, setting a value with a Uint8Array will typically be encoded with the store's encoding (for example, MessagePack wraps it in a header, preserving its type for `get`). However, if you want to bypass encoding (for example, because you have already encoded a value), you can use `asBinary`:
```

@@ -275,3 +275,3 @@ let buffer = encode(myValue) // if we have already serialized a value, perhaps to compare it or check its size
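As a hedged usage sketch (assuming a store `db` with a non-`binary` encoding, and `encode` standing for a serializer you have already applied):
```
import { asBinary } from 'lmdb';
let buffer = encode(myValue); // bytes you have already serialized
await db.put('my-key', asBinary(buffer)); // stored as-is, bypassing the store's encoding
db.getBinary('my-key'); // returns the same bytes back
```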

### `store.getBinaryFast(key): Buffer`
This will retrieve the binary data at the specified key, like `getBinary`, except it uses reusable buffers, which is faster, but means the data in the buffer is only valid until the next get operation (including cursor operations).
This will retrieve the binary data at the specified key, like `getBinary`, except it uses reusable buffers, which is faster, but means the data in the buffer is only valid until the next get operation (including cursor operations). Since this is a reusable buffer, it also differs slightly from a typical buffer: the `length` property is set to the length of the value (what you typically want for normal usage), but `byteLength` will be the size of the full allocated memory area for the buffer (usually much larger).
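For example (a sketch assuming a store `db`), copy the value out if you need it beyond the next get:
```
let fast = db.getBinaryFast('my-key');
let copy = Buffer.allocUnsafe(fast.length); // fast.length is the value size, not the allocation size
fast.copy(copy, 0, 0, fast.length); // fast may be overwritten by the next get or cursor operation
```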

@@ -356,3 +356,3 @@ ### `resetReadTxn(): void`

* `name` - This is the name of the database. This defaults to null (which is the root database) when opening the database environment (`open`). When opening a database within an environment (`openDB`), this is required if not specified in the first parameter.
* `encoding` - Sets the encoding for the database, which can be `'msgpack'`, `'json'`, `'cbor'`, `'string'`, `'ordered-binary'` or `'binary'`.
* `encoding` - Sets the encoding for the database values, which can be `'msgpack'`, `'json'`, `'cbor'`, `'string'`, `'ordered-binary'` or `'binary'`.
* `sharedStructuresKey` - Enables shared structures and sets the key where the shared structures will be stored.
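As a sketch of how these options fit together (the path and names here are illustrative):
```
import { open } from 'lmdb';
const rootDb = open({ path: 'my-db' }); // name defaults to null: the root database
const users = rootDb.openDB('users', { encoding: 'msgpack' }); // named database within the environment
```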

@@ -362,4 +362,3 @@ * `compression` - This enables compression. This can be set a truthy value to enable compression with default settings, or it can be an object with compression settings.

* `useVersions` - Set this to true if you will be setting version numbers on the entries in the database. Note that you can not change this flag once a database has entries in it (or they won't be read correctly).
* `keyIsBuffer` - This will cause the database to expect and return keys as node buffers.
* `keyIsUint32` - This will cause the database to expect and return keys as unsigned 32-bit integers.
* `keyEncoding` - This indicates the encoding to use for the database keys, and can be `'uint32'` for unsigned 32-bit integers, `'binary'` for raw buffers/Uint8Arrays, or the default `'ordered-binary'`, which allows any JS primitive as a key.
* `keyEncoder` - Provide a custom key encoder.
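For instance, continuing the sketch above, a database keyed by unsigned 32-bit integers (the name is illustrative):
```
const byId = rootDb.openDB('by-id', { keyEncoding: 'uint32' });
await byId.put(42, { some: 'value' }); // keys must be unsigned 32-bit integers
byId.get(42);
```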

@@ -407,3 +406,3 @@ * `dupSort` - Enables duplicate entries for keys. You will usually want to retrieve the values for a key with `getValues`.

Enabling `overlappingSync` option is generally not recommended on Windows, as Window's disk flushing operation tends to have poor performance characteristics on larger databases (whereas Windows tends to perform well with standard transactions), but YMMV. This option may be enabled by default in the future, for non-Windows platforms, this is probably a good setting:
Enabling the `overlappingSync` option is generally not recommended on Windows, as Windows' disk flushing operation tends to have very poor performance characteristics on larger databases (whereas Windows tends to perform well with standard transactions). This option may be enabled by default in the future for non-Windows platforms. This is probably a good setting:
```

@@ -430,3 +429,3 @@ overlappingSync: os.platform() != 'win32',
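Spelled out as a sketch, that setting might look like:
```
import os from 'os';
import { open } from 'lmdb';
const db = open({
	path: 'my-db',
	overlappingSync: os.platform() != 'win32', // enable everywhere except Windows
});
```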

`beforecommit` - This event is fired before a batched operation begins to start a transaction to write all queued writes to the database. The callback function can perform additional (asynchronous) writes (`put` and `remove`) and they will be included in the transaction about to be performed (this can be useful for updating a global version stamp based on all previous writes, for example). Using this event forces `eventTurnBatching` to be enabled.
`beforecommit` - This event is fired before a transaction finishes/commits. The callback function can perform additional (asynchronous) writes (`put` and `remove`) and they will be included in the transaction about to be performed as the last operation(s) before the transaction commits (this can be useful for updating a global version stamp based on all previous writes, for example). Using this event forces `eventTurnBatching` to be enabled. This can be called multiple times in a transaction, but should always be called as the last operation of a transaction.
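A sketch of the version-stamp use mentioned above (the key and counter are illustrative):
```
let globalVersion = 0;
db.on('beforecommit', () => {
	// runs as the batch is about to commit; this put joins the same transaction
	db.put('global-version', ++globalVersion);
});
```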

@@ -433,0 +432,0 @@ ## LevelUp

@@ -94,2 +94,19 @@ import path from 'path';

}
it('zero length values', async function() {
db.put(5, asBinary(Buffer.from([])));
await db2.put('key1', asBinary(Buffer.from([])));
should.equal(db.getBinary(5).length, 0);
should.equal(db2.getBinary('key1').length, 0);
db.put(5, asBinary(Buffer.from([4])));
db2.remove('key1');
await db2.put('key1', asBinary(Buffer.from([4])));
should.equal(db.getBinary(5).length, 1);
should.equal(db2.getBinary('key1').length, 1);
db.put(5, asBinary(Buffer.from([])));
db2.remove('key1');
await db2.put('key1', asBinary(Buffer.from([])));
should.equal(db.getBinary(5).length, 0);
should.equal(db2.getBinary('key1').length, 0);
await db2.remove('key1');
});
it('query of keys', async function() {

@@ -825,3 +842,3 @@ let keys = [

name: 'mydb6',
keyIsUint32: true,
keyEncoding: 'uint32',
create: true,

@@ -884,3 +901,3 @@ }));

name: 'uint32',
keyIsUint32: true,
keyEncoding: 'uint32',
compression: true,

@@ -932,3 +949,3 @@ });

path: testDirPath + '/test-mixedkeys.mdb',
keyIsUint32: false,
keyEncoding: 'ordered-binary',
})

@@ -938,3 +955,3 @@

name: `intKeys`,
keyIsUint32: true,
keyEncoding: 'uint32',
})

@@ -944,3 +961,3 @@

name: `strKeys`,
keyIsUint32: false,
keyEncoding: 'ordered-binary',
})

@@ -947,0 +964,0 @@

@@ -1,4 +0,4 @@

const SKIP = {}
const SKIP = {};
if (!Symbol.asyncIterator) {
Symbol.asyncIterator = Symbol.for('Symbol.asyncIterator')
Symbol.asyncIterator = Symbol.for('Symbol.asyncIterator');
}

@@ -9,29 +9,29 @@

if (sourceArray) {
this[Symbol.iterator] = sourceArray[Symbol.iterator].bind(sourceArray)
this[Symbol.iterator] = sourceArray[Symbol.iterator].bind(sourceArray);
}
}
map(func) {
let source = this
let result = new ArrayLikeIterable()
let source = this;
let result = new ArrayLikeIterable();
result[Symbol.iterator] = (async) => {
let iterator = source[Symbol.iterator](async)
let iterator = source[Symbol.iterator](async);
return {
next(resolvedResult) {
let result
let result;
do {
let iteratorResult
let iteratorResult;
if (resolvedResult) {
iteratorResult = resolvedResult
resolvedResult = null // don't go in this branch on next iteration
iteratorResult = resolvedResult;
resolvedResult = null; // don't go in this branch on next iteration
} else {
iteratorResult = iterator.next()
iteratorResult = iterator.next();
if (iteratorResult.then) {
return iteratorResult.then(iteratorResult => this.next(iteratorResult))
return iteratorResult.then(iteratorResult => this.next(iteratorResult));
}
}
if (iteratorResult.done === true) {
this.done = true
return iteratorResult
this.done = true;
return iteratorResult;
}
result = func(iteratorResult.value)
result = func(iteratorResult.value);
if (result && result.then) {

@@ -43,3 +43,3 @@ return result.then(result =>

value: result
})
});
}

@@ -49,59 +49,59 @@ } while(result == SKIP)

value: result
}
};
},
return() {
return iterator.return()
return iterator.return();
},
throw() {
return iterator.throw()
return iterator.throw();
}
}
}
return result
};
};
return result;
}
[Symbol.asyncIterator]() {
return this[Symbol.iterator](true)
return this[Symbol.iterator](true);
}
filter(func) {
return this.map(element => func(element) ? element : SKIP)
return this.map(element => func(element) ? element : SKIP);
}
forEach(callback) {
let iterator = this[Symbol.iterator]()
let result
let iterator = this[Symbol.iterator]();
let result;
while ((result = iterator.next()).done !== true) {
callback(result.value)
callback(result.value);
}
}
concat(secondIterable) {
let concatIterable = new ArrayLikeIterable()
let concatIterable = new ArrayLikeIterable();
concatIterable[Symbol.iterator] = (async) => {
let iterator = this[Symbol.iterator]()
let isFirst = true
let iterator = this[Symbol.iterator]();
let isFirst = true;
let concatIterator = {
next() {
let result = iterator.next()
let result = iterator.next();
if (isFirst && result.done) {
isFirst = false
iterator = secondIterable[Symbol.iterator](async)
return iterator.next()
isFirst = false;
iterator = secondIterable[Symbol.iterator](async);
return iterator.next();
}
return result
return result;
},
return() {
return iterator.return()
return iterator.return();
},
throw() {
return iterator.throw()
return iterator.throw();
}
}
return concatIterator
}
return concatIterable
};
return concatIterator;
};
return concatIterable;
}
toJSON() {
if (this.asArray && this.asArray.forEach) {
return this.asArray
return this.asArray;
}
throw new Error('Can not serialize async iterables without first calling resolveJSON')
throw new Error('Can not serialize async iterables without first calling resolveJSON');
//return Array.from(this)

@@ -111,27 +111,27 @@ }

if (this._asArray)
return this._asArray
return this._asArray;
let promise = new Promise((resolve, reject) => {
let iterator = this[Symbol.iterator](true)
let array = []
let iterable = this
let iterator = this[Symbol.iterator](true);
let array = [];
let iterable = this;
function next(result) {
while (result.done !== true) {
if (result.then) {
return result.then(next)
return result.then(next);
} else {
array.push(result.value)
array.push(result.value);
}
result = iterator.next()
result = iterator.next();
}
array.iterable = iterable
resolve(iterable._asArray = array)
array.iterable = iterable;
resolve(iterable._asArray = array);
}
next(iterator.next())
})
promise.iterable = this
return this._asArray || (this._asArray = promise)
next(iterator.next());
});
promise.iterable = this;
return this._asArray || (this._asArray = promise);
}
resolveData() {
return this.asArray
return this.asArray;
}
}

@@ -5,5 +5,5 @@ export function when(promise, callback, errback) {

promise.then(callback, errback) :
promise.then(callback)
promise.then(callback);
}
return callback(promise)
return callback(promise);
}

@@ -1,99 +0,100 @@

import { getAddressShared as getAddress } from './native.js'
import { when } from './util/when.js'
var backpressureArray
import { getAddressShared as getAddress } from './native.js';
import { when } from './util/when.js';
var backpressureArray;
const MAX_KEY_SIZE = 1978
const WAITING_OPERATION = 0x2000000
const BACKPRESSURE_THRESHOLD = 50000
const TXN_DELIMITER = 0x8000000
const TXN_COMMITTED = 0x10000000
const TXN_FLUSHED = 0x20000000
const TXN_FAILED = 0x40000000
const FAILED_CONDITION = 0x4000000
const REUSE_BUFFER_MODE = 1000
const MAX_KEY_SIZE = 1978;
const WAITING_OPERATION = 0x2000000;
const BACKPRESSURE_THRESHOLD = 50000;
const TXN_DELIMITER = 0x8000000;
const TXN_COMMITTED = 0x10000000;
const TXN_FLUSHED = 0x20000000;
const TXN_FAILED = 0x40000000;
const FAILED_CONDITION = 0x4000000;
const REUSE_BUFFER_MODE = 1000;
export const binaryBuffer = Symbol('binaryBuffer');
const SYNC_PROMISE_SUCCESS = Promise.resolve(true)
const SYNC_PROMISE_FAIL = Promise.resolve(false)
export const ABORT = {}
const CALLBACK_THREW = {}
SYNC_PROMISE_SUCCESS.isSync = true
SYNC_PROMISE_FAIL.isSync = true
const ByteArray = typeof Buffer != 'undefined' ? Buffer.from : Uint8Array
const SYNC_PROMISE_SUCCESS = Promise.resolve(true);
const SYNC_PROMISE_FAIL = Promise.resolve(false);
export const ABORT = {};
const CALLBACK_THREW = {};
SYNC_PROMISE_SUCCESS.isSync = true;
SYNC_PROMISE_FAIL.isSync = true;
const ByteArray = typeof Buffer != 'undefined' ? Buffer.from : Uint8Array;
//let debugLog = []
var log = []
export function addWriteMethods(LMDBStore, { env, fixedBuffer, resetReadTxn, useWritemap, binaryBuffer,
var log = [];
export function addWriteMethods(LMDBStore, { env, fixedBuffer, resetReadTxn, useWritemap,
eventTurnBatching, txnStartThreshold, batchStartThreshold, overlappingSync, commitDelay, separateFlushed }) {
// stands for write instructions
var dynamicBytes
var dynamicBytes;
function allocateInstructionBuffer() {
let buffer = new SharedArrayBuffer(0x10000) // Must use a shared buffer to ensure GC doesn't move it around
dynamicBytes = new ByteArray(buffer)
let uint32 = dynamicBytes.uint32 = new Uint32Array(buffer, 0, 0x10000 >> 2)
uint32[0] = 0
dynamicBytes.float64 = new Float64Array(buffer, 0, 0x10000 >> 3)
buffer.address = getAddress(buffer)
uint32.address = buffer.address + uint32.byteOffset
dynamicBytes.position = 0
return dynamicBytes
let buffer = new SharedArrayBuffer(0x10000); // Must use a shared buffer to ensure GC doesn't move it around
dynamicBytes = new ByteArray(buffer);
let uint32 = dynamicBytes.uint32 = new Uint32Array(buffer, 0, 0x10000 >> 2);
uint32[0] = 0;
dynamicBytes.float64 = new Float64Array(buffer, 0, 0x10000 >> 3);
buffer.address = getAddress(buffer);
uint32.address = buffer.address + uint32.byteOffset;
dynamicBytes.position = 0;
return dynamicBytes;
}
var outstandingWriteCount = 0
var startAddress = 0
var writeTxn = null
var abortedNonChildTransactionWarn
var nextTxnCallbacks = []
var commitPromise, flushPromise, flushResolvers = []
commitDelay = commitDelay || 0
eventTurnBatching = eventTurnBatching === false ? false : true
var enqueuedCommit
var afterCommitCallbacks = []
var beforeCommitCallbacks = []
var enqueuedEventTurnBatch
var outstandingWriteCount = 0;
var startAddress = 0;
var writeTxn = null;
var abortedNonChildTransactionWarn;
var nextTxnCallbacks = [];
var commitPromise, flushPromise, flushResolvers = [];
commitDelay = commitDelay || 0;
eventTurnBatching = eventTurnBatching === false ? false : true;
var enqueuedCommit;
var afterCommitCallbacks = [];
var beforeCommitCallbacks = [];
var enqueuedEventTurnBatch;
if (separateFlushed === undefined)
separateFlushed = overlappingSync
var batchDepth = 0
var writeBatchStart, outstandingBatchCount
txnStartThreshold = txnStartThreshold || 5
batchStartThreshold = batchStartThreshold || 1000
separateFlushed = overlappingSync;
var batchDepth = 0;
var writeBatchStart, outstandingBatchCount;
txnStartThreshold = txnStartThreshold || 5;
batchStartThreshold = batchStartThreshold || 1000;
allocateInstructionBuffer()
dynamicBytes.uint32[0] = TXN_DELIMITER | TXN_COMMITTED | TXN_FLUSHED
var txnResolution, lastQueuedResolution, nextResolution = { uint32: dynamicBytes.uint32, flagPosition: 0, }
var uncommittedResolution = { next: nextResolution }
var unwrittenResolution = nextResolution
allocateInstructionBuffer();
dynamicBytes.uint32[0] = TXN_DELIMITER | TXN_COMMITTED | TXN_FLUSHED;
var txnResolution, lastQueuedResolution, nextResolution = { uint32: dynamicBytes.uint32, flagPosition: 0, };
var uncommittedResolution = { next: nextResolution };
var unwrittenResolution = nextResolution;
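// Encodes a single write operation (flags, dbi, key, value pointer, and optional
// version/ifVersion) into the shared instruction buffer consumed by the native
// write thread, and returns a function that publishes the instruction's status
// flags and yields a promise (or sync result) for the outcome.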
function writeInstructions(flags, store, key, value, version, ifVersion) {
let writeStatus
let targetBytes, position
let valueBuffer, valueSize, valueBufferStart
let writeStatus;
let targetBytes, position;
let valueBuffer, valueSize, valueBufferStart;
if (flags & 2) {
// encode first in case we have to write a shared structure
let encoder = store.encoder
let encoder = store.encoder;
if (value && value[binaryBuffer])
valueBuffer = value[binaryBuffer]
valueBuffer = value[binaryBuffer];
else if (encoder) {
if (encoder.copyBuffers) // use this as indicator for support buffer reuse for now
valueBuffer = encoder.encode(value, REUSE_BUFFER_MODE)
valueBuffer = encoder.encode(value, REUSE_BUFFER_MODE);
else { // various other encoders, including JSON.stringify, that might serialize to a string
valueBuffer = encoder.encode(value)
valueBuffer = encoder.encode(value);
if (typeof valueBuffer == 'string')
valueBuffer = Buffer.from(valueBuffer) // TODO: Would be nice to write strings inline in the instructions
valueBuffer = Buffer.from(valueBuffer); // TODO: Would be nice to write strings inline in the instructions
}
} else if (typeof value == 'string') {
valueBuffer = Buffer.from(value) // TODO: Would be nice to write strings inline in the instructions
valueBuffer = Buffer.from(value); // TODO: Would be nice to write strings inline in the instructions
} else if (value instanceof Uint8Array)
valueBuffer = value
valueBuffer = value;
else
throw new Error('Invalid value to put in database ' + value + ' (' + (typeof value) +'), consider using encoder')
valueBufferStart = valueBuffer.start
throw new Error('Invalid value to put in database ' + value + ' (' + (typeof value) +'), consider using encoder');
valueBufferStart = valueBuffer.start;
if (valueBufferStart > -1) // if we have buffers with start/end position
valueSize = valueBuffer.end - valueBufferStart // size
valueSize = valueBuffer.end - valueBufferStart; // size
else
valueSize = valueBuffer.length
valueSize = valueBuffer.length;
if (store.dupSort && valueSize > MAX_KEY_SIZE)
throw new Error('The value is larger than the maximum size (' + MAX_KEY_SIZE + ') for a value in a dupSort database')
throw new Error('The value is larger than the maximum size (' + MAX_KEY_SIZE + ') for a value in a dupSort database');
} else
valueSize = 0
valueSize = 0;
if (writeTxn) {
targetBytes = fixedBuffer
position = 0
targetBytes = fixedBuffer;
position = 0;
} else {

@@ -104,71 +105,71 @@ if (eventTurnBatching && !enqueuedEventTurnBatch && batchDepth == 0) {

for (let i = 0, l = beforeCommitCallbacks.length; i < l; i++) {
beforeCommitCallbacks[i]()
beforeCommitCallbacks[i]();
}
} catch(error) {
console.error(error)
console.error(error);
}
enqueuedEventTurnBatch = null
enqueuedEventTurnBatch = null;
//console.log('ending event turn')
batchDepth--
finishBatch()
batchDepth--;
finishBatch();
if (writeBatchStart)
writeBatchStart() // TODO: When we support delay start of batch, optionally don't delay this
})
commitPromise = null // reset the commit promise, can't know if it is really a new transaction prior to finishWrite being called
flushPromise = null
writeBatchStart = writeInstructions(1, store)
outstandingBatchCount = 0
batchDepth++
writeBatchStart(); // TODO: When we support delay start of batch, optionally don't delay this
});
commitPromise = null; // reset the commit promise, can't know if it is really a new transaction prior to finishWrite being called
flushPromise = null;
writeBatchStart = writeInstructions(1, store);
outstandingBatchCount = 0;
batchDepth++;
}
targetBytes = dynamicBytes
position = targetBytes.position
targetBytes = dynamicBytes;
position = targetBytes.position;
}
let uint32 = targetBytes.uint32, float64 = targetBytes.float64
let flagPosition = position << 1 // flagPosition is the 32-bit word starting position
let uint32 = targetBytes.uint32, float64 = targetBytes.float64;
let flagPosition = position << 1; // flagPosition is the 32-bit word starting position
// don't increment position until we are sure we don't have any key writing errors
if (!uint32) {
throw new Error('Internal buffers have been corrupted')
throw new Error('Internal buffers have been corrupted');
}
uint32[flagPosition + 1] = store.db.dbi
uint32[flagPosition + 1] = store.db.dbi;
if (flags & 4) {
let keyStartPosition = (position << 3) + 12
let endPosition
let keyStartPosition = (position << 3) + 12;
let endPosition;
try {
endPosition = store.writeKey(key, targetBytes, keyStartPosition)
endPosition = store.writeKey(key, targetBytes, keyStartPosition);
} catch(error) {
targetBytes.fill(0, keyStartPosition)
throw error
targetBytes.fill(0, keyStartPosition);
throw error;
}
let keySize = endPosition - keyStartPosition
let keySize = endPosition - keyStartPosition;
if (keySize > MAX_KEY_SIZE) {
targetBytes.fill(0, keyStartPosition) // restore zeros
throw new Error('Key size is larger than the maximum key size (' + MAX_KEY_SIZE + ')')
targetBytes.fill(0, keyStartPosition); // restore zeros
throw new Error('Key size is larger than the maximum key size (' + MAX_KEY_SIZE + ')');
}
uint32[flagPosition + 2] = keySize
position = (endPosition + 16) >> 3
uint32[flagPosition + 2] = keySize;
position = (endPosition + 16) >> 3;
if (flags & 2) {
let mustCompress
let mustCompress;
if (valueBufferStart > -1) { // if we have buffers with start/end position
// record pointer to value buffer
float64[position] = (valueBuffer.address ||
(valueBuffer.address = getAddress(valueBuffer.buffer) + valueBuffer.byteOffset)) + valueBufferStart
mustCompress = valueBuffer[valueBufferStart] >= 254 // this is the compression indicator, so we must compress
(valueBuffer.address = getAddress(valueBuffer.buffer) + valueBuffer.byteOffset)) + valueBufferStart;
mustCompress = valueBuffer[valueBufferStart] >= 250; // this is the compression indicator, so we must compress
} else {
let valueArrayBuffer = valueBuffer.buffer
let valueArrayBuffer = valueBuffer.buffer;
// record pointer to value buffer
float64[position] = (valueArrayBuffer.address ||
(valueArrayBuffer.address = getAddress(valueArrayBuffer))) + valueBuffer.byteOffset
mustCompress = valueBuffer[0] >= 254 // this is the compression indicator, so we must compress
(valueArrayBuffer.address = getAddress(valueArrayBuffer))) + valueBuffer.byteOffset;
mustCompress = valueBuffer[0] >= 250; // this is the compression indicator, so we must compress
}
uint32[(position++ << 1) - 1] = valueSize
uint32[(position++ << 1) - 1] = valueSize;
if (store.compression && (valueSize >= store.compression.threshold || mustCompress)) {
flags |= 0x100000;
float64[position] = store.compression.address
float64[position] = store.compression.address;
if (!writeTxn)
env.compress(uint32.address + (position << 3), () => {
// this is never actually called, just use to pin the buffer in memory until it is finished
console.log(float64)
})
position++
console.log(float64);
});
position++;
}

@@ -178,36 +179,36 @@ }

if (ifVersion === null)
flags |= 0x10
flags |= 0x10;
else {
flags |= 0x100
float64[position++] = ifVersion
flags |= 0x100;
float64[position++] = ifVersion;
}
}
if (version !== undefined) {
flags |= 0x200
float64[position++] = version || 0
flags |= 0x200;
float64[position++] = version || 0;
}
} else
position++
targetBytes.position = position
position++;
targetBytes.position = position;
//console.log('js write', (targetBytes.buffer.address + (flagPosition << 2)).toString(16), flags.toString(16))
if (writeTxn) {
uint32[0] = flags
env.write(uint32.address)
return () => (uint32[0] & FAILED_CONDITION) ? SYNC_PROMISE_FAIL : SYNC_PROMISE_SUCCESS
uint32[0] = flags;
env.write(uint32.address);
return () => (uint32[0] & FAILED_CONDITION) ? SYNC_PROMISE_FAIL : SYNC_PROMISE_SUCCESS;
}
// if we ever use buffers that haven't been zero'ed, need to clear out the next slot like this:
// uint32[position << 1] = 0 // clear out the next slot
let nextUint32
let nextUint32;
if (position > 0x1e00) { // 61440 bytes
// make new buffer and make pointer to it
let lastPosition = position
targetBytes = allocateInstructionBuffer()
position = targetBytes.position
float64[lastPosition + 1] = targetBytes.uint32.address + position
uint32[lastPosition << 1] = 3 // pointer instruction
let lastPosition = position;
targetBytes = allocateInstructionBuffer();
position = targetBytes.position;
float64[lastPosition + 1] = targetBytes.uint32.address + position;
uint32[lastPosition << 1] = 3; // pointer instruction
//console.log('pointer from ', (lastFloat64.buffer.address + (lastPosition << 3)).toString(16), 'to', (targetBytes.buffer.address + position).toString(16), 'flag position', (uint32.buffer.address + (flagPosition << 2)).toString(16))
nextUint32 = targetBytes.uint32
nextUint32 = targetBytes.uint32;
} else
nextUint32 = uint32
let resolution = nextResolution
nextUint32 = uint32;
let resolution = nextResolution;
// create the placeholder next resolution

@@ -231,4 +232,4 @@ nextResolution = resolution.next = store.cache ?

next: null,
}
let writtenBatchDepth = batchDepth
};
let writtenBatchDepth = batchDepth;

@@ -240,12 +241,12 @@ return (callback) => {

// just poll for the status change if we miss a status update
writeStatus = uint32[flagPosition]
uint32[flagPosition] = flags
writeStatus = uint32[flagPosition];
uint32[flagPosition] = flags;
//writeStatus = Atomics.or(uint32, flagPosition, flags)
if (writeBatchStart && !writeStatus) {
outstandingBatchCount += 1 + (valueSize >> 12)
outstandingBatchCount += 1 + (valueSize >> 12);
//console.log(outstandingBatchCount, batchStartThreshold)
if (outstandingBatchCount > batchStartThreshold) {
outstandingBatchCount = 0
writeBatchStart()
writeBatchStart = null
outstandingBatchCount = 0;
writeBatchStart();
writeBatchStart = null;
}

@@ -256,71 +257,71 @@ }

// so we use the slower atomic operation
writeStatus = Atomics.or(uint32, flagPosition, flags)
writeStatus = Atomics.or(uint32, flagPosition, flags);
outstandingWriteCount++
outstandingWriteCount++;
if (writeStatus & TXN_DELIMITER) {
//console.warn('Got TXN delimiter', ( uint32.address + (flagPosition << 2)).toString(16))
commitPromise = null // TODO: Don't reset these if this comes from the batch start operation on an event turn batch
flushPromise = null
queueCommitResolution(resolution)
commitPromise = null; // TODO: Don't reset these if this comes from the batch start operation on an event turn batch
flushPromise = null;
queueCommitResolution(resolution);
if (!startAddress) {
startAddress = uint32.address + (flagPosition << 2)
startAddress = uint32.address + (flagPosition << 2);
}
}
if (!flushPromise && overlappingSync && separateFlushed)
flushPromise = new Promise(resolve => flushResolvers.push(resolve))
flushPromise = new Promise(resolve => flushResolvers.push(resolve));
if (writeStatus & WAITING_OPERATION) { // write thread is waiting
//console.log('resume batch thread', uint32.buffer.address + (flagPosition << 2))
env.write(0)
env.write(0);
}
if (outstandingWriteCount > BACKPRESSURE_THRESHOLD) {
if (!backpressureArray)
backpressureArray = new Int32Array(new SharedArrayBuffer(4), 0, 1)
Atomics.wait(backpressureArray, 0, 0, Math.round(outstandingWriteCount / BACKPRESSURE_THRESHOLD))
backpressureArray = new Int32Array(new SharedArrayBuffer(4), 0, 1);
Atomics.wait(backpressureArray, 0, 0, Math.round(outstandingWriteCount / BACKPRESSURE_THRESHOLD));
}
if (startAddress) {
if (eventTurnBatching)
startWriting() // start writing immediately because this has already been batched/queued
startWriting(); // start writing immediately because this has already been batched/queued
else if (!enqueuedCommit && txnStartThreshold) {
enqueuedCommit = commitDelay == 0 ? setImmediate(() => startWriting()) : setTimeout(() => startWriting(), commitDelay)
enqueuedCommit = commitDelay == 0 ? setImmediate(() => startWriting()) : setTimeout(() => startWriting(), commitDelay);
} else if (outstandingWriteCount > txnStartThreshold)
startWriting()
startWriting();
}
if ((outstandingWriteCount & 7) === 0)
resolveWrites()
resolveWrites();
if (store.cache) {
resolution.key = key
resolution.store = store
resolution.valueSize = valueBuffer ? valueBuffer.length : 0
resolution.key = key;
resolution.store = store;
resolution.valueSize = valueBuffer ? valueBuffer.length : 0;
}
resolution.valueBuffer = valueBuffer
lastQueuedResolution = resolution
resolution.valueBuffer = valueBuffer;
lastQueuedResolution = resolution;
if (callback) {
resolution.reject = callback
resolution.resolve = (value) => callback(null, value)
return
resolution.reject = callback;
resolution.resolve = (value) => callback(null, value);
return;
}
if (ifVersion === undefined) {
if (writtenBatchDepth > 1)
return SYNC_PROMISE_SUCCESS // or return undefined?
return SYNC_PROMISE_SUCCESS; // or return undefined?
if (!commitPromise) {
commitPromise = new Promise((resolve, reject) => {
resolution.resolve = resolve
resolution.reject = reject
})
resolution.resolve = resolve;
resolution.reject = reject;
});
if (separateFlushed)
commitPromise.flushed = overlappingSync ? flushPromise : commitPromise
commitPromise.flushed = overlappingSync ? flushPromise : commitPromise;
}
return commitPromise
return commitPromise;
}
let promise = new Promise((resolve, reject) => {
resolution.resolve = resolve
resolution.reject = reject
})
resolution.resolve = resolve;
resolution.reject = reject;
});
if (separateFlushed)
promise.flushed = overlappingSync ? flushPromise : promise
return promise
}
promise.flushed = overlappingSync ? flushPromise : promise;
return promise;
};
}

@@ -330,31 +331,31 @@ function startWriting() {

if (enqueuedCommit) {
clearImmediate(enqueuedCommit)
enqueuedCommit = null
clearImmediate(enqueuedCommit);
enqueuedCommit = null;
}
let resolvers = flushResolvers
flushResolvers = []
let resolvers = flushResolvers;
flushResolvers = [];
env.startWriting(startAddress, (status) => {
//console.log('finished batch', store.name)
if (dynamicBytes.uint32[dynamicBytes.position << 1] & TXN_DELIMITER)
queueCommitResolution(nextResolution)
queueCommitResolution(nextResolution);
resolveWrites(true)
resolveWrites(true);
switch (status) {
case 0:
for (let i = 0; i < resolvers.length; i++)
resolvers[i]()
resolvers[i]();
case 1:
break;
case 2:
executeTxnCallbacks()
break
executeTxnCallbacks();
break;
default:
console.error(status)
console.error(status);
if (commitRejectPromise) {
commitRejectPromise.reject(status)
commitRejectPromise = null
commitRejectPromise.reject(status);
commitRejectPromise = null;
}
}
})
startAddress = 0
});
startAddress = 0;
}

@@ -364,15 +365,15 @@

if (!resolution.isTxn) {
resolution.isTxn = true
resolution.isTxn = true;
if (txnResolution) {
txnResolution.nextTxn = resolution
txnResolution.nextTxn = resolution;
//outstandingWriteCount = 0
}
else
txnResolution = resolution
txnResolution = resolution;
}
}
var TXN_DONE = (separateFlushed ? TXN_COMMITTED : TXN_FLUSHED) | TXN_FAILED
var TXN_DONE = (separateFlushed ? TXN_COMMITTED : TXN_FLUSHED) | TXN_FAILED;
function resolveWrites(async) {
// clean up finished instructions
let instructionStatus
let instructionStatus;
while ((instructionStatus = unwrittenResolution.uint32[unwrittenResolution.flagPosition])

@@ -382,11 +383,11 @@ & 0x1000000) {

if (unwrittenResolution.callbacks) {
nextTxnCallbacks.push(unwrittenResolution.callbacks)
unwrittenResolution.callbacks = null
nextTxnCallbacks.push(unwrittenResolution.callbacks);
unwrittenResolution.callbacks = null;
}
if (!unwrittenResolution.isTxn)
unwrittenResolution.uint32 = null
unwrittenResolution.valueBuffer = null
unwrittenResolution.flag = instructionStatus
outstandingWriteCount--
unwrittenResolution = unwrittenResolution.next
unwrittenResolution.uint32 = null;
unwrittenResolution.valueBuffer = null;
unwrittenResolution.flag = instructionStatus;
outstandingWriteCount--;
unwrittenResolution = unwrittenResolution.next;
}

@@ -396,5 +397,5 @@ while (txnResolution &&

if (instructionStatus & TXN_FAILED)
rejectCommit()
rejectCommit();
else
resolveCommit(async)
resolveCommit(async);
}

@@ -404,37 +405,37 @@ }

function resolveCommit(async) {
afterCommit()
afterCommit();
if (async)
resetReadTxn()
resetReadTxn();
else
queueMicrotask(resetReadTxn) // TODO: only do this if there are actually committed writes?
queueMicrotask(resetReadTxn); // TODO: only do this if there are actually committed writes?
do {
if (uncommittedResolution.resolve) {
let flag = uncommittedResolution.flag
let flag = uncommittedResolution.flag;
if (flag < 0)
uncommittedResolution.reject(new Error("Error occurred in write"))
uncommittedResolution.reject(new Error("Error occurred in write"));
else if (flag & FAILED_CONDITION) {
uncommittedResolution.resolve(false)
uncommittedResolution.resolve(false);
} else
uncommittedResolution.resolve(true)
uncommittedResolution.resolve(true);
}
} while((uncommittedResolution = uncommittedResolution.next) && uncommittedResolution != txnResolution)
txnResolution = txnResolution.nextTxn
txnResolution = txnResolution.nextTxn;
}
var commitRejectPromise
var commitRejectPromise;
function rejectCommit() {
afterCommit()
afterCommit();
if (!commitRejectPromise) {
let rejectFunction
commitRejectPromise = new Promise((resolve, reject) => rejectFunction = reject)
commitRejectPromise.reject = rejectFunction
let rejectFunction;
commitRejectPromise = new Promise((resolve, reject) => rejectFunction = reject);
commitRejectPromise.reject = rejectFunction;
}
do {
if (uncommittedResolution.reject) {
let flag = uncommittedResolution.flag & 0xf
let error = new Error("Commit failed (see commitError for details)")
error.commitError = commitRejectPromise
uncommittedResolution.reject(error)
let flag = uncommittedResolution.flag & 0xf;
let error = new Error("Commit failed (see commitError for details)");
error.commitError = commitRejectPromise;
uncommittedResolution.reject(error);
}
} while((uncommittedResolution = uncommittedResolution.next) && uncommittedResolution != txnResolution)
txnResolution = txnResolution.nextTxn
txnResolution = txnResolution.nextTxn;
}

@@ -446,5 +447,5 @@ function atomicStatus(uint32, flagPosition, newStatus) {

// just poll for the status change if we miss a status update
let writeStatus = uint32[flagPosition]
uint32[flagPosition] = newStatus
return writeStatus
let writeStatus = uint32[flagPosition];
uint32[flagPosition] = newStatus;
return writeStatus;
//return Atomics.or(uint32, flagPosition, newStatus)

@@ -454,51 +455,51 @@ } else // otherwise the transaction could end at any time and we need to know the

// so we use the slower atomic operation
return Atomics.or(uint32, flagPosition, newStatus)
return Atomics.or(uint32, flagPosition, newStatus);
}
function afterCommit() {
for (let i = 0, l = afterCommitCallbacks.length; i < l; i++) {
afterCommitCallbacks[i]({ next: uncommittedResolution, last: unwrittenResolution})
afterCommitCallbacks[i]({ next: uncommittedResolution, last: unwrittenResolution});
}
}
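// Runs queued transaction callbacks inside the write transaction: plain callbacks
// may be async (tracked in `promises`), while `asChild` callbacks each run in an
// abortable child transaction (env.beginTxn(1)) and commit or abort individually.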
async function executeTxnCallbacks() {
env.writeTxn = writeTxn = {}
let promises
let txnCallbacks
env.writeTxn = writeTxn = {};
let promises;
let txnCallbacks;
for (let i = 0, l = nextTxnCallbacks.length; i < l; i++) {
txnCallbacks = nextTxnCallbacks[i]
txnCallbacks = nextTxnCallbacks[i];
for (let i = 0, l = txnCallbacks.length; i < l; i++) {
let userTxnCallback = txnCallbacks[i]
let asChild = userTxnCallback.asChild
let userTxnCallback = txnCallbacks[i];
let asChild = userTxnCallback.asChild;
if (asChild) {
if (promises) {
// must complete any outstanding transactions before proceeding
await Promise.all(promises)
promises = null
await Promise.all(promises);
promises = null;
}
env.beginTxn(1) // abortable
env.beginTxn(1); // abortable
try {
let result = userTxnCallback.callback()
let result = userTxnCallback.callback();
if (result && result.then) {
await result
await result;
}
if (result === ABORT)
env.abortTxn()
env.abortTxn();
else
env.commitTxn()
txnCallbacks[i] = result
env.commitTxn();
txnCallbacks[i] = result;
} catch(error) {
env.abortTxn()
txnError(error, i)
env.abortTxn();
txnError(error, i);
}
} else {
try {
let result = userTxnCallback()
txnCallbacks[i] = result
let result = userTxnCallback();
txnCallbacks[i] = result;
if (result && result.then) {
if (!promises)
promises = []
promises.push(result.catch(() => {}))
promises = [];
promises.push(result.catch(() => {}));
}
} catch(error) {
txnError(error, i)
txnError(error, i);
}

@@ -508,18 +509,18 @@ }

}
nextTxnCallbacks = []
nextTxnCallbacks = [];
if (promises) { // finish any outstanding commit functions
await Promise.all(promises)
await Promise.all(promises);
}
env.writeTxn = writeTxn = false
env.writeTxn = writeTxn = false;
function txnError(error, i) {
(txnCallbacks.errors || (txnCallbacks.errors = []))[i] = error
txnCallbacks[i] = CALLBACK_THREW
(txnCallbacks.errors || (txnCallbacks.errors = []))[i] = error;
txnCallbacks[i] = CALLBACK_THREW;
}
}
function finishBatch() {
dynamicBytes.uint32[(dynamicBytes.position + 1) << 1] = 0 // clear out the next slot
let writeStatus = atomicStatus(dynamicBytes.uint32, (dynamicBytes.position++) << 1, 2) // atomically write the end block
nextResolution.flagPosition += 2
dynamicBytes.uint32[(dynamicBytes.position + 1) << 1] = 0; // clear out the next slot
let writeStatus = atomicStatus(dynamicBytes.uint32, (dynamicBytes.position++) << 1, 2); // atomically write the end block
nextResolution.flagPosition += 2;
if (writeStatus & WAITING_OPERATION) {
env.write(0)
env.write(0);
}

@@ -529,40 +530,40 @@ }

put(key, value, versionOrOptions, ifVersion) {
let callback, flags = 15, type = typeof versionOrOptions
let callback, flags = 15, type = typeof versionOrOptions;
if (type == 'object') {
if (versionOrOptions.noOverwrite)
flags |= 0x10
flags |= 0x10;
if (versionOrOptions.noDupData)
flags |= 0x20
flags |= 0x20;
if (versionOrOptions.append)
flags |= 0x20000
flags |= 0x20000;
if (versionOrOptions.ifVersion != undefined)
ifVersion = versionOrOptions.ifVersion
versionOrOptions = versionOrOptions.version
ifVersion = versionOrOptions.ifVersion;
versionOrOptions = versionOrOptions.version;
if (typeof ifVersion == 'function')
callback = ifVersion
callback = ifVersion;
} else if (type == 'function') {
callback = versionOrOptions
callback = versionOrOptions;
}
return writeInstructions(flags, this, key, value, this.useVersions ? versionOrOptions || 0 : undefined, ifVersion)(callback)
return writeInstructions(flags, this, key, value, this.useVersions ? versionOrOptions || 0 : undefined, ifVersion)(callback);
},
remove(key, ifVersionOrValue, callback) {
let flags = 13
let ifVersion, value
let flags = 13;
let ifVersion, value;
if (ifVersionOrValue !== undefined) {
if (typeof ifVersionOrValue == 'function')
callback = ifVersionOrValue
callback = ifVersionOrValue;
else if (this.useVersions)
ifVersion = ifVersionOrValue
ifVersion = ifVersionOrValue;
else {
flags = 14
value = ifVersionOrValue
flags = 14;
value = ifVersionOrValue;
}
}
return writeInstructions(flags, this, key, value, undefined, ifVersion)(callback)
return writeInstructions(flags, this, key, value, undefined, ifVersion)(callback);
},
del(key, options, callback) {
return this.remove(key, options, callback)
return this.remove(key, options, callback);
},
ifNoExists(key, callback) {
return this.ifVersion(key, null, callback)
return this.ifVersion(key, null, callback);
},

@@ -573,25 +574,25 @@

return new Batch((operations, callback) => {
let promise = this.ifVersion(key, version, operations)
let promise = this.ifVersion(key, version, operations);
if (callback)
promise.then(callback)
return promise
})
promise.then(callback);
return promise;
});
}
if (writeTxn) {
if (this.doesExist(key, version)) {
callback()
return SYNC_PROMISE_SUCCESS
callback();
return SYNC_PROMISE_SUCCESS;
}
return SYNC_PROMISE_FAIL
return SYNC_PROMISE_FAIL;
}
let finishStartWrite = writeInstructions(typeof key === 'undefined' ? 1 : 4, this, key, undefined, undefined, version)
let promise
batchDepth += 2
let finishStartWrite = writeInstructions(typeof key === 'undefined' ? 1 : 4, this, key, undefined, undefined, version);
let promise;
batchDepth += 2;
if (batchDepth > 2)
promise = finishStartWrite()
promise = finishStartWrite();
else {
writeBatchStart = () => {
promise = finishStartWrite()
}
outstandingBatchCount = 0
promise = finishStartWrite();
};
outstandingBatchCount = 0;
}

@@ -601,7 +602,7 @@ //console.warn('wrote start of ifVersion', this.path)

if (typeof callback === 'function') {
callback()
callback();
} else {
for (let i = 0, l = callback.length; i < l; i++) {
let operation = callback[i]
this[operation.type](operation.key, operation.value)
let operation = callback[i];
this[operation.type](operation.key, operation.value);
}

@@ -612,26 +613,26 @@ }

if (!promise) {
finishBatch()
batchDepth -= 2
promise = finishStartWrite() // finish write once all the operations have been written (and it hasn't been written prematurely)
writeBatchStart = null
finishBatch();
batchDepth -= 2;
promise = finishStartWrite(); // finish write once all the operations have been written (and it hasn't been written prematurely)
writeBatchStart = null;
} else {
batchDepth -= 2
finishBatch()
batchDepth -= 2;
finishBatch();
}
}
return promise
return promise;
},
batch(callbackOrOperations) {
return this.ifVersion(undefined, undefined, callbackOrOperations)
return this.ifVersion(undefined, undefined, callbackOrOperations);
},
drop(callback) {
return writeInstructions(1024 + 12, this, undefined, undefined, undefined, undefined)(callback)
return writeInstructions(1024 + 12, this, undefined, undefined, undefined, undefined)(callback);
},
clearAsync(callback) {
if (this.encoder && this.encoder.structures)
this.encoder.structures = []
return writeInstructions(12, this, undefined, undefined, undefined, undefined)(callback)
this.encoder.structures = [];
return writeInstructions(12, this, undefined, undefined, undefined, undefined)(callback);
},
_triggerError() {
finishBatch()
finishBatch();
},

@@ -641,15 +642,15 @@

if (writeTxn)
return this.put(key, value, versionOrOptions, ifVersion)
return this.put(key, value, versionOrOptions, ifVersion);
else
return this.transactionSync(() =>
this.put(key, value, versionOrOptions, ifVersion) == SYNC_PROMISE_SUCCESS,
{ abortable: false })
{ abortable: false });
},
removeSync(key, ifVersionOrValue) {
if (writeTxn)
return this.remove(key, ifVersionOrValue)
return this.remove(key, ifVersionOrValue);
else
return this.transactionSync(() =>
this.remove(key, ifVersionOrValue) == SYNC_PROMISE_SUCCESS,
{ abortable: false })
{ abortable: false });
},

@@ -659,47 +660,47 @@ transaction(callback) {

// already nested in a transaction, just execute and return
return callback()
return callback();
}
return this.transactionAsync(callback)
return this.transactionAsync(callback);
},
childTransaction(callback) {
if (useWritemap)
throw new Error('Child transactions are not supported in writemap mode')
throw new Error('Child transactions are not supported in writemap mode');
if (writeTxn) {
env.beginTxn(1) // abortable
env.beginTxn(1); // abortable
try {
return when(callback(), (result) => {
if (result === ABORT)
env.abortTxn()
env.abortTxn();
else
env.commitTxn()
return result
env.commitTxn();
return result;
}, (error) => {
env.abortTxn()
throw error
})
env.abortTxn();
throw error;
});
} catch(error) {
env.abortTxn()
throw error
env.abortTxn();
throw error;
}
}
return this.transactionAsync(callback, true)
return this.transactionAsync(callback, true);
},
transactionAsync(callback, asChild) {
let txnIndex
let txnCallbacks
let txnIndex;
let txnCallbacks;
if (!nextResolution.callbacks) {
txnCallbacks = [asChild ? { callback, asChild } : callback]
nextResolution.callbacks = txnCallbacks
txnCallbacks.results = writeInstructions(8 | (this.strictAsyncOrder ? 0x100000 : 0), this)()
txnIndex = 0
txnCallbacks = [asChild ? { callback, asChild } : callback];
nextResolution.callbacks = txnCallbacks;
txnCallbacks.results = writeInstructions(8 | (this.strictAsyncOrder ? 0x100000 : 0), this)();
txnIndex = 0;
} else {
txnCallbacks = lastQueuedResolution.callbacks
txnIndex = txnCallbacks.push(asChild ? { callback, asChild } : callback) - 1
txnCallbacks = lastQueuedResolution.callbacks;
txnIndex = txnCallbacks.push(asChild ? { callback, asChild } : callback) - 1;
}
return txnCallbacks.results.then((results) => {
let result = txnCallbacks[txnIndex]
let result = txnCallbacks[txnIndex];
if (result === CALLBACK_THREW)
throw txnCallbacks.errors[txnIndex]
return result
})
throw txnCallbacks.errors[txnIndex];
return result;
});
},

@@ -710,49 +711,49 @@ transactionSync(callback, flags) {

// already nested in a transaction, execute as child transaction (if possible) and return
return this.childTransaction(callback)
let result = callback() // else just run in current transaction
return this.childTransaction(callback);
let result = callback(); // else just run in current transaction
if (result == ABORT && !abortedNonChildTransactionWarn) {
console.warn('Can not abort a transaction inside another transaction with ' + (this.cache ? 'caching enabled' : 'useWritemap enabled'))
abortedNonChildTransactionWarn = true
console.warn('Can not abort a transaction inside another transaction with ' + (this.cache ? 'caching enabled' : 'useWritemap enabled'));
abortedNonChildTransactionWarn = true;
}
return result
return result;
}
try {
this.transactions++
env.beginTxn(flags == undefined ? 3 : flags)
writeTxn = env.writeTxn = {}
this.transactions++;
env.beginTxn(flags == undefined ? 3 : flags);
writeTxn = env.writeTxn = {};
return when(callback(), (result) => {
try {
if (result === ABORT)
env.abortTxn()
env.abortTxn();
else {
env.commitTxn()
resetReadTxn()
env.commitTxn();
resetReadTxn();
}
return result
return result;
} finally {
env.writeTxn = writeTxn = null
env.writeTxn = writeTxn = null;
}
}, (error) => {
try { env.abortTxn() } catch(e) {}
env.writeTxn = writeTxn = null
throw error
})
try { env.abortTxn(); } catch(e) {}
env.writeTxn = writeTxn = null;
throw error;
});
} catch(error) {
try { env.abortTxn() } catch(e) {}
env.writeTxn = writeTxn = null
throw error
try { env.abortTxn(); } catch(e) {}
env.writeTxn = writeTxn = null;
throw error;
}
},
transactionSyncStart(callback) {
return this.transactionSync(callback, 0)
return this.transactionSync(callback, 0);
},
on(event, callback) {
if (event == 'beforecommit') {
eventTurnBatching = true
beforeCommitCallbacks.push(callback)
eventTurnBatching = true;
beforeCommitCallbacks.push(callback);
} else if (event == 'aftercommit')
afterCommitCallbacks.push(callback)
afterCommitCallbacks.push(callback);
}
})
LMDBStore.prototype.del = LMDBStore.prototype.remove
});
LMDBStore.prototype.del = LMDBStore.prototype.remove;
}

@@ -762,17 +763,22 @@

constructor(callback) {
super()
this.callback = callback
super();
this.callback = callback;
}
put(key, value) {
this.push({ type: 'put', key, value })
this.push({ type: 'put', key, value });
}
del(key) {
this.push({ type: 'del', key })
this.push({ type: 'del', key });
}
clear() {
this.splice(0, this.length)
this.splice(0, this.length);
}
write(callback) {
return this.callback(this, callback)
return this.callback(this, callback);
}
}
export function asBinary(buffer) {
return {
[binaryBuffer]: buffer
};
}

Sorry, the diff of this file is not supported yet