Huge News! Announcing our $40M Series B led by Abstract Ventures. Learn More
Socket
Sign in · Demo · Install
Socket

@naturalcycles/db-lib

Package Overview
Dependencies
Maintainers
3
Versions
302
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@naturalcycles/db-lib - npm Package Compare versions

Comparing version 8.41.1 to 8.42.0

4

dist/adapter/cachedb/cache.db.d.ts

@@ -6,4 +6,5 @@ /// <reference types="node" />

import { CommonDB } from '../../common.db';
import { RunQueryResult } from '../../db.model';
import { CommonDBOptions, RunQueryResult } from '../../db.model';
import { DBQuery } from '../../query/dbQuery';
import { DBTransaction } from '../../transaction/dbTransaction';
import { CacheDBCfg, CacheDBCreateOptions, CacheDBOptions, CacheDBSaveOptions, CacheDBStreamOptions } from './cache.db.model';

@@ -33,2 +34,3 @@ /**

deleteByQuery<ROW extends ObjectWithId>(q: DBQuery<ROW>, opt?: CacheDBOptions): Promise<number>;
commitTransaction(tx: DBTransaction, opt?: CommonDBOptions): Promise<void>;
}

@@ -198,3 +198,7 @@ "use strict";

}
async commitTransaction(tx, opt) {
await this.cfg.downstreamDB.commitTransaction(tx, opt);
await this.cfg.cacheDB.commitTransaction(tx, opt);
}
}
exports.CacheDB = CacheDB;

@@ -73,2 +73,3 @@ "use strict";

}, { concurrency: 16 });
const backup = (0, js_lib_1._deepCopy)(data);
// 2. Apply ops one by one (in order)

@@ -80,3 +81,8 @@ tx.ops.forEach(op => {

else if (op.type === 'saveBatch') {
op.rows.forEach(r => (data[op.table][r.id] = r));
op.rows.forEach(r => {
if (!r.id) {
throw new Error('FileDB: row has an empty id');
}
data[op.table][r.id] = r;
});
}

@@ -89,11 +95,25 @@ else {

// Not filtering empty arrays, cause it's already filtered in this.saveFiles()
const ops = Object.keys(data).map(table => {
const ops = (0, js_lib_1._stringMapEntries)(data).map(([table, map]) => {
return {
type: 'saveBatch',
table,
rows: this.sortRows(Object.values(data[table])),
rows: this.sortRows((0, js_lib_1._stringMapValues)(map)),
};
});
// 4. Save all files
await this.saveFiles(ops);
try {
await this.saveFiles(ops);
}
catch (err) {
const ops = (0, js_lib_1._stringMapEntries)(backup).map(([table, map]) => {
return {
type: 'saveBatch',
table,
rows: this.sortRows((0, js_lib_1._stringMapValues)(map)),
};
});
// Rollback, ignore rollback error (if any)
await this.saveFiles(ops).catch(_ => { });
throw err;
}
}

@@ -100,0 +120,0 @@ async runQuery(q, _opt) {

@@ -8,3 +8,3 @@ import { CommonLogger, ObjectWithId } from '@naturalcycles/js-lib';

loadFile<ROW extends ObjectWithId>(table: string): Promise<ROW[]>;
saveFiles(ops: DBSaveBatchOperation[]): Promise<void>;
saveFiles(ops: DBSaveBatchOperation<any>[]): Promise<void>;
}

@@ -11,0 +11,0 @@ export interface FileDBCfg {

@@ -39,3 +39,3 @@ "use strict";

const table = this.cfg.tablesPrefix + _table;
this.cfg.logger?.log(`reset ${table}`);
this.cfg.logger.log(`reset ${table}`);
this.data[table] = {};

@@ -48,3 +48,3 @@ }

});
this.cfg.logger?.log('reset');
this.cfg.logger.log('reset');
}

@@ -84,3 +84,3 @@ }

if (!r.id) {
this.cfg.logger?.warn({ rows });
this.cfg.logger.warn({ rows });
throw new Error(`InMemoryDB doesn't support id auto-generation in saveBatch, row without id was given`);

@@ -131,13 +131,22 @@ }

async commitTransaction(tx, opt) {
for await (const op of tx.ops) {
if (op.type === 'saveBatch') {
await this.saveBatch(op.table, op.rows, opt);
const backup = (0, js_lib_1._deepCopy)(this.data);
try {
for await (const op of tx.ops) {
if (op.type === 'saveBatch') {
await this.saveBatch(op.table, op.rows, { ...op.opt, ...opt });
}
else if (op.type === 'deleteByIds') {
await this.deleteByIds(op.table, op.ids, { ...op.opt, ...opt });
}
else {
throw new Error(`DBOperation not supported: ${op.type}`);
}
}
else if (op.type === 'deleteByIds') {
await this.deleteByIds(op.table, op.ids, opt);
}
else {
throw new Error(`DBOperation not supported: ${op.type}`);
}
}
catch (err) {
// rollback
this.data = backup;
this.cfg.logger.log('InMemoryDB transaction rolled back');
throw err;
}
}

@@ -170,3 +179,3 @@ /**

});
this.cfg.logger?.log(`flushToDisk took ${(0, colors_1.dimGrey)((0, js_lib_1._since)(started))} to save ${(0, colors_1.yellow)(tables)} tables`);
this.cfg.logger.log(`flushToDisk took ${(0, colors_1.dimGrey)((0, js_lib_1._since)(started))} to save ${(0, colors_1.yellow)(tables)} tables`);
}

@@ -200,5 +209,5 @@ /**

});
this.cfg.logger?.log(`restoreFromDisk took ${(0, colors_1.dimGrey)((0, js_lib_1._since)(started))} to read ${(0, colors_1.yellow)(files.length)} tables`);
this.cfg.logger.log(`restoreFromDisk took ${(0, colors_1.dimGrey)((0, js_lib_1._since)(started))} to read ${(0, colors_1.yellow)(files.length)} tables`);
}
}
exports.InMemoryDB = InMemoryDB;

@@ -25,2 +25,3 @@ import { JsonSchemaObject, JsonSchemaRootObject, ObjectWithId } from '@naturalcycles/js-lib';

* Naive implementation.
* Doesn't support rollback on error, hence doesn't pass dbTest.
* To be extended.

@@ -27,0 +28,0 @@ */

@@ -46,2 +46,3 @@ "use strict";

* Naive implementation.
* Doesn't support rollback on error, hence doesn't pass dbTest.
* To be extended.

@@ -48,0 +49,0 @@ */

import { AsyncMapper, JsonSchemaObject, JsonSchemaRootObject, ObjectWithId, Saved, Unsaved } from '@naturalcycles/js-lib';
import { AjvSchema, ObjectSchemaTyped, ReadableTyped } from '@naturalcycles/nodejs-lib';
import { DBModelType, RunQueryResult } from '../db.model';
import { DBDeleteByIdsOperation, DBModelType, DBOperation, DBSaveBatchOperation, RunQueryResult } from '../db.model';
import { DBQuery, RunnableDBQuery } from '../query/dbQuery';

@@ -86,2 +86,8 @@ import { CommonDaoCfg, CommonDaoCreateOptions, CommonDaoOptions, CommonDaoSaveOptions, CommonDaoStreamForEachOptions, CommonDaoStreamOptions } from './common.dao.model';

assignIdCreatedUpdated(obj: Unsaved<BM>, opt?: CommonDaoOptions): Saved<BM>;
tx: {
save: (bm: Unsaved<BM>, opt?: CommonDaoSaveOptions<DBM>) => Promise<DBSaveBatchOperation>;
saveBatch: (bms: Unsaved<BM>[], opt?: CommonDaoSaveOptions<DBM>) => Promise<DBSaveBatchOperation>;
deleteByIds: (ids: ID[], opt?: CommonDaoOptions) => Promise<DBDeleteByIdsOperation>;
deleteById: (id: ID, opt?: CommonDaoOptions) => Promise<DBDeleteByIdsOperation>;
};
/**

@@ -150,2 +156,3 @@ * Mutates with id, created, updated

ping(): Promise<void>;
runInTransaction(ops: Promise<DBOperation>[]): Promise<void>;
protected logResult(started: number, op: string, res: any, table: string): void;

@@ -152,0 +159,0 @@ protected logSaveResult(started: number, op: string, table: string): void;

@@ -9,2 +9,3 @@ "use strict";

const dbQuery_1 = require("../query/dbQuery");
const dbTransaction_1 = require("../transaction/dbTransaction");
const common_dao_model_1 = require("./common.dao.model");

@@ -25,2 +26,44 @@ /* eslint-disable no-dupe-class-members */

this.cfg = cfg;
this.tx = {
save: async (bm, opt = {}) => {
const row = (await this.save(bm, { ...opt, tx: true }));
return {
type: 'saveBatch',
table: this.cfg.table,
rows: [row],
opt: {
excludeFromIndexes: this.cfg.excludeFromIndexes,
...opt,
},
};
},
saveBatch: async (bms, opt = {}) => {
const rows = (await this.saveBatch(bms, { ...opt, tx: true }));
return {
type: 'saveBatch',
table: this.cfg.table,
rows,
opt: {
excludeFromIndexes: this.cfg.excludeFromIndexes,
...opt,
},
};
},
deleteByIds: async (ids, opt = {}) => {
return {
type: 'deleteByIds',
table: this.cfg.table,
ids: ids,
opt,
};
},
deleteById: async (id, opt = {}) => {
return {
type: 'deleteByIds',
table: this.cfg.table,
ids: [id],
opt,
};
},
};
this.cfg = {

@@ -464,2 +507,5 @@ // Default is to NOT log in AppEngine and in CI,

const dbm = await this.bmToDBM(bm, opt);
if (opt.tx) {
return dbm;
}
const table = opt.table || this.cfg.table;

@@ -544,2 +590,5 @@ if (opt.ensureUniqueId && idWasGenerated)

const dbms = await this.bmsToDBM(bms, opt);
if (opt.tx) {
return dbms;
}
if (opt.ensureUniqueId)

@@ -818,5 +867,8 @@ throw new js_lib_1.AppError('ensureUniqueId is not supported in saveBatch');

}
// transaction(): DBTransaction {
// return this.cfg.db.transaction()
// }
async runInTransaction(ops) {
if (!ops.length)
return;
const resolvedOps = await Promise.all(ops);
await this.cfg.db.commitTransaction(dbTransaction_1.DBTransaction.create(resolvedOps));
}
logResult(started, op, res, table) {

@@ -823,0 +875,0 @@ if (!this.cfg.logLevel)

@@ -170,2 +170,12 @@ import { CommonLogger, ErrorMode, ObjectWithId, Saved } from '@naturalcycles/js-lib';

timeout?: number;
/**
* If passed - operation will not be performed immediately, but instead "added" to the transaction.
* In the end - transaction needs to be committed (by calling `commit`).
* This API is inspired by Datastore API.
*
* Only applicable to save* and delete* operations
*
* @experimental
*/
tx?: boolean;
}

@@ -172,0 +182,0 @@ /**

@@ -48,2 +48,3 @@ import { AnyObjectWithId, ObjectWithId } from '@naturalcycles/js-lib';

rows: ROW[];
opt?: CommonDBSaveOptions<ROW>;
}

@@ -54,2 +55,3 @@ export interface DBDeleteByIdsOperation {

ids: string[];
opt?: CommonDBOptions;
}

@@ -56,0 +58,0 @@ export declare enum DBRelation {

@@ -18,5 +18,5 @@ import { InMemoryDB, InMemoryDBCfg } from './adapter/inmemory/inMemory.db';

import { DBTransaction, RunnableDBTransaction } from './transaction/dbTransaction';
import { commitDBTransactionSimple, mergeDBOperations } from './transaction/dbTransaction.util';
import { commitDBTransactionSimple } from './transaction/dbTransaction.util';
export * from './kv/commonKeyValueDaoMemoCache';
export type { DBQueryFilterOperator, DBQueryFilter, DBQueryOrder, CommonDaoCreateOptions, CommonDaoOptions, CommonDaoSaveOptions, CommonDaoStreamForEachOptions, CommonDaoStreamOptions, CommonDaoHooks, CommonDBOptions, CommonDBSaveOptions, CommonDBSaveMethod, CommonDBStreamOptions, CommonDBCreateOptions, CommonDB, RunQueryResult, CommonDaoCfg, InMemoryDBCfg, InMemoryKeyValueDBCfg, DBPipelineBackupOptions, DBPipelineRestoreOptions, DBPipelineCopyOptions, DBOperation, DBSaveBatchOperation, DBDeleteByIdsOperation, CommonKeyValueDB, CommonKeyValueDaoCfg, KeyValueDBTuple, };
export { DBQuery, dbQueryFilterOperatorValues, RunnableDBQuery, CommonDaoLogLevel, DBRelation, DBModelType, CommonDao, createdUpdatedFields, createdUpdatedIdFields, InMemoryDB, InMemoryKeyValueDB, queryInMemory, serializeJsonField, deserializeJsonField, dbPipelineBackup, dbPipelineRestore, dbPipelineCopy, DBLibError, BaseCommonDB, DBTransaction, RunnableDBTransaction, mergeDBOperations, commitDBTransactionSimple, CommonKeyValueDao, };
export { DBQuery, dbQueryFilterOperatorValues, RunnableDBQuery, CommonDaoLogLevel, DBRelation, DBModelType, CommonDao, createdUpdatedFields, createdUpdatedIdFields, InMemoryDB, InMemoryKeyValueDB, queryInMemory, serializeJsonField, deserializeJsonField, dbPipelineBackup, dbPipelineRestore, dbPipelineCopy, DBLibError, BaseCommonDB, DBTransaction, RunnableDBTransaction, commitDBTransactionSimple, CommonKeyValueDao, };
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.CommonKeyValueDao = exports.commitDBTransactionSimple = exports.mergeDBOperations = exports.RunnableDBTransaction = exports.DBTransaction = exports.BaseCommonDB = exports.DBLibError = exports.dbPipelineCopy = exports.dbPipelineRestore = exports.dbPipelineBackup = exports.deserializeJsonField = exports.serializeJsonField = exports.queryInMemory = exports.InMemoryKeyValueDB = exports.InMemoryDB = exports.createdUpdatedIdFields = exports.createdUpdatedFields = exports.CommonDao = exports.DBModelType = exports.DBRelation = exports.CommonDaoLogLevel = exports.RunnableDBQuery = exports.dbQueryFilterOperatorValues = exports.DBQuery = void 0;
exports.CommonKeyValueDao = exports.commitDBTransactionSimple = exports.RunnableDBTransaction = exports.DBTransaction = exports.BaseCommonDB = exports.DBLibError = exports.dbPipelineCopy = exports.dbPipelineRestore = exports.dbPipelineBackup = exports.deserializeJsonField = exports.serializeJsonField = exports.queryInMemory = exports.InMemoryKeyValueDB = exports.InMemoryDB = exports.createdUpdatedIdFields = exports.createdUpdatedFields = exports.CommonDao = exports.DBModelType = exports.DBRelation = exports.CommonDaoLogLevel = exports.RunnableDBQuery = exports.dbQueryFilterOperatorValues = exports.DBQuery = void 0;
const tslib_1 = require("tslib");

@@ -44,3 +44,2 @@ const inMemory_db_1 = require("./adapter/inmemory/inMemory.db");

Object.defineProperty(exports, "commitDBTransactionSimple", { enumerable: true, get: function () { return dbTransaction_util_1.commitDBTransactionSimple; } });
Object.defineProperty(exports, "mergeDBOperations", { enumerable: true, get: function () { return dbTransaction_util_1.mergeDBOperations; } });
tslib_1.__exportStar(require("./kv/commonKeyValueDaoMemoCache"), exports);

@@ -24,3 +24,3 @@ "use strict";

// dbQueryFilterIn = true,
dbQueryOrder = true, dbQuerySelectFields = true, streaming = true, strongConsistency = true, nullValues = true, } = features;
dbQueryOrder = true, dbQuerySelectFields = true, streaming = true, strongConsistency = true, nullValues = true, transactions = true, } = features;
// const {

@@ -195,3 +195,44 @@ // allowExtraPropertiesInResponse,

}
if (transactions) {
test('transaction happy path', async () => {
// cleanup
await dao.query().deleteByQuery();
// Test that id, created, updated are created
const now = (0, js_lib_1.localTime)().unix();
await dao.runInTransaction([dao.tx.save((0, js_lib_1._omit)(item1, ['id', 'created', 'updated']))]);
const loaded = await dao.query().runQuery();
expect(loaded.length).toBe(1);
expect(loaded[0].id).toBeDefined();
expect(loaded[0].created).toBeGreaterThanOrEqual(now);
expect(loaded[0].updated).toBe(loaded[0].created);
await dao.runInTransaction([dao.tx.deleteById(loaded[0].id)]);
// saveBatch [item1, 2, 3]
// save item3 with k1: k1_mod
// delete item2
// remaining: item1, item3_with_k1_mod
await dao.runInTransaction([
dao.tx.saveBatch(items),
dao.tx.save({ ...items[2], k1: 'k1_mod' }),
dao.tx.deleteById(items[1].id),
]);
const rows = await dao.query().runQuery();
const expected = [items[0], { ...items[2], k1: 'k1_mod' }];
(0, dbTest_1.expectMatch)(expected, rows, quirks);
});
test('transaction rollback', async () => {
await expect(dao.runInTransaction([
dao.tx.deleteById(items[2].id),
dao.tx.save({ ...items[0], k1: 5 }), // it should fail here
])).rejects.toThrow();
const rows = await dao.query().runQuery();
const expected = [items[0], { ...items[2], k1: 'k1_mod' }];
(0, dbTest_1.expectMatch)(expected, rows, quirks);
});
if (querying) {
test('transaction cleanup', async () => {
await dao.query().deleteByQuery();
});
}
}
}
exports.runCommonDaoTest = runCommonDaoTest;

@@ -28,2 +28,3 @@ import { CommonDB } from '../common.db';

documentDB?: boolean;
transactions?: boolean;
}

@@ -30,0 +31,0 @@ /**

@@ -7,2 +7,3 @@ "use strict";

const dbQuery_1 = require("../query/dbQuery");
const dbTransaction_1 = require("../transaction/dbTransaction");
const test_model_1 = require("./test.model");

@@ -16,3 +17,3 @@ const test_util_1 = require("./test.util");

// dbQueryFilterIn = true,
dbQueryOrder = true, dbQuerySelectFields = true, insert = true, update = true, streaming = true, strongConsistency = true, bufferSupport = true, nullValues = true, documentDB = true, } = features;
dbQueryOrder = true, dbQuerySelectFields = true, insert = true, update = true, streaming = true, strongConsistency = true, bufferSupport = true, nullValues = true, documentDB = true, transactions = true, } = features;
// const {

@@ -100,2 +101,5 @@ // allowExtraPropertiesInResponse,

});
test('saveBatch should throw on null id', async () => {
await expect(db.saveBatch(test_model_1.TEST_TABLE, [{ ...item1, id: null }])).rejects.toThrow();
});
if (insert) {

@@ -175,7 +179,7 @@ test('saveBatch INSERT method should throw', async () => {

const tables = await db.getTables();
console.log({ tables });
// console.log({ tables })
if (tableSchemas) {
await (0, js_lib_1.pMap)(tables, async (table) => {
const schema = await db.getTableSchema(table);
console.log(schema);
// console.log(schema)
expect(schema.$id).toBe(`${table}.schema.json`);

@@ -208,10 +212,10 @@ });

const b1Loaded = loaded.b1;
console.log({
b11: typeof b1,
b12: typeof b1Loaded,
l1: b1.length,
l2: b1Loaded.length,
b1,
b1Loaded,
});
// console.log({
// b11: typeof b1,
// b12: typeof b1Loaded,
// l1: b1.length,
// l2: b1Loaded.length,
// b1,
// b1Loaded,
// })
expect(b1Loaded).toEqual(b1);

@@ -221,2 +225,30 @@ expect(b1Loaded.toString()).toBe(s);

}
if (transactions) {
test('transaction happy path', async () => {
// cleanup
await db.deleteByQuery(queryAll());
// saveBatch [item1, 2, 3]
// save item3 with k1: k1_mod
// delete item2
// remaining: item1, item3_with_k1_mod
const tx = dbTransaction_1.DBTransaction.create()
.saveBatch(test_model_1.TEST_TABLE, items)
.save(test_model_1.TEST_TABLE, { ...items[2], k1: 'k1_mod' })
.deleteById(test_model_1.TEST_TABLE, items[1].id);
await db.commitTransaction(tx);
const { rows } = await db.runQuery(queryAll());
const expected = [items[0], { ...items[2], k1: 'k1_mod' }];
expectMatch(expected, rows, quirks);
});
test('transaction rollback', async () => {
// It should fail on id == null
const tx = dbTransaction_1.DBTransaction.create()
.deleteById(test_model_1.TEST_TABLE, items[2].id)
.save(test_model_1.TEST_TABLE, { ...items[0], k1: 5, id: null });
await expect(db.commitTransaction(tx)).rejects.toThrow();
const { rows } = await db.runQuery(queryAll());
const expected = [items[0], { ...items[2], k1: 'k1_mod' }];
expectMatch(expected, rows, quirks);
});
}
if (querying) {

@@ -223,0 +255,0 @@ test('cleanup', async () => {

@@ -45,3 +45,3 @@ "use strict";

return;
const tx = new __1.DBTransaction();
const tx = __1.DBTransaction.create();
ops.forEach(op => {

@@ -48,0 +48,0 @@ const rows = op.dataPoints.map(([ts, v]) => ({

@@ -9,3 +9,10 @@ import { AnyObjectWithId, ObjectWithId } from '@naturalcycles/js-lib';

ops: DBOperation[];
protected constructor(ops?: DBOperation[]);
/**
* Convenience method.
*/
static create(ops?: DBOperation[]): DBTransaction;
save<ROW extends ObjectWithId = AnyObjectWithId>(table: string, row: ROW): this;
saveBatch<ROW extends ObjectWithId = AnyObjectWithId>(table: string, rows: ROW[]): this;
deleteById(table: string, id: string): this;
deleteByIds(table: string, ids: string[]): this;

@@ -12,0 +19,0 @@ }

@@ -8,5 +8,19 @@ "use strict";

class DBTransaction {
constructor() {
this.ops = [];
constructor(ops = []) {
this.ops = ops;
}
/**
* Convenience method.
*/
static create(ops = []) {
return new DBTransaction(ops);
}
save(table, row) {
this.ops.push({
type: 'saveBatch',
table,
rows: [row],
});
return this;
}
saveBatch(table, rows) {

@@ -20,2 +34,10 @@ this.ops.push({

}
deleteById(table, id) {
this.ops.push({
type: 'deleteByIds',
table,
ids: [id],
});
return this;
}
deleteByIds(table, ids) {

@@ -22,0 +44,0 @@ this.ops.push({

import type { CommonDB } from '../common.db';
import { CommonDBSaveOptions, DBOperation } from '../db.model';
import { CommonDBSaveOptions } from '../db.model';
import { DBTransaction } from './dbTransaction';

@@ -7,7 +7,7 @@ /**

* E.g if you save id1 first and then delete it - this function will turn it into a no-op (self-eliminate).
* UPD: actually, it will only keep delete, but remove previous ops.
*
* Currently only takes into account SaveBatch and DeleteByIds ops.
* Output ops are maximum 2 (per table) - save and delete (where order actually doesn't matter, cause ids there will not overlap).
* Output ops are maximum 1 per entity - save or delete.
*/
export declare function mergeDBOperations(ops: DBOperation[]): DBOperation[];
/**

@@ -14,0 +14,0 @@ * Naive implementation of "Transaction" which just executes all operations one-by-one.

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.commitDBTransactionSimple = exports.mergeDBOperations = void 0;
const js_lib_1 = require("@naturalcycles/js-lib");
exports.commitDBTransactionSimple = void 0;
/**
* Optimizes the Transaction (list of DBOperations) to do less operations.
* E.g if you save id1 first and then delete it - this function will turn it into a no-op (self-eliminate).
* UPD: actually, it will only keep delete, but remove previous ops.
*
* Currently only takes into account SaveBatch and DeleteByIds ops.
* Output ops are maximum 2 (per table) - save and delete (where order actually doesn't matter, cause ids there will not overlap).
* Output ops are maximum 1 per entity - save or delete.
*/
function mergeDBOperations(ops) {
if (ops.length <= 1)
return ops; // nothing to optimize there
// This map will be saved in the end. Null would mean "delete"
// saveMap[table][id] => row
const saveMapByTable = {};
// Merge ops using `saveMap`
ops.forEach(op => {
saveMapByTable[op.table] = saveMapByTable[op.table] || {};
if (op.type === 'saveBatch') {
op.rows.forEach(r => (saveMapByTable[op.table][r.id] = r));
}
else if (op.type === 'deleteByIds') {
op.ids.forEach(id => (saveMapByTable[op.table][id] = null));
}
else {
throw new Error(`DBOperation not supported: ${op.type}`);
}
});
const resultOps = [];
(0, js_lib_1._stringMapEntries)(saveMapByTable).forEach(([table, saveMap]) => {
const rowsToSave = [];
const idsToDelete = [];
(0, js_lib_1._stringMapEntries)(saveMap).forEach(([id, r]) => {
if (r === null) {
idsToDelete.push(id);
}
else {
rowsToSave.push(r);
}
});
if (rowsToSave.length) {
resultOps.push({
type: 'saveBatch',
table,
rows: rowsToSave,
});
}
if (idsToDelete.length) {
resultOps.push({
type: 'deleteByIds',
table,
ids: idsToDelete,
});
}
});
return resultOps;
// Commented out as "overly complicated"
/*
export function mergeDBOperations(ops: DBOperation[]): DBOperation[] {
if (ops.length <= 1) return ops // nothing to optimize there
// This map will be saved in the end. Null would mean "delete"
// saveMap[table][id] => row
const data: StringMap<StringMap<ObjectWithId | null>> = {}
// Merge ops using `saveMap`
ops.forEach(op => {
data[op.table] ||= {}
if (op.type === 'saveBatch') {
op.rows.forEach(r => (data[op.table]![r.id] = r))
} else if (op.type === 'deleteByIds') {
op.ids.forEach(id => (data[op.table]![id] = null))
} else {
throw new Error(`DBOperation not supported: ${(op as any).type}`)
}
})
const resultOps: DBOperation[] = []
_stringMapEntries(data).forEach(([table, map]) => {
const saveOp: DBSaveBatchOperation = {
type: 'saveBatch',
table,
rows: _stringMapValues(map).filter(_isTruthy),
}
if (saveOp.rows.length) {
resultOps.push(saveOp)
}
const deleteOp: DBDeleteByIdsOperation = {
type: 'deleteByIds',
table,
ids: _stringMapEntries(map).filter(([id, row]) => row === null).map(([id]) => id),
}
if (deleteOp.ids.length) {
resultOps.push(deleteOp)
}
})
return resultOps
}
exports.mergeDBOperations = mergeDBOperations;
*/
/**

@@ -67,9 +67,9 @@ * Naive implementation of "Transaction" which just executes all operations one-by-one.

async function commitDBTransactionSimple(db, tx, opt) {
const ops = mergeDBOperations(tx.ops);
for await (const op of ops) {
// const ops = mergeDBOperations(tx.ops)
for await (const op of tx.ops) {
if (op.type === 'saveBatch') {
await db.saveBatch(op.table, op.rows, opt);
await db.saveBatch(op.table, op.rows, { ...op.opt, ...opt });
}
else if (op.type === 'deleteByIds') {
await db.deleteByIds(op.table, op.ids, opt);
await db.deleteByIds(op.table, op.ids, { ...op.opt, ...opt });
}

@@ -76,0 +76,0 @@ else {

@@ -44,3 +44,3 @@ {

},
"version": "8.41.1",
"version": "8.42.0",
"description": "Lowest Common Denominator API to supported Databases",

@@ -47,0 +47,0 @@ "keywords": [

@@ -10,4 +10,5 @@ import { Readable } from 'stream'

import { CommonDB } from '../../common.db'
import { RunQueryResult } from '../../db.model'
import { CommonDBOptions, RunQueryResult } from '../../db.model'
import { DBQuery } from '../../query/dbQuery'
import { DBTransaction } from '../../transaction/dbTransaction'
import {

@@ -296,2 +297,7 @@ CacheDBCfg,

}
override async commitTransaction(tx: DBTransaction, opt?: CommonDBOptions): Promise<void> {
await this.cfg.downstreamDB.commitTransaction(tx, opt)
await this.cfg.cacheDB.commitTransaction(tx, opt)
}
}

@@ -9,3 +9,3 @@ import { CommonLogger, ObjectWithId } from '@naturalcycles/js-lib'

loadFile<ROW extends ObjectWithId>(table: string): Promise<ROW[]>
saveFiles(ops: DBSaveBatchOperation[]): Promise<void>
saveFiles(ops: DBSaveBatchOperation<any>[]): Promise<void>
}

@@ -12,0 +12,0 @@

@@ -15,4 +15,5 @@ import {

ObjectWithId,
AnyObjectWithId,
_assert,
_deepCopy,
_stringMapEntries,
} from '@naturalcycles/js-lib'

@@ -123,2 +124,4 @@ import { readableCreate, ReadableTyped } from '@naturalcycles/nodejs-lib'

const backup = _deepCopy(data)
// 2. Apply ops one by one (in order)

@@ -129,3 +132,8 @@ tx.ops.forEach(op => {

} else if (op.type === 'saveBatch') {
op.rows.forEach(r => (data[op.table]![r.id] = r))
op.rows.forEach(r => {
if (!r.id) {
throw new Error('FileDB: row has an empty id')
}
data[op.table]![r.id] = r
})
} else {

@@ -138,7 +146,7 @@ throw new Error(`DBOperation not supported: ${(op as any).type}`)

// Not filtering empty arrays, cause it's already filtered in this.saveFiles()
const ops: DBSaveBatchOperation[] = Object.keys(data).map(table => {
const ops: DBSaveBatchOperation[] = _stringMapEntries(data).map(([table, map]) => {
return {
type: 'saveBatch',
table,
rows: this.sortRows(Object.values(data[table]!) as AnyObjectWithId[]),
rows: this.sortRows(_stringMapValues(map)),
}

@@ -148,3 +156,18 @@ })

// 4. Save all files
await this.saveFiles(ops)
try {
await this.saveFiles(ops)
} catch (err) {
const ops: DBSaveBatchOperation[] = _stringMapEntries(backup).map(([table, map]) => {
return {
type: 'saveBatch',
table,
rows: this.sortRows(_stringMapValues(map)),
}
})
// Rollback, ignore rollback error (if any)
await this.saveFiles(ops).catch(_ => {})
throw err
}
}

@@ -151,0 +174,0 @@

@@ -15,2 +15,3 @@ import { Readable } from 'stream'

CommonLogger,
_deepCopy,
} from '@naturalcycles/js-lib'

@@ -108,3 +109,3 @@ import {

const table = this.cfg.tablesPrefix + _table
this.cfg.logger?.log(`reset ${table}`)
this.cfg.logger!.log(`reset ${table}`)
this.data[table] = {}

@@ -115,3 +116,3 @@ } else {

})
this.cfg.logger?.log('reset')
this.cfg.logger!.log('reset')
}

@@ -167,3 +168,3 @@ }

if (!r.id) {
this.cfg.logger?.warn({ rows })
this.cfg.logger!.warn({ rows })
throw new Error(

@@ -241,10 +242,20 @@ `InMemoryDB doesn't support id auto-generation in saveBatch, row without id was given`,

async commitTransaction(tx: DBTransaction, opt?: CommonDBOptions): Promise<void> {
for await (const op of tx.ops) {
if (op.type === 'saveBatch') {
await this.saveBatch(op.table, op.rows, opt)
} else if (op.type === 'deleteByIds') {
await this.deleteByIds(op.table, op.ids, opt)
} else {
throw new Error(`DBOperation not supported: ${(op as any).type}`)
const backup = _deepCopy(this.data)
try {
for await (const op of tx.ops) {
if (op.type === 'saveBatch') {
await this.saveBatch(op.table, op.rows, { ...op.opt, ...opt })
} else if (op.type === 'deleteByIds') {
await this.deleteByIds(op.table, op.ids, { ...op.opt, ...opt })
} else {
throw new Error(`DBOperation not supported: ${(op as any).type}`)
}
}
} catch (err) {
// rollback
this.data = backup
this.cfg.logger!.log('InMemoryDB transaction rolled back')
throw err
}

@@ -285,3 +296,3 @@ }

this.cfg.logger?.log(
this.cfg.logger!.log(
`flushToDisk took ${dimGrey(_since(started))} to save ${yellow(tables)} tables`,

@@ -328,3 +339,3 @@ )

this.cfg.logger?.log(
this.cfg.logger!.log(
`restoreFromDisk took ${dimGrey(_since(started))} to read ${yellow(files.length)} tables`,

@@ -331,0 +342,0 @@ )

@@ -70,2 +70,3 @@ import { Readable } from 'stream'

* Naive implementation.
* Doesn't support rollback on error, hence doesn't pass dbTest.
* To be extended.

@@ -72,0 +73,0 @@ */

@@ -214,2 +214,13 @@ import { CommonLogger, ErrorMode, ObjectWithId, Saved } from '@naturalcycles/js-lib'

timeout?: number
/**
* If passed - operation will not be performed immediately, but instead "added" to the transaction.
* In the end - transaction needs to be committed (by calling `commit`).
* This API is inspired by Datastore API.
*
* Only applicable to save* and delete* operations
*
* @experimental
*/
tx?: boolean
}

@@ -216,0 +227,0 @@

@@ -38,4 +38,11 @@ import {

import { DBLibError } from '../cnst'
import { DBModelType, RunQueryResult } from '../db.model'
import {
DBDeleteByIdsOperation,
DBModelType,
DBOperation,
DBSaveBatchOperation,
RunQueryResult,
} from '../db.model'
import { DBQuery, RunnableDBQuery } from '../query/dbQuery'
import { DBTransaction } from '../transaction/dbTransaction'
import {

@@ -610,2 +617,53 @@ CommonDaoCfg,

tx = {
save: async (
bm: Unsaved<BM>,
opt: CommonDaoSaveOptions<DBM> = {},
): Promise<DBSaveBatchOperation> => {
const row: DBM = (await this.save(bm, { ...opt, tx: true })) as any
return {
type: 'saveBatch',
table: this.cfg.table,
rows: [row],
opt: {
excludeFromIndexes: this.cfg.excludeFromIndexes as any,
...opt,
},
}
},
saveBatch: async (
bms: Unsaved<BM>[],
opt: CommonDaoSaveOptions<DBM> = {},
): Promise<DBSaveBatchOperation> => {
const rows: DBM[] = (await this.saveBatch(bms, { ...opt, tx: true })) as any
return {
type: 'saveBatch',
table: this.cfg.table,
rows,
opt: {
excludeFromIndexes: this.cfg.excludeFromIndexes as any,
...opt,
},
}
},
deleteByIds: async (ids: ID[], opt: CommonDaoOptions = {}): Promise<DBDeleteByIdsOperation> => {
return {
type: 'deleteByIds',
table: this.cfg.table,
ids: ids as string[],
opt,
}
},
deleteById: async (id: ID, opt: CommonDaoOptions = {}): Promise<DBDeleteByIdsOperation> => {
return {
type: 'deleteByIds',
table: this.cfg.table,
ids: [id as string],
opt,
}
},
}
// SAVE

@@ -620,2 +678,7 @@ /**

const dbm = await this.bmToDBM(bm as BM, opt)
if (opt.tx) {
return dbm as any
}
const table = opt.table || this.cfg.table

@@ -715,2 +778,7 @@ if (opt.ensureUniqueId && idWasGenerated) await this.ensureUniqueId(table, dbm)

const dbms = await this.bmsToDBM(bms as BM[], opt)
if (opt.tx) {
return dbms as any
}
if (opt.ensureUniqueId) throw new AppError('ensureUniqueId is not supported in saveBatch')

@@ -731,3 +799,2 @@ if (this.cfg.immutable && !opt.allowMutability && !opt.saveMethod) {

const assignGeneratedIds = opt.assignGeneratedIds || this.cfg.assignGeneratedIds
await this.cfg.db.saveBatch(table, dbms, {

@@ -1074,6 +1141,10 @@ excludeFromIndexes,

// transaction(): DBTransaction {
// return this.cfg.db.transaction()
// }
async runInTransaction(ops: Promise<DBOperation>[]): Promise<void> {
if (!ops.length) return
const resolvedOps = await Promise.all(ops)
await this.cfg.db.commitTransaction(DBTransaction.create(resolvedOps))
}
protected logResult(started: number, op: string, res: any, table: string): void {

@@ -1080,0 +1151,0 @@ if (!this.cfg.logLevel) return

@@ -59,2 +59,3 @@ import { AnyObjectWithId, ObjectWithId } from '@naturalcycles/js-lib'

rows: ROW[]
opt?: CommonDBSaveOptions<ROW>
}

@@ -66,2 +67,3 @@

ids: string[]
opt?: CommonDBOptions
}

@@ -68,0 +70,0 @@

@@ -51,3 +51,3 @@ import { InMemoryDB, InMemoryDBCfg } from './adapter/inmemory/inMemory.db'

import { DBTransaction, RunnableDBTransaction } from './transaction/dbTransaction'
import { commitDBTransactionSimple, mergeDBOperations } from './transaction/dbTransaction.util'
import { commitDBTransactionSimple } from './transaction/dbTransaction.util'
export * from './kv/commonKeyValueDaoMemoCache'

@@ -108,5 +108,4 @@

RunnableDBTransaction,
mergeDBOperations,
commitDBTransactionSimple,
CommonKeyValueDao,
}

@@ -1,2 +0,2 @@

import { pDelay, _deepCopy, _pick, _sortBy } from '@naturalcycles/js-lib'
import { pDelay, _deepCopy, _pick, _sortBy, _omit, localTime } from '@naturalcycles/js-lib'
import { readableToArray, transformNoOp } from '@naturalcycles/nodejs-lib'

@@ -44,2 +44,3 @@ import { CommonDaoLogLevel } from '..'

nullValues = true,
transactions = true,
} = features

@@ -268,2 +269,54 @@

}
if (transactions) {
test('transaction happy path', async () => {
  // Start from an empty table
  await dao.query().deleteByQuery()

  // A row saved without id/created/updated must get them generated
  const startedAt = localTime().unix()
  await dao.runInTransaction([dao.tx.save(_omit(item1, ['id', 'created', 'updated']))])
  const [saved, ...rest] = await dao.query().runQuery()
  expect(rest.length).toBe(0)
  expect(saved!.id).toBeDefined()
  expect(saved!.created).toBeGreaterThanOrEqual(startedAt)
  expect(saved!.updated).toBe(saved!.created)
  await dao.runInTransaction([dao.tx.deleteById(saved!.id)])

  // One transaction: saveBatch all items, overwrite item3 with k1_mod, delete item2.
  // Expected survivors: item1 and the modified item3.
  const modifiedItem3 = { ...items[2]!, k1: 'k1_mod' }
  await dao.runInTransaction([
    dao.tx.saveBatch(items),
    dao.tx.save(modifiedItem3),
    dao.tx.deleteById(items[1]!.id),
  ])
  const actual = await dao.query().runQuery()
  expectMatch([items[0], modifiedItem3], actual, quirks)
})
test('transaction rollback', async () => {
await expect(
dao.runInTransaction([
dao.tx.deleteById(items[2]!.id),
dao.tx.save({ ...items[0]!, k1: 5 as any }), // it should fail here
]),
).rejects.toThrow()
const rows = await dao.query().runQuery()
const expected = [items[0], { ...items[2]!, k1: 'k1_mod' }]
expectMatch(expected, rows, quirks)
})
if (querying) {
// NOTE(review): leaves the table empty so whatever suite runs next starts clean
test('transaction cleanup', async () => {
await dao.query().deleteByQuery()
})
}
}
}

@@ -5,2 +5,3 @@ import { pDelay, pMap, _filterObject, _pick, _sortBy } from '@naturalcycles/js-lib'

import { DBQuery } from '../query/dbQuery'
import { DBTransaction } from '../transaction/dbTransaction'
import {

@@ -47,2 +48,4 @@ createTestItemDBM,

documentDB?: boolean
transactions?: boolean
}

@@ -94,2 +97,3 @@

documentDB = true,
transactions = true,
} = features

@@ -196,2 +200,6 @@

// Rows with a null id must be rejected, not silently saved
test('saveBatch should throw on null id', async () => {
  const rowWithNullId = { ...item1, id: null as any }
  await expect(db.saveBatch(TEST_TABLE, [rowWithNullId])).rejects.toThrow()
})
if (insert) {

@@ -297,3 +305,3 @@ test('saveBatch INSERT method should throw', async () => {

const tables = await db.getTables()
console.log({ tables })
// console.log({ tables })

@@ -303,3 +311,3 @@ if (tableSchemas) {

const schema = await db.getTableSchema(table)
console.log(schema)
// console.log(schema)
expect(schema.$id).toBe(`${table}.schema.json`)

@@ -336,10 +344,10 @@ })

const b1Loaded = loaded!.b1!
console.log({
b11: typeof b1,
b12: typeof b1Loaded,
l1: b1.length,
l2: b1Loaded.length,
b1,
b1Loaded,
})
// console.log({
// b11: typeof b1,
// b12: typeof b1Loaded,
// l1: b1.length,
// l2: b1Loaded.length,
// b1,
// b1Loaded,
// })
expect(b1Loaded).toEqual(b1)

@@ -350,2 +358,37 @@ expect(b1Loaded.toString()).toBe(s)

if (transactions) {
test('transaction happy path', async () => {
  // Start from an empty table
  await db.deleteByQuery(queryAll())

  // Single transaction: save all items, overwrite item3 with k1_mod, delete item2.
  // Expected survivors: item1 and the modified item3.
  const modifiedItem3 = { ...items[2]!, k1: 'k1_mod' }
  const tx = DBTransaction.create()
    .saveBatch(TEST_TABLE, items)
    .save(TEST_TABLE, modifiedItem3)
    .deleteById(TEST_TABLE, items[1]!.id)
  await db.commitTransaction(tx)

  const { rows } = await db.runQuery(queryAll())
  expectMatch([items[0], modifiedItem3], rows, quirks)
})
test('transaction rollback', async () => {
// It should fail on id == null
const tx = DBTransaction.create()
.deleteById(TEST_TABLE, items[2]!.id)
.save(TEST_TABLE, { ...items[0]!, k1: 5, id: null as any })
await expect(db.commitTransaction(tx)).rejects.toThrow()
const { rows } = await db.runQuery(queryAll())
const expected = [items[0], { ...items[2]!, k1: 'k1_mod' }]
expectMatch(expected, rows, quirks)
})
}
if (querying) {

@@ -352,0 +395,0 @@ test('cleanup', async () => {

@@ -56,3 +56,3 @@ import { ObjectWithId } from '@naturalcycles/js-lib'

const tx = new DBTransaction()
const tx = DBTransaction.create()

@@ -59,0 +59,0 @@ ops.forEach(op => {

@@ -9,4 +9,20 @@ import { AnyObjectWithId, ObjectWithId } from '@naturalcycles/js-lib'

export class DBTransaction {
public ops: DBOperation[] = []
protected constructor(public ops: DBOperation[] = []) {}
/**
 * Convenience factory — the constructor is protected, so this is the
 * public way to obtain a DBTransaction (optionally pre-seeded with ops).
 */
static create(ops: DBOperation[] = []): DBTransaction {
return new DBTransaction(ops)
}
/**
 * Queues a single-row save, recorded as a `saveBatch` op with one row.
 * Returns `this` for chaining.
 */
save<ROW extends ObjectWithId = AnyObjectWithId>(table: string, row: ROW): this {
  this.ops.push({ type: 'saveBatch', table, rows: [row] })
  return this
}
saveBatch<ROW extends ObjectWithId = AnyObjectWithId>(table: string, rows: ROW[]): this {

@@ -21,2 +37,11 @@ this.ops.push({

/**
 * Queues deletion of a single id, recorded as a `deleteByIds` op with one id.
 * Returns `this` for chaining.
 */
deleteById(table: string, id: string): this {
  this.ops.push({ type: 'deleteByIds', table, ids: [id] })
  return this
}
deleteByIds(table: string, ids: string[]): this {

@@ -23,0 +48,0 @@ this.ops.push({

@@ -1,4 +0,3 @@

import { StringMap, _stringMapEntries, ObjectWithId } from '@naturalcycles/js-lib'
import type { CommonDB } from '../common.db'
import { CommonDBSaveOptions, DBOperation } from '../db.model'
import { CommonDBSaveOptions } from '../db.model'
import { DBTransaction } from './dbTransaction'

@@ -9,6 +8,9 @@

* E.g if you save id1 first and then delete it - this function will turn it into a no-op (self-eliminate).
* UPD: actually, it will only keep delete, but remove previous ops.
*
* Currently only takes into account SaveBatch and DeleteByIds ops.
 * Output ops are maximum 1 per entity - save or delete.
*/
// Commented out as "overly complicated"
/*
export function mergeDBOperations(ops: DBOperation[]): DBOperation[] {

@@ -19,12 +21,12 @@ if (ops.length <= 1) return ops // nothing to optimize there

// saveMap[table][id] => row
const saveMapByTable: StringMap<StringMap<ObjectWithId | null>> = {}
const data: StringMap<StringMap<ObjectWithId | null>> = {}
// Merge ops using `saveMap`
ops.forEach(op => {
saveMapByTable[op.table] = saveMapByTable[op.table] || {}
data[op.table] ||= {}
if (op.type === 'saveBatch') {
op.rows.forEach(r => (saveMapByTable[op.table]![r.id] = r))
op.rows.forEach(r => (data[op.table]![r.id] = r))
} else if (op.type === 'deleteByIds') {
op.ids.forEach(id => (saveMapByTable[op.table]![id] = null))
op.ids.forEach(id => (data[op.table]![id] = null))
} else {

@@ -37,28 +39,21 @@ throw new Error(`DBOperation not supported: ${(op as any).type}`)

_stringMapEntries(saveMapByTable).forEach(([table, saveMap]) => {
const rowsToSave: ObjectWithId[] = []
const idsToDelete: string[] = []
_stringMapEntries(data).forEach(([table, map]) => {
const saveOp: DBSaveBatchOperation = {
type: 'saveBatch',
table,
rows: _stringMapValues(map).filter(_isTruthy),
}
_stringMapEntries(saveMap).forEach(([id, r]) => {
if (r === null) {
idsToDelete.push(id)
} else {
rowsToSave.push(r)
}
})
if (saveOp.rows.length) {
resultOps.push(saveOp)
}
if (rowsToSave.length) {
resultOps.push({
type: 'saveBatch',
table,
rows: rowsToSave,
})
const deleteOp: DBDeleteByIdsOperation = {
type: 'deleteByIds',
table,
ids: _stringMapEntries(map).filter(([id, row]) => row === null).map(([id]) => id),
}
if (idsToDelete.length) {
resultOps.push({
type: 'deleteByIds',
table,
ids: idsToDelete,
})
if (deleteOp.ids.length) {
resultOps.push(deleteOp)
}

@@ -69,2 +64,3 @@ })

}
*/

@@ -81,9 +77,9 @@ /**

): Promise<void> {
const ops = mergeDBOperations(tx.ops)
// const ops = mergeDBOperations(tx.ops)
for await (const op of ops) {
for await (const op of tx.ops) {
if (op.type === 'saveBatch') {
await db.saveBatch(op.table, op.rows, opt)
await db.saveBatch(op.table, op.rows, { ...op.opt, ...opt })
} else if (op.type === 'deleteByIds') {
await db.deleteByIds(op.table, op.ids, opt)
await db.deleteByIds(op.table, op.ids, { ...op.opt, ...opt })
} else {

@@ -90,0 +86,0 @@ throw new Error(`DBOperation not supported: ${(op as any).type}`)

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc