Skip to content

Commit

Permalink
fix: utilize all allowed connections (#638)
Browse files Browse the repository at this point in the history
* feat: utilize all allowed connections

* feat: utilize all allowed connections

* wip

* wip

* wip

* feat: utilize all allowed connections
  • Loading branch information
gajus authored Aug 24, 2024
1 parent 6b9d069 commit 270fa07
Showing 1 changed file with 46 additions and 38 deletions.
84 changes: 46 additions & 38 deletions packages/slonik/src/factories/createConnectionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export const createConnectionPool = ({
}: {
driver: Driver;
idleTimeout?: number;
// TODO rename to `maxPoolSize`
poolSize?: number;
}): ConnectionPool => {
// See test "waits for all connections to be established before attempting to terminate the pool"
Expand Down Expand Up @@ -124,23 +125,14 @@ export const createConnectionPool = ({
throw new Error('Connection pool has ended.');
}

const idleConnection = connections.find(
(connection) => connection.state() === 'IDLE',
);

if (idleConnection) {
idleConnection.acquire();

return idleConnection;
}

if (pendingConnections.length + connections.length < poolSize) {
const addConnection = async () => {
const pendingConnection = driver.createClient();

pendingConnections.push(pendingConnection);

const connection = await pendingConnection.catch((error) => {
pendingConnections.pop();

throw error;
});

Expand Down Expand Up @@ -183,8 +175,6 @@ export const createConnectionPool = ({

connection.on('destroy', onDestroy);

connection.acquire();

connections.push(connection);

pendingConnections.splice(
Expand All @@ -193,38 +183,56 @@ export const createConnectionPool = ({
);

return connection;
} else {
const deferred = defer<ConnectionPoolClient>();
};

waitingClients.push({
deferred,
});
const idleConnection = connections.find(
(connection) => connection.state() === 'IDLE',
);

if (idleConnection) {
idleConnection.acquire();

return idleConnection;
}

if (pendingConnections.length + connections.length < poolSize) {
const newConnection = await addConnection();

newConnection.acquire();

return newConnection;
}

const deferred = defer<ConnectionPoolClient>();

const queuedAt = process.hrtime.bigint();
waitingClients.push({
deferred,
});

logger.warn(
const queuedAt = process.hrtime.bigint();

logger.warn(
{
connections: connections.length,
pendingConnections: pendingConnections.length,
poolSize,
waitingClients: waitingClients.length,
},
`connection pool full; client has been queued`,
);

// eslint-disable-next-line promise/prefer-await-to-then
return deferred.promise.then((connection) => {
logger.debug(
{
connections: connections.length,
pendingConnections: pendingConnections.length,
poolSize,
waitingClients: waitingClients.length,
connectionId: connection.id(),
duration: Number(process.hrtime.bigint() - queuedAt) / 1e6,
},
`connection pool full; client has been queued`,
'connection has been acquired from the queue',
);

// eslint-disable-next-line promise/prefer-await-to-then
return deferred.promise.then((connection) => {
logger.debug(
{
connectionId: connection.id(),
duration: Number(process.hrtime.bigint() - queuedAt) / 1e6,
},
'connection has been acquired from the queue',
);

return connection;
});
}
return connection;
});
};

return {
Expand Down

0 comments on commit 270fa07

Please sign in to comment.