@peerbit/lazy-level - npm Package Compare versions

Comparing version 1.0.3 to 1.1.0
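
In short: 1.1.0 moves the batching logic out of LazyLevel into a dedicated TXQueue class, backs the temporary write-through cache with @peerbit/cache, and extends LevelBatchOptions with queueMaxBytes and cacheMaxBytes limits (both defaulting to 10 ** 7 bytes). For orientation, a hedged usage sketch of the 1.1.0 surface; MemoryLevel and the exact open()/close() lifecycle are assumptions for illustration, not taken from this diff:

import { MemoryLevel } from "memory-level"; // assumed stand-in for any abstract-level store
import LazyLevel from "@peerbit/lazy-level";

const store = new LazyLevel(new MemoryLevel(), {
	batch: {
		interval: 300, // flush timer in ms (DEFAULT_BATCH_INTERVAL)
		queueMaxBytes: 10 ** 7, // flush early once this many bytes are queued
		cacheMaxBytes: 10 ** 7, // cap on the temporary read-through cache
		onError: (error) => console.error(error) // otherwise errors go to the module logger
	}
});
await store.open();
await store.put("key", new Uint8Array([1, 2, 3]));
await store.idle(); // wait for the batch queue to drain
await store.close();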

lib/esm/index.d.ts

@@ -1,4 +1,7 @@

- import { AbstractLevel } from "abstract-level";
+ import { AbstractBatchOperation, AbstractLevel } from "abstract-level";
+ import { Cache } from "@peerbit/cache";
export type LevelBatchOptions = {
interval: number;
+ queueMaxBytes: number;
+ cacheMaxBytes: number;
onError?: (error: any) => void;

@@ -21,11 +24,24 @@ };

}
+ declare class TXQueue {
+ readonly opts: LevelBatchOptions;
+ readonly store: AbstractLevel<any, any, any>;
+ queue: AbstractBatchOperation<AbstractLevel<any, string, Uint8Array>, string, Uint8Array>[];
+ currentSize: number;
+ txPromise?: Promise<void>;
+ private _interval?;
+ tempStore: Cache<Uint8Array>;
+ tempDeleted: Set<string>;
+ constructor(opts: LevelBatchOptions, store: AbstractLevel<any, any, any>);
+ open(): void;
+ add(tx: AbstractBatchOperation<AbstractLevel<any, string, Uint8Array>, string, Uint8Array>): Promise<void>;
+ processTxQueue(): Promise<void>;
+ idle(): Promise<void>;
+ clear(): void;
+ close(): Promise<void>;
+ }
export default class LazyLevel implements SimpleLevel {
private _store;
- private _interval;
- private _txQueue?;
- private _tempStore?;
- private _tempDeleted?;
- private _txPromise?;
private _opts?;
private _sublevels;
+ txQueue?: TXQueue;
constructor(store: AbstractLevel<any, any, any>, opts?: LazyLevelOptions | {

@@ -32,0 +48,0 @@ batch: boolean;

import { logger as loggerFn } from "@peerbit/logger";
import { waitFor } from "@peerbit/time";
+ import { Cache } from "@peerbit/cache";
const logger = loggerFn({ module: "cache" });
+ const DEFAULT_MAX_CACHE_SIZE_BYTES = 10 ** 7;
+ const DEFAULT_BATCH_INTERVAL = 300;
+ const DEFAULT_MAX_BATCH_SIZE = 10 ** 7;
+ const DEFAULT_BATCH_OPTIONS = {
+ interval: DEFAULT_BATCH_INTERVAL,
+ queueMaxBytes: DEFAULT_MAX_BATCH_SIZE,
+ cacheMaxBytes: DEFAULT_MAX_CACHE_SIZE_BYTES
+ };
+ const DELETE_TX_SIZE = 50; // experimental memory consumption
+ class TXQueue {
+ opts;
+ store;
+ queue;
+ currentSize = 0;
+ txPromise;
+ _interval;
+ tempStore;
+ tempDeleted;
+ constructor(opts, store) {
+ this.opts = opts;
+ this.store = store;
+ }
+ open() {
+ this.queue = [];
+ // TODO can we prevent re-open?
+ this.tempStore =
+ this.tempStore || new Cache({ max: this.opts.cacheMaxBytes });
+ this.tempDeleted = this.tempDeleted || new Set();
+ this._interval =
+ this._interval ||
+ setInterval(() => {
+ this.processTxQueue();
+ }, this.opts.interval);
+ }
+ async add(tx) {
+ let size;
+ if (tx.type === "put") {
+ this.tempDeleted.delete(tx.key);
+ this.tempStore.add(tx.key, tx.value, tx.value.byteLength);
+ size = tx.value.byteLength;
+ }
+ else if (tx.type == "del") {
+ size = DELETE_TX_SIZE;
+ this.tempDeleted.add(tx.key);
+ }
+ else {
+ throw new Error("Unexpected tx type: " + tx["type"]);
+ }
+ this.queue.push(tx);
+ this.currentSize += size;
+ if (this.currentSize >= this.opts.queueMaxBytes) {
+ await this.processTxQueue();
+ }
+ }
+ async processTxQueue() {
+ if (this.store.status === "open" && this.currentSize > 0) {
+ const arr = this.queue.splice(0, this.queue.length);
+ if (arr?.length > 0) {
+ // We adjust sizes before the tx finishes so that subsequent calls to processTxQueue don't end up here because of stale this.currentSize calculations
+ for (const v of arr) {
+ if (v.type === "put") {
+ this.currentSize -= v.value.byteLength;
+ }
+ else if (v.type === "del") {
+ this.currentSize -= DELETE_TX_SIZE;
+ }
+ }
+ const next = () => this.store
+ .batch(arr, { valueEncoding: "view" })
+ .then(() => {
+ arr.forEach((v) => {
+ if (v.type === "put") {
+ this.tempDeleted?.delete(v.key);
+ this.tempStore.del(v.key);
+ }
+ else if (v.type === "del") {
+ this.tempDeleted?.delete(v.key);
+ this.tempStore.del(v.key);
+ }
+ });
+ })
+ .catch((error) => {
+ if (this.opts.onError) {
+ this.opts.onError(error);
+ }
+ else {
+ logger.error(error);
+ }
+ });
+ this.txPromise = (this.txPromise ? this.txPromise : Promise.resolve())
+ .then(next)
+ .catch(next);
+ }
+ }
+ }
+ async idle() {
+ if (this.store.status !== "open" &&
+ this.store.status !== "opening" &&
+ this.queue &&
+ this.queue.length > 0) {
+ throw new Error("Store is closed, so cache will never finish idling");
+ }
+ await this.txPromise;
+ await waitFor(() => !this.queue || this.queue.length === 0, {
+ timeout: this.opts.interval * 2 + 1000,
+ delayInterval: 100,
+ timeoutMessage: `Failed to wait for idling, got txQueue with ${this.queue
+ ?.length} elements. Store status: ${this.store
+ ?.status}, interval exist: ${!!this._interval}`
+ });
+ }
+ clear() {
+ this.queue = [];
+ this.tempStore.clear();
+ this.tempDeleted.clear();
+ }
+ async close() {
+ await this.idle();
+ clearInterval(this._interval);
+ this.clear();
+ this._interval = undefined;
+ }
+ }
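
TXQueue.add above tracks an approximate byte size for the pending batch: a put costs its value's byteLength, a delete a fixed 50-byte estimate (DELETE_TX_SIZE), and the queue flushes early once the running total reaches queueMaxBytes instead of waiting for the interval timer. A minimal self-contained sketch of that accounting rule (the names enqueueAll and flush are illustrative, not part of the package):

const DELETE_TX_SIZE = 50; // same fixed estimate the diff uses for deletes

type Tx =
	| { type: "put"; key: string; value: Uint8Array }
	| { type: "del"; key: string };

// Enqueue transactions and flush whenever the byte budget is exhausted.
async function enqueueAll(
	txs: Tx[],
	queueMaxBytes: number,
	flush: (batch: Tx[]) => Promise<void> // stands in for processTxQueue()
) {
	const queue: Tx[] = [];
	let currentSize = 0;
	for (const tx of txs) {
		queue.push(tx);
		currentSize += tx.type === "put" ? tx.value.byteLength : DELETE_TX_SIZE;
		if (currentSize >= queueMaxBytes) {
			await flush(queue.splice(0, queue.length));
			currentSize = 0;
		}
	}
}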
export default class LazyLevel {
_store;
- _interval;
- _txQueue;
- _tempStore;
- _tempDeleted;
- _txPromise;
_opts;
_sublevels = [];
- constructor(store, opts = { batch: { interval: 300 } }) {
+ txQueue;
+ constructor(store, opts = {
+ batch: DEFAULT_BATCH_OPTIONS
+ }) {
this._store = store;
if (typeof opts.batch === "boolean") {
if (opts.batch === true)
- this._opts = { batch: { interval: 300 } };
+ this._opts = { batch: DEFAULT_BATCH_OPTIONS };
}
else if (opts) {
- this._opts = { batch: { interval: 300, ...opts.batch }, ...opts };
+ this._opts = {
+ batch: { ...DEFAULT_BATCH_OPTIONS, ...opts.batch },
+ ...opts
+ };
}
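
With these defaults the constructor accepts three shapes of opts: omitting it (or passing { batch: true }) yields the full DEFAULT_BATCH_OPTIONS, { batch: false } leaves _opts unset so no TXQueue is ever created and writes go straight to the store, and an explicit object is merged over the defaults. A sketch, assuming store is any abstract-level instance:

new LazyLevel(store); // defaults: 300 ms interval, 10 ** 7-byte queue and cache caps
new LazyLevel(store, { batch: true }); // same defaults, explicit opt-in
new LazyLevel(store, { batch: false }); // batching disabled
new LazyLevel(store, {
	batch: { interval: 100, queueMaxBytes: 10 ** 6, cacheMaxBytes: 10 ** 6 }
});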

@@ -27,16 +152,3 @@ }

async idle() {
- if (this._opts?.batch && this._txQueue) {
- if (this._store.status !== "open" &&
- this._store.status !== "opening" &&
- this._txQueue &&
- this._txQueue.length > 0) {
- throw new Error("Store is closed, so cache will never finish idling");
- }
- await this._txPromise;
- await waitFor(() => !this._txQueue || this._txQueue.length === 0, {
- timeout: this._opts.batch.interval * 2 + 1000,
- delayInterval: 100,
- timeoutMessage: `Failed to wait for idling, got txQueue with ${this._txQueue?.length} elements. Store status: ${this._store?.status}, interval exist: ${!!this._interval}`,
- });
- }
+ await this.txQueue?.idle();
}

@@ -47,8 +159,4 @@ async close() {

}
await this.idle(); // idle after clear interval (because else txQueue might be filled with new things that are never removed)
- if (this._opts?.batch) {
- clearInterval(this._interval);
- this._interval = undefined;
- this._tempStore?.clear();
- this._tempDeleted?.clear();
+ if (this.txQueue) {
+ await this.txQueue.close();
}

@@ -64,40 +172,5 @@ await Promise.all(this._sublevels.map((l) => l.close()));

return Promise.reject(new Error("No cache store found to open"));
- if (this._opts?.batch && !this._interval) {
- this._txQueue = [];
- this._tempStore = new Map();
- this._tempDeleted = new Set();
- this._interval = setInterval(() => {
- if (this._store.status === "open" &&
- this._txQueue &&
- this._txQueue.length > 0) {
- const arr = this._txQueue.splice(0, this._txQueue.length);
- if (arr?.length > 0) {
- const next = () => this._store
- .batch(arr, { valueEncoding: "view" })
- .then(() => {
- arr.forEach((v) => {
- if (v.type === "put") {
- this._tempDeleted?.delete(v.key);
- this._tempStore.delete(v.key);
- }
- else if (v.type === "del") {
- this._tempDeleted?.delete(v.key);
- this._tempStore.delete(v.key);
- }
- });
- })
- .catch((error) => {
- if (this._opts?.batch?.onError) {
- this._opts?.batch.onError(error);
- }
- else {
- logger.error(error);
- }
- });
- this._txPromise = (this._txPromise ? this._txPromise : Promise.resolve())
- .then(next)
- .catch(next);
- }
- }
- }, this._opts.batch.interval);
+ if (this._opts?.batch) {
+ (this.txQueue ||
+ (this.txQueue = new TXQueue(this._opts.batch, this._store))).open();
+ }

@@ -115,9 +188,9 @@ if (this.status() !== "open") {

try {
- if (this._tempDeleted) {
+ if (this.txQueue) {
// batching is activated
- if (this._tempDeleted.has(key)) {
+ if (this.txQueue.tempDeleted.has(key)) {
return undefined;
}
data =
- (this._tempStore && this._tempStore.get(key)) ||
+ (this.txQueue.tempStore && this.txQueue.tempStore.get(key)) ||
(await this._store.get(key, { valueEncoding: "view" }));
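
The read path keeps read-your-writes semantics through the new queue: a key with a pending delete is masked immediately, a pending put is served from TXQueue's temp cache, and only then does the lookup fall through to the persisted level store. A sketch of that order, with txQueue and store passed in as assumed shapes rather than the package's real types:

async function readThrough(
	key: string,
	txQueue: {
		tempDeleted: Set<string>;
		tempStore: { get(k: string): Uint8Array | undefined };
	},
	store: {
		get(k: string, o: { valueEncoding: string }): Promise<Uint8Array | undefined>;
	}
): Promise<Uint8Array | undefined> {
	if (txQueue.tempDeleted.has(key)) {
		return undefined; // a queued delete masks the key before it reaches disk
	}
	// a queued put is visible from the temp cache right away...
	return (
		txQueue.tempStore.get(key) ??
		// ...otherwise fall back to the underlying store
		(await store.get(key, { valueEncoding: "view" }))
	);
}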

@@ -144,3 +217,3 @@ }

lte: prefix + "\xFF",
valueEncoding: "view",
valueEncoding: "view"
});

@@ -154,6 +227,4 @@ const ret = [];

async clear(clearStore = true) {
- this._txQueue = [];
+ this.txQueue?.clear();
await this.idle();
- this._tempStore?.clear();
- this._tempDeleted?.clear();
if (clearStore) {

@@ -168,3 +239,3 @@ await this._store.clear(); // will also clear sublevels

lte: prefix + "\xFF",
valueEncoding: "view",
valueEncoding: "view"
});

@@ -175,4 +246,4 @@ const keys = [];

}
- if (this._tempStore) {
- for (const key of this._tempStore.keys()) {
+ if (this.txQueue) {
+ for (const key of this.txQueue.tempStore.map.keys()) {
if (key.startsWith(prefix)) {

@@ -186,9 +257,7 @@ keys.push(key);

async put(key, value) {
- if (this._opts?.batch) {
- this._tempDeleted.delete(key);
- this._tempStore.set(key, value);
- this._txQueue.push({
+ if (this.txQueue) {
+ await this.txQueue.add({
type: "put",
key: key,
- value: value,
+ value: value
});

@@ -205,5 +274,4 @@ }

}
- if (this._opts?.batch) {
- this._tempDeleted.add(key);
- this._txQueue.push({ type: "del", key: key });
+ if (this.txQueue) {
+ this.txQueue.add({ type: "del", key: key });
}
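
Both write paths now funnel into TXQueue.add, but note the asymmetry: the put path awaits it (so callers feel backpressure when queueMaxBytes forces an immediate flush) while the delete path enqueues without awaiting, leaving the flush to the interval timer. Assuming the enclosing method is LazyLevel's del(), the observable behavior is roughly:

await store.put("a", new Uint8Array([1])); // may flush synchronously once the byte budget is hit
await store.del("a"); // enqueued only; persisted by the next interval flush
await store.idle(); // drain the queue before reading through another handle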

@@ -210,0 +278,0 @@ else {

{
"name": "@peerbit/lazy-level",
"version": "1.0.3",
"version": "1.1.0",
"description": "Level with lazy transactions",

@@ -38,4 +38,5 @@ "type": "module",

"dependencies": {
"@peerbit/cache": "^1.1.0",
"@peerbit/logger": "1.0.1",
"@peerbit/time": "1.0.2",
"@peerbit/time": "1.0.3",
"level": "^8.0.0"

@@ -46,3 +47,3 @@ },

],
"gitHead": "06d341c4ea81b70c76018899b029f4419c311500"
"gitHead": "0cfa376bc90c31e1063ddaf5435c828b490e0228"
}
import { logger as loggerFn } from "@peerbit/logger";
import { waitFor } from "@peerbit/time";
+ import { AbstractBatchOperation, AbstractLevel } from "abstract-level";
+ import { Cache } from "@peerbit/cache";
export type LevelBatchOptions = {
interval: number;
+ queueMaxBytes: number;
+ cacheMaxBytes: number;
onError?: (error: any) => void;

@@ -27,6 +29,15 @@ };

- export default class LazyLevel implements SimpleLevel {
- private _store: AbstractLevel<any, any, any>;
- private _interval: any;
- private _txQueue?: AbstractBatchOperation<
+ const DEFAULT_MAX_CACHE_SIZE_BYTES = 10 ** 7;
+ const DEFAULT_BATCH_INTERVAL = 300;
+ const DEFAULT_MAX_BATCH_SIZE = 10 ** 7;
+ const DEFAULT_BATCH_OPTIONS: LevelBatchOptions = {
+ interval: DEFAULT_BATCH_INTERVAL,
+ queueMaxBytes: DEFAULT_MAX_BATCH_SIZE,
+ cacheMaxBytes: DEFAULT_MAX_CACHE_SIZE_BYTES
+ };
+ const DELETE_TX_SIZE = 50; // experimental memory consumption
+ class TXQueue {
+ queue: AbstractBatchOperation<
AbstractLevel<any, string, Uint8Array>,

@@ -36,17 +47,149 @@ string,

>[];
- private _tempStore?: Map<string, Uint8Array>;
- private _tempDeleted?: Set<string>;
- private _txPromise?: Promise<void>;
+ currentSize = 0;
+ txPromise?: Promise<void>;
+ private _interval?: ReturnType<typeof setInterval>;
+ tempStore: Cache<Uint8Array>;
+ tempDeleted: Set<string>;
+ constructor(
+ readonly opts: LevelBatchOptions,
+ readonly store: AbstractLevel<any, any, any>
+ ) {}
+ open() {
+ this.queue = [];
+ // TODO can we prevent re-open?
+ this.tempStore =
+ this.tempStore || new Cache({ max: this.opts.cacheMaxBytes });
+ this.tempDeleted = this.tempDeleted || new Set();
+ this._interval =
+ this._interval ||
+ setInterval(() => {
+ this.processTxQueue();
+ }, this.opts.interval);
+ }
+ async add(
+ tx: AbstractBatchOperation<
+ AbstractLevel<any, string, Uint8Array>,
+ string,
+ Uint8Array
+ >
+ ) {
+ let size: number;
+ if (tx.type === "put") {
+ this.tempDeleted.delete(tx.key);
+ this.tempStore.add(tx.key, tx.value, tx.value.byteLength);
+ size = tx.value.byteLength;
+ } else if (tx.type == "del") {
+ size = DELETE_TX_SIZE;
+ this.tempDeleted.add(tx.key);
+ } else {
+ throw new Error("Unexpected tx type: " + tx["type"]);
+ }
+ this.queue.push(tx);
+ this.currentSize += size;
+ if (this.currentSize >= this.opts.queueMaxBytes) {
+ await this.processTxQueue();
+ }
+ }
+ async processTxQueue() {
+ if (this.store.status === "open" && this.currentSize > 0) {
+ const arr = this.queue.splice(0, this.queue.length);
+ if (arr?.length > 0) {
+ // We adjust sizes before the tx finishes so that subsequent calls to processTxQueue don't end up here because of stale this.currentSize calculations
+ for (const v of arr) {
+ if (v.type === "put") {
+ this.currentSize -= v.value.byteLength;
+ } else if (v.type === "del") {
+ this.currentSize -= DELETE_TX_SIZE;
+ }
+ }
+ const next = () =>
+ this.store
+ .batch(arr, { valueEncoding: "view" })
+ .then(() => {
+ arr.forEach((v) => {
+ if (v.type === "put") {
+ this.tempDeleted?.delete(v.key);
+ this.tempStore!.del(v.key);
+ } else if (v.type === "del") {
+ this.tempDeleted?.delete(v.key);
+ this.tempStore!.del(v.key);
+ }
+ });
+ })
+ .catch((error) => {
+ if (this.opts.onError) {
+ this.opts.onError(error);
+ } else {
+ logger.error(error);
+ }
+ });
+ this.txPromise = (this.txPromise ? this.txPromise : Promise.resolve())
+ .then(next)
+ .catch(next);
+ }
+ }
+ }
+ async idle() {
+ if (
+ this.store.status !== "open" &&
+ this.store.status !== "opening" &&
+ this.queue &&
+ this.queue.length > 0
+ ) {
+ throw new Error("Store is closed, so cache will never finish idling");
+ }
+ await this.txPromise;
+ await waitFor(() => !this.queue || this.queue.length === 0, {
+ timeout: this.opts.interval * 2 + 1000, // TODO, do this better so tests don't fail in slow envs.
+ delayInterval: 100,
+ timeoutMessage: `Failed to wait for idling, got txQueue with ${this.queue
+ ?.length} elements. Store status: ${this.store
+ ?.status}, interval exist: ${!!this._interval}`
+ });
+ }
+ clear() {
+ this.queue = [];
+ this.tempStore.clear();
+ this.tempDeleted.clear();
+ }
+ async close() {
+ await this.idle();
+ clearInterval(this._interval);
+ this.clear();
+ this._interval = undefined;
+ }
+ }
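
TXQueue.idle resolves once the in-flight batch promise settles and the queue polls down to empty, bounded by interval * 2 + 1000 ms (the TODO above notes this budget can be too tight in slow environments). A hedged mirror of that contract using the same waitFor helper this file imports; awaitDrained is an illustrative name, not the package's API:

import { waitFor } from "@peerbit/time";

// Wait out the pending batch, then poll until the queue is empty
// or the timeout budget is spent.
async function awaitDrained(
	queue: unknown[],
	txPromise: Promise<void> | undefined,
	interval: number
): Promise<void> {
	await txPromise;
	await waitFor(() => queue.length === 0, {
		timeout: interval * 2 + 1000,
		delayInterval: 100,
		timeoutMessage: `Failed to idle: ${queue.length} queued elements remain`
	});
}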
+ export default class LazyLevel implements SimpleLevel {
+ private _store: AbstractLevel<any, any, any>;
private _opts?: LazyLevelOptions;
private _sublevels: LazyLevel[] = [];
+ txQueue?: TXQueue;
constructor(
store: AbstractLevel<any, any, any>,
- opts: LazyLevelOptions | { batch: boolean } = { batch: { interval: 300 } }
+ opts: LazyLevelOptions | { batch: boolean } = {
+ batch: DEFAULT_BATCH_OPTIONS
+ }
) {
this._store = store;
if (typeof opts.batch === "boolean") {
- if (opts.batch === true) this._opts = { batch: { interval: 300 } };
+ if (opts.batch === true) this._opts = { batch: DEFAULT_BATCH_OPTIONS };
} else if (opts) {
- this._opts = { batch: { interval: 300, ...opts.batch }, ...opts };
+ this._opts = {
+ batch: { ...DEFAULT_BATCH_OPTIONS, ...opts.batch },
+ ...opts
+ };
}

@@ -60,23 +203,5 @@ }

async idle() {
- if (this._opts?.batch && this._txQueue) {
- if (
- this._store.status !== "open" &&
- this._store.status !== "opening" &&
- this._txQueue &&
- this._txQueue.length > 0
- ) {
- throw new Error("Store is closed, so cache will never finish idling");
- }
- await this._txPromise;
- await waitFor(() => !this._txQueue || this._txQueue.length === 0, {
- timeout: this._opts.batch.interval * 2 + 1000, // TODO, do this better so tests don't fail in slow envs.
- delayInterval: 100,
- timeoutMessage: `Failed to wait for idling, got txQueue with ${
- this._txQueue?.length
- } elements. Store status: ${
- this._store?.status
- }, interval exist: ${!!this._interval}`,
- });
- }
+ await this.txQueue?.idle();
}
async close() {

@@ -87,8 +212,4 @@ if (!this._store) {

await this.idle(); // idle after clear interval (because else txQueue might be filled with new things that are never removed)
- if (this._opts?.batch) {
- clearInterval(this._interval);
- this._interval = undefined;
- this._tempStore?.clear();
- this._tempDeleted?.clear();
+ if (this.txQueue) {
+ await this.txQueue.close();
}

@@ -107,43 +228,7 @@ await Promise.all(this._sublevels.map((l) => l.close()));

- if (this._opts?.batch && !this._interval) {
- this._txQueue = [];
- this._tempStore = new Map();
- this._tempDeleted = new Set();
- this._interval = setInterval(() => {
- if (
- this._store.status === "open" &&
- this._txQueue &&
- this._txQueue.length > 0
- ) {
- const arr = this._txQueue.splice(0, this._txQueue.length);
- if (arr?.length > 0) {
- const next = () =>
- this._store
- .batch(arr, { valueEncoding: "view" })
- .then(() => {
- arr.forEach((v) => {
- if (v.type === "put") {
- this._tempDeleted?.delete(v.key);
- this._tempStore!.delete(v.key);
- } else if (v.type === "del") {
- this._tempDeleted?.delete(v.key);
- this._tempStore!.delete(v.key);
- }
- });
- })
- .catch((error) => {
- if (this._opts?.batch?.onError) {
- this._opts?.batch.onError(error);
- } else {
- logger.error(error);
- }
- });
- this._txPromise = (
- this._txPromise ? this._txPromise : Promise.resolve()
- )
- .then(next)
- .catch(next);
- }
- }
- }, this._opts.batch.interval);
+ if (this._opts?.batch) {
+ (
+ this.txQueue ||
+ (this.txQueue = new TXQueue(this._opts.batch, this._store))
+ ).open();
+ }

@@ -163,9 +248,9 @@

try {
- if (this._tempDeleted) {
+ if (this.txQueue) {
// batching is activated
- if (this._tempDeleted.has(key)) {
+ if (this.txQueue.tempDeleted.has(key)) {
return undefined;
}
data =
- (this._tempStore && this._tempStore.get(key)) ||
+ (this.txQueue.tempStore && this.txQueue.tempStore.get(key)) ||
(await this._store.get(key, { valueEncoding: "view" }));

@@ -193,3 +278,3 @@ } else {

lte: prefix + "\xFF",
valueEncoding: "view",
valueEncoding: "view"
});

@@ -205,6 +290,4 @@ const ret: Uint8Array[] = [];

async clear(clearStore = true): Promise<void> {
- this._txQueue = [];
+ this.txQueue?.clear();
await this.idle();
- this._tempStore?.clear();
- this._tempDeleted?.clear();
if (clearStore) {

@@ -220,3 +303,3 @@ await this._store.clear(); // will also clear sublevels

lte: prefix + "\xFF",
valueEncoding: "view",
valueEncoding: "view"
});

@@ -228,4 +311,4 @@ const keys: string[] = [];

- if (this._tempStore) {
- for (const key of this._tempStore.keys()) {
+ if (this.txQueue) {
+ for (const key of this.txQueue.tempStore.map.keys()) {
if (key.startsWith(prefix)) {

@@ -239,9 +322,7 @@ keys.push(key);

async put(key: string, value: Uint8Array) {
- if (this._opts?.batch) {
- this._tempDeleted!.delete(key);
- this._tempStore!.set(key, value);
- this._txQueue!.push({
+ if (this.txQueue) {
+ await this.txQueue.add({
type: "put",
key: key,
- value: value,
+ value: value
});

@@ -259,5 +340,4 @@ } else {

- if (this._opts?.batch) {
- this._tempDeleted!.add(key);
- this._txQueue!.push({ type: "del", key: key });
+ if (this.txQueue) {
+ this.txQueue.add({ type: "del", key: key });
} else {

@@ -264,0 +344,0 @@ return new Promise<void>((resolve, reject) => {

Sorry, the diff of this file is not supported yet
