diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index 96026d2..3e3233b 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -5,6 +5,7 @@ import 'package:sqlite3/common.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:sqlite_async/src/utils/shared_utils.dart'; +import 'package:sqlite_async/src/web/database/broadcast_updates.dart'; import 'package:sqlite_async/web.dart'; import 'protocol.dart'; import 'web_mutex.dart'; @@ -15,10 +16,14 @@ class WebDatabase final Database _database; final Mutex? _mutex; + /// For persistent databases that aren't backed by a shared worker, we use + /// web broadcast channels to forward local update events to other tabs. + final BroadcastUpdates? broadcastUpdates; + @override bool closed = false; - WebDatabase(this._database, this._mutex); + WebDatabase(this._database, this._mutex, {this.broadcastUpdates}); @override Future close() async { diff --git a/packages/sqlite_async/lib/src/web/database/broadcast_updates.dart b/packages/sqlite_async/lib/src/web/database/broadcast_updates.dart new file mode 100644 index 0000000..5514e74 --- /dev/null +++ b/packages/sqlite_async/lib/src/web/database/broadcast_updates.dart @@ -0,0 +1,50 @@ +import 'dart:js_interop'; + +import 'package:sqlite_async/sqlite_async.dart'; +import 'package:web/web.dart' as web; + +/// Utility to share received [UpdateNotification]s with other tabs using +/// broadcast channels. +class BroadcastUpdates { + final web.BroadcastChannel _channel; + + BroadcastUpdates(String name) + : _channel = web.BroadcastChannel('sqlite3_async_updates/$name'); + + Stream get updates { + return web.EventStreamProviders.messageEvent + .forTarget(_channel) + .map((event) { + final data = event.data as _BroadcastMessage; + if (data.a == 0) { + final payload = data.b as JSArray; + return UpdateNotification( + payload.toDart.map((e) => e.toDart).toSet()); + } else { + return null; + } + }) + .where((e) => e != null) + .cast(); + } + + void send(UpdateNotification notification) { + _channel.postMessage(_BroadcastMessage.notifications(notification)); + } +} + +@JS() +@anonymous +extension type _BroadcastMessage._(JSObject _) implements JSObject { + external int get a; + external JSAny get b; + + external factory _BroadcastMessage({required int a, required JSAny b}); + + factory _BroadcastMessage.notifications(UpdateNotification notification) { + return _BroadcastMessage( + a: 0, + b: notification.tables.map((e) => e.toJS).toList().toJS, + ); + } +} diff --git a/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart b/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart index c7e2a70..c00377c 100644 --- a/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart @@ -45,6 +45,7 @@ class SqliteDatabaseImpl late final Mutex mutex; late final WebDatabase _connection; + StreamSubscription? _broadcastUpdatesSubscription; /// Open a SqliteDatabase. /// @@ -85,9 +86,28 @@ class SqliteDatabaseImpl Future _init() async { _connection = await openFactory.openConnection(SqliteOpenOptions( primaryConnection: true, readOnly: false, mutex: mutex)) as WebDatabase; - _connection.updates.forEach((update) { - updatesController.add(update); - }); + + final broadcastUpdates = _connection.broadcastUpdates; + if (broadcastUpdates == null) { + // We can use updates directly from the database. + _connection.updates.forEach((update) { + updatesController.add(update); + }); + } else { + _connection.updates.forEach((update) { + updatesController.add(update); + + // Share local updates with other tabs + broadcastUpdates.send(update); + }); + + // Also add updates from other tabs, note that things we send aren't + // received by our tab. + _broadcastUpdatesSubscription = + broadcastUpdates.updates.listen((updates) { + updatesController.add(updates); + }); + } } T _runZoned(T Function() callback, {required String debugContext}) { @@ -132,6 +152,8 @@ class SqliteDatabaseImpl @override Future close() async { await isInitialized; + _broadcastUpdatesSubscription?.cancel(); + updatesController.close(); return _connection.close(); } diff --git a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart index 521320b..66000f9 100644 --- a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart +++ b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart @@ -3,6 +3,7 @@ import 'dart:async'; import 'package:sqlite3/wasm.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:sqlite_async/sqlite_async.dart'; +import 'package:sqlite_async/src/web/database/broadcast_updates.dart'; import 'package:sqlite_async/src/web/web_mutex.dart'; import 'database.dart'; @@ -57,7 +58,14 @@ class DefaultSqliteOpenFactory ? null : MutexImpl(identifier: path); // Use the DB path as a mutex identifier - return WebDatabase(connection.database, options.mutex ?? mutex); + BroadcastUpdates? updates; + if (connection.access != AccessMode.throughSharedWorker && + connection.storage != StorageMode.inMemory) { + updates = BroadcastUpdates(path); + } + + return WebDatabase(connection.database, options.mutex ?? mutex, + broadcastUpdates: updates); } @override