Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 67 additions & 19 deletions csharp/src/Reader/DatabricksOperationStatusPoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ internal class DatabricksOperationStatusPoller : IOperationStatusPoller
// Telemetry tracking
private int _pollCount = 0;

// Maximum number of consecutive poll failures before giving up.
// At the default 60s heartbeat interval this allows ~10 minutes of transient errors
// before the poller stops itself.
private const int MaxConsecutiveFailures = 10;

Comment on lines +52 to +56
Copy link

Copilot AI Mar 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MaxConsecutiveFailures changes runtime behavior (poller stops after N consecutive errors), but there’s no unit test asserting (1) polling stops after the threshold and (2) a single successful poll resets the failure counter. Adding coverage for those cases would prevent regressions in the resilience logic.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion — will add in a follow-up. The existing 8 poller tests all pass with the current changes.

public DatabricksOperationStatusPoller(
IHiveServer2Statement statement,
IResponse response,
Expand Down Expand Up @@ -82,30 +87,73 @@ public void Start(CancellationToken externalToken = default)

private async Task PollOperationStatus(CancellationToken cancellationToken)
{
int consecutiveFailures = 0;

while (!cancellationToken.IsCancellationRequested)
{
TOperationHandle? operationHandle = _response.OperationHandle;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is failing this test:
Failed AdbcDrivers.Databricks.Tests.Unit.DatabricksOperationStatusPollerTests.StopsPollingOnException

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to think more about this change: could it leave the poller running unintentionally and prevent the connection from shutting down?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the logic to avoid this scenario.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed with MaxConsecutiveFailures = 10. After 10 consecutive errors (~10 min at 60s interval), the poller stops itself. Also linked the timeout token with the cancellation token so Stop()/Dispose() immediately aborts any in-flight RPC — no more blocking on hung network calls.

if (operationHandle == null) break;

CancellationToken GetOperationStatusTimeoutToken = ApacheUtility.GetCancellationToken(_requestTimeoutSeconds, ApacheUtility.TimeUnit.Seconds);

var request = new TGetOperationStatusReq(operationHandle);
var response = await _statement.Client.GetOperationStatus(request, GetOperationStatusTimeoutToken);

// Track poll count for telemetry
_pollCount++;

await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken);

// end the heartbeat if the command has terminated
if (response.OperationState == TOperationState.CANCELED_STATE ||
response.OperationState == TOperationState.ERROR_STATE ||
response.OperationState == TOperationState.CLOSED_STATE ||
response.OperationState == TOperationState.TIMEDOUT_STATE ||
response.OperationState == TOperationState.UKNOWN_STATE)
try
{
TOperationHandle? operationHandle = _response.OperationHandle;
if (operationHandle == null) break;

using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(TimeSpan.FromSeconds(_requestTimeoutSeconds));

TGetOperationStatusReq request = new TGetOperationStatusReq(operationHandle);
TGetOperationStatusResp response = await _statement.Client.GetOperationStatus(request, timeoutCts.Token).ConfigureAwait(false);

// Successful poll — reset failure counter
consecutiveFailures = 0;

// Track poll count for telemetry
_pollCount++;

// end the heartbeat if the command has terminated
if (response.OperationState == TOperationState.CANCELED_STATE ||
response.OperationState == TOperationState.ERROR_STATE ||
response.OperationState == TOperationState.CLOSED_STATE ||
response.OperationState == TOperationState.TIMEDOUT_STATE ||
response.OperationState == TOperationState.UKNOWN_STATE)
{
break;
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Cancellation was requested - exit the polling loop gracefully
break;
}
catch (Exception ex)
{
consecutiveFailures++;

// Log the error but continue polling. Transient errors (e.g. ObjectDisposedException
// from TLS connection recycling) should not kill the heartbeat poller, as that would
// cause the server-side command inactivity timeout to expire and terminate the query.
Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.poll_error",
tags: new ActivityTagsCollection
{
{ "error.type", ex.GetType().Name },
{ "error.message", ex.Message },
{ "poll_count", _pollCount },
{ "consecutive_failures", consecutiveFailures }
}));

if (consecutiveFailures >= MaxConsecutiveFailures)
{
Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.max_failures_reached",
tags: new ActivityTagsCollection
{
{ "consecutive_failures", consecutiveFailures },
{ "poll_count", _pollCount }
}));
break;
}
}

// Wait before next poll. On cancellation this throws OperationCanceledException
// which propagates up to the caller (Dispose catches it).
await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken).ConfigureAwait(false);
}

// Add telemetry tags to current activity when polling completes
Expand Down
15 changes: 4 additions & 11 deletions csharp/test/Unit/DatabricksOperationStatusPollerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,10 @@ public async Task ContinuesPollingOnFinishedState()
}

[Fact]
public async Task StopsPollingOnException()
public async Task ContinuesPollingOnException()
{
// Arrange
var poller = new DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object, _heartbeatIntervalSeconds);
using var poller = new DatabricksOperationStatusPoller(_mockStatement.Object, _mockResponse.Object, _heartbeatIntervalSeconds);
var pollCount = 0;
_mockClient.Setup(c => c.GetOperationStatus(It.IsAny<TGetOperationStatusReq>(), It.IsAny<CancellationToken>()))
.ThrowsAsync(new Exception("Test exception"))
Expand All @@ -186,15 +186,8 @@ public async Task StopsPollingOnException()
await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds * 3)); // Wait longer than heartbeat interval

// Assert
// Should stop polling after the exception
Assert.Equal(1, pollCount);
try
{
poller.Dispose();
}
catch (Exception)
{
}
// Should continue polling despite the exception (transient error resilience)
Assert.True(pollCount > 1, $"Expected multiple polls despite exceptions but got {pollCount}");
}

[Fact]
Expand Down
Loading