Skip to content

Commit 29c7c37

Browse files
Jade Wangclaude
andcommitted
Update TelemetryClient to direct enqueue/flush/export pipeline
Remove ActivityListener and MetricsAggregator dependencies. TelemetryClient now directly enqueues TelemetryFrontendLog objects and flushes them via the circuit breaker-protected exporter. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e417df7 commit 29c7c37

File tree

1 file changed

+36
-81
lines changed

1 file changed

+36
-81
lines changed

csharp/src/Telemetry/TelemetryClient.cs

Lines changed: 36 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
using System;
18+
using System.Collections.Concurrent;
1819
using System.Collections.Generic;
1920
using System.Diagnostics;
2021
using System.Threading;
@@ -30,7 +31,7 @@ namespace AdbcDrivers.Databricks.Telemetry
3031
/// <remarks>
3132
/// <para>
3233
/// This client orchestrates the complete telemetry pipeline:
33-
/// DatabricksActivityListener → MetricsAggregator → CircuitBreakerTelemetryExporter → DatabricksTelemetryExporter
34+
/// CircuitBreakerTelemetryExporter → DatabricksTelemetryExporter
3435
/// </para>
3536
/// <para>
3637
/// Key Behaviors:
@@ -52,8 +53,9 @@ internal sealed class TelemetryClient : ITelemetryClient
5253

5354
private readonly DatabricksTelemetryExporter _databricksExporter;
5455
private readonly CircuitBreakerTelemetryExporter _circuitBreakerExporter;
55-
private readonly MetricsAggregator _metricsAggregator;
56-
private readonly DatabricksActivityListener _activityListener;
56+
private readonly ITelemetryExporter _effectiveExporter;
57+
private readonly ConcurrentQueue<TelemetryFrontendLog> _pendingLogs;
58+
private readonly int _batchSize;
5759
private readonly CancellationTokenSource _cts;
5860
private volatile bool _disposed;
5961

@@ -64,13 +66,15 @@ internal sealed class TelemetryClient : ITelemetryClient
6466
/// <param name="httpClient">The HTTP client to use for exporting telemetry.</param>
6567
/// <param name="isAuthenticated">Whether the connection is authenticated (determines telemetry endpoint).</param>
6668
/// <param name="configuration">The telemetry configuration.</param>
69+
/// <param name="exporterOverride">Optional exporter override for testing.</param>
6770
/// <exception cref="ArgumentNullException">Thrown when host, httpClient, or configuration is null.</exception>
6871
/// <exception cref="ArgumentException">Thrown when host is empty or whitespace.</exception>
6972
public TelemetryClient(
7073
string host,
7174
System.Net.Http.HttpClient httpClient,
7275
bool isAuthenticated,
73-
TelemetryConfiguration configuration)
76+
TelemetryConfiguration configuration,
77+
ITelemetryExporter? exporterOverride = null)
7478
{
7579
if (string.IsNullOrWhiteSpace(host))
7680
{
@@ -88,6 +92,8 @@ public TelemetryClient(
8892
}
8993

9094
_cts = new CancellationTokenSource();
95+
_pendingLogs = new ConcurrentQueue<TelemetryFrontendLog>();
96+
_batchSize = configuration.BatchSize;
9197

9298
try
9399
{
@@ -98,14 +104,8 @@ public TelemetryClient(
98104
// 2. CircuitBreakerTelemetryExporter (wraps exporter with circuit breaker protection)
99105
_circuitBreakerExporter = new CircuitBreakerTelemetryExporter(_databricksExporter, host);
100106

101-
// 3. MetricsAggregator (aggregates activities and exports via circuit breaker)
102-
_metricsAggregator = new MetricsAggregator(
103-
_circuitBreakerExporter,
104-
batchSize: configuration.BatchSize,
105-
flushInterval: TimeSpan.FromMilliseconds(configuration.FlushIntervalMs));
106-
107-
// 4. DatabricksActivityListener (listens to activities and delegates to aggregator)
108-
_activityListener = new DatabricksActivityListener(_metricsAggregator, configuration);
107+
// Use override exporter if provided (for testing), otherwise use circuit breaker exporter
108+
_effectiveExporter = exporterOverride ?? (ITelemetryExporter)_circuitBreakerExporter;
109109

110110
Activity.Current?.AddEvent(new ActivityEvent("telemetry.client.initialized",
111111
tags: new ActivityTagsCollection
@@ -121,8 +121,6 @@ public TelemetryClient(
121121
// Clean up any partially initialized resources
122122
try
123123
{
124-
_activityListener?.Dispose();
125-
_metricsAggregator?.DisposeAsync().AsTask().Wait(TimeSpan.FromSeconds(1));
126124
_cts?.Dispose();
127125
}
128126
catch
@@ -148,10 +146,6 @@ public TelemetryClient(
148146
/// <param name="log">The telemetry frontend log to enqueue.</param>
149147
/// <remarks>
150148
/// <para>
151-
/// This method delegates to the MetricsAggregator, which handles batching and export.
152-
/// The aggregator processes activities from the ActivityListener automatically.
153-
/// </para>
154-
/// <para>
155149
/// This method never throws exceptions. If the client is closed or disposing,
156150
/// the event is silently dropped.
157151
/// </para>
@@ -165,16 +159,13 @@ public void Enqueue(TelemetryFrontendLog log)
165159

166160
try
167161
{
168-
// For the activity-based pipeline, this method is not typically used.
169-
// Activities are automatically captured by the listener and processed by the aggregator.
170-
// However, we provide this method for compatibility with ITelemetryClient interface
171-
// in case manual enqueueing is needed.
162+
_pendingLogs.Enqueue(log);
172163

173-
Activity.Current?.AddEvent(new ActivityEvent("telemetry.client.manual_enqueue",
174-
tags: new ActivityTagsCollection
175-
{
176-
{ "frontend_log_id", log.FrontendLogEventId ?? "(none)" }
177-
}));
164+
// Trigger flush if batch size reached
165+
if (_pendingLogs.Count >= _batchSize)
166+
{
167+
_ = Task.Run(() => FlushAsync(CancellationToken.None));
168+
}
178169
}
179170
catch (Exception ex)
180171
{
@@ -213,8 +204,17 @@ public async Task FlushAsync(CancellationToken ct = default)
213204

214205
try
215206
{
216-
// Delegate to aggregator to flush all pending metrics
217-
await _metricsAggregator.FlushAsync(ct).ConfigureAwait(false);
207+
// Flush directly enqueued logs (V3 direct-object path)
208+
List<TelemetryFrontendLog> logsToFlush = new List<TelemetryFrontendLog>();
209+
while (_pendingLogs.TryDequeue(out TelemetryFrontendLog? log))
210+
{
211+
logsToFlush.Add(log);
212+
}
213+
214+
if (logsToFlush.Count > 0)
215+
{
216+
await _effectiveExporter.ExportAsync(logsToFlush, ct).ConfigureAwait(false);
217+
}
218218

219219
Activity.Current?.AddEvent(new ActivityEvent("telemetry.client.flush_completed"));
220220
}
@@ -241,10 +241,8 @@ public async Task FlushAsync(CancellationToken ct = default)
241241
/// </para>
242242
/// <para>
243243
/// The close operation performs the following steps:
244-
/// 1. Stop the activity listener (no more activities will be processed)
245-
/// 2. Flush all remaining queued metrics from the aggregator
246-
/// 3. Cancel any pending background tasks
247-
/// 4. Dispose all resources (listener, aggregator, cancellation token source)
244+
/// 1. Cancel any pending background tasks
245+
/// 2. Dispose all resources (cancellation token source)
248246
/// </para>
249247
/// <para>
250248
/// This method is idempotent - calling it multiple times is safe and has no effect
@@ -255,11 +253,11 @@ public async Task FlushAsync(CancellationToken ct = default)
255253
/// logged internally.
256254
/// </para>
257255
/// </remarks>
258-
public async Task CloseAsync()
256+
public Task CloseAsync()
259257
{
260258
if (_disposed)
261259
{
262-
return;
260+
return Task.CompletedTask;
263261
}
264262

265263
_disposed = true;
@@ -268,37 +266,7 @@ public async Task CloseAsync()
268266
{
269267
Activity.Current?.AddEvent(new ActivityEvent("telemetry.client.closing"));
270268

271-
// Step 1: Stop the activity listener (no more activities will be captured)
272-
try
273-
{
274-
await _activityListener.StopAsync(_cts.Token).ConfigureAwait(false);
275-
}
276-
catch (Exception ex)
277-
{
278-
Activity.Current?.AddEvent(new ActivityEvent("telemetry.client.listener_stop_error",
279-
tags: new ActivityTagsCollection
280-
{
281-
{ "error.message", ex.Message },
282-
{ "error.type", ex.GetType().Name }
283-
}));
284-
}
285-
286-
// Step 2: Flush all remaining metrics from the aggregator
287-
try
288-
{
289-
await _metricsAggregator.FlushAsync(CancellationToken.None).ConfigureAwait(false);
290-
}
291-
catch (Exception ex)
292-
{
293-
Activity.Current?.AddEvent(new ActivityEvent("telemetry.client.final_flush_error",
294-
tags: new ActivityTagsCollection
295-
{
296-
{ "error.message", ex.Message },
297-
{ "error.type", ex.GetType().Name }
298-
}));
299-
}
300-
301-
// Step 3: Cancel any pending background tasks
269+
// Cancel any pending background tasks
302270
try
303271
{
304272
_cts.Cancel();
@@ -313,21 +281,6 @@ public async Task CloseAsync()
313281
}));
314282
}
315283

316-
// Step 4: Dispose all resources
317-
try
318-
{
319-
await _metricsAggregator.DisposeAsync().ConfigureAwait(false);
320-
}
321-
catch (Exception ex)
322-
{
323-
Activity.Current?.AddEvent(new ActivityEvent("telemetry.client.aggregator_dispose_error",
324-
tags: new ActivityTagsCollection
325-
{
326-
{ "error.message", ex.Message },
327-
{ "error.type", ex.GetType().Name }
328-
}));
329-
}
330-
331284
Activity.Current?.AddEvent(new ActivityEvent("telemetry.client.closed"));
332285
}
333286
catch (Exception ex)
@@ -352,6 +305,8 @@ public async Task CloseAsync()
352305
// Swallow CTS dispose exceptions
353306
}
354307
}
308+
309+
return Task.CompletedTask;
355310
}
356311

357312
/// <summary>

0 commit comments

Comments
 (0)