Skip to content

Commit cb59fb7

Browse files
committed
feat: rebalancing connection pooling
1 parent c7ea2da commit cb59fb7

File tree

16 files changed

+4118
-748
lines changed

16 files changed

+4118
-748
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE tenants ADD COLUMN IF NOT EXISTS database_pool_mode TEXT NULL;

package-lock.json

+3,733-609
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
"node": ">= 14.0.0"
2929
},
3030
"dependencies": {
31+
"@aws-sdk/client-ecs": "^3.795.0",
3132
"@aws-sdk/client-s3": "3.654.0",
3233
"@aws-sdk/lib-storage": "3.654.0",
3334
"@aws-sdk/s3-request-presigner": "3.654.0",

src/config.ts

+2
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type StorageConfigType = {
8686
databaseURL: string
8787
databaseSSLRootCert?: string
8888
databasePoolURL?: string
89+
databasePoolMode?: 'single_use' | 'recycle'
8990
databaseMaxConnections: number
9091
databaseFreePoolAfterInactivity: number
9192
databaseConnectionTimeout: number
@@ -357,6 +358,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
357358
databaseSSLRootCert: getOptionalConfigFromEnv('DATABASE_SSL_ROOT_CERT'),
358359
databaseURL: getOptionalIfMultitenantConfigFromEnv('DATABASE_URL') || '',
359360
databasePoolURL: getOptionalConfigFromEnv('DATABASE_POOL_URL') || '',
361+
databasePoolMode: getOptionalConfigFromEnv('DATABASE_POOL_MODE'),
360362
databaseMaxConnections: parseInt(
361363
getOptionalConfigFromEnv('DATABASE_MAX_CONNECTIONS') || '20',
362364
10

src/http/routes/admin/tenants.ts

+9
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const patchSchema = {
2626
anonKey: { type: 'string' },
2727
databaseUrl: { type: 'string' },
2828
databasePoolUrl: { type: 'string', nullable: true },
29+
databasePoolMode: { type: 'string', nullable: true },
2930
maxConnections: { type: 'number' },
3031
jwks: { type: 'object', nullable: true },
3132
fileSizeLimit: { type: 'number' },
@@ -112,6 +113,7 @@ export default async function routes(fastify: FastifyInstance) {
112113
anon_key,
113114
database_url,
114115
database_pool_url,
116+
database_pool_mode,
115117
max_connections,
116118
file_size_limit,
117119
jwt_secret,
@@ -130,6 +132,7 @@ export default async function routes(fastify: FastifyInstance) {
130132
anonKey: decrypt(anon_key),
131133
databaseUrl: decrypt(database_url),
132134
databasePoolUrl: database_pool_url ? decrypt(database_pool_url) : undefined,
135+
databasePoolMode: database_pool_mode,
133136
maxConnections: max_connections ? Number(max_connections) : undefined,
134137
fileSizeLimit: Number(file_size_limit),
135138
jwtSecret: decrypt(jwt_secret),
@@ -164,6 +167,7 @@ export default async function routes(fastify: FastifyInstance) {
164167
anon_key,
165168
database_url,
166169
database_pool_url,
170+
database_pool_mode,
167171
max_connections,
168172
file_size_limit,
169173
jwt_secret,
@@ -188,6 +192,7 @@ export default async function routes(fastify: FastifyInstance) {
188192
: database_pool_url
189193
? decrypt(database_pool_url)
190194
: undefined,
195+
databasePoolMode: database_pool_mode,
191196
maxConnections: max_connections ? Number(max_connections) : undefined,
192197
fileSizeLimit: Number(file_size_limit),
193198
jwtSecret: decrypt(jwt_secret),
@@ -217,6 +222,7 @@ export default async function routes(fastify: FastifyInstance) {
217222
const {
218223
anonKey,
219224
databaseUrl,
225+
databasePoolMode,
220226
fileSizeLimit,
221227
jwtSecret,
222228
jwks,
@@ -233,6 +239,7 @@ export default async function routes(fastify: FastifyInstance) {
233239
anon_key: encrypt(anonKey),
234240
database_url: encrypt(databaseUrl),
235241
database_pool_url: databasePoolUrl ? encrypt(databasePoolUrl) : undefined,
242+
database_pool_mode: databasePoolMode,
236243
max_connections: maxConnections ? Number(maxConnections) : undefined,
237244
file_size_limit: fileSizeLimit,
238245
jwt_secret: encrypt(jwtSecret),
@@ -280,6 +287,7 @@ export default async function routes(fastify: FastifyInstance) {
280287
serviceKey,
281288
features,
282289
databasePoolUrl,
290+
databasePoolMode,
283291
maxConnections,
284292
tracingMode,
285293
disableEvents,
@@ -295,6 +303,7 @@ export default async function routes(fastify: FastifyInstance) {
295303
: databasePoolUrl === null
296304
? null
297305
: undefined,
306+
database_pool_mode: databasePoolMode,
298307
max_connections: maxConnections ? Number(maxConnections) : undefined,
299308
file_size_limit: fileSizeLimit,
300309
jwt_secret: jwtSecret !== undefined ? encrypt(jwtSecret) : undefined,

src/internal/cluster/discovery.ts

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { ClusterDiscoveryECS } from '@internal/cluster/ecs'
2+
3+
import { EventEmitter } from 'node:events'
4+
import { logger } from '@internal/monitoring'
5+
6+
const clusterEvent = new EventEmitter()
7+
8+
export class Cluster {
9+
static size: number = 0
10+
protected static watcher?: NodeJS.Timeout = undefined
11+
12+
static on(event: string, listener: (...args: any[]) => void) {
13+
clusterEvent.on(event, listener)
14+
}
15+
16+
static async init(abortSignal: AbortSignal) {
17+
if (process.env.CLUSTER_DISCOVERY === 'ECS') {
18+
const cluster = new ClusterDiscoveryECS()
19+
Cluster.size = await cluster.getClusterSize()
20+
21+
logger.info('[Cluster] Initial cluster size', {
22+
type: 'cluster',
23+
size: Cluster.size,
24+
})
25+
26+
Cluster.watcher = setInterval(() => {
27+
cluster
28+
.getClusterSize()
29+
.then((size) => {
30+
if (size === Cluster.size) {
31+
clusterEvent.emit('change', { size })
32+
}
33+
Cluster.size = size
34+
})
35+
.catch((e) => {
36+
console.error('Error getting cluster size', e)
37+
})
38+
}, 20 * 1000)
39+
40+
abortSignal.addEventListener(
41+
'abort',
42+
() => {
43+
if (Cluster.watcher) {
44+
clearInterval(Cluster.watcher)
45+
clusterEvent.removeAllListeners()
46+
Cluster.watcher = undefined
47+
}
48+
},
49+
{ once: true }
50+
)
51+
}
52+
}
53+
}

src/internal/cluster/ecs.ts

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { ECSClient, ListTasksCommand } from '@aws-sdk/client-ecs'
2+
import axios from 'axios'
3+
4+
export class ClusterDiscoveryECS {
5+
private client: ECSClient
6+
7+
constructor() {
8+
this.client = new ECSClient()
9+
}
10+
11+
async getClusterSize() {
12+
if (!process.env.ECS_CONTAINER_METADATA_URI) {
13+
throw new Error('ECS_CONTAINER_METADATA_URI is not set')
14+
}
15+
16+
const respMetadata = await axios.get(`${process.env.ECS_CONTAINER_METADATA_URI}/task`)
17+
18+
const command = new ListTasksCommand({
19+
serviceName: respMetadata.data.ServiceName,
20+
cluster: respMetadata.data.Cluster,
21+
desiredStatus: 'RUNNING',
22+
})
23+
const response = await this.client.send(command)
24+
return response.taskArns?.length || 0
25+
}
26+
}

src/internal/concurrency/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export * from './mutex'
2+
export * from './wait'
23
export * from './async-abort-controller'
34
export * from './merge-async-itertor'

src/internal/concurrency/wait.ts

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export function wait(ms: number) {
2+
return new Promise((resolve) => {
3+
setTimeout(resolve, ms)
4+
})
5+
}

src/internal/database/client.ts

+10-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import { getConfig } from '../../config'
22
import { getTenantConfig } from './tenant'
3-
import { User, TenantConnection } from './connection'
3+
import { TenantConnection } from './connection'
4+
import { User } from './pool'
45
import { ERRORS } from '@internal/errors'
6+
import { Cluster } from '@internal/cluster/discovery'
57

68
interface ConnectionOptions {
79
host: string
@@ -21,17 +23,18 @@ interface ConnectionOptions {
2123
* @param options
2224
*/
2325
export async function getPostgresConnection(options: ConnectionOptions): Promise<TenantConnection> {
24-
const dbCredentials = await getDbCredentials(options.tenantId, options.host, {
26+
const dbCredentials = await getDbSettings(options.tenantId, options.host, {
2527
disableHostCheck: options.disableHostCheck,
2628
})
2729

2830
return await TenantConnection.create({
2931
...dbCredentials,
3032
...options,
33+
clusterSize: Cluster.size,
3134
})
3235
}
3336

34-
async function getDbCredentials(
37+
async function getDbSettings(
3538
tenantId: string,
3639
host: string | undefined,
3740
options?: { disableHostCheck?: boolean }
@@ -42,11 +45,13 @@ async function getDbCredentials(
4245
databaseURL,
4346
databaseMaxConnections,
4447
requestXForwardedHostRegExp,
48+
databasePoolMode,
4549
} = getConfig()
4650

4751
let dbUrl = databasePoolURL || databaseURL
4852
let maxConnections = databaseMaxConnections
4953
let isExternalPool = Boolean(databasePoolURL)
54+
let isSingleUse = !databasePoolMode || databasePoolMode === 'single_use'
5055

5156
if (isMultitenant) {
5257
if (!tenantId) {
@@ -70,11 +75,13 @@ async function getDbCredentials(
7075
dbUrl = tenant.databasePoolUrl || tenant.databaseUrl
7176
isExternalPool = Boolean(tenant.databasePoolUrl)
7277
maxConnections = tenant.maxConnections ?? maxConnections
78+
isSingleUse = tenant.databasePoolMode ? tenant.databasePoolMode !== 'recycled' : isSingleUse
7379
}
7480

7581
return {
7682
dbUrl,
7783
isExternalPool,
7884
maxConnections,
85+
isSingleUse,
7986
}
8087
}

0 commit comments

Comments
 (0)