Skip to content

Commit 87e5223

Browse files
authored
fix(csharp): add error resilience to heartbeat poller (#372)
## Summary

- Wraps the `PollOperationStatus` polling loop in a try-catch so that transient exceptions (e.g. `ObjectDisposedException` from TLS connection recycling) no longer kill the heartbeat poller silently
- Adds a **max consecutive failure limit** (`MaxConsecutiveFailures = 10`) so persistent errors (auth expired, server gone) don't cause infinite polling — at the default 60s heartbeat interval this gives ~10 minutes of tolerance before the poller stops itself
- A single successful poll resets the failure counter, so intermittent transient errors are handled gracefully
- Logs errors via `Activity.Current?.AddEvent()` telemetry events with error type, message, poll count, and consecutive failure count
- Properly handles `OperationCanceledException` from the cancellation token to still allow graceful shutdown
- Updates the `StopsPollingOnException` test to `ContinuesPollingOnException` to match the new resilient behavior

## Context

Without this fix, a single transient network error permanently stops the heartbeat poller. The server-side `commandInactivityTimeout` (default 20 minutes) then expires because no `GetOperationStatus` calls refresh it, causing the server to terminate the query. This manifests as CloudFetch failures in Power BI (ES-1778880).

## Design decisions

- **Why not `finally` for `Task.Delay`?** A `finally` block runs even on `break`, which would add an unnecessary 60s delay on every clean exit path (terminal state, cancellation, null handle). Placing the delay after the try-catch means it only executes when the loop continues.
- **Why 10 max failures?** At 60s intervals, 10 failures = ~10 minutes — enough to ride out transient network issues but not so long that a permanently broken connection wastes resources indefinitely.
- **Request timeouts are treated as transient errors.** The per-request timeout token (the linked `timeoutCts` token, formerly `GetOperationStatusTimeoutToken`) throws `OperationCanceledException`, but the cancellation filter (`when cancellationToken.IsCancellationRequested`) correctly routes it to the general catch since the main token isn't cancelled.

## Test plan

- [x] Verify build succeeds (confirmed locally, 0 warnings, 0 errors)
- [x] Update `StopsPollingOnException` → `ContinuesPollingOnException` to assert `pollCount > 1`
- [ ] Verify existing unit tests pass
- [ ] Manual validation: inject a transient exception during polling and confirm the poller recovers and continues heartbeating
- [ ] Verify cancellation still stops the poller gracefully
- [ ] Verify persistent errors stop the poller after ~10 consecutive failures

This pull request was AI-assisted by Isaac.
1 parent adf4853 commit 87e5223

File tree

2 files changed

+71
-30
lines changed

2 files changed

+71
-30
lines changed

csharp/src/Reader/DatabricksOperationStatusPoller.cs

Lines changed: 67 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ internal class DatabricksOperationStatusPoller : IOperationStatusPoller
4949
// Telemetry tracking
5050
private int _pollCount = 0;
5151

52+
// Maximum number of consecutive poll failures before giving up.
53+
// At the default 60s heartbeat interval this allows ~10 minutes of transient errors
54+
// before the poller stops itself.
55+
private const int MaxConsecutiveFailures = 10;
56+
5257
public DatabricksOperationStatusPoller(
5358
IHiveServer2Statement statement,
5459
IResponse response,
@@ -82,30 +87,73 @@ public void Start(CancellationToken externalToken = default)
8287

8388
private async Task PollOperationStatus(CancellationToken cancellationToken)
8489
{
90+
int consecutiveFailures = 0;
91+
8592
while (!cancellationToken.IsCancellationRequested)
8693
{
87-
TOperationHandle? operationHandle = _response.OperationHandle;
88-
if (operationHandle == null) break;
89-
90-
CancellationToken GetOperationStatusTimeoutToken = ApacheUtility.GetCancellationToken(_requestTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);
91-
92-
var request = new TGetOperationStatusReq(operationHandle);
93-
var response = await _statement.Client.GetOperationStatus(request, GetOperationStatusTimeoutToken);
94-
95-
// Track poll count for telemetry
96-
_pollCount++;
97-
98-
await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken);
99-
100-
// end the heartbeat if the command has terminated
101-
if (response.OperationState == TOperationState.CANCELED_STATE ||
102-
response.OperationState == TOperationState.ERROR_STATE ||
103-
response.OperationState == TOperationState.CLOSED_STATE ||
104-
response.OperationState == TOperationState.TIMEDOUT_STATE ||
105-
response.OperationState == TOperationState.UKNOWN_STATE)
94+
try
95+
{
96+
TOperationHandle? operationHandle = _response.OperationHandle;
97+
if (operationHandle == null) break;
98+
99+
using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
100+
timeoutCts.CancelAfter(TimeSpan.FromSeconds(_requestTimeoutSeconds));
101+
102+
TGetOperationStatusReq request = new TGetOperationStatusReq(operationHandle);
103+
TGetOperationStatusResp response = await _statement.Client.GetOperationStatus(request, timeoutCts.Token).ConfigureAwait(false);
104+
105+
// Successful poll — reset failure counter
106+
consecutiveFailures = 0;
107+
108+
// Track poll count for telemetry
109+
_pollCount++;
110+
111+
// end the heartbeat if the command has terminated
112+
if (response.OperationState == TOperationState.CANCELED_STATE ||
113+
response.OperationState == TOperationState.ERROR_STATE ||
114+
response.OperationState == TOperationState.CLOSED_STATE ||
115+
response.OperationState == TOperationState.TIMEDOUT_STATE ||
116+
response.OperationState == TOperationState.UKNOWN_STATE)
117+
{
118+
break;
119+
}
120+
}
121+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
106122
{
123+
// Cancellation was requested - exit the polling loop gracefully
107124
break;
108125
}
126+
catch (Exception ex)
127+
{
128+
consecutiveFailures++;
129+
130+
// Log the error but continue polling. Transient errors (e.g. ObjectDisposedException
131+
// from TLS connection recycling) should not kill the heartbeat poller, as that would
132+
// cause the server-side command inactivity timeout to expire and terminate the query.
133+
Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.poll_error",
134+
tags: new ActivityTagsCollection
135+
{
136+
{ "error.type", ex.GetType().Name },
137+
{ "error.message", ex.Message },
138+
{ "poll_count", _pollCount },
139+
{ "consecutive_failures", consecutiveFailures }
140+
}));
141+
142+
if (consecutiveFailures >= MaxConsecutiveFailures)
143+
{
144+
Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.max_failures_reached",
145+
tags: new ActivityTagsCollection
146+
{
147+
{ "consecutive_failures", consecutiveFailures },
148+
{ "poll_count", _pollCount }
149+
}));
150+
break;
151+
}
152+
}
153+
154+
// Wait before next poll. On cancellation this throws OperationCanceledException
155+
// which propagates up to the caller (Dispose catches it).
156+
await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken).ConfigureAwait(false);
109157
}
110158

111159
// Add telemetry tags to current activity when polling completes

csharp/test/Unit/DatabricksOperationStatusPollerTests.cs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,10 @@ public async Task ContinuesPollingOnFinishedState()
172172
}
173173

174174
[Fact]
175-
public async Task StopsPollingOnException()
175+
public async Task ContinuesPollingOnException()
176176
{
177177
// Arrange
178-
var poller = new DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object, _heartbeatIntervalSeconds);
178+
using var poller = new DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object, _heartbeatIntervalSeconds);
179179
var pollCount = 0;
180180
_mockClient.Setup(c => c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(), It.IsAny<CancellationToken>()))
181181
.ThrowsAsync(new Exception("Test exception"))
@@ -186,15 +186,8 @@ public async Task StopsPollingOnException()
186186
await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds * 3)); // Wait longer than heartbeat interval
187187

188188
// Assert
189-
// Should stop polling after the exception
190-
Assert.Equal(1, pollCount);
191-
try
192-
{
193-
poller.Dispose();
194-
}
195-
catch (Exception)
196-
{
197-
}
189+
// Should continue polling despite the exception (transient error resilience)
190+
Assert.True(pollCount > 1, $"Expected multiple polls despite exceptions but got {pollCount}");
198191
}
199192

200193
[Fact]

0 commit comments

Comments
 (0)