Comparing version 40.2.5 to 40.2.6
@@ -44,48 +44,55 @@ "use strict"; | ||
}; | ||
const acquire = async () => { | ||
if (isEnding) { | ||
throw new Error('Connection pool is being terminated.'); | ||
} | ||
if (isEnded) { | ||
throw new Error('Connection pool has ended.'); | ||
} | ||
const idleConnection = connections.find((connection) => connection.state() === 'IDLE'); | ||
if (idleConnection) { | ||
idleConnection.acquire(); | ||
return idleConnection; | ||
} | ||
if (pendingConnections.length + connections.length < poolSize) { | ||
const pendingConnection = driver.createClient(); | ||
pendingConnections.push(pendingConnection); | ||
const connection = await pendingConnection; | ||
const onRelease = () => { | ||
const waitingClient = waitingClients.shift(); | ||
if (!waitingClient) { | ||
return; | ||
} | ||
if (connection.state() !== 'IDLE') { | ||
throw new Error('Connection is not idle.'); | ||
} | ||
connection.acquire(); | ||
waitingClient.resolve(connection); | ||
}; | ||
connection.on('release', onRelease); | ||
const onDestroy = () => { | ||
connection.removeListener('release', onRelease); | ||
connection.removeListener('destroy', onDestroy); | ||
connections.splice(connections.indexOf(connection), 1); | ||
const waitingClient = waitingClients.shift(); | ||
if (!waitingClient) { | ||
return; | ||
} | ||
// eslint-disable-next-line promise/prefer-await-to-then | ||
acquire().then(waitingClient.resolve, waitingClient.reject); | ||
}; | ||
connection.on('destroy', onDestroy); | ||
connection.acquire(); | ||
connections.push(connection); | ||
pendingConnections.splice(pendingConnections.indexOf(pendingConnection), 1); | ||
return connection; | ||
} | ||
else { | ||
const waitingClient = (0, defer_1.defer)(); | ||
waitingClients.push(waitingClient); | ||
return waitingClient.promise; | ||
} | ||
}; | ||
return { | ||
acquire: async () => { | ||
if (isEnding) { | ||
throw new Error('Connection pool is being terminated.'); | ||
} | ||
if (isEnded) { | ||
throw new Error('Connection pool has ended.'); | ||
} | ||
const idleConnection = connections.find((connection) => connection.state() === 'IDLE'); | ||
if (idleConnection) { | ||
idleConnection.acquire(); | ||
return idleConnection; | ||
} | ||
if (pendingConnections.length + connections.length < poolSize) { | ||
const pendingConnection = driver.createClient(); | ||
pendingConnections.push(pendingConnection); | ||
const connection = await pendingConnection; | ||
const onRelease = () => { | ||
const waitingClient = waitingClients.shift(); | ||
if (!waitingClient) { | ||
return; | ||
} | ||
if (connection.state() !== 'IDLE') { | ||
throw new Error('Connection is not idle.'); | ||
} | ||
connection.acquire(); | ||
waitingClient.resolve(connection); | ||
}; | ||
connection.on('release', onRelease); | ||
const onDestroy = () => { | ||
connection.removeListener('release', onRelease); | ||
connection.removeListener('destroy', onDestroy); | ||
connections.splice(connections.indexOf(connection), 1); | ||
}; | ||
connection.on('destroy', onDestroy); | ||
connection.acquire(); | ||
connections.push(connection); | ||
pendingConnections.splice(pendingConnections.indexOf(pendingConnection), 1); | ||
return connection; | ||
} | ||
else { | ||
const waitingClient = (0, defer_1.defer)(); | ||
waitingClients.push(waitingClient); | ||
return waitingClient.promise; | ||
} | ||
}, | ||
acquire, | ||
end: async () => { | ||
@@ -92,0 +99,0 @@ isEnding = true; |
@@ -1243,3 +1243,3 @@ "use strict"; | ||
}); | ||
test('does not re-use connection if there was an error', async (t) => { | ||
test('does not re-use connection if there was an unhandled error', async (t) => { | ||
const pool = await (0, __1.createPool)(t.context.dsn, { | ||
@@ -1260,2 +1260,29 @@ driverFactory, | ||
}); | ||
test('queued connection gets a new connection in case a blocking connection produced an error', async (t) => { | ||
const pool = await (0, __1.createPool)(t.context.dsn, { | ||
driverFactory, | ||
maximumPoolSize: 1, | ||
}); | ||
const firstConnectionPid = await pool.oneFirst(__1.sql.unsafe ` | ||
SELECT pg_backend_pid(); | ||
`); | ||
await Promise.allSettled([ | ||
// This query will eventually produce an error. | ||
pool.query(__1.sql.unsafe ` | ||
SELECT 1 / 0 | ||
`), | ||
// This query will queue to use the same connection | ||
// that the previous query is using. | ||
// | ||
// Earlier implementation had a race condition where because the first query errored, | ||
// the second query would not get a connection and would remain in the queue indefinitely. | ||
pool.query(__1.sql.unsafe ` | ||
SELECT 1 | ||
`), | ||
]); | ||
const secondConnectionPid = await pool.oneFirst(__1.sql.unsafe ` | ||
SELECT pg_backend_pid(); | ||
`); | ||
t.not(firstConnectionPid, secondConnectionPid); | ||
}); | ||
test('does not re-use transaction connection if there was an error', async (t) => { | ||
@@ -1262,0 +1289,0 @@ const pool = await (0, __1.createPool)(t.context.dsn, { |
@@ -95,3 +95,3 @@ { | ||
"types": "./dist/index.d.ts", | ||
"version": "40.2.5" | ||
"version": "40.2.6" | ||
} |
@@ -111,74 +111,85 @@ import { Logger } from '../Logger'; | ||
return { | ||
acquire: async () => { | ||
if (isEnding) { | ||
throw new Error('Connection pool is being terminated.'); | ||
} | ||
const acquire = async () => { | ||
if (isEnding) { | ||
throw new Error('Connection pool is being terminated.'); | ||
} | ||
if (isEnded) { | ||
throw new Error('Connection pool has ended.'); | ||
} | ||
if (isEnded) { | ||
throw new Error('Connection pool has ended.'); | ||
} | ||
const idleConnection = connections.find( | ||
(connection) => connection.state() === 'IDLE', | ||
); | ||
const idleConnection = connections.find( | ||
(connection) => connection.state() === 'IDLE', | ||
); | ||
if (idleConnection) { | ||
idleConnection.acquire(); | ||
if (idleConnection) { | ||
idleConnection.acquire(); | ||
return idleConnection; | ||
} | ||
return idleConnection; | ||
} | ||
if (pendingConnections.length + connections.length < poolSize) { | ||
const pendingConnection = driver.createClient(); | ||
if (pendingConnections.length + connections.length < poolSize) { | ||
const pendingConnection = driver.createClient(); | ||
pendingConnections.push(pendingConnection); | ||
pendingConnections.push(pendingConnection); | ||
const connection = await pendingConnection; | ||
const connection = await pendingConnection; | ||
const onRelease = () => { | ||
const waitingClient = waitingClients.shift(); | ||
const onRelease = () => { | ||
const waitingClient = waitingClients.shift(); | ||
if (!waitingClient) { | ||
return; | ||
} | ||
if (!waitingClient) { | ||
return; | ||
} | ||
if (connection.state() !== 'IDLE') { | ||
throw new Error('Connection is not idle.'); | ||
} | ||
if (connection.state() !== 'IDLE') { | ||
throw new Error('Connection is not idle.'); | ||
} | ||
connection.acquire(); | ||
connection.acquire(); | ||
waitingClient.resolve(connection); | ||
}; | ||
waitingClient.resolve(connection); | ||
}; | ||
connection.on('release', onRelease); | ||
connection.on('release', onRelease); | ||
const onDestroy = () => { | ||
connection.removeListener('release', onRelease); | ||
connection.removeListener('destroy', onDestroy); | ||
const onDestroy = () => { | ||
connection.removeListener('release', onRelease); | ||
connection.removeListener('destroy', onDestroy); | ||
connections.splice(connections.indexOf(connection), 1); | ||
}; | ||
connections.splice(connections.indexOf(connection), 1); | ||
connection.on('destroy', onDestroy); | ||
const waitingClient = waitingClients.shift(); | ||
connection.acquire(); | ||
if (!waitingClient) { | ||
return; | ||
} | ||
connections.push(connection); | ||
// eslint-disable-next-line promise/prefer-await-to-then | ||
acquire().then(waitingClient.resolve, waitingClient.reject); | ||
}; | ||
pendingConnections.splice( | ||
pendingConnections.indexOf(pendingConnection), | ||
1, | ||
); | ||
connection.on('destroy', onDestroy); | ||
return connection; | ||
} else { | ||
const waitingClient = defer<ConnectionPoolClient>(); | ||
connection.acquire(); | ||
waitingClients.push(waitingClient); | ||
connections.push(connection); | ||
return waitingClient.promise; | ||
} | ||
}, | ||
pendingConnections.splice( | ||
pendingConnections.indexOf(pendingConnection), | ||
1, | ||
); | ||
return connection; | ||
} else { | ||
const waitingClient = defer<ConnectionPoolClient>(); | ||
waitingClients.push(waitingClient); | ||
return waitingClient.promise; | ||
} | ||
}; | ||
return { | ||
acquire, | ||
end: async () => { | ||
@@ -185,0 +196,0 @@ isEnding = true; |
@@ -1640,3 +1640,3 @@ /* eslint-disable id-length */ | ||
test('does not re-use connection if there was an error', async (t) => { | ||
test('does not re-use connection if there was an unhandled error', async (t) => { | ||
const pool = await createPool(t.context.dsn, { | ||
@@ -1664,2 +1664,34 @@ driverFactory, | ||
test('queued connection gets a new connection in case a blocking connection produced an error', async (t) => { | ||
const pool = await createPool(t.context.dsn, { | ||
driverFactory, | ||
maximumPoolSize: 1, | ||
}); | ||
const firstConnectionPid = await pool.oneFirst(sql.unsafe` | ||
SELECT pg_backend_pid(); | ||
`); | ||
await Promise.allSettled([ | ||
// This query will eventually produce an error. | ||
pool.query(sql.unsafe` | ||
SELECT 1 / 0 | ||
`), | ||
// This query will queue to use the same connection | ||
// that the previous query is using. | ||
// | ||
// Earlier implementation had a race condition where because the first query errored, | ||
// the second query would not get a connection and would remain in the queue indefinitely. | ||
pool.query(sql.unsafe` | ||
SELECT 1 | ||
`), | ||
]); | ||
const secondConnectionPid = await pool.oneFirst(sql.unsafe` | ||
SELECT pg_backend_pid(); | ||
`); | ||
t.not(firstConnectionPid, secondConnectionPid); | ||
}); | ||
test('does not re-use transaction connection if there was an error', async (t) => { | ||
@@ -1666,0 +1698,0 @@ const pool = await createPool(t.context.dsn, { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1047937
18715