Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 38 additions & 32 deletions Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,27 @@ public static async Task<Stream> SerializeProxyRequestAsync(
dictionaryCollection.Set(header.Key, string.Join(",", header.Value));
}

using DocumentServiceRequest request = new (operationType, resourceType, requestMessage.RequestUri.PathAndQuery,
requestStream, AuthorizationTokenType.PrimaryMasterKey,
dictionaryCollection);
using DocumentServiceRequest request = new (
operationType,
resourceType,
requestMessage.RequestUri.PathAndQuery,
requestStream,
AuthorizationTokenType.PrimaryMasterKey,
dictionaryCollection);

if (operationType.IsPointOperation())
{
string partitionKey = request.Headers.Get(HttpConstants.HttpHeaders.PartitionKey);

if (string.IsNullOrEmpty(partitionKey))
{
throw new InternalServerErrorException();
}

ContainerProperties collection = await clientCollectionCache.ResolveCollectionAsync(
request,
CancellationToken.None,
NoOpTrace.Singleton);
request,
CancellationToken.None,
NoOpTrace.Singleton);
string epk = GetEffectivePartitionKeyHash(partitionKey, collection.PartitionKey);

request.Properties = new Dictionary<string, object>
Expand All @@ -89,7 +93,7 @@ public static async Task<Stream> SerializeProxyRequestAsync(
request.Properties = new Dictionary<string, object>
{
{ WFConstants.BackendHeaders.StartEpkHash, HexStringUtility.HexStringToBytes(request.Headers[ThinClientConstants.ProxyStartEpk]) },
{ WFConstants.BackendHeaders.EndEpkHash, HexStringUtility.HexStringToBytes(request.Headers[ThinClientConstants.ProxyEndEpk]) }
{ WFConstants.BackendHeaders.EndEpkHash, HexStringUtility.HexStringToBytes(request.Headers[ThinClientConstants.ProxyEndEpk]) }
};

request.Headers.Add(HttpConstants.HttpHeaders.ReadFeedKeyType, RntbdConstants.RntdbReadFeedKeyType.EffectivePartitionKeyRange.ToString());
Expand All @@ -100,19 +104,29 @@ public static async Task<Stream> SerializeProxyRequestAsync(
await request.EnsureBufferedBodyAsync();

using Documents.Rntbd.TransportSerialization.SerializedRequest serializedRequest =
Documents.Rntbd.TransportSerialization.BuildRequestForProxy(request,
new ResourceOperation(operationType, resourceType),
activityId,
bufferProvider.Provider,
accountName,
out _,
out _);

// TODO: consider using the SerializedRequest directly.
MemoryStream memoryStream = new MemoryStream(serializedRequest.RequestSize);
await serializedRequest.CopyToStreamAsync(memoryStream);
memoryStream.Position = 0;
return memoryStream;
Documents.Rntbd.TransportSerialization.BuildRequestForProxy(
request,
new ResourceOperation(operationType, resourceType),
activityId,
bufferProvider.Provider,
accountName,
out _,
out _);

int length = serializedRequest.RequestSize;
byte[] buffer = ArrayPool<byte>.Shared.Rent(length);
try
{
await serializedRequest.CopyToStreamAsync(new MemoryStream(buffer, 0, length, true, true));

MemoryStream stream = new MemoryStream(buffer, 0, length, writable: false, publiclyVisible: false);
return stream;
Comment thread
aavasthy marked this conversation as resolved.
}
catch
{
ArrayPool<byte>.Shared.Return(buffer);
throw;
}
}

public static string GetEffectivePartitionKeyHash(string partitionJson, PartitionKeyDefinition partitionKeyDefinition)
Expand Down Expand Up @@ -195,14 +209,11 @@ public static async Task<HttpResponseMessage> ConvertProxyResponseAsync(HttpResp
int headerRead = 0;
while (headerRead < headerLength)
{
int read = 0;
read = await stream.ReadAsync(header, headerRead, headerLength - headerRead);

int read = await stream.ReadAsync(header, headerRead, headerLength - headerRead);
if (read == 0)
{
throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown);
}

headerRead += read;
}

Expand All @@ -219,9 +230,7 @@ public static async Task<HttpResponseMessage> ConvertProxyResponseAsync(HttpResp
int responseMetadataRead = 0;
while (responseMetadataRead < metadataLength)
{
int read = 0;
read = await stream.ReadAsync(metadata, responseMetadataRead, metadataLength - responseMetadataRead);

int read = await stream.ReadAsync(metadata, responseMetadataRead, metadataLength - responseMetadataRead);
if (read == 0)
{
throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown);
Expand All @@ -247,9 +256,7 @@ private static async Task<int> ReadBodyLengthAsync(Stream stream)
int headerRead = 0;
while (headerRead < headerLength)
{
int read = 0;
read = await stream.ReadAsync(header, headerRead, headerLength - headerRead);

int read = await stream.ReadAsync(header, headerRead, headerLength - headerRead);
if (read == 0)
{
throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown);
Expand All @@ -264,7 +271,6 @@ private static async Task<int> ReadBodyLengthAsync(Stream stream)
{
ArrayPool<byte>.Shared.Return(header);
}

}
}
}
Loading