diff --git a/csharp/src/Reader/DatabricksOperationStatusPoller.cs b/csharp/src/Reader/DatabricksOperationStatusPoller.cs index 47f1ceed..d5834c11 100644 --- a/csharp/src/Reader/DatabricksOperationStatusPoller.cs +++ b/csharp/src/Reader/DatabricksOperationStatusPoller.cs @@ -49,6 +49,11 @@ internal class DatabricksOperationStatusPoller : IOperationStatusPoller // Telemetry tracking private int _pollCount = 0; + // Maximum number of consecutive poll failures before giving up. + // At the default 60s heartbeat interval this allows ~10 minutes of transient errors + // before the poller stops itself. + private const int MaxConsecutiveFailures = 10; + public DatabricksOperationStatusPoller( IHiveServer2Statement statement, IResponse response, @@ -82,30 +87,73 @@ public void Start(CancellationToken externalToken = default) private async Task PollOperationStatus(CancellationToken cancellationToken) { + int consecutiveFailures = 0; + while (!cancellationToken.IsCancellationRequested) { - TOperationHandle? operationHandle = _response.OperationHandle; - if (operationHandle == null) break; - - CancellationToken GetOperationStatusTimeoutToken = ApacheUtility.GetCancellationToken(_requestTimeoutSeconds, ApacheUtility.TimeUnit.Seconds); - - var request = new TGetOperationStatusReq(operationHandle); - var response = await _statement.Client.GetOperationStatus(request, GetOperationStatusTimeoutToken); - - // Track poll count for telemetry - _pollCount++; - - await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken); - - // end the heartbeat if the command has terminated - if (response.OperationState == TOperationState.CANCELED_STATE || - response.OperationState == TOperationState.ERROR_STATE || - response.OperationState == TOperationState.CLOSED_STATE || - response.OperationState == TOperationState.TIMEDOUT_STATE || - response.OperationState == TOperationState.UKNOWN_STATE) + try + { + TOperationHandle? operationHandle = _response.OperationHandle; + if (operationHandle == null) break; + + using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + timeoutCts.CancelAfter(TimeSpan.FromSeconds(_requestTimeoutSeconds)); + + TGetOperationStatusReq request = new TGetOperationStatusReq(operationHandle); + TGetOperationStatusResp response = await _statement.Client.GetOperationStatus(request, timeoutCts.Token).ConfigureAwait(false); + + // Successful poll — reset failure counter + consecutiveFailures = 0; + + // Track poll count for telemetry + _pollCount++; + + // end the heartbeat if the command has terminated + if (response.OperationState == TOperationState.CANCELED_STATE || + response.OperationState == TOperationState.ERROR_STATE || + response.OperationState == TOperationState.CLOSED_STATE || + response.OperationState == TOperationState.TIMEDOUT_STATE || + response.OperationState == TOperationState.UKNOWN_STATE) + { + break; + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { + // Cancellation was requested - exit the polling loop gracefully break; } + catch (Exception ex) + { + consecutiveFailures++; + + // Log the error but continue polling. Transient errors (e.g. ObjectDisposedException + // from TLS connection recycling) should not kill the heartbeat poller, as that would + // cause the server-side command inactivity timeout to expire and terminate the query. + Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.poll_error", + tags: new ActivityTagsCollection + { + { "error.type", ex.GetType().Name }, + { "error.message", ex.Message }, + { "poll_count", _pollCount }, + { "consecutive_failures", consecutiveFailures } + })); + + if (consecutiveFailures >= MaxConsecutiveFailures) + { + Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.max_failures_reached", + tags: new ActivityTagsCollection + { + { "consecutive_failures", consecutiveFailures }, + { "poll_count", _pollCount } + })); + break; + } + } + + // Wait before next poll. On cancellation this throws OperationCanceledException + // which propagates up to the caller (Dispose catches it). + await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken).ConfigureAwait(false); } // Add telemetry tags to current activity when polling completes diff --git a/csharp/test/Unit/DatabricksOperationStatusPollerTests.cs b/csharp/test/Unit/DatabricksOperationStatusPollerTests.cs index cf79ad3e..898a14d0 100644 --- a/csharp/test/Unit/DatabricksOperationStatusPollerTests.cs +++ b/csharp/test/Unit/DatabricksOperationStatusPollerTests.cs @@ -172,10 +172,10 @@ public async Task ContinuesPollingOnFinishedState() } [Fact] - public async Task StopsPollingOnException() + public async Task ContinuesPollingOnException() { // Arrange - var poller = new DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object, _heartbeatIntervalSeconds); + using var poller = new DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object, _heartbeatIntervalSeconds); var pollCount = 0; _mockClient.Setup(c => c.GetOperationStatus(It.IsAny(), It.IsAny())) .ThrowsAsync(new Exception("Test exception")) @@ -186,15 +186,8 @@ public async Task StopsPollingOnException() await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds * 3)); // Wait longer than heartbeat interval // Assert - // Should stop polling after the exception - Assert.Equal(1, pollCount); - try - { - poller.Dispose(); - } - catch (Exception) - { - } + // Should continue polling despite the exception (transient error resilience) + Assert.True(pollCount > 1, $"Expected multiple polls despite exceptions but got {pollCount}"); } [Fact]