Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 19 additions & 23 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {

// ////// DOCUMENTS:

func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sender, docID, revID string, deltaSrcRevID string, seq SequenceID, knownRevs map[string]bool, maxHistory int, handleChangesResponseCollection *DatabaseCollectionWithUser, collectionIdx *int) error {
func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sender, docID, revID string, deltaSrcRevID string, seq SequenceID, knownRevs map[string]bool, maxHistory int, handleChangesResponseCollection *DatabaseCollectionWithUser, collectionIdx *int, remoteIsLegacyRev bool) error {
bsc.replicationStats.SendRevDeltaRequestedCount.Add(1)

revDelta, redactedRev, err := handleChangesResponseCollection.GetDelta(ctx, docID, deltaSrcRevID, revID)
Expand All @@ -922,40 +922,36 @@ func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sen
} else if base.IsFleeceDeltaError(err) {
// Something went wrong in the diffing library. We want to know about this!
base.WarnfCtx(ctx, "Falling back to full body replication. Error generating delta from %s to %s for key %s - err: %v", deltaSrcRevID, revID, base.UD(docID), err)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, remoteIsLegacyRev)
} else if err == base.ErrDeltaSourceIsTombstone {
base.TracefCtx(ctx, base.KeySync, "Falling back to full body replication. Delta source %s is tombstone. Unable to generate delta to %s for key %s", deltaSrcRevID, revID, base.UD(docID))
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, remoteIsLegacyRev)
} else if err != nil {
base.DebugfCtx(ctx, base.KeySync, "Falling back to full body replication. Couldn't get delta from %s to %s for key %s - err: %v", deltaSrcRevID, revID, base.UD(docID), err)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, remoteIsLegacyRev)
}

if redactedRev != nil {
var history []string
var revTreeProperty []string
if !bsc.useHLV() || bsc.sendRevTreeProperty() {
localIsLegacyRev := redactedRev.CV == nil
var revTreeHistory []string
if !bsc.useHLV() || localIsLegacyRev || remoteIsLegacyRev || bsc.sendRevTreeProperty() {
var err error
history, err = toHistory(redactedRev.History, knownRevs, maxHistory)
revTreeHistory, err = toHistory(redactedRev.History, knownRevs, maxHistory)
if err != nil {
err := base.RedactErrorf("Could not get rev tree history for replacement rev in sendRevAsDelta %s %s: %w, sending a noRev to skip this revision for replication at sequence %s.", base.UD(docID), revID, err, seq)
base.WarnfCtx(ctx, "%s", err)
return bsc.sendNoRev(sender, docID, revID, collectionIdx, seq, err)
}
} else {
history = append(history, redactedRev.HlvHistory)
}
if bsc.sendRevTreeProperty() {
revTreeProperty = append(revTreeProperty, redactedRev.RevID)
history, err := toHistory(redactedRev.History, knownRevs, maxHistory)
if err != nil {
err := base.RedactErrorf("Could not get rev tree history for replacement rev when sending in sendRevAsDelta %s %s: %w, sending a noRev to skip this document for replication at sequence %s.", base.UD(docID), redactedRev.RevID, err, seq)
base.WarnfCtx(ctx, "%s", err)
return bsc.sendNoRev(sender, docID, revID, collectionIdx, seq, err)
}
revTreeProperty = append(revTreeProperty, history...)
}

history, revTreeProperty := bsc.buildRevHistory(revHistoryInput{
revTreeHistory: revTreeHistory,
hlvHistory: redactedRev.HlvHistory,
revID: redactedRev.RevID,
localIsLegacyRev: localIsLegacyRev,
remoteIsLegacyRev: remoteIsLegacyRev,
})

properties, err := blipRevMessageProperties(history, redactedRev.Deleted, seq, "", revTreeProperty)
if err != nil {
return err
Expand All @@ -965,16 +961,16 @@ func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sen

if revDelta == nil {
base.DebugfCtx(ctx, base.KeySync, "Falling back to full body replication. Couldn't get delta from %s to %s for key %s", deltaSrcRevID, revID, base.UD(docID))
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, remoteIsLegacyRev)
}

resendFullRevisionFunc := func() error {
base.InfofCtx(ctx, base.KeySync, "Resending revision as full body. Peer couldn't process delta %s from %s to %s for key %s", base.UD(revDelta.DeltaBytes), deltaSrcRevID, revID, base.UD(docID))
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, false)
return bsc.sendRevision(ctx, sender, docID, revID, seq, knownRevs, maxHistory, handleChangesResponseCollection, collectionIdx, remoteIsLegacyRev)
}

base.TracefCtx(ctx, base.KeySync, "docID: %s - delta: %v", base.UD(docID), base.UD(string(revDelta.DeltaBytes)))
if err := bsc.sendDelta(ctx, sender, docID, collectionIdx, deltaSrcRevID, revDelta, seq, resendFullRevisionFunc); err != nil {
if err := bsc.sendDelta(ctx, sender, docID, collectionIdx, deltaSrcRevID, revDelta, seq, remoteIsLegacyRev, resendFullRevisionFunc); err != nil {
return err
}

Expand Down
159 changes: 111 additions & 48 deletions db/blip_sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
var err error

if deltaSrcRevID != "" {
err = bsc.sendRevAsDelta(ctx, sender, docID, rev, deltaSrcRevID, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx)
err = bsc.sendRevAsDelta(ctx, sender, docID, rev, deltaSrcRevID, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx, legacyRev)
} else {
err = bsc.sendRevision(ctx, sender, docID, rev, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx, legacyRev)
}
Expand Down Expand Up @@ -585,21 +585,109 @@ func (bsc *BlipSyncContext) setUseDeltas(clientCanUseDeltas bool) {
}
}

func (bsc *BlipSyncContext) sendDelta(ctx context.Context, sender *blip.Sender, docID string, collectionIdx *int, deltaSrcRevID string, revDelta *RevisionDelta, seq SequenceID, resendFullRevisionFunc func() error) error {
// revHistoryInput contains the inputs needed to build revision history properties for a BLIP rev message.
type revHistoryInput struct {
revTreeHistory []string // Pre-computed rev tree ancestor chain (from toHistory or RevisionDelta.RevisionHistory)
hlvHistory string // HLV history string in CBL format
revID string // Current document's rev tree ID (e.g. "2-abc")
localIsLegacyRev bool // True if the local revision predates HLV (no version vector yet)
remoteIsLegacyRev bool // True if the remote peer's version of this doc predates HLV
}

var history []string
var revTreeProperty []string
// CV can now be empty on rev cache items; only use CV when it's available
// history being sent here is potentially incorrect pending CBG-5106
if bsc.useHLV() && revDelta.ToCV != "" {
history = append(history, revDelta.HlvHistory)
// buildRevHistory constructs the "history" and "revTreeHistory" BLIP message properties for a rev
// message. These properties tell the peer what version history the document has, so the peer can
// determine ancestry and detect conflicts.
//
// A "legacy" revision is one that predates HLV (Hybrid Logical Vector) — it has a rev tree ID
// (e.g. "2-abc") but no version vector. After a node upgrades, new writes get both a rev tree ID
// and a CV (current version). The combination of local/remote legacy status produces four scenarios:
//
// Scenario 1: Pre-HLV protocol (subprotocol < V4) or post-HLV protocol (subprotocol >= V4) and both sides have legacy rev
// ┌─────────────────────────────────────────────────────┐
// │ history property: [revTreeHistory...] │
// │ revTreeHistory property: (not sent) │
// │ │
// │ Both sides only understand rev trees or only have │
// │ rev trees, so send the rev tree ancestor chain │
// │ in the history property. │
// └─────────────────────────────────────────────────────┘
//
// Scenario 2: Local has HLV, remote has HLV (CBL client)
// ┌─────────────────────────────────────────────────────┐
// │ history property: [hlvHistory] │
// │ revTreeHistory prop: (not sent) │
// │ │
// │ Both sides speak HLV. Send only the HLV history. │
// │ CBL clients don't need the rev tree property. │
// └─────────────────────────────────────────────────────┘
//
// Scenario 3: Local has HLV, remote is legacy
// ┌─────────────────────────────────────────────────────┐
// │ history property: [hlvHistory, revID, │
// │ revTreeHistory...] │
// │ revTreeHistory prop: (not sent) │
// │ │
// │ The remote has a legacy rev tree version. Append │
// │ the current revID + rev tree history after the HLV │
// │ history so the remote can detect conflicts via its │
// │ rev tree. │
// └─────────────────────────────────────────────────────┘
//
// Scenario 4: Local legacy, remote has HLV
// ┌─────────────────────────────────────────────────────┐
// │ history property: [revTreeHistory...] │
// │ revTreeHistory prop: (not sent) │
// │ │
// │ The remote has a HLV version. Assign the rev tree │
// │ to history so the remote can parse for conflict │
// │ checks. │
// └─────────────────────────────────────────────────────┘
//
// Scenario 5: Local is HLV-aware, remote is HLV-aware (ISGR peer)
// ┌─────────────────────────────────────────────────────┐
// │ history property: [hlvHistory] │
// │ revTreeHistory prop: [revID, revTreeHistory...] │
// │ │
// │ ISGR peers (sendRevTreeProperty) get HLV in the │
// │ history property and rev tree in a separate │
// │ revTreeHistory property to keep both reconciled. │
// └─────────────────────────────────────────────────────┘
func (bsc *BlipSyncContext) buildRevHistory(input revHistoryInput) (history []string, revTreeHistoryProperty []string) {
// Build main history: rev tree for legacy/pre-HLV, HLV for upgraded docs
if !bsc.useHLV() || input.localIsLegacyRev {
history = input.revTreeHistory // Scenario 1, 4
} else {
history = revDelta.RevisionHistory
if input.hlvHistory != "" {
history = append(history, input.hlvHistory) // Scenarios 2, 3, 5
}
}
if bsc.sendRevTreeProperty() {
revTreeProperty = append(revTreeProperty, revDelta.ToRevID)
revTreeProperty = append(revTreeProperty, revDelta.RevisionHistory...)

// Append or separate the rev tree for cross-version or ISGR peer scenarios
if input.remoteIsLegacyRev && !input.localIsLegacyRev {
// Scenario 3: remote needs rev tree appended to history for conflict detection
history = append(history, input.revID)
history = append(history, input.revTreeHistory...)
} else if bsc.sendRevTreeProperty() && !input.localIsLegacyRev {
// Scenario 5: ISGR peer gets rev tree in a separate property
revTreeHistoryProperty = append(revTreeHistoryProperty, input.revID)
revTreeHistoryProperty = append(revTreeHistoryProperty, input.revTreeHistory...)
}

return history, revTreeHistoryProperty
}

func (bsc *BlipSyncContext) sendDelta(ctx context.Context, sender *blip.Sender, docID string, collectionIdx *int, deltaSrcRevID string, revDelta *RevisionDelta, seq SequenceID, remoteIsLegacyRev bool, resendFullRevisionFunc func() error) error {

// ToRevID is always a rev tree ID; use ToCV to determine if the doc has been upgraded to HLV
localIsLegacyRev := revDelta.ToCV == ""

history, revTreeProperty := bsc.buildRevHistory(revHistoryInput{
revTreeHistory: revDelta.RevisionHistory,
hlvHistory: revDelta.HlvHistory,
revID: revDelta.ToRevID,
localIsLegacyRev: localIsLegacyRev,
remoteIsLegacyRev: remoteIsLegacyRev,
})
properties, err := blipRevMessageProperties(history, revDelta.ToDeleted, seq, "", revTreeProperty)
if err != nil {
return err
Expand Down Expand Up @@ -755,50 +843,25 @@ func (bsc *BlipSyncContext) sendRevision(ctx context.Context, sender *blip.Sende
if replacedRevID != "" {
bsc.replicationStats.SendReplacementRevCount.Add(1)
}
var history []string
if !bsc.useHLV() || localIsLegacyRev {
// Compute rev tree history when any scenario needs it (see buildRevHistory for scenario docs)
var revTreeHistory []string
if !bsc.useHLV() || localIsLegacyRev || remoteIsLegacyRev || bsc.sendRevTreeProperty() {
var err error
history, err = toHistory(docRev.History, knownRevs, maxHistory)
revTreeHistory, err = toHistory(docRev.History, knownRevs, maxHistory)
if err != nil {
err := base.RedactErrorf("Could not get rev tree history for %s %s: %w, sending a noRev to skip this revision for replication at sequence %s.", base.UD(docID), revID, err, seq)
base.WarnfCtx(ctx, "%s", err)
return bsc.sendNoRev(sender, docID, revID, collectionIdx, seq, err)
}
} else {
if docRev.HlvHistory != "" {
history = append(history, docRev.HlvHistory)
}
}

var revTreeHistoryProperty []string
// If the remote side of the replication has this document but it's a legacy version of the document, we need to send the full rev tree history
// after the hlv history so the remote side can identify iof the document is in conflict or not. We should only do
// this when the local revision is not a legacy revision as if local revision is also legacy revision we will have built
// the full rev tree history above to send in the history property.
if remoteIsLegacyRev && !localIsLegacyRev {
// append current revID and rest of rev tree after hlv history
revTreeHistory, err := toHistory(docRev.History, knownRevs, maxHistory)
if err != nil {
err := base.RedactErrorf("Could not get rev tree history for %s %s when remote revision is a legacy rev and the local is hlv aware: %w, sending a noRev to skip this revision for replication at sequence %d.", base.UD(docID), revID, err, seq)
base.WarnfCtx(ctx, "%v", err)
return bsc.sendNoRev(sender, docID, revID, collectionIdx, seq, err)
}
history = append(history, docRev.RevID)
history = append(history, revTreeHistory...)
} else if bsc.sendRevTreeProperty() && !localIsLegacyRev {
// If we are communicating in > 4 subprotocol versions and the client is another SGW peer, we have a revTree
// property we can populate with the rev tree to keep rev tree reconciled on the replication. This property
// should only be sent if the local + remote revisions are not a legacy revisions as the rev tree in this case will be
// sent in history property.
revTreeHistoryProperty = append(revTreeHistoryProperty, docRev.RevID) // we need current rev
history, err := toHistory(docRev.History, knownRevs, maxHistory)
if err != nil {
err := base.RedactErrorf("Could not get rev tree history for %s %s when local and remote revision are hlv aware: %w, sending a noRev to skip this revision for replication at sequence %s.", base.UD(docID), revID, err, seq)
base.WarnfCtx(ctx, "%v", err)
return bsc.sendNoRev(sender, docID, revID, collectionIdx, seq, err)
}
revTreeHistoryProperty = append(revTreeHistoryProperty, history...)
}
history, revTreeHistoryProperty := bsc.buildRevHistory(revHistoryInput{
revTreeHistory: revTreeHistory,
hlvHistory: docRev.HlvHistory,
revID: docRev.RevID,
localIsLegacyRev: localIsLegacyRev,
remoteIsLegacyRev: remoteIsLegacyRev,
})

properties, err := blipRevMessageProperties(history, docRev.Deleted, seq, replacedRevID, revTreeHistoryProperty)
if err != nil {
Expand Down
Loading
Loading