@contember/database
Advanced tools
Comparing version 1.2.0-alpha.18 to 1.2.0-alpha.19
@@ -13,5 +13,2 @@ "use strict"; | ||
this.eventManager = eventManager; | ||
this.pool.on('error', err => { | ||
this.eventManager.fire(EventManager_1.EventManager.Event.clientError, err); | ||
}); | ||
} | ||
@@ -18,0 +15,0 @@ static create({ pool = {}, ...config }, logError) { |
@@ -7,4 +7,6 @@ export declare class DatabaseError extends Error { | ||
} | ||
export declare type ClientErrorType = 'recoverable connection error' | 'connection error' | 'runtime error' | 'disposal error'; | ||
export declare class ClientError extends DatabaseError { | ||
constructor(previous: Error | any); | ||
readonly type: ClientErrorType; | ||
constructor(previous: Error | any, type: ClientErrorType); | ||
} | ||
@@ -11,0 +13,0 @@ export declare class QueryError extends DatabaseError { |
@@ -18,4 +18,5 @@ "use strict"; | ||
class ClientError extends DatabaseError { | ||
constructor(previous) { | ||
super(`Database client error: ${'message' in previous ? previous.message : JSON.stringify(previous)}`, previous); | ||
constructor(previous, type) { | ||
super(`Database client ${type}: ${'message' in previous ? previous.message : JSON.stringify(previous)}`, previous); | ||
this.type = type; | ||
} | ||
@@ -22,0 +23,0 @@ } |
@@ -14,4 +14,3 @@ import { Connection } from './Connection'; | ||
queryEnd = "queryEnd", | ||
queryError = "queryError", | ||
clientError = "clientError" | ||
queryError = "queryError" | ||
} | ||
@@ -21,3 +20,2 @@ type QueryStartCallback = (query: Connection.Query) => void; | ||
type QueryErrorCallback = (query: Connection.Query, error: Error) => void; | ||
type ClientErrorCallback = (error: Error) => void; | ||
interface ListenerTypes { | ||
@@ -27,3 +25,2 @@ [EventManager.Event.queryStart]: QueryStartCallback; | ||
[EventManager.Event.queryError]: QueryErrorCallback; | ||
[EventManager.Event.clientError]: ClientErrorCallback; | ||
} | ||
@@ -30,0 +27,0 @@ type ListenersList = { |
@@ -11,3 +11,2 @@ "use strict"; | ||
[EventManager.Event.queryError]: [], | ||
[EventManager.Event.clientError]: [], | ||
}; | ||
@@ -35,3 +34,2 @@ } | ||
Event["queryError"] = "queryError"; | ||
Event["clientError"] = "clientError"; | ||
})(Event = EventManager.Event || (EventManager.Event = {})); | ||
@@ -38,0 +36,0 @@ })(EventManager || (EventManager = {})); |
/// <reference types="node" /> | ||
import { QueryConfig, QueryResult, QueryResultRow } from 'pg'; | ||
import EventEmitter from 'events'; | ||
import EventEmitter from 'node:events'; | ||
export interface PgClient extends EventEmitter { | ||
@@ -5,0 +5,0 @@ connect(): Promise<void>; |
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
import EventEmitter from 'events'; | ||
import { DatabaseError } from './errors'; | ||
import { ClientError, DatabaseError } from './errors'; | ||
import { PgClientFactory } from '../utils'; | ||
@@ -14,2 +12,6 @@ import { PgClient } from './PgClient'; | ||
maxConnecting: number; | ||
/** maximum connection attempts during given period */ | ||
rateLimitCount: number; | ||
/** period for rate limiting */ | ||
rateLimitPeriodMs: number; | ||
/** maximum number of idle connections. when reached, will be disposed immediately */ | ||
@@ -30,3 +32,3 @@ maxIdle: number; | ||
/** logging of out-of-stack errors */ | ||
logError: (error: Error) => void; | ||
logError: (error: ClientError) => void; | ||
} | ||
@@ -77,11 +79,7 @@ export declare type PoolStats = { | ||
} | ||
interface Pool { | ||
on(event: 'error', listener: (err: Error) => void): this; | ||
on(event: 'recoverableError', listener: (err: Error) => void): this; | ||
} | ||
declare class Pool extends EventEmitter { | ||
declare class Pool { | ||
private clientFactory; | ||
/** | ||
* pool was closed by calling end(). | ||
* All connection was disposed and it is not possible to acquire new one | ||
* All connection was disposed, and it is not possible to acquire new one | ||
*/ | ||
@@ -95,2 +93,6 @@ private ended; | ||
/** | ||
* Remaining number of connections per floating period. | ||
*/ | ||
private remainingRateLimit; | ||
/** | ||
* available idle connections | ||
@@ -97,0 +99,0 @@ */ |
"use strict"; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
@@ -9,3 +6,2 @@ exports.PoolClosedError = exports.AcquireTimeoutError = exports.PoolError = exports.Pool = exports.poolStatsDescription = void 0; | ||
const errorCodes_1 = require("./errorCodes"); | ||
const events_1 = __importDefault(require("events")); | ||
const errors_1 = require("./errors"); | ||
@@ -47,10 +43,9 @@ exports.poolStatsDescription = { | ||
} | ||
class Pool extends events_1.default { | ||
class Pool { | ||
constructor(clientFactory, poolConfig) { | ||
var _a, _b; | ||
super(); | ||
var _a; | ||
this.clientFactory = clientFactory; | ||
/** | ||
* pool was closed by calling end(). | ||
* All connection was disposed and it is not possible to acquire new one | ||
* All connection was disposed, and it is not possible to acquire new one | ||
*/ | ||
@@ -95,6 +90,10 @@ this.ended = false; | ||
}; | ||
const maxConnections = (_a = poolConfig.maxConnections) !== null && _a !== void 0 ? _a : 10; | ||
const maxConnecting = Math.ceil(maxConnections / 2); | ||
this.poolConfig = { | ||
maxConnections: 10, | ||
maxConnecting: Math.ceil(((_a = poolConfig.maxConnections) !== null && _a !== void 0 ? _a : 10) / 2), | ||
maxIdle: (_b = poolConfig.maxConnections) !== null && _b !== void 0 ? _b : 10, | ||
maxConnections, | ||
maxConnecting, | ||
maxIdle: maxConnections, | ||
rateLimitCount: maxConnecting * 5, | ||
rateLimitPeriodMs: 1000, | ||
idleTimeoutMs: 10000, | ||
@@ -105,2 +104,3 @@ acquireTimeoutMs: 10000, | ||
}; | ||
this.remainingRateLimit = this.poolConfig.rateLimitCount; | ||
} | ||
@@ -186,3 +186,3 @@ acquire() { | ||
} | ||
async maybeCreateNew(attempt = 1) { | ||
async maybeCreateNew() { | ||
if (this.ended) { | ||
@@ -209,12 +209,16 @@ return; | ||
} | ||
if (this.remainingRateLimit <= 0) { | ||
this.log('Not connecting, rate limit reached.'); | ||
return; | ||
} | ||
this.log('Creating a new connection'); | ||
const client = this.clientFactory(); | ||
this.connectingCount++; | ||
const poolConnection = new PoolConnection(client); | ||
client.on('error', e => { | ||
this.log('Client error on idle connection has occurred: ' + e.message); | ||
this.poolConfig.logError(e); | ||
this.emit('error', e); | ||
this.poolConfig.logError(new errors_1.ClientError(e, 'runtime error')); | ||
}); | ||
try { | ||
this.connectingCount++; | ||
this.remainingRateLimit--; | ||
await client.connect(); | ||
@@ -234,23 +238,18 @@ this.poolStats.connection_established_count++; | ||
} | ||
catch (_a) { | ||
catch (e2) { | ||
this.poolConfig.logError(new errors_1.ClientError(e2, 'disposal error')); | ||
} | ||
if (e.code === errorCodes_1.ClientErrorCodes.TOO_MANY_CONNECTIONS || e.code === errorCodes_1.ClientErrorCodes.CANNOT_CONNECT_NOW) { | ||
if (e.code === errorCodes_1.ClientErrorCodes.TOO_MANY_CONNECTIONS | ||
|| e.code === errorCodes_1.ClientErrorCodes.CANNOT_CONNECT_NOW // server starting | ||
|| e.message === 'timeout expired' // https://github.com/brianc/node-postgres/blob/c7dc621d3fb52c158eb23aa31dea6bd440700a4a/packages/pg/lib/client.js#L105 | ||
) { | ||
this.lastRecoverableError = { error: e, time: Date.now() }; | ||
if (this.poolConfig.reconnectIntervalMs * attempt >= this.poolConfig.acquireTimeoutMs) { | ||
this.poolStats.connection_error_count++; | ||
this.log('Recoverable error, max retries reached.'); | ||
this.poolConfig.logError(e); | ||
this.emit('error', e); | ||
this.poolStats.connection_recoverable_error_count++; | ||
this.log('Recoverable error, retrying in a moment.'); | ||
this.poolConfig.logError(new errors_1.ClientError(e, 'recoverable connection error')); | ||
setTimeout(() => { | ||
this.connectingCount--; | ||
} | ||
else { | ||
this.poolStats.connection_recoverable_error_count++; | ||
this.log('Recoverable error, retrying in a moment.'); | ||
this.emit('recoverableError', e); | ||
setTimeout(() => { | ||
this.connectingCount--; | ||
this.log('Retrying'); | ||
this.maybeCreateNew(attempt + 1); | ||
}, this.poolConfig.reconnectIntervalMs); | ||
} | ||
this.log('Retrying'); | ||
this.maybeCreateNew(); | ||
}, this.poolConfig.reconnectIntervalMs); | ||
} | ||
@@ -260,2 +259,3 @@ else { | ||
this.connectingCount--; | ||
const clientError = new errors_1.ClientError(e, 'connection error'); | ||
if (this.active.size === 0 && this.queue.length > 0) { | ||
@@ -266,3 +266,3 @@ const pendingItem = this.queue.shift(); | ||
this.log('Connecting failed, rejecting pending item'); | ||
pendingItem.reject(new errors_1.ClientError(e)); | ||
pendingItem.reject(clientError); | ||
this.poolStats.item_rejected_count++; | ||
@@ -272,4 +272,3 @@ } | ||
this.log('Connecting failed, emitting error'); | ||
this.poolConfig.logError(e); | ||
this.emit('error', e); | ||
this.poolConfig.logError(clientError); | ||
} | ||
@@ -279,2 +278,11 @@ } | ||
} | ||
finally { | ||
setTimeout(() => { | ||
if (this.remainingRateLimit === 0) { | ||
this.log('Rate limit renewed.'); | ||
} | ||
this.remainingRateLimit++; | ||
this.maybeCreateNew(); | ||
}, this.poolConfig.rateLimitPeriodMs); | ||
} | ||
this.handleAvailableConnection(poolConnection); | ||
@@ -368,3 +376,3 @@ this.maybeCreateNew(); | ||
catch (e) { | ||
this.on('error', e); | ||
this.poolConfig.logError(new errors_1.ClientError(e, 'disposal error')); | ||
} | ||
@@ -371,0 +379,0 @@ } |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.MutexDeadlockError = exports.Mutex = void 0; | ||
const async_hooks_1 = require("async_hooks"); | ||
const mutexDeadlockTracker = new async_hooks_1.AsyncLocalStorage(); | ||
const node_async_hooks_1 = require("node:async_hooks"); | ||
const mutexDeadlockTracker = new node_async_hooks_1.AsyncLocalStorage(); | ||
let mutexIdCounter = 1; | ||
@@ -7,0 +7,0 @@ class Mutex { |
@@ -8,3 +8,3 @@ "use strict"; | ||
const src_1 = require("../../../src"); | ||
const events_1 = __importDefault(require("events")); | ||
const node_events_1 = __importDefault(require("node:events")); | ||
const vitest_1 = require("vitest"); | ||
@@ -14,3 +14,3 @@ const createConnectionMockAlt = (...queries) => { | ||
for (const queriesSet of queries) { | ||
connectionMocks.push(new class extends events_1.default { | ||
connectionMocks.push(new class extends node_events_1.default { | ||
connect() { | ||
@@ -17,0 +17,0 @@ return Promise.resolve(); |
@@ -8,3 +8,3 @@ "use strict"; | ||
const src_1 = require("../../../src"); | ||
const events_1 = __importDefault(require("events")); | ||
const node_events_1 = __importDefault(require("node:events")); | ||
const createPoolLogger = () => { | ||
@@ -267,10 +267,8 @@ const logger = (message, pool) => { | ||
logError: () => null, | ||
reconnectIntervalMs: 5, | ||
acquireTimeoutMs: 10, | ||
reconnectIntervalMs: 20, | ||
acquireTimeoutMs: 30, | ||
}); | ||
pool.on('error', () => { | ||
}); | ||
pgClientMock.connections.push(createRecoverableErrorPromise(), createRecoverableErrorPromise()); | ||
await (0, vitest_1.expect)(async () => await pool.acquire()).rejects.toThrowError('Failed to acquire a connection. Last error: too many connection'); | ||
await timeout(2); | ||
await timeout(20); | ||
(0, vitest_1.expect)(logger.messages).toMatchInlineSnapshot(` | ||
@@ -285,4 +283,6 @@ CAI P | ||
100 1: Connection error occurred: too many connection | ||
100 1: Recoverable error, max retries reached. | ||
000 0: Queued item timed out | ||
100 1: Recoverable error, retrying in a moment. | ||
100 0: Queued item timed out | ||
000 0: Retrying | ||
000 0: Not connecting, queue is empty. | ||
`); | ||
@@ -299,6 +299,4 @@ }); | ||
}); | ||
pool.on('error', () => { | ||
}); | ||
pgClientMock.connections.push(createErrorPromise()); | ||
await (0, vitest_1.expect)(async () => await pool.acquire()).rejects.toThrowError('Database client error: my err'); | ||
await (0, vitest_1.expect)(async () => await pool.acquire()).rejects.toThrowError('Database client connection error: my err'); | ||
await timeout(); | ||
@@ -326,3 +324,3 @@ (0, vitest_1.expect)(logger.messages).toMatchInlineSnapshot(` | ||
}); | ||
const PgClientMock = class extends events_1.default { | ||
const PgClientMock = class extends node_events_1.default { | ||
constructor() { | ||
@@ -340,2 +338,41 @@ super(...arguments); | ||
}; | ||
(0, vitest_1.it)('rate limit', async () => { | ||
const logger = createPoolLogger(); | ||
const pool = new src_1.Pool(() => new PgClientMock(), { | ||
rateLimitCount: 1, | ||
rateLimitPeriodMs: 50, | ||
log: logger, | ||
logError: () => null, | ||
}); | ||
const conn1 = await pool.acquire(); | ||
setTimeout(async () => { | ||
await pool.release(conn1); | ||
}, 20); | ||
const conn2 = await pool.acquire(); | ||
await pool.dispose(conn2); | ||
await timeout(10); | ||
const conn3 = await pool.acquire(); | ||
(0, vitest_1.expect)(logger.messages).toMatchInlineSnapshot(` | ||
CAI P | ||
000 1: Item added to a queue. | ||
000 1: Creating a new connection | ||
100 1: Connection established | ||
010 0: Queued item fulfilled with new connection. | ||
010 0: Not connecting, queue is empty. | ||
010 1: Item added to a queue. | ||
010 1: Not connecting, rate limit reached. | ||
010 1: Releasing a connection. | ||
010 0: Queued item fulfilled with released connection. | ||
010 0: Releasing and disposing a connection. | ||
000 0: Not connecting, queue is empty. | ||
000 0: Connection errored and was disposed. | ||
000 1: Item added to a queue. | ||
000 1: Not connecting, rate limit reached. | ||
000 1: Rate limit renewed. | ||
000 1: Creating a new connection | ||
100 1: Connection established | ||
010 0: Queued item fulfilled with new connection. | ||
010 0: Not connecting, queue is empty. | ||
`); | ||
}); | ||
//# sourceMappingURL=pool.test.js.map |
{ | ||
"name": "@contember/database", | ||
"version": "1.2.0-alpha.18", | ||
"version": "1.2.0-alpha.19", | ||
"license": "Apache-2.0", | ||
@@ -11,6 +11,6 @@ "main": "dist/src/index.js", | ||
"dependencies": { | ||
"@contember/queryable": "^1.2.0-alpha.18" | ||
"@contember/queryable": "^1.2.0-alpha.19" | ||
}, | ||
"devDependencies": { | ||
"@contember/database-tester": "^1.2.0-alpha.18", | ||
"@contember/database-tester": "^1.2.0-alpha.19", | ||
"@types/node": "^18", | ||
@@ -17,0 +17,0 @@ "pg": "^8.5.0" |
@@ -13,5 +13,2 @@ import { EventManager } from './EventManager' | ||
) { | ||
this.pool.on('error', err => { | ||
this.eventManager.fire(EventManager.Event.clientError, err) | ||
}) | ||
} | ||
@@ -18,0 +15,0 @@ |
@@ -0,1 +1,3 @@ | ||
import { ClientErrorCodes } from './errorCodes' | ||
export class DatabaseError extends Error { | ||
@@ -18,5 +20,11 @@ public readonly code?: string | ||
export type ClientErrorType = | ||
| 'recoverable connection error' | ||
| 'connection error' | ||
| 'runtime error' | ||
| 'disposal error' | ||
export class ClientError extends DatabaseError { | ||
constructor(previous: Error | any) { | ||
super(`Database client error: ${'message' in previous ? previous.message : JSON.stringify(previous)}`, previous) | ||
constructor(previous: Error | any, public readonly type: ClientErrorType) { | ||
super(`Database client ${type}: ${'message' in previous ? previous.message : JSON.stringify(previous)}`, previous) | ||
} | ||
@@ -23,0 +31,0 @@ } |
@@ -8,3 +8,2 @@ import { Connection } from './Connection' | ||
[EventManager.Event.queryError]: [] as EventManager.QueryStartCallback[], | ||
[EventManager.Event.clientError]: [] as EventManager.ClientErrorCallback[], | ||
} | ||
@@ -43,3 +42,2 @@ | ||
queryError = 'queryError', | ||
clientError = 'clientError', | ||
} | ||
@@ -50,3 +48,2 @@ | ||
export type QueryErrorCallback = (query: Connection.Query, error: Error) => void | ||
export type ClientErrorCallback = (error: Error) => void | ||
@@ -57,3 +54,2 @@ export interface ListenerTypes { | ||
[EventManager.Event.queryError]: QueryErrorCallback | ||
[EventManager.Event.clientError]: ClientErrorCallback | ||
} | ||
@@ -60,0 +56,0 @@ |
import { QueryConfig, QueryResult, QueryResultRow } from 'pg' | ||
import EventEmitter from 'events' | ||
import EventEmitter from 'node:events' | ||
@@ -4,0 +4,0 @@ export interface PgClient extends EventEmitter { |
import { ImplementationException } from '../exceptions' | ||
import { ClientErrorCodes } from './errorCodes' | ||
import EventEmitter from 'events' | ||
import EventEmitter from 'node:events' | ||
import { ClientError, DatabaseError } from './errors' | ||
@@ -17,2 +17,6 @@ import { PgClientFactory } from '../utils' | ||
maxConnecting: number | ||
/** maximum connection attempts during given period */ | ||
rateLimitCount: number | ||
/** period for rate limiting */ | ||
rateLimitPeriodMs: number | ||
/** maximum number of idle connections. when reached, will be disposed immediately */ | ||
@@ -33,3 +37,3 @@ maxIdle: number | ||
/** logging of out-of-stack errors */ | ||
logError: (error: Error) => void | ||
logError: (error: ClientError) => void | ||
} | ||
@@ -99,13 +103,7 @@ | ||
interface Pool { | ||
on(event: 'error', listener: (err: Error) => void): this | ||
class Pool { | ||
on(event: 'recoverableError', listener: (err: Error) => void): this | ||
} | ||
class Pool extends EventEmitter { | ||
/** | ||
* pool was closed by calling end(). | ||
* All connection was disposed and it is not possible to acquire new one | ||
* All connection was disposed, and it is not possible to acquire new one | ||
*/ | ||
@@ -121,2 +119,7 @@ private ended = false | ||
/** | ||
* Remaining number of connections per floating period. | ||
*/ | ||
private remainingRateLimit: number | ||
/** | ||
* available idle connections | ||
@@ -168,7 +171,10 @@ */ | ||
) { | ||
super() | ||
const maxConnections = poolConfig.maxConnections ?? 10 | ||
const maxConnecting = Math.ceil(maxConnections / 2) | ||
this.poolConfig = { | ||
maxConnections: 10, | ||
maxConnecting: Math.ceil((poolConfig.maxConnections ?? 10) / 2), | ||
maxIdle: poolConfig.maxConnections ?? 10, | ||
maxConnections, | ||
maxConnecting, | ||
maxIdle: maxConnections, | ||
rateLimitCount: maxConnecting * 5, | ||
rateLimitPeriodMs: 1000, | ||
idleTimeoutMs: 10_000, | ||
@@ -179,2 +185,3 @@ acquireTimeoutMs: 10_000, | ||
} | ||
this.remainingRateLimit = this.poolConfig.rateLimitCount | ||
} | ||
@@ -275,3 +282,3 @@ | ||
private async maybeCreateNew(attempt = 1): Promise<void> { | ||
private async maybeCreateNew(): Promise<void> { | ||
if (this.ended) { | ||
@@ -298,12 +305,17 @@ return | ||
} | ||
if (this.remainingRateLimit <= 0) { | ||
this.log('Not connecting, rate limit reached.') | ||
return | ||
} | ||
this.log('Creating a new connection') | ||
const client = this.clientFactory() | ||
this.connectingCount++ | ||
const poolConnection = new PoolConnection(client) | ||
client.on('error', e => { | ||
this.log('Client error on idle connection has occurred: ' + e.message) | ||
this.poolConfig.logError(e) | ||
this.emit('error', e) | ||
this.poolConfig.logError(new ClientError(e, 'runtime error')) | ||
}) | ||
try { | ||
this.connectingCount++ | ||
this.remainingRateLimit-- | ||
await client.connect() | ||
@@ -321,25 +333,23 @@ this.poolStats.connection_established_count++ | ||
await client.end() | ||
} catch { | ||
} catch (e2: unknown) { | ||
this.poolConfig.logError(new ClientError(e2, 'disposal error')) | ||
} | ||
if (e.code === ClientErrorCodes.TOO_MANY_CONNECTIONS || e.code === ClientErrorCodes.CANNOT_CONNECT_NOW) { | ||
if ( | ||
e.code === ClientErrorCodes.TOO_MANY_CONNECTIONS | ||
|| e.code === ClientErrorCodes.CANNOT_CONNECT_NOW // server starting | ||
|| e.message === 'timeout expired' // https://github.com/brianc/node-postgres/blob/c7dc621d3fb52c158eb23aa31dea6bd440700a4a/packages/pg/lib/client.js#L105 | ||
) { | ||
this.lastRecoverableError = { error: e, time: Date.now() } | ||
if (this.poolConfig.reconnectIntervalMs * attempt >= this.poolConfig.acquireTimeoutMs) { | ||
this.poolStats.connection_error_count++ | ||
this.log('Recoverable error, max retries reached.') | ||
this.poolConfig.logError(e) | ||
this.emit('error', e) | ||
this.poolStats.connection_recoverable_error_count++ | ||
this.log('Recoverable error, retrying in a moment.') | ||
this.poolConfig.logError(new ClientError(e, 'recoverable connection error')) | ||
setTimeout(() => { | ||
this.connectingCount-- | ||
} else { | ||
this.poolStats.connection_recoverable_error_count++ | ||
this.log('Recoverable error, retrying in a moment.') | ||
this.emit('recoverableError', e) | ||
setTimeout(() => { | ||
this.connectingCount-- | ||
this.log('Retrying') | ||
this.maybeCreateNew(attempt + 1) | ||
}, this.poolConfig.reconnectIntervalMs) | ||
} | ||
this.log('Retrying') | ||
this.maybeCreateNew() | ||
}, this.poolConfig.reconnectIntervalMs) | ||
} else { | ||
this.poolStats.connection_error_count++ | ||
this.connectingCount-- | ||
const clientError = new ClientError(e, 'connection error') | ||
if (this.active.size === 0 && this.queue.length > 0) { | ||
@@ -349,11 +359,18 @@ const pendingItem = this.queue.shift() | ||
this.log('Connecting failed, rejecting pending item') | ||
pendingItem.reject(new ClientError(e)) | ||
pendingItem.reject(clientError) | ||
this.poolStats.item_rejected_count++ | ||
} else { | ||
this.log('Connecting failed, emitting error') | ||
this.poolConfig.logError(e) | ||
this.emit('error', e) | ||
this.poolConfig.logError(clientError) | ||
} | ||
} | ||
return | ||
} finally { | ||
setTimeout(() => { | ||
if (this.remainingRateLimit === 0) { | ||
this.log('Rate limit renewed.') | ||
} | ||
this.remainingRateLimit++ | ||
this.maybeCreateNew() | ||
}, this.poolConfig.rateLimitPeriodMs) | ||
} | ||
@@ -454,3 +471,3 @@ this.handleAvailableConnection(poolConnection) | ||
} catch (e: any) { | ||
this.on('error', e) | ||
this.poolConfig.logError(new ClientError(e, 'disposal error')) | ||
} | ||
@@ -457,0 +474,0 @@ } |
@@ -1,2 +0,2 @@ | ||
import { AsyncLocalStorage } from 'async_hooks' | ||
import { AsyncLocalStorage } from 'node:async_hooks' | ||
@@ -3,0 +3,0 @@ const mutexDeadlockTracker = new AsyncLocalStorage<Set<number>>() |
import { Connection, Pool } from '../../../src' | ||
import { PgClient } from '../../../src/client/PgClient' | ||
import EventEmitter from 'events' | ||
import EventEmitter from 'node:events' | ||
import { expect } from 'vitest' | ||
@@ -5,0 +5,0 @@ |
import { expect, it, beforeAll } from 'vitest' | ||
import { ClientErrorCodes, Pool, PoolLogger } from '../../../src' | ||
import { Client as PgClient } from 'pg' | ||
import EventEmitter from 'events' | ||
import EventEmitter from 'node:events' | ||
@@ -286,10 +286,8 @@ const createPoolLogger = () => { | ||
logError: () => null, | ||
reconnectIntervalMs: 5, | ||
acquireTimeoutMs: 10, | ||
reconnectIntervalMs: 20, | ||
acquireTimeoutMs: 30, | ||
}) | ||
pool.on('error', () => { | ||
}) | ||
pgClientMock.connections.push(createRecoverableErrorPromise(), createRecoverableErrorPromise()) | ||
await expect(async () => await pool.acquire()).rejects.toThrowError('Failed to acquire a connection. Last error: too many connection') | ||
await timeout(2) | ||
await timeout(20) | ||
expect(logger.messages).toMatchInlineSnapshot(` | ||
@@ -304,4 +302,6 @@ CAI P | ||
100 1: Connection error occurred: too many connection | ||
100 1: Recoverable error, max retries reached. | ||
000 0: Queued item timed out | ||
100 1: Recoverable error, retrying in a moment. | ||
100 0: Queued item timed out | ||
000 0: Retrying | ||
000 0: Not connecting, queue is empty. | ||
`) | ||
@@ -319,7 +319,4 @@ }) | ||
}) | ||
pool.on('error', () => { | ||
}) | ||
pgClientMock.connections.push(createErrorPromise()) | ||
await expect(async () => await pool.acquire()).rejects.toThrowError('Database client error: my err') | ||
await expect(async () => await pool.acquire()).rejects.toThrowError('Database client connection error: my err') | ||
await timeout() | ||
@@ -365,1 +362,42 @@ expect(logger.messages).toMatchInlineSnapshot(` | ||
} | ||
it('rate limit', async () => { | ||
const logger = createPoolLogger() | ||
const pool = new Pool(() => new PgClientMock() as unknown as PgClient, { | ||
rateLimitCount: 1, | ||
rateLimitPeriodMs: 50, | ||
log: logger, | ||
logError: () => null, | ||
}) | ||
const conn1 = await pool.acquire() | ||
setTimeout(async () => { | ||
await pool.release(conn1) | ||
}, 20) | ||
const conn2 = await pool.acquire() | ||
await pool.dispose(conn2) | ||
await timeout(10) | ||
const conn3 = await pool.acquire() | ||
expect(logger.messages).toMatchInlineSnapshot(` | ||
CAI P | ||
000 1: Item added to a queue. | ||
000 1: Creating a new connection | ||
100 1: Connection established | ||
010 0: Queued item fulfilled with new connection. | ||
010 0: Not connecting, queue is empty. | ||
010 1: Item added to a queue. | ||
010 1: Not connecting, rate limit reached. | ||
010 1: Releasing a connection. | ||
010 0: Queued item fulfilled with released connection. | ||
010 0: Releasing and disposing a connection. | ||
000 0: Not connecting, queue is empty. | ||
000 0: Connection errored and was disposed. | ||
000 1: Item added to a queue. | ||
000 1: Not connecting, rate limit reached. | ||
000 1: Rate limit renewed. | ||
000 1: Creating a new connection | ||
100 1: Connection established | ||
010 0: Queued item fulfilled with new connection. | ||
010 0: Not connecting, queue is empty. | ||
`) | ||
}) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Debug access
Supply chain riskUses debug, reflection and dynamic code execution features.
Found 1 instance in 1 package
611590
8931
1