Skip to content

Commit

Permalink
Add priority field
Browse files Browse the repository at this point in the history
  • Loading branch information
simolus3 committed Jan 29, 2025
1 parent 1fa7caa commit 6241a18
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 34 deletions.
33 changes: 21 additions & 12 deletions packages/powersync_core/lib/src/bucket_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ class BucketStorage {

Future<List<BucketState>> getBucketStates() async {
final rows = await select(
'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != \'\$local\'');
'SELECT name as bucket, priority, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != \'\$local\'');
return [
for (var row in rows)
BucketState(bucket: row['bucket'], opId: row['op_id'])
BucketState(
bucket: row['bucket'],
priority: row['priority'],
opId: row['op_id'],
)
];
}

Expand Down Expand Up @@ -100,8 +104,8 @@ class BucketStorage {
return false;
}

Future<SyncLocalDatabaseResult> syncLocalDatabase(
Checkpoint checkpoint) async {
Future<SyncLocalDatabaseResult> syncLocalDatabase(Checkpoint checkpoint,
{int? forPriority}) async {
final r = await validateChecksums(checkpoint);

if (!r.checkpointValid) {
Expand All @@ -124,7 +128,7 @@ class BucketStorage {
// Not flushing here - the flush will happen in the next step
}, flush: false);

final valid = await updateObjectsFromBuckets(checkpoint);
final valid = await updateObjectsFromBuckets(forPriority: forPriority);
if (!valid) {
return SyncLocalDatabaseResult(ready: false);
}
Expand All @@ -134,11 +138,11 @@ class BucketStorage {
return SyncLocalDatabaseResult(ready: true);
}

Future<bool> updateObjectsFromBuckets(Checkpoint checkpoint) async {
Future<bool> updateObjectsFromBuckets({int? forPriority}) async {
return writeTransaction((tx) async {
await tx.execute(
"INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
['sync_local', '']);
['sync_local', forPriority]);
final rs = await tx.execute('SELECT last_insert_rowid() as result');
final result = rs[0]['result'];
if (result == 1) {
Expand Down Expand Up @@ -321,9 +325,11 @@ class BucketStorage {

class BucketState {
final String bucket;
final int priority;
final String opId;

const BucketState({required this.bucket, required this.opId});
const BucketState(
{required this.bucket, required this.priority, required this.opId});

@override
String toString() {
Expand All @@ -332,17 +338,20 @@ class BucketState {

@override
int get hashCode {
return Object.hash(bucket, opId);
return Object.hash(bucket, priority, opId);
}

@override
bool operator ==(Object other) {
return other is BucketState && other.bucket == bucket && other.opId == opId;
return other is BucketState &&
other.priority == priority &&
other.bucket == bucket &&
other.opId == opId;
}
}

class SyncDataBatch {
List<SyncBucketData> buckets;
final class SyncDataBatch {
final List<SyncBucketData> buckets;

SyncDataBatch(this.buckets);
}
Expand Down
71 changes: 53 additions & 18 deletions packages/powersync_core/lib/src/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -302,29 +302,36 @@ class StreamingSyncImplementation implements StreamingSync {
_statusStreamController.add(newStatus);
}

Future<bool> streamingSyncIteration(
{AbortController? abortController}) async {
adapter.startSession();
Future<(List<BucketRequest>, Map<String, BucketDescription>)>
_collectLocalBucketState() async {
final bucketEntries = await adapter.getBucketStates();

Map<String, String> initialBucketStates = {};
final initialRequests = [
for (final entry in bucketEntries) BucketRequest(entry.bucket, entry.opId)
];
final localDescriptions = {
for (final entry in bucketEntries)
entry.bucket: (
name: entry.bucket,
priority: entry.priority,
)
};

for (final entry in bucketEntries) {
initialBucketStates[entry.bucket] = entry.opId;
}
return (initialRequests, localDescriptions);
}

final List<BucketRequest> buckets = [];
for (var entry in initialBucketStates.entries) {
buckets.add(BucketRequest(entry.key, entry.value));
}
Future<bool> streamingSyncIteration(
{AbortController? abortController}) async {
adapter.startSession();

var (bucketRequests, bucketMap) = await _collectLocalBucketState();

Checkpoint? targetCheckpoint;
Checkpoint? validatedCheckpoint;
Checkpoint? appliedCheckpoint;
var bucketSet = Set<String>.from(initialBucketStates.keys);

var requestStream = streamingSyncRequest(
StreamingSyncRequest(buckets, syncParameters, clientId!));
StreamingSyncRequest(bucketRequests, syncParameters, clientId!));

var merged = addBroadcast(requestStream, _localPingController.stream);

Expand All @@ -343,13 +350,16 @@ class StreamingSyncImplementation implements StreamingSync {
switch (line) {
case Checkpoint():
targetCheckpoint = line;
final Set<String> bucketsToDelete = {...bucketSet};
final Set<String> newBuckets = {};
final Set<String> bucketsToDelete = {...bucketMap.keys};
final Map<String, BucketDescription> newBuckets = {};
for (final checksum in line.checksums) {
newBuckets.add(checksum.bucket);
newBuckets[checksum.bucket] = (
name: checksum.bucket,
priority: checksum.priority,
);
bucketsToDelete.remove(checksum.bucket);
}
bucketSet = newBuckets;
bucketMap = newBuckets;
await adapter.removeBuckets([...bucketsToDelete]);
_updateStatus(downloading: true);
case StreamingSyncCheckpointComplete():
Expand All @@ -371,6 +381,27 @@ class StreamingSyncImplementation implements StreamingSync {
lastSyncedAt: DateTime.now());
}

validatedCheckpoint = targetCheckpoint;
case StreamingSyncCheckpointPartiallyComplete(:final bucketPriority):
final result = await adapter.syncLocalDatabase(targetCheckpoint!,
forPriority: bucketPriority);
if (!result.checkpointValid) {
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
// await new Promise((resolve) => setTimeout(resolve, 50));
return false;
} else if (!result.ready) {
// Checksums valid, but need more data for a consistent checkpoint.
// Continue waiting.
} else {
appliedCheckpoint = targetCheckpoint;

_updateStatus(
downloading: false,
downloadError: _noError,
lastSyncedAt: DateTime.now());
}

validatedCheckpoint = targetCheckpoint;
case StreamingSyncCheckpointDiff():
// TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint
Expand All @@ -397,7 +428,8 @@ class StreamingSyncImplementation implements StreamingSync {
writeCheckpoint: diff.writeCheckpoint);
targetCheckpoint = newCheckpoint;

bucketSet = Set.from(newBuckets.keys);
bucketMap = newBuckets.map((name, checksum) =>
MapEntry(name, (name: name, priority: checksum.priority)));
await adapter.removeBuckets(diff.removedBuckets);
adapter.setTargetCheckpoint(targetCheckpoint);
case SyncBucketData():
Expand All @@ -424,6 +456,9 @@ class StreamingSyncImplementation implements StreamingSync {
});
}
}
case UnknownSyncLine(:final rawData):
isolateLogger.fine('Ignoring unknown sync line: $rawData');
break;
case null: // Local ping
if (targetCheckpoint == appliedCheckpoint) {
_updateStatus(
Expand Down
32 changes: 28 additions & 4 deletions packages/powersync_core/lib/src/sync_types.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import 'dart:convert';
sealed class StreamingSyncLine {
const StreamingSyncLine();

/// Parses a [StreamingSyncLine] from JSON, or return `null` if the line is
/// not understood by this client.
static StreamingSyncLine? fromJson(Map<String, dynamic> line) {
/// Parses a [StreamingSyncLine] from JSON.
static StreamingSyncLine fromJson(Map<String, dynamic> line) {
if (line.containsKey('checkpoint')) {
return Checkpoint.fromJson(line['checkpoint']);
} else if (line.containsKey('checkpoint_diff')) {
Expand All @@ -19,11 +18,18 @@ sealed class StreamingSyncLine {
} else if (line.containsKey('token_expires_in')) {
return StreamingSyncKeepalive.fromJson(line);
} else {
return null;
return UnknownSyncLine(line);
}
}
}

/// A message from the sync service that this client doesn't support.
final class UnknownSyncLine implements StreamingSyncLine {
final Map<String, dynamic> rawData;

const UnknownSyncLine(this.rawData);
}

/// Indicates that a checkpoint is available, along with checksums for each
/// bucket in the checkpoint.
///
Expand Down Expand Up @@ -54,8 +60,11 @@ final class Checkpoint extends StreamingSyncLine {
}
}

typedef BucketDescription = ({String name, int priority});

class BucketChecksum {
final String bucket;
final int priority;
final int checksum;

/// Count is informational only
Expand All @@ -64,12 +73,14 @@ class BucketChecksum {

const BucketChecksum(
{required this.bucket,
required this.priority,
required this.checksum,
this.count,
this.lastOpId});

BucketChecksum.fromJson(Map<String, dynamic> json)
: bucket = json['bucket'],
priority = json['priority'],
checksum = json['checksum'],
count = json['count'],
lastOpId = json['last_op_id'];
Expand Down Expand Up @@ -112,6 +123,19 @@ final class StreamingSyncCheckpointComplete extends StreamingSyncLine {
: lastOpId = json['last_op_id'];
}

/// Sent after all the [SyncBucketData] messages for a given priority within a
/// checkpoint have been sent.
final class StreamingSyncCheckpointPartiallyComplete extends StreamingSyncLine {
String lastOpId;
int bucketPriority;

StreamingSyncCheckpointPartiallyComplete(this.lastOpId, this.bucketPriority);

StreamingSyncCheckpointPartiallyComplete.fromJson(Map<String, dynamic> json)
: lastOpId = json['last_op_id'],
bucketPriority = json['priority'];
}

/// Sent as a periodic ping to keep the connection alive and to notify the
/// client about the remaining lifetime of the JWT.
///
Expand Down

0 comments on commit 6241a18

Please sign in to comment.