diff --git a/.changeset/tough-buses-ring.md b/.changeset/tough-buses-ring.md new file mode 100644 index 00000000..bf3d6f79 --- /dev/null +++ b/.changeset/tough-buses-ring.md @@ -0,0 +1,12 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-core': minor +'@powersync/lib-service-postgres': minor +'@powersync/lib-service-mongodb': minor +'@powersync/service-types': minor +'@powersync/service-image': minor +--- + +Make some service limits configurable. diff --git a/libs/lib-mongodb/src/db/mongo.ts b/libs/lib-mongodb/src/db/mongo.ts index 6fb85336..c91b80fd 100644 --- a/libs/lib-mongodb/src/db/mongo.ts +++ b/libs/lib-mongodb/src/db/mongo.ts @@ -29,7 +29,11 @@ export const MONGO_OPERATION_TIMEOUT_MS = 30_000; */ export const MONGO_CLEAR_OPERATION_TIMEOUT_MS = 5_000; -export function createMongoClient(config: BaseMongoConfigDecoded) { +export interface MongoConnectionOptions { + maxPoolSize: number; +} + +export function createMongoClient(config: BaseMongoConfigDecoded, options?: MongoConnectionOptions) { const normalized = normalizeMongoConfig(config); return new mongo.MongoClient(normalized.uri, { auth: { @@ -48,7 +52,7 @@ export function createMongoClient(config: BaseMongoConfigDecoded) { // Avoid too many connections: // 1. It can overwhelm the source database. // 2. Processing too many queries in parallel can cause the process to run out of memory. - maxPoolSize: 8, + maxPoolSize: options?.maxPoolSize ?? 8, maxConnecting: 3, maxIdleTimeMS: 60_000 diff --git a/libs/lib-postgres/src/db/connection/DatabaseClient.ts b/libs/lib-postgres/src/db/connection/DatabaseClient.ts index 9896a4ee..2ed576bc 100644 --- a/libs/lib-postgres/src/db/connection/DatabaseClient.ts +++ b/libs/lib-postgres/src/db/connection/DatabaseClient.ts @@ -42,7 +42,9 @@ export class DatabaseClient extends AbstractPostgresConnection { // Only listen to notifications on a single (the first) connection const notificationChannels = index == 0 ? options.notificationChannels : []; diff --git a/libs/lib-postgres/src/types/types.ts b/libs/lib-postgres/src/types/types.ts index 3781f6d0..3810313d 100644 --- a/libs/lib-postgres/src/types/types.ts +++ b/libs/lib-postgres/src/types/types.ts @@ -4,7 +4,9 @@ import * as service_types from '@powersync/service-types'; import * as t from 'ts-codec'; import * as urijs from 'uri-js'; -export interface NormalizedBasePostgresConnectionConfig extends jpgwire.NormalizedConnectionConfig {} +export interface NormalizedBasePostgresConnectionConfig extends jpgwire.NormalizedConnectionConfig { + max_pool_size: number; +} export const POSTGRES_CONNECTION_TYPE = 'postgresql' as const; @@ -42,7 +44,9 @@ export const BasePostgresConnectionConfig = t.object({ /** * Prefix for the slot name. Defaults to "powersync_" */ - slot_name_prefix: t.string.optional() + slot_name_prefix: t.string.optional(), + + max_pool_size: t.number.optional() }); export type BasePostgresConnectionConfig = t.Encoded; @@ -125,7 +129,9 @@ export function normalizeConnectionConfig(options: BasePostgresConnectionConfigD lookup, client_certificate: options.client_certificate ?? undefined, - client_private_key: options.client_private_key ?? undefined + client_private_key: options.client_private_key ?? undefined, + + max_pool_size: options.max_pool_size ?? 
8 } satisfies NormalizedBasePostgresConnectionConfig; } diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts index dd133b26..37bc7f75 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts @@ -22,7 +22,9 @@ export class MongoStorageProvider implements storage.BucketStorageProvider { } const decodedConfig = MongoStorageConfig.decode(storage as any); - const client = lib_mongo.db.createMongoClient(decodedConfig); + const client = lib_mongo.db.createMongoClient(decodedConfig, { + maxPoolSize: resolvedConfig.storage.max_pool_size ?? 8 + }); const database = new PowerSyncMongo(client, { database: resolvedConfig.storage.database }); const factory = new MongoBucketStorage(database, { diff --git a/modules/module-mongodb-storage/src/storage/implementation/db.ts b/modules/module-mongodb-storage/src/storage/implementation/db.ts index 44cbbd61..969bf9ea 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/db.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/db.ts @@ -82,6 +82,6 @@ export class PowerSyncMongo { } } -export function createPowerSyncMongo(config: MongoStorageConfig) { - return new PowerSyncMongo(lib_mongo.createMongoClient(config), { database: config.database }); +export function createPowerSyncMongo(config: MongoStorageConfig, options?: lib_mongo.MongoConnectionOptions) { + return new PowerSyncMongo(lib_mongo.createMongoClient(config, options), { database: config.database }); } diff --git a/modules/module-postgres-storage/src/types/types.ts b/modules/module-postgres-storage/src/types/types.ts index 80ff067b..889cf58f 100644 --- a/modules/module-postgres-storage/src/types/types.ts +++ b/modules/module-postgres-storage/src/types/types.ts @@ -53,6 +53,7 @@ export type RequiredOperationBatchLimits = Required; export type NormalizedPostgresStorageConfig = pg_wire.NormalizedConnectionConfig & { batch_limits: RequiredOperationBatchLimits; + max_pool_size: number; }; export const normalizePostgresStorageConfig = ( diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index 3cf33dd7..b908444e 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -31,6 +31,11 @@ export const SYNC_SNAPSHOT_PATH = path.resolve(__dirname, '../__snapshots/sync.t */ export function registerSyncTests(factory: storage.TestStorageFactory) { const tracker = new sync.RequestTracker(); + const syncContext = new sync.SyncContext({ + maxBuckets: 10, + maxParameterQueryResults: 10, + maxDataFetchConcurrency: 2 + }); test('sync global data', async () => { await using f = await factory(); @@ -67,6 +72,7 @@ export function registerSyncTests(factory: storage.TestStorageFactory) { }); const stream = sync.streamResponse({ + syncContext, bucketStorage: bucketStorage, syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), params: { @@ -128,7 +134,8 @@ bucket_definitions: }); const stream = sync.streamResponse({ - bucketStorage: bucketStorage, + syncContext, + bucketStorage, syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), params: { buckets: [], @@ -191,7 +198,8 @@ bucket_definitions: }); const stream = sync.streamResponse({ - bucketStorage: 
bucketStorage, + syncContext, + bucketStorage, syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), params: { buckets: [], @@ -276,7 +284,8 @@ bucket_definitions: }); const stream = sync.streamResponse({ - bucketStorage: bucketStorage, + syncContext, + bucketStorage, syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), params: { buckets: [], @@ -302,7 +311,7 @@ bucket_definitions: receivedCompletions++; if (receivedCompletions == 1) { // Trigger an empty bucket update. - await bucketStorage.createManagedWriteCheckpoint({user_id: '', heads: {'1': '1/0'}}); + await bucketStorage.createManagedWriteCheckpoint({ user_id: '', heads: { '1': '1/0' } }); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.commit('1/0'); }); @@ -342,7 +351,8 @@ bucket_definitions: }); const stream = sync.streamResponse({ - bucketStorage: bucketStorage, + syncContext, + bucketStorage, syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), params: { buckets: [], @@ -371,7 +381,8 @@ bucket_definitions: await bucketStorage.autoActivate(); const stream = sync.streamResponse({ - bucketStorage: bucketStorage, + syncContext, + bucketStorage, syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), params: { buckets: [], @@ -398,7 +409,8 @@ bucket_definitions: await bucketStorage.autoActivate(); const stream = sync.streamResponse({ - bucketStorage: bucketStorage, + syncContext, + bucketStorage, syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), params: { buckets: [], @@ -461,7 +473,8 @@ bucket_definitions: const exp = Date.now() / 1000 + 0.1; const stream = sync.streamResponse({ - bucketStorage: bucketStorage, + syncContext, + bucketStorage, syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), params: { buckets: [], @@ -521,7 +534,8 @@ bucket_definitions: }); const stream = sync.streamResponse({ - bucketStorage: bucketStorage, + syncContext, + bucketStorage, syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), params: { buckets: [], @@ -644,7 +658,8 @@ bucket_definitions: }); const params: sync.SyncStreamParameters = { - bucketStorage: bucketStorage, + syncContext, + bucketStorage, syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), params: { buckets: [], diff --git a/packages/service-core/src/routes/RouterEngine.ts b/packages/service-core/src/routes/RouterEngine.ts index a5dc164b..5852984e 100644 --- a/packages/service-core/src/routes/RouterEngine.ts +++ b/packages/service-core/src/routes/RouterEngine.ts @@ -10,6 +10,7 @@ import { SYNC_RULES_ROUTES } from './endpoints/sync-rules.js'; import { SYNC_STREAM_ROUTES } from './endpoints/sync-stream.js'; import { SocketRouteGenerator } from './router-socket.js'; import { RouteDefinition } from './router.js'; +import { SyncContext } from '../sync/SyncContext.js'; export type RouterSetupResponse = { onShutdown: () => Promise; diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index db9b841a..52d54886 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -13,7 +13,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }), handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal }) => { const { 
service_context } = context; - const { routerEngine } = service_context; + const { routerEngine, syncContext } = service_context; // Create our own controller that we can abort directly const controller = new AbortController(); @@ -73,6 +73,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => const tracker = new sync.RequestTracker(); try { for await (const data of sync.streamResponse({ + syncContext: syncContext, bucketStorage: bucketStorage, syncRules: syncRules, params: { diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index b8bcd88a..eb1f4b4f 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -20,7 +20,7 @@ export const syncStreamed = routeDefinition({ validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }), handler: async (payload) => { const { service_context } = payload.context; - const { routerEngine, storageEngine } = service_context; + const { routerEngine, storageEngine, syncContext } = service_context; const headers = payload.request.headers; const userAgent = headers['x-user-agent'] ?? headers['user-agent']; const clientId = payload.params.client_id; @@ -56,6 +56,7 @@ export const syncStreamed = routeDefinition({ sync.transformToBytesTracked( sync.ndjson( sync.streamResponse({ + syncContext: syncContext, bucketStorage, syncRules: syncRules, params, diff --git a/packages/service-core/src/sync/BucketChecksumState.ts b/packages/service-core/src/sync/BucketChecksumState.ts index efe983c4..7115e9dd 100644 --- a/packages/service-core/src/sync/BucketChecksumState.ts +++ b/packages/service-core/src/sync/BucketChecksumState.ts @@ -6,8 +6,10 @@ import * as util from '../util/util-index.js'; import { ErrorCode, logger, ServiceAssertionError, ServiceError } from '@powersync/lib-services-framework'; import { BucketParameterQuerier } from '@powersync/service-sync-rules/src/BucketParameterQuerier.js'; import { BucketSyncState } from './sync.js'; +import { SyncContext } from './SyncContext.js'; export interface BucketChecksumStateOptions { + syncContext: SyncContext; bucketStorage: BucketChecksumStateStorage; syncRules: SqlSyncRules; syncParams: RequestParameters; @@ -20,6 +22,7 @@ export interface BucketChecksumStateOptions { * Handles incrementally re-computing checkpoints. */ export class BucketChecksumState { + private readonly context: SyncContext; private readonly bucketStorage: BucketChecksumStateStorage; /** @@ -43,8 +46,14 @@ export class BucketChecksumState { private pendingBucketDownloads = new Set(); constructor(options: BucketChecksumStateOptions) { + this.context = options.syncContext; this.bucketStorage = options.bucketStorage; - this.parameterState = new BucketParameterState(options.bucketStorage, options.syncRules, options.syncParams); + this.parameterState = new BucketParameterState( + options.syncContext, + options.bucketStorage, + options.syncRules, + options.syncParams + ); this.bucketDataPositions = new Map(); for (let { name, after: start } of options.initialBucketPositions ?? 
[]) { @@ -73,6 +82,12 @@ export class BucketChecksumState { }); } this.bucketDataPositions = dataBucketsNew; + if (dataBucketsNew.size > this.context.maxBuckets) { + throw new ServiceError( + ErrorCode.PSYNC_S2305, + `Too many buckets: ${dataBucketsNew.size} (limit of ${this.context.maxBuckets})` + ); + } let checksumMap: util.ChecksumMap; if (updatedBuckets != null) { @@ -247,13 +262,20 @@ export interface CheckpointUpdate { } export class BucketParameterState { + private readonly context: SyncContext; public readonly bucketStorage: BucketChecksumStateStorage; public readonly syncRules: SqlSyncRules; public readonly syncParams: RequestParameters; private readonly querier: BucketParameterQuerier; private readonly staticBuckets: Map; - constructor(bucketStorage: BucketChecksumStateStorage, syncRules: SqlSyncRules, syncParams: RequestParameters) { + constructor( + context: SyncContext, + bucketStorage: BucketChecksumStateStorage, + syncRules: SqlSyncRules, + syncParams: RequestParameters + ) { + this.context = context; this.bucketStorage = bucketStorage; this.syncRules = syncRules; this.syncParams = syncParams; @@ -275,9 +297,13 @@ export class BucketParameterState { return null; } - if (update.buckets.length > 1000) { - // TODO: Limit number of buckets even before we get to this point - const error = new ServiceError(ErrorCode.PSYNC_S2305, `Too many buckets: ${update.buckets.length}`); + if (update.buckets.length > this.context.maxParameterQueryResults) { + // TODO: Limit number of results even before we get to this point + // This limit applies _before_ we get the unique set + const error = new ServiceError( + ErrorCode.PSYNC_S2305, + `Too many parameter query results: ${update.buckets.length} (limit of ${this.context.maxParameterQueryResults})` + ); logger.error(error.message, { checkpoint: checkpoint, user_id: this.syncParams.user_id, diff --git a/packages/service-core/src/sync/SyncContext.ts b/packages/service-core/src/sync/SyncContext.ts new file mode 100644 index 00000000..4ebf91f6 --- /dev/null +++ b/packages/service-core/src/sync/SyncContext.ts @@ -0,0 +1,36 @@ +import { Semaphore, SemaphoreInterface, withTimeout } from 'async-mutex'; + +export interface SyncContextOptions { + maxBuckets: number; + maxParameterQueryResults: number; + maxDataFetchConcurrency: number; +} + +/** + * Maximum duration to wait for the mutex to become available. + * + * This gives an explicit error if there are mutex issues, rather than just hanging. + */ +const MUTEX_ACQUIRE_TIMEOUT = 30_000; + +/** + * Represents the context in which sync happens. + * + * This is global to all sync requests, not per request. 
+ */ +export class SyncContext { + readonly maxBuckets: number; + readonly maxParameterQueryResults: number; + + readonly syncSemaphore: SemaphoreInterface; + + constructor(options: SyncContextOptions) { + this.maxBuckets = options.maxBuckets; + this.maxParameterQueryResults = options.maxParameterQueryResults; + this.syncSemaphore = withTimeout( + new Semaphore(options.maxDataFetchConcurrency), + MUTEX_ACQUIRE_TIMEOUT, + new Error(`Timeout while waiting for data`) + ); + } +} diff --git a/packages/service-core/src/sync/sync-index.ts b/packages/service-core/src/sync/sync-index.ts index 9fadc8fa..d43145d3 100644 --- a/packages/service-core/src/sync/sync-index.ts +++ b/packages/service-core/src/sync/sync-index.ts @@ -6,3 +6,4 @@ export * from './safeRace.js'; export * from './sync.js'; export * from './util.js'; export * from './BucketChecksumState.js'; +export * from './SyncContext.js'; diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 7c9ccb20..7b3aef7f 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -1,6 +1,5 @@ import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; import { BucketDescription, BucketPriority, RequestParameters, SqlSyncRules } from '@powersync/service-sync-rules'; -import { Semaphore, withTimeout } from 'async-mutex'; import { AbortError } from 'ix/aborterror.js'; @@ -11,28 +10,12 @@ import * as util from '../util/util-index.js'; import { logger } from '@powersync/lib-services-framework'; import { BucketChecksumState } from './BucketChecksumState.js'; import { mergeAsyncIterables } from './merge.js'; -import { RequestTracker } from './RequestTracker.js'; import { acquireSemaphoreAbortable, settledPromise, tokenStream, TokenStreamOptions } from './util.js'; - -/** - * Maximum number of connections actively fetching data. - */ -const MAX_ACTIVE_CONNECTIONS = 10; - -/** - * Maximum duration to wait for the mutex to become available. - * - * This gives an explicit error if there are mutex issues, rather than just hanging. - */ -const MUTEX_ACQUIRE_TIMEOUT = 30_000; - -const syncSemaphore = withTimeout( - new Semaphore(MAX_ACTIVE_CONNECTIONS), - MUTEX_ACQUIRE_TIMEOUT, - new Error(`Timeout while waiting for data`) -); +import { SyncContext } from './SyncContext.js'; +import { RequestTracker } from './RequestTracker.js'; export interface SyncStreamParameters { + syncContext: SyncContext; bucketStorage: storage.SyncRulesBucketStorage; syncRules: SqlSyncRules; params: util.StreamingSyncRequest; @@ -50,7 +33,8 @@ export interface SyncStreamParameters { export async function* streamResponse( options: SyncStreamParameters ): AsyncIterable { - const { bucketStorage, syncRules, params, syncParams, token, tokenStreamOptions, tracker, signal } = options; + const { syncContext, bucketStorage, syncRules, params, syncParams, token, tokenStreamOptions, tracker, signal } = + options; // We also need to be able to abort, so we create our own controller. const controller = new AbortController(); if (signal) { @@ -66,7 +50,15 @@ export async function* streamResponse( } } const ki = tokenStream(token, controller.signal, tokenStreamOptions); - const stream = streamResponseInner(bucketStorage, syncRules, params, syncParams, tracker, controller.signal); + const stream = streamResponseInner( + syncContext, + bucketStorage, + syncRules, + params, + syncParams, + tracker, + controller.signal + ); // Merge the two streams, and abort as soon as one of the streams end. 
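For context, a minimal sketch (not part of this patch) of how the shared semaphore on SyncContext is intended to gate concurrent data fetches. Only SyncContext, syncSemaphore, the 30s withTimeout wrapper and the async-mutex API come from the code above; the helper name fetchWithConcurrencyLimit and the demo values are assumptions, and the import path assumes SyncContext is re-exported from the package root, as the test import from '@/index.js' suggests.

import { SyncContext } from '@powersync/service-core';

// Hypothetical helper: run a data fetch while holding one of the
// maxDataFetchConcurrency slots on the process-wide semaphore.
async function fetchWithConcurrencyLimit<T>(context: SyncContext, fetch: () => Promise<T>): Promise<T> {
  // syncSemaphore is wrapped with withTimeout(), so this rejects with
  // "Timeout while waiting for data" after 30s instead of hanging if no slot frees up.
  return await context.syncSemaphore.runExclusive(fetch);
}

// Usage sketch: at most two fetches run concurrently per process.
const context = new SyncContext({
  maxBuckets: 1000,
  maxParameterQueryResults: 1000,
  maxDataFetchConcurrency: 2
});
await Promise.all([1, 2, 3, 4].map((i) => fetchWithConcurrencyLimit(context, async () => `bucket data ${i}`)));
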
const merged = mergeAsyncIterables([stream, ki], controller.signal); @@ -91,6 +83,7 @@ export type BucketSyncState = { }; async function* streamResponseInner( + syncContext: SyncContext, bucketStorage: storage.SyncRulesBucketStorage, syncRules: SqlSyncRules, params: util.StreamingSyncRequest, @@ -103,6 +96,7 @@ async function* streamResponseInner( const checkpointUserId = util.checkpointUserId(syncParams.token_parameters.user_id as string, params.client_id); const checksumState = new BucketChecksumState({ + syncContext, bucketStorage, syncRules, syncParams, @@ -195,6 +189,7 @@ async function* streamResponseInner( } yield* bucketDataInBatches({ + syncContext: syncContext, bucketStorage: bucketStorage, checkpoint: next.value.value.base.checkpoint, bucketsToFetch: buckets, @@ -221,6 +216,7 @@ async function* streamResponseInner( } interface BucketDataRequest { + syncContext: SyncContext; bucketStorage: storage.SyncRulesBucketStorage; checkpoint: string; bucketsToFetch: BucketDescription[]; @@ -282,6 +278,7 @@ interface BucketDataBatchResult { */ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator { const { + syncContext, bucketStorage: storage, checkpoint, bucketsToFetch, @@ -296,10 +293,10 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator; diff --git a/packages/service-core/test/src/sync/BucketChecksumState.test.ts b/packages/service-core/test/src/sync/BucketChecksumState.test.ts index b7102c4a..3266573f 100644 --- a/packages/service-core/test/src/sync/BucketChecksumState.test.ts +++ b/packages/service-core/test/src/sync/BucketChecksumState.test.ts @@ -5,6 +5,7 @@ import { CHECKPOINT_INVALIDATE_ALL, ChecksumMap, OpId, + SyncContext, WatchFilterEvent } from '@/index.js'; import { RequestParameters, SqliteJsonRow, SqliteJsonValue, SqlSyncRules } from '@powersync/service-sync-rules'; @@ -46,12 +47,19 @@ bucket_definitions: { defaultSchema: 'public' } ); + const syncContext = new SyncContext({ + maxBuckets: 100, + maxParameterQueryResults: 100, + maxDataFetchConcurrency: 10 + }); + test('global bucket with update', async () => { const storage = new MockBucketChecksumStateStorage(); // Set intial state storage.updateTestChecksum({ bucket: 'global[]', checksum: 1, count: 1 }); const state = new BucketChecksumState({ + syncContext, syncParams: new RequestParameters({ sub: '' }, {}), syncRules: SYNC_RULES_GLOBAL, bucketStorage: storage @@ -115,6 +123,7 @@ bucket_definitions: storage.updateTestChecksum({ bucket: 'global[]', checksum: 1, count: 1 }); const state = new BucketChecksumState({ + syncContext, // Client sets the initial state here initialBucketPositions: [{ name: 'global[]', after: '1' }], syncParams: new RequestParameters({ sub: '' }, {}), @@ -151,6 +160,7 @@ bucket_definitions: storage.updateTestChecksum({ bucket: 'global[2]', checksum: 1, count: 1 }); const state = new BucketChecksumState({ + syncContext, syncParams: new RequestParameters({ sub: '' }, {}), syncRules: SYNC_RULES_GLOBAL_TWO, bucketStorage: storage @@ -214,6 +224,7 @@ bucket_definitions: const storage = new MockBucketChecksumStateStorage(); const state = new BucketChecksumState({ + syncContext, // Client sets the initial state here initialBucketPositions: [{ name: 'something_here[]', after: '1' }], syncParams: new RequestParameters({ sub: '' }, {}), @@ -253,6 +264,7 @@ bucket_definitions: storage.updateTestChecksum({ bucket: 'global[2]', checksum: 1, count: 1 }); const state = new BucketChecksumState({ + syncContext, syncParams: new RequestParameters({ sub: '' }, {}), 
syncRules: SYNC_RULES_GLOBAL_TWO, bucketStorage: storage @@ -304,6 +316,7 @@ bucket_definitions: const storage = new MockBucketChecksumStateStorage(); const state = new BucketChecksumState({ + syncContext, syncParams: new RequestParameters({ sub: '' }, {}), syncRules: SYNC_RULES_GLOBAL_TWO, bucketStorage: storage @@ -355,6 +368,7 @@ bucket_definitions: storage.updateTestChecksum({ bucket: 'global[2]', checksum: 3, count: 3 }); const state = new BucketChecksumState({ + syncContext, syncParams: new RequestParameters({ sub: '' }, {}), syncRules: SYNC_RULES_GLOBAL_TWO, bucketStorage: storage @@ -452,6 +466,7 @@ bucket_definitions: storage.updateTestChecksum({ bucket: 'by_project[3]', checksum: 1, count: 1 }); const state = new BucketChecksumState({ + syncContext, syncParams: new RequestParameters({ sub: 'u1' }, {}), syncRules: SYNC_RULES_DYNAMIC, bucketStorage: storage diff --git a/packages/types/src/config/PowerSyncConfig.ts b/packages/types/src/config/PowerSyncConfig.ts index add2b5c5..a235ee58 100644 --- a/packages/types/src/config/PowerSyncConfig.ts +++ b/packages/types/src/config/PowerSyncConfig.ts @@ -103,7 +103,10 @@ export const strictJwks = t.object({ export type StrictJwk = t.Decoded; export const BaseStorageConfig = t.object({ - type: t.string + type: t.string, + // Maximum number of connections to the storage database, per process. + // Defaults to 8. + max_pool_size: t.number.optional() }); /** @@ -151,7 +154,28 @@ export const powerSyncConfig = t.object({ api: t .object({ - tokens: t.array(t.string).optional() + tokens: t.array(t.string).optional(), + parameters: t + .object({ + // Maximum number of connections (http streams or websockets) per API process. + // Default of 200. + max_concurrent_connections: t.number.optional(), + // This should not be significantly more than storage.max_pool_size, otherwise it would block on the + // pool. Increasing this can significantly increase memory usage in some cases. + // Default of 10. + max_data_fetch_concurrency: t.number.optional(), + // Maximum number of buckets for each connection. + // More buckets increase latency and memory usage. While the actual number is controlled by sync rules, + // having a hard limit ensures that the service errors instead of crashing when a sync rule is misconfigured. + // Default of 1000. + max_buckets_per_connection: t.number.optional(), + + // Related to max_buckets_per_connection, but this limit applies directly to the parameter + // query results, _before_ we convert it into a unique set. + // Default of 1000.
+ max_parameter_query_results: t.number.optional() + }) + .optional() }) .optional(), diff --git a/service/src/routes/router.ts b/service/src/routes/router.ts deleted file mode 100644 index 4fd55f19..00000000 --- a/service/src/routes/router.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { ReactiveSocketRouter } from '@powersync/service-rsocket-router'; -import { routes } from '@powersync/service-core'; - -export const SocketRouter = new ReactiveSocketRouter({ - max_concurrent_connections: 200 -}); diff --git a/service/src/runners/server.ts b/service/src/runners/server.ts index fef5cfea..1259b1e9 100644 --- a/service/src/runners/server.ts +++ b/service/src/runners/server.ts @@ -4,8 +4,8 @@ import fastify from 'fastify'; import { container, logger } from '@powersync/lib-services-framework'; import * as core from '@powersync/service-core'; +import { ReactiveSocketRouter } from '@powersync/service-rsocket-router'; import { MetricModes, registerMetrics } from '../metrics.js'; -import { SocketRouter } from '../routes/router.js'; /** * Configures the server portion on a {@link ServiceContext} @@ -27,10 +27,23 @@ export function registerServerServices(serviceContext: core.system.ServiceContex core.routes.configureFastifyServer(server, { service_context: serviceContext, - routes: { api: { routes: routes.api_routes }, sync_stream: { routes: routes.stream_routes } } + routes: { + api: { routes: routes.api_routes }, + sync_stream: { + routes: routes.stream_routes, + queue_options: { + concurrency: serviceContext.configuration.api_parameters.max_concurrent_connections, + max_queue_depth: 0 + } + } + } + }); + + const socketRouter = new ReactiveSocketRouter({ + max_concurrent_connections: serviceContext.configuration.api_parameters.max_concurrent_connections }); - core.routes.configureRSocket(SocketRouter, { + core.routes.configureRSocket(socketRouter, { server: server.server, service_context: serviceContext, route_generators: routes.socket_routes
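Taken together, the new limits flow from configuration into a single per-process SyncContext. A hedged sketch of that wiring, using the defaults documented above (200 / 10 / 1000 / 1000, with storage.max_pool_size defaulting to 8): the api_parameters object shape here is an assumption for illustration; the actual normalization lives in the service configuration code not shown in this diff.

import { SyncContext } from '@powersync/service-core';

// Hypothetical resolved values mirroring the optional fields added to powerSyncConfig above.
const api_parameters = {
  max_concurrent_connections: 200, // HTTP streams + websockets per API process; also fed to ReactiveSocketRouter and the fastify queue in server.ts
  max_data_fetch_concurrency: 10, // keep close to storage.max_pool_size (default 8) to avoid blocking on the pool
  max_buckets_per_connection: 1000,
  max_parameter_query_results: 1000
};

// One SyncContext per process, shared by all sync requests (see SyncContext.ts above).
// BucketChecksumState raises PSYNC_S2305 when either maxBuckets or
// maxParameterQueryResults is exceeded for a connection.
const syncContext = new SyncContext({
  maxBuckets: api_parameters.max_buckets_per_connection,
  maxParameterQueryResults: api_parameters.max_parameter_query_results,
  maxDataFetchConcurrency: api_parameters.max_data_fetch_concurrency
});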