Skip to content

Commit

Permalink
Broadcast updates across tabs if needed (#74)
Browse files Browse the repository at this point in the history
* Share database updates across tabs

* Avoid unecessary toList()

* Remove unused constructor

* Also use events locally

* Explain duplicate add
  • Loading branch information
simolus3 authored Nov 1, 2024
1 parent a62c9e8 commit e26fb09
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 5 deletions.
7 changes: 6 additions & 1 deletion packages/sqlite_async/lib/src/web/database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'package:sqlite3/common.dart';
import 'package:sqlite3_web/sqlite3_web.dart';
import 'package:sqlite_async/sqlite_async.dart';
import 'package:sqlite_async/src/utils/shared_utils.dart';
import 'package:sqlite_async/src/web/database/broadcast_updates.dart';
import 'package:sqlite_async/web.dart';
import 'protocol.dart';
import 'web_mutex.dart';
Expand All @@ -15,10 +16,14 @@ class WebDatabase
final Database _database;
final Mutex? _mutex;

/// For persistent databases that aren't backed by a shared worker, we use
/// web broadcast channels to forward local update events to other tabs.
final BroadcastUpdates? broadcastUpdates;

@override
bool closed = false;

WebDatabase(this._database, this._mutex);
WebDatabase(this._database, this._mutex, {this.broadcastUpdates});

@override
Future<void> close() async {
Expand Down
50 changes: 50 additions & 0 deletions packages/sqlite_async/lib/src/web/database/broadcast_updates.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import 'dart:js_interop';

import 'package:sqlite_async/sqlite_async.dart';
import 'package:web/web.dart' as web;

/// Utility to share received [UpdateNotification]s with other tabs using
/// broadcast channels.
class BroadcastUpdates {
final web.BroadcastChannel _channel;

BroadcastUpdates(String name)
: _channel = web.BroadcastChannel('sqlite3_async_updates/$name');

Stream<UpdateNotification> get updates {
return web.EventStreamProviders.messageEvent
.forTarget(_channel)
.map((event) {
final data = event.data as _BroadcastMessage;
if (data.a == 0) {
final payload = data.b as JSArray<JSString>;
return UpdateNotification(
payload.toDart.map((e) => e.toDart).toSet());
} else {
return null;
}
})
.where((e) => e != null)
.cast();
}

void send(UpdateNotification notification) {
_channel.postMessage(_BroadcastMessage.notifications(notification));
}
}

@JS()
@anonymous
extension type _BroadcastMessage._(JSObject _) implements JSObject {
external int get a;
external JSAny get b;

external factory _BroadcastMessage({required int a, required JSAny b});

factory _BroadcastMessage.notifications(UpdateNotification notification) {
return _BroadcastMessage(
a: 0,
b: notification.tables.map((e) => e.toJS).toList().toJS,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class SqliteDatabaseImpl

late final Mutex mutex;
late final WebDatabase _connection;
StreamSubscription? _broadcastUpdatesSubscription;

/// Open a SqliteDatabase.
///
Expand Down Expand Up @@ -85,9 +86,28 @@ class SqliteDatabaseImpl
Future<void> _init() async {
_connection = await openFactory.openConnection(SqliteOpenOptions(
primaryConnection: true, readOnly: false, mutex: mutex)) as WebDatabase;
_connection.updates.forEach((update) {
updatesController.add(update);
});

final broadcastUpdates = _connection.broadcastUpdates;
if (broadcastUpdates == null) {
// We can use updates directly from the database.
_connection.updates.forEach((update) {
updatesController.add(update);
});
} else {
_connection.updates.forEach((update) {
updatesController.add(update);

// Share local updates with other tabs
broadcastUpdates.send(update);
});

// Also add updates from other tabs, note that things we send aren't
// received by our tab.
_broadcastUpdatesSubscription =
broadcastUpdates.updates.listen((updates) {
updatesController.add(updates);
});
}
}

T _runZoned<T>(T Function() callback, {required String debugContext}) {
Expand Down Expand Up @@ -132,6 +152,8 @@ class SqliteDatabaseImpl
@override
Future<void> close() async {
await isInitialized;
_broadcastUpdatesSubscription?.cancel();
updatesController.close();
return _connection.close();
}

Expand Down
10 changes: 9 additions & 1 deletion packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:async';
import 'package:sqlite3/wasm.dart';
import 'package:sqlite3_web/sqlite3_web.dart';
import 'package:sqlite_async/sqlite_async.dart';
import 'package:sqlite_async/src/web/database/broadcast_updates.dart';
import 'package:sqlite_async/src/web/web_mutex.dart';

import 'database.dart';
Expand Down Expand Up @@ -57,7 +58,14 @@ class DefaultSqliteOpenFactory
? null
: MutexImpl(identifier: path); // Use the DB path as a mutex identifier

return WebDatabase(connection.database, options.mutex ?? mutex);
BroadcastUpdates? updates;
if (connection.access != AccessMode.throughSharedWorker &&
connection.storage != StorageMode.inMemory) {
updates = BroadcastUpdates(path);
}

return WebDatabase(connection.database, options.mutex ?? mutex,
broadcastUpdates: updates);
}

@override
Expand Down

0 comments on commit e26fb09

Please sign in to comment.