Skip to content

Commit 7490713

Browse files
committed
Add TransportSerializer and update memeory pool implementation.
1 parent 1d0046e commit 7490713

6 files changed

Lines changed: 2980 additions & 16 deletions

File tree

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<ClientOfficialVersion>3.48.0</ClientOfficialVersion>
44
<ClientPreviewVersion>3.49.0</ClientPreviewVersion>
55
<ClientPreviewSuffixVersion>preview.0</ClientPreviewSuffixVersion>
6-
<DirectVersion>3.37.10</DirectVersion>
6+
<DirectVersion>3.38.0</DirectVersion>
77
<FaultInjectionVersion>1.0.0</FaultInjectionVersion>
88
<FaultInjectionSuffixVersion>beta.0</FaultInjectionSuffixVersion>
99
<EncryptionOfficialVersion>2.0.4</EncryptionOfficialVersion>
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
//------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation. All rights reserved.
3+
//------------------------------------------------------------
4+
namespace Microsoft.Azure.Cosmos
5+
{
6+
using System;
7+
using System.Collections.Concurrent;
8+
using System.Collections.Generic;
9+
using Microsoft.Azure.Documents;
10+
11+
internal class ProxyRequest
12+
{
13+
private static readonly int[] expectedPositionsForTokensDuringReorder;
14+
private static readonly int[] positionsOfTokensToMakeOptionalAfterReorder;
15+
private static readonly Action<RntbdToken, RntbdConstants.Request>[] settersForFieldsToMakeOptional;
16+
17+
static ProxyRequest()
18+
{
19+
// Proxy expects the first tokens to be the ones needed for routing
20+
// This is done as an optimization to avoid parsing the entire request to get the routing and authorization information
21+
RntbdConstants.RequestIdentifiers[] firstTokenTypes = new[]
22+
{
23+
// Proxy expects EPK to be first
24+
RntbdConstants.RequestIdentifiers.EffectivePartitionKey,
25+
26+
// The rest of the tokens that Proxy uses are moved to first positions for faster parsing in Proxy side
27+
RntbdConstants.RequestIdentifiers.StartEpkHash,
28+
RntbdConstants.RequestIdentifiers.EndEpkHash,
29+
RntbdConstants.RequestIdentifiers.GlobalDatabaseAccountName,
30+
RntbdConstants.RequestIdentifiers.DatabaseName,
31+
RntbdConstants.RequestIdentifiers.CollectionName,
32+
RntbdConstants.RequestIdentifiers.CollectionRid,
33+
RntbdConstants.RequestIdentifiers.ResourceId,
34+
35+
// Fields used for AuthZ
36+
RntbdConstants.RequestIdentifiers.PayloadPresent,
37+
RntbdConstants.RequestIdentifiers.DocumentName,
38+
RntbdConstants.RequestIdentifiers.AuthorizationToken,
39+
RntbdConstants.RequestIdentifiers.Date,
40+
};
41+
42+
(RntbdConstants.RequestIdentifiers, Action<RntbdToken, RntbdConstants.Request>)[] tokensToMakeOptional = new (RntbdConstants.RequestIdentifiers, Action<RntbdToken, RntbdConstants.Request>)[]
43+
{
44+
// For Proxy, transportRequestId and replicapath are optional.
45+
(RntbdConstants.RequestIdentifiers.ReplicaPath, (token, request) => request.replicaPath = token),
46+
(RntbdConstants.RequestIdentifiers.TransportRequestID, (token, request) => request.transportRequestID = token),
47+
};
48+
49+
ProxyRequest.expectedPositionsForTokensDuringReorder = new int[firstTokenTypes.Length];
50+
RntbdConstants.Request rntbdRequest = new ();
51+
for (int i = 0; i < firstTokenTypes.Length; i++)
52+
{
53+
RntbdConstants.RequestIdentifiers tokenType = firstTokenTypes[i];
54+
int index = Array.FindIndex(rntbdRequest.tokens, x => x?.GetTokenIdentifier() == (ushort)tokenType);
55+
ProxyRequest.expectedPositionsForTokensDuringReorder[i] = index;
56+
ProxyRequest.SwapTokens(rntbdRequest.tokens, i, index);
57+
}
58+
59+
Dictionary<RntbdConstants.RequestIdentifiers, int> indexes = new ();
60+
for (int i = 0; i < rntbdRequest.tokens.Length; i++)
61+
{
62+
if (rntbdRequest.tokens[i] == null)
63+
{
64+
continue;
65+
}
66+
67+
RntbdConstants.RequestIdentifiers tokenType = (RntbdConstants.RequestIdentifiers)rntbdRequest.tokens[i].GetTokenIdentifier();
68+
indexes[tokenType] = i;
69+
}
70+
71+
ProxyRequest.positionsOfTokensToMakeOptionalAfterReorder = new int[tokensToMakeOptional.Length];
72+
ProxyRequest.settersForFieldsToMakeOptional = new Action<RntbdToken, RntbdConstants.Request>[tokensToMakeOptional.Length];
73+
for (int i = 0; i < tokensToMakeOptional.Length; i++)
74+
{
75+
ProxyRequest.positionsOfTokensToMakeOptionalAfterReorder[i] = indexes[tokensToMakeOptional[i].Item1];
76+
ProxyRequest.settersForFieldsToMakeOptional[i] = tokensToMakeOptional[i].Item2;
77+
}
78+
}
79+
80+
public RntbdConstants.Request RntbdRequest { get; } = new ();
81+
public ProxyRequest()
82+
{
83+
for (int i = 0; i < ProxyRequest.expectedPositionsForTokensDuringReorder.Length; i++)
84+
{
85+
ProxyRequest.SwapTokens(this.RntbdRequest.tokens, i, ProxyRequest.expectedPositionsForTokensDuringReorder[i]);
86+
}
87+
88+
// Make optional
89+
for (int i = 0; i < ProxyRequest.positionsOfTokensToMakeOptionalAfterReorder.Length; i++)
90+
{
91+
int expectedTokenPosition = ProxyRequest.positionsOfTokensToMakeOptionalAfterReorder[i];
92+
RntbdToken originalToken = this.RntbdRequest.tokens[expectedTokenPosition];
93+
this.RntbdRequest.tokens[expectedTokenPosition] = new RntbdToken(false, originalToken.GetTokenType(), originalToken.GetTokenIdentifier());
94+
ProxyRequest.settersForFieldsToMakeOptional[i](this.RntbdRequest.tokens[expectedTokenPosition], this.RntbdRequest);
95+
}
96+
}
97+
98+
public void Reset()
99+
{
100+
this.RntbdRequest.Reset();
101+
}
102+
103+
private static void SwapTokens(RntbdToken[] tokens, int i, int j)
104+
{
105+
if (i != j)
106+
{
107+
(tokens[j], tokens[i]) = (tokens[i], tokens[j]);
108+
}
109+
}
110+
111+
internal sealed class ObjectPool
112+
{
113+
public static readonly ObjectPool Instance = new ObjectPool();
114+
115+
private readonly ConcurrentQueue<ProxyRequest> entities = new ConcurrentQueue<ProxyRequest>();
116+
117+
private ObjectPool()
118+
{
119+
}
120+
121+
public EntityOwner Get()
122+
{
123+
if (this.entities.TryDequeue(out ProxyRequest entity))
124+
{
125+
return new EntityOwner(entity);
126+
}
127+
128+
return new EntityOwner(new ProxyRequest());
129+
}
130+
131+
private void Return(ProxyRequest entity)
132+
{
133+
entity.Reset();
134+
this.entities.Enqueue(entity);
135+
}
136+
137+
public readonly struct EntityOwner : IDisposable
138+
{
139+
public EntityOwner(ProxyRequest entity)
140+
{
141+
this.Entity = entity;
142+
}
143+
144+
public ProxyRequest Entity { get; }
145+
146+
public void Dispose()
147+
{
148+
if (this.Entity != null)
149+
{
150+
ObjectPool.Instance.Return(this.Entity);
151+
}
152+
}
153+
}
154+
}
155+
}
156+
}

Microsoft.Azure.Cosmos/src/ReadFeedKeyType.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ internal enum ReadFeedKeyType
1616
/// <summary>
1717
/// Use EffectivePartitionKey
1818
/// </summary>
19-
EffectivePartitionKey
19+
EffectivePartitionKey,
20+
21+
/// <summary>
22+
/// Use EffectivePartitionKeyRange
23+
/// </summary>
24+
EffectivePartitionKeyRange
2025
}
2126
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
namespace Microsoft.Azure.Cosmos
2+
{
3+
using System.Collections.Concurrent;
4+
using System.IO;
5+
6+
internal sealed class ThinClientMemoryStreamPool
7+
{
8+
public static ThinClientMemoryStreamPool Instance { get; } = new ();
9+
10+
private readonly ConcurrentBag<MemoryStream> objects = new ConcurrentBag<MemoryStream>();
11+
12+
private ThinClientMemoryStreamPool()
13+
{
14+
}
15+
16+
public MemoryStream Get(int minimumCapacity)
17+
{
18+
if (this.objects.TryTake(out MemoryStream ms))
19+
{
20+
ms.Position = 0;
21+
ms.SetLength(0);
22+
if (ms.Capacity < minimumCapacity)
23+
{
24+
ms.Capacity = minimumCapacity;
25+
}
26+
return ms;
27+
}
28+
29+
return new MemoryStream(minimumCapacity);
30+
}
31+
32+
public void Return(MemoryStream ms)
33+
{
34+
ms.Position = 0;
35+
ms.SetLength(0);
36+
this.objects.Add(ms);
37+
}
38+
}
39+
40+
}

Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ namespace Microsoft.Azure.Cosmos
1919
using Microsoft.Azure.Documents.Collections;
2020

2121
/// <summary>
22-
/// A static helper class providing serialization and deserialization
22+
/// A static helper class providing serialization and deserialization
2323
/// of requests/responses to and from the RNTBD protocol for the ThinClient scenario.
2424
/// </summary>
2525
internal static class ThinClientTransportSerializer
2626
{
27+
private static readonly ThinClientMemoryStreamPool memoryStreamPool = ThinClientMemoryStreamPool.Instance;
28+
2729
public sealed class BufferProviderWrapper
2830
{
2931
internal BufferProvider Provider { get; set; } = new ();
@@ -41,7 +43,6 @@ public static async Task<Stream> SerializeProxyRequestAsync(
4143
ClientCollectionCache clientCollectionCache,
4244
HttpRequestMessage requestMessage)
4345
{
44-
// Skip this and use the original DSR.
4546
OperationType operationType = (OperationType)Enum.Parse(typeof(OperationType), requestMessage.Headers.GetValues(ThinClientConstants.ProxyOperationType).First());
4647
ResourceType resourceType = (ResourceType)Enum.Parse(typeof(ResourceType), requestMessage.Headers.GetValues(ThinClientConstants.ProxyResourceType).First());
4748

@@ -60,8 +61,8 @@ public static async Task<Stream> SerializeProxyRequestAsync(
6061
}
6162

6263
using DocumentServiceRequest request = new (operationType, resourceType, requestMessage.RequestUri.PathAndQuery,
63-
requestStream, AuthorizationTokenType.PrimaryMasterKey,
64-
dictionaryCollection);
64+
requestStream, AuthorizationTokenType.PrimaryMasterKey,
65+
dictionaryCollection);
6566

6667
if (operationType.IsPointOperation())
6768
{
@@ -79,9 +80,9 @@ public static async Task<Stream> SerializeProxyRequestAsync(
7980
string epk = GetEffectivePartitionKeyHash(partitionKey, collection.PartitionKey);
8081

8182
request.Properties = new Dictionary<string, object>
82-
{
83-
{ ThinClientConstants.EffectivePartitionKey, HexStringUtility.HexStringToBytes(epk) }
84-
};
83+
{
84+
{ ThinClientConstants.EffectivePartitionKey, HexStringUtility.HexStringToBytes(epk) }
85+
};
8586
}
8687
else if (request.Headers[ThinClientConstants.ProxyStartEpk] != null)
8788
{
@@ -99,17 +100,16 @@ public static async Task<Stream> SerializeProxyRequestAsync(
99100

100101
await request.EnsureBufferedBodyAsync();
101102

102-
using Documents.Rntbd.TransportSerialization.SerializedRequest serializedRequest =
103-
Documents.Rntbd.TransportSerialization.BuildRequestForProxy(request,
103+
using TransportSerialization.SerializedRequest serializedRequest =
104+
TransportSerialization.BuildRequestForProxy(request,
104105
new ResourceOperation(operationType, resourceType),
105106
activityId,
106107
bufferProvider.Provider,
107108
accountName,
108109
out _,
109110
out _);
110111

111-
// TODO: consider using the SerializedRequest directly.
112-
MemoryStream memoryStream = new MemoryStream(serializedRequest.RequestSize);
112+
MemoryStream memoryStream = memoryStreamPool.Get(serializedRequest.RequestSize);
113113
await serializedRequest.CopyToStreamAsync(memoryStream);
114114
memoryStream.Position = 0;
115115
return memoryStream;
@@ -145,14 +145,14 @@ public static async Task<HttpResponseMessage> ConvertProxyResponseAsync(HttpResp
145145
if (payloadPresent)
146146
{
147147
int length = await ThinClientTransportSerializer.ReadBodyLengthAsync(responseStream);
148-
bodyStream = new MemoryStream(length);
148+
bodyStream = memoryStreamPool.Get(length);
149149
await responseStream.CopyToAsync(bodyStream);
150150
bodyStream.Position = 0;
151151
}
152152

153153
// TODO(Perf): Clean this up.
154154
bytesDeserializer = new Rntbd.BytesDeserializer(metadata, metadata.Length);
155-
StoreResponse storeResponse = Documents.Rntbd.TransportSerialization.MakeStoreResponse(
155+
StoreResponse storeResponse = TransportSerialization.MakeStoreResponse(
156156
status,
157157
Guid.NewGuid(),
158158
bodyStream,
@@ -264,7 +264,6 @@ private static async Task<int> ReadBodyLengthAsync(Stream stream)
264264
{
265265
ArrayPool<byte>.Shared.Return(header);
266266
}
267-
268267
}
269268
}
270269
}

0 commit comments

Comments
 (0)