@contember/database
Advanced tools
Comparing version 1.2.0-alpha.3 to 1.2.0-alpha.4
@@ -13,3 +13,5 @@ import { DeleteBuilder, InsertBuilder, SelectBuilder, UpdateBuilder } from '../builders'; | ||
forSchema(schema: string): Client<ConnectionType>; | ||
scope<T>(callback: (wrapper: Client<Connection.ConnectionLike>) => Promise<T> | T): Promise<T>; | ||
transaction<T>(transactionScope: (wrapper: Client<Connection.TransactionLike>) => Promise<T> | T): Promise<T>; | ||
locked<T>(lock: number, callback: (wrapper: Client<Connection.ConnectionLike>) => Promise<T> | T): Promise<T>; | ||
selectBuilder<Result = SelectBuilder.Result>(): SelectBuilder<Result>; | ||
@@ -16,0 +18,0 @@ insertBuilder(): InsertBuilder<InsertBuilder.AffectedRows>; |
@@ -8,2 +8,3 @@ "use strict"; | ||
const queryable_2 = require("@contember/queryable"); | ||
const utils_1 = require("../utils"); | ||
class Client { | ||
@@ -20,5 +21,11 @@ constructor(connection, schema, queryMeta, eventManager = new EventManager_1.EventManager(connection.eventManager)) { | ||
} | ||
async scope(callback) { | ||
return await this.connection.scope(connection => callback(new Client(connection, this.schema, this.queryMeta, new EventManager_1.EventManager(connection.eventManager))), { eventManager: this.eventManager }); | ||
} | ||
async transaction(transactionScope) { | ||
return await this.connection.transaction(transaction => transactionScope(new Client(transaction, this.schema, this.queryMeta, new EventManager_1.EventManager(transaction.eventManager))), { eventManager: this.eventManager }); | ||
} | ||
async locked(lock, callback) { | ||
return await this.scope(client => (0, utils_1.withDatabaseAdvisoryLock)(client.connection, lock, () => callback(client))); | ||
} | ||
selectBuilder() { | ||
@@ -25,0 +32,0 @@ return builders_1.SelectBuilder.create(); |
@@ -13,9 +13,13 @@ import { EventManager } from './EventManager'; | ||
}, queryConfig?: Connection.QueryConfig): Connection; | ||
static createSingle(config: DatabaseConfig, queryConfig?: Connection.QueryConfig): Connection; | ||
createClient(schema: string, queryMeta: Record<string, any>): Client; | ||
clearPool(): Promise<void>; | ||
scope<Result>(callback: (connection: Connection.ConnectionLike) => Promise<Result> | Result, options?: { | ||
eventManager?: EventManager; | ||
}): Promise<Result>; | ||
transaction<Result>(callback: (connection: Connection.TransactionLike) => Promise<Result> | Result, options?: { | ||
eventManager?: EventManager; | ||
}): Promise<Result>; | ||
query<Row extends Record<string, any>>(sql: string, parameters?: any[], meta?: Record<string, any>, { eventManager, ...config }?: Connection.QueryConfig): Promise<Connection.Result<Row>>; | ||
end(): Promise<void>; | ||
query<Row extends Record<string, any>>(sql: string, parameters?: any[], meta?: Record<string, any>, { eventManager, ...config }?: Connection.QueryConfig): Promise<Connection.Result<Row>>; | ||
clearPool(): Promise<void>; | ||
getPoolStatus(): PoolStatus; | ||
@@ -38,4 +42,9 @@ } | ||
type ConnectionType = Connection.ConnectionLike & Connection.ClientFactory & Connection.PoolStatusProvider; | ||
interface ConnectionLike extends Transactional, Queryable { | ||
interface Scopable<Sub> { | ||
scope<Result>(callback: (connection: Sub) => Promise<Result> | Result, options?: { | ||
eventManager?: EventManager; | ||
}): Promise<Result>; | ||
} | ||
interface ConnectionLike extends Transactional, Queryable, Scopable<ConnectionLike> { | ||
} | ||
interface ClientFactory { | ||
@@ -47,3 +56,3 @@ createClient(schema: string, queryMeta: Record<string, any>): Client; | ||
} | ||
interface TransactionLike extends ConnectionLike { | ||
interface TransactionLike extends Transactional, Queryable, Scopable<TransactionLike> { | ||
readonly isClosed: boolean; | ||
@@ -53,5 +62,2 @@ rollback(): Promise<void>; | ||
} | ||
interface QueryContext { | ||
previousQueryEnd?: number; | ||
} | ||
interface Query { | ||
@@ -66,4 +72,5 @@ readonly sql: string; | ||
readonly timing?: { | ||
selfDuration: number; | ||
/** @deprecated both selfDuration and totalDuration now contains same number */ | ||
totalDuration: number; | ||
selfDuration: number; | ||
}; | ||
@@ -70,0 +77,0 @@ } |
@@ -6,6 +6,5 @@ "use strict"; | ||
const Client_1 = require("./Client"); | ||
const Transaction_1 = require("./Transaction"); | ||
const execution_1 = require("./execution"); | ||
const Pool_1 = require("./Pool"); | ||
const utils_1 = require("../utils"); | ||
const AcquiredConnection_1 = require("./AcquiredConnection"); | ||
class Connection { | ||
@@ -25,20 +24,15 @@ constructor(pool, queryConfig, eventManager = new EventManager_1.EventManager(null)) { | ||
} | ||
static createSingle(config, queryConfig = {}) { | ||
return new Connection(new Pool_1.Pool((0, utils_1.createPgClientFactory)(config), { maxConnections: 1, maxIdle: 0 }), queryConfig); | ||
} | ||
createClient(schema, queryMeta) { | ||
return new Client_1.Client(this, schema, queryMeta, new EventManager_1.EventManager(this.eventManager)); | ||
} | ||
async clearPool() { | ||
await this.pool.closeIdle(); | ||
} | ||
async transaction(callback, options = {}) { | ||
async scope(callback, options = {}) { | ||
var _a; | ||
const acquired = await this.pool.acquire(); | ||
const eventManager = new EventManager_1.EventManager((_a = options.eventManager) !== null && _a !== void 0 ? _a : this.eventManager); | ||
await (0, execution_1.executeQuery)(acquired.client, eventManager, { | ||
sql: 'BEGIN', | ||
...this.queryConfig, | ||
}); | ||
const transaction = new Transaction_1.Transaction(acquired.client, eventManager, this.queryConfig); | ||
try { | ||
const result = await callback(transaction); | ||
await transaction.commitUnclosed(); | ||
const connection = new AcquiredConnection_1.AcquiredConnection(acquired.client, eventManager, this.queryConfig); | ||
const result = await callback(connection); | ||
this.pool.release(acquired); | ||
@@ -48,3 +42,2 @@ return result; | ||
catch (e) { | ||
await transaction.rollbackUnclosed(); | ||
this.pool.dispose(acquired); | ||
@@ -54,17 +47,17 @@ throw e; | ||
} | ||
async transaction(callback, options = {}) { | ||
return await this.scope(async (connection) => { | ||
return await connection.transaction(callback); | ||
}, options); | ||
} | ||
async query(sql, parameters = [], meta = {}, { eventManager, ...config } = {}) { | ||
return await this.scope(async (connection) => { | ||
return await connection.query(sql, parameters, meta, config); | ||
}, { eventManager }); | ||
} | ||
async end() { | ||
await this.pool.end(); | ||
} | ||
async query(sql, parameters = [], meta = {}, { eventManager, ...config } = {}) { | ||
const client = await this.pool.acquire(); | ||
const query = { sql, parameters, meta, ...this.queryConfig, ...config }; | ||
try { | ||
const result = await (0, execution_1.executeQuery)(client.client, eventManager !== null && eventManager !== void 0 ? eventManager : this.eventManager, query, {}); | ||
this.pool.release(client); | ||
return result; | ||
} | ||
catch (e) { | ||
this.pool.dispose(client); | ||
throw e; | ||
} | ||
async clearPool() { | ||
await this.pool.closeIdle(); | ||
} | ||
@@ -71,0 +64,0 @@ getPoolStatus() { |
@@ -8,3 +8,2 @@ export * from './Client'; | ||
export * from './Transaction'; | ||
export * from './SingleConnection'; | ||
//# sourceMappingURL=index.d.ts.map |
@@ -24,3 +24,2 @@ "use strict"; | ||
__exportStar(require("./Transaction"), exports); | ||
__exportStar(require("./SingleConnection"), exports); | ||
//# sourceMappingURL=index.js.map |
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
import { Client as PgClient } from 'pg'; | ||
import EventEmitter from 'events'; | ||
import { DatabaseError } from './errors'; | ||
import { PgClientFactory } from '../utils'; | ||
import { PgClient } from './PgClient'; | ||
export declare type PoolLogger = (message: string, status: PoolStatus) => void; | ||
@@ -59,2 +59,3 @@ export declare type PoolConfig = Partial<PoolConfigInternal>; | ||
item_timeout_count: string; | ||
item_rejected_count: string; | ||
item_resolved_new_ms_count: string; | ||
@@ -61,0 +62,0 @@ item_resolved_new_ms_sum: string; |
@@ -25,2 +25,3 @@ "use strict"; | ||
item_timeout_count: 'Number of items waiting for connection, which timed out.', | ||
item_rejected_count: 'Number of items rejected with an error.', | ||
item_resolved_new_ms_count: 'Number of queued items resolved with newly established connection.', | ||
@@ -87,2 +88,3 @@ item_resolved_new_ms_sum: 'Waiting time of queued items, which was resolved with a new connection.', | ||
item_timeout_count: 0, | ||
item_rejected_count: 0, | ||
item_resolved_new_ms_count: 0, | ||
@@ -251,4 +253,14 @@ item_resolved_new_ms_sum: 0, | ||
this.connectingCount--; | ||
this.log('Connecting failed, emitting error'); | ||
this.emit('error', e); | ||
if (this.active.size === 0 && this.queue.length > 0) { | ||
const pendingItem = this.queue.shift(); | ||
if (!pendingItem) | ||
throw new Error(); | ||
this.log('Connecting failed, rejecting pending item'); | ||
pendingItem.reject(new errors_1.ClientError(e)); | ||
this.poolStats.item_rejected_count++; | ||
} | ||
else { | ||
this.log('Connecting failed, emitting error'); | ||
this.emit('error', e); | ||
} | ||
} | ||
@@ -255,0 +267,0 @@ return; |
import { Connection } from './Connection'; | ||
import { EventManager } from './EventManager'; | ||
import { ClientBase } from 'pg'; | ||
export declare class Transaction implements Connection.TransactionLike { | ||
private readonly pgClient; | ||
readonly eventManager: EventManager; | ||
private readonly config; | ||
private _isClosed; | ||
private queryContext; | ||
private savepointCounter; | ||
private readonly connection; | ||
private readonly savepointManager; | ||
private readonly state; | ||
get isClosed(): boolean; | ||
constructor(pgClient: ClientBase, eventManager: EventManager, config: Connection.QueryConfig); | ||
constructor(connection: Connection.ConnectionLike, savepointManager?: SavepointState, state?: TransactionLikeState); | ||
get eventManager(): EventManager; | ||
scope<Result>(callback: (connection: Connection.TransactionLike) => Promise<Result> | Result, { eventManager }?: { | ||
eventManager?: EventManager; | ||
}): Promise<Result>; | ||
transaction<Result>(callback: (connection: Connection.TransactionLike) => Promise<Result> | Result, options?: { | ||
eventManager?: EventManager; | ||
}): Promise<Result>; | ||
query<Row extends Record<string, any>>(sql: string, parameters?: any[], meta?: Record<string, any>, config?: Connection.QueryConfig): Promise<Connection.Result<Row>>; | ||
query<Row extends Record<string, any>>(sql: string, parameters?: any[], meta?: Record<string, any>, { eventManager, ...config }?: Connection.QueryConfig): Promise<Connection.Result<Row>>; | ||
rollback(): Promise<void>; | ||
rollbackUnclosed(): Promise<void>; | ||
commit(): Promise<void>; | ||
commitUnclosed(): Promise<void>; | ||
private close; | ||
} | ||
declare class TransactionLikeState { | ||
private _isClosed; | ||
get isClosed(): boolean; | ||
close(): void; | ||
} | ||
declare class SavepointState { | ||
counter: number; | ||
execute<Result>(connection: Connection.ConnectionLike, callback: (connection: Connection.TransactionLike) => Promise<Result> | Result): Promise<Result>; | ||
} | ||
export declare const executeTransaction: <Result>(transaction: Connection.TransactionLike, callback: (connection: Connection.TransactionLike) => Result | Promise<Result>) => Promise<Result>; | ||
export {}; | ||
//# sourceMappingURL=Transaction.d.ts.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.Transaction = void 0; | ||
const EventManager_1 = require("./EventManager"); | ||
exports.executeTransaction = exports.Transaction = void 0; | ||
const utils_1 = require("../utils"); | ||
const execution_1 = require("./execution"); | ||
class Transaction { | ||
constructor(pgClient, eventManager, config) { | ||
this.pgClient = pgClient; | ||
this.eventManager = eventManager; | ||
this.config = config; | ||
this._isClosed = false; | ||
this.queryContext = {}; | ||
this.savepointCounter = 1; | ||
constructor(connection, savepointManager = new SavepointState(), state = new TransactionLikeState()) { | ||
this.connection = connection; | ||
this.savepointManager = savepointManager; | ||
this.state = state; | ||
} | ||
get isClosed() { | ||
return this._isClosed; | ||
return this.state.isClosed; | ||
} | ||
get eventManager() { | ||
return this.connection.eventManager; | ||
} | ||
async scope(callback, { eventManager = this.eventManager } = {}) { | ||
return await this.connection.scope(async (connection) => { | ||
return await callback(new Transaction(connection, this.savepointManager, this.state)); | ||
}, { eventManager }); | ||
} | ||
async transaction(callback, options) { | ||
var _a; | ||
const savepointName = `savepoint_${this.savepointCounter++}`; | ||
await this.query(`SAVEPOINT ${(0, utils_1.wrapIdentifier)(savepointName)}`); | ||
const savepoint = new SavePoint(savepointName, this, this.pgClient, new EventManager_1.EventManager((_a = options === null || options === void 0 ? void 0 : options.eventManager) !== null && _a !== void 0 ? _a : this.eventManager)); | ||
try { | ||
const result = await callback(savepoint); | ||
if (!savepoint.isClosed) { | ||
await savepoint.commit(); | ||
} | ||
return result; | ||
} | ||
catch (e) { | ||
if (!savepoint.isClosed) { | ||
await savepoint.rollback(); | ||
} | ||
throw e; | ||
} | ||
return await this.scope(async (connection) => { | ||
return await this.savepointManager.execute(connection, callback); | ||
}, options); | ||
} | ||
async query(sql, parameters = [], meta = {}, config = {}) { | ||
async query(sql, parameters = [], meta = {}, { eventManager = this.eventManager, ...config } = {}) { | ||
if (this.isClosed) { | ||
throw new Error('Transaction is already closed'); | ||
} | ||
return await (0, execution_1.executeQuery)(this.pgClient, this.eventManager, { sql, parameters, meta, ...this.config, ...config }, this.queryContext); | ||
return await this.connection.scope(async (connection) => { | ||
return connection.query(sql, parameters, meta, config); | ||
}, { eventManager }); | ||
} | ||
@@ -47,20 +38,8 @@ async rollback() { | ||
} | ||
async rollbackUnclosed() { | ||
if (this.isClosed) { | ||
return; | ||
} | ||
await this.rollback(); | ||
} | ||
async commit() { | ||
await this.close('COMMIT'); | ||
} | ||
async commitUnclosed() { | ||
if (this.isClosed) { | ||
return; | ||
} | ||
await this.commit(); | ||
} | ||
async close(command) { | ||
await this.query(command); | ||
this._isClosed = true; | ||
this.state.close(); | ||
} | ||
@@ -70,23 +49,31 @@ } | ||
class SavePoint { | ||
constructor(savepointName, transactionInst, pgClient, eventManager) { | ||
constructor(savepointName, savepointManager, connection, state = new TransactionLikeState()) { | ||
this.savepointName = savepointName; | ||
this.transactionInst = transactionInst; | ||
this.pgClient = pgClient; | ||
this.eventManager = eventManager; | ||
this._isClosed = false; | ||
this.savepointManager = savepointManager; | ||
this.connection = connection; | ||
this.state = state; | ||
} | ||
get isClosed() { | ||
return this._isClosed; | ||
return this.state.isClosed; | ||
} | ||
async transaction(callback, options) { | ||
var _a; | ||
return await this.transactionInst.transaction(callback, { | ||
eventManager: (_a = options === null || options === void 0 ? void 0 : options.eventManager) !== null && _a !== void 0 ? _a : this.eventManager, | ||
}); | ||
get eventManager() { | ||
return this.connection.eventManager; | ||
} | ||
async query(sql, parameters = [], meta = {}) { | ||
async scope(callback, { eventManager = this.eventManager } = {}) { | ||
return this.connection.scope(async (connection) => { | ||
return await callback(new SavePoint(this.savepointName, this.savepointManager, connection, this.state)); | ||
}, { eventManager }); | ||
} | ||
async transaction(callback, options = {}) { | ||
return await this.scope(async (connection) => { | ||
return await this.savepointManager.execute(connection, callback); | ||
}, options); | ||
} | ||
async query(sql, parameters = [], meta = {}, { eventManager = this.eventManager, ...config } = {}) { | ||
if (this.isClosed) { | ||
throw new Error(`Savepoint ${this.savepointName} is already closed.`); | ||
} | ||
return await this.transactionInst.query(sql, parameters, meta); | ||
return await this.connection.scope(connection => { | ||
return connection.query(sql, parameters, meta, config); | ||
}, { eventManager }); | ||
} | ||
@@ -101,5 +88,43 @@ async rollback() { | ||
await this.query(sql); | ||
this.state.close(); | ||
} | ||
} | ||
class TransactionLikeState { | ||
constructor() { | ||
this._isClosed = false; | ||
} | ||
get isClosed() { | ||
return this._isClosed; | ||
} | ||
close() { | ||
this._isClosed = true; | ||
} | ||
} | ||
class SavepointState { | ||
constructor() { | ||
this.counter = 1; | ||
} | ||
async execute(connection, callback) { | ||
const savepointName = `savepoint_${this.counter++}`; | ||
await connection.query(`SAVEPOINT ${(0, utils_1.wrapIdentifier)(savepointName)}`); | ||
const savepoint = new SavePoint(savepointName, this, connection); | ||
return await (0, exports.executeTransaction)(savepoint, callback); | ||
} | ||
} | ||
const executeTransaction = async (transaction, callback) => { | ||
try { | ||
const result = await callback(transaction); | ||
if (!transaction.isClosed) { | ||
await transaction.commit(); | ||
} | ||
return result; | ||
} | ||
catch (e) { | ||
if (!transaction.isClosed) { | ||
await transaction.rollback(); | ||
} | ||
throw e; | ||
} | ||
}; | ||
exports.executeTransaction = executeTransaction; | ||
//# sourceMappingURL=Transaction.js.map |
@@ -7,12 +7,24 @@ "use strict"; | ||
return yield* await new Promise(async (resolveOuter) => { | ||
await db.transaction((db2) => { | ||
// eslint-disable-next-line promise/param-names | ||
return new Promise(resolveInner => { | ||
resolveOuter((async function* () { | ||
const result = yield* cb(db2); | ||
resolveInner(); | ||
return result; | ||
})()); | ||
try { | ||
await db.transaction((db2) => { | ||
// eslint-disable-next-line promise/param-names | ||
return new Promise((resolveInner, rejectInner) => { | ||
resolveOuter((async function* () { | ||
try { | ||
const result = yield* cb(db2); | ||
resolveInner(); | ||
return result; | ||
} | ||
finally { // required to close the transaction when the generator is externally closed, or exceptions are thrown | ||
if (!db2.connection.isClosed) { | ||
rejectInner(); | ||
} | ||
} | ||
})()); | ||
}); | ||
}); | ||
}); | ||
} | ||
catch (e) { | ||
// suppress the error caused by rejectInner(), it was already propagated out of asyncIterableTransaction() | ||
} | ||
}); | ||
@@ -19,0 +31,0 @@ } |
@@ -8,3 +8,3 @@ "use strict"; | ||
try { | ||
const connection = new client_1.SingleConnection(db, {}); | ||
const connection = client_1.Connection.createSingle(db); | ||
await connection.query('SELECT 1'); | ||
@@ -20,3 +20,3 @@ await connection.end(); | ||
log(`Database ${db.database} does not exist, attempting to create it...`); | ||
const connection = new client_1.SingleConnection({ ...db, database: 'postgres' }, {}); | ||
const connection = client_1.Connection.createSingle({ ...db, database: 'postgres' }, {}); | ||
await connection.query(`CREATE DATABASE ${(0, sql_1.wrapIdentifier)(db.database)}`); | ||
@@ -23,0 +23,0 @@ await connection.end(); |
@@ -5,2 +5,3 @@ export * from './assertNever'; | ||
export * from './createDatabaseIfNotExists'; | ||
export * from './Mutex'; | ||
export * from './sql'; | ||
@@ -7,0 +8,0 @@ export * from './retryTransaction'; |
@@ -21,2 +21,3 @@ "use strict"; | ||
__exportStar(require("./createDatabaseIfNotExists"), exports); | ||
__exportStar(require("./Mutex"), exports); | ||
__exportStar(require("./sql"), exports); | ||
@@ -23,0 +24,0 @@ __exportStar(require("./retryTransaction"), exports); |
@@ -1,5 +0,6 @@ | ||
import { Client as PgClient } from 'pg'; | ||
import { Client as PgClientImpl } from 'pg'; | ||
import { DatabaseConfig } from '../types'; | ||
import { PgClient } from '../client/PgClient'; | ||
export declare type PgClientFactory = () => PgClient; | ||
export declare const createPgClientFactory: ({ queryTimeoutMs, statementTimeoutMs, connectionTimeoutMs, ...config }: DatabaseConfig) => () => PgClient; | ||
export declare const createPgClientFactory: ({ queryTimeoutMs, statementTimeoutMs, connectionTimeoutMs, ...config }: DatabaseConfig) => () => PgClientImpl; | ||
//# sourceMappingURL=pgClientFactory.d.ts.map |
import { Connection } from '../client'; | ||
export declare const withDatabaseAdvisoryLock: <Cn extends Connection.Queryable, Result>(connection: Cn, lock: number, callback: (connection: Cn) => Result | Promise<Result>) => Promise<Result>; | ||
export declare const withDatabaseAdvisoryLock: <Result>(connection: Connection.ConnectionLike, lock: number, callback: () => Result | Promise<Result>) => Promise<Result>; | ||
//# sourceMappingURL=withDatabaseAdvisoryLock.d.ts.map |
@@ -7,3 +7,3 @@ "use strict"; | ||
try { | ||
return await callback(connection); | ||
return await callback(); | ||
} | ||
@@ -10,0 +10,0 @@ finally { |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const src_1 = require("../../../src"); | ||
const database_tester_1 = require("@contember/database-tester"); | ||
const vitest_1 = require("vitest"); | ||
const simpleQuery = (sql) => ({ | ||
sql, | ||
parameters: [], | ||
response: { rows: [] }, | ||
}); | ||
const createConnectionMockAlt_1 = require("./createConnectionMockAlt"); | ||
(0, vitest_1.test)('event manager: connection and client', async () => { | ||
const connection = (0, database_tester_1.createConnectionMock)([ | ||
simpleQuery('SELECT 1'), | ||
simpleQuery('SELECT 2'), | ||
const [connection, end] = (0, createConnectionMockAlt_1.createConnectionMockAlt)([ | ||
{ sql: 'SELECT 1' }, | ||
{ sql: 'SELECT 2' }, | ||
]); | ||
@@ -27,10 +22,11 @@ const events = []; | ||
]); | ||
end(); | ||
}); | ||
(0, vitest_1.test)('event manager: connection and client with transaction', async () => { | ||
const connection = (0, database_tester_1.createConnectionMock)([ | ||
simpleQuery('BEGIN;'), | ||
simpleQuery('SELECT 1'), | ||
simpleQuery('COMMIT;'), | ||
simpleQuery('SELECT 2'), | ||
simpleQuery('SELECT 3'), | ||
const [connection, end] = (0, createConnectionMockAlt_1.createConnectionMockAlt)([ | ||
{ sql: 'BEGIN' }, | ||
{ sql: 'SELECT 1' }, | ||
{ sql: 'COMMIT' }, | ||
{ sql: 'SELECT 2' }, | ||
{ sql: 'SELECT 3' }, | ||
]); | ||
@@ -48,7 +44,9 @@ const events = []; | ||
vitest_1.assert.deepStrictEqual(events, [ | ||
{ sql: 'BEGIN;', source: 'connection' }, | ||
{ sql: 'BEGIN', source: 'client' }, | ||
{ sql: 'BEGIN', source: 'connection' }, | ||
{ sql: 'SELECT 1', source: 'transaction' }, | ||
{ sql: 'SELECT 1', source: 'client' }, | ||
{ sql: 'SELECT 1', source: 'connection' }, | ||
{ sql: 'COMMIT;', source: 'connection' }, | ||
{ sql: 'COMMIT', source: 'client' }, | ||
{ sql: 'COMMIT', source: 'connection' }, | ||
{ sql: 'SELECT 2', source: 'client' }, | ||
@@ -58,3 +56,79 @@ { sql: 'SELECT 2', source: 'connection' }, | ||
]); | ||
end(); | ||
}); | ||
(0, vitest_1.test)('event manager: connection and client with scopes', async () => { | ||
const [connection, end] = (0, createConnectionMockAlt_1.createConnectionMockAlt)([ | ||
{ sql: 'SELECT 1' }, | ||
{ sql: 'SELECT 2' }, | ||
{ sql: 'SELECT 3' }, | ||
]); | ||
const events = []; | ||
const client = connection.createClient('public', {}); | ||
connection.eventManager.on(src_1.EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'connection' })); | ||
client.eventManager.on(src_1.EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'client' })); | ||
await client.scope(async (conn) => { | ||
conn.eventManager.on(src_1.EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'scoped' })); | ||
await conn.query('SELECT 1'); | ||
}); | ||
await client.query('SELECT 2'); | ||
await connection.query('SELECT 3'); | ||
vitest_1.assert.deepStrictEqual(events, [ | ||
{ sql: 'SELECT 1', source: 'scoped' }, | ||
{ sql: 'SELECT 1', source: 'client' }, | ||
{ sql: 'SELECT 1', source: 'connection' }, | ||
{ sql: 'SELECT 2', source: 'client' }, | ||
{ sql: 'SELECT 2', source: 'connection' }, | ||
{ sql: 'SELECT 3', source: 'connection' }, | ||
]); | ||
end(); | ||
}); | ||
(0, vitest_1.test)('event manager: connection and client with transaction and savepoint', async () => { | ||
const [connection, end] = (0, createConnectionMockAlt_1.createConnectionMockAlt)([ | ||
{ sql: 'BEGIN' }, | ||
{ sql: 'SELECT 1' }, | ||
{ sql: 'SAVEPOINT "savepoint_1"' }, | ||
{ sql: 'SELECT 2' }, | ||
{ sql: 'RELEASE SAVEPOINT "savepoint_1"' }, | ||
{ sql: 'COMMIT' }, | ||
{ sql: 'SELECT 3' }, | ||
{ sql: 'SELECT 4' }, | ||
]); | ||
const events = []; | ||
const client = connection.createClient('public', {}); | ||
connection.eventManager.on(src_1.EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'connection' })); | ||
client.eventManager.on(src_1.EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'client' })); | ||
await client.transaction(async (trx) => { | ||
trx.eventManager.on(src_1.EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'transaction' })); | ||
await trx.query('SELECT 1'); | ||
await trx.transaction(async (savepoint) => { | ||
savepoint.eventManager.on(src_1.EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'savepoint' })); | ||
await savepoint.query('SELECT 2'); | ||
}); | ||
}); | ||
await client.query('SELECT 3'); | ||
await connection.query('SELECT 4'); | ||
vitest_1.assert.deepStrictEqual(events, [ | ||
{ sql: 'BEGIN', source: 'client' }, | ||
{ sql: 'BEGIN', source: 'connection' }, | ||
{ sql: 'SELECT 1', source: 'transaction' }, | ||
{ sql: 'SELECT 1', source: 'client' }, | ||
{ sql: 'SELECT 1', source: 'connection' }, | ||
{ sql: 'SAVEPOINT "savepoint_1"', source: 'transaction' }, | ||
{ sql: 'SAVEPOINT "savepoint_1"', source: 'client' }, | ||
{ sql: 'SAVEPOINT "savepoint_1"', source: 'connection' }, | ||
{ sql: 'SELECT 2', source: 'savepoint' }, | ||
{ sql: 'SELECT 2', source: 'transaction' }, | ||
{ sql: 'SELECT 2', source: 'client' }, | ||
{ sql: 'SELECT 2', source: 'connection' }, | ||
{ sql: 'RELEASE SAVEPOINT "savepoint_1"', source: 'transaction' }, | ||
{ sql: 'RELEASE SAVEPOINT "savepoint_1"', source: 'client' }, | ||
{ sql: 'RELEASE SAVEPOINT "savepoint_1"', source: 'connection' }, | ||
{ sql: 'COMMIT', source: 'client' }, | ||
{ sql: 'COMMIT', source: 'connection' }, | ||
{ sql: 'SELECT 3', source: 'client' }, | ||
{ sql: 'SELECT 3', source: 'connection' }, | ||
{ sql: 'SELECT 4', source: 'connection' }, | ||
]); | ||
end(); | ||
}); | ||
//# sourceMappingURL=eventManager.test.js.map |
@@ -279,15 +279,12 @@ "use strict"; | ||
const pgClientMock = new PgClientMock(); | ||
pgClientMock.on('error', () => { | ||
}); | ||
const pool = new src_1.Pool(() => pgClientMock, { | ||
log: logger, | ||
acquireTimeoutMs: 3, | ||
}); | ||
pgClientMock.connections.push(createErrorPromise()); | ||
pool.on('error', () => { | ||
}); | ||
const poolError = new Promise(resolve => pool.once('error', e => { | ||
resolve(e); | ||
})); | ||
await (0, vitest_1.expect)(pool.acquire.bind(pool)).rejects.toThrowError('Failed to acquire a connection'); | ||
await (0, vitest_1.expect)(poolError).resolves.toEqual(new Error('my err')); | ||
await timeout(4); | ||
pgClientMock.connections.push(createErrorPromise()); | ||
await (0, vitest_1.expect)(async () => await pool.acquire()).rejects.toThrowError('Database client error: my err'); | ||
await timeout(); | ||
(0, vitest_1.expect)(logger.messages).toMatchInlineSnapshot(` | ||
@@ -298,4 +295,3 @@ CAI P | ||
100 1: Connection error occurred: my err | ||
000 1: Connecting failed, emitting error | ||
000 0: Queued item timed out | ||
000 0: Connecting failed, rejecting pending item | ||
`); | ||
@@ -302,0 +298,0 @@ }); |
{ | ||
"name": "@contember/database", | ||
"version": "1.2.0-alpha.3", | ||
"version": "1.2.0-alpha.4", | ||
"license": "Apache-2.0", | ||
@@ -11,6 +11,6 @@ "main": "dist/src/index.js", | ||
"dependencies": { | ||
"@contember/queryable": "^1.2.0-alpha.3" | ||
"@contember/queryable": "^1.2.0-alpha.4" | ||
}, | ||
"devDependencies": { | ||
"@contember/database-tester": "^1.2.0-alpha.3", | ||
"@contember/database-tester": "^1.2.0-alpha.4", | ||
"@types/node": "^16", | ||
@@ -17,0 +17,0 @@ "pg": "^8.5.0" |
@@ -6,2 +6,3 @@ import { DeleteBuilder, InsertBuilder, SelectBuilder, UpdateBuilder } from '../builders' | ||
import { QueryHandler } from '@contember/queryable' | ||
import { withDatabaseAdvisoryLock } from '../utils' | ||
@@ -14,3 +15,4 @@ class Client<ConnectionType extends Connection.ConnectionLike = Connection.ConnectionLike> implements Connection.Queryable { | ||
public readonly eventManager: EventManager = new EventManager(connection.eventManager), | ||
) {} | ||
) { | ||
} | ||
@@ -22,12 +24,26 @@ public forSchema(schema: string): Client<ConnectionType> { | ||
async scope<T>(callback: (wrapper: Client<Connection.ConnectionLike>) => Promise<T> | T): Promise<T> { | ||
return await this.connection.scope( | ||
connection => callback( | ||
new Client( | ||
connection, | ||
this.schema, | ||
this.queryMeta, | ||
new EventManager(connection.eventManager), | ||
), | ||
), | ||
{ eventManager: this.eventManager }, | ||
) | ||
} | ||
async transaction<T>(transactionScope: (wrapper: Client<Connection.TransactionLike>) => Promise<T> | T): Promise<T> { | ||
return await this.connection.transaction( | ||
transaction => | ||
transactionScope( | ||
new Client<Connection.TransactionLike>( | ||
transaction, | ||
this.schema, | ||
this.queryMeta, | ||
new EventManager(transaction.eventManager), | ||
)), | ||
transaction => transactionScope( | ||
new Client<Connection.TransactionLike>( | ||
transaction, | ||
this.schema, | ||
this.queryMeta, | ||
new EventManager(transaction.eventManager), | ||
), | ||
), | ||
{ eventManager: this.eventManager }, | ||
@@ -37,2 +53,8 @@ ) | ||
async locked<T>(lock: number, callback: (wrapper: Client<Connection.ConnectionLike>) => Promise<T> | T): Promise<T> { | ||
return await this.scope(client => | ||
withDatabaseAdvisoryLock(client.connection, lock, () => callback(client)), | ||
) | ||
} | ||
selectBuilder<Result = SelectBuilder.Result>(): SelectBuilder<Result> { | ||
@@ -39,0 +61,0 @@ return SelectBuilder.create<Result>() |
import { EventManager } from './EventManager' | ||
import { Client } from './Client' | ||
import { Transaction } from './Transaction' | ||
import { executeQuery } from './execution' | ||
import { Pool, PoolConfig, PoolStatus } from './Pool' | ||
import { createPgClientFactory } from '../utils' | ||
import { DatabaseConfig } from '../types' | ||
import { AcquiredConnection } from './AcquiredConnection' | ||
@@ -29,2 +28,9 @@ class Connection implements Connection.ConnectionLike, Connection.ClientFactory, Connection.PoolStatusProvider { | ||
public static createSingle( | ||
config: DatabaseConfig, | ||
queryConfig: Connection.QueryConfig = {}, | ||
): Connection { | ||
return new Connection(new Pool(createPgClientFactory(config), { maxConnections: 1, maxIdle: 0 }), queryConfig) | ||
} | ||
public createClient(schema: string, queryMeta: Record<string, any>): Client { | ||
@@ -34,8 +40,4 @@ return new Client(this, schema, queryMeta, new EventManager(this.eventManager)) | ||
public async clearPool(): Promise<void> { | ||
await this.pool.closeIdle() | ||
} | ||
async transaction<Result>( | ||
callback: (connection: Connection.TransactionLike) => Promise<Result> | Result, | ||
async scope<Result>( | ||
callback: (connection: Connection.ConnectionLike) => Promise<Result> | Result, | ||
options: { eventManager?: EventManager } = {}, | ||
@@ -45,11 +47,5 @@ ): Promise<Result> { | ||
const eventManager = new EventManager(options.eventManager ?? this.eventManager) | ||
await executeQuery(acquired.client, eventManager, { | ||
sql: 'BEGIN', | ||
...this.queryConfig, | ||
}) | ||
const transaction = new Transaction(acquired.client, eventManager, this.queryConfig) | ||
try { | ||
const result = await callback(transaction) | ||
await transaction.commitUnclosed() | ||
const connection = new AcquiredConnection(acquired.client, eventManager, this.queryConfig) | ||
const result = await callback(connection) | ||
this.pool.release(acquired) | ||
@@ -59,3 +55,2 @@ | ||
} catch (e) { | ||
await transaction.rollbackUnclosed() | ||
this.pool.dispose(acquired) | ||
@@ -66,4 +61,9 @@ throw e | ||
async end(): Promise<void> { | ||
await this.pool.end() | ||
async transaction<Result>( | ||
callback: (connection: Connection.TransactionLike) => Promise<Result> | Result, | ||
options: { eventManager?: EventManager } = {}, | ||
): Promise<Result> { | ||
return await this.scope(async connection => { | ||
return await connection.transaction(callback) | ||
}, options) | ||
} | ||
@@ -77,14 +77,15 @@ | ||
): Promise<Connection.Result<Row>> { | ||
const client = await this.pool.acquire() | ||
const query: Connection.Query = { sql, parameters, meta, ...this.queryConfig, ...config } | ||
try { | ||
const result = await executeQuery<Row>(client.client, eventManager ?? this.eventManager, query, {}) | ||
this.pool.release(client) | ||
return result | ||
} catch (e) { | ||
this.pool.dispose(client) | ||
throw e | ||
} | ||
return await this.scope(async connection => { | ||
return await connection.query(sql, parameters, meta, config) | ||
}, { eventManager }) | ||
} | ||
async end(): Promise<void> { | ||
await this.pool.end() | ||
} | ||
public async clearPool(): Promise<void> { | ||
await this.pool.closeIdle() | ||
} | ||
getPoolStatus(): PoolStatus { | ||
@@ -124,4 +125,12 @@ return this.pool.getPoolStatus() | ||
export interface ConnectionLike extends Transactional, Queryable {} | ||
export interface Scopable<Sub> { | ||
scope<Result>( | ||
callback: (connection: Sub) => Promise<Result> | Result, | ||
options?: { eventManager?: EventManager }, | ||
): Promise<Result> | ||
} | ||
export interface ConnectionLike extends Transactional, Queryable, Scopable<ConnectionLike> { | ||
} | ||
export interface ClientFactory { | ||
@@ -135,3 +144,3 @@ createClient(schema: string, queryMeta: Record<string, any>): Client | ||
export interface TransactionLike extends ConnectionLike { | ||
export interface TransactionLike extends Transactional, Queryable, Scopable<TransactionLike> { | ||
readonly isClosed: boolean | ||
@@ -144,6 +153,2 @@ | ||
export interface QueryContext { | ||
previousQueryEnd?: number | ||
} | ||
export interface Query { | ||
@@ -159,4 +164,5 @@ readonly sql: string | ||
readonly timing?: { | ||
selfDuration: number | ||
/** @deprecated both selfDuration and totalDuration now contains same number */ | ||
totalDuration: number | ||
selfDuration: number | ||
} | ||
@@ -163,0 +169,0 @@ } |
@@ -8,2 +8,1 @@ export * from './Client' | ||
export * from './Transaction' | ||
export * from './SingleConnection' |
import { ImplementationException } from '../exceptions' | ||
import { Client as PgClient } from 'pg' | ||
import { ClientErrorCodes } from './errorCodes' | ||
@@ -7,2 +6,3 @@ import EventEmitter from 'events' | ||
import { PgClientFactory } from '../utils' | ||
import { PgClient } from './PgClient' | ||
@@ -66,2 +66,3 @@ export type PoolLogger = (message: string, status: PoolStatus) => void | ||
item_timeout_count: 'Number of items waiting for connection, which timed out.', | ||
item_rejected_count: 'Number of items rejected with an error.', | ||
@@ -147,2 +148,3 @@ item_resolved_new_ms_count: 'Number of queued items resolved with newly established connection.', | ||
item_timeout_count: 0, | ||
item_rejected_count: 0, | ||
item_resolved_new_ms_count: 0, | ||
@@ -333,4 +335,12 @@ item_resolved_new_ms_sum: 0, | ||
this.connectingCount-- | ||
this.log('Connecting failed, emitting error') | ||
this.emit('error', e) | ||
if (this.active.size === 0 && this.queue.length > 0) { | ||
const pendingItem = this.queue.shift() | ||
if (!pendingItem) throw new Error() | ||
this.log('Connecting failed, rejecting pending item') | ||
pendingItem.reject(new ClientError(e)) | ||
this.poolStats.item_rejected_count++ | ||
} else { | ||
this.log('Connecting failed, emitting error') | ||
this.emit('error', e) | ||
} | ||
} | ||
@@ -337,0 +347,0 @@ return |
import { Connection } from './Connection' | ||
import { EventManager } from './EventManager' | ||
import { ClientBase } from 'pg' | ||
import { wrapIdentifier } from '../utils' | ||
import { executeQuery } from './execution' | ||
export class Transaction implements Connection.TransactionLike { | ||
private _isClosed = false | ||
public get isClosed(): boolean { | ||
return this.state.isClosed | ||
} | ||
private queryContext: Connection.QueryContext = {} | ||
constructor( | ||
private readonly connection: Connection.ConnectionLike, | ||
private readonly savepointManager = new SavepointState(), | ||
private readonly state = new TransactionLikeState(), | ||
) { | ||
} | ||
private savepointCounter = 1 | ||
get eventManager() { | ||
return this.connection.eventManager | ||
} | ||
public get isClosed(): boolean { | ||
return this._isClosed | ||
async scope<Result>( | ||
callback: (connection: Connection.TransactionLike) => Promise<Result> | Result, | ||
{ eventManager = this.eventManager }: { eventManager?: EventManager } = {}, | ||
): Promise<Result> { | ||
return await this.connection.scope(async connection => { | ||
return await callback(new Transaction(connection, this.savepointManager, this.state)) | ||
}, { eventManager }) | ||
} | ||
constructor( | ||
private readonly pgClient: ClientBase, | ||
public readonly eventManager: EventManager, | ||
private readonly config: Connection.QueryConfig, | ||
) {} | ||
async transaction<Result>( | ||
@@ -28,22 +34,5 @@ callback: (connection: Connection.TransactionLike) => Promise<Result> | Result, | ||
): Promise<Result> { | ||
const savepointName = `savepoint_${this.savepointCounter++}` | ||
await this.query(`SAVEPOINT ${wrapIdentifier(savepointName)}`) | ||
const savepoint = new SavePoint( | ||
savepointName, | ||
this, | ||
this.pgClient, | ||
new EventManager(options?.eventManager ?? this.eventManager), | ||
) | ||
try { | ||
const result = await callback(savepoint) | ||
if (!savepoint.isClosed) { | ||
await savepoint.commit() | ||
} | ||
return result | ||
} catch (e) { | ||
if (!savepoint.isClosed) { | ||
await savepoint.rollback() | ||
} | ||
throw e | ||
} | ||
return await this.scope(async connection => { | ||
return await this.savepointManager.execute(connection, callback) | ||
}, options) | ||
} | ||
@@ -55,3 +44,3 @@ | ||
meta: Record<string, any> = {}, | ||
config: Connection.QueryConfig = {}, | ||
{ eventManager = this.eventManager, ...config }: Connection.QueryConfig = {}, | ||
): Promise<Connection.Result<Row>> { | ||
@@ -61,8 +50,5 @@ if (this.isClosed) { | ||
} | ||
return await executeQuery<Row>( | ||
this.pgClient, | ||
this.eventManager, | ||
{ sql, parameters, meta, ...this.config, ...config }, | ||
this.queryContext, | ||
) | ||
return await this.connection.scope(async connection => { | ||
return connection.query(sql, parameters, meta, config) | ||
}, { eventManager }) | ||
} | ||
@@ -74,9 +60,2 @@ | ||
async rollbackUnclosed(): Promise<void> { | ||
if (this.isClosed) { | ||
return | ||
} | ||
await this.rollback() | ||
} | ||
async commit(): Promise<void> { | ||
@@ -86,12 +65,5 @@ await this.close('COMMIT') | ||
async commitUnclosed(): Promise<void> { | ||
if (this.isClosed) { | ||
return | ||
} | ||
await this.commit() | ||
} | ||
private async close(command: string) { | ||
await this.query(command) | ||
this._isClosed = true | ||
this.state.close() | ||
} | ||
@@ -101,22 +73,36 @@ } | ||
class SavePoint implements Connection.TransactionLike { | ||
private _isClosed = false | ||
public get isClosed(): boolean { | ||
return this._isClosed | ||
} | ||
constructor( | ||
public readonly savepointName: string, | ||
private readonly transactionInst: Transaction, | ||
private readonly pgClient: ClientBase, | ||
public readonly eventManager: EventManager, | ||
) {} | ||
public readonly savepointManager: SavepointState, | ||
private readonly connection: Connection.ConnectionLike, | ||
private readonly state = new TransactionLikeState(), | ||
) { | ||
} | ||
get isClosed() { | ||
return this.state.isClosed | ||
} | ||
get eventManager() { | ||
return this.connection.eventManager | ||
} | ||
async scope<Result>( | ||
callback: (connection: Connection.TransactionLike) => Promise<Result> | Result, | ||
{ eventManager = this.eventManager }: { eventManager?: EventManager } = {}, | ||
): Promise<Result> { | ||
return this.connection.scope(async connection => { | ||
return await callback(new SavePoint(this.savepointName, this.savepointManager, connection, this.state)) | ||
}, { eventManager }) | ||
} | ||
async transaction<Result>( | ||
callback: (connection: Connection.TransactionLike) => Promise<Result> | Result, | ||
options?: { eventManager?: EventManager }, | ||
options: { eventManager?: EventManager } = {}, | ||
): Promise<Result> { | ||
return await this.transactionInst.transaction(callback, { | ||
eventManager: options?.eventManager ?? this.eventManager, | ||
}) | ||
return await this.scope(async connection => { | ||
return await this.savepointManager.execute(connection, callback) | ||
}, options) | ||
} | ||
@@ -128,2 +114,3 @@ | ||
meta: Record<string, any> = {}, | ||
{ eventManager = this.eventManager, ...config }: Connection.QueryConfig = {}, | ||
): Promise<Connection.Result<Row>> { | ||
@@ -133,3 +120,5 @@ if (this.isClosed) { | ||
} | ||
return await this.transactionInst.query<Row>(sql, parameters, meta) | ||
return await this.connection.scope(connection => { | ||
return connection.query(sql, parameters, meta, config) | ||
}, { eventManager }) | ||
} | ||
@@ -147,4 +136,49 @@ | ||
await this.query(sql) | ||
this.state.close() | ||
} | ||
} | ||
class TransactionLikeState { | ||
private _isClosed = false | ||
public get isClosed(): boolean { | ||
return this._isClosed | ||
} | ||
public close(): void { | ||
this._isClosed = true | ||
} | ||
} | ||
class SavepointState { | ||
public counter = 1 | ||
public async execute<Result>( | ||
connection: Connection.ConnectionLike, | ||
callback: (connection: Connection.TransactionLike) => Promise<Result> | Result, | ||
) { | ||
const savepointName = `savepoint_${this.counter++}` | ||
await connection.query(`SAVEPOINT ${wrapIdentifier(savepointName)}`) | ||
const savepoint = new SavePoint(savepointName, this, connection) | ||
return await executeTransaction(savepoint, callback) | ||
} | ||
} | ||
export const executeTransaction = async <Result>( | ||
transaction: Connection.TransactionLike, | ||
callback: (connection: Connection.TransactionLike) => Promise<Result> | Result, | ||
) => { | ||
try { | ||
const result = await callback(transaction) | ||
if (!transaction.isClosed) { | ||
await transaction.commit() | ||
} | ||
return result | ||
} catch (e) { | ||
if (!transaction.isClosed) { | ||
await transaction.rollback() | ||
} | ||
throw e | ||
} | ||
} |
@@ -10,13 +10,25 @@ import { Client, Connection } from '../client/index.js' | ||
return yield* await new Promise(async resolveOuter => { | ||
await db.transaction((db2: Client<Connection.TransactionLike>) => { | ||
// eslint-disable-next-line promise/param-names | ||
return new Promise<void>(resolveInner => { | ||
resolveOuter((async function* () { | ||
const result = yield* cb(db2) | ||
resolveInner() | ||
return result | ||
})()) | ||
try { | ||
await db.transaction((db2: Client<Connection.TransactionLike>) => { | ||
// eslint-disable-next-line promise/param-names | ||
return new Promise<void>((resolveInner, rejectInner) => { | ||
resolveOuter((async function* () { | ||
try { | ||
const result = yield* cb(db2) | ||
resolveInner() | ||
return result | ||
} finally { // required to close the transaction when the generator is externally closed, or exceptions are thrown | ||
if (!db2.connection.isClosed) { | ||
rejectInner() | ||
} | ||
} | ||
})()) | ||
}) | ||
}) | ||
}) | ||
} catch (e) { | ||
// suppress the error caused by rejectInner(), it was already propagated out of asyncIterableTransaction() | ||
} | ||
}) | ||
} |
import { DatabaseConfig } from '../types' | ||
import { ClientError, ClientErrorCodes, SingleConnection } from '../client' | ||
import { ClientError, ClientErrorCodes, Connection } from '../client' | ||
import { wrapIdentifier } from './sql' | ||
@@ -7,3 +7,3 @@ | ||
try { | ||
const connection = new SingleConnection(db, {}) | ||
const connection = Connection.createSingle(db) | ||
await connection.query('SELECT 1') | ||
@@ -19,5 +19,5 @@ await connection.end() | ||
log(`Database ${db.database} does not exist, attempting to create it...`) | ||
const connection = new SingleConnection({ ...db, database: 'postgres' }, {}) | ||
const connection = Connection.createSingle({ ...db, database: 'postgres' }, {}) | ||
await connection.query(`CREATE DATABASE ${wrapIdentifier(db.database)}`) | ||
await connection.end() | ||
} |
@@ -5,2 +5,3 @@ export * from './assertNever' | ||
export * from './createDatabaseIfNotExists' | ||
export * from './Mutex' | ||
export * from './sql' | ||
@@ -7,0 +8,0 @@ export * from './retryTransaction' |
@@ -1,6 +0,7 @@ | ||
import { Client as PgClient } from 'pg' | ||
import { Client as PgClientImpl } from 'pg' | ||
import { DatabaseConfig } from '../types' | ||
import { PgClient } from '../client/PgClient' | ||
export type PgClientFactory = () => PgClient | ||
export const createPgClientFactory = ({ queryTimeoutMs, statementTimeoutMs, connectionTimeoutMs, ...config }: DatabaseConfig) => () => new PgClient({ | ||
export const createPgClientFactory = ({ queryTimeoutMs, statementTimeoutMs, connectionTimeoutMs, ...config }: DatabaseConfig) => () => new PgClientImpl({ | ||
query_timeout: queryTimeoutMs, | ||
@@ -7,0 +8,0 @@ statement_timeout: statementTimeoutMs, |
import { Connection } from '../client' | ||
export const withDatabaseAdvisoryLock = async <Cn extends Connection.Queryable, Result>( | ||
connection: Cn, | ||
export const withDatabaseAdvisoryLock = async <Result>( | ||
connection: Connection.ConnectionLike, | ||
lock: number, | ||
callback: (connection: Cn) => Result | Promise<Result>, | ||
callback: () => Result | Promise<Result>, | ||
): Promise<Result> => { | ||
await connection.query(`select pg_advisory_lock(?)`, [lock]) | ||
try { | ||
return await callback(connection) | ||
return await callback() | ||
} finally { | ||
@@ -12,0 +12,0 @@ const result = await connection.query<{lockReleased: boolean}>('select pg_advisory_unlock(?) as "lockReleased"', [lock]) |
import { EventManager } from '../../../src' | ||
import { createConnectionMock } from '@contember/database-tester' | ||
import { test, assert } from 'vitest' | ||
import { assert, test } from 'vitest' | ||
import { createConnectionMockAlt } from './createConnectionMockAlt' | ||
const simpleQuery = (sql: string) => ({ | ||
sql, | ||
parameters: [], | ||
response: { rows: [] }, | ||
}) | ||
test('event manager: connection and client', async () => { | ||
const connection = createConnectionMock([ | ||
simpleQuery('SELECT 1'), | ||
simpleQuery('SELECT 2'), | ||
const [connection, end] = createConnectionMockAlt([ | ||
{ sql: 'SELECT 1' }, | ||
{ sql: 'SELECT 2' }, | ||
]) | ||
@@ -28,2 +22,3 @@ const events: { source: string; sql: string }[] = [] | ||
]) | ||
end() | ||
}) | ||
@@ -33,8 +28,8 @@ | ||
test('event manager: connection and client with transaction', async () => { | ||
const connection = createConnectionMock([ | ||
simpleQuery('BEGIN;'), | ||
simpleQuery('SELECT 1'), | ||
simpleQuery('COMMIT;'), | ||
simpleQuery('SELECT 2'), | ||
simpleQuery('SELECT 3'), | ||
const [connection, end] = createConnectionMockAlt([ | ||
{ sql: 'BEGIN' }, | ||
{ sql: 'SELECT 1' }, | ||
{ sql: 'COMMIT' }, | ||
{ sql: 'SELECT 2' }, | ||
{ sql: 'SELECT 3' }, | ||
]) | ||
@@ -54,7 +49,9 @@ const events: { source: string; sql: string }[] = [] | ||
assert.deepStrictEqual(events, [ | ||
{ sql: 'BEGIN;', source: 'connection' }, | ||
{ sql: 'BEGIN', source: 'client' }, | ||
{ sql: 'BEGIN', source: 'connection' }, | ||
{ sql: 'SELECT 1', source: 'transaction' }, | ||
{ sql: 'SELECT 1', source: 'client' }, | ||
{ sql: 'SELECT 1', source: 'connection' }, | ||
{ sql: 'COMMIT;', source: 'connection' }, | ||
{ sql: 'COMMIT', source: 'client' }, | ||
{ sql: 'COMMIT', source: 'connection' }, | ||
{ sql: 'SELECT 2', source: 'client' }, | ||
@@ -64,2 +61,87 @@ { sql: 'SELECT 2', source: 'connection' }, | ||
]) | ||
end() | ||
}) | ||
test('event manager: connection and client with scopes', async () => { | ||
const [connection, end] = createConnectionMockAlt([ | ||
{ sql: 'SELECT 1' }, | ||
{ sql: 'SELECT 2' }, | ||
{ sql: 'SELECT 3' }, | ||
]) | ||
const events: { source: string; sql: string }[] = [] | ||
const client = connection.createClient('public', {}) | ||
connection.eventManager.on(EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'connection' })) | ||
client.eventManager.on(EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'client' })) | ||
await client.scope(async conn => { | ||
conn.eventManager.on(EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'scoped' })) | ||
await conn.query('SELECT 1') | ||
}) | ||
await client.query('SELECT 2') | ||
await connection.query('SELECT 3') | ||
assert.deepStrictEqual(events, [ | ||
{ sql: 'SELECT 1', source: 'scoped' }, | ||
{ sql: 'SELECT 1', source: 'client' }, | ||
{ sql: 'SELECT 1', source: 'connection' }, | ||
{ sql: 'SELECT 2', source: 'client' }, | ||
{ sql: 'SELECT 2', source: 'connection' }, | ||
{ sql: 'SELECT 3', source: 'connection' }, | ||
]) | ||
end() | ||
}) | ||
test('event manager: connection and client with transaction and savepoint', async () => { | ||
const [connection, end] = createConnectionMockAlt([ | ||
{ sql: 'BEGIN' }, | ||
{ sql: 'SELECT 1' }, | ||
{ sql: 'SAVEPOINT "savepoint_1"' }, | ||
{ sql: 'SELECT 2' }, | ||
{ sql: 'RELEASE SAVEPOINT "savepoint_1"' }, | ||
{ sql: 'COMMIT' }, | ||
{ sql: 'SELECT 3' }, | ||
{ sql: 'SELECT 4' }, | ||
]) | ||
const events: { source: string; sql: string }[] = [] | ||
const client = connection.createClient('public', {}) | ||
connection.eventManager.on(EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'connection' })) | ||
client.eventManager.on(EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'client' })) | ||
await client.transaction(async trx => { | ||
trx.eventManager.on(EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'transaction' })) | ||
await trx.query('SELECT 1') | ||
await trx.transaction(async savepoint => { | ||
savepoint.eventManager.on(EventManager.Event.queryStart, ({ sql }) => events.push({ sql, source: 'savepoint' })) | ||
await savepoint.query('SELECT 2') | ||
}) | ||
}) | ||
await client.query('SELECT 3') | ||
await connection.query('SELECT 4') | ||
assert.deepStrictEqual(events, [ | ||
{ sql: 'BEGIN', source: 'client' }, | ||
{ sql: 'BEGIN', source: 'connection' }, | ||
{ sql: 'SELECT 1', source: 'transaction' }, | ||
{ sql: 'SELECT 1', source: 'client' }, | ||
{ sql: 'SELECT 1', source: 'connection' }, | ||
{ sql: 'SAVEPOINT "savepoint_1"', source: 'transaction' }, | ||
{ sql: 'SAVEPOINT "savepoint_1"', source: 'client' }, | ||
{ sql: 'SAVEPOINT "savepoint_1"', source: 'connection' }, | ||
{ sql: 'SELECT 2', source: 'savepoint' }, | ||
{ sql: 'SELECT 2', source: 'transaction' }, | ||
{ sql: 'SELECT 2', source: 'client' }, | ||
{ sql: 'SELECT 2', source: 'connection' }, | ||
{ sql: 'RELEASE SAVEPOINT "savepoint_1"', source: 'transaction' }, | ||
{ sql: 'RELEASE SAVEPOINT "savepoint_1"', source: 'client' }, | ||
{ sql: 'RELEASE SAVEPOINT "savepoint_1"', source: 'connection' }, | ||
{ sql: 'COMMIT', source: 'client' }, | ||
{ sql: 'COMMIT', source: 'connection' }, | ||
{ sql: 'SELECT 3', source: 'client' }, | ||
{ sql: 'SELECT 3', source: 'connection' }, | ||
{ sql: 'SELECT 4', source: 'connection' }, | ||
]) | ||
end() | ||
}) |
@@ -300,18 +300,13 @@ import { expect, it, beforeAll } from 'vitest' | ||
const pgClientMock = new PgClientMock() | ||
pgClientMock.on('error', () => { | ||
}) | ||
const pool = new Pool(() => pgClientMock as unknown as PgClient, { | ||
log: logger, | ||
acquireTimeoutMs: 3, | ||
}) | ||
pgClientMock.connections.push(createErrorPromise()) | ||
pool.on('error', () => { | ||
}) | ||
const poolError = new Promise(resolve => pool.once('error', e => { | ||
resolve(e) | ||
})) | ||
await expect(pool.acquire.bind(pool)).rejects.toThrowError('Failed to acquire a connection') | ||
await expect(poolError).resolves.toEqual(new Error('my err')) | ||
await timeout(4) | ||
pgClientMock.connections.push(createErrorPromise()) | ||
await expect(async () => await pool.acquire()).rejects.toThrowError('Database client error: my err') | ||
await timeout() | ||
expect(logger.messages).toMatchInlineSnapshot(` | ||
@@ -322,4 +317,3 @@ CAI P | ||
100 1: Connection error occurred: my err | ||
000 1: Connecting failed, emitting error | ||
000 0: Queued item timed out | ||
000 0: Connecting failed, rejecting pending item | ||
`) | ||
@@ -326,0 +320,0 @@ }) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Debug access
Supply chain riskUses debug, reflection and dynamic code execution features.
Found 1 instance in 1 package
605946
274
8837
2