Skip to content

Commit bb8bdc3

Browse files
Change Feed Processor: Fixes duplicate lease docs crashing load balancer on partition split
When the lease container is partitioned by /partitionKey, PartitionSynchronizerCore.HandlePartitionGoneAsync creates child lease documents using Guid.NewGuid().ToString() as the partitionKey value via DocumentServiceLeaseManagerCosmos.CreateLeaseIfNotExistAsync. Because each split retry (host restart mid-split, concurrent hosts, or error retries) picks a new Guid, TryCreateItemAsync's per-partition-key id-uniqueness check never catches cross-PK duplicates. Once duplicates exist, EqualPartitionsBalancingStrategy.CategorizeLeases throws from Dictionary.Add on every balance tick, blocking all lease acquisition for the container until the extra documents are manually deleted (IcM 768856224). Two fixes: 1. EqualPartitionsBalancingStrategy.CategorizeLeases: tolerate duplicate CurrentLeaseToken entries (keep the first, log a warning with the conflicting ids/PKs and remediation pointer). This unblocks the load balancer even when duplicates already exist in a customer's lease container. 2. PartitionSynchronizerCore.HandlePartitionGoneAsync (both DocumentServiceLeaseCore and DocumentServiceLeaseCoreEpk overloads): pre-query existing leases once and skip CreateLeaseIfNotExistAsync for ranges whose child lease token is already present, mirroring the dedup in CreateLeasesAsync. Tests added: - EqualPartitionsBalancingStrategyTests.CalculateLeasesToTake_DuplicateLeaseTokens_DoesNotThrow - PartitionSynchronizerCoreTests.HandlePartitionGoneAsync_PKRangeBasedLease_Split_DoesNotCreateDuplicateChildLeases - PartitionSynchronizerCoreTests.HandlePartitionGoneAsync_EpkBasedLease_Split_DoesNotCreateDuplicateChildLeases - PartitionSynchronizerCoreTests.HandlePartitionGoneAsync_PKRangeBasedLease_Split_CreatesOnlyMissingChildLeases - Microsoft.Azure.Cosmos.EmulatorTests.ChangeFeed.DuplicateLeaseRegressionTests (emulator-based end-to-end regression) Existing HandlePartitionGoneAsync tests were updated to mock GetAllLeasesAsync (via a CreateEmptyLeaseContainer helper) since the synchronizer now consults the lease container before creating children. Out of scope: long-term deterministic partition-key derivation from LeaseToken. That change is backward-incompatible for existing lease containers (customers have GUID-based PKs on all current lease docs) and will need a migration story; deferred to follow-up work. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent d4c91a0 commit bb8bdc3

5 files changed

Lines changed: 643 additions & 20 deletions

File tree

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Bootstrapping/PartitionSynchronizerCore.cs

Lines changed: 96 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -84,29 +84,75 @@ public override async Task CreateMissingLeasesAsync()
8484
throw new InvalidOperationException();
8585
}
8686

87+
// Pre-check existing leases so that retried split handling on the same host (e.g. a retry
88+
// after a transient failure within this call) does not create duplicate child lease
89+
// documents. This matters especially when the lease container is partitioned by
90+
// /partitionKey: each retry picks a new random Guid partition-key value, so the
91+
// per-partition-key id uniqueness check on TryCreateItemAsync does not catch cross-PK
92+
// duplicates. Duplicates would later crash EqualPartitionsBalancingStrategy.CategorizeLeases
93+
// and block all lease acquisition.
94+
//
95+
// Note: this is a TOCTOU pre-check, not a hard guarantee. Two hosts racing into
96+
// HandlePartitionGoneAsync for the same parent lease can still both observe the children
97+
// as missing and both proceed to create. That residual race is handled by the
98+
// duplicate-tolerant logic in EqualPartitionsBalancingStrategy.CategorizeLeases, which
99+
// keeps the first lease seen per token and logs a warning instead of throwing.
100+
IReadOnlyList<DocumentServiceLease> existingLeases = await this.leaseContainer.GetAllLeasesAsync().ConfigureAwait(false);
101+
102+
HashSet<string> existingPkRangeLeaseTokens = new HashSet<string>(
103+
existingLeases.Where(l => l is DocumentServiceLeaseCore).Select(l => l.CurrentLeaseToken),
104+
StringComparer.Ordinal);
105+
HashSet<string> existingEpkLeaseTokens = new HashSet<string>(
106+
existingLeases.Where(l => l is DocumentServiceLeaseCoreEpk).Select(l => l.CurrentLeaseToken),
107+
StringComparer.Ordinal);
108+
87109
return lease switch
88110
{
89-
DocumentServiceLeaseCoreEpk feedRangeBaseLease => await this.HandlePartitionGoneAsync(leaseToken, lastContinuationToken, feedRangeBaseLease, overlappingRanges),
90-
_ => await this.HandlePartitionGoneAsync(leaseToken, lastContinuationToken, (DocumentServiceLeaseCore)lease, overlappingRanges)
111+
DocumentServiceLeaseCoreEpk feedRangeBaseLease => await this.HandlePartitionGoneAsync(leaseToken, lastContinuationToken, feedRangeBaseLease, overlappingRanges, existingEpkLeaseTokens),
112+
_ => await this.HandlePartitionGoneAsync(leaseToken, lastContinuationToken, (DocumentServiceLeaseCore)lease, overlappingRanges, existingPkRangeLeaseTokens, existingEpkLeaseTokens)
91113
};
92114
}
93115

94116
/// <summary>
95117
/// Handles splits and merges for partition based leases.
96118
/// </summary>
119+
/// <remarks>
120+
/// When every child lease for a split is already present, this method returns an empty
121+
/// sequence and <c>shouldDeleteGoneLease = true</c>. The caller
122+
/// (<see cref="FeedManagement.PartitionControllerCore"/>) therefore deletes the parent
123+
/// lease but does not start observers for the existing children on this host; those leases
124+
/// are picked up by the next load-balancer tick in
125+
/// <see cref="FeedManagement.EqualPartitionsBalancingStrategy.SelectLeasesToTake"/>. This
126+
/// is intentional — the existing children may already be owned by another host, and the
127+
/// previous behaviour of eagerly grabbing them would have propagated this host's parent
128+
/// lease properties onto leases it did not create.
129+
/// </remarks>
97130
private async Task<(IEnumerable<DocumentServiceLease>, bool)> HandlePartitionGoneAsync(
98131
string leaseToken,
99132
string lastContinuationToken,
100133
DocumentServiceLeaseCore partitionBasedLease,
101-
IReadOnlyList<PartitionKeyRange> overlappingRanges)
134+
IReadOnlyList<PartitionKeyRange> overlappingRanges,
135+
HashSet<string> existingPkRangeLeaseTokens,
136+
HashSet<string> existingEpkLeaseTokens)
102137
{
103138
ConcurrentQueue<DocumentServiceLease> newLeases = new ConcurrentQueue<DocumentServiceLease>();
104139
if (overlappingRanges.Count > 1)
105140
{
106-
// Split: More than two children
141+
// Split: More than two children. The children this overload creates are PK-range
142+
// based (DocumentServiceLeaseCore with LeaseToken == PartitionKeyRange.Id), so the
143+
// dedup compares against the PK-range token set. This is deliberately a stricter
144+
// match than the heuristic used by CreateLeasesAsync (which also treats overlapping
145+
// EPK leases as "covered") because at this point the children's partition-key-range
146+
// ids are known exactly from the split result.
107147
await overlappingRanges.ForEachAsync(
108148
async addedRange =>
109149
{
150+
if (existingPkRangeLeaseTokens.Contains(addedRange.Id))
151+
{
152+
DefaultTrace.TraceInformation("Skipping creation of child lease for range '{0}'; a lease with that token already exists.", addedRange.Id);
153+
return;
154+
}
155+
110156
DocumentServiceLease newLease = await this.leaseManager.CreateLeaseIfNotExistAsync(addedRange, lastContinuationToken);
111157
if (newLease != null)
112158
{
@@ -119,14 +165,27 @@ await overlappingRanges.ForEachAsync(
119165
}
120166
else
121167
{
122-
// Merge: 1 children, multiple ranges merged into 1
168+
// Merge: 1 children, multiple ranges merged into 1. The new lease created for a
169+
// merged range is EPK-based (DocumentServiceLeaseCoreEpk) with
170+
// LeaseToken == "{Min}-{Max}", so the dedup must compare against the EPK token set,
171+
// not the PK-range set, even though we entered through the PK-range overload.
123172
PartitionKeyRange mergedRange = overlappingRanges[0];
124173
DefaultTrace.TraceInformation("Lease {0} merged into {1}", leaseToken, mergedRange.Id);
125174

126-
DocumentServiceLease newLease = await this.leaseManager.CreateLeaseIfNotExistAsync((FeedRangeEpk)partitionBasedLease.FeedRange, lastContinuationToken);
127-
if (newLease != null)
175+
FeedRangeEpk mergedFeedRange = (FeedRangeEpk)partitionBasedLease.FeedRange;
176+
string mergedLeaseToken = $"{mergedFeedRange.Range.Min}-{mergedFeedRange.Range.Max}";
177+
178+
if (existingEpkLeaseTokens.Contains(mergedLeaseToken))
128179
{
129-
newLeases.Enqueue(newLease);
180+
DefaultTrace.TraceInformation("Skipping creation of merged child lease for range '{0}'; a lease with that token already exists.", mergedLeaseToken);
181+
}
182+
else
183+
{
184+
DocumentServiceLease newLease = await this.leaseManager.CreateLeaseIfNotExistAsync(mergedFeedRange, lastContinuationToken);
185+
if (newLease != null)
186+
{
187+
newLeases.Enqueue(newLease);
188+
}
130189
}
131190
}
132191

@@ -136,11 +195,17 @@ await overlappingRanges.ForEachAsync(
136195
/// <summary>
137196
/// Handles splits and merges for feed range based leases.
138197
/// </summary>
198+
/// <remarks>
199+
/// See the remarks on the PK-range overload for the behaviour when every child lease is
200+
/// already present: the method returns an empty sequence; acquisition of the existing
201+
/// children is deferred to the next load-balancer tick.
202+
/// </remarks>
139203
private async Task<(IEnumerable<DocumentServiceLease>, bool)> HandlePartitionGoneAsync(
140204
string leaseToken,
141205
string lastContinuationToken,
142206
DocumentServiceLeaseCoreEpk feedRangeBasedLease,
143-
IReadOnlyList<PartitionKeyRange> overlappingRanges)
207+
IReadOnlyList<PartitionKeyRange> overlappingRanges,
208+
HashSet<string> existingEpkLeaseTokens)
144209
{
145210
List<DocumentServiceLease> newLeases = new List<DocumentServiceLease>();
146211
if (overlappingRanges.Count > 1)
@@ -155,21 +220,37 @@ await overlappingRanges.ForEachAsync(
155220
{
156221
Documents.Routing.Range<string> partitionRange = overlappingRanges[i].ToRange();
157222
Documents.Routing.Range<string> mergedRange = new Documents.Routing.Range<string>(min, partitionRange.Max, true, false);
158-
DocumentServiceLease newLease = await this.leaseManager.CreateLeaseIfNotExistAsync(new FeedRangeEpk(mergedRange), lastContinuationToken);
159-
if (newLease != null)
223+
string childLeaseToken = $"{mergedRange.Min}-{mergedRange.Max}";
224+
if (existingEpkLeaseTokens.Contains(childLeaseToken))
225+
{
226+
DefaultTrace.TraceInformation("Skipping creation of child EPK lease '{0}'; a lease with that token already exists.", childLeaseToken);
227+
}
228+
else
160229
{
161-
newLeases.Add(newLease);
230+
DocumentServiceLease newLease = await this.leaseManager.CreateLeaseIfNotExistAsync(new FeedRangeEpk(mergedRange), lastContinuationToken);
231+
if (newLease != null)
232+
{
233+
newLeases.Add(newLease);
234+
}
162235
}
163236

164237
min = partitionRange.Max;
165238
}
166239

167240
// Add the last range with the original max and the last min from the split
168241
Documents.Routing.Range<string> lastRangeAfterSplit = new Documents.Routing.Range<string>(min, max, true, false);
169-
DocumentServiceLease lastLease = await this.leaseManager.CreateLeaseIfNotExistAsync(new FeedRangeEpk(lastRangeAfterSplit), lastContinuationToken);
170-
if (lastLease != null)
242+
string lastChildLeaseToken = $"{lastRangeAfterSplit.Min}-{lastRangeAfterSplit.Max}";
243+
if (existingEpkLeaseTokens.Contains(lastChildLeaseToken))
244+
{
245+
DefaultTrace.TraceInformation("Skipping creation of child EPK lease '{0}'; a lease with that token already exists.", lastChildLeaseToken);
246+
}
247+
else
171248
{
172-
newLeases.Add(lastLease);
249+
DocumentServiceLease lastLease = await this.leaseManager.CreateLeaseIfNotExistAsync(new FeedRangeEpk(lastRangeAfterSplit), lastContinuationToken);
250+
if (lastLease != null)
251+
{
252+
newLeases.Add(lastLease);
253+
}
173254
}
174255

175256
DefaultTrace.TraceInformation("Lease {0} split into {1}", leaseToken, string.Join(", ", newLeases.Select(l => l.CurrentLeaseToken)));

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedManagement/EqualPartitionsBalancingStrategy.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,28 @@ private void CategorizeLeases(
138138
{
139139
Debug.Assert(lease.CurrentLeaseToken != null, "TakeLeasesAsync: lease.CurrentLeaseToken cannot be null.");
140140

141+
// Defensive: duplicate lease documents for the same lease token can exist when the lease
142+
// container is partitioned by /partitionKey and split handling re-ran with a different
143+
// Guid-based partition key (see PartitionSynchronizerCore.HandlePartitionGoneAsync).
144+
// Use TryAdd so the load balancer keeps the first lease seen and continues functioning
145+
// instead of throwing and blocking lease acquisition for the whole container. The
146+
// duplicate documents must be deleted manually (or the lease container recreated with
147+
// /id partitioning) to fully resolve the condition.
148+
if (allPartitions.ContainsKey(lease.CurrentLeaseToken))
149+
{
150+
DocumentServiceLease existing = allPartitions[lease.CurrentLeaseToken];
151+
DefaultTrace.TraceWarning(
152+
"Duplicate lease document detected for lease token '{0}'. Keeping lease with id '{1}' (partitionKey '{2}') and ignoring duplicate with id '{3}' (partitionKey '{4}'). To fully resolve, delete the duplicate lease document(s) (same id, different partitionKey values) from the lease container; if feasible, migrate the lease container to /id partitioning which avoids this condition entirely.",
153+
lease.CurrentLeaseToken,
154+
existing.Id,
155+
existing.PartitionKey,
156+
lease.Id,
157+
lease.PartitionKey);
158+
continue;
159+
}
160+
141161
allPartitions.Add(lease.CurrentLeaseToken, lease);
162+
142163
if (string.IsNullOrWhiteSpace(lease.Owner) || this.IsExpired(lease))
143164
{
144165
DefaultTrace.TraceVerbose("Found unused or expired lease: {0}", lease);

0 commit comments

Comments
 (0)