Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion csharp/src/DatabricksParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,11 @@ public class DatabricksConstants

/// <summary>
/// Default timeout in minutes for CloudFetch HTTP operations.
/// Covers both HttpClient.Timeout (SendAsync/headers) and body read timeout.
/// Set to 15 minutes because ReadAsByteArrayAsync is a separate call from SendAsync
/// and HttpClient.Timeout does not protect body reads when ResponseHeadersRead is used.
/// </summary>
public const int DefaultCloudFetchTimeoutMinutes = 5;
public const int DefaultCloudFetchTimeoutMinutes = 15;

/// <summary>
/// OAuth grant type constants
Expand Down
38 changes: 35 additions & 3 deletions csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ internal sealed class CloudFetchDownloader : ICloudFetchDownloader
private readonly int _retryDelayMs;
private readonly int _maxUrlRefreshAttempts;
private readonly int _urlExpirationBufferSeconds;
private readonly int _timeoutMinutes;
private readonly SemaphoreSlim _downloadSemaphore;
private readonly RecyclableMemoryStreamManager? _memoryStreamManager;
private readonly ArrayPool<byte>? _lz4BufferPool;
Expand Down Expand Up @@ -99,6 +100,7 @@ public CloudFetchDownloader(
_retryDelayMs = config.RetryDelayMs;
_maxUrlRefreshAttempts = config.MaxUrlRefreshAttempts;
_urlExpirationBufferSeconds = config.UrlExpirationBufferSeconds;
_timeoutMinutes = config.TimeoutMinutes;
_memoryStreamManager = config.MemoryStreamManager;
Comment on lines 101 to 104
Copy link

Copilot AI Mar 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description mentions a new connection property adbc.databricks.cloudfetch.body_read_timeout_minutes (default 15), but this change wires the body-read timeout to the existing CloudFetchConfiguration.TimeoutMinutes (default 5) via _timeoutMinutes = config.TimeoutMinutes. Either the implementation should use the new dedicated property (and its intended default), or the PR description/commentary should be updated to reflect that the existing timeout_minutes setting controls body reads as well.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — removed the separate body_read_timeout_minutes parameter. Now using timeout_minutes (default bumped to 15 min) for both HttpClient.Timeout and body read CancelAfter.

_lz4BufferPool = config.Lz4BufferPool;
_downloadSemaphore = new SemaphoreSlim(_maxParallelDownloads, _maxParallelDownloads);
Expand Down Expand Up @@ -146,6 +148,7 @@ internal CloudFetchDownloader(
_retryDelayMs = retryDelayMs;
_maxUrlRefreshAttempts = CloudFetchConfiguration.DefaultMaxUrlRefreshAttempts;
_urlExpirationBufferSeconds = CloudFetchConfiguration.DefaultUrlExpirationBufferSeconds;
_timeoutMinutes = CloudFetchConfiguration.DefaultTimeoutMinutes;
_memoryStreamManager = null;
_lz4BufferPool = null;
_downloadSemaphore = new SemaphoreSlim(_maxParallelDownloads, _maxParallelDownloads);
Expand Down Expand Up @@ -573,7 +576,7 @@ await _activityTracer.TraceActivityAsync(async activity =>

response.EnsureSuccessStatusCode();

// Log the download size if available from response headers
// Log the download size from response headers
long? contentLength = response.Content.Headers.ContentLength;
if (contentLength.HasValue && contentLength.Value > 0)
{
Expand All @@ -584,9 +587,38 @@ await _activityTracer.TraceActivityAsync(async activity =>
new("content_length_mb", contentLength.Value / 1024.0 / 1024.0)
]);
}
else
{
activity?.AddEvent("cloudfetch.content_length_missing", [
new("offset", downloadResult.StartRowOffset),
new("sanitized_url", sanitizedUrl)
]);
}

// Read the file data
fileData = await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
// Read the file data with an explicit timeout.
// ReadAsByteArrayAsync() on net472 has no CancellationToken overload,
// and HttpClient.Timeout does not protect body reads when
// HttpCompletionOption.ResponseHeadersRead is used — SendAsync returns
// after headers, and the subsequent body read is a separate call on
// HttpContent with no timeout coverage.
// Using CopyToAsync with an explicit token ensures dead TCP connections
// are detected on every 81920-byte chunk read.
using (var bodyTimeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just reuse the existing cancellationToken and extend that to 15mins?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That cancellation token is global and will cancel all the downloads. We want a per-file cancellation token for this use case.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The body read timeout should reuse the same config.TimeoutMinutes that HttpClient.Timeout uses (from CloudFetchTimeoutMinutes). No need for a separate field — just use the same config parameter.

However, bump the default from 5 to 15 minutes. The 5-min HttpClient.Timeout only covers SendAsync (header phase with ResponseHeadersRead). ReadAsByteArrayAsync() is a separate call on HttpContent and is NOT covered by HttpClient.Timeout at all — which is the gap this PR correctly fills.

15 min is a safer default for body reads since large CloudFetch files can take longer to transfer, especially on slower networks.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Bumped DefaultCloudFetchTimeoutMinutes from 5 to 15. Single parameter controls both HttpClient.Timeout and body read CancelAfter. No separate field needed.

The 15-min default is important because we confirmed via testing on .NET Framework 4.7.2 that HttpClient.Timeout does NOT protect ReadAsByteArrayAsync body reads when ResponseHeadersRead is used — the body read is a separate call on HttpContent with no timeout coverage.

{
bodyTimeoutCts.CancelAfter(TimeSpan.FromMinutes(_timeoutMinutes));
using (var contentStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
{
// Pre-allocate with Content-Length when available (CloudFetch always provides it).
int capacity = contentLength.HasValue && contentLength.Value > 0
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How often will we not know the Content-Length in advance (i.e. use a chunked transfer)? For Direct Query in the service, I believe there's a commit limit set and I'm not sure what it's set to. I'll try to get an answer on that.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — we should always be able to get the Content-Length from the HTTP response headers.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I've looked at four different potential hosts and there's only one case where the commit limit is set low enough for it to be a possible concern, and that's setting the value to 512 MB. Direct Query datasets are capped at one million rows and are generally not huge, but if there are five concurrent downloads of 20 MB chunks then we would blow through that with the preallocations.

One possible mitigation would be to use P/Invoke to check the current commit limit and be more conservative with preallocation if that limit is set low.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree — there shouldn't be a case without Content-Length; I only added this as a safety-net fallback. For the scope of this PR, I can change the capacity to 0 when Content-Length isn't available (which, in practice, never happens).
Does that sound good?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that seems like a better option for now. Does this code have access to tracing? If we always expect content-length then it might be worth tracing something when it's not present.

? (int)contentLength.Value
: 0;
using (var memoryStream = new MemoryStream(capacity))
{
await contentStream.CopyToAsync(memoryStream, 81920, bodyTimeoutCts.Token).ConfigureAwait(false);
fileData = memoryStream.ToArray();
}
}
}
break; // Success, exit retry loop
}
Copy link

Copilot AI Mar 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new body-read timeout uses a linked CTS, which can cause ReadAsByteArrayAsync/CopyToAsync to throw OperationCanceledException when the timeout elapses. If that exception escapes the retry loop on the final attempt, DownloadFileAsync will complete in the Canceled state, but the download continuation only handles t.IsFaulted and will not call downloadResult.SetFailed(...). That leaves DownloadCompletedTask never completing (CloudFetchReader awaits it) and can also leak acquired memory. Suggest ensuring timeout-triggered cancellations are converted into a fault (e.g., catch OperationCanceledException when !cancellationToken.IsCancellationRequested and throw a timeout-specific exception), or handle t.IsCanceled in the continuation by completing the result with a failure/cancellation signal.

Suggested change
}
}
catch (OperationCanceledException oce) when (!cancellationToken.IsCancellationRequested)
{
// Treat a timeout of the body read (via the linked CTS) as a fault, not a cancellation.
throw new TimeoutException("Timed out while reading the response body.", oce);
}

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Fixed — added catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) that rethrows as TimeoutException. This ensures the download continuation sees t.IsFaulted (not t.IsCanceled) and properly completes the DownloadCompletedTask.

catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
Expand Down
2 changes: 1 addition & 1 deletion csharp/src/Reader/DatabricksCompositeReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ internal DatabricksCompositeReader(
}
if (_response.DirectResults?.ResultSet?.HasMoreRows ?? true)
{
operationStatusPoller = operationPoller ?? new DatabricksOperationStatusPoller(_statement, response, GetHeartbeatIntervalFromConnection(), GetRequestTimeoutFromConnection());
operationStatusPoller = operationPoller ?? new DatabricksOperationStatusPoller(_statement, response, GetHeartbeatIntervalFromConnection(), GetRequestTimeoutFromConnection(), activityTracer: this);
operationStatusPoller.Start();
}
}
Expand Down
48 changes: 39 additions & 9 deletions csharp/src/Reader/DatabricksOperationStatusPoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
using AdbcDrivers.Databricks.Telemetry.TagDefinitions;
using AdbcDrivers.HiveServer2;
using AdbcDrivers.HiveServer2.Hive2;
using Apache.Arrow.Adbc.Tracing;
using Apache.Hive.Service.Rpc.Thrift;

namespace AdbcDrivers.Databricks.Reader
Expand All @@ -42,6 +43,7 @@ internal class DatabricksOperationStatusPoller : IOperationStatusPoller
private readonly int _heartbeatIntervalSeconds;
private readonly int _requestTimeoutSeconds;
private readonly IResponse _response;
private readonly IActivityTracer? _activityTracer;
// internal cancellation token source - won't affect the external token
private CancellationTokenSource? _internalCts;
private Task? _operationStatusPollingTask;
Expand All @@ -58,12 +60,14 @@ public DatabricksOperationStatusPoller(
IHiveServer2Statement statement,
IResponse response,
int heartbeatIntervalSeconds = DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds,
int requestTimeoutSeconds = DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds)
int requestTimeoutSeconds = DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds,
IActivityTracer? activityTracer = null)
{
_statement = statement ?? throw new ArgumentNullException(nameof(statement));
_response = response;
_heartbeatIntervalSeconds = heartbeatIntervalSeconds;
_requestTimeoutSeconds = requestTimeoutSeconds;
_activityTracer = activityTracer;
}

public bool IsStarted => _operationStatusPollingTask != null;
Expand All @@ -82,10 +86,22 @@ public void Start(CancellationToken externalToken = default)
_internalCts = new CancellationTokenSource();
// create a linked token to the external token so that the external token can cancel the operation status polling task if needed
var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(_internalCts.Token, externalToken).Token;
_operationStatusPollingTask = Task.Run(() => PollOperationStatus(linkedToken));
_operationStatusPollingTask = Task.Run(async () =>
{
if (_activityTracer != null)
{
await _activityTracer.Trace.TraceActivityAsync(
activity => PollOperationStatus(linkedToken, activity),
activityName: "PollOperationStatus").ConfigureAwait(false);
}
else
{
await PollOperationStatus(linkedToken, null).ConfigureAwait(false);
}
});
}

private async Task PollOperationStatus(CancellationToken cancellationToken)
private async Task PollOperationStatus(CancellationToken cancellationToken, Activity? activity)
{
int consecutiveFailures = 0;

Expand All @@ -108,6 +124,13 @@ private async Task PollOperationStatus(CancellationToken cancellationToken)
// Track poll count for telemetry
_pollCount++;

activity?.AddEvent(new ActivityEvent("operation_status_poller.poll_success",
tags: new ActivityTagsCollection
{
{ "poll_count", _pollCount },
{ "operation_state", response.OperationState.ToString() }
}));

// end the heartbeat if the command has terminated
if (response.OperationState == TOperationState.CANCELED_STATE ||
response.OperationState == TOperationState.ERROR_STATE ||
Expand All @@ -130,7 +153,7 @@ private async Task PollOperationStatus(CancellationToken cancellationToken)
// Log the error but continue polling. Transient errors (e.g. ObjectDisposedException
// from TLS connection recycling) should not kill the heartbeat poller, as that would
// cause the server-side command inactivity timeout to expire and terminate the query.
Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.poll_error",
activity?.AddEvent(new ActivityEvent("operation_status_poller.poll_error",
tags: new ActivityTagsCollection
{
{ "error.type", ex.GetType().Name },
Expand All @@ -141,7 +164,7 @@ private async Task PollOperationStatus(CancellationToken cancellationToken)

if (consecutiveFailures >= MaxConsecutiveFailures)
{
Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.max_failures_reached",
activity?.AddEvent(new ActivityEvent("operation_status_poller.max_failures_reached",
tags: new ActivityTagsCollection
{
{ "consecutive_failures", consecutiveFailures },
Expand All @@ -151,13 +174,20 @@ private async Task PollOperationStatus(CancellationToken cancellationToken)
}
}

// Wait before next poll. On cancellation this throws OperationCanceledException
// which propagates up to the caller (Dispose catches it).
await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken).ConfigureAwait(false);
// Wait before next poll.
try
{
await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Normal shutdown — don't let this propagate as an error through TraceActivityAsync
break;
}
}

// Add telemetry tags to current activity when polling completes
Activity.Current?.SetTag(StatementExecutionEvent.PollCount, _pollCount);
activity?.SetTag(StatementExecutionEvent.PollCount, _pollCount);
}

public void Stop()
Expand Down
Loading