Skip to content

Commit f811bbb

Browse files
committed
Fix Pusher bugs that omit sequences from checkpoint
I saw a failure of "Continuous Super-Fast Push" from a Windows build, where the ending checkpoint was incorrect. Near the end of that test it logged the active replicator checkpoint as: `{"local":42,"localCompleted":[0,43,48,154]}` which is clearly wrong as it shows some sequences unable to be sent. The test log showed the Pusher holding off on a bunch of consecutive sequences ("...Found 0 changes up to #43" etc) because it was already sending a revision of that doc. This was logged for sequences 43 thru 48, which is very close to the missing sequences in the checkpoint (43-47). From inspection, I deduced the problem to be that the code block with comment "This doc already has a revision being sent; wait till that one is done" stores the current rev into `iDoc->second->nextRev` to be processed later; but if that field already holds a revision that one gets dropped on the floor: it's obsoleted by the current one and doesn't need to be sent, but the checkpointer is never notified. Adding a call to `_checkpointer.completedSequence` in that case should fix the problem. (Also, the existing call to `_checkpointer.addPendingSequence` is unnecessary since the Changes- Feed already marks sequences as pending.) Unfortunately I'm unable to reproduce the test failure locally; it probably only occurs because the test machine is slow. PS: Later I found another bug with the same effect, this one involving `_conflictsIMightRetry`. Added it to the fix.
1 parent d893d42 commit f811bbb

3 files changed

Lines changed: 33 additions & 13 deletions

File tree

Replicator/Pusher+Revs.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ namespace litecore::repl {
289289
}
290290
}
291291

292-
// If sending a rev that's been obsoleted by a newer one, mark the sequence as complete and send
292+
// This RevToSend has been obsoleted by a newer one; mark the sequence as complete and set `*c4Err` to
293293
// a 410 Gone error. (Common subroutine of sendRevision & shouldRetryConflictWithNewerAncestor.)
294294
void Pusher::revToSendIsObsolete(const RevToSend& request, C4Error* c4err) {
295295
logInfo("Revision '%.*s' #%.*s is obsolete; not sending it", SPLAT(request.docID), SPLAT(request.revID));
@@ -375,7 +375,6 @@ namespace litecore::repl {
375375
// `synced` - whether the revision was successfully stored on the peer
376376
void Pusher::doneWithRev(RevToSend* rev, bool completed, bool synced) {
377377
if ( !passive() ) {
378-
logDebug("** doneWithRev %.*s #%.*s", SPLAT(rev->docID), SPLAT(rev->revID)); //TEMP
379378
addProgress({rev->bodySize, 0});
380379
if ( completed ) {
381380
_checkpointer.completedSequence(rev->sequence);

Replicator/Pusher.cc

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,11 @@ namespace litecore::repl {
125125
++iChange;
126126
} else {
127127
// This doc already has a revision being sent; wait till that one is done
128-
logVerbose("Holding off on change '%.*s' %.*s till earlier rev %.*s is done", SPLAT(rev->docID),
129-
SPLAT(rev->revID), SPLAT(iDoc->second->revID));
130-
iDoc->second->nextRev = rev;
131-
if ( !passive() ) _checkpointer.addPendingSequence(rev->sequence);
128+
logInfo("Holding off on change '%.*s' %.*s till earlier rev %.*s is done", SPLAT(rev->docID),
129+
SPLAT(rev->revID), SPLAT(iDoc->second->revID));
130+
auto& nextRev = iDoc->second->nextRev;
131+
if ( nextRev && !passive() ) revToSendIsObsolete(*nextRev, nullptr);
132+
nextRev = std::move(rev);
132133
iChange = changes.revs.erase(iChange); // remove from `changes`
133134
}
134135
}
@@ -377,15 +378,18 @@ namespace litecore::repl {
377378
}
378379

379380
if ( shouldRetryConflictWithNewerAncestor(change, serverRevID) ) {
380-
// I have a newer revision to send in its place:
381+
// Retry this revision with its updated remoteAncestorRevID:
381382
RevToSendList changes = {change};
382383
sendChanges(changes);
383-
return true;
384+
return true; // return early, w/o caling doneWithRev()
384385
} else if ( _options->pull(collectionIndex()) <= kC4Passive ) {
386+
// I'm not pulling, so there's no way to get the server revision. Give up.
385387
C4Error error = C4Error::make(WebSocketDomain, 409, "conflicts with newer server revision"_sl);
386388
finishedDocumentWithError(change, error, false);
387389
} else {
388-
completed = false;
390+
// The current server rev will be pulled soon, either updating the doc or creating
391+
// a conflict that the app will merge (creating a new rev.)
392+
// Either way, this rev is done, so leave `completed` true.
389393
}
390394
} else {
391395
// Other error:
@@ -447,9 +451,17 @@ namespace litecore::repl {
447451
return true;
448452
}
449453
} else {
450-
// No change to remote ancestor, but try again later if it changes:
454+
// No change to remote ancestor, but try again later if it changes.
455+
// (if an earlier rev of the doc was in this state, overwrite it & mark its seq complete.)
451456
logInfo("Will try again if remote rev of '%.*s' is updated", SPLAT(rev->docID));
452-
_conflictsIMightRetry.emplace(rev->docID, rev);
457+
if ( auto i = _conflictsIMightRetry.find(rev->docID); i != _conflictsIMightRetry.end() ) {
458+
if ( i->second->sequence < rev->sequence ) {
459+
revToSendIsObsolete(*i->second);
460+
i->second = rev;
461+
}
462+
} else {
463+
_conflictsIMightRetry.emplace(rev->docID, rev);
464+
}
453465
}
454466
} else {
455467
// Doc has changed, so this rev is obsolete
@@ -478,6 +490,7 @@ namespace litecore::repl {
478490
"but local doc has changed",
479491
SPLAT(docID), SPLAT(collectionSpec().scope), SPLAT(collectionSpec().name),
480492
SPLAT(foreignAncestor));
493+
revToSendIsObsolete(*rev, nullptr);
481494
} else if ( doc->selectRevision(foreignAncestor, false) && !(doc->selectedRev().flags & kRevIsConflict) ) {
482495
// The remote rev is an ancestor of my revision, so retry it:
483496
doc->selectCurrentRevision();

Replicator/tests/ReplicatorLoopbackTest.hh

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ class ReplicatorLoopbackTest
437437
auto& conflictHandlerRunning = _conflictHandlerRunning;
438438
_conflictHandler = [resolvDB, &conflictHandlerRunning](ReplicatedRev* rev) {
439439
// Note: Can't use Catch (CHECK, REQUIRE) on a background thread
440-
Log("Resolving conflict in '%.*s' ...", SPLAT(rev->docID));
440+
Log("Resolving conflict in '%.*s' %.*s ...", SPLAT(rev->docID), SPLAT(rev->revID));
441441

442442
conflictHandlerRunning = true;
443443
TransactionHelper t(resolvDB);
@@ -449,6 +449,12 @@ class ReplicatorLoopbackTest
449449
WarnError("conflictHandler: Couldn't read doc '%.*s'", SPLAT(rev->docID));
450450
Require(doc);
451451
}
452+
if ( !(doc->flags & kDocConflicted) ) {
453+
Log("conflictHandler: Doc '%.*s' not conflicted anymore (at %.*s)", SPLAT(rev->docID),
454+
SPLAT(doc->revID));
455+
conflictHandlerRunning = false;
456+
return;
457+
}
452458
alloc_slice localRevID = doc->selectedRev.revID;
453459
C4RevisionFlags localFlags = doc->selectedRev.flags;
454460
FLDict localBody = c4doc_getProperties(doc);
@@ -529,7 +535,9 @@ class ReplicatorLoopbackTest
529535
string json = stringprintf(R"({"db":"%p","i":%d})", db, revNo);
530536
revID = createFleeceRev(collection, docID, nullslice, slice(json));
531537
}
532-
Log("-------- %s %d: Created rev '%.*s' #%s --------", logName, revNo, SPLAT(docID), revID.c_str());
538+
unsigned long long sequence = collection->getLastSequence();
539+
Log("-------- %s %d: Created rev '%.*s' %s (seq #%llu) --------", logName, revNo, SPLAT(docID),
540+
revID.c_str(), sequence);
533541
}
534542
Log("-------- %s: Done creating revs --------", logName);
535543
}

0 commit comments

Comments
 (0)