Skip to content

Commit

Permalink
Test partial sync with data
Browse files Browse the repository at this point in the history
  • Loading branch information
simolus3 committed Feb 19, 2025
1 parent 9df1e92 commit 8ac9f45
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 6 deletions.
7 changes: 5 additions & 2 deletions packages/powersync_core/lib/src/bucket_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,16 @@ class BucketStorage {
}
return r;
}
final bucketNames = [for (final c in checkpoint.checksums) c.bucket];
final bucketNames = [
for (final c in checkpoint.checksums)
if (forPriority == null || c.priority <= forPriority) c.bucket
];

await writeTransaction((tx) async {
await tx.execute(
"UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))",
[checkpoint.lastOpId, jsonEncode(bucketNames)]);
if (checkpoint.writeCheckpoint != null) {
if (forPriority == null && checkpoint.writeCheckpoint != null) {
await tx.execute(
"UPDATE ps_buckets SET last_op = ? WHERE name = '\$local'",
[checkpoint.writeCheckpoint]);
Expand Down
36 changes: 32 additions & 4 deletions packages/powersync_core/test/in_memory_sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -168,25 +168,46 @@ void main() {

final checksums = [
for (var prio = 0; prio <= 3; prio++)
BucketChecksum(bucket: 'prio$prio', priority: prio, checksum: 0)
BucketChecksum(
bucket: 'prio$prio', priority: prio, checksum: 10 + prio)
];
syncService.addLine({
'checkpoint': Checkpoint(
lastOpId: '0',
lastOpId: '4',
writeCheckpoint: null,
checksums: checksums,
)
});
var operationId = 1;

void addRow(int priority) {
syncService.addLine({
'data': {
'bucket': 'prio$priority',
'data': [
{
'checksum': priority + 10,
'data': {'name': 'test', 'email': 'email'},
'op': 'PUT',
'op_id': '${operationId++}',
'object_id': 'prio$priority',
'object_type': 'customers'
}
]
}
});
}

// Receiving the checkpoint sets the state to downloading
await expectLater(
status, emits(isSyncStatus(downloading: true, hasSynced: false)));

// Emit partial sync complete for each priority but the last.
for (var prio = 0; prio < 3; prio++) {
addRow(prio);
syncService.addLine({
'partial_checkpoint_complete': {
'last_op_id': '0',
'last_op_id': operationId.toString(),
'priority': prio,
}
});
Expand All @@ -199,15 +220,22 @@ void main() {
isTrue,
)),
);

await database.waitForFirstSync(priority: BucketPriority(prio));
expect(await database.getAll('SELECT * FROM customers'),
hasLength(prio + 1));
}

// Complete the sync
addRow(3);
syncService.addLine({
'checkpoint_complete': {'last_op_id': '0'}
'checkpoint_complete': {'last_op_id': operationId.toString()}
});

await expectLater(
status, emits(isSyncStatus(downloading: false, hasSynced: true)));
await database.waitForFirstSync();
expect(await database.getAll('SELECT * FROM customers'), hasLength(4));
});

test('remembers last partial sync state', () async {
Expand Down

0 comments on commit 8ac9f45

Please sign in to comment.