@naturalcycles/db-lib
Advanced tools
Comparing version 2.1.0 to 2.2.0
@@ -0,1 +1,8 @@ | ||
# [2.2.0](https://github.com/NaturalCycles/db-lib/compare/v2.1.0...v2.2.0) (2019-10-31) | ||
### Features | ||
* CommonDao.streamQueryAsReadable ([dd961d0](https://github.com/NaturalCycles/db-lib/commit/dd961d038b329818ebd00eb450bb430b7b0a112f)) | ||
# [2.1.0](https://github.com/NaturalCycles/db-lib/compare/v2.0.2...v2.1.0) (2019-10-20) | ||
@@ -2,0 +9,0 @@ |
@@ -1,2 +0,2 @@ | ||
import { JoiValidationError, ObjectSchemaTyped } from '@naturalcycles/nodejs-lib'; | ||
import { JoiValidationError, ObjectSchemaTyped, ReadableTyped } from '@naturalcycles/nodejs-lib'; | ||
import { Observable } from 'rxjs'; | ||
@@ -105,2 +105,6 @@ import { CommonDB } from './common.db'; | ||
streamQuery<IN = Saved<BM>, OUT = IN>(q: DBQuery<BM, DBM, TM>, opt?: CommonDaoStreamOptions<IN, OUT>): Observable<OUT>; | ||
/** | ||
* Stream as Readable, to be able to .pipe() it further with support of backpressure. | ||
*/ | ||
streamQueryAsReadable<IN = Saved<BM>, OUT = IN>(q: DBQuery<BM, DBM, TM>, opt?: CommonDaoStreamOptions<IN, OUT>): ReadableTyped<OUT>; | ||
streamQueryAsDBM<IN = DBM, OUT = IN>(q: DBQuery<BM, DBM, TM>, opt?: CommonDaoStreamOptions<IN, OUT>): Observable<OUT>; | ||
@@ -107,0 +111,0 @@ queryIds(q: DBQuery<BM, DBM>, opt?: CommonDaoOptions): Promise<string[]>; |
@@ -216,18 +216,6 @@ "use strict"; | ||
streamQuery(q, opt) { | ||
const op = `streamQuery(${q.pretty()})`; | ||
const started = this.logStarted(op, true); | ||
const partialQuery = !!q._selectedFieldNames; | ||
let stream = this.cfg.db.streamQuery(q, opt); | ||
if (!partialQuery) { | ||
stream = stream.pipe(new stream_1.Transform({ | ||
objectMode: true, | ||
transform: async (dbm, _encoding, cb) => { | ||
const bm = await this.dbmToBM(dbm, opt); | ||
cb(null, bm); | ||
}, | ||
})); | ||
} | ||
const stream = this.streamQueryAsReadable(q, opt); | ||
const res = nodejs_lib_1.streamToObservable(stream, opt); | ||
if (this.cfg.logLevel >= CommonDaoLogLevel.OPERATIONS) { | ||
log(`<< ${this.cfg.table}.${op}: done in ${time_lib_1.since(started)}`); | ||
// log(`<< ${this.cfg.table}.${op}: done in ${since(started)}`) | ||
// todo: rethink if we need to count results here | ||
@@ -243,2 +231,21 @@ // void obs | ||
} | ||
/** | ||
* Stream as Readable, to be able to .pipe() it further with support of backpressure. | ||
*/ | ||
streamQueryAsReadable(q, opt) { | ||
const op = `streamQuery(${q.pretty()})`; | ||
const _started = this.logStarted(op, true); | ||
const partialQuery = !!q._selectedFieldNames; | ||
let stream = this.cfg.db.streamQuery(q, opt); | ||
if (!partialQuery) { | ||
stream = stream.pipe(new stream_1.Transform({ | ||
objectMode: true, | ||
transform: async (dbm, _encoding, cb) => { | ||
const bm = await this.dbmToBM(dbm, opt).catch(err => cb(err)); | ||
cb(null, bm); | ||
}, | ||
})); | ||
} | ||
return stream; | ||
} | ||
streamQueryAsDBM(q, opt) { | ||
@@ -245,0 +252,0 @@ const op = `streamQueryAsDBM(${q.pretty()})`; |
@@ -43,3 +43,3 @@ import { StreamToObservableOptions } from '@naturalcycles/nodejs-lib'; | ||
} | ||
export interface RunQueryResult<T> extends Record<string, any> { | ||
export interface RunQueryResult<T> { | ||
records: T[]; | ||
@@ -46,0 +46,0 @@ endCursor?: string; |
@@ -0,1 +1,2 @@ | ||
import { ReadableTyped } from '@naturalcycles/nodejs-lib'; | ||
import { Observable } from 'rxjs'; | ||
@@ -64,2 +65,3 @@ import { CommonDao } from './common.dao'; | ||
streamQueryAsDBM<IN = DBM, OUT = IN>(opt?: CommonDaoStreamOptions<IN, OUT>): Observable<OUT>; | ||
streamQueryAsReadable<IN = Saved<BM>, OUT = IN>(opt?: CommonDaoStreamOptions<IN, OUT>): ReadableTyped<OUT>; | ||
queryIds(opt?: CommonDaoOptions): Promise<string[]>; | ||
@@ -66,0 +68,0 @@ streamQueryIds<OUT = string>(opt?: CommonDaoStreamOptions<string, OUT>): Observable<OUT>; |
@@ -138,2 +138,5 @@ "use strict"; | ||
} | ||
streamQueryAsReadable(opt) { | ||
return this.dao.streamQueryAsReadable(this, opt); | ||
} | ||
async queryIds(opt) { | ||
@@ -140,0 +143,0 @@ return await this.dao.queryIds(this, opt); |
@@ -13,3 +13,3 @@ { | ||
"devDependencies": { | ||
"@naturalcycles/dev-lib": "^8.0.0", | ||
"@naturalcycles/dev-lib": "^9.0.1", | ||
"@types/node": "^12.0.4", | ||
@@ -41,3 +41,3 @@ "jest": "^24.8.0" | ||
}, | ||
"version": "2.1.0", | ||
"version": "2.2.0", | ||
"description": "Lowest Common Denominator API to supported Databases", | ||
@@ -44,0 +44,0 @@ "keywords": [ |
@@ -305,4 +305,28 @@ import { _truncate } from '@naturalcycles/js-lib' | ||
): Observable<OUT> { | ||
const stream = this.streamQueryAsReadable<IN, OUT>(q, opt) | ||
const res = streamToObservable<IN, OUT>(stream, opt) | ||
if (this.cfg.logLevel! >= CommonDaoLogLevel.OPERATIONS) { | ||
// log(`<< ${this.cfg.table}.${op}: done in ${since(started)}`) | ||
// todo: rethink if we need to count results here | ||
// void obs | ||
// .pipe(count()) | ||
// .toPromise() | ||
// .then(num => { | ||
// log(`<< ${this.cfg.table}.${op}: ${num} row(s) in ${since(started)}`) | ||
// }) | ||
} | ||
return res | ||
} | ||
/** | ||
* Stream as Readable, to be able to .pipe() it further with support of backpressure. | ||
*/ | ||
streamQueryAsReadable<IN = Saved<BM>, OUT = IN>( | ||
q: DBQuery<BM, DBM, TM>, | ||
opt?: CommonDaoStreamOptions<IN, OUT>, | ||
): ReadableTyped<OUT> { | ||
const op = `streamQuery(${q.pretty()})` | ||
const started = this.logStarted(op, true) | ||
const _started = this.logStarted(op, true) | ||
const partialQuery = !!q._selectedFieldNames | ||
@@ -316,3 +340,3 @@ | ||
transform: async (dbm, _encoding, cb) => { | ||
const bm = await this.dbmToBM(dbm, opt) | ||
const bm = await this.dbmToBM(dbm, opt).catch(err => cb(err)) | ||
cb(null, bm) | ||
@@ -324,15 +348,3 @@ }, | ||
const res = streamToObservable<IN, OUT>(stream, opt) | ||
if (this.cfg.logLevel! >= CommonDaoLogLevel.OPERATIONS) { | ||
log(`<< ${this.cfg.table}.${op}: done in ${since(started)}`) | ||
// todo: rethink if we need to count results here | ||
// void obs | ||
// .pipe(count()) | ||
// .toPromise() | ||
// .then(num => { | ||
// log(`<< ${this.cfg.table}.${op}: ${num} row(s) in ${since(started)}`) | ||
// }) | ||
} | ||
return res | ||
return stream | ||
} | ||
@@ -339,0 +351,0 @@ |
@@ -57,3 +57,3 @@ import { | ||
export interface RunQueryResult<T> extends Record<string, any> { | ||
export interface RunQueryResult<T> { | ||
records: T[] | ||
@@ -60,0 +60,0 @@ endCursor?: string |
import { _truncate } from '@naturalcycles/js-lib' | ||
import { ReadableTyped } from '@naturalcycles/nodejs-lib' | ||
import { Observable } from 'rxjs' | ||
@@ -203,2 +204,8 @@ import { CommonDao } from './common.dao' | ||
streamQueryAsReadable<IN = Saved<BM>, OUT = IN>( | ||
opt?: CommonDaoStreamOptions<IN, OUT>, | ||
): ReadableTyped<OUT> { | ||
return this.dao.streamQueryAsReadable(this, opt) | ||
} | ||
async queryIds(opt?: CommonDaoOptions): Promise<string[]> { | ||
@@ -205,0 +212,0 @@ return await this.dao.queryIds(this, opt) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
208425
3937