Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support bucket priorities and partial checkpoints #243

Merged
merged 28 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d0019a1
Use sealed classes for sync lines
simolus3 Jan 28, 2025
1fa7caa
Doc comments on sync lines
simolus3 Jan 29, 2025
6241a18
Add priority field
simolus3 Jan 29, 2025
8a28204
Track bucket priorities
simolus3 Jan 29, 2025
ad25f05
Fix tests
simolus3 Jan 30, 2025
d238769
Validate partial checksums
simolus3 Feb 3, 2025
1bd7447
Use updated format not persisting priorities
simolus3 Feb 3, 2025
f423ffd
Start with status API
simolus3 Feb 3, 2025
c99c043
Add status for each priority
simolus3 Feb 4, 2025
7ed6945
Only verify relevant buckets
simolus3 Feb 4, 2025
793e01e
Tests for partial sync operations
simolus3 Feb 5, 2025
c56188a
Merge remote-tracking branch 'origin/main' into feat/bucket-priorities
simolus3 Feb 10, 2025
9f6fec5
Fix unecessary null checks
simolus3 Feb 10, 2025
0da750c
Fix bad merges
simolus3 Feb 10, 2025
97db1fc
Fix initial hasSynced state
simolus3 Feb 10, 2025
c7bf51a
Merge branch 'main' into feat/bucket-priorities
simolus3 Feb 12, 2025
6feb3e0
Fix checking for completed syncs
simolus3 Feb 17, 2025
e7297c0
Merge branch 'main' into feat/bucket-priorities
simolus3 Feb 18, 2025
fc65039
Raise minimum core version
simolus3 Feb 18, 2025
fe88057
Fix web disconnect tests
simolus3 Feb 18, 2025
9df1e92
Don't pass unused priority to validate checkpoint impl
simolus3 Feb 18, 2025
8ac9f45
Test partial sync with data
simolus3 Feb 19, 2025
4ebad73
Avoid sync "operation" when talking about checkpoints
simolus3 Feb 21, 2025
b060725
Adopt priorities in example app
simolus3 Feb 21, 2025
ebc2652
Update core version
simolus3 Feb 24, 2025
cfdc111
Don't require priority on checksum
simolus3 Feb 24, 2025
505acb7
Clean up wait for first sync in lists widget
simolus3 Feb 24, 2025
12c4124
Rename statusInPriority to priorityStatusEntries
simolus3 Feb 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions demos/django-todolist/macos/Podfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ PODS:
- path_provider_foundation (0.0.1):
- Flutter
- FlutterMacOS
- powersync-sqlite-core (0.1.6)
- powersync-sqlite-core (0.3.9)
- powersync_flutter_libs (0.0.1):
- FlutterMacOS
- powersync-sqlite-core (~> 0.1.6)
- powersync-sqlite-core (~> 0.3.8)
- shared_preferences_foundation (0.0.1):
- Flutter
- FlutterMacOS
Expand Down Expand Up @@ -55,13 +55,13 @@ EXTERNAL SOURCES:

SPEC CHECKSUMS:
FlutterMacOS: 8f6f14fa908a6fb3fba0cd85dbd81ec4b251fb24
path_provider_foundation: 2b6b4c569c0fb62ec74538f866245ac84301af46
powersync-sqlite-core: 4c38c8f470f6dca61346789fd5436a6826d1e3dd
powersync_flutter_libs: 1eb1c6790a72afe08e68d4cc489d71ab61da32ee
shared_preferences_foundation: fcdcbc04712aee1108ac7fda236f363274528f78
path_provider_foundation: 080d55be775b7414fd5a5ef3ac137b97b097e564
powersync-sqlite-core: 7515d321eb8e3c08b5259cdadb9d19b1876fe13a
powersync_flutter_libs: 330d8309223a121ec15a7334d9edc105053e5f82
shared_preferences_foundation: 9e1978ff2562383bd5676f64ec4e9aa8fa06a6f7
sqlite3: 292c3e1bfe89f64e51ea7fc7dab9182a017c8630
sqlite3_flutter_libs: 5ca46c1a04eddfbeeb5b16566164aa7ad1616e7b
sqlite3_flutter_libs: 03311aede9d32fb2d24e32bebb8cd01c3b2e6239

PODFILE CHECKSUM: 236401fc2c932af29a9fcf0e97baeeb2d750d367

COCOAPODS: 1.15.2
COCOAPODS: 1.16.2
6 changes: 5 additions & 1 deletion demos/django-todolist/macos/Runner/AppDelegate.swift
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import Cocoa
import FlutterMacOS

@NSApplicationMain
@main
class AppDelegate: FlutterAppDelegate {
override func applicationShouldTerminateAfterLastWindowClosed(_ sender: NSApplication) -> Bool {
return true
}

override func applicationSupportsSecureRestorableState(_ app: NSApplication) -> Bool {
return true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
PRODUCT_NAME = PowerSync Django Demo

// The application's bundle identifier
PRODUCT_BUNDLE_IDENTIFIER = co.powersync.demotodolist
PRODUCT_BUNDLE_IDENTIFIER = co.powersync.demotodolist.django

// The copyright displayed in application information
PRODUCT_COPYRIGHT = Copyright © 2023 Journey Mobile, Inc. All rights reserved.
6 changes: 3 additions & 3 deletions demos/django-todolist/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -310,21 +310,21 @@ packages:
path: "../../packages/powersync"
relative: true
source: path
version: "1.11.2"
version: "1.11.3"
powersync_core:
dependency: "direct overridden"
description:
path: "../../packages/powersync_core"
relative: true
source: path
version: "1.1.2"
version: "1.1.3"
powersync_flutter_libs:
dependency: "direct overridden"
description:
path: "../../packages/powersync_flutter_libs"
relative: true
source: path
version: "0.4.4"
version: "0.4.5"
pub_semver:
dependency: transitive
description:
Expand Down
50 changes: 35 additions & 15 deletions packages/powersync_core/lib/src/bucket_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class BucketStorage {
await _updateBucket2(
tx,
jsonEncode({
'buckets': [b]
'buckets': [b],
}));
}
// No need to flush - the data is not directly visible to the user either way.
Expand Down Expand Up @@ -101,31 +101,35 @@ class BucketStorage {
return false;
}

Future<SyncLocalDatabaseResult> syncLocalDatabase(
Checkpoint checkpoint) async {
final r = await validateChecksums(checkpoint);
Future<SyncLocalDatabaseResult> syncLocalDatabase(Checkpoint checkpoint,
{int? forPriority}) async {
final r = await validateChecksums(checkpoint, priority: forPriority);

if (!r.checkpointValid) {
for (String b in r.checkpointFailures ?? []) {
await deleteBucket(b);
}
return r;
}
final bucketNames = [for (final c in checkpoint.checksums) c.bucket];
final bucketNames = [
for (final c in checkpoint.checksums)
if (forPriority == null || c.priority <= forPriority) c.bucket
];

await writeTransaction((tx) async {
await tx.execute(
"UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))",
[checkpoint.lastOpId, jsonEncode(bucketNames)]);
if (checkpoint.writeCheckpoint != null) {
if (forPriority == null && checkpoint.writeCheckpoint != null) {
await tx.execute(
"UPDATE ps_buckets SET last_op = ? WHERE name = '\$local'",
[checkpoint.writeCheckpoint]);
}
// Not flushing here - the flush will happen in the next step
}, flush: false);

final valid = await updateObjectsFromBuckets(checkpoint);
final valid = await updateObjectsFromBuckets(checkpoint,
forPartialPriority: forPriority);
if (!valid) {
return SyncLocalDatabaseResult(ready: false);
}
Expand All @@ -135,11 +139,25 @@ class BucketStorage {
return SyncLocalDatabaseResult(ready: true);
}

Future<bool> updateObjectsFromBuckets(Checkpoint checkpoint) async {
Future<bool> updateObjectsFromBuckets(Checkpoint checkpoint,
{int? forPartialPriority}) async {
return writeTransaction((tx) async {
await tx.execute(
"INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
['sync_local', '']);
await tx
.execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)", [
'sync_local',
forPartialPriority != null
? jsonEncode({
'priority': forPartialPriority,
// If we're at a partial checkpoint, we should only publish the
// buckets at the completed priority levels.
'buckets': [
for (final desc in checkpoint.checksums)
// Note that higher priorities are encoded as smaller values
if (desc.priority <= forPartialPriority) desc.bucket,
],
})
: null,
]);
final rs = await tx.execute('SELECT last_insert_rowid() as result');
final result = rs[0]['result'];
if (result == 1) {
Expand All @@ -154,10 +172,12 @@ class BucketStorage {
}, flush: true);
}

Future<SyncLocalDatabaseResult> validateChecksums(
Checkpoint checkpoint) async {
final rs = await select("SELECT powersync_validate_checkpoint(?) as result",
[jsonEncode(checkpoint)]);
Future<SyncLocalDatabaseResult> validateChecksums(Checkpoint checkpoint,
{int? priority}) async {
final rs =
await select("SELECT powersync_validate_checkpoint(?) as result", [
jsonEncode({...checkpoint.toJson(priority: priority)})
]);
final result =
jsonDecode(rs[0]['result'] as String) as Map<String, dynamic>;
if (result['valid'] as bool) {
Expand Down
67 changes: 54 additions & 13 deletions packages/powersync_core/lib/src/database/powersync_db_mixin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -121,27 +121,65 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {

Future<void> _updateHasSynced() async {
// Query the database to see if any data has been synced.
final result =
await database.get('SELECT powersync_last_synced_at() as synced_at');
final timestamp = result['synced_at'] as String?;
final hasSynced = timestamp != null;
final result = await database.getAll(
'SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority;',
);
const prioritySentinel = 2147483647;
var hasSynced = false;
DateTime? lastCompleteSync;
final priorityStatus = <SyncPriorityStatus>[];

DateTime parseDateTime(String sql) {
return DateTime.parse('${sql}Z').toLocal();
}

for (final row in result) {
final priority = row.columnAt(0) as int;
final lastSyncedAt = parseDateTime(row.columnAt(1) as String);

if (priority == prioritySentinel) {
hasSynced = true;
lastCompleteSync = lastSyncedAt;
} else {
priorityStatus.add((
hasSynced: true,
lastSyncedAt: lastSyncedAt,
priority: BucketPriority(priority)
));
}
}

if (hasSynced != currentStatus.hasSynced) {
final lastSyncedAt =
timestamp == null ? null : DateTime.parse('${timestamp}Z').toLocal();
final status =
SyncStatus(hasSynced: hasSynced, lastSyncedAt: lastSyncedAt);
final status = SyncStatus(
hasSynced: hasSynced,
lastSyncedAt: lastCompleteSync,
statusInPriority: priorityStatus,
);
setStatus(status);
}
}

/// Returns a [Future] which will resolve once the first full sync has completed.
Future<void> waitForFirstSync() async {
if (currentStatus.hasSynced ?? false) {
/// Returns a [Future] which will resolve once a synchronization operation has
/// completed.
///
/// When [priority] is null (the default), this method waits for a full sync
/// operation to complete. When set to a [BucketPriority] however, it also
/// completes once a partial sync operation containing that priority has
/// completed.
Future<void> waitForFirstSync({BucketPriority? priority}) async {
bool matches(SyncStatus status) {
if (priority == null) {
return status.hasSynced == true;
} else {
return status.statusForPriority(priority).hasSynced == true;
}
}

if (matches(currentStatus)) {
return;
}
await for (final result in statusStream) {
if (result.hasSynced ?? false) {
if (matches(result)) {
break;
}
}
Expand Down Expand Up @@ -187,7 +225,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
await disconnect();
// Now we can close the database
await database.close();
await statusStreamController.close();

// If there are paused subscriptionso n the status stream, don't delay
// closing the database because of that.
unawaited(statusStreamController.close());
}

/// Connect to the PowerSync service, and keep the databases in sync.
Expand Down
2 changes: 1 addition & 1 deletion packages/powersync_core/lib/src/setup_web.dart
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ bool coreVersionIsInRange(String tag) {
// Sets the range of powersync core version that is compatible with the sqlite3 version
// We're a little more selective in the versions chosen here than the range
// we're compatible with.
VersionConstraint constraint = VersionConstraint.parse('>=0.3.0 <0.4.0');
VersionConstraint constraint = VersionConstraint.parse('>=0.3.10 <0.4.0');
List<String> parts = tag.split('-');
String powersyncPart = parts[1];

Expand Down
Loading