32
32
33
33
namespace OpenQA . Selenium . BiDi . Communication ;
34
34
35
- public class Broker : IAsyncDisposable
35
+ public sealed class Broker : IAsyncDisposable
36
36
{
37
37
private readonly ILogger _logger = Log . GetLogger < Broker > ( ) ;
38
38
39
39
private readonly BiDi _bidi ;
40
40
private readonly ITransport _transport ;
41
41
42
- private readonly ConcurrentDictionary < int , TaskCompletionSource < JsonElement > > _pendingCommands = new ( ) ;
42
+ private readonly ConcurrentDictionary < long , CommandInfo > _pendingCommands = new ( ) ;
43
43
private readonly BlockingCollection < MessageEvent > _pendingEvents = [ ] ;
44
+ private readonly Dictionary < string , Type > _eventTypesMap = [ ] ;
44
45
45
46
private readonly ConcurrentDictionary < string , List < EventHandler > > _eventHandlers = new ( ) ;
46
47
47
- private int _currentCommandId ;
48
+ private long _currentCommandId ;
48
49
49
50
private static readonly TaskFactory _myTaskFactory = new ( CancellationToken . None , TaskCreationOptions . DenyChildAttach , TaskContinuationOptions . None , TaskScheduler . Default ) ;
50
51
@@ -89,7 +90,6 @@ internal Broker(BiDi bidi, Uri url)
89
90
new JsonStringEnumConverter ( JsonNamingPolicy . CamelCase ) ,
90
91
91
92
// https://github.com/dotnet/runtime/issues/72604
92
- new Json . Converters . Polymorphic . MessageConverter ( ) ,
93
93
new Json . Converters . Polymorphic . EvaluateResultConverter ( ) ,
94
94
new Json . Converters . Polymorphic . RemoteValueConverter ( ) ,
95
95
new Json . Converters . Polymorphic . RealmInfoConverter ( ) ,
@@ -122,23 +122,18 @@ private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)
122
122
{
123
123
while ( ! cancellationToken . IsCancellationRequested )
124
124
{
125
- var data = await _transport . ReceiveAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
126
-
127
- var message = JsonSerializer . Deserialize ( new ReadOnlySpan < byte > ( data ) , _jsonSerializerContext . Message ) ;
125
+ try
126
+ {
127
+ var data = await _transport . ReceiveAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
128
128
129
- switch ( message )
129
+ ProcessReceivedMessage ( data ) ;
130
+ }
131
+ catch ( Exception ex )
130
132
{
131
- case MessageSuccess messageSuccess :
132
- _pendingCommands [ messageSuccess . Id ] . SetResult ( messageSuccess . Result ) ;
133
- _pendingCommands . TryRemove ( messageSuccess . Id , out _ ) ;
134
- break ;
135
- case MessageEvent messageEvent :
136
- _pendingEvents . Add ( messageEvent ) ;
137
- break ;
138
- case MessageError mesageError :
139
- _pendingCommands [ mesageError . Id ] . SetException ( new BiDiException ( $ "{ mesageError . Error } : { mesageError . Message } ") ) ;
140
- _pendingCommands . TryRemove ( mesageError . Id , out _ ) ;
141
- break ;
133
+ if ( cancellationToken . IsCancellationRequested is not true && _logger . IsEnabled ( LogEventLevel . Error ) )
134
+ {
135
+ _logger . Error ( $ "Couldn't process received BiDi remote message: { ex } ") ;
136
+ }
142
137
}
143
138
}
144
139
}
@@ -155,7 +150,7 @@ private async Task ProcessEventsAwaiterAsync()
155
150
{
156
151
foreach ( var handler in eventHandlers . ToArray ( ) ) // copy handlers avoiding modified collection while iterating
157
152
{
158
- var args = ( EventArgs ) result . Params . Deserialize ( handler . EventArgsType , _jsonSerializerContext ) ! ;
153
+ var args = result . Params ;
159
154
160
155
args . BiDi = _bidi ;
161
156
@@ -177,40 +172,41 @@ private async Task ProcessEventsAwaiterAsync()
177
172
{
178
173
if ( _logger . IsEnabled ( LogEventLevel . Error ) )
179
174
{
180
- _logger . Error ( $ "Unhandled error processing BiDi event: { ex } ") ;
175
+ _logger . Error ( $ "Unhandled error processing BiDi event handler : { ex } ") ;
181
176
}
182
177
}
183
178
}
184
179
}
185
180
186
- public async Task < TResult > ExecuteCommandAsync < TCommand , TResult > ( TCommand command , CommandOptions ? options )
181
+ public async Task ExecuteCommandAsync < TCommand > ( TCommand command , CommandOptions ? options )
187
182
where TCommand : Command
188
183
{
189
- var jsonElement = await ExecuteCommandCoreAsync ( command , options ) . ConfigureAwait ( false ) ;
190
-
191
- return ( TResult ) jsonElement . Deserialize ( typeof ( TResult ) , _jsonSerializerContext ) ! ;
184
+ await ExecuteCommandCoreAsync ( command , options ) . ConfigureAwait ( false ) ;
192
185
}
193
186
194
- public async Task ExecuteCommandAsync < TCommand > ( TCommand command , CommandOptions ? options )
187
+ public async Task < TResult > ExecuteCommandAsync < TCommand , TResult > ( TCommand command , CommandOptions ? options )
195
188
where TCommand : Command
189
+ where TResult : EmptyResult
196
190
{
197
- await ExecuteCommandCoreAsync ( command , options ) . ConfigureAwait ( false ) ;
191
+ var result = await ExecuteCommandCoreAsync ( command , options ) . ConfigureAwait ( false ) ;
192
+
193
+ return ( TResult ) result ;
198
194
}
199
195
200
- private async Task < JsonElement > ExecuteCommandCoreAsync < TCommand > ( TCommand command , CommandOptions ? options )
196
+ private async Task < EmptyResult > ExecuteCommandCoreAsync < TCommand > ( TCommand command , CommandOptions ? options )
201
197
where TCommand : Command
202
198
{
203
199
command . Id = Interlocked . Increment ( ref _currentCommandId ) ;
204
200
205
- var tcs = new TaskCompletionSource < JsonElement > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
201
+ var tcs = new TaskCompletionSource < EmptyResult > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
206
202
207
203
var timeout = options ? . Timeout ?? TimeSpan . FromSeconds ( 30 ) ;
208
204
209
205
using var cts = new CancellationTokenSource ( timeout ) ;
210
206
211
207
cts . Token . Register ( ( ) => tcs . TrySetCanceled ( cts . Token ) ) ;
212
208
213
- _pendingCommands [ command . Id ] = tcs ;
209
+ _pendingCommands [ command . Id ] = new ( command . Id , command . ResultType , tcs ) ;
214
210
215
211
var data = JsonSerializer . SerializeToUtf8Bytes ( command , typeof ( TCommand ) , _jsonSerializerContext ) ;
216
212
@@ -222,6 +218,8 @@ private async Task<JsonElement> ExecuteCommandCoreAsync<TCommand>(TCommand comma
222
218
public async Task < Subscription > SubscribeAsync < TEventArgs > ( string eventName , Action < TEventArgs > action , SubscriptionOptions ? options = null )
223
219
where TEventArgs : EventArgs
224
220
{
221
+ _eventTypesMap [ eventName ] = typeof ( TEventArgs ) ;
222
+
225
223
var handlers = _eventHandlers . GetOrAdd ( eventName , ( a ) => [ ] ) ;
226
224
227
225
if ( options is BrowsingContextsSubscriptionOptions browsingContextsOptions )
@@ -249,6 +247,8 @@ public async Task<Subscription> SubscribeAsync<TEventArgs>(string eventName, Act
249
247
public async Task < Subscription > SubscribeAsync < TEventArgs > ( string eventName , Func < TEventArgs , Task > func , SubscriptionOptions ? options = null )
250
248
where TEventArgs : EventArgs
251
249
{
250
+ _eventTypesMap [ eventName ] = typeof ( TEventArgs ) ;
251
+
252
252
var handlers = _eventHandlers . GetOrAdd ( eventName , ( a ) => [ ] ) ;
253
253
254
254
if ( options is BrowsingContextsSubscriptionOptions browsingContextsOptions )
@@ -303,12 +303,6 @@ public async Task UnsubscribeAsync(Modules.Session.Subscription subscription, Ev
303
303
}
304
304
305
305
public async ValueTask DisposeAsync ( )
306
- {
307
- await DisposeAsyncCore ( ) ;
308
- GC . SuppressFinalize ( this ) ;
309
- }
310
-
311
- protected virtual async ValueTask DisposeAsyncCore ( )
312
306
{
313
307
_pendingEvents . CompleteAdding ( ) ;
314
308
@@ -320,5 +314,104 @@ protected virtual async ValueTask DisposeAsyncCore()
320
314
}
321
315
322
316
_transport . Dispose ( ) ;
317
+
318
+ GC . SuppressFinalize ( this ) ;
323
319
}
320
+
321
+ private void ProcessReceivedMessage ( byte [ ] ? data )
322
+ {
323
+ long ? id = default ;
324
+ string ? type = default ;
325
+ string ? method = default ;
326
+ string ? error = default ;
327
+ string ? message = default ;
328
+ Utf8JsonReader resultReader = default ;
329
+ Utf8JsonReader paramsReader = default ;
330
+
331
+ Utf8JsonReader reader = new ( new ReadOnlySpan < byte > ( data ) ) ;
332
+ reader . Read ( ) ;
333
+
334
+ reader . Read ( ) ; // "{"
335
+
336
+ while ( reader . TokenType == JsonTokenType . PropertyName )
337
+ {
338
+ string ? propertyName = reader . GetString ( ) ;
339
+ reader . Read ( ) ;
340
+
341
+ switch ( propertyName )
342
+ {
343
+ case "id" :
344
+ id = reader . GetInt64 ( ) ;
345
+ break ;
346
+
347
+ case "type" :
348
+ type = reader . GetString ( ) ;
349
+ break ;
350
+
351
+ case "method" :
352
+ method = reader . GetString ( ) ;
353
+ break ;
354
+
355
+ case "result" :
356
+ resultReader = reader ; // cloning reader with current position
357
+ break ;
358
+
359
+ case "params" :
360
+ paramsReader = reader ; // cloning reader with current position
361
+ break ;
362
+
363
+ case "error" :
364
+ error = reader . GetString ( ) ;
365
+ break ;
366
+
367
+ case "message" :
368
+ message = reader . GetString ( ) ;
369
+ break ;
370
+ }
371
+
372
+ reader . Skip ( ) ;
373
+ reader . Read ( ) ;
374
+ }
375
+
376
+ switch ( type )
377
+ {
378
+ case "success" :
379
+ if ( id is null ) throw new JsonException ( "The remote end responded with 'success' message type, but missed required 'id' property." ) ;
380
+
381
+ var successCommand = _pendingCommands [ id . Value ] ;
382
+ var messageSuccess = JsonSerializer . Deserialize ( ref resultReader , successCommand . ResultType , _jsonSerializerContext ) ! ;
383
+ successCommand . TaskCompletionSource . SetResult ( ( EmptyResult ) messageSuccess ) ;
384
+ _pendingCommands . TryRemove ( id . Value , out _ ) ;
385
+ break ;
386
+
387
+ case "event" :
388
+ if ( method is null ) throw new JsonException ( "The remote end responded with 'event' message type, but missed required 'method' property." ) ;
389
+
390
+ var eventType = _eventTypesMap [ method ] ;
391
+
392
+ var eventArgs = ( EventArgs ) JsonSerializer . Deserialize ( ref paramsReader , eventType , _jsonSerializerContext ) ! ;
393
+
394
+ var messageEvent = new MessageEvent ( method , eventArgs ) ;
395
+ _pendingEvents . Add ( messageEvent ) ;
396
+ break ;
397
+
398
+ case "error" :
399
+ if ( id is null ) throw new JsonException ( "The remote end responded with 'error' message type, but missed required 'id' property." ) ;
400
+
401
+ var messageError = new MessageError ( id . Value ) { Error = error , Message = message } ;
402
+ var errorCommand = _pendingCommands [ messageError . Id ] ;
403
+ errorCommand . TaskCompletionSource . SetException ( new BiDiException ( $ "{ messageError . Error } : { messageError . Message } ") ) ;
404
+ _pendingCommands . TryRemove ( messageError . Id , out _ ) ;
405
+ break ;
406
+ }
407
+ }
408
+
409
+ class CommandInfo ( long id , Type resultType , TaskCompletionSource < EmptyResult > taskCompletionSource )
410
+ {
411
+ public long Id { get ; } = id ;
412
+
413
+ public Type ResultType { get ; } = resultType ;
414
+
415
+ public TaskCompletionSource < EmptyResult > TaskCompletionSource { get ; } = taskCompletionSource ;
416
+ } ;
324
417
}
0 commit comments