Skip to content

Commit 51f461f

Browse files
author
Jade Wang
committed
Implement TelemetryClientManager with reference counting\n\nTask ID: task-2.2-telemetry-client-manager
1 parent 3c69f70 commit 51f461f

File tree

4 files changed

+737
-0
lines changed

4 files changed

+737
-0
lines changed
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
/*
2+
* Copyright (c) 2025 ADBC Drivers Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
using System;
18+
using System.Collections.Concurrent;
19+
using System.Collections.Generic;
20+
using System.Diagnostics;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using AdbcDrivers.Databricks.Telemetry.Models;
24+
25+
namespace AdbcDrivers.Databricks.Telemetry
26+
{
27+
/// <summary>
28+
/// Default implementation of ITelemetryClient that batches events from multiple connections
29+
/// and exports via HTTP on a timer or when batch size is reached.
30+
/// </summary>
31+
/// <remarks>
32+
/// <para>
33+
/// One instance is shared per host via TelemetryClientManager to prevent rate limiting
34+
/// by consolidating telemetry traffic from concurrent connections to the same host.
35+
/// </para>
36+
/// <para>
37+
/// Thread Safety: All methods are thread-safe and can be called concurrently from
38+
/// multiple connections. Enqueue() is non-blocking. FlushAsync() and CloseAsync()
39+
/// use internal synchronization to coordinate with ongoing flush operations.
40+
/// </para>
41+
/// </remarks>
42+
internal sealed class TelemetryClient : ITelemetryClient
43+
{
44+
private readonly ConcurrentQueue<TelemetryFrontendLog> _queue = new ConcurrentQueue<TelemetryFrontendLog>();
45+
private readonly ITelemetryExporter _exporter;
46+
private readonly TelemetryConfiguration _config;
47+
private readonly Timer _flushTimer;
48+
private readonly SemaphoreSlim _flushLock = new SemaphoreSlim(1, 1);
49+
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
50+
private volatile bool _disposed;
51+
52+
/// <summary>
53+
/// Creates a new TelemetryClient with the specified exporter and configuration.
54+
/// </summary>
55+
/// <param name="exporter">The telemetry exporter to use for sending events.</param>
56+
/// <param name="config">The telemetry configuration.</param>
57+
public TelemetryClient(
58+
ITelemetryExporter exporter,
59+
TelemetryConfiguration config)
60+
{
61+
_exporter = exporter;
62+
_config = config;
63+
64+
// Start periodic flush timer (default: every 5 seconds)
65+
_flushTimer = new Timer(
66+
OnFlushTimer,
67+
null,
68+
_config.FlushIntervalMs,
69+
_config.FlushIntervalMs);
70+
}
71+
72+
/// <summary>
73+
/// Queue a telemetry event for batched export. Thread-safe, non-blocking.
74+
/// </summary>
75+
/// <param name="log">The telemetry frontend log to enqueue.</param>
76+
/// <remarks>
77+
/// <para>
78+
/// Events are batched and flushed periodically (configurable flush interval) or when
79+
/// the batch size limit is reached. This method returns immediately after adding the
80+
/// event to an internal queue.
81+
/// </para>
82+
/// <para>
83+
/// If the client is closed or disposing, the event is silently dropped.
84+
/// </para>
85+
/// </remarks>
86+
public void Enqueue(TelemetryFrontendLog log)
87+
{
88+
if (_disposed) return;
89+
90+
_queue.Enqueue(log);
91+
92+
// Trigger flush if batch size reached
93+
if (_queue.Count >= _config.BatchSize)
94+
{
95+
_ = FlushAsync(); // Fire-and-forget, errors swallowed
96+
}
97+
}
98+
99+
/// <summary>
100+
/// Force flush all pending events immediately.
101+
/// </summary>
102+
/// <param name="ct">Cancellation token to cancel the flush operation.</param>
103+
/// <returns>A task that completes when all pending events have been flushed.</returns>
104+
/// <remarks>
105+
/// <para>
106+
/// This method blocks until all queued events are exported to the backend service.
107+
/// It is called when a connection closes to ensure no events are lost.
108+
/// </para>
109+
/// <para>
110+
/// If a flush is already in progress, this method waits for it to complete rather
111+
/// than starting a new flush operation.
112+
/// </para>
113+
/// <para>
114+
/// This method never throws exceptions related to telemetry failures. Export errors
115+
/// are caught and logged internally to ensure telemetry operations never impact
116+
/// driver functionality.
117+
/// </para>
118+
/// </remarks>
119+
public async Task FlushAsync(CancellationToken ct = default)
120+
{
121+
if (_disposed) return;
122+
123+
// Prevent concurrent flushes
124+
if (!await _flushLock.WaitAsync(0, ct).ConfigureAwait(false))
125+
return;
126+
127+
try
128+
{
129+
List<TelemetryFrontendLog> batch = new List<TelemetryFrontendLog>();
130+
131+
// Drain queue up to batch size
132+
while (batch.Count < _config.BatchSize && _queue.TryDequeue(out TelemetryFrontendLog? log))
133+
{
134+
batch.Add(log);
135+
}
136+
137+
if (batch.Count > 0)
138+
{
139+
// Export via circuit breaker → exporter → HTTP
140+
await _exporter.ExportAsync(batch, ct).ConfigureAwait(false);
141+
}
142+
}
143+
catch (Exception ex)
144+
{
145+
// Swallow all exceptions per telemetry requirement
146+
Debug.WriteLine($"[TRACE] TelemetryClient flush error: {ex.Message}");
147+
}
148+
finally
149+
{
150+
_flushLock.Release();
151+
}
152+
}
153+
154+
/// <summary>
155+
/// Gracefully close the client. Flushes all pending events before disposing resources.
156+
/// </summary>
157+
/// <returns>A task that completes when the client is fully closed.</returns>
158+
/// <remarks>
159+
/// <para>
160+
/// This method is called by TelemetryClientManager when the reference count for this
161+
/// host reaches zero (i.e., the last connection to this host has been closed).
162+
/// </para>
163+
/// <para>
164+
/// The close operation performs the following steps:
165+
/// 1. Cancel any pending background flush timers
166+
/// 2. Flush all remaining queued events
167+
/// 3. Dispose internal resources (timers, semaphores, etc.)
168+
/// </para>
169+
/// <para>
170+
/// This method is idempotent - calling it multiple times is safe and has no effect
171+
/// after the first call completes.
172+
/// </para>
173+
/// <para>
174+
/// This method never throws exceptions. All errors during close are caught and
175+
/// logged internally.
176+
/// </para>
177+
/// </remarks>
178+
public async Task CloseAsync()
179+
{
180+
if (_disposed) return;
181+
_disposed = true;
182+
183+
try
184+
{
185+
// Stop timer
186+
_flushTimer.Dispose();
187+
188+
// Cancel any pending operations
189+
_cts.Cancel();
190+
191+
// Final flush of remaining events
192+
await FlushAsync().ConfigureAwait(false);
193+
}
194+
catch (Exception ex)
195+
{
196+
Debug.WriteLine($"[TRACE] TelemetryClient close error: {ex.Message}");
197+
}
198+
finally
199+
{
200+
_cts.Dispose();
201+
_flushLock.Dispose();
202+
}
203+
}
204+
205+
/// <summary>
206+
/// Dispose the telemetry client asynchronously.
207+
/// </summary>
208+
/// <returns>A ValueTask that completes when the client is disposed.</returns>
209+
public async ValueTask DisposeAsync() => await CloseAsync().ConfigureAwait(false);
210+
211+
/// <summary>
212+
/// Timer callback for periodic flush operations.
213+
/// </summary>
214+
/// <param name="state">Unused state object.</param>
215+
private void OnFlushTimer(object? state)
216+
{
217+
if (_disposed) return;
218+
_ = FlushAsync(_cts.Token);
219+
}
220+
}
221+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) 2025 ADBC Drivers Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
namespace AdbcDrivers.Databricks.Telemetry
18+
{
19+
/// <summary>
20+
/// Holds a telemetry client and its reference count.
21+
/// Used by TelemetryClientManager to track how many connections are using a client.
22+
/// </summary>
23+
/// <remarks>
24+
/// Thread Safety: The _refCount field is accessed via Interlocked operations to ensure
25+
/// thread-safe increment and decrement operations from concurrent connections.
26+
/// </remarks>
27+
internal sealed class TelemetryClientHolder
28+
{
29+
/// <summary>
30+
/// Reference count tracking the number of connections using this client.
31+
/// Must be accessed via Interlocked operations for thread safety.
32+
/// </summary>
33+
internal int _refCount = 1;
34+
35+
/// <summary>
36+
/// Gets the telemetry client instance.
37+
/// </summary>
38+
public ITelemetryClient Client { get; }
39+
40+
/// <summary>
41+
/// Creates a new TelemetryClientHolder with the specified client and initial ref count of 1.
42+
/// </summary>
43+
/// <param name="client">The telemetry client to hold.</param>
44+
public TelemetryClientHolder(ITelemetryClient client)
45+
{
46+
Client = client;
47+
}
48+
}
49+
}

0 commit comments

Comments
 (0)