Skip to content

Commit 4d0dc8d

Browse files
Adds distributed read transaction API to CosmosClient
1 parent 2507de6 commit 4d0dc8d

5 files changed

Lines changed: 481 additions & 0 deletions

File tree

Microsoft.Azure.Cosmos/src/CosmosClient.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,6 +1177,20 @@ virtual DistributedWriteTransaction CreateDistributedWriteTransaction()
11771177
return new DistributedWriteTransactionCore(this.ClientContext);
11781178
}
11791179

1180+
/// <summary>
1181+
/// Creates a new instance of a distributed read transaction.
1182+
/// </summary>
1183+
/// <returns>An instance of <see cref="DistributedReadTransaction"/>.</returns>
1184+
#if INTERNAL
1185+
public
1186+
#else
1187+
internal
1188+
#endif
1189+
virtual DistributedReadTransaction CreateDistributedReadTransaction()
1190+
{
1191+
return new DistributedReadTransactionCore(this.ClientContext);
1192+
}
1193+
11801194
/// <summary>
11811195
/// Send a request for creating a database.
11821196
///
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// ------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation. All rights reserved.
3+
// ------------------------------------------------------------
4+
5+
namespace Microsoft.Azure.Cosmos
6+
{
7+
using System.IO;
8+
9+
/// <summary>
10+
/// Represents a distributed transaction that supports read operations across multiple partitions and containers.
11+
/// </summary>
12+
/// <remarks>
13+
/// Use <see cref="CosmosClient.CreateDistributedReadTransaction"/> to obtain an instance.
14+
/// Add read operations using <see cref="ReadItem"/> then call
15+
/// <see cref="DistributedTransaction.CommitTransactionAsync"/> to execute all reads atomically.
16+
/// </remarks>
17+
#if INTERNAL
18+
public
19+
#else
20+
internal
21+
#endif
22+
abstract class DistributedReadTransaction : DistributedTransaction
23+
{
24+
/// <summary>
25+
/// Adds a read operation to the distributed transaction.
26+
/// </summary>
27+
/// <param name="database">The name of the database containing the container.</param>
28+
/// <param name="collection">The name of the container where the item exists.</param>
29+
/// <param name="partitionKey">The partition key for the item.</param>
30+
/// <param name="id">The unique identifier of the item to read.</param>
31+
/// <param name="requestOptions">Options for the read operation.</param>
32+
/// <returns>The current <see cref="DistributedReadTransaction"/> instance for method chaining.</returns>
33+
public abstract DistributedReadTransaction ReadItem(
34+
string database,
35+
string collection,
36+
PartitionKey partitionKey,
37+
string id,
38+
DistributedTransactionRequestOptions requestOptions = null);
39+
}
40+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// ------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation. All rights reserved.
3+
// ------------------------------------------------------------
4+
5+
namespace Microsoft.Azure.Cosmos
6+
{
7+
using System;
8+
using System.Collections.Generic;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
using Microsoft.Azure.Documents;
12+
13+
internal class DistributedReadTransactionCore : DistributedReadTransaction
14+
{
15+
private readonly CosmosClientContext clientContext;
16+
private readonly List<DistributedTransactionOperation> operations;
17+
18+
internal DistributedReadTransactionCore(CosmosClientContext clientContext)
19+
{
20+
this.clientContext = clientContext ?? throw new ArgumentNullException(nameof(clientContext));
21+
this.operations = new List<DistributedTransactionOperation>();
22+
}
23+
24+
public override DistributedReadTransaction ReadItem(
25+
string database,
26+
string collection,
27+
PartitionKey partitionKey,
28+
string id,
29+
DistributedTransactionRequestOptions requestOptions = null)
30+
{
31+
DistributedReadTransactionCore.ValidateContainerReference(database, collection);
32+
DistributedReadTransactionCore.ValidateItemId(id);
33+
34+
this.operations.Add(
35+
new DistributedTransactionOperation(
36+
operationType: OperationType.Read,
37+
operationIndex: this.operations.Count,
38+
database: database,
39+
container: collection,
40+
partitionKey: partitionKey,
41+
id: id,
42+
requestOptions: requestOptions));
43+
44+
return this;
45+
}
46+
47+
public override async Task<DistributedTransactionResponse> CommitTransactionAsync(
48+
CancellationToken cancellationToken = default)
49+
{
50+
DistributedTransactionCommitter committer = new DistributedTransactionCommitter(
51+
operations: this.operations,
52+
clientContext: this.clientContext);
53+
54+
return await committer.CommitTransactionAsync(cancellationToken);
55+
}
56+
57+
private static void ValidateContainerReference(string database, string collection)
58+
{
59+
if (string.IsNullOrWhiteSpace(database))
60+
{
61+
throw new ArgumentNullException(nameof(database));
62+
}
63+
64+
if (string.IsNullOrWhiteSpace(collection))
65+
{
66+
throw new ArgumentNullException(nameof(collection));
67+
}
68+
}
69+
70+
private static void ValidateItemId(string id)
71+
{
72+
if (string.IsNullOrWhiteSpace(id))
73+
{
74+
throw new ArgumentNullException(nameof(id));
75+
}
76+
}
77+
}
78+
}

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DistributedTransaction/DistributedTransactionTests.cs

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,153 @@ public async Task ValidateSessionTokenMergedIntoDtcClient()
784784
}
785785
}
786786

787+
// Read Transaction Tests
788+
789+
[TestMethod]
790+
public async Task ValidateReadTransactionHappyPath()
791+
{
792+
// Arrange
793+
ToDoActivity doc1 = ToDoActivity.CreateRandomToDoActivity();
794+
ToDoActivity doc2 = ToDoActivity.CreateRandomToDoActivity();
795+
796+
DistributedTransactionMockHandler handler = new DistributedTransactionMockHandler(
797+
request => Task.FromResult(this.BuildMockResponse(
798+
HttpStatusCode.OK,
799+
BuildReadSuccessResponseJson(2, JsonSerializer.Serialize(doc1), JsonSerializer.Serialize(doc2)))));
800+
801+
using CosmosClient client = this.CreateMockClient(handler);
802+
803+
// Act
804+
DistributedTransactionResponse response = await client
805+
.CreateDistributedReadTransaction()
806+
.ReadItem(this.database.Id, this.container.Id, new PartitionKey(doc1.pk), doc1.id)
807+
.ReadItem(this.database.Id, this.container.Id, new PartitionKey(doc2.pk), doc2.id)
808+
.CommitTransactionAsync(CancellationToken.None);
809+
810+
// Assert
811+
Assert.IsNotNull(handler.CapturedRequestBody);
812+
Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);
813+
Assert.IsTrue(response.IsSuccessStatusCode);
814+
Assert.AreEqual(2, response.Count);
815+
816+
response.Dispose();
817+
}
818+
819+
[TestMethod]
820+
public async Task ValidateReadTransactionRequestStructure()
821+
{
822+
// Arrange
823+
ToDoActivity doc = ToDoActivity.CreateRandomToDoActivity();
824+
825+
DistributedTransactionMockHandler handler = new DistributedTransactionMockHandler(
826+
request => Task.FromResult(this.BuildMockResponse(
827+
HttpStatusCode.OK,
828+
BuildReadSuccessResponseJson(1, JsonSerializer.Serialize(doc)))));
829+
830+
using CosmosClient client = this.CreateMockClient(handler);
831+
832+
// Act
833+
DistributedTransactionResponse response = await client
834+
.CreateDistributedReadTransaction()
835+
.ReadItem(this.database.Id, this.container.Id, new PartitionKey(doc.pk), doc.id)
836+
.CommitTransactionAsync(CancellationToken.None);
837+
838+
// Assert – request structure
839+
Assert.IsNotNull(handler.CapturedRequestBody);
840+
using JsonDocument requestJson = JsonDocument.Parse(handler.CapturedRequestBody);
841+
JsonElement operation = requestJson.RootElement.GetProperty("operations")[0];
842+
843+
Assert.AreEqual(OperationType.Read.ToString(), operation.GetProperty("operationType").GetString());
844+
Assert.AreEqual(doc.id, operation.GetProperty("id").GetString());
845+
Assert.IsTrue(operation.TryGetProperty("databaseName", out _), "databaseName should be present");
846+
Assert.IsTrue(operation.TryGetProperty("collectionName", out _), "collectionName should be present");
847+
Assert.IsTrue(operation.TryGetProperty("partitionKey", out _), "partitionKey should be present");
848+
Assert.IsFalse(operation.TryGetProperty("resourceBody", out _), "resourceBody must NOT be present for read operations");
849+
850+
response.Dispose();
851+
}
852+
853+
[TestMethod]
854+
public async Task ValidateReadTransactionResponseDeserialization()
855+
{
856+
// Arrange
857+
ToDoActivity expectedDoc = ToDoActivity.CreateRandomToDoActivity();
858+
859+
DistributedTransactionMockHandler handler = new DistributedTransactionMockHandler(
860+
request => Task.FromResult(this.BuildMockResponse(
861+
HttpStatusCode.OK,
862+
BuildReadSuccessResponseJson(1, JsonSerializer.Serialize(expectedDoc)))));
863+
864+
using CosmosClient client = this.CreateMockClient(handler);
865+
866+
// Act
867+
DistributedTransactionResponse response = await client
868+
.CreateDistributedReadTransaction()
869+
.ReadItem(this.database.Id, this.container.Id, new PartitionKey(expectedDoc.pk), expectedDoc.id)
870+
.CommitTransactionAsync(CancellationToken.None);
871+
872+
// Assert
873+
Assert.IsTrue(response.IsSuccessStatusCode);
874+
ToDoActivity actualDoc = JsonSerializer.Deserialize<ToDoActivity>(response[0].ResourceStream);
875+
Assert.IsNotNull(actualDoc);
876+
Assert.AreEqual(expectedDoc.id, actualDoc.id);
877+
Assert.AreEqual(expectedDoc.pk, actualDoc.pk);
878+
Assert.AreEqual(expectedDoc.taskNum, actualDoc.taskNum);
879+
880+
response.Dispose();
881+
}
882+
883+
[TestMethod]
884+
public async Task ValidateReadTransactionResourceStream()
885+
{
886+
// Arrange
887+
ToDoActivity expectedDoc = ToDoActivity.CreateRandomToDoActivity();
888+
889+
DistributedTransactionMockHandler handler = new DistributedTransactionMockHandler(
890+
request => Task.FromResult(this.BuildMockResponse(
891+
HttpStatusCode.OK,
892+
BuildReadSuccessResponseJson(1, JsonSerializer.Serialize(expectedDoc)))));
893+
894+
using CosmosClient client = this.CreateMockClient(handler);
895+
896+
// Act
897+
DistributedTransactionResponse response = await client
898+
.CreateDistributedReadTransaction()
899+
.ReadItem(this.database.Id, this.container.Id, new PartitionKey(expectedDoc.pk), expectedDoc.id)
900+
.CommitTransactionAsync(CancellationToken.None);
901+
902+
// Assert – raw stream access
903+
Stream stream = response[0].ResourceStream;
904+
Assert.IsNotNull(stream);
905+
ToDoActivity actualDoc = JsonSerializer.Deserialize<ToDoActivity>(stream);
906+
Assert.AreEqual(expectedDoc.id, actualDoc.id);
907+
Assert.AreEqual(expectedDoc.pk, actualDoc.pk);
908+
909+
response.Dispose();
910+
}
911+
912+
[TestMethod]
913+
public void ValidateReadTransactionMissingIdThrows()
914+
{
915+
using CosmosClient client = this.CreateMockClient(new DistributedTransactionMockHandler(
916+
request => Task.FromResult(this.BuildMockResponse(HttpStatusCode.OK, BuildSuccessResponseJson(1)))));
917+
918+
Assert.ThrowsException<ArgumentNullException>(() =>
919+
client.CreateDistributedReadTransaction()
920+
.ReadItem(this.database.Id, this.container.Id, new PartitionKey("pk"), id: null));
921+
}
922+
923+
[TestMethod]
924+
public void ValidateReadTransactionMissingDatabaseThrows()
925+
{
926+
using CosmosClient client = this.CreateMockClient(new DistributedTransactionMockHandler(
927+
request => Task.FromResult(this.BuildMockResponse(HttpStatusCode.OK, BuildSuccessResponseJson(1)))));
928+
929+
Assert.ThrowsException<ArgumentNullException>(() =>
930+
client.CreateDistributedReadTransaction()
931+
.ReadItem(null, this.container.Id, new PartitionKey("pk"), "item-id"));
932+
}
933+
787934
// Helpers
788935

789936
private void ValidateValueKind(JsonElement operation, string property, JsonValueKind expectedValueKind, int operationIndex, bool isRequired)
@@ -827,6 +974,19 @@ private static string BuildSuccessResponseJson(int operationCount)
827974
return $@"{{""operationResponses"":[{string.Join(",", results)}]}}";
828975
}
829976

977+
private static string BuildReadSuccessResponseJson(int operationCount, params string[] itemJsonBodies)
978+
{
979+
List<string> results = new List<string>();
980+
for (int i = 0; i < operationCount; i++)
981+
{
982+
string body = i < itemJsonBodies.Length ? itemJsonBodies[i] : "{}";
983+
string base64Body = Convert.ToBase64String(Encoding.UTF8.GetBytes(body));
984+
results.Add($@"{{""index"":{i},""statusCode"":200,""etag"":""\""etag-{i}\"""",""resourcebody"":""{base64Body}""}}");
985+
}
986+
987+
return $@"{{""operationResponses"":[{string.Join(",", results)}]}}";
988+
}
989+
830990
// Mock handler
831991

832992
/// <summary>

0 commit comments

Comments
 (0)