From c25dd8743c43760e2f26ea28227d30c3b1531584 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 4 Nov 2024 14:00:15 +0200 Subject: [PATCH 1/3] Web: pass on sync parameters. --- .../database/web/web_powersync_database.dart | 10 ++--- .../lib/src/web/sync_controller.dart | 42 ++++++++++++------- .../powersync/lib/src/web/sync_worker.dart | 36 +++++++++------- .../lib/src/web/sync_worker_protocol.dart | 20 +++++---- 4 files changed, 65 insertions(+), 43 deletions(-) diff --git a/packages/powersync/lib/src/database/web/web_powersync_database.dart b/packages/powersync/lib/src/database/web/web_powersync_database.dart index 8b430024..f75c7beb 100644 --- a/packages/powersync/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync/lib/src/database/web/web_powersync_database.dart @@ -148,11 +148,11 @@ class PowerSyncDatabaseImpl // duplicating work across tabs. try { sync = await SyncWorkerHandle.start( - this, - connector, - crudThrottleTime.inMilliseconds, - Uri.base.resolve('/powersync_sync.worker.js'), - ); + database: this, + connector: connector, + crudThrottleTimeMs: crudThrottleTime.inMilliseconds, + workerUri: Uri.base.resolve('/powersync_sync.worker.js'), + syncParams: params); } catch (e) { logger.warning( 'Could not use shared worker for synchronization, falling back to locks.', diff --git a/packages/powersync/lib/src/web/sync_controller.dart b/packages/powersync/lib/src/web/sync_controller.dart index 07f5f6b2..8231e7ce 100644 --- a/packages/powersync/lib/src/web/sync_controller.dart +++ b/packages/powersync/lib/src/web/sync_controller.dart @@ -10,23 +10,29 @@ import '../streaming_sync.dart'; import 'sync_worker_protocol.dart'; class SyncWorkerHandle implements StreamingSync { - final PowerSyncDatabaseImpl _database; - final PowerSyncBackendConnector _connector; - final int _crudThrottleTimeMs; + final PowerSyncDatabaseImpl database; + final PowerSyncBackendConnector connector; + final int crudThrottleTimeMs; + final Map? syncParams; late final WorkerCommunicationChannel _channel; final StreamController _status = StreamController.broadcast(); - SyncWorkerHandle._(this._database, this._connector, this._crudThrottleTimeMs, - MessagePort sendToWorker, SharedWorker worker) { + SyncWorkerHandle._( + {required this.database, + required this.connector, + required this.crudThrottleTimeMs, + required MessagePort sendToWorker, + required SharedWorker worker, + this.syncParams}) { _channel = WorkerCommunicationChannel( port: sendToWorker, errors: EventStreamProviders.errorEvent.forTarget(worker), requestHandler: (type, payload) async { switch (type) { case SyncWorkerMessageType.requestEndpoint: - final endpoint = await (_database.database as WebSqliteConnection) + final endpoint = await (database.database as WebSqliteConnection) .exposeEndpoint(); return ( @@ -38,10 +44,10 @@ class SyncWorkerHandle implements StreamingSync { [endpoint.connectPort].toJS ); case SyncWorkerMessageType.uploadCrud: - await _connector.uploadData(_database); + await connector.uploadData(database); return (JSObject(), null); case SyncWorkerMessageType.invalidCredentialsCallback: - final credentials = await _connector.fetchCredentials(); + final credentials = await connector.fetchCredentials(); return ( credentials != null ? SerializedCredentials.from(credentials) @@ -49,7 +55,7 @@ class SyncWorkerHandle implements StreamingSync { null ); case SyncWorkerMessageType.credentialsCallback: - final credentials = await _connector.getCredentialsCached(); + final credentials = await connector.getCredentialsCached(); return ( credentials != null ? SerializedCredentials.from(credentials) @@ -71,13 +77,19 @@ class SyncWorkerHandle implements StreamingSync { } static Future start( - PowerSyncDatabaseImpl database, - PowerSyncBackendConnector connector, - int crudThrottleTimeMs, - Uri workerUri) async { + {required PowerSyncDatabaseImpl database, + required PowerSyncBackendConnector connector, + required int crudThrottleTimeMs, + required Uri workerUri, + Map? syncParams}) async { final worker = SharedWorker(workerUri.toString().toJS); final handle = SyncWorkerHandle._( - database, connector, crudThrottleTimeMs, worker.port, worker); + database: database, + connector: connector, + crudThrottleTimeMs: crudThrottleTimeMs, + sendToWorker: worker.port, + worker: worker, + syncParams: syncParams); // Make sure that the worker is working, or throw immediately. await handle._channel.ping(); @@ -101,6 +113,6 @@ class SyncWorkerHandle implements StreamingSync { @override Future streamingSync() async { await _channel.startSynchronization( - _database.openFactory.path, _crudThrottleTimeMs); + database.openFactory.path, crudThrottleTimeMs, syncParams); } } diff --git a/packages/powersync/lib/src/web/sync_worker.dart b/packages/powersync/lib/src/web/sync_worker.dart index 040f979b..4f7e5361 100644 --- a/packages/powersync/lib/src/web/sync_worker.dart +++ b/packages/powersync/lib/src/web/sync_worker.dart @@ -4,6 +4,7 @@ library; import 'dart:async'; +import 'dart:convert'; import 'dart:js_interop'; import 'package:async/async.dart'; @@ -41,10 +42,14 @@ class _SyncWorker { }); } - _SyncRunner referenceSyncTask(String databaseIdentifier, - int crudThrottleTimeMs, _ConnectedClient client) { + _SyncRunner referenceSyncTask( + String databaseIdentifier, + int crudThrottleTimeMs, + String? syncParamsEncoded, + _ConnectedClient client) { return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () { - return _SyncRunner(databaseIdentifier, crudThrottleTimeMs); + return _SyncRunner(databaseIdentifier, crudThrottleTimeMs, + syncParamsEncoded == null ? null : jsonDecode(syncParamsEncoded)); }) ..registerClient(client); } @@ -64,8 +69,8 @@ class _ConnectedClient { switch (type) { case SyncWorkerMessageType.startSynchronization: final request = payload as StartSynchronization; - _runner = _worker.referenceSyncTask( - request.databaseName, request.crudThrottleTimeMs, this); + _runner = _worker.referenceSyncTask(request.databaseName, + request.crudThrottleTimeMs, request.syncParamsEncoded, this); return (JSObject(), null); case SyncWorkerMessageType.abortSynchronization: _runner?.unregisterClient(this); @@ -106,6 +111,7 @@ class _ConnectedClient { class _SyncRunner { final String identifier; final int crudThrottleTimeMs; + final Map? syncParams; final StreamGroup<_RunnerEvent> _group = StreamGroup(); final StreamController<_RunnerEvent> _mainEvents = StreamController(); @@ -114,7 +120,7 @@ class _SyncRunner { _ConnectedClient? databaseHost; final connections = <_ConnectedClient>[]; - _SyncRunner(this.identifier, this.crudThrottleTimeMs) { + _SyncRunner(this.identifier, this.crudThrottleTimeMs, this.syncParams) { _group.add(_mainEvents.stream); Future(() async { @@ -227,15 +233,15 @@ class _SyncRunner { } sync = StreamingSyncImplementation( - adapter: BucketStorage(database), - credentialsCallback: client.channel.credentialsCallback, - invalidCredentialsCallback: client.channel.invalidCredentialsCallback, - uploadCrud: client.channel.uploadCrud, - crudUpdateTriggerStream: crudStream, - retryDelay: Duration(seconds: 3), - client: FetchClient(mode: RequestMode.cors), - identifier: identifier, - ); + adapter: BucketStorage(database), + credentialsCallback: client.channel.credentialsCallback, + invalidCredentialsCallback: client.channel.invalidCredentialsCallback, + uploadCrud: client.channel.uploadCrud, + crudUpdateTriggerStream: crudStream, + retryDelay: Duration(seconds: 3), + client: FetchClient(mode: RequestMode.cors), + identifier: identifier, + syncParameters: syncParams); sync!.statusStream.listen((event) { _logger.fine('Broadcasting sync event: $event'); for (final client in connections) { diff --git a/packages/powersync/lib/src/web/sync_worker_protocol.dart b/packages/powersync/lib/src/web/sync_worker_protocol.dart index 1e01fabf..f17e0457 100644 --- a/packages/powersync/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync/lib/src/web/sync_worker_protocol.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:convert'; import 'dart:js_interop'; import 'package:web/web.dart'; @@ -60,15 +61,16 @@ extension type SyncWorkerMessage._(JSObject _) implements JSObject { @anonymous extension type StartSynchronization._(JSObject _) implements JSObject { - external factory StartSynchronization({ - required String databaseName, - required int crudThrottleTimeMs, - required int requestId, - }); + external factory StartSynchronization( + {required String databaseName, + required int crudThrottleTimeMs, + required int requestId, + String? syncParamsEncoded}); external String get databaseName; external int get requestId; external int get crudThrottleTimeMs; + external String? get syncParamsEncoded; } @anonymous @@ -315,15 +317,17 @@ final class WorkerCommunicationChannel { await _numericRequest(SyncWorkerMessageType.ping); } - Future startSynchronization( - String databaseName, int crudThrottleTimeMs) async { + Future startSynchronization(String databaseName, int crudThrottleTimeMs, + Map? syncParams) async { final (id, completion) = _newRequest(); port.postMessage(SyncWorkerMessage( type: SyncWorkerMessageType.startSynchronization.name, payload: StartSynchronization( databaseName: databaseName, crudThrottleTimeMs: crudThrottleTimeMs, - requestId: id), + requestId: id, + syncParamsEncoded: + syncParams == null ? null : jsonEncode(syncParams)), )); await completion; } From af641eaabb3d416af1e01e845a65669133f30d6d Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 4 Nov 2024 14:23:28 +0200 Subject: [PATCH 2/3] Reconnect when sync params change. --- .../powersync/lib/src/web/sync_worker.dart | 45 ++++++++++++++----- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/packages/powersync/lib/src/web/sync_worker.dart b/packages/powersync/lib/src/web/sync_worker.dart index 4f7e5361..9b661a21 100644 --- a/packages/powersync/lib/src/web/sync_worker.dart +++ b/packages/powersync/lib/src/web/sync_worker.dart @@ -48,10 +48,9 @@ class _SyncWorker { String? syncParamsEncoded, _ConnectedClient client) { return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () { - return _SyncRunner(databaseIdentifier, crudThrottleTimeMs, - syncParamsEncoded == null ? null : jsonDecode(syncParamsEncoded)); + return _SyncRunner(databaseIdentifier); }) - ..registerClient(client); + ..registerClient(client, crudThrottleTimeMs, syncParamsEncoded); } } @@ -110,8 +109,8 @@ class _ConnectedClient { class _SyncRunner { final String identifier; - final int crudThrottleTimeMs; - final Map? syncParams; + int crudThrottleTimeMs = 1; + String? syncParamsEncoded; final StreamGroup<_RunnerEvent> _group = StreamGroup(); final StreamController<_RunnerEvent> _mainEvents = StreamController(); @@ -120,17 +119,35 @@ class _SyncRunner { _ConnectedClient? databaseHost; final connections = <_ConnectedClient>[]; - _SyncRunner(this.identifier, this.crudThrottleTimeMs, this.syncParams) { + _SyncRunner(this.identifier) { _group.add(_mainEvents.stream); Future(() async { await for (final event in _group.stream) { try { switch (event) { - case _AddConnection(:final client): + case _AddConnection( + :final client, + :final crudThrottleTimeMs, + :final syncParamsEncoded + ): connections.add(client); + var reconnect = false; + if (this.crudThrottleTimeMs != crudThrottleTimeMs) { + this.crudThrottleTimeMs = crudThrottleTimeMs; + reconnect = true; + } + if (this.syncParamsEncoded != syncParamsEncoded) { + this.syncParamsEncoded = syncParamsEncoded; + reconnect = true; + } if (sync == null) { await _requestDatabase(client); + } else if (reconnect) { + // Parameters changed - reconnect. + sync?.abort(); + sync = null; + await _requestDatabase(client); } case _RemoveConnection(:final client): connections.remove(client); @@ -232,6 +249,10 @@ class _SyncRunner { ); } + final syncParams = syncParamsEncoded == null + ? null + : jsonDecode(syncParamsEncoded!) as Map; + sync = StreamingSyncImplementation( adapter: BucketStorage(database), credentialsCallback: client.channel.credentialsCallback, @@ -252,8 +273,10 @@ class _SyncRunner { sync!.streamingSync(); } - void registerClient(_ConnectedClient client) { - _mainEvents.add(_AddConnection(client)); + void registerClient(_ConnectedClient client, int currentCrudThrottleTimeMs, + String? currentSyncParamsEncoded) { + _mainEvents.add(_AddConnection( + client, currentCrudThrottleTimeMs, currentSyncParamsEncoded)); } void unregisterClient(_ConnectedClient client) { @@ -265,8 +288,10 @@ sealed class _RunnerEvent {} final class _AddConnection implements _RunnerEvent { final _ConnectedClient client; + final int crudThrottleTimeMs; + final String? syncParamsEncoded; - _AddConnection(this.client); + _AddConnection(this.client, this.crudThrottleTimeMs, this.syncParamsEncoded); } final class _RemoveConnection implements _RunnerEvent { From 0e2f2870f16bae59c8fc30d07efcc26ac138785e Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 4 Nov 2024 14:50:50 +0200 Subject: [PATCH 3/3] Disconnect immediately if one client disconnects. --- .../powersync/lib/src/web/sync_worker.dart | 18 +++++++++++++++++- .../lib/src/web/sync_worker_protocol.dart | 4 +++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/packages/powersync/lib/src/web/sync_worker.dart b/packages/powersync/lib/src/web/sync_worker.dart index 9b661a21..a5013e8a 100644 --- a/packages/powersync/lib/src/web/sync_worker.dart +++ b/packages/powersync/lib/src/web/sync_worker.dart @@ -72,7 +72,7 @@ class _ConnectedClient { request.crudThrottleTimeMs, request.syncParamsEncoded, this); return (JSObject(), null); case SyncWorkerMessageType.abortSynchronization: - _runner?.unregisterClient(this); + _runner?.disconnectClient(this); _runner = null; return (JSObject(), null); default: @@ -155,6 +155,10 @@ class _SyncRunner { await sync?.abort(); sync = null; } + case _DisconnectClient(:final client): + connections.remove(client); + await sync?.abort(); + sync = null; case _ActiveDatabaseClosed(): _logger.info('Remote database closed, finding a new client'); sync?.abort(); @@ -279,9 +283,15 @@ class _SyncRunner { client, currentCrudThrottleTimeMs, currentSyncParamsEncoded)); } + /// Remove a client, disconnecting if no clients remain.. void unregisterClient(_ConnectedClient client) { _mainEvents.add(_RemoveConnection(client)); } + + /// Remove a client, and immediately disconnect. + void disconnectClient(_ConnectedClient client) { + _mainEvents.add(_DisconnectClient(client)); + } } sealed class _RunnerEvent {} @@ -300,6 +310,12 @@ final class _RemoveConnection implements _RunnerEvent { _RemoveConnection(this.client); } +final class _DisconnectClient implements _RunnerEvent { + final _ConnectedClient client; + + _DisconnectClient(this.client); +} + final class _ActiveDatabaseClosed implements _RunnerEvent { const _ActiveDatabaseClosed(); } diff --git a/packages/powersync/lib/src/web/sync_worker_protocol.dart b/packages/powersync/lib/src/web/sync_worker_protocol.dart index f17e0457..f23a18bc 100644 --- a/packages/powersync/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync/lib/src/web/sync_worker_protocol.dart @@ -14,10 +14,12 @@ enum SyncWorkerMessageType { /// Sent from client to the sync worker to request the synchronization /// starting. + /// If parameters change, the sync worker reconnects. startSynchronization, - /// Te [SyncWorkerMessage.payload] for the request is a numeric id, the + /// The [SyncWorkerMessage.payload] for the request is a numeric id, the /// response can be anything (void). + /// This disconnects immediately, even if other clients are still open. abortSynchronization, /// Sent from the sync worker to the client when it needs an endpoint to