1+ //------------------------------------------------------------
2+ // Copyright (c) Microsoft Corporation. All rights reserved.
3+ //------------------------------------------------------------
4+
5+ namespace Microsoft . Azure . Cosmos
6+ {
7+ using System ;
8+ using System . Collections . Concurrent ;
9+ using System . Diagnostics . CodeAnalysis ;
10+ using System . IO ;
11+ using System . Net . Http ;
12+ using System . Threading ;
13+ using System . Threading . Tasks ;
14+ using Microsoft . Azure . Cosmos . Core . Trace ;
15+ using Microsoft . Azure . Cosmos . Routing ;
16+ using Microsoft . Azure . Documents ;
17+ using Newtonsoft . Json ;
18+ using static Microsoft . Azure . Cosmos . ThinClientTransportSerializer ;
19+
20+ /// <summary>
21+ /// A TransportClient that sends requests to proxy endpoint.
22+ /// And then processes the response back into DocumentServiceResponse objects.
23+ /// </summary>
24+ internal class ThinClientStoreClient : GatewayStoreClient
25+ {
26+ private readonly ObjectPool < BufferProviderWrapper > bufferProviderWrapperPool ;
27+
28+ public ThinClientStoreClient (
29+ CosmosHttpClient httpClient ,
30+ ICommunicationEventSource eventSource ,
31+ JsonSerializerSettings serializerSettings = null )
32+ : base ( httpClient ,
33+ eventSource ,
34+ serializerSettings )
35+ {
36+ this . bufferProviderWrapperPool = new ObjectPool < BufferProviderWrapper > ( ( ) => new BufferProviderWrapper ( ) ) ;
37+ }
38+
39+ public override async Task < DocumentServiceResponse > InvokeAsync (
40+ DocumentServiceRequest request ,
41+ ResourceType resourceType ,
42+ Uri physicalAddress ,
43+ Uri thinClientEndpoint ,
44+ string globalDatabaseAccountName ,
45+ ClientCollectionCache clientCollectionCache ,
46+ CancellationToken cancellationToken )
47+ {
48+ using ( HttpResponseMessage responseMessage = await this . InvokeClientAsync (
49+ request ,
50+ resourceType ,
51+ physicalAddress ,
52+ thinClientEndpoint ,
53+ globalDatabaseAccountName ,
54+ clientCollectionCache ,
55+ cancellationToken ) )
56+ {
57+ HttpResponseMessage proxyResponse = await ThinClientTransportSerializer . ConvertProxyResponseAsync ( responseMessage ) ;
58+ return await ThinClientStoreClient . ParseResponseAsync ( proxyResponse , request . SerializerSettings ?? base . SerializerSettings , request ) ;
59+ }
60+ }
61+
62+ internal override async Task < StoreResponse > InvokeStoreAsync ( Uri baseAddress , ResourceOperation resourceOperation , DocumentServiceRequest request )
63+ {
64+ Uri physicalAddress = ThinClientStoreClient . IsFeedRequest ( request . OperationType ) ?
65+ HttpTransportClient . GetResourceFeedUri ( resourceOperation . resourceType , baseAddress , request ) :
66+ HttpTransportClient . GetResourceEntryUri ( resourceOperation . resourceType , baseAddress , request ) ;
67+
68+ using ( HttpResponseMessage responseMessage = await this . InvokeClientAsync (
69+ request ,
70+ resourceOperation . resourceType ,
71+ physicalAddress ,
72+ default ,
73+ default ,
74+ default ,
75+ default ) )
76+ {
77+ return await HttpTransportClient . ProcessHttpResponse ( request . ResourceAddress , string . Empty , responseMessage , physicalAddress , request ) ;
78+ }
79+ }
80+
81+ private async ValueTask < HttpRequestMessage > PrepareRequestForProxyAsync (
82+ DocumentServiceRequest request ,
83+ Uri physicalAddress ,
84+ Uri thinClientEndpoint ,
85+ string globalDatabaseAccountName ,
86+ ClientCollectionCache clientCollectionCache )
87+ {
88+ HttpRequestMessage requestMessage = base . PrepareRequestMessageAsync ( request , physicalAddress ) . Result ;
89+ requestMessage . Version = new Version ( 2 , 0 ) ;
90+
91+ BufferProviderWrapper bufferProviderWrapper = this . bufferProviderWrapperPool . Get ( ) ;
92+ try
93+ {
94+ requestMessage . Headers . TryAddWithoutValidation (
95+ ThinClientConstants . ProxyOperationType ,
96+ request . OperationType . ToOperationTypeString ( ) ) ;
97+
98+ requestMessage . Headers . TryAddWithoutValidation (
99+ ThinClientConstants . ProxyResourceType ,
100+ request . ResourceType . ToResourceTypeString ( ) ) ;
101+
102+ Stream contentStream = await ThinClientTransportSerializer . SerializeProxyRequestAsync (
103+ bufferProviderWrapper ,
104+ globalDatabaseAccountName ,
105+ clientCollectionCache ,
106+ requestMessage ) ;
107+
108+ if ( ! contentStream . CanSeek )
109+ {
110+ throw new InvalidOperationException (
111+ $ "The serializer returned a non-seekable stream ({ contentStream . GetType ( ) . FullName } ).") ;
112+ }
113+
114+ requestMessage . Content = new StreamContent ( contentStream ) ;
115+ requestMessage . Content . Headers . ContentLength = contentStream . Length ;
116+ requestMessage . Headers . Clear ( ) ;
117+ requestMessage . RequestUri = thinClientEndpoint ;
118+ requestMessage . Method = HttpMethod . Post ;
119+
120+ return requestMessage ;
121+ }
122+ finally
123+ {
124+ this . bufferProviderWrapperPool . Return ( bufferProviderWrapper ) ;
125+ }
126+ }
127+
128+ private Task < HttpResponseMessage > InvokeClientAsync (
129+ DocumentServiceRequest request ,
130+ ResourceType resourceType ,
131+ Uri physicalAddress ,
132+ Uri thinClientEndpoint ,
133+ string globalDatabaseAccountName ,
134+ ClientCollectionCache clientCollectionCache ,
135+ CancellationToken cancellationToken )
136+ {
137+ DefaultTrace . TraceInformation ( "In {0}, OperationType: {1}, ResourceType: {2}" , nameof ( ThinClientStoreClient ) , request . OperationType , request . ResourceType ) ;
138+ return base . httpClient . SendHttpAsync (
139+ ( ) => this . PrepareRequestForProxyAsync ( request , physicalAddress , thinClientEndpoint , globalDatabaseAccountName , clientCollectionCache ) ,
140+ resourceType ,
141+ HttpTimeoutPolicy . GetTimeoutPolicy ( request ) ,
142+ request . RequestContext . ClientRequestStatistics ,
143+ cancellationToken ) ;
144+ }
145+
146+ internal class ObjectPool < T >
147+ {
148+ private readonly ConcurrentBag < T > Objects ;
149+ private readonly Func < T > ObjectGenerator ;
150+
151+ public ObjectPool ( Func < T > objectGenerator )
152+ {
153+ this . ObjectGenerator = objectGenerator ?? throw new ArgumentNullException ( nameof ( objectGenerator ) ) ;
154+ this . Objects = new ConcurrentBag < T > ( ) ;
155+ }
156+
157+ public T Get ( )
158+ {
159+ return this . Objects . TryTake ( out T item ) ? item : this . ObjectGenerator ( ) ;
160+ }
161+
162+ public void Return ( T item )
163+ {
164+ this . Objects . Add ( item ) ;
165+ }
166+ }
167+ }
168+ }
0 commit comments