-
-
Notifications
You must be signed in to change notification settings - Fork 484
Expand file tree
/
Copy pathConnection.cs
More file actions
398 lines (335 loc) · 15 KB
/
Copy pathConnection.cs
File metadata and controls
398 lines (335 loc) · 15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using PuppeteerSharp.Cdp.Messaging;
using PuppeteerSharp.Helpers;
using PuppeteerSharp.Helpers.Json;
using PuppeteerSharp.QueryHandlers;
using PuppeteerSharp.Transport;
namespace PuppeteerSharp.Cdp
{
/// <summary>
/// A connection handles the communication with a Chromium browser.
/// </summary>
public sealed class Connection : IDisposable, ICDPConnection
{
/// <summary>
/// Default value of the <c>protocolTimeout</c> constructor parameter. The value ends up in
/// <see cref="ProtocolTimeout"/>, which bounds the wait for a command response
/// (presumably milliseconds, i.e. three minutes; confirm against <c>WithTimeout</c>).
/// </summary>
internal const int DefaultCommandTimeout = 180_000;
// Logger created from LoggerFactory in the constructor.
private readonly ILogger _logger;
// Queue through which every incoming transport message is handed to ProcessMessage.
private readonly TaskQueue _callbackQueue = new();
// Connection-level commands that are still waiting for a response, keyed by message id.
private readonly ConcurrentDictionary<int, MessageTask> _callbacks = new();
// Known sessions keyed by session id. The format string is presumably the error text used
// when a lookup fails; confirm against AsyncDictionaryHelper.
private readonly AsyncDictionaryHelper<string, CdpCDPSession> _sessions = new("Session {0} not found");
// Target ids for which CreateSessionAsync is currently attaching with isAutoAttachEmulated == false.
private readonly List<string> _manuallyAttached = [];
// Last message id handed out; incremented atomically by GetMessageId.
private int _lastId;
/// <summary>
/// Initializes a new instance of the <see cref="Connection"/> class and subscribes to the transport events.
/// </summary>
/// <param name="url">Value exposed through <see cref="Url"/>.</param>
/// <param name="delay">Value exposed through <see cref="Delay"/>.</param>
/// <param name="enqueueAsyncMessages">Forwarded to the <see cref="AsyncMessageQueue"/> constructor.</param>
/// <param name="transport">Transport used to send and receive messages.</param>
/// <param name="loggerFactory">Logger factory; a new <c>LoggerFactory</c> is created when <c>null</c>.</param>
/// <param name="protocolTimeout">Value exposed through <see cref="ProtocolTimeout"/>.</param>
private Connection(string url, int delay, bool enqueueAsyncMessages, IConnectionTransport transport, ILoggerFactory loggerFactory = null, int protocolTimeout = DefaultCommandTimeout)
{
    // Plain configuration values.
    Url = url;
    Delay = delay;
    ProtocolTimeout = protocolTimeout;

    // The logger has to exist before the message queue, which receives it.
    LoggerFactory = loggerFactory ?? new LoggerFactory();
    _logger = LoggerFactory.CreateLogger<Connection>();
    MessageQueue = new AsyncMessageQueue(enqueueAsyncMessages, _logger);

    // Hook the transport last so handlers only run against a fully initialized instance.
    Transport = transport;
    Transport.MessageReceived += Transport_MessageReceived;
    Transport.Closed += Transport_Closed;
}
/// <summary>
/// Occurs when the connection is closed. Raised once, the first time the connection is closed.
/// </summary>
public event EventHandler Disconnected;
/// <summary>
/// Occurs when a message from Chromium is received that carries neither a session id nor a command id.
/// </summary>
public event EventHandler<MessageEventArgs> MessageReceived;
/// <summary>
/// Occurs after a <c>Target.attachedToTarget</c> message has been processed and the new session has been registered.
/// </summary>
internal event EventHandler<SessionEventArgs> SessionAttached;
/// <summary>
/// Occurs after a <c>Target.detachedFromTarget</c> message removed a registered session that was not already closed.
/// </summary>
internal event EventHandler<SessionEventArgs> SessionDetached;
/// <summary>
/// Gets the WebSocket URL.
/// </summary>
/// <value>The URL. Empty when the connection was created from an existing transport.</value>
public string Url { get; }
/// <summary>
/// Gets the delay, in milliseconds, awaited for each non-empty incoming message before it is queued for processing.
/// </summary>
/// <value>The delay.</value>
public int Delay { get; }
/// <summary>
/// Gets the Connection transport.
/// </summary>
/// <value>Connection transport.</value>
public IConnectionTransport Transport { get; }
/// <summary>
/// Gets a value indicating whether this <see cref="Connection"/> is closed.
/// </summary>
/// <value><c>true</c> if is closed; otherwise, <c>false</c>.</value>
public bool IsClosed { get; internal set; }
/// <summary>
/// Gets the reason the connection was closed. Set when the connection is closed.
/// </summary>
public string CloseReason { get; private set; }
/// <summary>
/// Gets the logger factory.
/// </summary>
/// <value>The logger factory.</value>
public ILoggerFactory LoggerFactory { get; }
/// <summary>
/// Gets the queue that receives responses to connection-level commands together with their callback.
/// It is disposed when the connection is closed.
/// </summary>
internal AsyncMessageQueue MessageQueue { get; }
/// <summary>
/// Gets the script injector; always <c>ScriptInjector.Default</c>.
/// </summary>
internal ScriptInjector ScriptInjector => ScriptInjector.Default;
/// <summary>
/// Gets the timeout applied while waiting for the response to a command.
/// </summary>
internal int ProtocolTimeout { get; }
/// <inheritdoc />
public void Dispose()
{
    // Run the shared teardown, then tell the GC no finalization is required.
    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}
/// <inheritdoc/>
/// <remarks>
/// When <paramref name="waitForCallback"/> is <c>true</c> the command is registered as pending until
/// its response arrives. The registration is always removed before this method returns or throws,
/// so a failed send or a timeout does not leave a stale pending entry behind.
/// </remarks>
/// <exception cref="TargetClosedException">The connection is already closed.</exception>
public async Task<JsonElement?> SendAsync(string method, object args = null, bool waitForCallback = true, CommandOptions options = null)
{
    if (IsClosed)
    {
        throw new TargetClosedException($"Protocol error({method}): Target closed.", CloseReason);
    }

    var id = GetMessageId();
    var message = GetMessage(id, method, args);

    if (!waitForCallback)
    {
        // Fire and forget: nothing is registered, so there is nothing to clean up.
        await RawSendAsync(message, options).ConfigureAwait(false);
        return null;
    }

    var callback = new MessageTask
    {
        TaskWrapper = new TaskCompletionSource<JsonElement?>(TaskCreationOptions.RunContinuationsAsynchronously),
        Method = method,
        Message = message,
    };

    // Register before sending so a fast response always finds its callback.
    _callbacks[id] = callback;

    try
    {
        await RawSendAsync(message, options).ConfigureAwait(false);
        return await callback.TaskWrapper.Task.WithTimeout(ProtocolTimeout).ConfigureAwait(false);
    }
    finally
    {
        // On success the entry was already removed when the response was routed, so this is a no-op.
        // On a send failure or a timeout it prevents the entry from leaking.
        _callbacks.TryRemove(id, out _);
    }
}
/// <inheritdoc/>
public async Task<T> SendAsync<T>(string method, object args = null, CommandOptions options = null)
{
    // Always wait for the response: the typed overload has to return a value.
    JsonElement? payload = await SendAsync(method, args, waitForCallback: true, options: options).ConfigureAwait(false);
    var element = payload!.Value;
    return element.ToObject<T>();
}
/// <summary>
/// Opens a transport to <paramref name="url"/> and wraps it in a new <see cref="Connection"/>.
/// </summary>
/// <param name="url">Endpoint to connect to.</param>
/// <param name="connectionOptions">Options supplying the transport factory, SlowMo delay, message queueing mode and protocol timeout.</param>
/// <param name="loggerFactory">Optional logger factory.</param>
/// <param name="cancellationToken">Token passed to the transport factory.</param>
/// <returns>A task that resolves to the new connection.</returns>
internal static async Task<Connection> Create(string url, IConnectionOptions connectionOptions, ILoggerFactory loggerFactory = null, CancellationToken cancellationToken = default)
{
    // Fall back to the WebSocket transport when the options do not provide a factory.
    var createTransport = connectionOptions.TransportFactory ?? WebSocketTransport.DefaultTransportFactory;
    var endpoint = new Uri(url);
    var transport = await createTransport(endpoint, connectionOptions, cancellationToken).ConfigureAwait(false);

    return new Connection(
        url,
        connectionOptions.SlowMo,
        connectionOptions.EnqueueAsyncMessages,
        transport,
        loggerFactory,
        connectionOptions.ProtocolTimeout);
}
/// <summary>
/// Wraps an existing transport in a new <see cref="Connection"/>. The connection URL is empty.
/// </summary>
/// <param name="transport">Transport to use.</param>
/// <param name="connectionOptions">Options supplying the SlowMo delay, message queueing mode and protocol timeout.</param>
/// <param name="loggerFactory">Optional logger factory.</param>
/// <returns>The new connection.</returns>
internal static Connection CreateFromTransport(IConnectionTransport transport, IConnectionOptions connectionOptions, ILoggerFactory loggerFactory = null)
    => new Connection(
        string.Empty,
        connectionOptions.SlowMo,
        connectionOptions.EnqueueAsyncMessages,
        transport,
        loggerFactory,
        connectionOptions.ProtocolTimeout);
/// <summary>
/// Returns the connection that <paramref name="session"/> belongs to.
/// </summary>
/// <param name="session">The session.</param>
/// <returns>The session's connection.</returns>
internal static Connection FromSession(CdpCDPSession session)
{
    return session.Connection;
}
/// <summary>
/// Atomically increments the message id counter and returns the new value.
/// </summary>
/// <returns>The next message id.</returns>
internal int GetMessageId()
{
    return Interlocked.Increment(ref _lastId);
}
/// <summary>
/// Sends an already serialized message over the transport, tracing it first when trace logging is enabled.
/// </summary>
/// <param name="message">UTF-8 encoded message.</param>
/// <param name="options">Command options; not read by this method.</param>
/// <returns>The task returned by the transport's send operation.</returns>
internal Task RawSendAsync(byte[] message, CommandOptions options = null)
{
    // Only pay for decoding the payload when it will actually be logged.
    var traceEnabled = _logger.IsEnabled(LogLevel.Trace);
    if (traceEnabled)
    {
        var text = Encoding.UTF8.GetString(message);
        _logger.LogTrace("Send ► {Message}", text);
    }

    return Transport.SendAsync(message);
}
/// <summary>
/// Builds a <c>ConnectionRequest</c> from the arguments and serializes it to UTF-8 JSON.
/// </summary>
/// <param name="id">Message id.</param>
/// <param name="method">Protocol method name.</param>
/// <param name="args">Method parameters.</param>
/// <param name="sessionId">Optional session id to place on the request.</param>
/// <returns>The serialized request.</returns>
internal byte[] GetMessage(int id, string method, object args, string sessionId = null)
{
    var request = new ConnectionRequest
    {
        Id = id,
        Method = method,
        Params = args,
        SessionId = sessionId,
    };

    return JsonSerializer.SerializeToUtf8Bytes(request, JsonHelper.DefaultJsonSerializerSettings.Value);
}
/// <summary>
/// Tells whether <paramref name="targetId"/> is absent from the list of manually attached targets.
/// </summary>
/// <param name="targetId">Target id to check.</param>
/// <returns><c>false</c> while the target is listed as manually attached; otherwise <c>true</c>.</returns>
internal bool IsAutoAttached(string targetId)
{
    var attachedManually = _manuallyAttached.Contains(targetId);
    return !attachedManually;
}
/// <summary>
/// Attaches to a target with <c>Target.attachToTarget</c> (flattened) and returns the resulting session.
/// </summary>
/// <param name="targetInfo">Target to attach to.</param>
/// <param name="isAutoAttachEmulated">
/// When <c>false</c>, the target id is listed as manually attached for the duration of the attach call,
/// so <see cref="IsAutoAttached(string)"/> returns <c>false</c> for it during that time.
/// </param>
/// <returns>A task that resolves to the session created for the target.</returns>
internal async Task<CDPSession> CreateSessionAsync(TargetInfo targetInfo, bool isAutoAttachEmulated)
{
    if (!isAutoAttachEmulated)
    {
        _manuallyAttached.Add(targetInfo.TargetId);
    }

    string sessionId;
    try
    {
        var response = await SendAsync<TargetAttachToTargetResponse>(
            "Target.attachToTarget",
            new TargetAttachToTargetRequest
            {
                TargetId = targetInfo.TargetId,
                Flatten = true,
            }).ConfigureAwait(false);
        sessionId = response.SessionId;
    }
    finally
    {
        // Clear the marker even when the attach fails; otherwise the target would be
        // reported as manually attached forever.
        _manuallyAttached.Remove(targetInfo.TargetId);
    }

    return await GetSessionAsync(sessionId).ConfigureAwait(false);
}
/// <summary>
/// Tells whether any connection-level command is still registered as waiting for a response.
/// </summary>
/// <returns><c>true</c> when at least one callback is registered; otherwise <c>false</c>.</returns>
internal bool HasPendingCallbacks()
{
    return !_callbacks.IsEmpty;
}
/// <summary>
/// Collects one "timed out" message per pending connection-level command, followed by the
/// pending protocol errors reported by every registered session.
/// </summary>
/// <returns>A new list with the collected messages.</returns>
internal List<string> GetPendingProtocolErrors()
{
    var errors = new List<string>();

    // Connection-level commands first.
    errors.AddRange(_callbacks.Values.Select(pending => $"{pending.Method} timed out."));

    // Then whatever each session reports.
    foreach (var knownSession in _sessions.Values)
    {
        var sessionErrors = knownSession.GetPendingProtocolErrors();
        errors.AddRange(sessionErrors);
    }

    return errors;
}
/// <summary>
/// Closes the connection: stops the transport from reading, raises <see cref="Disconnected"/>,
/// closes and forgets every session, fails every pending command with a
/// <see cref="TargetClosedException"/> and disposes the message queue.
/// Calls made after the connection is already closed do nothing.
/// </summary>
/// <param name="closeReason">Reason stored in <see cref="CloseReason"/> and passed on to sessions and failed commands.</param>
internal void Close(string closeReason)
{
    if (!IsClosed)
    {
        // Flag the connection first so later calls are rejected.
        IsClosed = true;
        CloseReason = closeReason;

        Transport.StopReading();
        Disconnected?.Invoke(this, EventArgs.Empty);

        foreach (var openSession in _sessions.Values)
        {
            openSession.Close(closeReason);
        }

        _sessions.Clear();

        // Nobody will answer the outstanding commands any more: fail them.
        foreach (var pending in _callbacks.Values)
        {
            pending.TaskWrapper.TrySetException(new TargetClosedException(
                $"Protocol error({pending.Method}): Target closed.",
                closeReason));
        }

        _callbacks.Clear();
        MessageQueue.Dispose();
    }
}
/// <summary>
/// Looks up a registered session without waiting for it.
/// </summary>
/// <param name="sessionId">Session id.</param>
/// <returns>The result of <c>GetValueOrDefault</c> on the session registry (presumably <c>null</c> when the id is unknown).</returns>
internal CdpCDPSession GetSession(string sessionId)
{
    return _sessions.GetValueOrDefault(sessionId);
}
/// <summary>
/// Looks up a session through the asynchronous accessor of the session registry.
/// </summary>
/// <param name="sessionId">Session id.</param>
/// <returns>The task returned by the registry's <c>GetItemAsync</c>.</returns>
internal Task<CdpCDPSession> GetSessionAsync(string sessionId)
{
    return _sessions.GetItemAsync(sessionId);
}
/// <summary>
/// Releases every resource held by the <see cref="Connection"/>: closes the connection
/// (which raises <see cref="Disconnected"/>), detaches the transport event handlers and
/// disposes both <see cref="Transport"/> and the incoming message queue.
/// </summary>
/// <remarks>
/// Call <see cref="Dispose()"/> once the <see cref="Connection"/> is no longer needed. The
/// <see cref="Connection"/> is unusable afterwards; drop every reference to it so the
/// garbage collector can reclaim its memory.
/// </remarks>
/// <param name="disposing">Indicates whether disposal was initiated by <see cref="Dispose()"/>; not read by this method.</param>
private void Dispose(bool disposing)
{
    Close("Connection disposed");

    // The two unsubscriptions are independent of each other.
    Transport.Closed -= Transport_Closed;
    Transport.MessageReceived -= Transport_MessageReceived;

    Transport.Dispose();
    _callbackQueue.Dispose();
}
/// <summary>
/// Transport event handler: applies the optional SlowMo delay and then queues the message for processing.
/// Any failure is logged and swallowed because an exception escaping an <c>async void</c> handler
/// would crash the process.
/// </summary>
/// <param name="sender">Event source.</param>
/// <param name="e">Event data carrying the raw UTF-8 message.</param>
private async void Transport_MessageReceived(object sender, MessageReceivedEventArgs e)
{
    try
    {
        // Apply SlowMo delay before entering the serialized queue.
        // In upstream Puppeteer (JS), the delay runs independently per message.
        // If we delay inside the serialized queue, delays accumulate and cause
        // timeouts on busy pages (see #2659).
        if (e.Message.Length > 0 && Delay > 0)
        {
            await Task.Delay(Delay).ConfigureAwait(false);
        }

        await _callbackQueue.Enqueue(() => ProcessMessage(e)).ConfigureAwait(false);
    }
    catch (Exception exception)
    {
        // We could just catch ObjectDisposedException but as this is an event listener
        // we don't want to crash the whole process.
        // The message is a byte[]: decode it, otherwise the log only shows "System.Byte[]".
        // The decode is null-safe because throwing from this catch block would be fatal.
        var payload = e?.Message is { } bytes ? Encoding.UTF8.GetString(bytes) : string.Empty;
        _logger.LogError(exception, "Failed to process message {Message}", payload);
    }
}
/// <summary>
/// Deserializes one raw transport message and routes it. A message that is not valid JSON is
/// logged and dropped; any other failure is logged and closes the connection, using the failure
/// description as the close reason.
/// </summary>
/// <param name="e">Event data carrying the raw UTF-8 message.</param>
/// <returns>A task that completes when the message has been handled.</returns>
private async Task ProcessMessage(MessageReceivedEventArgs e)
{
    try
    {
        var response = e.Message;
        ConnectionResponse obj;

        try
        {
            obj = JsonSerializer.Deserialize<ConnectionResponse>(response, JsonHelper.DefaultJsonSerializerSettings.Value);
        }
        catch (JsonException exc)
        {
            // A malformed payload is not fatal for the connection.
            _logger.LogError(exc, "Failed to deserialize response");
            return;
        }

        if (_logger.IsEnabled(LogLevel.Trace))
        {
            _logger.LogTrace("◀ Receive {Message}", Encoding.UTF8.GetString(response));
        }

        ProcessIncomingMessage(obj);
    }
    catch (Exception ex)
    {
        // The message is a byte[]: decode it so the log and the close reason show the actual
        // payload instead of "System.Byte[]".
        var payload = e?.Message is { } bytes ? Encoding.UTF8.GetString(bytes) : string.Empty;
        var message = $"Connection failed to process {payload}. {ex.Message}. {ex.StackTrace}";

        // Pass the text as an argument: it may contain braces and must not be parsed as a template.
        _logger.LogError(ex, "{Message}", message);
        Close(message);
    }
}
/// <summary>
/// Routes a deserialized protocol message.
/// <list type="bullet">
/// <item><description><c>Target.attachedToTarget</c> registers a new session and notifies listeners and the parent session.</description></item>
/// <item><description><c>Target.detachedFromTarget</c> removes and closes the session and notifies listeners and the parent session.</description></item>
/// <item><description>Messages carrying a session id are then forwarded to that session.</description></item>
/// <item><description>Messages carrying only a command id complete the matching pending command.</description></item>
/// <item><description>Everything else is raised through <see cref="MessageReceived"/>.</description></item>
/// </list>
/// </summary>
/// <param name="obj">The deserialized message.</param>
private void ProcessIncomingMessage(ConnectionResponse obj)
{
    var method = obj.Method;

    if (method == "Target.attachedToTarget")
    {
        var param = obj.Params?.ToObject<ConnectionResponseParams>();
        var sessionId = param.SessionId;
        var session = new CdpCDPSession(this, param.TargetInfo.Type, sessionId, obj.SessionId);
        _sessions.AddItem(sessionId, session);

        SessionAttached?.Invoke(this, new SessionEventArgs(session));

        // obj.SessionId identifies the session the event was delivered on, i.e. the parent.
        if (obj.SessionId != null && _sessions.TryGetValue(obj.SessionId, out var parentSession))
        {
            parentSession.OnSessionAttached(session);
        }
    }
    else if (method == "Target.detachedFromTarget")
    {
        var param = obj.Params?.ToObject<ConnectionResponseParams>();
        var sessionId = param.SessionId;

        if (_sessions.TryRemove(sessionId, out var session) && !session.IsClosed)
        {
            session.Close("Target.detachedFromTarget");
            SessionDetached?.Invoke(this, new SessionEventArgs(session));

            // Look the parent up by obj.SessionId, mirroring the attach branch. The detached
            // session's own id was just removed from _sessions, so looking that id up could
            // never find anything and the parent would never be notified.
            if (obj.SessionId != null && _sessions.TryGetValue(obj.SessionId, out var parentSession))
            {
                parentSession.OnSessionDetached(session);
            }
        }
    }

    if (!string.IsNullOrEmpty(obj.SessionId))
    {
        // Session-scoped message: let the owning session handle it, if it is still known.
        var session = GetSession(obj.SessionId);
        session?.OnMessage(obj);
    }
    else if (obj.Id.HasValue)
    {
        // If we get the object we are waiting for we return it.
        // If not we add this to the list, sooner or later someone will come for it
        if (_callbacks.TryRemove(obj.Id.Value, out var callback))
        {
            MessageQueue.Enqueue(callback, obj);
        }
        else
        {
            // Chrome can occasionally omit the sessionId on responses. Fall back
            // to a per-session callback lookup so the response is routed to the
            // session that originally issued the command (upstream #14975).
            foreach (var session in _sessions.Values)
            {
                if (session.HasCallback(obj.Id.Value))
                {
                    session.OnMessage(obj);
                    break;
                }
            }
        }
    }
    else
    {
        // Neither a session id nor a command id: a connection-level event.
        MessageReceived?.Invoke(this, new MessageEventArgs
        {
            MessageID = method,
            MessageData = (JsonElement)obj.Params,
        });
    }
}
/// <summary>
/// Transport event handler: closes the connection with the reason reported by the transport.
/// </summary>
/// <param name="sender">Event source.</param>
/// <param name="e">Event data carrying the close reason.</param>
private void Transport_Closed(object sender, TransportClosedEventArgs e)
{
    Close(e.CloseReason);
}
}
}