11// Copyright (c) Microsoft Corporation.
22// Licensed under the MIT License.
33
4+ using System ;
5+ using System . Buffers ;
46using System . Collections . Concurrent ;
5- using System . Net . Security ;
7+ using System . IO ;
8+ using System . IO . Pipelines ;
9+ using System . Linq ;
610using System . Text ;
711using System . Text . Encodings . Web ;
812using System . Text . Json ;
913using System . Text . Json . Nodes ;
1014using System . Text . Json . Serialization ;
15+ using System . Threading ;
16+ using System . Threading . Tasks ;
1117
1218namespace Bicep . RpcClient . JsonRpc ;
1319
14- internal class JsonRpcClient ( Stream reader , Stream writer ) : IDisposable
20+ internal class JsonRpcClient ( PipeReader reader , PipeWriter writer ) : IDisposable
1521{
1622 private record JsonRpcRequest < T > (
1723 string Jsonrpc ,
@@ -20,7 +26,6 @@ private record JsonRpcRequest<T>(
2026 int Id ) ;
2127
2228 private record MinimalJsonRpcResponse (
23- string Jsonrpc ,
2429 int Id ) ;
2530
2631 private record JsonRpcResponse < T > (
@@ -34,10 +39,9 @@ private record JsonRpcError(
3439 string Message ,
3540 JsonNode ? Data ) ;
3641
37- private readonly byte [ ] terminator = "\r \n \r \n "u8 . ToArray ( ) ;
3842 private int nextId = 0 ;
3943 private readonly SemaphoreSlim writeSemaphore = new ( 1 , 1 ) ;
40- private readonly ConcurrentDictionary < int , TaskCompletionSource < string > > pendingResponses = new ( ) ;
44+ private readonly ConcurrentDictionary < int , TaskCompletionSource < byte [ ] > > pendingResponses = new ( ) ;
4145
4246 private readonly JsonSerializerOptions jsonSerializerOptions = new ( )
4347 {
@@ -55,20 +59,20 @@ public async Task<TResponse> SendRequest<TRequest, TResponse>(string method, TRe
5559 var requestContent = JsonSerializer . Serialize ( jsonRpcRequest , jsonSerializerOptions ) ;
5660 var requestLength = Encoding . UTF8 . GetByteCount ( requestContent ) ;
5761 var rawRequest = $ "Content-Length: { requestLength } \r \n \r \n { requestContent } ";
58- var requestBytes = Encoding . UTF8 . GetBytes ( rawRequest ) ;
5962
6063 await writeSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
6164 try
6265 {
63- await writer . WriteAsync ( requestBytes , 0 , requestBytes . Length , cancellationToken ) . ConfigureAwait ( false ) ;
66+ var requestBytes = Encoding . UTF8 . GetBytes ( rawRequest ) . AsMemory ( ) ;
67+ await writer . WriteAsync ( requestBytes , cancellationToken ) . ConfigureAwait ( false ) ;
6468 await writer . FlushAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
6569 }
6670 finally
6771 {
6872 writeSemaphore . Release ( ) ;
6973 }
7074
71- var tcs = new TaskCompletionSource < string > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
75+ var tcs = new TaskCompletionSource < byte [ ] > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
7276 if ( ! pendingResponses . TryAdd ( currentId , tcs ) )
7377 {
7478 throw new InvalidOperationException ( $ "A request with ID { currentId } is already pending.") ;
@@ -87,13 +91,34 @@ public async Task<TResponse> SendRequest<TRequest, TResponse>(string method, TRe
8791 return jsonRpcResponse . Result ;
8892 }
8993
90- public async Task ReadLoop ( CancellationToken cancellationToken )
94+ public Task Listen ( Action onComplete , CancellationToken cancellationToken )
95+ => Task . Run ( async ( ) =>
96+ {
97+ try
98+ {
99+ await ListenInternal ( cancellationToken ) . ConfigureAwait ( false ) ;
100+ }
101+ catch ( OperationCanceledException )
102+ {
103+ // Expected when disposing
104+ }
105+ finally
106+ {
107+ onComplete ( ) ;
108+ }
109+ } , cancellationToken ) ;
110+
111+ private async Task ListenInternal ( CancellationToken cancellationToken )
91112 {
92113 while ( true )
93114 {
94115 try
95116 {
96117 var message = await ReadMessage ( cancellationToken ) . ConfigureAwait ( false ) ;
118+ if ( message is null )
119+ {
120+ return ;
121+ }
97122
98123 var response = JsonSerializer . Deserialize < MinimalJsonRpcResponse > ( message , jsonSerializerOptions )
99124 ?? throw new InvalidOperationException ( "Failed to deserialize JSON-RPC response" ) ;
@@ -105,77 +130,134 @@ public async Task ReadLoop(CancellationToken cancellationToken)
105130 }
106131 catch ( Exception ) when ( cancellationToken . IsCancellationRequested )
107132 {
108- reader . Dispose ( ) ;
109- writer . Dispose ( ) ;
133+ await reader . CompleteAsync ( ) . ConfigureAwait ( false ) ;
134+ await writer . CompleteAsync ( ) . ConfigureAwait ( false ) ;
110135 break ;
111136 }
112137 }
113138 }
114139
115- private async Task < string > ReadUntilTerminator ( CancellationToken cancellationToken )
116- {
117- using var outputStream = new MemoryStream ( ) ;
118- var patternIndex = 0 ;
119- var byteBuffer = new byte [ 1 ] ;
140+ private record Headers (
141+ int ContentLength ) ;
120142
143+ private async Task < Headers ? > ReadHeaders ( CancellationToken cancellationToken )
144+ {
145+ int ? contentLength = null ;
121146 while ( true )
122147 {
123- await ReadExactly ( byteBuffer , byteBuffer . Length , cancellationToken ) . ConfigureAwait ( false ) ;
148+ var readResult = await reader . ReadAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
124149
125- await outputStream . WriteAsync ( byteBuffer , 0 , byteBuffer . Length , cancellationToken ) . ConfigureAwait ( false ) ;
126- patternIndex = terminator [ patternIndex ] == byteBuffer [ 0 ] ? patternIndex + 1 : 0 ;
127- if ( patternIndex == terminator . Length )
150+ if ( readResult . Buffer . IsEmpty && readResult . IsCompleted )
128151 {
129- outputStream . Position = 0 ;
130- outputStream . SetLength ( outputStream . Length - terminator . Length ) ;
131- // return stream as string
132- return Encoding . UTF8 . GetString ( outputStream . ToArray ( ) ) ;
152+ return null ; // remote end disconnected at a reasonable place.
153+ }
154+
155+ var lf = readResult . Buffer . PositionOf ( ( byte ) '\n ' ) ;
156+ if ( ! lf . HasValue )
157+ {
158+ if ( readResult . IsCompleted )
159+ {
160+ throw new EndOfStreamException ( ) ;
161+ }
162+
163+ // Indicate that we can't find what we're looking for and read again.
164+ reader . AdvanceTo ( readResult . Buffer . Start , readResult . Buffer . End ) ;
165+ continue ;
166+ }
167+
168+ var line = readResult . Buffer . Slice ( 0 , lf . Value ) ;
169+
170+ // Verify the line ends with an \r (that precedes the \n we already found)
171+ var cr = line . PositionOf ( ( byte ) '\r ' ) ;
172+ if ( ! cr . HasValue || ! line . GetPosition ( 1 , cr . Value ) . Equals ( lf ) )
173+ {
174+ throw new InvalidOperationException ( "Header does not end with expected \r \n character sequence" ) ;
175+ }
176+
177+ // Trim off the \r now that we confirmed it was there.
178+ line = line . Slice ( 0 , line . Length - 1 ) ;
179+
180+ if ( line . Length > 0 )
181+ {
182+ var lineText = Encoding . UTF8 . GetString ( line . ToArray ( ) ) ;
183+ var split = lineText . Split ( [ ':' ] , 2 ) ;
184+ if ( split . Length != 2 )
185+ {
186+ throw new InvalidOperationException ( "Colon not found in header." ) ;
187+ }
188+
189+ var headerName = split [ 0 ] . Trim ( ) ;
190+ var headerValue = split [ 1 ] . Trim ( ) ;
191+
192+ if ( headerName == "Content-Length" )
193+ {
194+ contentLength = int . Parse ( headerValue ) ;
195+ }
196+ }
197+
198+ // Advance to the next line.
199+ reader . AdvanceTo ( readResult . Buffer . GetPosition ( 1 , lf . Value ) ) ;
200+
201+ if ( line . Length == 0 )
202+ {
203+ // We found the empty line that constitutes the end of the HTTP headers.
204+ break ;
133205 }
134206 }
135- }
136207
137- private async Task < string > ReadContent ( int length , CancellationToken cancellationToken )
138- {
139- var byteBuffer = new byte [ length ] ;
140- await ReadExactly ( byteBuffer , length , cancellationToken ) . ConfigureAwait ( false ) ;
208+ if ( ! contentLength . HasValue )
209+ {
210+ throw new InvalidOperationException ( "Failed to obtain Content-Length header" ) ;
211+ }
141212
142- return Encoding . UTF8 . GetString ( byteBuffer ) ;
213+ return new ( contentLength . Value ) ;
143214 }
144215
145- private async Task < string > ReadMessage ( CancellationToken cancellationToken )
216+ protected async ValueTask < ReadResult > ReadAtLeastAsync ( int requiredBytes , bool allowEmpty , CancellationToken cancellationToken )
146217 {
147- var header = await ReadUntilTerminator ( cancellationToken ) . ConfigureAwait ( false ) ;
148- var parsed = header . Split ( ':' ) . Select ( x => x . Trim ( ) ) . ToArray ( ) ;
218+ var readResult = await reader . ReadAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
219+ while ( readResult . Buffer . Length < requiredBytes && ! readResult . IsCompleted && ! readResult . IsCanceled )
220+ {
221+ reader . AdvanceTo ( readResult . Buffer . Start , readResult . Buffer . End ) ;
222+ readResult = await reader . ReadAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
223+ }
149224
150- if ( parsed . Length != 2 ||
151- ! parsed [ 0 ] . Equals ( "Content-Length" , StringComparison . OrdinalIgnoreCase ) ||
152- ! int . TryParse ( parsed [ 1 ] , out var contentLength ) ||
153- contentLength <= 0 )
225+ if ( allowEmpty && readResult . Buffer . Length == 0 )
154226 {
155- throw new InvalidOperationException ( $ "Invalid header: { header } " ) ;
227+ return readResult ;
156228 }
157229
158- return await ReadContent ( contentLength , cancellationToken ) . ConfigureAwait ( false ) ;
230+ if ( readResult . Buffer . Length < requiredBytes )
231+ {
232+ throw readResult . IsCompleted ? new EndOfStreamException ( ) :
233+ readResult . IsCanceled ? new OperationCanceledException ( ) :
234+ throw new InvalidOperationException ( ) ; // should be unreachable
235+ }
236+
237+ return readResult ;
159238 }
160239
161- private async ValueTask ReadExactly ( byte [ ] buffer , int minimumBytes , CancellationToken cancellationToken )
240+ private async Task < byte [ ] ? > ReadMessage ( CancellationToken cancellationToken )
162241 {
163- var totalRead = 0 ;
164- while ( totalRead < minimumBytes )
242+ var headers = await ReadHeaders ( cancellationToken ) . ConfigureAwait ( false ) ;
243+ if ( headers is null )
165244 {
166- var read = await reader . ReadAsync ( buffer , totalRead , minimumBytes - totalRead , cancellationToken ) . ConfigureAwait ( false ) ;
167- if ( read == 0 )
168- {
169- throw new EndOfStreamException ( "Stream closed before reading expected number of bytes" ) ;
170- }
171-
172- totalRead += read ;
245+ return null ;
173246 }
247+
248+ var readResult = await ReadAtLeastAsync ( headers . ContentLength , allowEmpty : false , cancellationToken ) . ConfigureAwait ( false ) ;
249+
250+ var contentBuffer = readResult . Buffer . Slice ( 0 , headers . ContentLength ) ;
251+ var output = contentBuffer . ToArray ( ) ;
252+
253+ reader . AdvanceTo ( contentBuffer . End ) ;
254+
255+ return output ;
174256 }
175257
176258 public void Dispose ( )
177259 {
178- writer . Dispose ( ) ;
179- reader . Dispose ( ) ;
260+ writer . Complete ( ) ;
261+ reader . Complete ( ) ;
180262 }
181263}
0 commit comments