Skip to content

Commit bdf5ceb

Browse files
authored
fix(csharp): add configurable timeout for CloudFetch body reads (#374)
## Summary

- Replace bare `ReadAsByteArrayAsync()` with `CopyToAsync` + explicit `CancellationToken` timeout for CloudFetch body downloads
- When `HttpCompletionOption.ResponseHeadersRead` is used, `HttpClient.Timeout` may not propagate to body reads on all runtimes (e.g., Mono), leaving body reads vulnerable to permanent hangs on dead TCP connections
- New configurable connection property: `adbc.databricks.cloudfetch.body_read_timeout_minutes` (default: 15 minutes)

## Test plan

- [ ] Run parallel CloudFetch download test on Windows (net472) — verify stalls recover via timeout + retry
- [ ] Run on Mono — verify downloads no longer hang permanently
- [ ] Verify configurable timeout works via connection property override

This pull request was AI-assisted by Isaac.
1 parent 87e5223 commit bdf5ceb

File tree

4 files changed

+79
-14
lines changed

4 files changed

+79
-14
lines changed

csharp/src/DatabricksParameters.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,8 +452,11 @@ public class DatabricksConstants
452452

453453
/// <summary>
454454
/// Default timeout in minutes for CloudFetch HTTP operations.
455+
/// Covers both HttpClient.Timeout (SendAsync/headers) and body read timeout.
456+
/// Set to 15 minutes because the response body read is a separate call from SendAsync
457+
/// and HttpClient.Timeout does not protect body reads when ResponseHeadersRead is used.
455458
/// </summary>
456-
public const int DefaultCloudFetchTimeoutMinutes = 5;
459+
public const int DefaultCloudFetchTimeoutMinutes = 15;
457460

458461
/// <summary>
459462
/// OAuth grant type constants

csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ internal sealed class CloudFetchDownloader : ICloudFetchDownloader
5555
private readonly int _retryDelayMs;
5656
private readonly int _maxUrlRefreshAttempts;
5757
private readonly int _urlExpirationBufferSeconds;
58+
private readonly int _timeoutMinutes;
5859
private readonly SemaphoreSlim _downloadSemaphore;
5960
private readonly RecyclableMemoryStreamManager? _memoryStreamManager;
6061
private readonly ArrayPool<byte>? _lz4BufferPool;
@@ -99,6 +100,7 @@ public CloudFetchDownloader(
99100
_retryDelayMs = config.RetryDelayMs;
100101
_maxUrlRefreshAttempts = config.MaxUrlRefreshAttempts;
101102
_urlExpirationBufferSeconds = config.UrlExpirationBufferSeconds;
103+
_timeoutMinutes = config.TimeoutMinutes;
102104
_memoryStreamManager = config.MemoryStreamManager;
103105
_lz4BufferPool = config.Lz4BufferPool;
104106
_downloadSemaphore = new SemaphoreSlim(_maxParallelDownloads, _maxParallelDownloads);
@@ -146,6 +148,7 @@ internal CloudFetchDownloader(
146148
_retryDelayMs = retryDelayMs;
147149
_maxUrlRefreshAttempts = CloudFetchConfiguration.DefaultMaxUrlRefreshAttempts;
148150
_urlExpirationBufferSeconds = CloudFetchConfiguration.DefaultUrlExpirationBufferSeconds;
151+
_timeoutMinutes = CloudFetchConfiguration.DefaultTimeoutMinutes;
149152
_memoryStreamManager = null;
150153
_lz4BufferPool = null;
151154
_downloadSemaphore = new SemaphoreSlim(_maxParallelDownloads, _maxParallelDownloads);
@@ -573,7 +576,7 @@ await _activityTracer.TraceActivityAsync(async activity =>
573576

574577
response.EnsureSuccessStatusCode();
575578

576-
// Log the download size if available from response headers
579+
// Log the download size from response headers
577580
long? contentLength = response.Content.Headers.ContentLength;
578581
if (contentLength.HasValue && contentLength.Value > 0)
579582
{
@@ -584,9 +587,38 @@ await _activityTracer.TraceActivityAsync(async activity =>
584587
new("content_length_mb", contentLength.Value / 1024.0 / 1024.0)
585588
]);
586589
}
590+
else
591+
{
592+
activity?.AddEvent("cloudfetch.content_length_missing", [
593+
new("offset", downloadResult.StartRowOffset),
594+
new("sanitized_url", sanitizedUrl)
595+
]);
596+
}
587597

588-
// Read the file data
589-
fileData = await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
598+
// Read the file data with an explicit timeout.
599+
// ReadAsByteArrayAsync() on net472 has no CancellationToken overload,
600+
// and HttpClient.Timeout does not protect body reads when
601+
// HttpCompletionOption.ResponseHeadersRead is used — SendAsync returns
602+
// after headers, and the subsequent body read is a separate call on
603+
// HttpContent with no timeout coverage.
604+
// Using CopyToAsync with an explicit token ensures dead TCP connections
605+
// are detected on every 81920-byte chunk read.
606+
using (var bodyTimeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
607+
{
608+
bodyTimeoutCts.CancelAfter(TimeSpan.FromMinutes(_timeoutMinutes));
609+
using (var contentStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
610+
{
611+
// Pre-allocate with Content-Length when available (expected for CloudFetch); otherwise start empty and let the stream grow.
612+
int capacity = contentLength.HasValue && contentLength.Value > 0
613+
? (int)contentLength.Value
614+
: 0;
615+
using (var memoryStream = new MemoryStream(capacity))
616+
{
617+
await contentStream.CopyToAsync(memoryStream, 81920, bodyTimeoutCts.Token).ConfigureAwait(false);
618+
fileData = memoryStream.ToArray();
619+
}
620+
}
621+
}
590622
break; // Success, exit retry loop
591623
}
592624
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)

csharp/src/Reader/DatabricksCompositeReader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ internal DatabricksCompositeReader(
8989
}
9090
if (_response.DirectResults?.ResultSet?.HasMoreRows ?? true)
9191
{
92-
operationStatusPoller = operationPoller ?? new DatabricksOperationStatusPoller(_statement, response, GetHeartbeatIntervalFromConnection(), GetRequestTimeoutFromConnection());
92+
operationStatusPoller = operationPoller ?? new DatabricksOperationStatusPoller(_statement, response, GetHeartbeatIntervalFromConnection(), GetRequestTimeoutFromConnection(), activityTracer: this);
9393
operationStatusPoller.Start();
9494
}
9595
}

csharp/src/Reader/DatabricksOperationStatusPoller.cs

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
using AdbcDrivers.Databricks.Telemetry.TagDefinitions;
2929
using AdbcDrivers.HiveServer2;
3030
using AdbcDrivers.HiveServer2.Hive2;
31+
using Apache.Arrow.Adbc.Tracing;
3132
using Apache.Hive.Service.Rpc.Thrift;
3233

3334
namespace AdbcDrivers.Databricks.Reader
@@ -42,6 +43,7 @@ internal class DatabricksOperationStatusPoller : IOperationStatusPoller
4243
private readonly int _heartbeatIntervalSeconds;
4344
private readonly int _requestTimeoutSeconds;
4445
private readonly IResponse _response;
46+
private readonly IActivityTracer? _activityTracer;
4547
// internal cancellation token source - won't affect the external token
4648
private CancellationTokenSource? _internalCts;
4749
private Task? _operationStatusPollingTask;
@@ -58,12 +60,14 @@ public DatabricksOperationStatusPoller(
5860
IHiveServer2Statement statement,
5961
IResponse response,
6062
int heartbeatIntervalSeconds = DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds,
61-
int requestTimeoutSeconds = DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds)
63+
int requestTimeoutSeconds = DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds,
64+
IActivityTracer? activityTracer = null)
6265
{
6366
_statement = statement ?? throw new ArgumentNullException(nameof(statement));
6467
_response = response;
6568
_heartbeatIntervalSeconds = heartbeatIntervalSeconds;
6669
_requestTimeoutSeconds = requestTimeoutSeconds;
70+
_activityTracer = activityTracer;
6771
}
6872

6973
public bool IsStarted => _operationStatusPollingTask != null;
@@ -82,10 +86,22 @@ public void Start(CancellationToken externalToken = default)
8286
_internalCts = new CancellationTokenSource();
8387
// create a linked token to the external token so that the external token can cancel the operation status polling task if needed
8488
var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(_internalCts.Token, externalToken).Token;
85-
_operationStatusPollingTask = Task.Run(() => PollOperationStatus(linkedToken));
89+
_operationStatusPollingTask = Task.Run(async () =>
90+
{
91+
if (_activityTracer != null)
92+
{
93+
await _activityTracer.Trace.TraceActivityAsync(
94+
activity => PollOperationStatus(linkedToken, activity),
95+
activityName: "PollOperationStatus").ConfigureAwait(false);
96+
}
97+
else
98+
{
99+
await PollOperationStatus(linkedToken, null).ConfigureAwait(false);
100+
}
101+
});
86102
}
87103

88-
private async Task PollOperationStatus(CancellationToken cancellationToken)
104+
private async Task PollOperationStatus(CancellationToken cancellationToken, Activity? activity)
89105
{
90106
int consecutiveFailures = 0;
91107

@@ -108,6 +124,13 @@ private async Task PollOperationStatus(CancellationToken cancellationToken)
108124
// Track poll count for telemetry
109125
_pollCount++;
110126

127+
activity?.AddEvent(new ActivityEvent("operation_status_poller.poll_success",
128+
tags: new ActivityTagsCollection
129+
{
130+
{ "poll_count", _pollCount },
131+
{ "operation_state", response.OperationState.ToString() }
132+
}));
133+
111134
// end the heartbeat if the command has terminated
112135
if (response.OperationState == TOperationState.CANCELED_STATE ||
113136
response.OperationState == TOperationState.ERROR_STATE ||
@@ -130,7 +153,7 @@ private async Task PollOperationStatus(CancellationToken cancellationToken)
130153
// Log the error but continue polling. Transient errors (e.g. ObjectDisposedException
131154
// from TLS connection recycling) should not kill the heartbeat poller, as that would
132155
// cause the server-side command inactivity timeout to expire and terminate the query.
133-
Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.poll_error",
156+
activity?.AddEvent(new ActivityEvent("operation_status_poller.poll_error",
134157
tags: new ActivityTagsCollection
135158
{
136159
{ "error.type", ex.GetType().Name },
@@ -141,7 +164,7 @@ private async Task PollOperationStatus(CancellationToken cancellationToken)
141164

142165
if (consecutiveFailures >= MaxConsecutiveFailures)
143166
{
144-
Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.max_failures_reached",
167+
activity?.AddEvent(new ActivityEvent("operation_status_poller.max_failures_reached",
145168
tags: new ActivityTagsCollection
146169
{
147170
{ "consecutive_failures", consecutiveFailures },
@@ -151,13 +174,20 @@ private async Task PollOperationStatus(CancellationToken cancellationToken)
151174
}
152175
}
153176

154-
// Wait before next poll. On cancellation this throws OperationCanceledException
155-
// which propagates up to the caller (Dispose catches it).
156-
await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken).ConfigureAwait(false);
177+
// Wait before next poll.
178+
try
179+
{
180+
await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken).ConfigureAwait(false);
181+
}
182+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
183+
{
184+
// Normal shutdown — don't let this propagate as an error through TraceActivityAsync
185+
break;
186+
}
157187
}
158188

159189
// Add telemetry tags to the polling activity (if one was supplied) when polling completes
160-
Activity.Current?.SetTag(StatementExecutionEvent.PollCount, _pollCount);
190+
activity?.SetTag(StatementExecutionEvent.PollCount, _pollCount);
161191
}
162192

163193
public void Stop()

0 commit comments

Comments
 (0)