Skip to content

feat: rebalancing connection pooling #679

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Base stage for shared environment setup
FROM node:20-alpine3.20 as base
FROM node:22-alpine3.21 as base
RUN apk add --no-cache g++ make python3
WORKDIR /app
COPY package.json package-lock.json ./
Expand Down
1 change: 1 addition & 0 deletions migrations/multitenant/0017-pool-mode.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE tenants ADD COLUMN IF NOT EXISTS database_pool_mode TEXT NULL;
23,456 changes: 13,290 additions & 10,166 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"node": ">= 14.0.0"
},
"dependencies": {
"@aws-sdk/client-ecs": "^3.795.0",
"@aws-sdk/client-s3": "3.654.0",
"@aws-sdk/lib-storage": "3.654.0",
"@aws-sdk/s3-request-presigner": "3.654.0",
Expand Down
7 changes: 5 additions & 2 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type StorageConfigType = {
isMultitenant: boolean
jwtSecret: string
jwtAlgorithm: string
jwtCachingEnabled: boolean
jwtJWKS?: JwksConfig
multitenantDatabaseUrl?: string
dbAnonRole: string
Expand All @@ -86,6 +87,7 @@ type StorageConfigType = {
databaseURL: string
databaseSSLRootCert?: string
databasePoolURL?: string
databasePoolMode?: 'single_use' | 'recycle'
databaseMaxConnections: number
databaseFreePoolAfterInactivity: number
databaseConnectionTimeout: number
Expand Down Expand Up @@ -262,6 +264,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
encryptionKey: getOptionalConfigFromEnv('AUTH_ENCRYPTION_KEY', 'ENCRYPTION_KEY') || '',
jwtSecret: getOptionalIfMultitenantConfigFromEnv('AUTH_JWT_SECRET', 'PGRST_JWT_SECRET') || '',
jwtAlgorithm: getOptionalConfigFromEnv('AUTH_JWT_ALGORITHM', 'PGRST_JWT_ALGORITHM') || 'HS256',
jwtCachingEnabled: getOptionalConfigFromEnv('JWT_CACHING_ENABLED') === 'true',

// Upload
uploadFileSizeLimit: Number(
Expand Down Expand Up @@ -353,6 +356,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
databaseSSLRootCert: getOptionalConfigFromEnv('DATABASE_SSL_ROOT_CERT'),
databaseURL: getOptionalIfMultitenantConfigFromEnv('DATABASE_URL') || '',
databasePoolURL: getOptionalConfigFromEnv('DATABASE_POOL_URL') || '',
databasePoolMode: getOptionalConfigFromEnv('DATABASE_POOL_MODE'),
databaseMaxConnections: parseInt(
getOptionalConfigFromEnv('DATABASE_MAX_CONNECTIONS') || '20',
10
Expand Down Expand Up @@ -506,8 +510,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {

if (jwtJWKS) {
try {
const parsed = JSON.parse(jwtJWKS)
config.jwtJWKS = parsed
config.jwtJWKS = JSON.parse(jwtJWKS)
} catch {
throw new Error('Unable to parse JWT_JWKS value to JSON')
}
Expand Down
10 changes: 8 additions & 2 deletions src/http/plugins/jwt.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import fastifyPlugin from 'fastify-plugin'
import { JWTPayload } from 'jose'

import { verifyJWT } from '@internal/auth'
import { verifyJWTWithCache, verifyJWT } from '@internal/auth'
import { getJwtSecret } from '@internal/database'
import { ERRORS } from '@internal/errors'
import { getConfig } from '../../config'

declare module 'fastify' {
interface FastifyRequest {
Expand All @@ -18,6 +19,8 @@ declare module 'fastify' {
}
}

const { jwtCachingEnabled } = getConfig()

const BEARER = /^Bearer\s+/i

export const jwt = fastifyPlugin(
Expand All @@ -37,7 +40,10 @@ export const jwt = fastifyPlugin(
const { secret, jwks } = await getJwtSecret(request.tenantId)

try {
const payload = await verifyJWT(request.jwt, secret, jwks || null)
const payload = await (jwtCachingEnabled
? verifyJWTWithCache(request.jwt, secret, jwks || null)
: verifyJWT(request.jwt, secret, jwks || null))

request.jwtPayload = payload
request.owner = payload.sub
request.isAuthenticated = true
Expand Down
9 changes: 9 additions & 0 deletions src/http/routes/admin/tenants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const patchSchema = {
anonKey: { type: 'string' },
databaseUrl: { type: 'string' },
databasePoolUrl: { type: 'string', nullable: true },
databasePoolMode: { type: 'string', nullable: true },
maxConnections: { type: 'number' },
jwks: { type: 'object', nullable: true },
fileSizeLimit: { type: 'number' },
Expand Down Expand Up @@ -112,6 +113,7 @@ export default async function routes(fastify: FastifyInstance) {
anon_key,
database_url,
database_pool_url,
database_pool_mode,
max_connections,
file_size_limit,
jwt_secret,
Expand All @@ -130,6 +132,7 @@ export default async function routes(fastify: FastifyInstance) {
anonKey: decrypt(anon_key),
databaseUrl: decrypt(database_url),
databasePoolUrl: database_pool_url ? decrypt(database_pool_url) : undefined,
databasePoolMode: database_pool_mode,
maxConnections: max_connections ? Number(max_connections) : undefined,
fileSizeLimit: Number(file_size_limit),
jwtSecret: decrypt(jwt_secret),
Expand Down Expand Up @@ -164,6 +167,7 @@ export default async function routes(fastify: FastifyInstance) {
anon_key,
database_url,
database_pool_url,
database_pool_mode,
max_connections,
file_size_limit,
jwt_secret,
Expand All @@ -188,6 +192,7 @@ export default async function routes(fastify: FastifyInstance) {
: database_pool_url
? decrypt(database_pool_url)
: undefined,
databasePoolMode: database_pool_mode,
maxConnections: max_connections ? Number(max_connections) : undefined,
fileSizeLimit: Number(file_size_limit),
jwtSecret: decrypt(jwt_secret),
Expand Down Expand Up @@ -217,6 +222,7 @@ export default async function routes(fastify: FastifyInstance) {
const {
anonKey,
databaseUrl,
databasePoolMode,
fileSizeLimit,
jwtSecret,
jwks,
Expand All @@ -233,6 +239,7 @@ export default async function routes(fastify: FastifyInstance) {
anon_key: encrypt(anonKey),
database_url: encrypt(databaseUrl),
database_pool_url: databasePoolUrl ? encrypt(databasePoolUrl) : undefined,
database_pool_mode: databasePoolMode,
max_connections: maxConnections ? Number(maxConnections) : undefined,
file_size_limit: fileSizeLimit,
jwt_secret: encrypt(jwtSecret),
Expand Down Expand Up @@ -280,6 +287,7 @@ export default async function routes(fastify: FastifyInstance) {
serviceKey,
features,
databasePoolUrl,
databasePoolMode,
maxConnections,
tracingMode,
disableEvents,
Expand All @@ -295,6 +303,7 @@ export default async function routes(fastify: FastifyInstance) {
: databasePoolUrl === null
? null
: undefined,
database_pool_mode: databasePoolMode,
max_connections: maxConnections ? Number(maxConnections) : undefined,
file_size_limit: fileSizeLimit,
jwt_secret: jwtSecret !== undefined ? encrypt(jwtSecret) : undefined,
Expand Down
50 changes: 50 additions & 0 deletions src/internal/auth/jwt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import {
JWTVerifyGetKey,
SignJWT,
} from 'jose'
import { LRUCache } from 'lru-cache'
import objectSizeOf from 'object-sizeof'

const { jwtAlgorithm } = getConfig()

Expand Down Expand Up @@ -111,6 +113,54 @@ function getJWTAlgorithms(jwks: JwksConfig | null) {
return algorithms
}

const jwtCache = new LRUCache<string, { token: string; payload: JWTPayload }>({
maxSize: 1024 * 1024 * 50, // 50MB
sizeCalculation: (value) => {
return objectSizeOf(value)
},
ttlResolution: 5000, // 5 seconds
})

/**
* Verifies if a JWT is valid and caches the payload
* for the duration of the token's expiration time
* @param token
* @param secret
* @param jwks
*/
export async function verifyJWTWithCache(
token: string,
secret: string,
jwks?: { keys: JwksConfigKey[] } | null
) {
const cachedVerification = jwtCache.get(token)
if (
cachedVerification &&
cachedVerification.payload.exp &&
cachedVerification.payload.exp * 1000 > Date.now()
) {
return Promise.resolve(cachedVerification.payload)
}

try {
const payload = await verifyJWT(token, secret, jwks)
if (!payload.exp) {
return payload
}

jwtCache.set(
token,
{ token, payload: payload },
{
ttl: payload.exp * 1000 - Date.now(),
}
)
return payload
} catch (e) {
throw e
}
}

/**
* Verifies if a JWT is valid
* @param token
Expand Down
53 changes: 53 additions & 0 deletions src/internal/cluster/cluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { ClusterDiscoveryECS } from '@internal/cluster/ecs'

import { EventEmitter } from 'node:events'
import { logger } from '@internal/monitoring'

const clusterEvent = new EventEmitter()

export class Cluster {
static size: number = 0
protected static watcher?: NodeJS.Timeout = undefined

static on(event: string, listener: (...args: any[]) => void) {
clusterEvent.on(event, listener)
}

static async init(abortSignal: AbortSignal) {
if (process.env.CLUSTER_DISCOVERY === 'ECS') {
const cluster = new ClusterDiscoveryECS()
Cluster.size = await cluster.getClusterSize()

logger.info(`[Cluster] Initial cluster size ${Cluster.size}`, {
type: 'cluster',
clusterSize: Cluster.size,
})

Cluster.watcher = setInterval(() => {
cluster
.getClusterSize()
.then((size) => {
if (size !== Cluster.size) {
clusterEvent.emit('change', { size })
}
Cluster.size = size
})
.catch((e) => {
console.error('Error getting cluster size', e)
})
}, 20 * 1000)

abortSignal.addEventListener(
'abort',
() => {
if (Cluster.watcher) {
clearInterval(Cluster.watcher)
clusterEvent.removeAllListeners()
Cluster.watcher = undefined
}
},
{ once: true }
)
}
}
}
36 changes: 36 additions & 0 deletions src/internal/cluster/ecs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { ECSClient, ListTasksCommand } from '@aws-sdk/client-ecs'
import axios from 'axios'
import { DesiredStatus } from '@aws-sdk/client-ecs/dist-types/models/models_0'

export class ClusterDiscoveryECS {
private client: ECSClient

constructor() {
this.client = new ECSClient()
}

async getClusterSize() {
if (!process.env.ECS_CONTAINER_METADATA_URI) {
throw new Error('ECS_CONTAINER_METADATA_URI is not set')
}

const [running, pending] = await Promise.all([
this.listTasks('RUNNING'),
this.listTasks('PENDING'),
])

return running + pending
}

private async listTasks(status: DesiredStatus) {
const respMetadata = await axios.get(`${process.env.ECS_CONTAINER_METADATA_URI}/task`)

const command = new ListTasksCommand({
serviceName: respMetadata.data.ServiceName,
cluster: respMetadata.data.Cluster,
desiredStatus: status,
})
const response = await this.client.send(command)
return response.taskArns?.length || 0
}
}
1 change: 1 addition & 0 deletions src/internal/cluster/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './cluster'
1 change: 1 addition & 0 deletions src/internal/concurrency/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './mutex'
export * from './wait'
export * from './async-abort-controller'
export * from './merge-async-itertor'
5 changes: 5 additions & 0 deletions src/internal/concurrency/wait.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export function wait(ms: number) {
return new Promise((resolve) => {
setTimeout(resolve, ms)
})
}
13 changes: 10 additions & 3 deletions src/internal/database/client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { getConfig } from '../../config'
import { getTenantConfig } from './tenant'
import { User, TenantConnection } from './connection'
import { TenantConnection } from './connection'
import { User } from './pool'
import { ERRORS } from '@internal/errors'
import { Cluster } from '@internal/cluster'

interface ConnectionOptions {
host: string
Expand All @@ -21,17 +23,18 @@ interface ConnectionOptions {
* @param options
*/
export async function getPostgresConnection(options: ConnectionOptions): Promise<TenantConnection> {
const dbCredentials = await getDbCredentials(options.tenantId, options.host, {
const dbCredentials = await getDbSettings(options.tenantId, options.host, {
disableHostCheck: options.disableHostCheck,
})

return await TenantConnection.create({
...dbCredentials,
...options,
clusterSize: Cluster.size,
})
}

async function getDbCredentials(
async function getDbSettings(
tenantId: string,
host: string | undefined,
options?: { disableHostCheck?: boolean }
Expand All @@ -42,11 +45,13 @@ async function getDbCredentials(
databaseURL,
databaseMaxConnections,
requestXForwardedHostRegExp,
databasePoolMode,
} = getConfig()

let dbUrl = databasePoolURL || databaseURL
let maxConnections = databaseMaxConnections
let isExternalPool = Boolean(databasePoolURL)
let isSingleUse = !databasePoolMode || databasePoolMode === 'single_use'

if (isMultitenant) {
if (!tenantId) {
Expand All @@ -70,11 +75,13 @@ async function getDbCredentials(
dbUrl = tenant.databasePoolUrl || tenant.databaseUrl
isExternalPool = Boolean(tenant.databasePoolUrl)
maxConnections = tenant.maxConnections ?? maxConnections
isSingleUse = tenant.databasePoolMode ? tenant.databasePoolMode !== 'recycled' : isSingleUse
}

return {
dbUrl,
isExternalPool,
maxConnections,
isSingleUse,
}
}
Loading