Skip to content

Commit 6970f67

Browse files
committed
Add support for strong consistency in query.
1 parent d57cf6f commit 6970f67

2 files changed

Lines changed: 87 additions & 3 deletions

File tree

Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ namespace Microsoft.Azure.Cosmos
2424
// Marking it as non-sealed in order to unit test it using Moq framework
2525
internal class GatewayStoreModel : IStoreModelExtension, IDisposable
2626
{
27-
private readonly bool isPartitionLevelFailoverEnabled;
27+
private readonly bool isPartitionLevelFailoverEnabled;
28+
private readonly bool isThinClientEnabled;
2829
private static readonly string sessionConsistencyAsString = ConsistencyLevel.Session.ToString();
2930
private readonly GlobalPartitionEndpointManager globalPartitionEndpointManager;
3031

@@ -64,6 +65,7 @@ public GatewayStoreModel(
6465
this.eventSource,
6566
serializerSettings,
6667
isPartitionLevelFailoverEnabled);
68+
this.isThinClientEnabled = isThinClientEnabled;
6769

6870
if (isThinClientEnabled)
6971
{
@@ -99,7 +101,7 @@ await GatewayStoreModel.ApplySessionTokenAsync(
99101
}
100102

101103
// This is applicable for both per partition automatic failover and per partition circuit breaker.
102-
if (this.isPartitionLevelFailoverEnabled
104+
if ((this.isPartitionLevelFailoverEnabled || this.isThinClientEnabled)
103105
&& !ReplicatedResourceClient.IsMasterResource(request.ResourceType)
104106
&& request.ResourceType.IsPartitioned())
105107
{
@@ -111,7 +113,11 @@ await GatewayStoreModel.ApplySessionTokenAsync(
111113
refreshCache: false);
112114

113115
request.RequestContext.ResolvedPartitionKeyRange = partitionKeyRange;
114-
this.globalPartitionEndpointManager.TryAddPartitionLevelLocationOverride(request);
116+
117+
if (this.isPartitionLevelFailoverEnabled)
118+
{
119+
this.globalPartitionEndpointManager.TryAddPartitionLevelLocationOverride(request);
120+
}
115121
}
116122

117123
bool canUseThinClient =

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemThinClientTests.cs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
1313
using System.Text.Json;
1414
using System.Text.Json.Serialization;
1515
using System.Threading.Tasks;
16+
using global::Azure.Core;
1617
using Microsoft.Azure.Cosmos.Fluent;
1718
using Microsoft.VisualStudio.TestTools.UnitTesting;
1819
using static Microsoft.Azure.Cosmos.SDK.EmulatorTests.MultiRegionSetupHelpers;
@@ -512,6 +513,83 @@ public async Task QueryItemsTest()
512513
Assert.AreEqual(createdItems.Count, count);
513514
}
514515

516+
[TestMethod]
517+
[TestCategory("ThinClient")]
518+
public async Task QueryItemsTestWithStrongConsistency()
519+
{
520+
string connectionString = ConfigurationManager.GetEnvironmentVariable<string>("COSMOSDB_TC_STRONG", string.Empty);
521+
if (string.IsNullOrEmpty(connectionString))
522+
{
523+
Assert.Fail("Set environment variable COSMOSDB_TC_STRONG to run the tests");
524+
}
525+
this.client = new CosmosClient(
526+
connectionString,
527+
new CosmosClientOptions()
528+
{
529+
ConnectionMode = ConnectionMode.Gateway,
530+
RequestTimeout = TimeSpan.FromSeconds(60),
531+
ConsistencyLevel = Microsoft.Azure.Cosmos.ConsistencyLevel.Strong
532+
});
533+
534+
string uniqueDbName = "TestDbTC_" + Guid.NewGuid().ToString();
535+
this.database = await this.client.CreateDatabaseIfNotExistsAsync(uniqueDbName);
536+
string uniqueContainerName = "TestContainerTC_" + Guid.NewGuid().ToString();
537+
this.container = await this.database.CreateContainerIfNotExistsAsync(uniqueContainerName, "/pk");
538+
539+
string pk = "pk_query";
540+
List<TestObject> items = this.GenerateItems(pk).ToList();
541+
542+
List<TestObject> createdItems = await this.CreateItemsSafeAsync(items);
543+
544+
string query = $"SELECT * FROM c WHERE c.pk = '{pk}'";
545+
FeedIterator<TestObject> iterator = this.container.GetItemQueryIterator<TestObject>(query);
546+
547+
int count = 0;
548+
while (iterator.HasMoreResults)
549+
{
550+
FeedResponse<TestObject> response = await iterator.ReadNextAsync();
551+
count += response.Count;
552+
}
553+
554+
Assert.AreEqual(createdItems.Count, count);
555+
}
556+
557+
[TestMethod]
558+
[TestCategory("ThinClient")]
559+
public async Task QueryItemsTestWithSessionConsistency()
560+
{
561+
this.client = new CosmosClient(
562+
this.connectionString,
563+
new CosmosClientOptions()
564+
{
565+
ConnectionMode = ConnectionMode.Gateway,
566+
RequestTimeout = TimeSpan.FromSeconds(60),
567+
ConsistencyLevel = Microsoft.Azure.Cosmos.ConsistencyLevel.Session
568+
});
569+
570+
string uniqueDbName = "TestDbTC_" + Guid.NewGuid().ToString();
571+
this.database = await this.client.CreateDatabaseIfNotExistsAsync(uniqueDbName);
572+
string uniqueContainerName = "TestContainerTC_" + Guid.NewGuid().ToString();
573+
this.container = await this.database.CreateContainerIfNotExistsAsync(uniqueContainerName, "/pk");
574+
575+
string pk = "pk_query";
576+
List<TestObject> items = this.GenerateItems(pk).ToList();
577+
578+
List<TestObject> createdItems = await this.CreateItemsSafeAsync(items);
579+
580+
string query = $"SELECT * FROM c WHERE c.pk = '{pk}'";
581+
FeedIterator<TestObject> iterator = this.container.GetItemQueryIterator<TestObject>(query);
582+
583+
int count = 0;
584+
while (iterator.HasMoreResults)
585+
{
586+
FeedResponse<TestObject> response = await iterator.ReadNextAsync();
587+
count += response.Count;
588+
}
589+
590+
Assert.AreEqual(createdItems.Count, count);
591+
}
592+
515593
[TestMethod]
516594
[TestCategory("ThinClient")]
517595
public async Task QueryItemsStreamTest()

0 commit comments

Comments
 (0)