Skip to content

Commit

Permalink
fix: pending connection handling (#661)
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeroelens authored Jan 6, 2025
1 parent bdf1e87 commit a97891b
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 20 deletions.
5 changes: 5 additions & 0 deletions .changeset/nice-cameras-march.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"slonik": minor
---

fix: connection pool edge cases that could lead to hanging connections or effectively incorrect pool sizing
63 changes: 43 additions & 20 deletions packages/slonik/src/factories/createConnectionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export type ConnectionPoolClient = {
type ConnectionPoolState = {
acquiredConnections: number;
idleConnections: number;
pendingConnections: number;
pendingDestroyConnections: number;
pendingReleaseConnections: number;
state: ConnectionPoolStateName;
Expand Down Expand Up @@ -144,22 +145,30 @@ export const createConnectionPool = ({
pendingConnections.push(pendingConnection);

const connection = await pendingConnection.catch((error) => {
pendingConnections.pop();
const index = pendingConnections.indexOf(pendingConnection);

if (index === -1) {
logger.error(
'Unable to find pendingConnection in `pendingConnections` array to remove.',
);
} else {
pendingConnections.splice(index, 1);
}

throw error;
});

const onRelease = () => {
if (connection.state() !== 'IDLE') {
return;
}

const waitingClient = waitingClients.shift();

if (!waitingClient) {
return;
}

if (connection.state() !== 'IDLE') {
throw new Error('Connection is not idle.');
}

connection.acquire();

waitingClient.deferred.resolve(connection);
Expand All @@ -171,35 +180,48 @@ export const createConnectionPool = ({
connection.removeListener('release', onRelease);
connection.removeListener('destroy', onDestroy);

connections.splice(connections.indexOf(connection), 1);
const indexOfConnection = connections.indexOf(connection);

if (indexOfConnection === -1) {
logger.error(
'Unable to find connection in `connections` array to remove.',
);
} else {
connections.splice(indexOfConnection, 1);
}

const waitingClient = waitingClients.shift();

if (!isEnding && !isEnded && connections.length < minimumPoolSize) {
addConnection();
if (waitingClient) {
// eslint-disable-next-line promise/prefer-await-to-then
acquire().then(
waitingClient.deferred.resolve,
waitingClient.deferred.reject,
);

return;
}

if (!waitingClient) {
return;
// In the case that there are no waiting clients and we're below the minimum pool size, add a new connection
if (!isEnding && !isEnded && connections.length < minimumPoolSize) {
addConnection();
}

// eslint-disable-next-line promise/prefer-await-to-then
acquire().then(
waitingClient.deferred.resolve,
waitingClient.deferred.reject,
);
};

connection.on('destroy', onDestroy);

connections.push(connection);

pendingConnections.splice(
pendingConnections.indexOf(pendingConnection),
1,
);
const indexOfPendingConnection =
pendingConnections.indexOf(pendingConnection);

if (indexOfPendingConnection === -1) {
logger.error(
'Unable to find pendingConnection in `pendingConnections` array to remove.',
);
} else {
pendingConnections.splice(indexOfPendingConnection, 1);
}

return connection;
};
Expand Down Expand Up @@ -279,6 +301,7 @@ export const createConnectionPool = ({
const state = {
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: pendingConnections.length,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
};
Expand Down
84 changes: 84 additions & 0 deletions packages/slonik/src/helpers.test/createIntegrationTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,7 @@ export const createIntegrationTests = (
t.deepEqual(pool.state(), {
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand All @@ -851,6 +852,7 @@ export const createIntegrationTests = (
t.deepEqual(pool.state(), {
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ENDED',
Expand All @@ -867,6 +869,7 @@ export const createIntegrationTests = (
t.deepEqual(pool.state(), {
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand All @@ -880,6 +883,7 @@ export const createIntegrationTests = (
t.deepEqual(pool.state(), {
acquiredConnections: 0,
idleConnections: 1,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand All @@ -891,6 +895,7 @@ export const createIntegrationTests = (
t.deepEqual(pool.state(), {
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ENDED',
Expand All @@ -906,6 +911,7 @@ export const createIntegrationTests = (
t.deepEqual(pool.state(), {
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand All @@ -921,6 +927,7 @@ export const createIntegrationTests = (
t.deepEqual(pool.state(), {
acquiredConnections: 1,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand All @@ -932,6 +939,7 @@ export const createIntegrationTests = (
t.deepEqual(pool.state(), {
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ENDED',
Expand All @@ -951,6 +959,7 @@ export const createIntegrationTests = (
t.deepEqual(pool.state(), {
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand Down Expand Up @@ -978,6 +987,7 @@ export const createIntegrationTests = (
t.deepEqual(pool.state(), {
acquiredConnections: 0,
idleConnections: 5,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand All @@ -989,6 +999,7 @@ export const createIntegrationTests = (
t.deepEqual(pool.state(), {
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ENDED',
Expand Down Expand Up @@ -1442,6 +1453,7 @@ export const createIntegrationTests = (
{
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand All @@ -1466,6 +1478,7 @@ export const createIntegrationTests = (
{
acquiredConnections: 1,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand Down Expand Up @@ -2004,6 +2017,7 @@ export const createIntegrationTests = (
{
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand All @@ -2021,6 +2035,7 @@ export const createIntegrationTests = (
{
acquiredConnections: 0,
idleConnections: 1,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand All @@ -2036,6 +2051,7 @@ export const createIntegrationTests = (
{
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand Down Expand Up @@ -2175,6 +2191,7 @@ export const createIntegrationTests = (
t.like(pool.state(), {
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
waitingClients: 0,
Expand All @@ -2193,6 +2210,7 @@ export const createIntegrationTests = (
t.like(pool.state(), {
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
waitingClients: 0,
Expand All @@ -2216,6 +2234,7 @@ export const createIntegrationTests = (
acquiredConnections: 0,
// TODO we might want to add an option to warm up the pool, in which case this value should be 1
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand All @@ -2233,6 +2252,7 @@ export const createIntegrationTests = (
{
acquiredConnections: 0,
idleConnections: 1,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand All @@ -2248,6 +2268,7 @@ export const createIntegrationTests = (
{
acquiredConnections: 0,
idleConnections: 1,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand All @@ -2259,6 +2280,67 @@ export const createIntegrationTests = (
await pool.end();
});

test('destroy creates a new connection to be used by waiting client', async (t) => {
const pool = await createPool(t.context.dsn, {
driverFactory,
idleTimeout: 30_000,
maximumPoolSize: 1,
minimumPoolSize: 1,
});

pool
.query(
sql.unsafe`
DO $$
BEGIN
PERFORM pg_sleep(1); -- Sleep for 1 second
RAISE EXCEPTION 'Test error after 1 second delay';
END $$;
`,
)
// eslint-disable-next-line promise/prefer-await-to-then
.catch(() => {
// Ignoring intentional error
});

const waitingClientPromise = pool.oneFirst(sql.unsafe`
SELECT 1
`);

t.deepEqual(
pool.state(),
{
acquiredConnections: 0,
idleConnections: 0,
pendingConnections: 1,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
waitingClients: 1,
},
'pool state has waiting client',
);

const waitingClientResult = await waitingClientPromise;
t.is(waitingClientResult, 1);

t.deepEqual(
pool.state(),
{
acquiredConnections: 0,
idleConnections: 1,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
waitingClients: 0,
},
'pool state after all queries complete',
);

await pool.end();
});

test('retains explicit transaction beyond the idle timeout', async (t) => {
const pool = await createPool(t.context.dsn, {
driverFactory,
Expand Down Expand Up @@ -2363,6 +2445,7 @@ export const createIntegrationTests = (
{
acquiredConnections: 0,
idleConnections: 1,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand Down Expand Up @@ -2408,6 +2491,7 @@ export const createIntegrationTests = (
t.deepEqual(pool.state(), {
acquiredConnections: 2,
idleConnections: 0,
pendingConnections: 0,
pendingDestroyConnections: 0,
pendingReleaseConnections: 0,
state: 'ACTIVE',
Expand Down
Loading

0 comments on commit a97891b

Please sign in to comment.