Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into release/v1.9.1
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Nov 4, 2024
2 parents 47cdccf + 1987882 commit 82a784e
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,11 @@ class PowerSyncDatabaseImpl
// duplicating work across tabs.
try {
sync = await SyncWorkerHandle.start(
this,
connector,
crudThrottleTime.inMilliseconds,
Uri.base.resolve('/powersync_sync.worker.js'),
);
database: this,
connector: connector,
crudThrottleTimeMs: crudThrottleTime.inMilliseconds,
workerUri: Uri.base.resolve('/powersync_sync.worker.js'),
syncParams: params);
} catch (e) {
logger.warning(
'Could not use shared worker for synchronization, falling back to locks.',
Expand Down
42 changes: 27 additions & 15 deletions packages/powersync/lib/src/web/sync_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,29 @@ import '../streaming_sync.dart';
import 'sync_worker_protocol.dart';

class SyncWorkerHandle implements StreamingSync {
final PowerSyncDatabaseImpl _database;
final PowerSyncBackendConnector _connector;
final int _crudThrottleTimeMs;
final PowerSyncDatabaseImpl database;
final PowerSyncBackendConnector connector;
final int crudThrottleTimeMs;
final Map<String, dynamic>? syncParams;

late final WorkerCommunicationChannel _channel;

final StreamController<SyncStatus> _status = StreamController.broadcast();

SyncWorkerHandle._(this._database, this._connector, this._crudThrottleTimeMs,
MessagePort sendToWorker, SharedWorker worker) {
SyncWorkerHandle._(
{required this.database,
required this.connector,
required this.crudThrottleTimeMs,
required MessagePort sendToWorker,
required SharedWorker worker,
this.syncParams}) {
_channel = WorkerCommunicationChannel(
port: sendToWorker,
errors: EventStreamProviders.errorEvent.forTarget(worker),
requestHandler: (type, payload) async {
switch (type) {
case SyncWorkerMessageType.requestEndpoint:
final endpoint = await (_database.database as WebSqliteConnection)
final endpoint = await (database.database as WebSqliteConnection)
.exposeEndpoint();

return (
Expand All @@ -38,18 +44,18 @@ class SyncWorkerHandle implements StreamingSync {
[endpoint.connectPort].toJS
);
case SyncWorkerMessageType.uploadCrud:
await _connector.uploadData(_database);
await connector.uploadData(database);
return (JSObject(), null);
case SyncWorkerMessageType.invalidCredentialsCallback:
final credentials = await _connector.fetchCredentials();
final credentials = await connector.fetchCredentials();
return (
credentials != null
? SerializedCredentials.from(credentials)
: null,
null
);
case SyncWorkerMessageType.credentialsCallback:
final credentials = await _connector.getCredentialsCached();
final credentials = await connector.getCredentialsCached();
return (
credentials != null
? SerializedCredentials.from(credentials)
Expand All @@ -71,13 +77,19 @@ class SyncWorkerHandle implements StreamingSync {
}

static Future<SyncWorkerHandle> start(
PowerSyncDatabaseImpl database,
PowerSyncBackendConnector connector,
int crudThrottleTimeMs,
Uri workerUri) async {
{required PowerSyncDatabaseImpl database,
required PowerSyncBackendConnector connector,
required int crudThrottleTimeMs,
required Uri workerUri,
Map<String, dynamic>? syncParams}) async {
final worker = SharedWorker(workerUri.toString().toJS);
final handle = SyncWorkerHandle._(
database, connector, crudThrottleTimeMs, worker.port, worker);
database: database,
connector: connector,
crudThrottleTimeMs: crudThrottleTimeMs,
sendToWorker: worker.port,
worker: worker,
syncParams: syncParams);

// Make sure that the worker is working, or throw immediately.
await handle._channel.ping();
Expand All @@ -101,6 +113,6 @@ class SyncWorkerHandle implements StreamingSync {
@override
Future<void> streamingSync() async {
await _channel.startSynchronization(
_database.openFactory.path, _crudThrottleTimeMs);
database.openFactory.path, crudThrottleTimeMs, syncParams);
}
}
91 changes: 69 additions & 22 deletions packages/powersync/lib/src/web/sync_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
library;

import 'dart:async';
import 'dart:convert';
import 'dart:js_interop';

import 'package:async/async.dart';
Expand Down Expand Up @@ -41,12 +42,15 @@ class _SyncWorker {
});
}

_SyncRunner referenceSyncTask(String databaseIdentifier,
int crudThrottleTimeMs, _ConnectedClient client) {
_SyncRunner referenceSyncTask(
String databaseIdentifier,
int crudThrottleTimeMs,
String? syncParamsEncoded,
_ConnectedClient client) {
return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () {
return _SyncRunner(databaseIdentifier, crudThrottleTimeMs);
return _SyncRunner(databaseIdentifier);
})
..registerClient(client);
..registerClient(client, crudThrottleTimeMs, syncParamsEncoded);
}
}

Expand All @@ -64,11 +68,11 @@ class _ConnectedClient {
switch (type) {
case SyncWorkerMessageType.startSynchronization:
final request = payload as StartSynchronization;
_runner = _worker.referenceSyncTask(
request.databaseName, request.crudThrottleTimeMs, this);
_runner = _worker.referenceSyncTask(request.databaseName,
request.crudThrottleTimeMs, request.syncParamsEncoded, this);
return (JSObject(), null);
case SyncWorkerMessageType.abortSynchronization:
_runner?.unregisterClient(this);
_runner?.disconnectClient(this);
_runner = null;
return (JSObject(), null);
default:
Expand Down Expand Up @@ -105,7 +109,8 @@ class _ConnectedClient {

class _SyncRunner {
final String identifier;
final int crudThrottleTimeMs;
int crudThrottleTimeMs = 1;
String? syncParamsEncoded;

final StreamGroup<_RunnerEvent> _group = StreamGroup();
final StreamController<_RunnerEvent> _mainEvents = StreamController();
Expand All @@ -114,24 +119,46 @@ class _SyncRunner {
_ConnectedClient? databaseHost;
final connections = <_ConnectedClient>[];

_SyncRunner(this.identifier, this.crudThrottleTimeMs) {
_SyncRunner(this.identifier) {
_group.add(_mainEvents.stream);

Future(() async {
await for (final event in _group.stream) {
try {
switch (event) {
case _AddConnection(:final client):
case _AddConnection(
:final client,
:final crudThrottleTimeMs,
:final syncParamsEncoded
):
connections.add(client);
var reconnect = false;
if (this.crudThrottleTimeMs != crudThrottleTimeMs) {
this.crudThrottleTimeMs = crudThrottleTimeMs;
reconnect = true;
}
if (this.syncParamsEncoded != syncParamsEncoded) {
this.syncParamsEncoded = syncParamsEncoded;
reconnect = true;
}
if (sync == null) {
await _requestDatabase(client);
} else if (reconnect) {
// Parameters changed - reconnect.
sync?.abort();
sync = null;
await _requestDatabase(client);
}
case _RemoveConnection(:final client):
connections.remove(client);
if (connections.isEmpty) {
await sync?.abort();
sync = null;
}
case _DisconnectClient(:final client):
connections.remove(client);
await sync?.abort();
sync = null;
case _ActiveDatabaseClosed():
_logger.info('Remote database closed, finding a new client');
sync?.abort();
Expand Down Expand Up @@ -226,16 +253,20 @@ class _SyncRunner {
);
}

final syncParams = syncParamsEncoded == null
? null
: jsonDecode(syncParamsEncoded!) as Map<String, dynamic>;

sync = StreamingSyncImplementation(
adapter: BucketStorage(database),
credentialsCallback: client.channel.credentialsCallback,
invalidCredentialsCallback: client.channel.invalidCredentialsCallback,
uploadCrud: client.channel.uploadCrud,
crudUpdateTriggerStream: crudStream,
retryDelay: Duration(seconds: 3),
client: FetchClient(mode: RequestMode.cors),
identifier: identifier,
);
adapter: BucketStorage(database),
credentialsCallback: client.channel.credentialsCallback,
invalidCredentialsCallback: client.channel.invalidCredentialsCallback,
uploadCrud: client.channel.uploadCrud,
crudUpdateTriggerStream: crudStream,
retryDelay: Duration(seconds: 3),
client: FetchClient(mode: RequestMode.cors),
identifier: identifier,
syncParameters: syncParams);
sync!.statusStream.listen((event) {
_logger.fine('Broadcasting sync event: $event');
for (final client in connections) {
Expand All @@ -246,21 +277,31 @@ class _SyncRunner {
sync!.streamingSync();
}

void registerClient(_ConnectedClient client) {
_mainEvents.add(_AddConnection(client));
void registerClient(_ConnectedClient client, int currentCrudThrottleTimeMs,
String? currentSyncParamsEncoded) {
_mainEvents.add(_AddConnection(
client, currentCrudThrottleTimeMs, currentSyncParamsEncoded));
}

/// Remove a client, disconnecting if no clients remain..
void unregisterClient(_ConnectedClient client) {
_mainEvents.add(_RemoveConnection(client));
}

/// Remove a client, and immediately disconnect.
void disconnectClient(_ConnectedClient client) {
_mainEvents.add(_DisconnectClient(client));
}
}

sealed class _RunnerEvent {}

final class _AddConnection implements _RunnerEvent {
final _ConnectedClient client;
final int crudThrottleTimeMs;
final String? syncParamsEncoded;

_AddConnection(this.client);
_AddConnection(this.client, this.crudThrottleTimeMs, this.syncParamsEncoded);
}

final class _RemoveConnection implements _RunnerEvent {
Expand All @@ -269,6 +310,12 @@ final class _RemoveConnection implements _RunnerEvent {
_RemoveConnection(this.client);
}

final class _DisconnectClient implements _RunnerEvent {
final _ConnectedClient client;

_DisconnectClient(this.client);
}

final class _ActiveDatabaseClosed implements _RunnerEvent {
const _ActiveDatabaseClosed();
}
24 changes: 15 additions & 9 deletions packages/powersync/lib/src/web/sync_worker_protocol.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:convert';
import 'dart:js_interop';

import 'package:web/web.dart';
Expand All @@ -13,10 +14,12 @@ enum SyncWorkerMessageType {

/// Sent from client to the sync worker to request the synchronization
/// starting.
/// If parameters change, the sync worker reconnects.
startSynchronization,

/// Te [SyncWorkerMessage.payload] for the request is a numeric id, the
/// The [SyncWorkerMessage.payload] for the request is a numeric id, the
/// response can be anything (void).
/// This disconnects immediately, even if other clients are still open.
abortSynchronization,

/// Sent from the sync worker to the client when it needs an endpoint to
Expand Down Expand Up @@ -60,15 +63,16 @@ extension type SyncWorkerMessage._(JSObject _) implements JSObject {

@anonymous
extension type StartSynchronization._(JSObject _) implements JSObject {
external factory StartSynchronization({
required String databaseName,
required int crudThrottleTimeMs,
required int requestId,
});
external factory StartSynchronization(
{required String databaseName,
required int crudThrottleTimeMs,
required int requestId,
String? syncParamsEncoded});

external String get databaseName;
external int get requestId;
external int get crudThrottleTimeMs;
external String? get syncParamsEncoded;
}

@anonymous
Expand Down Expand Up @@ -315,15 +319,17 @@ final class WorkerCommunicationChannel {
await _numericRequest(SyncWorkerMessageType.ping);
}

Future<void> startSynchronization(
String databaseName, int crudThrottleTimeMs) async {
Future<void> startSynchronization(String databaseName, int crudThrottleTimeMs,
Map<String, dynamic>? syncParams) async {
final (id, completion) = _newRequest();
port.postMessage(SyncWorkerMessage(
type: SyncWorkerMessageType.startSynchronization.name,
payload: StartSynchronization(
databaseName: databaseName,
crudThrottleTimeMs: crudThrottleTimeMs,
requestId: id),
requestId: id,
syncParamsEncoded:
syncParams == null ? null : jsonEncode(syncParams)),
));
await completion;
}
Expand Down

0 comments on commit 82a784e

Please sign in to comment.