From 08a0b3c737180150c9881917b29848b006fbc792 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Fri, 27 Mar 2026 11:21:18 +0530 Subject: [PATCH 1/6] fix(csharp): add error resilience to heartbeat poller to prevent silent death The PollOperationStatus method had no try-catch, so any transient exception (e.g. ObjectDisposedException from TLS connection recycling on Mono) would kill the heartbeat poller silently. Without heartbeats, the server-side commandInactivityTimeout (20 min) expires and terminates the query, causing CloudFetch failures in Power BI. This wraps the polling logic in a try-catch that logs errors via Activity telemetry and continues polling. Part of ES-1778880. Co-authored-by: Isaac --- .../Reader/DatabricksOperationStatusPoller.cs | 59 ++++++++++++++----- 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/csharp/src/Reader/DatabricksOperationStatusPoller.cs b/csharp/src/Reader/DatabricksOperationStatusPoller.cs index 47f1ceed..22cb22ed 100644 --- a/csharp/src/Reader/DatabricksOperationStatusPoller.cs +++ b/csharp/src/Reader/DatabricksOperationStatusPoller.cs @@ -84,28 +84,59 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { - TOperationHandle? operationHandle = _response.OperationHandle; - if (operationHandle == null) break; + try + { + TOperationHandle? operationHandle = _response.OperationHandle; + if (operationHandle == null) break; - CancellationToken GetOperationStatusTimeoutToken = ApacheUtility.GetCancellationToken(_requestTimeoutSeconds, ApacheUtility.TimeUnit.Seconds); + CancellationToken GetOperationStatusTimeoutToken = ApacheUtility.GetCancellationToken(_requestTimeoutSeconds, ApacheUtility.TimeUnit.Seconds); - var request = new TGetOperationStatusReq(operationHandle); - var response = await _statement.Client.GetOperationStatus(request, GetOperationStatusTimeoutToken); + var request = new TGetOperationStatusReq(operationHandle); + var response = await _statement.Client.GetOperationStatus(request, GetOperationStatusTimeoutToken); - // Track poll count for telemetry - _pollCount++; + // Track poll count for telemetry + _pollCount++; - await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken); + await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken); - // end the heartbeat if the command has terminated - if (response.OperationState == TOperationState.CANCELED_STATE || - response.OperationState == TOperationState.ERROR_STATE || - response.OperationState == TOperationState.CLOSED_STATE || - response.OperationState == TOperationState.TIMEDOUT_STATE || - response.OperationState == TOperationState.UKNOWN_STATE) + // end the heartbeat if the command has terminated + if (response.OperationState == TOperationState.CANCELED_STATE || + response.OperationState == TOperationState.ERROR_STATE || + response.OperationState == TOperationState.CLOSED_STATE || + response.OperationState == TOperationState.TIMEDOUT_STATE || + response.OperationState == TOperationState.UKNOWN_STATE) + { + break; + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { + // Cancellation was requested - exit the polling loop gracefully break; } + catch (Exception ex) + { + // Log the error but continue polling. Transient errors (e.g. ObjectDisposedException + // from TLS connection recycling) should not kill the heartbeat poller, as that would + // cause the server-side command inactivity timeout to expire and terminate the query. + Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.poll_error", + tags: new ActivityTagsCollection + { + { "error.type", ex.GetType().Name }, + { "error.message", ex.Message }, + { "poll_count", _pollCount } + })); + + // Wait before retrying to avoid tight error loops + try + { + await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken); + } + catch (OperationCanceledException) + { + break; + } + } } // Add telemetry tags to current activity when polling completes From 52408c3271bd29a8877c5ca5e57f1780e5fec08f Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Sat, 28 Mar 2026 00:09:08 +0530 Subject: [PATCH 2/6] =?UTF-8?q?fix(csharp):=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20deduplicate=20Task.Delay,=20remove=20nested=20try-c?= =?UTF-8?q?atch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move Task.Delay to after the try-catch block (shared by success and error paths) - Remove nested try-catch inside the error handler - On cancellation during delay, OperationCanceledException propagates to Dispose() which handles it Co-authored-by: Isaac --- .../Reader/DatabricksOperationStatusPoller.cs | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/csharp/src/Reader/DatabricksOperationStatusPoller.cs b/csharp/src/Reader/DatabricksOperationStatusPoller.cs index 22cb22ed..4f28991d 100644 --- a/csharp/src/Reader/DatabricksOperationStatusPoller.cs +++ b/csharp/src/Reader/DatabricksOperationStatusPoller.cs @@ -97,8 +97,6 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) // Track poll count for telemetry _pollCount++; - await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken); - // end the heartbeat if the command has terminated if (response.OperationState == TOperationState.CANCELED_STATE || response.OperationState == TOperationState.ERROR_STATE || @@ -126,17 +124,12 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) { "error.message", ex.Message }, { "poll_count", _pollCount } })); - - // Wait before retrying to avoid tight error loops - try - { - await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken); - } - catch (OperationCanceledException) - { - break; - } } + + // Wait before next poll — shared by both success and error paths. + // On cancellation this throws OperationCanceledException which propagates + // up to the caller (Dispose catches it). + await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken); } // Add telemetry tags to current activity when polling completes From 4229f55a1fabf58d62557cc82b6b63e632204d48 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Sat, 28 Mar 2026 12:29:31 +0530 Subject: [PATCH 3/6] test(csharp): update poller test to expect resilience on exception The heartbeat poller now continues polling through transient exceptions instead of stopping. Update the test to assert this new behavior. Co-authored-by: Isaac --- .../Unit/DatabricksOperationStatusPollerTests.cs | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/csharp/test/Unit/DatabricksOperationStatusPollerTests.cs b/csharp/test/Unit/DatabricksOperationStatusPollerTests.cs index cf79ad3e..898a14d0 100644 --- a/csharp/test/Unit/DatabricksOperationStatusPollerTests.cs +++ b/csharp/test/Unit/DatabricksOperationStatusPollerTests.cs @@ -172,10 +172,10 @@ public async Task ContinuesPollingOnFinishedState() } [Fact] - public async Task StopsPollingOnException() + public async Task ContinuesPollingOnException() { // Arrange - var poller = new DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object, _heartbeatIntervalSeconds); + using var poller = new DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object, _heartbeatIntervalSeconds); var pollCount = 0; _mockClient.Setup(c => c.GetOperationStatus(It.IsAny(), It.IsAny())) .ThrowsAsync(new Exception("Test exception")) @@ -186,15 +186,8 @@ public async Task StopsPollingOnException() await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds * 3)); // Wait longer than heartbeat interval // Assert - // Should stop polling after the exception - Assert.Equal(1, pollCount); - try - { - poller.Dispose(); - } - catch (Exception) - { - } + // Should continue polling despite the exception (transient error resilience) + Assert.True(pollCount > 1, $"Expected multiple polls despite exceptions but got {pollCount}"); } [Fact] From af73052d9adb570c48431cebae0152cf7b66480b Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Sat, 28 Mar 2026 13:22:56 +0530 Subject: [PATCH 4/6] fix(csharp): re-add error resilience with finally-based delay The previous commit accidentally removed the try-catch. Re-add error resilience using a finally block for Task.Delay per reviewer feedback, eliminating duplication and nested try-catch. Co-authored-by: Isaac --- csharp/src/Reader/DatabricksOperationStatusPoller.cs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/csharp/src/Reader/DatabricksOperationStatusPoller.cs b/csharp/src/Reader/DatabricksOperationStatusPoller.cs index 4f28991d..62072b95 100644 --- a/csharp/src/Reader/DatabricksOperationStatusPoller.cs +++ b/csharp/src/Reader/DatabricksOperationStatusPoller.cs @@ -125,11 +125,10 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) { "poll_count", _pollCount } })); } - - // Wait before next poll — shared by both success and error paths. - // On cancellation this throws OperationCanceledException which propagates - // up to the caller (Dispose catches it). - await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken); + finally + { + await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken); + } } // Add telemetry tags to current activity when polling completes From e35d6976f5f04d90640be870107736718eee5751 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Sat, 28 Mar 2026 13:39:07 +0530 Subject: [PATCH 5/6] fix(csharp): add max consecutive failure limit and fix delay placement - Add MaxConsecutiveFailures=10 (~10min at 60s interval) so persistent errors (auth expired, server gone) don't cause infinite polling - Move Task.Delay out of finally block to avoid unnecessary delay on break paths (terminal state, cancellation, null handle) - Reset failure counter on successful poll so transient blips recover Co-authored-by: Isaac --- .../Reader/DatabricksOperationStatusPoller.cs | 34 ++++++++++++++++--- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/csharp/src/Reader/DatabricksOperationStatusPoller.cs b/csharp/src/Reader/DatabricksOperationStatusPoller.cs index 62072b95..6a7ff352 100644 --- a/csharp/src/Reader/DatabricksOperationStatusPoller.cs +++ b/csharp/src/Reader/DatabricksOperationStatusPoller.cs @@ -49,6 +49,11 @@ internal class DatabricksOperationStatusPoller : IOperationStatusPoller // Telemetry tracking private int _pollCount = 0; + // Maximum number of consecutive poll failures before giving up. + // At the default 60s heartbeat interval this allows ~10 minutes of transient errors + // before the poller stops itself. + private const int MaxConsecutiveFailures = 10; + public DatabricksOperationStatusPoller( IHiveServer2Statement statement, IResponse response, @@ -82,6 +87,8 @@ public void Start(CancellationToken externalToken = default) private async Task PollOperationStatus(CancellationToken cancellationToken) { + int consecutiveFailures = 0; + while (!cancellationToken.IsCancellationRequested) { try @@ -94,6 +101,9 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) var request = new TGetOperationStatusReq(operationHandle); var response = await _statement.Client.GetOperationStatus(request, GetOperationStatusTimeoutToken); + // Successful poll — reset failure counter + consecutiveFailures = 0; + // Track poll count for telemetry _pollCount++; @@ -114,6 +124,8 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) } catch (Exception ex) { + consecutiveFailures++; + // Log the error but continue polling. Transient errors (e.g. ObjectDisposedException // from TLS connection recycling) should not kill the heartbeat poller, as that would // cause the server-side command inactivity timeout to expire and terminate the query. @@ -122,13 +134,25 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) { { "error.type", ex.GetType().Name }, { "error.message", ex.Message }, - { "poll_count", _pollCount } + { "poll_count", _pollCount }, + { "consecutive_failures", consecutiveFailures } })); + + if (consecutiveFailures >= MaxConsecutiveFailures) + { + Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.max_failures_reached", + tags: new ActivityTagsCollection + { + { "consecutive_failures", consecutiveFailures }, + { "poll_count", _pollCount } + })); + break; + } } - finally - { - await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken); - } + + // Wait before next poll. On cancellation this throws OperationCanceledException + // which propagates up to the caller (Dispose catches it). + await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken); } // Add telemetry tags to current activity when polling completes From f71a3b7a151df2abc731639c18c1528c9d17b7a9 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Sun, 29 Mar 2026 20:55:18 +0530 Subject: [PATCH 6/6] =?UTF-8?q?fix(csharp):=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20link=20timeout=20token,=20fix=20naming,=20add=20Con?= =?UTF-8?q?figureAwait?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Link poller cancellation token with request timeout token so Stop()/Dispose() immediately aborts in-flight GetOperationStatus calls (Copilot feedback) - Rename GetOperationStatusTimeoutToken to camelCase (Copilot feedback) - Use explicit types for TGetOperationStatusReq/Resp (Copilot feedback) - Add .ConfigureAwait(false) to all awaits (Copilot feedback) Co-authored-by: Isaac --- csharp/src/Reader/DatabricksOperationStatusPoller.cs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/csharp/src/Reader/DatabricksOperationStatusPoller.cs b/csharp/src/Reader/DatabricksOperationStatusPoller.cs index 6a7ff352..d5834c11 100644 --- a/csharp/src/Reader/DatabricksOperationStatusPoller.cs +++ b/csharp/src/Reader/DatabricksOperationStatusPoller.cs @@ -96,10 +96,11 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) TOperationHandle? operationHandle = _response.OperationHandle; if (operationHandle == null) break; - CancellationToken GetOperationStatusTimeoutToken = ApacheUtility.GetCancellationToken(_requestTimeoutSeconds, ApacheUtility.TimeUnit.Seconds); + using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + timeoutCts.CancelAfter(TimeSpan.FromSeconds(_requestTimeoutSeconds)); - var request = new TGetOperationStatusReq(operationHandle); - var response = await _statement.Client.GetOperationStatus(request, GetOperationStatusTimeoutToken); + TGetOperationStatusReq request = new TGetOperationStatusReq(operationHandle); + TGetOperationStatusResp response = await _statement.Client.GetOperationStatus(request, timeoutCts.Token).ConfigureAwait(false); // Successful poll — reset failure counter consecutiveFailures = 0; @@ -152,7 +153,7 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) // Wait before next poll. On cancellation this throws OperationCanceledException // which propagates up to the caller (Dispose catches it). - await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken); + await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken).ConfigureAwait(false); } // Add telemetry tags to current activity when polling completes