-
Notifications
You must be signed in to change notification settings - Fork 155
Expand file tree
/
Copy pathDiscoveryService.cs
More file actions
473 lines (430 loc) · 20.8 KB
/
DiscoveryService.cs
File metadata and controls
473 lines (430 loc) · 20.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
// <copyright file="DiscoveryService.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
using Datadog.Trace.Configuration;
using Datadog.Trace.HttpOverStreams;
using Datadog.Trace.Logging;
using Datadog.Trace.PlatformHelpers;
using Datadog.Trace.SourceGenerators;
using Datadog.Trace.Util;
using Datadog.Trace.Vendors.Newtonsoft.Json.Linq;
namespace Datadog.Trace.Agent.DiscoveryService
{
/// <summary>
/// Queries the Datadog Agent and discovers which version we are running against and which endpoints it supports.
/// </summary>
internal sealed class DiscoveryService : IDiscoveryService
{
// Endpoint names that are looked for in the "endpoints" array of the agent's discovery response.
// ProcessDiscoveryResponse trims '/' from both ends of each reported endpoint and compares
// against these values case-insensitively.
private const string SupportedDebuggerEndpoint = "debugger/v1/input";
private const string SupportedDebuggerV2Endpoint = "debugger/v2/input";
private const string SupportedDiagnosticsEndpoint = "debugger/v1/diagnostics";
private const string SupportedSymbolDbEndpoint = "symdb/v1/input";
private const string SupportedConfigurationEndpoint = "v0.7/config";
private const string SupportedStatsEndpoint = "v0.6/stats";
private const string SupportedDataStreamsEndpoint = "v0.1/pipeline_stats";
private const string SupportedEventPlatformProxyEndpointV2 = "evp_proxy/v2";
private const string SupportedEventPlatformProxyEndpointV4 = "evp_proxy/v4";
private const string SupportedTelemetryProxyEndpoint = "telemetry/proxy";
private const string SupportedTracerFlareEndpoint = "tracer_flare/v1";
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<DiscoveryService>();
// Timing settings for the polling loop, in milliseconds (supplied through the constructor)
private readonly int _initialRetryDelayMs;
private readonly int _maxRetryDelayMs;
private readonly int _recheckIntervalMs;
// Completed by DisposeAsync; the polling loop checks it to know when to stop
private readonly TaskCompletionSource<bool> _processExit = new();
// Registered subscriber callbacks. Every access to this list is made while holding _lock
private readonly List<Action<AgentConfiguration>> _agentChangeCallbacks = new();
private readonly object _lock = new();
// The background task running FetchConfigurationLoopAsync; returned from DisposeAsync
private readonly Task _discoveryTask;
// Only assigned by the "managed" constructor (settings-change subscription); null otherwise
private readonly IDisposable? _settingSubscription;
// Receives the container tags hash taken from the discovery response headers
private readonly ServiceRemappingHash _serviceRemappingHash;
// Not readonly: the "managed" constructor replaces it (via Interlocked.Exchange) when the exporter settings change
private IApiRequestFactory _apiRequestFactory;
// The most recently published configuration; stays null until a discovery response has been processed
private AgentConfiguration? _configuration;
// Hex-encoded SHA-256 of the most recently processed discovery response body
private string? _configurationHash;
// The hash most recently passed to SetCurrentConfigStateHash, and the time of that call (Unix milliseconds)
private string _agentConfigStateHash = string.Empty;
private long _agentConfigStateHashUnixTime;
/// <summary>
/// Initializes a new instance of the <see cref="DiscoveryService"/> class that keeps
/// its request factory in step with runtime changes to the exporter settings.
/// </summary>
/// <param name="settings">Provides the initial exporter settings and the change notifications.</param>
/// <param name="containerMetadata">Source of the container ID used when building request factories.</param>
/// <param name="serviceRemappingHash">Receives the container tags hash from discovery responses.</param>
/// <param name="tcpTimeout">Timeout handed to the transport strategy.</param>
/// <param name="initialRetryDelayMs">Initial retry delay, in milliseconds.</param>
/// <param name="maxRetryDelayMs">Upper bound for the retry delay, in milliseconds.</param>
/// <param name="recheckIntervalMs">Interval between regular re-checks, in milliseconds.</param>
public DiscoveryService(
    TracerSettings.SettingsManager settings,
    ContainerMetadata containerMetadata,
    ServiceRemappingHash serviceRemappingHash,
    TimeSpan tcpTimeout,
    int initialRetryDelayMs,
    int maxRetryDelayMs,
    int recheckIntervalMs)
    : this(
        CreateApiRequestFactory(settings.InitialExporterSettings, containerMetadata.ContainerId, tcpTimeout),
        serviceRemappingHash,
        initialRetryDelayMs,
        maxRetryDelayMs,
        recheckIntervalMs)
{
    // This is the "managed" flavour of the service: whenever the exporter settings change,
    // build a matching request factory and swap it in for the polling loop to pick up
    _settingSubscription = settings.SubscribeToChanges(update =>
    {
        if (update.UpdatedExporter is not { } updatedExporter)
        {
            // The exporter was not part of this change, nothing to do
            return;
        }

        var replacement = CreateApiRequestFactory(updatedExporter, containerMetadata.ContainerId, tcpTimeout);
        Interlocked.Exchange(ref _apiRequestFactory!, replacement);
    });
}
/// <summary>
/// Initializes a new instance of the <see cref="DiscoveryService"/> class
/// and immediately starts the background discovery loop.
/// Public for testing purposes
/// </summary>
/// <param name="apiRequestFactory">Factory used to create requests to the agent.</param>
/// <param name="serviceRemappingHash">Receives the container tags hash from discovery responses.</param>
/// <param name="initialRetryDelayMs">Initial retry delay, in milliseconds.</param>
/// <param name="maxRetryDelayMs">Upper bound for the retry delay, in milliseconds.</param>
/// <param name="recheckIntervalMs">Interval between regular re-checks, in milliseconds.</param>
public DiscoveryService(
    IApiRequestFactory apiRequestFactory,
    ServiceRemappingHash serviceRemappingHash,
    int initialRetryDelayMs,
    int maxRetryDelayMs,
    int recheckIntervalMs)
{
    _initialRetryDelayMs = initialRetryDelayMs;
    _maxRetryDelayMs = maxRetryDelayMs;
    _recheckIntervalMs = recheckIntervalMs;
    _serviceRemappingHash = serviceRemappingHash;
    _apiRequestFactory = apiRequestFactory;

    // Start the loop only after the fields above have been assigned
    _discoveryTask = Task.Run(() => FetchConfigurationLoopAsync());

    // Log the failure if the loop itself ever faults
    _ = _discoveryTask.ContinueWith(
        faulted => Log.Error(faulted.Exception, "Error in discovery task"),
        TaskContinuationOptions.OnlyOnFaulted);
}
/// <summary>
/// Gets all the supported endpoints, for testing purposes only.
/// A new array is created on every access.
/// </summary>
public static string[] AllSupportedEndpoints
{
    get
    {
        return new string[]
        {
            SupportedDebuggerEndpoint,
            SupportedDebuggerV2Endpoint,
            SupportedDiagnosticsEndpoint,
            SupportedSymbolDbEndpoint,
            SupportedConfigurationEndpoint,
            SupportedStatsEndpoint,
            SupportedDataStreamsEndpoint,
            SupportedEventPlatformProxyEndpointV2,
            SupportedEventPlatformProxyEndpointV4,
            SupportedTelemetryProxyEndpoint,
            SupportedTracerFlareEndpoint,
        };
    }
}
/// <summary>
/// Gets the hash of the most recently processed discovery response,
/// or null if no response has been processed yet.
/// </summary>
[TestingOnly]
internal string? ConfigStateHash
{
    get { return Volatile.Read(ref _configurationHash); }
}
/// <summary>
/// Create a <see cref="DiscoveryService"/> instance that responds to runtime changes in settings,
/// using the default timeout, retry delays and re-check interval
/// </summary>
public static DiscoveryService CreateManaged(TracerSettings settings, ContainerMetadata containerMetadata, ServiceRemappingHash serviceRemappingHash)
{
    var tcpTimeout = TimeSpan.FromSeconds(15);

    return new DiscoveryService(
        settings.Manager,
        containerMetadata,
        serviceRemappingHash,
        tcpTimeout,
        initialRetryDelayMs: 500,
        maxRetryDelayMs: 5_000,
        recheckIntervalMs: 30_000);
}
/// <summary>
/// Create a <see cref="DiscoveryService"/> instance that does _not_ respond to runtime changes in settings,
/// using the default timeout, retry delays and re-check interval
/// </summary>
public static DiscoveryService CreateUnmanaged(ExporterSettings exporterSettings, ContainerMetadata containerMetadata, ServiceRemappingHash serviceRemappingHash)
{
    var tcpTimeout = TimeSpan.FromSeconds(15);

    return CreateUnmanaged(
        exporterSettings,
        containerMetadata,
        serviceRemappingHash,
        tcpTimeout,
        initialRetryDelayMs: 500,
        maxRetryDelayMs: 5_000,
        recheckIntervalMs: 30_000);
}
/// <summary>
/// Create a <see cref="DiscoveryService"/> instance that does _not_ respond to runtime changes in settings.
/// The request factory is built once, from the supplied exporter settings.
/// </summary>
public static DiscoveryService CreateUnmanaged(
    ExporterSettings exporterSettings,
    ContainerMetadata containerMetadata,
    ServiceRemappingHash serviceRemappingHash,
    TimeSpan tcpTimeout,
    int initialRetryDelayMs,
    int maxRetryDelayMs,
    int recheckIntervalMs)
{
    var requestFactory = CreateApiRequestFactory(exporterSettings, containerMetadata.ContainerId, tcpTimeout);

    return new DiscoveryService(
        requestFactory,
        serviceRemappingHash,
        initialRetryDelayMs,
        maxRetryDelayMs,
        recheckIntervalMs);
}
/// <inheritdoc cref="IDiscoveryService.SubscribeToChanges"/>
public void SubscribeToChanges(Action<AgentConfiguration> callback)
{
    AgentConfiguration? currentConfig;
    lock (_lock)
    {
        if (!_agentChangeCallbacks.Contains(callback))
        {
            _agentChangeCallbacks.Add(callback);
        }

        // Read the current configuration inside the same critical section that NotifySubscribers
        // uses to snapshot the subscribers and publish a new configuration. Reading it after
        // releasing the lock would allow NotifySubscribers to run in between, in which case the
        // callback would be in its snapshot AND we would see the new configuration here, so the
        // callback would be invoked twice with the same configuration.
        currentConfig = _configuration;
    }

    if (currentConfig is null)
    {
        // Nothing discovered yet; the callback will be invoked once a configuration is published
        return;
    }

    try
    {
        // If we already have fetched the config, call this immediately.
        // Invoked outside the lock so a slow or re-entrant subscriber can't block other callers.
        callback(currentConfig);
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Error notifying subscriber of initial discovered configuration");
    }
}
/// <inheritdoc cref="IDiscoveryService.RemoveSubscription"/>
public void RemoveSubscription(Action<AgentConfiguration> callback)
{
    // The subscriber list is only ever touched while holding _lock.
    // Removing a callback that was never registered is harmless, so the result is ignored.
    lock (_lock)
    {
        _ = _agentChangeCallbacks.Remove(callback);
    }
}
/// <inheritdoc />
public void SetCurrentConfigStateHash(string configStateHash)
{
    // The hash and the time it was received are stored as two separate atomic writes.
    // Updating them as a single unit would be nicer, but this is called a lot,
    // and that would mean allocating a new object on every call
    Interlocked.Exchange(ref _agentConfigStateHash, configStateHash);

    var receivedAtUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    Interlocked.Exchange(ref _agentConfigStateHashUnixTime, receivedAtUnixMs);
}
/// <summary>
/// Publishes <paramref name="newConfig"/> as the current configuration and passes it to
/// every callback that was registered at the time of publication.
/// </summary>
/// <param name="newConfig">The configuration to publish.</param>
private void NotifySubscribers(AgentConfiguration newConfig)
{
    Action<AgentConfiguration>[] snapshot;
    lock (_lock)
    {
        // Copy the subscribers and publish the configuration in one critical section,
        // with the publication immediately following the copy
        snapshot = _agentChangeCallbacks.ToArray();
        _configuration = newConfig;
    }

    // Callbacks run outside the lock; a failing subscriber is logged and does not stop the others
    for (var i = 0; i < snapshot.Length; i++)
    {
        try
        {
            snapshot[i](newConfig);
        }
        catch (Exception ex)
        {
            Log.Error(ex, "Error notifying subscriber of configuration change");
        }
    }
}
/// <summary>
/// Background loop that queries the agent's "info" endpoint until <see cref="DisposeAsync"/>
/// signals shutdown.
/// </summary>
/// <remarks>
/// Each iteration contacts the agent only when <see cref="RequireRefresh"/> says so.
/// After a successful (2xx) response, or when no refresh is needed, the loop waits for the
/// regular re-check interval. After a failure (non-2xx response or exception) it waits for a
/// retry delay that starts at the initial retry delay and doubles on each consecutive failure,
/// capped at the maximum retry delay.
/// </remarks>
private async Task FetchConfigurationLoopAsync()
{
    var requestFactory = _apiRequestFactory;
    var uri = requestFactory.GetEndpoint("info");

    // null means "the previous iteration did not fail", i.e. wait the regular re-check interval.
    // This must be nullable: seeding it with the re-check interval would make the first retry
    // jump straight to the doubled-and-capped value and the initial retry delay would never apply.
    int? retryDelayMs = null;

    while (!_processExit.Task.IsCompleted)
    {
        // do we already have an update from the agent? If so, we can skip the request
        if (RequireRefresh(_configurationHash, DateTimeOffset.UtcNow))
        {
            try
            {
                Log.Debug("Agent features discovery refresh required, contacting agent");

                // If the exporter settings have been updated, refresh the endpoint
                var updatedFactory = Volatile.Read(ref _apiRequestFactory);
                if (requestFactory != updatedFactory)
                {
                    requestFactory = updatedFactory;
                    uri = requestFactory.GetEndpoint("info");
                }

                var api = requestFactory.Create(uri);
                using var response = await api.GetAsync().ConfigureAwait(false);
                if (response.StatusCode is >= 200 and < 300)
                {
                    await ProcessDiscoveryResponse(response).ConfigureAwait(false);
                    retryDelayMs = null;
                }
                else
                {
                    Log.Warning("Error discovering available agent services");
                    retryDelayMs = GetNextRetryDelay(retryDelayMs);
                }
            }
            catch (Exception exception)
            {
                Log.Warning(exception, "Error discovering available agent services");
                retryDelayMs = GetNextRetryDelay(retryDelayMs);
            }
        }
        else
        {
            // no need to re-check, so reset any pending backoff
            retryDelayMs = null;
        }

        try
        {
            // Wake up early if the service is disposed while we are waiting
            await Task.WhenAny(_processExit.Task, Task.Delay(retryDelayMs ?? _recheckIntervalMs)).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Nothing to do: the loop condition decides whether we carry on
        }
    }

    Log.Debug("Discovery service exiting");

    // First failure: start at the initial delay. Consecutive failures: double, capped at the maximum.
    // The doubling is done in 64-bit so a large previous delay cannot overflow before it is capped.
    int GetNextRetryDelay(int? previousDelayMs) =>
        previousDelayMs is null
            ? _initialRetryDelayMs
            : (int)Math.Min((long)previousDelayMs.Value * 2, _maxRetryDelayMs);
}
/// <summary>
/// Decides whether the agent should be contacted again for its configuration.
/// </summary>
/// <param name="currentHash">Hash of the discovery response we last processed, or null if there is none.</param>
/// <param name="utcNow">The current time.</param>
/// <returns>
/// <c>true</c> when there is no current hash, when it differs from the hash last passed to
/// <see cref="SetCurrentConfigStateHash"/>, or when that hash was recorded more than the
/// re-check interval ago; otherwise <c>false</c>.
/// </returns>
[TestingAndPrivateOnly]
internal bool RequireRefresh(string? currentHash, DateTimeOffset utcNow)
{
    var reportedHash = Volatile.Read(ref _agentConfigStateHash);

    var hashesAgree = currentHash is not null
                   && reportedHash is not null
                   && currentHash == reportedHash;
    if (!hashesAgree)
    {
        // Either we don't have a current state, we haven't received any updates, or the config has changed
        return true;
    }

    // The hashes match, so the only remaining question is whether the reported hash is recent enough
    var reportedAtUnixMs = Volatile.Read(ref _agentConfigStateHashUnixTime);
    var staleAfterUnixMs = reportedAtUnixMs + _recheckIntervalMs;
    return utcNow.ToUnixTimeMilliseconds() > staleAfterUnixMs;
}
/// <summary>
/// Processes a discovery response from the agent: records the container tags hash header (if present),
/// deserializes the body while computing its SHA-256 hash, builds an <see cref="AgentConfiguration"/>
/// from it, stores the hash, and notifies subscribers when the configuration differs from the
/// previously stored one (or when there was none).
/// </summary>
/// <param name="response">The agent's response to the discovery request.</param>
/// <exception cref="Exception">Thrown when the response body deserializes to null.</exception>
private async Task ProcessDiscoveryResponse(IApiResponse response)
{
// Extract and store container tags hash from response headers
var containerTagsHash = response.GetHeader(AgentHttpHeaderNames.ContainerTagsHash);
if (containerTagsHash != null)
{
_serviceRemappingHash.UpdateContainerTagsHash(containerTagsHash);
}
// Grab the original stream
var stream = await response.GetStreamAsync().ConfigureAwait(false);
// Create a hash of the utf-8 bytes while also deserializing:
// the deserializer reads through the CryptoStream, which feeds everything it reads into sha256
JObject? jObject;
using var sha256 = SHA256.Create();
using (var cryptoStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read))
{
jObject = response.ReadAsType<JObject>(cryptoStream);
// Newtonsoft.JSON doesn't technically read to the end of the stream, it stops as soon
// as it has something parseable, but for the sha256 we need to read to the end so that
// it finalizes correctly, so just drain it down
#if NETCOREAPP3_1_OR_GREATER
Span<byte> buffer = stackalloc byte[10];
while (cryptoStream.Read(buffer) > 0) { }
#else
var buffer = new byte[10];
while (cryptoStream.Read(buffer, 0, 10) > 0) { }
#endif
}
if (jObject is null)
{
throw new Exception("Error deserializing discovery response: response was null");
}
// Top-level values; the boolean flags default to false when the property is absent
var agentVersion = jObject["version"]?.Value<string>();
var clientDropP0 = jObject["client_drop_p0s"]?.Value<bool>() ?? false;
var spanMetaStructs = jObject["span_meta_structs"]?.Value<bool>() ?? false;
var spanEvents = jObject["span_events"]?.Value<bool>() ?? false;
// null when "endpoints" is missing or is not a JSON array
var discoveredEndpoints = (jObject["endpoints"] as JArray)?.Values<string>().ToArray();
// Each of these stays null unless the agent reports the corresponding endpoint
string? configurationEndpoint = null;
string? debuggerEndpoint = null;
string? debuggerV2Endpoint = null;
string? diagnosticsEndpoint = null;
string? symbolDbEndpoint = null;
string? statsEndpoint = null;
string? dataStreamsMonitoringEndpoint = null;
string? eventPlatformProxyEndpoint = null;
string? telemetryProxyEndpoint = null;
string? tracerFlareEndpoint = null;
if (discoveredEndpoints is { Length: > 0 })
{
foreach (var discoveredEndpoint in discoveredEndpoints)
{
// Strip '/' from both ends so the value can be compared against the Supported* constants
var endpoint = discoveredEndpoint?.Trim('/');
if (endpoint is null)
{
continue;
}
// effectively a switch, but case insensitive
if (endpoint.Equals(SupportedDebuggerEndpoint, StringComparison.OrdinalIgnoreCase))
{
debuggerEndpoint = endpoint;
}
else if (endpoint.Equals(SupportedDebuggerV2Endpoint, StringComparison.OrdinalIgnoreCase))
{
debuggerV2Endpoint = endpoint;
}
else if (endpoint.Equals(SupportedDiagnosticsEndpoint, StringComparison.OrdinalIgnoreCase))
{
diagnosticsEndpoint = endpoint;
}
else if (endpoint.Equals(SupportedSymbolDbEndpoint, StringComparison.OrdinalIgnoreCase))
{
symbolDbEndpoint = endpoint;
}
else if (endpoint.Equals(SupportedConfigurationEndpoint, StringComparison.OrdinalIgnoreCase))
{
configurationEndpoint = endpoint;
}
else if (endpoint.Equals(SupportedStatsEndpoint, StringComparison.OrdinalIgnoreCase))
{
statsEndpoint = endpoint;
}
else if (endpoint.Equals(SupportedDataStreamsEndpoint, StringComparison.OrdinalIgnoreCase))
{
dataStreamsMonitoringEndpoint = endpoint;
}
// evp_proxy: v2 is only recorded when nothing has been recorded yet, whereas v4 always
// overwrites, so v4 wins regardless of the order in which the agent lists them
else if (eventPlatformProxyEndpoint is null && endpoint.Equals(SupportedEventPlatformProxyEndpointV2, StringComparison.OrdinalIgnoreCase))
{
eventPlatformProxyEndpoint = endpoint;
}
else if (endpoint.Equals(SupportedEventPlatformProxyEndpointV4, StringComparison.OrdinalIgnoreCase))
{
eventPlatformProxyEndpoint = endpoint;
}
else if (endpoint.Equals(SupportedTelemetryProxyEndpoint, StringComparison.OrdinalIgnoreCase))
{
telemetryProxyEndpoint = endpoint;
}
else if (endpoint.Equals(SupportedTracerFlareEndpoint, StringComparison.OrdinalIgnoreCase))
{
tracerFlareEndpoint = endpoint;
}
}
}
// Capture the previous configuration before building the new one, for the comparison below
var existingConfiguration = _configuration;
var newConfig = new AgentConfiguration(
configurationEndpoint: configurationEndpoint,
debuggerEndpoint: debuggerEndpoint,
debuggerV2Endpoint: debuggerV2Endpoint,
diagnosticsEndpoint: diagnosticsEndpoint,
symbolDbEndpoint: symbolDbEndpoint,
agentVersion: agentVersion,
statsEndpoint: statsEndpoint,
dataStreamsMonitoringEndpoint: dataStreamsMonitoringEndpoint,
eventPlatformProxyEndpoint: eventPlatformProxyEndpoint,
telemetryProxyEndpoint: telemetryProxyEndpoint,
tracerFlareEndpoint: tracerFlareEndpoint,
containerTagsHash: _serviceRemappingHash.ContainerTagsHash, // either the value just received, or the one we stored before (prevents overriding with null)
clientDropP0: clientDropP0,
spanMetaStructs: spanMetaStructs,
spanEvents: spanEvents);
// Save the hash, whether the details we care about changed or not
// (the hash is read only after the CryptoStream block above has drained the stream)
_configurationHash = HexString.ToHexString(sha256.Hash);
// AgentConfiguration is a record, so this compares by value
if (existingConfiguration is null || !newConfig.Equals(existingConfiguration))
{
Log.Debug("Discovery configuration updated, notifying subscribers: {Configuration}", newConfig);
NotifySubscribers(newConfig);
}
}
/// <summary>
/// Releases the settings subscription (if there is one) and signals the discovery loop to stop.
/// </summary>
/// <returns>The background discovery task, so callers can await the loop's exit.</returns>
public Task DisposeAsync()
{
    _settingSubscription?.Dispose();

    var signalledNow = _processExit.TrySetResult(true);
    if (signalledNow)
    {
        return _discoveryTask;
    }

    // Double dispose in prod shouldn't happen, and should be avoided, so logging for follow-up
    Log.Debug($"{nameof(DiscoveryService)} is already disposed, skipping further disposal.");
    return _discoveryTask;
}
/// <summary>
/// Builds the request factory used to query the agent for the given exporter settings.
/// </summary>
/// <param name="exporterSettings">The exporter settings passed to the transport strategy.</param>
/// <param name="containerId">The container ID, or null; selects which header helper is used.</param>
/// <param name="tcpTimeout">Timeout passed to the transport strategy.</param>
/// <returns>The request factory returned by the transport strategy.</returns>
private static IApiRequestFactory CreateApiRequestFactory(ExporterSettings exporterSettings, string? containerId, TimeSpan tcpTimeout)
    => AgentTransportStrategy.Get(
        exporterSettings,
        productName: "discovery",
        tcpTimeout: tcpTimeout,
        // Use the container-aware header helper only when a container ID is available
        httpHeaderHelper: containerId is { } id
            ? new MinimalWithContainerIdAgentHeaderHelper(id)
            : MinimalAgentHeaderHelper.Instance);
}
}