Skip to content

Commit b2c89a0

Browse files
author
Jade Wang
committed
Implement CircuitBreakerTelemetryExporter wrapper

Task ID: task-3.2-circuit-breaker-telemetry-exporter
1 parent aaf9f8d commit b2c89a0

File tree

2 files changed

+796
-0
lines changed

2 files changed

+796
-0
lines changed
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/*
2+
* Copyright (c) 2025 ADBC Drivers Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
using System;
18+
using System.Collections.Generic;
19+
using System.Diagnostics;
20+
using System.Threading;
21+
using System.Threading.Tasks;
22+
using AdbcDrivers.Databricks.Telemetry.Models;
23+
using Polly.CircuitBreaker;
24+
25+
namespace AdbcDrivers.Databricks.Telemetry
26+
{
27+
/// <summary>
/// Wraps an <see cref="ITelemetryExporter"/> with circuit breaker protection to prevent wasting
/// resources on failing telemetry endpoints.
/// </summary>
/// <remarks>
/// <para>
/// Every export is routed through a circuit breaker obtained from
/// <c>CircuitBreakerManager</c> for the configured host. While that circuit is open,
/// telemetry batches are dropped without calling the inner exporter; each drop is recorded
/// as an event on <see cref="Activity.Current"/> (when one exists).
/// </para>
/// <para>
/// Key Behaviors:
/// - Circuit Open: the batch is dropped and <c>true</c> is returned, because dropping is the
///   intended behavior rather than a failure.
/// - Any other state: the batch is passed to the inner exporter through the circuit breaker.
///   A <c>false</c> result from the inner exporter is converted into a
///   <c>TelemetryExportException</c> inside the breaker so that it is visible as a failure.
/// - Per-host isolation: the circuit breaker is looked up by host in the constructor.
/// </para>
/// <para>
/// Thread Safety: this class only holds readonly references and adds no mutable state of its
/// own. Concurrent use therefore relies on the inner exporter and the circuit breaker being
/// safe for concurrent calls (not verifiable from this file).
/// </para>
/// </remarks>
internal sealed class CircuitBreakerTelemetryExporter : ITelemetryExporter
{
    /// <summary>
    /// Activity source for circuit breaker telemetry tracing.
    /// </summary>
    /// <remarks>
    /// Not referenced by the members of this class; trace events are attached to
    /// <see cref="Activity.Current"/> instead.
    /// </remarks>
    private static readonly ActivitySource s_activitySource = new ActivitySource("AdbcDrivers.Databricks.CircuitBreakerTelemetryExporter");

    private readonly ITelemetryExporter _innerExporter;
    private readonly CircuitBreaker _circuitBreaker;
    private readonly string _host;

    /// <summary>
    /// Gets the host for this exporter.
    /// </summary>
    internal string Host => _host;

    /// <summary>
    /// Gets the current state of the circuit breaker.
    /// </summary>
    internal CircuitBreakerState State => _circuitBreaker.State;

    /// <summary>
    /// Creates a new CircuitBreakerTelemetryExporter.
    /// </summary>
    /// <param name="innerExporter">The inner telemetry exporter to wrap with circuit breaker protection.</param>
    /// <param name="host">The host identifier used to look up the circuit breaker.</param>
    /// <exception cref="ArgumentNullException">Thrown when innerExporter is null.</exception>
    /// <exception cref="ArgumentException">Thrown when host is null, empty, or whitespace.</exception>
    public CircuitBreakerTelemetryExporter(ITelemetryExporter innerExporter, string host)
    {
        _innerExporter = innerExporter ?? throw new ArgumentNullException(nameof(innerExporter));

        if (string.IsNullOrWhiteSpace(host))
        {
            throw new ArgumentException("Host cannot be null or whitespace.", nameof(host));
        }

        _host = host;
        _circuitBreaker = CircuitBreakerManager.GetInstance().GetCircuitBreaker(host);
    }

    /// <summary>
    /// Export telemetry frontend logs with circuit breaker protection.
    /// </summary>
    /// <param name="logs">The list of telemetry frontend logs to export.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>
    /// True if the export succeeded, if the batch was dropped because the circuit is open,
    /// or if <paramref name="logs"/> is null or empty (nothing to export).
    /// False if the export attempt failed.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// Propagated only when <paramref name="ct"/> itself has been cancelled.
    /// </exception>
    /// <remarks>
    /// <para>
    /// Apart from caller-requested cancellation, this method does not throw: every other
    /// exception is caught, recorded as an event on <see cref="Activity.Current"/>, and
    /// reported through the return value.
    /// </para>
    /// <para>
    /// A cancellation exception raised while <paramref name="ct"/> is NOT cancelled (for
    /// example a transport timeout surfacing as <see cref="TaskCanceledException"/>) is
    /// treated as an export failure and yields <c>false</c> instead of escaping to the caller.
    /// </para>
    /// </remarks>
    public async Task<bool> ExportAsync(IReadOnlyList<TelemetryFrontendLog> logs, CancellationToken ct = default)
    {
        if (logs == null || logs.Count == 0)
        {
            return true;
        }

        // Fast path: skip the inner exporter entirely while the circuit is open.
        if (_circuitBreaker.State == CircuitBreakerState.Open)
        {
            TraceDropped("telemetry.export.circuit_open", logs.Count);

            // Return true because dropping is not a failure - it's expected behavior
            return true;
        }

        try
        {
            bool exported = await _circuitBreaker.ExecuteAsync(async () =>
            {
                bool success = await _innerExporter.ExportAsync(logs, ct).ConfigureAwait(false);

                // The breaker only sees exceptions, so a false result has to be surfaced as
                // one for the failure to be noticed.
                if (!success)
                {
                    throw new TelemetryExportException("Inner exporter returned false indicating export failure");
                }

                return success;
            }).ConfigureAwait(false);

            return exported;
        }
        catch (BrokenCircuitException)
        {
            // The circuit opened between the state check above and the execution attempt.
            TraceDropped("telemetry.export.circuit_opened", logs.Count);

            // Return true because dropping is expected behavior when circuit opens
            return true;
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // The caller asked for cancellation - don't swallow it, let it propagate.
            throw;
        }
        catch (Exception ex)
        {
            // Everything else is swallowed per telemetry requirement. This includes
            // cancellation exceptions that did not originate from the caller's token
            // (e.g. timeouts), which must not leak out of a telemetry call.
            Activity.Current?.AddEvent(new ActivityEvent("telemetry.export.circuit_breaker_error",
                tags: new ActivityTagsCollection
                {
                    { "host", _host },
                    { "error.message", ex.Message },
                    { "error.type", ex.GetType().Name },
                    { "circuit_state", _circuitBreaker.State.ToString() }
                }));

            return false;
        }
    }

    /// <summary>
    /// Records on the current activity (if any) that a batch of logs was dropped.
    /// </summary>
    /// <param name="eventName">Name of the activity event to emit.</param>
    /// <param name="logCount">Number of log entries in the dropped batch.</param>
    private void TraceDropped(string eventName, int logCount)
    {
        Activity.Current?.AddEvent(new ActivityEvent(eventName,
            tags: new ActivityTagsCollection
            {
                { "host", _host },
                { "log_count", logCount },
                { "action", "dropped" }
            }));
    }
}
191+
192+
/// <summary>
/// Signals that a telemetry export attempt did not succeed.
/// CircuitBreakerTelemetryExporter raises it inside the circuit breaker so that a failed
/// export is reported to the breaker as an exception.
/// </summary>
internal sealed class TelemetryExportException : Exception
{
    /// <summary>
    /// Initializes the exception with a message and the exception that caused it.
    /// </summary>
    /// <param name="message">Text describing the export failure.</param>
    /// <param name="innerException">The underlying cause of the failure.</param>
    public TelemetryExportException(string message, Exception innerException)
        : base(message, innerException)
    {
    }

    /// <summary>
    /// Initializes the exception with a message only.
    /// </summary>
    /// <param name="message">Text describing the export failure.</param>
    public TelemetryExportException(string message)
        : base(message)
    {
    }
}
215+
}

0 commit comments

Comments
 (0)