From 270fa0719bbc1ed8700bb37c38107b02f4443e39 Mon Sep 17 00:00:00 2001 From: Gajus Kuizinas Date: Sun, 25 Aug 2024 00:47:48 +0300 Subject: [PATCH] fix: utilize all allowed connections (#638) * feat: utilize all allowed connections * feat: utilize all allowed connections * wip * wip * wip * feat: utilize all allowed connections --- .../src/factories/createConnectionPool.ts | 84 ++++++++++--------- 1 file changed, 46 insertions(+), 38 deletions(-) diff --git a/packages/slonik/src/factories/createConnectionPool.ts b/packages/slonik/src/factories/createConnectionPool.ts index 03c9b360..b8e0b85c 100644 --- a/packages/slonik/src/factories/createConnectionPool.ts +++ b/packages/slonik/src/factories/createConnectionPool.ts @@ -62,6 +62,7 @@ export const createConnectionPool = ({ }: { driver: Driver; idleTimeout?: number; + // TODO rename to `maxPoolSize` poolSize?: number; }): ConnectionPool => { // See test "waits for all connections to be established before attempting to terminate the pool" @@ -124,23 +125,14 @@ export const createConnectionPool = ({ throw new Error('Connection pool has ended.'); } - const idleConnection = connections.find( - (connection) => connection.state() === 'IDLE', - ); - - if (idleConnection) { - idleConnection.acquire(); - - return idleConnection; - } - - if (pendingConnections.length + connections.length < poolSize) { + const addConnection = async () => { const pendingConnection = driver.createClient(); pendingConnections.push(pendingConnection); const connection = await pendingConnection.catch((error) => { pendingConnections.pop(); + throw error; }); @@ -183,8 +175,6 @@ export const createConnectionPool = ({ connection.on('destroy', onDestroy); - connection.acquire(); - connections.push(connection); pendingConnections.splice( @@ -193,38 +183,56 @@ export const createConnectionPool = ({ ); return connection; - } else { - const deferred = defer(); + }; - waitingClients.push({ - deferred, - }); + const idleConnection = connections.find( + (connection) => connection.state() === 'IDLE', + ); + + if (idleConnection) { + idleConnection.acquire(); + + return idleConnection; + } + + if (pendingConnections.length + connections.length < poolSize) { + const newConnection = await addConnection(); + + newConnection.acquire(); + + return newConnection; + } + + const deferred = defer(); - const queuedAt = process.hrtime.bigint(); + waitingClients.push({ + deferred, + }); - logger.warn( + const queuedAt = process.hrtime.bigint(); + + logger.warn( + { + connections: connections.length, + pendingConnections: pendingConnections.length, + poolSize, + waitingClients: waitingClients.length, + }, + `connection pool full; client has been queued`, + ); + + // eslint-disable-next-line promise/prefer-await-to-then + return deferred.promise.then((connection) => { + logger.debug( { - connections: connections.length, - pendingConnections: pendingConnections.length, - poolSize, - waitingClients: waitingClients.length, + connectionId: connection.id(), + duration: Number(process.hrtime.bigint() - queuedAt) / 1e6, }, - `connection pool full; client has been queued`, + 'connection has been acquired from the queue', ); - // eslint-disable-next-line promise/prefer-await-to-then - return deferred.promise.then((connection) => { - logger.debug( - { - connectionId: connection.id(), - duration: Number(process.hrtime.bigint() - queuedAt) / 1e6, - }, - 'connection has been acquired from the queue', - ); - - return connection; - }); - } + return connection; + }); }; return {