Skip to content

Commit f8d3a3e

Browse files
ChangeFeedProcessor: Fixes deterministic lease partition key dedup behavior
Agent-Logs-Url: https://github.com/Azure/azure-cosmos-dotnet-v3/sessions/2ecc1c73-658a-4358-ad20-9ecc87fb25fe Co-authored-by: kirankumarkolli <6880899+kirankumarkolli@users.noreply.github.com>
1 parent 3c56d2f commit f8d3a3e

3 files changed

Lines changed: 113 additions & 4 deletions

File tree

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ internal sealed class DocumentServiceLeaseManagerCosmos : DocumentServiceLeaseMa
2929
private readonly DocumentServiceLeaseStoreManagerOptions options;
3030
private readonly RequestOptionsFactory requestOptionsFactory;
3131
private readonly AsyncLazy<TryCatch<string>> lazyContainerRid;
32+
private readonly bool useDeterministicPartitionKey;
3233
private PartitionKeyRangeCache partitionKeyRangeCache;
3334

3435
public DocumentServiceLeaseManagerCosmos(
@@ -44,6 +45,7 @@ public DocumentServiceLeaseManagerCosmos(
4445
this.options = options;
4546
this.requestOptionsFactory = requestOptionsFactory;
4647
this.lazyContainerRid = new AsyncLazy<TryCatch<string>>(valueFactory: (trace, innerCancellationToken) => this.TryInitializeContainerRIdAsync(innerCancellationToken));
48+
this.useDeterministicPartitionKey = ConfigurationManager.IsChangeFeedLeaseIdAsPartitionKeyEnabled();
4749
}
4850

4951
public override async Task<DocumentServiceLease> AcquireAsync(DocumentServiceLease lease)
@@ -128,7 +130,10 @@ public override Task<DocumentServiceLease> CreateLeaseIfNotExistAsync(
128130
Mode = this.GetChangeFeedMode()
129131
};
130132

131-
this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString());
133+
string partitionKeyValue = this.useDeterministicPartitionKey
134+
? documentServiceLease.LeaseId
135+
: Guid.NewGuid().ToString();
136+
this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, partitionKeyValue);
132137

133138
return this.TryCreateDocumentServiceLeaseAsync(documentServiceLease);
134139
}
@@ -153,7 +158,10 @@ public override Task<DocumentServiceLease> CreateLeaseIfNotExistAsync(
153158
Mode = this.GetChangeFeedMode()
154159
};
155160

156-
this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString());
161+
string partitionKeyValue = this.useDeterministicPartitionKey
162+
? documentServiceLease.LeaseId
163+
: Guid.NewGuid().ToString();
164+
this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, partitionKeyValue);
157165

158166
return this.TryCreateDocumentServiceLeaseAsync(documentServiceLease);
159167
}

Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@ internal static class ConfigurationManager
133133
/// </summary>
134134
internal static readonly string TcpDnsDotSuffixEnabled = "AZURE_COSMOS_TCP_DNS_DOT_SUFFIX_ENABLED";
135135

136+
/// <summary>
137+
/// Environment variable name to enable deterministic lease-id partition key values for Change Feed lease creation.
138+
/// </summary>
139+
internal static readonly string ChangeFeedLeaseIdAsPartitionKeyEnabled = "AZURE_COSMOS_CHANGE_FEED_LEASE_ID_AS_PARTITION_KEY_ENABLED";
140+
136141
public static T GetEnvironmentVariable<T>(string variable, T defaultValue)
137142
{
138143
string value = Environment.GetEnvironmentVariable(variable);
@@ -199,7 +204,19 @@ public static bool IsThinClientEnabled(
199204
.GetEnvironmentVariable(
200205
variable: ConfigurationManager.ThinClientModeEnabled,
201206
defaultValue: defaultValue);
202-
}
207+
}
208+
209+
/// <summary>
210+
/// Gets the boolean value indicating whether Change Feed lease creation should use lease id as the partition key value.
211+
/// </summary>
212+
/// <returns>A boolean flag indicating if deterministic lease-id partition key behavior is enabled.</returns>
213+
public static bool IsChangeFeedLeaseIdAsPartitionKeyEnabled()
214+
{
215+
return ConfigurationManager
216+
.GetEnvironmentVariable(
217+
variable: ConfigurationManager.ChangeFeedLeaseIdAsPartitionKeyEnabled,
218+
defaultValue: true);
219+
}
203220

204221
/// <summary>
205222
/// Gets the AAD scope value to override.

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseManagerCosmosTests.cs

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests
2020
[TestCategory("ChangeFeed")]
2121
public class DocumentServiceLeaseManagerCosmosTests
2222
{
23+
[TestCleanup]
24+
public void TestCleanup()
25+
{
26+
Environment.SetEnvironmentVariable(ConfigurationManager.ChangeFeedLeaseIdAsPartitionKeyEnabled, null);
27+
}
28+
2329
/// <summary>
2430
/// Verifies that the update lambda updates the owner.
2531
/// </summary>
@@ -153,6 +159,83 @@ public async Task CreatesPartitionKeyBasedLease(int factoryType)
153159
ValidateRequestOptionsFactory(requestOptionsFactory, pkRangeBasedLease);
154160
}
155161

162+
[TestMethod]
163+
public async Task CreatesPartitionKeyBasedLeaseWithDeterministicPartitionKeyByDefault()
164+
{
165+
PartitionKey? capturedPartitionKey = null;
166+
RequestOptionsFactory requestOptionsFactory = new PartitionedByPartitionKeyCollectionRequestOptionsFactory();
167+
string continuation = Guid.NewGuid().ToString();
168+
DocumentServiceLeaseStoreManagerOptions options = new DocumentServiceLeaseStoreManagerOptions
169+
{
170+
HostName = Guid.NewGuid().ToString()
171+
};
172+
173+
Documents.PartitionKeyRange partitionKeyRange = new Documents.PartitionKeyRange()
174+
{
175+
Id = "0",
176+
MinInclusive = "",
177+
MaxExclusive = "FF"
178+
};
179+
180+
Mock<ContainerInternal> mockedContainer = new Mock<ContainerInternal>();
181+
mockedContainer.Setup(c => c.CreateItemStreamAsync(
182+
It.IsAny<Stream>(),
183+
It.IsAny<PartitionKey>(),
184+
It.IsAny<ItemRequestOptions>(),
185+
It.IsAny<CancellationToken>())).Callback((Stream stream, PartitionKey partitionKey, ItemRequestOptions itemRequestOptions, CancellationToken token) => capturedPartitionKey = partitionKey)
186+
.ReturnsAsync((Stream stream, PartitionKey partitionKey, ItemRequestOptions itemRequestOptions, CancellationToken token) => new ResponseMessage(System.Net.HttpStatusCode.OK) { Content = stream });
187+
188+
DocumentServiceLeaseManagerCosmos documentServiceLeaseManagerCosmos = new DocumentServiceLeaseManagerCosmos(
189+
Mock.Of<ContainerInternal>(),
190+
mockedContainer.Object,
191+
Mock.Of<DocumentServiceLeaseUpdater>(),
192+
options,
193+
requestOptionsFactory);
194+
195+
DocumentServiceLease lease = await documentServiceLeaseManagerCosmos.CreateLeaseIfNotExistAsync(partitionKeyRange, continuation);
196+
197+
Assert.IsTrue(capturedPartitionKey.HasValue);
198+
Assert.AreEqual(new PartitionKey(lease.Id), capturedPartitionKey.Value);
199+
Assert.AreEqual(lease.Id, lease.PartitionKey);
200+
}
201+
202+
[TestMethod]
203+
public async Task CreatesEPKBasedLeaseWithLegacyGuidPartitionKeyWhenOptedOut()
204+
{
205+
Environment.SetEnvironmentVariable(ConfigurationManager.ChangeFeedLeaseIdAsPartitionKeyEnabled, "false");
206+
207+
PartitionKey? capturedPartitionKey = null;
208+
RequestOptionsFactory requestOptionsFactory = new PartitionedByPartitionKeyCollectionRequestOptionsFactory();
209+
string continuation = Guid.NewGuid().ToString();
210+
DocumentServiceLeaseStoreManagerOptions options = new DocumentServiceLeaseStoreManagerOptions
211+
{
212+
HostName = Guid.NewGuid().ToString()
213+
};
214+
215+
FeedRangeEpk feedRangeEpk = new FeedRangeEpk(new Documents.Routing.Range<string>("AA", "BB", true, false));
216+
217+
Mock<ContainerInternal> mockedContainer = new Mock<ContainerInternal>();
218+
mockedContainer.Setup(c => c.CreateItemStreamAsync(
219+
It.IsAny<Stream>(),
220+
It.IsAny<PartitionKey>(),
221+
It.IsAny<ItemRequestOptions>(),
222+
It.IsAny<CancellationToken>())).Callback((Stream stream, PartitionKey partitionKey, ItemRequestOptions itemRequestOptions, CancellationToken token) => capturedPartitionKey = partitionKey)
223+
.ReturnsAsync((Stream stream, PartitionKey partitionKey, ItemRequestOptions itemRequestOptions, CancellationToken token) => new ResponseMessage(System.Net.HttpStatusCode.OK) { Content = stream });
224+
225+
DocumentServiceLeaseManagerCosmos documentServiceLeaseManagerCosmos = new DocumentServiceLeaseManagerCosmos(
226+
Mock.Of<ContainerInternal>(),
227+
mockedContainer.Object,
228+
Mock.Of<DocumentServiceLeaseUpdater>(),
229+
options,
230+
requestOptionsFactory);
231+
232+
DocumentServiceLease lease = await documentServiceLeaseManagerCosmos.CreateLeaseIfNotExistAsync(feedRangeEpk, continuation);
233+
234+
Assert.IsTrue(capturedPartitionKey.HasValue);
235+
Assert.AreNotEqual(new PartitionKey(lease.Id), capturedPartitionKey.Value);
236+
Assert.IsTrue(Guid.TryParse(lease.PartitionKey, out _));
237+
}
238+
156239
/// <summary>
157240
/// Verifies a Release sets the owner on null
158241
/// </summary>
@@ -628,11 +711,12 @@ private static void ValidateRequestOptionsFactory(RequestOptionsFactory requestO
628711
else if (requestOptionsFactory is PartitionedByPartitionKeyCollectionRequestOptionsFactory)
629712
{
630713
Assert.IsNotNull(lease.PartitionKey);
714+
Assert.AreEqual(lease.Id, lease.PartitionKey);
631715
}
632716
else
633717
{
634718
throw new Exception($"Unkown type mapping for FactoryType: {requestOptionsFactory.GetType()}.");
635719
}
636720
}
637721
}
638-
}
722+
}

0 commit comments

Comments
 (0)