Comparing version 35.0.3 to 35.1.0
"use strict"; | ||
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
var desc = Object.getOwnPropertyDescriptor(m, k); | ||
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { | ||
desc = { enumerable: true, get: function() { return m[k]; } }; | ||
} | ||
Object.defineProperty(o, k2, desc); | ||
}) : (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
o[k2] = m[k]; | ||
})); | ||
var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) { | ||
Object.defineProperty(o, "default", { enumerable: true, value: v }); | ||
}) : function(o, v) { | ||
o["default"] = v; | ||
}); | ||
var __importStar = (this && this.__importStar) || function (mod) { | ||
if (mod && mod.__esModule) return mod; | ||
var result = {}; | ||
if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k); | ||
__setModuleDefault(result, mod); | ||
return result; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.stream = void 0; | ||
const QueryStream_1 = require("../QueryStream"); | ||
const executeQuery_1 = require("../routines/executeQuery"); | ||
const through = __importStar(require("through2")); | ||
const node_stream_1 = require("node:stream"); | ||
const pg_query_stream_1 = __importDefault(require("pg-query-stream")); | ||
const stream = async (connectionLogger, connection, clientConfiguration, slonikSql, streamHandler, uid, options) => { | ||
return await (0, executeQuery_1.executeQuery)(connectionLogger, connection, clientConfiguration, slonikSql, undefined, async (finalConnection, finalSql, finalValues, executionContext, actualQuery) => { | ||
const query = new QueryStream_1.QueryStream(finalSql, finalValues, options); | ||
const streamEndResultRow = { | ||
command: 'SELECT', | ||
fields: [], | ||
notices: [], | ||
rowCount: 0, | ||
rows: [], | ||
}; | ||
const query = new pg_query_stream_1.default(finalSql, finalValues, options); | ||
const queryStream = finalConnection.query(query); | ||
let fields = []; | ||
// @ts-expect-error – https://github.com/brianc/node-postgres/issues/3015 | ||
finalConnection.connection.once('rowDescription', (rowDescription) => { | ||
fields = rowDescription.fields.map((field) => { | ||
return { | ||
dataTypeId: field.dataTypeID, | ||
name: field.name, | ||
}; | ||
}); | ||
}); | ||
const rowTransformers = []; | ||
@@ -40,48 +37,42 @@ for (const interceptor of clientConfiguration.interceptors) { | ||
} | ||
return await new Promise((resolve, reject) => { | ||
queryStream.on('error', (error) => { | ||
reject(error); | ||
return new Promise((resolve, reject) => { | ||
const transformStream = new node_stream_1.Transform({ | ||
objectMode: true, | ||
transform(datum, enc, callback) { | ||
let finalRow = datum; | ||
if (rowTransformers.length) { | ||
for (const rowTransformer of rowTransformers) { | ||
finalRow = rowTransformer(executionContext, actualQuery, finalRow, fields); | ||
} | ||
} | ||
// eslint-disable-next-line @babel/no-invalid-this | ||
this.push({ | ||
fields, | ||
row: finalRow, | ||
}); | ||
callback(); | ||
}, | ||
}); | ||
const transformedStream = queryStream.pipe(through.obj(function (datum, enc, callback) { | ||
let finalRow = datum.row; | ||
if (rowTransformers.length) { | ||
for (const rowTransformer of rowTransformers) { | ||
finalRow = rowTransformer(executionContext, actualQuery, finalRow, datum.fields); | ||
} | ||
transformStream.on('newListener', (event) => { | ||
if (event === 'data') { | ||
queryStream.pipe(transformStream); | ||
} | ||
// eslint-disable-next-line @babel/no-invalid-this | ||
this.push({ | ||
fields: datum.fields, | ||
row: finalRow, | ||
}); | ||
callback(); | ||
})); | ||
transformedStream.on('end', () => { | ||
resolve({ | ||
command: 'SELECT', | ||
fields: [], | ||
notices: [], | ||
rowCount: 0, | ||
rows: [], | ||
}); | ||
}); | ||
// Invoked if stream is destroyed using transformedStream.destroy(). | ||
transformedStream.on('close', () => { | ||
transformStream.on('end', () => { | ||
resolve(streamEndResultRow); | ||
}); | ||
transformStream.on('close', () => { | ||
if (!queryStream.destroyed) { | ||
queryStream.destroy(); | ||
} | ||
resolve({ | ||
command: 'SELECT', | ||
fields: [], | ||
notices: [], | ||
rowCount: 0, | ||
rows: [], | ||
}); | ||
resolve(streamEndResultRow); | ||
}); | ||
transformedStream.on('error', (error) => { | ||
transformStream.on('error', (error) => { | ||
reject(error); | ||
queryStream.destroy(error); | ||
}); | ||
transformedStream.once('readable', () => { | ||
streamHandler(transformedStream); | ||
queryStream.on('error', (error) => { | ||
transformStream.destroy(error); | ||
}); | ||
streamHandler(transformStream); | ||
}); | ||
@@ -88,0 +79,0 @@ }); |
@@ -6,5 +6,4 @@ "use strict"; | ||
const errors_1 = require("../errors"); | ||
const establishConnection_1 = require("../routines/establishConnection"); | ||
const state_1 = require("../state"); | ||
const createUid_1 = require("../utilities/createUid"); | ||
const serialize_error_1 = require("serialize-error"); | ||
const terminatePoolConnection = (connection) => { | ||
@@ -35,40 +34,2 @@ // tells the pool to destroy this client | ||
}; | ||
const establishConnection = async (parentLog, pool, connectionRetryLimit) => { | ||
const poolState = (0, state_1.getPoolState)(pool); | ||
let connection; | ||
let remainingConnectionRetryLimit = connectionRetryLimit; | ||
// eslint-disable-next-line no-constant-condition | ||
while (true) { | ||
remainingConnectionRetryLimit--; | ||
try { | ||
connection = await pool.connect(); | ||
state_1.poolClientStateMap.set(connection, { | ||
connectionId: (0, createUid_1.createUid)(), | ||
mock: poolState.mock, | ||
poolId: poolState.poolId, | ||
terminated: null, | ||
transactionDepth: null, | ||
transactionId: null, | ||
}); | ||
break; | ||
} | ||
catch (error) { | ||
parentLog.error({ | ||
error: (0, serialize_error_1.serializeError)(error), | ||
remainingConnectionRetryLimit, | ||
}, 'cannot establish connection'); | ||
if (remainingConnectionRetryLimit > 1) { | ||
parentLog.info('retrying connection'); | ||
continue; | ||
} | ||
else { | ||
throw new errors_1.ConnectionError(error.message); | ||
} | ||
} | ||
} | ||
if (!connection) { | ||
throw new errors_1.UnexpectedStateError('Connection handle is not present.'); | ||
} | ||
return connection; | ||
}; | ||
const createConnection = async (parentLog, pool, clientConfiguration, connectionType, connectionHandler, poolHandler, query = null) => { | ||
@@ -91,3 +52,3 @@ const { ended, poolId } = (0, state_1.getPoolState)(pool); | ||
} | ||
const connection = await establishConnection(parentLog, pool, clientConfiguration.connectionRetryLimit); | ||
const connection = await (0, establishConnection_1.establishConnection)(parentLog, pool, clientConfiguration.connectionRetryLimit); | ||
const { connectionId } = (0, state_1.getPoolClientState)(connection); | ||
@@ -94,0 +55,0 @@ const connectionLog = parentLog.child({ |
@@ -8,7 +8,6 @@ "use strict"; | ||
const state_1 = require("../state"); | ||
const createUid_1 = require("../utilities/createUid"); | ||
const createClientConfiguration_1 = require("./createClientConfiguration"); | ||
const createInternalPool_1 = require("./createInternalPool"); | ||
const createPoolConfiguration_1 = require("./createPoolConfiguration"); | ||
const pg_1 = require("pg"); | ||
const serialize_error_1 = require("serialize-error"); | ||
/** | ||
@@ -19,6 +18,2 @@ * @param connectionUri PostgreSQL [Connection URI](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING). | ||
const clientConfiguration = (0, createClientConfiguration_1.createClientConfiguration)(clientConfigurationInput); | ||
const poolId = (0, createUid_1.createUid)(); | ||
const poolLog = Logger_1.Logger.child({ | ||
poolId, | ||
}); | ||
const poolConfiguration = (0, createPoolConfiguration_1.createPoolConfiguration)(connectionUri, clientConfiguration); | ||
@@ -32,20 +27,13 @@ let Pool = clientConfiguration.PgPool; | ||
} | ||
const setupClient = new pg_1.Client({ | ||
connectionTimeoutMillis: poolConfiguration.connectionTimeoutMillis, | ||
database: poolConfiguration.database, | ||
host: poolConfiguration.host, | ||
password: poolConfiguration.password, | ||
port: poolConfiguration.port, | ||
ssl: poolConfiguration.ssl, | ||
user: poolConfiguration.user, | ||
}); | ||
const setupPool = (0, createInternalPool_1.createInternalPool)(Pool, poolConfiguration); | ||
let getTypeParser; | ||
try { | ||
await setupClient.connect(); | ||
getTypeParser = await (0, createTypeOverrides_1.createTypeOverrides)(setupClient, clientConfiguration.typeParsers); | ||
const connection = await setupPool.connect(); | ||
getTypeParser = await (0, createTypeOverrides_1.createTypeOverrides)(connection, clientConfiguration.typeParsers); | ||
await connection.release(); | ||
} | ||
finally { | ||
await setupClient.end(); | ||
await setupPool.end(); | ||
} | ||
const pool = new Pool({ | ||
const pool = (0, createInternalPool_1.createInternalPool)(Pool, { | ||
...poolConfiguration, | ||
@@ -56,60 +44,7 @@ types: { | ||
}); | ||
// https://github.com/gajus/slonik/issues/471 | ||
pool.on('error', (error) => { | ||
poolLog.error({ | ||
error: (0, serialize_error_1.serializeError)(error), | ||
}, 'client error'); | ||
}); | ||
state_1.poolStateMap.set(pool, { | ||
ended: false, | ||
mock: false, | ||
poolId, | ||
typeOverrides: null, | ||
}); | ||
// istanbul ignore next | ||
pool.on('connect', (client) => { | ||
client.on('error', (error) => { | ||
poolLog.error({ | ||
error: (0, serialize_error_1.serializeError)(error), | ||
}, 'client error'); | ||
}); | ||
client.on('notice', (notice) => { | ||
poolLog.info({ | ||
notice: { | ||
level: notice.name, | ||
message: notice.message, | ||
}, | ||
}, 'notice message'); | ||
}); | ||
poolLog.debug({ | ||
stats: { | ||
idleConnectionCount: pool.idleCount, | ||
totalConnectionCount: pool.totalCount, | ||
waitingRequestCount: pool.waitingCount, | ||
}, | ||
}, 'created a new client connection'); | ||
}); | ||
// istanbul ignore next | ||
pool.on('acquire', () => { | ||
poolLog.debug({ | ||
stats: { | ||
idleConnectionCount: pool.idleCount, | ||
totalConnectionCount: pool.totalCount, | ||
waitingRequestCount: pool.waitingCount, | ||
}, | ||
}, 'client is checked out from the pool'); | ||
}); | ||
// istanbul ignore next | ||
pool.on('remove', () => { | ||
poolLog.debug({ | ||
stats: { | ||
idleConnectionCount: pool.idleCount, | ||
totalConnectionCount: pool.totalCount, | ||
waitingRequestCount: pool.waitingCount, | ||
}, | ||
}, 'client connection is closed and removed from the client pool'); | ||
}); | ||
return (0, bindPool_1.bindPool)(poolLog, pool, clientConfiguration); | ||
return (0, bindPool_1.bindPool)(Logger_1.Logger.child({ | ||
poolId: (0, state_1.getPoolState)(pool).poolId, | ||
}), pool, clientConfiguration); | ||
}; | ||
exports.createPool = createPool; | ||
//# sourceMappingURL=createPool.js.map |
@@ -41,3 +41,5 @@ "use strict"; | ||
rawSql += sqlFragment.sql; | ||
parameterValues.push(...sqlFragment.values); | ||
for (const value of sqlFragment.values) { | ||
parameterValues.push(value); | ||
} | ||
} | ||
@@ -44,0 +46,0 @@ else { |
import { type TypeParser } from '../types'; | ||
import { type Client as PgClient } from 'pg'; | ||
export declare const createTypeOverrides: (pool: PgClient, typeParsers: readonly TypeParser[]) => Promise<(oid: number) => any>; | ||
import { type PoolClient } from 'pg'; | ||
export declare const createTypeOverrides: (connection: PoolClient, typeParsers: readonly TypeParser[]) => Promise<(oid: number) => any>; | ||
//# sourceMappingURL=createTypeOverrides.d.ts.map |
@@ -6,7 +6,7 @@ "use strict"; | ||
const postgres_array_1 = require("postgres-array"); | ||
const createTypeOverrides = async (pool, typeParsers) => { | ||
const createTypeOverrides = async (connection, typeParsers) => { | ||
const typeNames = typeParsers.map((typeParser) => { | ||
return typeParser.name; | ||
}); | ||
const postgresTypes = (await pool.query('SELECT oid, typarray, typname FROM pg_type WHERE typname = ANY($1::text[])', [typeNames])).rows; | ||
const postgresTypes = (await connection.query('SELECT oid, typarray, typname FROM pg_type WHERE typname = ANY($1::text[])', [typeNames])).rows; | ||
const parsers = {}; | ||
@@ -13,0 +13,0 @@ for (const typeParser of typeParsers) { |
import { type TypeOverrides } from './types'; | ||
import { type DeferredPromise } from './utilities/defer'; | ||
import { type Pool as PgPool, type PoolClient as PgClientPool } from 'pg'; | ||
type PoolState = { | ||
export type PoolState = { | ||
ended: boolean; | ||
@@ -6,0 +6,0 @@ mock: boolean; |
@@ -34,3 +34,7 @@ /// <reference types="node" /> | ||
export type MaybePromise<T> = Promise<T> | T; | ||
type StreamHandler = (stream: Readable) => void; | ||
interface TypedReadable<T> extends Readable { | ||
on(event: 'data', listener: (chunk: T) => void): this; | ||
on(event: string | symbol, listener: (...args: any[]) => void): this; | ||
} | ||
type StreamHandler<T> = (stream: TypedReadable<T>) => void; | ||
export type Connection = 'EXPLICIT' | 'IMPLICIT_QUERY' | 'IMPLICIT_TRANSACTION'; | ||
@@ -106,3 +110,3 @@ export type Field = { | ||
}; | ||
type StreamFunction = (sql: QuerySqlToken, streamHandler: StreamHandler, config?: QueryStreamConfig) => Promise<Record<string, unknown> | null>; | ||
type StreamFunction = <T extends ZodTypeAny>(sql: QuerySqlToken<T>, streamHandler: StreamHandler<z.infer<T>>, config?: QueryStreamConfig) => Promise<Record<string, unknown> | null>; | ||
export type CommonQueryMethods = { | ||
@@ -294,3 +298,3 @@ readonly any: QueryAnyFunction; | ||
export type InternalQueryMethod<R = any> = (log: Logger, connection: PgPoolClient, clientConfiguration: ClientConfiguration, slonikSql: QuerySqlToken, uid?: QueryId) => R; | ||
export type InternalStreamFunction = (log: Logger, connection: PgPoolClient, clientConfiguration: ClientConfiguration, slonikSql: QuerySqlToken, streamHandler: StreamHandler, uid?: QueryId, config?: QueryStreamConfig) => Promise<Record<string, unknown>>; | ||
export type InternalStreamFunction = <T>(log: Logger, connection: PgPoolClient, clientConfiguration: ClientConfiguration, slonikSql: QuerySqlToken, streamHandler: StreamHandler<T>, uid?: QueryId, config?: QueryStreamConfig) => Promise<Record<string, unknown>>; | ||
export type InternalTransactionFunction = <T>(log: Logger, connection: PgPoolClient, clientConfiguration: ClientConfiguration, handler: TransactionFunction<T>, transactionRetryLimit?: number) => Promise<T>; | ||
@@ -297,0 +301,0 @@ export type InternalNestedTransactionFunction = <T>(log: Logger, connection: PgPoolClient, clientConfiguration: ClientConfiguration, handler: TransactionFunction<T>, transactionDepth: number, transactionRetryLimit?: number) => Promise<T>; |
@@ -13,4 +13,4 @@ { | ||
"pg": "^8.11.3", | ||
"pg-cursor": "^2.10.0", | ||
"pg-protocol": "^1.6.0", | ||
"pg-query-stream": "^4.5.3", | ||
"pg-types": "^4.0.1", | ||
@@ -21,4 +21,3 @@ "postgres-array": "^3.0.2", | ||
"safe-stable-stringify": "^2.4.3", | ||
"serialize-error": "^8.0.0", | ||
"through2": "^4.0.2" | ||
"serialize-error": "^8.0.0" | ||
}, | ||
@@ -31,3 +30,2 @@ "description": "A Node.js PostgreSQL client with strict types, detailed logging and assertions.", | ||
"@types/sinon": "^10.0.15", | ||
"@types/through2": "^2.0.38", | ||
"ava": "^5.3.0", | ||
@@ -105,3 +103,3 @@ "cspell": "^7.3.6", | ||
"types": "./dist/index.d.ts", | ||
"version": "35.0.3" | ||
"version": "35.1.0" | ||
} |
@@ -1,6 +0,5 @@ | ||
import { QueryStream } from '../QueryStream'; | ||
import { executeQuery } from '../routines/executeQuery'; | ||
import { type Interceptor, type InternalStreamFunction } from '../types'; | ||
import { type Readable } from 'node:stream'; | ||
import * as through from 'through2'; | ||
import { type Readable, Transform } from 'node:stream'; | ||
import QueryStream from 'pg-query-stream'; | ||
@@ -29,6 +28,33 @@ export const stream: InternalStreamFunction = async ( | ||
) => { | ||
const query = new QueryStream(finalSql, finalValues, options); | ||
const streamEndResultRow = { | ||
command: 'SELECT', | ||
fields: [], | ||
notices: [], | ||
rowCount: 0, | ||
rows: [], | ||
} as const; | ||
const query = new QueryStream( | ||
finalSql, | ||
finalValues as unknown[], | ||
options, | ||
); | ||
const queryStream: Readable = finalConnection.query(query); | ||
let fields: Array<{ | ||
dataTypeId: number; | ||
name: string; | ||
}> = []; | ||
// @ts-expect-error – https://github.com/brianc/node-postgres/issues/3015 | ||
finalConnection.connection.once('rowDescription', (rowDescription) => { | ||
fields = rowDescription.fields.map((field) => { | ||
return { | ||
dataTypeId: field.dataTypeID, | ||
name: field.name, | ||
}; | ||
}); | ||
}); | ||
const rowTransformers: Array<NonNullable<Interceptor['transformRow']>> = | ||
@@ -43,11 +69,8 @@ []; | ||
return await new Promise((resolve, reject) => { | ||
queryStream.on('error', (error: Error) => { | ||
reject(error); | ||
}); | ||
return new Promise((resolve, reject) => { | ||
const transformStream = new Transform({ | ||
objectMode: true, | ||
transform(datum, enc, callback) { | ||
let finalRow = datum; | ||
const transformedStream = queryStream.pipe( | ||
through.obj(function (datum, enc, callback) { | ||
let finalRow = datum.row; | ||
if (rowTransformers.length) { | ||
@@ -59,3 +82,3 @@ for (const rowTransformer of rowTransformers) { | ||
finalRow, | ||
datum.fields, | ||
fields, | ||
); | ||
@@ -67,3 +90,3 @@ } | ||
this.push({ | ||
fields: datum.fields, | ||
fields, | ||
row: finalRow, | ||
@@ -73,17 +96,16 @@ }); | ||
callback(); | ||
}), | ||
); | ||
}, | ||
}); | ||
transformedStream.on('end', () => { | ||
resolve({ | ||
command: 'SELECT', | ||
fields: [], | ||
notices: [], | ||
rowCount: 0, | ||
rows: [], | ||
}); | ||
transformStream.on('newListener', (event) => { | ||
if (event === 'data') { | ||
queryStream.pipe(transformStream); | ||
} | ||
}); | ||
// Invoked if stream is destroyed using transformedStream.destroy(). | ||
transformedStream.on('close', () => { | ||
transformStream.on('end', () => { | ||
resolve(streamEndResultRow); | ||
}); | ||
transformStream.on('close', () => { | ||
if (!queryStream.destroyed) { | ||
@@ -93,18 +115,16 @@ queryStream.destroy(); | ||
resolve({ | ||
command: 'SELECT', | ||
fields: [], | ||
notices: [], | ||
rowCount: 0, | ||
rows: [], | ||
}); | ||
resolve(streamEndResultRow); | ||
}); | ||
transformedStream.on('error', (error: Error) => { | ||
transformStream.on('error', (error: Error) => { | ||
reject(error); | ||
queryStream.destroy(error); | ||
}); | ||
transformedStream.once('readable', () => { | ||
streamHandler(transformedStream); | ||
queryStream.on('error', (error: Error) => { | ||
transformStream.destroy(error); | ||
}); | ||
streamHandler(transformStream); | ||
}); | ||
@@ -111,0 +131,0 @@ }, |
import { bindPoolConnection } from '../binders/bindPoolConnection'; | ||
import { ConnectionError, UnexpectedStateError } from '../errors'; | ||
import { getPoolClientState, getPoolState, poolClientStateMap } from '../state'; | ||
import { UnexpectedStateError } from '../errors'; | ||
import { establishConnection } from '../routines/establishConnection'; | ||
import { getPoolClientState, getPoolState } from '../state'; | ||
import { | ||
@@ -13,5 +14,3 @@ type ClientConfiguration, | ||
} from '../types'; | ||
import { createUid } from '../utilities/createUid'; | ||
import { type Pool as PgPool, type PoolClient as PgPoolClient } from 'pg'; | ||
import { serializeError } from 'serialize-error'; | ||
@@ -55,56 +54,2 @@ type ConnectionHandlerType = ( | ||
const establishConnection = async ( | ||
parentLog: Logger, | ||
pool: PgPool, | ||
connectionRetryLimit: number, | ||
) => { | ||
const poolState = getPoolState(pool); | ||
let connection: PgPoolClient; | ||
let remainingConnectionRetryLimit = connectionRetryLimit; | ||
// eslint-disable-next-line no-constant-condition | ||
while (true) { | ||
remainingConnectionRetryLimit--; | ||
try { | ||
connection = await pool.connect(); | ||
poolClientStateMap.set(connection, { | ||
connectionId: createUid(), | ||
mock: poolState.mock, | ||
poolId: poolState.poolId, | ||
terminated: null, | ||
transactionDepth: null, | ||
transactionId: null, | ||
}); | ||
break; | ||
} catch (error) { | ||
parentLog.error( | ||
{ | ||
error: serializeError(error), | ||
remainingConnectionRetryLimit, | ||
}, | ||
'cannot establish connection', | ||
); | ||
if (remainingConnectionRetryLimit > 1) { | ||
parentLog.info('retrying connection'); | ||
continue; | ||
} else { | ||
throw new ConnectionError(error.message); | ||
} | ||
} | ||
} | ||
if (!connection) { | ||
throw new UnexpectedStateError('Connection handle is not present.'); | ||
} | ||
return connection; | ||
}; | ||
export const createConnection = async ( | ||
@@ -111,0 +56,0 @@ parentLog: Logger, |
import { bindPool } from '../binders/bindPool'; | ||
import { Logger } from '../Logger'; | ||
import { createTypeOverrides } from '../routines/createTypeOverrides'; | ||
import { poolStateMap } from '../state'; | ||
import { getPoolState } from '../state'; | ||
import { type ClientConfigurationInput, type DatabasePool } from '../types'; | ||
import { createUid } from '../utilities/createUid'; | ||
import { createClientConfiguration } from './createClientConfiguration'; | ||
import { createInternalPool } from './createInternalPool'; | ||
import { createPoolConfiguration } from './createPoolConfiguration'; | ||
import { Client as PgClient, Pool as PgPool } from 'pg'; | ||
import { Pool as PgPool } from 'pg'; | ||
import type pgTypes from 'pg-types'; | ||
import { serializeError } from 'serialize-error'; | ||
@@ -24,8 +23,2 @@ /** | ||
const poolId = createUid(); | ||
const poolLog = Logger.child({ | ||
poolId, | ||
}); | ||
const poolConfiguration = createPoolConfiguration( | ||
@@ -46,11 +39,3 @@ connectionUri, | ||
const setupClient = new PgClient({ | ||
connectionTimeoutMillis: poolConfiguration.connectionTimeoutMillis, | ||
database: poolConfiguration.database, | ||
host: poolConfiguration.host, | ||
password: poolConfiguration.password, | ||
port: poolConfiguration.port, | ||
ssl: poolConfiguration.ssl, | ||
user: poolConfiguration.user, | ||
}); | ||
const setupPool = createInternalPool(Pool, poolConfiguration); | ||
@@ -60,13 +45,15 @@ let getTypeParser: typeof pgTypes.getTypeParser; | ||
try { | ||
await setupClient.connect(); | ||
const connection = await setupPool.connect(); | ||
getTypeParser = await createTypeOverrides( | ||
setupClient, | ||
connection, | ||
clientConfiguration.typeParsers, | ||
); | ||
await connection.release(); | ||
} finally { | ||
await setupClient.end(); | ||
await setupPool.end(); | ||
} | ||
const pool: PgPool = new Pool({ | ||
const pool = createInternalPool(Pool, { | ||
...poolConfiguration, | ||
@@ -78,83 +65,9 @@ types: { | ||
// https://github.com/gajus/slonik/issues/471 | ||
pool.on('error', (error) => { | ||
poolLog.error( | ||
{ | ||
error: serializeError(error), | ||
}, | ||
'client error', | ||
); | ||
}); | ||
poolStateMap.set(pool, { | ||
ended: false, | ||
mock: false, | ||
poolId, | ||
typeOverrides: null, | ||
}); | ||
// istanbul ignore next | ||
pool.on('connect', (client) => { | ||
client.on('error', (error) => { | ||
poolLog.error( | ||
{ | ||
error: serializeError(error), | ||
}, | ||
'client error', | ||
); | ||
}); | ||
client.on('notice', (notice) => { | ||
poolLog.info( | ||
{ | ||
notice: { | ||
level: notice.name, | ||
message: notice.message, | ||
}, | ||
}, | ||
'notice message', | ||
); | ||
}); | ||
poolLog.debug( | ||
{ | ||
stats: { | ||
idleConnectionCount: pool.idleCount, | ||
totalConnectionCount: pool.totalCount, | ||
waitingRequestCount: pool.waitingCount, | ||
}, | ||
}, | ||
'created a new client connection', | ||
); | ||
}); | ||
// istanbul ignore next | ||
pool.on('acquire', () => { | ||
poolLog.debug( | ||
{ | ||
stats: { | ||
idleConnectionCount: pool.idleCount, | ||
totalConnectionCount: pool.totalCount, | ||
waitingRequestCount: pool.waitingCount, | ||
}, | ||
}, | ||
'client is checked out from the pool', | ||
); | ||
}); | ||
// istanbul ignore next | ||
pool.on('remove', () => { | ||
poolLog.debug( | ||
{ | ||
stats: { | ||
idleConnectionCount: pool.idleCount, | ||
totalConnectionCount: pool.totalCount, | ||
waitingRequestCount: pool.waitingCount, | ||
}, | ||
}, | ||
'client connection is closed and removed from the client pool', | ||
); | ||
}); | ||
return bindPool(poolLog, pool, clientConfiguration); | ||
return bindPool( | ||
Logger.child({ | ||
poolId: getPoolState(pool).poolId, | ||
}), | ||
pool, | ||
clientConfiguration, | ||
); | ||
}; |
@@ -90,3 +90,6 @@ import { InvalidInputError } from '../errors'; | ||
rawSql += sqlFragment.sql; | ||
parameterValues.push(...sqlFragment.values); | ||
for (const value of sqlFragment.values) { | ||
parameterValues.push(value); | ||
} | ||
} else { | ||
@@ -93,0 +96,0 @@ log.error( |
import { type TypeParser } from '../types'; | ||
import { type Client as PgClient } from 'pg'; | ||
import { type PoolClient } from 'pg'; | ||
import { getTypeParser } from 'pg-types'; | ||
@@ -13,3 +13,3 @@ import { parse as parseArray } from 'postgres-array'; | ||
export const createTypeOverrides = async ( | ||
pool: PgClient, | ||
connection: PoolClient, | ||
typeParsers: readonly TypeParser[], | ||
@@ -22,3 +22,3 @@ ) => { | ||
const postgresTypes: PostgresType[] = ( | ||
await pool.query( | ||
await connection.query( | ||
'SELECT oid, typarray, typname FROM pg_type WHERE typname = ANY($1::text[])', | ||
@@ -25,0 +25,0 @@ [typeNames], |
@@ -6,3 +6,3 @@ import { UnexpectedStateError } from './errors'; | ||
type PoolState = { | ||
export type PoolState = { | ||
ended: boolean; | ||
@@ -9,0 +9,0 @@ mock: boolean; |
@@ -60,4 +60,12 @@ import { type SlonikError } from './errors'; | ||
type StreamHandler = (stream: Readable) => void; | ||
// eslint-disable-next-line @typescript-eslint/consistent-type-definitions | ||
interface TypedReadable<T> extends Readable { | ||
// eslint-disable-next-line @typescript-eslint/method-signature-style | ||
on(event: 'data', listener: (chunk: T) => void): this; | ||
// eslint-disable-next-line @typescript-eslint/method-signature-style | ||
on(event: string | symbol, listener: (...args: any[]) => void): this; | ||
} | ||
type StreamHandler<T> = (stream: TypedReadable<T>) => void; | ||
export type Connection = 'EXPLICIT' | 'IMPLICIT_QUERY' | 'IMPLICIT_TRANSACTION'; | ||
@@ -137,5 +145,5 @@ | ||
type StreamFunction = ( | ||
sql: QuerySqlToken, | ||
streamHandler: StreamHandler, | ||
type StreamFunction = <T extends ZodTypeAny>( | ||
sql: QuerySqlToken<T>, | ||
streamHandler: StreamHandler<z.infer<T>>, | ||
config?: QueryStreamConfig, | ||
@@ -430,3 +438,3 @@ ) => Promise<Record<string, unknown> | null>; | ||
export type InternalStreamFunction = ( | ||
export type InternalStreamFunction = <T>( | ||
log: Logger, | ||
@@ -436,3 +444,3 @@ connection: PgPoolClient, | ||
slonikSql: QuerySqlToken, | ||
streamHandler: StreamHandler, | ||
streamHandler: StreamHandler<T>, | ||
uid?: QueryId, | ||
@@ -439,0 +447,0 @@ config?: QueryStreamConfig, |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
14
22
344
507524
7389
+ Addedpg-query-stream@^4.5.3
+ Addedpg-query-stream@4.7.1(transitive)
- Removedpg-cursor@^2.10.0
- Removedthrough2@^4.0.2
- Removedinherits@2.0.4(transitive)
- Removedreadable-stream@3.6.2(transitive)
- Removedsafe-buffer@5.2.1(transitive)
- Removedstring_decoder@1.3.0(transitive)
- Removedthrough2@4.0.2(transitive)
- Removedutil-deprecate@1.0.2(transitive)