Skip to content

Commit b0f1abb

Browse files
[Internal] DTS: Adds distributed read transaction API to CosmosClient (#5795)
# Pull Request Template ## Description This pull request introduces support for distributed read transactions in the Cosmos DB SDK, allowing users to perform multiple read operations across partitions and containers atomically. The main changes include the addition of new classes and methods to create and manage distributed read transactions, comprehensive tests, and supporting utilities for test scenarios. **Distributed Read Transaction Feature:** * Added a new abstract class `DistributedReadTransaction` that allows chaining multiple read operations and committing them atomically. This class is obtained via the new `CosmosClient.CreateDistributedReadTransaction()` method. * Implemented the concrete class `DistributedReadTransactionCore`, which manages the list of read operations, validates inputs, and delegates transaction execution to the committer. **Public API and Entry Point:** * Added the `CreateDistributedReadTransaction()` method to `CosmosClient`, providing a public (or internal, depending on build) entry point for distributed read transactions. ## Type of change Please delete options that are not relevant. - [] New feature (non-breaking change which adds functionality) ## Closing issues To automatically close an issue: closes #IssueNumber --------- Co-authored-by: Kiran Kumar Kolli <kirankk@microsoft.com>
1 parent a1958e2 commit b0f1abb

7 files changed

Lines changed: 573 additions & 1 deletion

File tree

Microsoft.Azure.Cosmos/src/CosmosClient.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,6 +1177,20 @@ virtual DistributedWriteTransaction CreateDistributedWriteTransaction()
11771177
return new DistributedWriteTransactionCore(this.ClientContext);
11781178
}
11791179

1180+
/// <summary>
1181+
/// Creates a new instance of a distributed read transaction.
1182+
/// </summary>
1183+
/// <returns>An instance of <see cref="DistributedReadTransaction"/>.</returns>
1184+
#if INTERNAL
1185+
public
1186+
#else
1187+
internal
1188+
#endif
1189+
virtual DistributedReadTransaction CreateDistributedReadTransaction()
1190+
{
1191+
return new DistributedReadTransactionCore(this.ClientContext);
1192+
}
1193+
11801194
/// <summary>
11811195
/// Send a request for creating a database.
11821196
///
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// ------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation. All rights reserved.
3+
// ------------------------------------------------------------
4+
5+
namespace Microsoft.Azure.Cosmos
6+
{
7+
using System.IO;
8+
9+
/// <summary>
10+
/// Represents a distributed transaction that supports read operations across multiple partitions and containers.
11+
/// </summary>
12+
/// <remarks>
13+
/// Use <see cref="CosmosClient.CreateDistributedReadTransaction"/> to obtain an instance.
14+
/// Add read operations using <see cref="ReadItem"/> then call
15+
/// <see cref="DistributedTransaction.CommitTransactionAsync"/> to execute all reads atomically.
16+
/// </remarks>
17+
#if INTERNAL
18+
public
19+
#else
20+
internal
21+
#endif
22+
abstract class DistributedReadTransaction : DistributedTransaction
23+
{
24+
/// <summary>
25+
/// Adds a read operation to the distributed transaction.
26+
/// </summary>
27+
/// <param name="database">The name of the database containing the container.</param>
28+
/// <param name="collection">The name of the container where the item exists.</param>
29+
/// <param name="partitionKey">The partition key for the item.</param>
30+
/// <param name="id">The unique identifier of the item to read.</param>
31+
/// <param name="requestOptions">Options for the read operation.</param>
32+
/// <returns>The current <see cref="DistributedReadTransaction"/> instance for method chaining.</returns>
33+
public abstract DistributedReadTransaction ReadItem(
34+
string database,
35+
string collection,
36+
PartitionKey partitionKey,
37+
string id,
38+
DistributedTransactionRequestOptions requestOptions = null);
39+
}
40+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// ------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation. All rights reserved.
3+
// ------------------------------------------------------------
4+
5+
namespace Microsoft.Azure.Cosmos
6+
{
7+
using System;
8+
using System.Collections.Generic;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
using Microsoft.Azure.Documents;
12+
13+
internal class DistributedReadTransactionCore : DistributedReadTransaction
14+
{
15+
private readonly CosmosClientContext clientContext;
16+
private readonly List<DistributedTransactionOperation> operations;
17+
18+
internal DistributedReadTransactionCore(CosmosClientContext clientContext)
19+
{
20+
this.clientContext = clientContext ?? throw new ArgumentNullException(nameof(clientContext));
21+
this.operations = new List<DistributedTransactionOperation>();
22+
}
23+
24+
public override DistributedReadTransaction ReadItem(
25+
string database,
26+
string collection,
27+
PartitionKey partitionKey,
28+
string id,
29+
DistributedTransactionRequestOptions requestOptions = null)
30+
{
31+
DistributedReadTransactionCore.ValidateContainerReference(database, collection);
32+
DistributedReadTransactionCore.ValidateItemId(id);
33+
34+
this.operations.Add(
35+
new DistributedTransactionOperation(
36+
operationType: OperationType.Read,
37+
operationIndex: this.operations.Count,
38+
database: database,
39+
container: collection,
40+
partitionKey: partitionKey,
41+
id: id,
42+
requestOptions: requestOptions));
43+
44+
return this;
45+
}
46+
47+
public override async Task<DistributedTransactionResponse> CommitTransactionAsync(
48+
CancellationToken cancellationToken = default)
49+
{
50+
DistributedTransactionCommitter committer = new DistributedTransactionCommitter(
51+
operations: this.operations,
52+
clientContext: this.clientContext);
53+
54+
return await committer.CommitTransactionAsync(cancellationToken);
55+
}
56+
57+
private static void ValidateContainerReference(string database, string collection)
58+
{
59+
if (string.IsNullOrWhiteSpace(database))
60+
{
61+
throw new ArgumentNullException(nameof(database));
62+
}
63+
64+
if (string.IsNullOrWhiteSpace(collection))
65+
{
66+
throw new ArgumentNullException(nameof(collection));
67+
}
68+
}
69+
70+
private static void ValidateItemId(string id)
71+
{
72+
if (string.IsNullOrWhiteSpace(id))
73+
{
74+
throw new ArgumentNullException(nameof(id));
75+
}
76+
}
77+
}
78+
}

Microsoft.Azure.Cosmos/src/DistributedTransaction/DistributedTransactionOperation.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ public DistributedTransactionOperation(
5757

5858
internal string SessionToken { get; set; }
5959

60-
internal string ETag => this.RequestOptions?.IfMatchEtag;
60+
internal string ETag => this.OperationType == OperationType.Read
61+
? this.RequestOptions?.IfNoneMatchEtag
62+
: this.RequestOptions?.IfMatchEtag;
6163

6264
internal Stream ResourceStream { get; set; }
6365

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DistributedTransaction/DistributedTransactionTests.cs

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,153 @@ public async Task ValidateSessionTokenMergedIntoDtcClient()
784784
}
785785
}
786786

787+
// Read Transaction Tests
788+
789+
[TestMethod]
790+
public async Task ValidateReadTransactionHappyPath()
791+
{
792+
// Arrange
793+
ToDoActivity doc1 = ToDoActivity.CreateRandomToDoActivity();
794+
ToDoActivity doc2 = ToDoActivity.CreateRandomToDoActivity();
795+
796+
DistributedTransactionMockHandler handler = new DistributedTransactionMockHandler(
797+
request => Task.FromResult(this.BuildMockResponse(
798+
HttpStatusCode.OK,
799+
BuildReadSuccessResponseJson(2, JsonSerializer.Serialize(doc1), JsonSerializer.Serialize(doc2)))));
800+
801+
using CosmosClient client = this.CreateMockClient(handler);
802+
803+
// Act
804+
DistributedTransactionResponse response = await client
805+
.CreateDistributedReadTransaction()
806+
.ReadItem(this.database.Id, this.container.Id, new PartitionKey(doc1.pk), doc1.id)
807+
.ReadItem(this.database.Id, this.container.Id, new PartitionKey(doc2.pk), doc2.id)
808+
.CommitTransactionAsync(CancellationToken.None);
809+
810+
// Assert
811+
Assert.IsNotNull(handler.CapturedRequestBody);
812+
Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);
813+
Assert.IsTrue(response.IsSuccessStatusCode);
814+
Assert.AreEqual(2, response.Count);
815+
816+
response.Dispose();
817+
}
818+
819+
[TestMethod]
820+
public async Task ValidateReadTransactionRequestStructure()
821+
{
822+
// Arrange
823+
ToDoActivity doc = ToDoActivity.CreateRandomToDoActivity();
824+
825+
DistributedTransactionMockHandler handler = new DistributedTransactionMockHandler(
826+
request => Task.FromResult(this.BuildMockResponse(
827+
HttpStatusCode.OK,
828+
BuildReadSuccessResponseJson(1, JsonSerializer.Serialize(doc)))));
829+
830+
using CosmosClient client = this.CreateMockClient(handler);
831+
832+
// Act
833+
DistributedTransactionResponse response = await client
834+
.CreateDistributedReadTransaction()
835+
.ReadItem(this.database.Id, this.container.Id, new PartitionKey(doc.pk), doc.id)
836+
.CommitTransactionAsync(CancellationToken.None);
837+
838+
// Assert – request structure
839+
Assert.IsNotNull(handler.CapturedRequestBody);
840+
using JsonDocument requestJson = JsonDocument.Parse(handler.CapturedRequestBody);
841+
JsonElement operation = requestJson.RootElement.GetProperty("operations")[0];
842+
843+
Assert.AreEqual(OperationType.Read.ToString(), operation.GetProperty("operationType").GetString());
844+
Assert.AreEqual(doc.id, operation.GetProperty("id").GetString());
845+
Assert.IsTrue(operation.TryGetProperty("databaseName", out _), "databaseName should be present");
846+
Assert.IsTrue(operation.TryGetProperty("collectionName", out _), "collectionName should be present");
847+
Assert.IsTrue(operation.TryGetProperty("partitionKey", out _), "partitionKey should be present");
848+
Assert.IsFalse(operation.TryGetProperty("resourceBody", out _), "resourceBody must NOT be present for read operations");
849+
850+
response.Dispose();
851+
}
852+
853+
[TestMethod]
854+
public async Task ValidateReadTransactionResponseDeserialization()
855+
{
856+
// Arrange
857+
ToDoActivity expectedDoc = ToDoActivity.CreateRandomToDoActivity();
858+
859+
DistributedTransactionMockHandler handler = new DistributedTransactionMockHandler(
860+
request => Task.FromResult(this.BuildMockResponse(
861+
HttpStatusCode.OK,
862+
BuildReadSuccessResponseJson(1, JsonSerializer.Serialize(expectedDoc)))));
863+
864+
using CosmosClient client = this.CreateMockClient(handler);
865+
866+
// Act
867+
DistributedTransactionResponse response = await client
868+
.CreateDistributedReadTransaction()
869+
.ReadItem(this.database.Id, this.container.Id, new PartitionKey(expectedDoc.pk), expectedDoc.id)
870+
.CommitTransactionAsync(CancellationToken.None);
871+
872+
// Assert
873+
Assert.IsTrue(response.IsSuccessStatusCode);
874+
ToDoActivity actualDoc = JsonSerializer.Deserialize<ToDoActivity>(response[0].ResourceStream);
875+
Assert.IsNotNull(actualDoc);
876+
Assert.AreEqual(expectedDoc.id, actualDoc.id);
877+
Assert.AreEqual(expectedDoc.pk, actualDoc.pk);
878+
Assert.AreEqual(expectedDoc.taskNum, actualDoc.taskNum);
879+
880+
response.Dispose();
881+
}
882+
883+
[TestMethod]
884+
public async Task ValidateReadTransactionResourceStream()
885+
{
886+
// Arrange
887+
ToDoActivity expectedDoc = ToDoActivity.CreateRandomToDoActivity();
888+
889+
DistributedTransactionMockHandler handler = new DistributedTransactionMockHandler(
890+
request => Task.FromResult(this.BuildMockResponse(
891+
HttpStatusCode.OK,
892+
BuildReadSuccessResponseJson(1, JsonSerializer.Serialize(expectedDoc)))));
893+
894+
using CosmosClient client = this.CreateMockClient(handler);
895+
896+
// Act
897+
DistributedTransactionResponse response = await client
898+
.CreateDistributedReadTransaction()
899+
.ReadItem(this.database.Id, this.container.Id, new PartitionKey(expectedDoc.pk), expectedDoc.id)
900+
.CommitTransactionAsync(CancellationToken.None);
901+
902+
// Assert – raw stream access
903+
Stream stream = response[0].ResourceStream;
904+
Assert.IsNotNull(stream);
905+
ToDoActivity actualDoc = JsonSerializer.Deserialize<ToDoActivity>(stream);
906+
Assert.AreEqual(expectedDoc.id, actualDoc.id);
907+
Assert.AreEqual(expectedDoc.pk, actualDoc.pk);
908+
909+
response.Dispose();
910+
}
911+
912+
[TestMethod]
913+
public void ValidateReadTransactionMissingIdThrows()
914+
{
915+
using CosmosClient client = this.CreateMockClient(new DistributedTransactionMockHandler(
916+
request => Task.FromResult(this.BuildMockResponse(HttpStatusCode.OK, BuildSuccessResponseJson(1)))));
917+
918+
Assert.ThrowsException<ArgumentNullException>(() =>
919+
client.CreateDistributedReadTransaction()
920+
.ReadItem(this.database.Id, this.container.Id, new PartitionKey("pk"), id: null));
921+
}
922+
923+
[TestMethod]
924+
public void ValidateReadTransactionMissingDatabaseThrows()
925+
{
926+
using CosmosClient client = this.CreateMockClient(new DistributedTransactionMockHandler(
927+
request => Task.FromResult(this.BuildMockResponse(HttpStatusCode.OK, BuildSuccessResponseJson(1)))));
928+
929+
Assert.ThrowsException<ArgumentNullException>(() =>
930+
client.CreateDistributedReadTransaction()
931+
.ReadItem(null, this.container.Id, new PartitionKey("pk"), "item-id"));
932+
}
933+
787934
// Helpers
788935

789936
private void ValidateValueKind(JsonElement operation, string property, JsonValueKind expectedValueKind, int operationIndex, bool isRequired)
@@ -827,6 +974,18 @@ private static string BuildSuccessResponseJson(int operationCount)
827974
return $@"{{""operationResponses"":[{string.Join(",", results)}]}}";
828975
}
829976

977+
private static string BuildReadSuccessResponseJson(int operationCount, params string[] itemJsonBodies)
978+
{
979+
List<string> results = new List<string>();
980+
for (int i = 0; i < operationCount; i++)
981+
{
982+
string body = i < itemJsonBodies.Length ? itemJsonBodies[i] : "{}";
983+
results.Add($@"{{""index"":{i},""statusCode"":200,""etag"":""\""etag-{i}\"""",""resourceBody"":{body}}}");
984+
}
985+
986+
return $@"{{""operationResponses"":[{string.Join(",", results)}]}}";
987+
}
988+
830989
// Mock handler
831990

832991
/// <summary>

0 commit comments

Comments
 (0)