-
Notifications
You must be signed in to change notification settings - Fork 542
Expand file tree
/
Copy pathThinClientTransportSerializer.cs
More file actions
273 lines (232 loc) · 11.8 KB
/
Copy pathThinClientTransportSerializer.cs
File metadata and controls
273 lines (232 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Collections;
/// <summary>
/// A static helper class providing serialization and deserialization
/// of requests/responses to and from the RNTBD protocol for the ThinClient scenario.
/// </summary>
internal static class ThinClientTransportSerializer
{
/// <summary>
/// Mutable holder for the <see cref="BufferProvider"/> that
/// SerializeProxyRequestAsync hands to the RNTBD request builder.
/// </summary>
public sealed class BufferProviderWrapper
{
    /// <summary>
    /// The wrapped provider; a fresh instance is created by default.
    /// </summary>
    internal BufferProvider Provider { get; set; } = new BufferProvider();
}
/// <summary>
/// Serialize the Proxy request to the RNTBD protocol format.
/// Today this takes the HttpRequestMessage and reconstructs the DSR.
/// If the SDK can push properties to the HttpRequestMessage then the handler above having
/// the DSR can allow us to push that directly to the serialization.
/// </summary>
/// <param name="bufferProvider">Wrapper whose Provider is passed to the RNTBD request builder.</param>
/// <param name="accountName">Account name passed to the RNTBD request builder.</param>
/// <param name="clientCollectionCache">Cache used to resolve the container targeted by the request.</param>
/// <param name="requestMessage">
/// HTTP request to serialize. It must carry the proxy operation type, proxy resource type
/// and activity id headers; its content (if any) becomes the request body.
/// </param>
/// <returns>A stream, positioned at the start, holding the serialized request.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="bufferProvider"/>, <paramref name="clientCollectionCache"/> or
/// <paramref name="requestMessage"/> is null.
/// </exception>
/// <exception cref="InternalServerErrorException">
/// The request is a point operation but has no partition key header.
/// </exception>
public static async Task<Stream> SerializeProxyRequestAsync(
    BufferProviderWrapper bufferProvider,
    string accountName,
    ClientCollectionCache clientCollectionCache,
    HttpRequestMessage requestMessage)
{
    // Fail fast with a named argument instead of a NullReferenceException part-way through.
    if (bufferProvider == null)
    {
        throw new ArgumentNullException(nameof(bufferProvider));
    }

    if (clientCollectionCache == null)
    {
        throw new ArgumentNullException(nameof(clientCollectionCache));
    }

    if (requestMessage == null)
    {
        throw new ArgumentNullException(nameof(requestMessage));
    }

    // Skip this and use the original DSR.
    OperationType operationType = (OperationType)Enum.Parse(typeof(OperationType), requestMessage.Headers.GetValues(ThinClientConstants.ProxyOperationType).First());
    ResourceType resourceType = (ResourceType)Enum.Parse(typeof(ResourceType), requestMessage.Headers.GetValues(ThinClientConstants.ProxyResourceType).First());
    Guid activityId = Guid.Parse(requestMessage.Headers.GetValues(HttpConstants.HttpHeaders.ActivityId).First());

    Stream requestStream = null;
    if (requestMessage.Content != null)
    {
        requestStream = await requestMessage.Content.ReadAsStreamAsync();
    }

    // HTTP headers may be multi-valued; each one is flattened to a single comma-joined string.
    RequestNameValueCollection dictionaryCollection = new RequestNameValueCollection();
    foreach (KeyValuePair<string, IEnumerable<string>> header in requestMessage.Headers)
    {
        dictionaryCollection.Set(header.Key, string.Join(",", header.Value));
    }

    using DocumentServiceRequest request = new (operationType, resourceType, requestMessage.RequestUri.PathAndQuery,
        requestStream, AuthorizationTokenType.PrimaryMasterKey,
        dictionaryCollection);

    ContainerProperties collection = await clientCollectionCache.ResolveCollectionAsync(
        request,
        CancellationToken.None,
        NoOpTrace.Singleton);

    if (operationType.IsPointOperation())
    {
        string partitionKey = request.Headers.Get(HttpConstants.HttpHeaders.PartitionKey);
        if (string.IsNullOrEmpty(partitionKey))
        {
            throw new InternalServerErrorException("Partition key header is missing or empty for a point operation.");
        }

        string epk = GetEffectivePartitionKeyHash(partitionKey, collection.PartitionKey);
        request.Properties = new Dictionary<string, object>
        {
            { ThinClientConstants.EffectivePartitionKey, HexStringUtility.HexStringToBytes(epk) }
        };
    }
    else
    {
        string proxyStartEpk = request.Headers[ThinClientConstants.ProxyStartEpk];
        if (proxyStartEpk != null)
        {
            string proxyEndEpk = request.Headers[ThinClientConstants.ProxyEndEpk];

            // Re-add EPK headers removed by RequestInvokerHandler through Properties
            request.Properties = new Dictionary<string, object>
            {
                { WFConstants.BackendHeaders.StartEpkHash, HexStringUtility.HexStringToBytes(proxyStartEpk) },
                { WFConstants.BackendHeaders.EndEpkHash, HexStringUtility.HexStringToBytes(proxyEndEpk) }
            };
            request.Headers.Add(HttpConstants.HttpHeaders.ReadFeedKeyType, RntbdConstants.RntdbReadFeedKeyType.EffectivePartitionKeyRange.ToString());
            request.Headers.Add(HttpConstants.HttpHeaders.StartEpk, proxyStartEpk);
            request.Headers.Add(HttpConstants.HttpHeaders.EndEpk, proxyEndEpk);
        }
    }

    request.ResourceId = collection.ResourceId;
    request.Headers.Add(WFConstants.BackendHeaders.CollectionRid, collection.ResourceId);

    await request.EnsureBufferedBodyAsync();

    using Documents.Rntbd.TransportSerialization.SerializedRequest serializedRequest =
        Documents.Rntbd.TransportSerialization.BuildRequestForProxy(request,
        new ResourceOperation(operationType, resourceType),
        activityId,
        bufferProvider.Provider,
        accountName,
        out _,
        out _);

    // TODO: consider using the SerializedRequest directly.
    // The serialized request is disposed when this method returns, so its bytes are
    // copied into a stream that the caller owns.
    MemoryStream memoryStream = new MemoryStream(serializedRequest.RequestSize);
    await serializedRequest.CopyToStreamAsync(memoryStream);
    memoryStream.Position = 0;
    return memoryStream;
}
/// <summary>
/// Parses a partition key from its JSON form and returns its effective partition key string
/// for the given partition key definition.
/// </summary>
/// <param name="partitionJson">Partition key JSON, parsed with Documents.PartitionKey.FromJsonString.</param>
/// <param name="partitionKeyDefinition">Definition passed to GetEffectivePartitionKeyString.</param>
/// <returns>The effective partition key string of the parsed key.</returns>
public static string GetEffectivePartitionKeyHash(string partitionJson, PartitionKeyDefinition partitionKeyDefinition)
{
    Documents.PartitionKey parsedKey = Documents.PartitionKey.FromJsonString(partitionJson);
    return parsedKey.InternalKey.GetEffectivePartitionKeyString(partitionKeyDefinition);
}
/// <summary>
/// Deserialize the Proxy Response from the RNTBD protocol format to the Http format needed by the caller.
/// Today this takes the HttpResponseMessage and reconstructs the modified Http response.
/// </summary>
/// <param name="responseMessage">Proxy response whose content is an RNTBD-framed payload.</param>
/// <returns>
/// A new <see cref="HttpResponseMessage"/> carrying the store response status, headers and
/// (when present) body, plus the routed-via-proxy marker header.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="responseMessage"/> is null.</exception>
/// <exception cref="InternalServerErrorException">
/// The HTTP status code differs from the status in the RNTBD frame, the mandatory headers
/// cannot be parsed, or the frame declares a negative body length.
/// </exception>
public static async Task<HttpResponseMessage> ConvertProxyResponseAsync(HttpResponseMessage responseMessage)
{
    if (responseMessage == null)
    {
        throw new ArgumentNullException(nameof(responseMessage));
    }

    // Disposed on exit; any body is copied into its own MemoryStream below.
    using Stream responseStream = await responseMessage.Content.ReadAsStreamAsync();

    (StatusCodes status, byte[] metadata) = await ThinClientTransportSerializer.ReadHeaderAndMetadataAsync(responseStream);

    if (responseMessage.StatusCode != (HttpStatusCode)status)
    {
        throw new InternalServerErrorException($"Status code mismatch: Expected {(int)status}, but received {(int)responseMessage.StatusCode}.");
    }

    // First pass over the metadata only determines whether a payload follows.
    Rntbd.BytesDeserializer bytesDeserializer = new Rntbd.BytesDeserializer(metadata, metadata.Length);
    if (!Documents.Rntbd.HeadersTransportSerialization.TryParseMandatoryResponseHeaders(ref bytesDeserializer, out bool payloadPresent, out _))
    {
        throw new InternalServerErrorException("Response metadata length mismatch: Unable to parse mandatory headers.");
    }

    MemoryStream bodyStream = null;
    if (payloadPresent)
    {
        int length = await ThinClientTransportSerializer.ReadBodyLengthAsync(responseStream);

        // The length comes off the wire; reject a negative value as a framing error
        // instead of letting the MemoryStream constructor throw ArgumentOutOfRangeException.
        if (length < 0)
        {
            throw new InternalServerErrorException($"Body length mismatch: Received a negative body length ({length}).");
        }

        bodyStream = new MemoryStream(length);
        await responseStream.CopyToAsync(bodyStream);
        bodyStream.Position = 0;
    }

    // TODO(Perf): Clean this up.
    // The deserializer is recreated so MakeStoreResponse reads the metadata from the start.
    bytesDeserializer = new Rntbd.BytesDeserializer(metadata, metadata.Length);
    StoreResponse storeResponse = Documents.Rntbd.TransportSerialization.MakeStoreResponse(
        status,
        Guid.NewGuid(),
        bodyStream,
        HttpConstants.Versions.CurrentVersion,
        ref bytesDeserializer);

    HttpResponseMessage response = new HttpResponseMessage((HttpStatusCode)storeResponse.StatusCode)
    {
        RequestMessage = responseMessage.RequestMessage
    };

    if (bodyStream != null)
    {
        response.Content = new StreamContent(bodyStream);
    }

    foreach (string header in storeResponse.Headers.Keys())
    {
        if (header == HttpConstants.HttpHeaders.SessionToken)
        {
            // The session token is emitted as "<partitionKeyRangeId>:<token>".
            string newSessionToken = storeResponse.PartitionKeyRangeId + ":" + storeResponse.Headers.Get(header);
            response.Headers.TryAddWithoutValidation(header, newSessionToken);
        }
        else
        {
            response.Headers.TryAddWithoutValidation(header, storeResponse.Headers.Get(header));
        }
    }

    response.Headers.TryAddWithoutValidation(ThinClientConstants.RoutedViaProxy, "1");
    return response;
}
/// <summary>
/// Reads the fixed 24-byte frame header and the metadata block that follows it.
/// The first 4 header bytes are the total length (header plus metadata) as an unsigned
/// 32-bit integer and the next 4 bytes are the status; the remaining header bytes are
/// not interpreted here.
/// </summary>
/// <param name="stream">Stream positioned at the start of the frame.</param>
/// <returns>The status and a newly allocated array holding exactly the metadata bytes.</returns>
/// <exception cref="DocumentClientException">The stream ended before the expected bytes were read.</exception>
/// <exception cref="InternalServerErrorException">
/// The declared total length is smaller than the header or larger than <see cref="int.MaxValue"/>.
/// </exception>
private static async Task<(StatusCodes, byte[] metadata)> ReadHeaderAndMetadataAsync(Stream stream)
{
    const int headerLength = 24;

    // Reads exactly `count` bytes into `buffer`; a zero-byte read means the stream ended early.
    async Task FillAsync(byte[] buffer, int count)
    {
        int filled = 0;
        while (filled < count)
        {
            int read = await stream.ReadAsync(buffer, filled, count - filled);
            if (read == 0)
            {
                throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown);
            }

            filled += read;
        }
    }

    // The pool may hand back a larger array; only the first headerLength bytes are used.
    byte[] header = ArrayPool<byte>.Shared.Rent(headerLength);
    try
    {
        await FillAsync(header, headerLength);

        uint totalLength = BitConverter.ToUInt32(header, 0);
        StatusCodes status = (StatusCodes)BitConverter.ToUInt32(header, 4);

        if (totalLength < headerLength)
        {
            throw new InternalServerErrorException("Header length mismatch: Total length is smaller than expected.");
        }

        // Without this check the cast to int below would wrap to a negative value and the
        // array allocation would fail with an OverflowException.
        if (totalLength > int.MaxValue)
        {
            throw new InternalServerErrorException("Header length mismatch: Total length is larger than supported.");
        }

        int metadataLength = (int)totalLength - headerLength;
        byte[] metadata = new byte[metadataLength];
        await FillAsync(metadata, metadataLength);

        return (status, metadata);
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(header);
    }
}
/// <summary>
/// Reads the 4-byte length prefix that precedes the response body.
/// </summary>
/// <param name="stream">Stream positioned at the length prefix.</param>
/// <returns>The prefix interpreted as a signed 32-bit integer.</returns>
/// <exception cref="DocumentClientException">The stream ended before 4 bytes were read.</exception>
private static async Task<int> ReadBodyLengthAsync(Stream stream)
{
    const int prefixSize = 4;

    // The pool may return a larger array; only the first prefixSize bytes are used.
    byte[] prefix = ArrayPool<byte>.Shared.Rent(prefixSize);
    try
    {
        int filled = 0;
        do
        {
            int chunk = await stream.ReadAsync(prefix, filled, prefixSize - filled);
            if (chunk == 0)
            {
                throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown);
            }

            filled += chunk;
        }
        while (filled < prefixSize);

        return BitConverter.ToInt32(prefix, 0);
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(prefix);
    }
}
}
}