@naturalcycles/db-lib
Advanced tools
Comparing version 8.22.0 to 8.23.0
@@ -56,3 +56,3 @@ "use strict"; | ||
return { | ||
...(0, js_lib_1.generateJsonSchemaFromData)(Object.values(this.data[table] || {})), | ||
...(0, js_lib_1.generateJsonSchemaFromData)((0, js_lib_1._stringMapValues)(this.data[table] || {})), | ||
$id: `${table}.schema.json`, | ||
@@ -59,0 +59,0 @@ }; |
@@ -107,3 +107,10 @@ import { AsyncMapper, JsonSchemaObject, JsonSchemaRootObject, Saved, ObjectWithId } from '@naturalcycles/js-lib'; | ||
deleteByIds(ids: string[], opt?: CommonDaoOptions): Promise<number>; | ||
deleteByQuery(q: DBQuery<DBM>, opt?: CommonDaoOptions): Promise<number>; | ||
/** | ||
* Pass `stream: true` option to use Streaming: it will Stream the query, batch by 500, and execute | ||
* `deleteByIds` for each batch concurrently (infinite concurrency). | ||
* This is expected to be more memory-efficient way of deleting big numbers of rows. | ||
*/ | ||
deleteByQuery(q: DBQuery<DBM>, opt?: CommonDaoStreamForEachOptions<DBM> & { | ||
stream?: boolean; | ||
}): Promise<number>; | ||
dbmToBM(_dbm: undefined, opt?: CommonDaoOptions): Promise<undefined>; | ||
@@ -110,0 +117,0 @@ dbmToBM(_dbm?: DBM, opt?: CommonDaoOptions): Promise<Saved<BM>>; |
@@ -539,2 +539,7 @@ "use strict"; | ||
} | ||
/** | ||
* Pass `stream: true` option to use Streaming: it will Stream the query, batch by 500, and execute | ||
* `deleteByIds` for each batch concurrently (infinite concurrency). | ||
* This is expected to be more memory-efficient way of deleting big numbers of rows. | ||
*/ | ||
async deleteByQuery(q, opt = {}) { | ||
@@ -545,5 +550,31 @@ this.requireWriteAccess(); | ||
const started = this.logStarted(op, q.table); | ||
const ids = await this.cfg.db.deleteByQuery(q, opt); | ||
let deleted = 0; | ||
if (opt.stream) { | ||
const batchSize = 500; | ||
await (0, nodejs_lib_1._pipeline)([ | ||
this.cfg.db.streamQuery(q.select(['id']), opt), | ||
(0, nodejs_lib_1.transformMapSimple)(objectWithId => objectWithId.id, { | ||
errorMode: js_lib_1.ErrorMode.SUPPRESS, | ||
}), | ||
(0, nodejs_lib_1.transformBuffer)({ batchSize }), | ||
(0, nodejs_lib_1.transformMap)(async (ids) => { | ||
deleted += await this.cfg.db.deleteByIds(q.table, ids, opt); | ||
}, { | ||
predicate: js_lib_1._passthroughPredicate, | ||
}), | ||
// LogProgress should be AFTER the mapper, to be able to report correct stats | ||
(0, nodejs_lib_1.transformLogProgress)({ | ||
metric: q.table, | ||
logEvery: 2, | ||
batchSize, | ||
...opt, | ||
}), | ||
(0, nodejs_lib_1.writableVoid)(), | ||
]); | ||
} | ||
else { | ||
deleted = await this.cfg.db.deleteByQuery(q, opt); | ||
} | ||
this.logSaveResult(started, op, q.table); | ||
return ids; | ||
return deleted; | ||
} | ||
@@ -550,0 +581,0 @@ async dbmToBM(_dbm, opt = {}) { |
@@ -102,3 +102,5 @@ import { AnyObjectWithId, ObjectWithId, AsyncMapper, Saved } from '@naturalcycles/js-lib'; | ||
streamQueryIdsForEach(mapper: AsyncMapper<string, void>, opt?: CommonDaoStreamForEachOptions<string>): Promise<void>; | ||
deleteByQuery(opt?: CommonDaoOptions): Promise<number>; | ||
deleteByQuery(opt?: CommonDaoStreamForEachOptions<DBM> & { | ||
stream?: boolean; | ||
}): Promise<number>; | ||
} |
@@ -43,5 +43,5 @@ { | ||
"engines": { | ||
"node": ">=12.13" | ||
"node": ">=14.15" | ||
}, | ||
"version": "8.22.0", | ||
"version": "8.23.0", | ||
"description": "Lowest Common Denominator API to supported Databases", | ||
@@ -48,0 +48,0 @@ "keywords": [ |
import { Readable } from 'stream' | ||
import { JsonSchemaObject, JsonSchemaRootObject, ObjectWithId } from '@naturalcycles/js-lib' | ||
import { | ||
JsonSchemaObject, | ||
JsonSchemaRootObject, | ||
ObjectWithId, | ||
StringMap, | ||
} from '@naturalcycles/js-lib' | ||
import { Debug, IDebugger } from '@naturalcycles/nodejs-lib' | ||
@@ -71,3 +76,3 @@ import { BaseCommonDB } from '../../base.common.db' | ||
): Promise<ROW[]> { | ||
const resultMap: Record<string, ROW> = {} | ||
const resultMap: StringMap<ROW> = {} | ||
const missingIds: string[] = [] | ||
@@ -74,0 +79,0 @@ |
@@ -13,2 +13,3 @@ import { Readable } from 'stream' | ||
ObjectWithId, | ||
_stringMapValues, | ||
} from '@naturalcycles/js-lib' | ||
@@ -122,3 +123,3 @@ import { | ||
return { | ||
...generateJsonSchemaFromData(Object.values(this.data[table] || {})), | ||
...generateJsonSchemaFromData(_stringMapValues(this.data[table] || {})), | ||
$id: `${table}.schema.json`, | ||
@@ -125,0 +126,0 @@ } |
@@ -33,2 +33,3 @@ import { | ||
_pipeline, | ||
transformBuffer, | ||
} from '@naturalcycles/nodejs-lib' | ||
@@ -723,3 +724,11 @@ import { DBLibError } from '../cnst' | ||
async deleteByQuery(q: DBQuery<DBM>, opt: CommonDaoOptions = {}): Promise<number> { | ||
/** | ||
* Pass `stream: true` option to use Streaming: it will Stream the query, batch by 500, and execute | ||
* `deleteByIds` for each batch concurrently (infinite concurrency). | ||
* This is expected to be more memory-efficient way of deleting big numbers of rows. | ||
*/ | ||
async deleteByQuery( | ||
q: DBQuery<DBM>, | ||
opt: CommonDaoStreamForEachOptions<DBM> & { stream?: boolean } = {}, | ||
): Promise<number> { | ||
this.requireWriteAccess() | ||
@@ -729,5 +738,36 @@ q.table = opt.table || q.table | ||
const started = this.logStarted(op, q.table) | ||
const ids = await this.cfg.db.deleteByQuery(q, opt) | ||
let deleted = 0 | ||
if (opt.stream) { | ||
const batchSize = 500 | ||
await _pipeline([ | ||
this.cfg.db.streamQuery<DBM>(q.select(['id']), opt), | ||
transformMapSimple<ObjectWithId, string>(objectWithId => objectWithId.id, { | ||
errorMode: ErrorMode.SUPPRESS, | ||
}), | ||
transformBuffer<string>({ batchSize }), | ||
transformMap<string[], void>( | ||
async ids => { | ||
deleted += await this.cfg.db.deleteByIds(q.table, ids, opt) | ||
}, | ||
{ | ||
predicate: _passthroughPredicate, | ||
}, | ||
), | ||
// LogProgress should be AFTER the mapper, to be able to report correct stats | ||
transformLogProgress({ | ||
metric: q.table, | ||
logEvery: 2, // 500 * 2 === 1000 | ||
batchSize, | ||
...opt, | ||
}), | ||
writableVoid(), | ||
]) | ||
} else { | ||
deleted = await this.cfg.db.deleteByQuery(q, opt) | ||
} | ||
this.logSaveResult(started, op, q.table) | ||
return ids | ||
return deleted | ||
} | ||
@@ -734,0 +774,0 @@ |
@@ -275,5 +275,7 @@ import { AnyObjectWithId, ObjectWithId, AsyncMapper, _truncate, Saved } from '@naturalcycles/js-lib' | ||
async deleteByQuery(opt?: CommonDaoOptions): Promise<number> { | ||
async deleteByQuery( | ||
opt?: CommonDaoStreamForEachOptions<DBM> & { stream?: boolean }, | ||
): Promise<number> { | ||
return await this.dao.deleteByQuery(this, opt) | ||
} | ||
} |
9187
360036
125