diff --git a/csharp/src/DatabricksParameters.cs b/csharp/src/DatabricksParameters.cs index 6e2b397f..b42046f4 100644 --- a/csharp/src/DatabricksParameters.cs +++ b/csharp/src/DatabricksParameters.cs @@ -452,8 +452,11 @@ public class DatabricksConstants /// /// Default timeout in minutes for CloudFetch HTTP operations. + /// Covers both HttpClient.Timeout (SendAsync/headers) and body read timeout. + /// Set to 15 minutes because ReadAsByteArrayAsync is a separate call from SendAsync + /// and HttpClient.Timeout does not protect body reads when ResponseHeadersRead is used. /// - public const int DefaultCloudFetchTimeoutMinutes = 5; + public const int DefaultCloudFetchTimeoutMinutes = 15; /// /// OAuth grant type constants diff --git a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs index de82b199..4fad9cae 100644 --- a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs +++ b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs @@ -55,6 +55,7 @@ internal sealed class CloudFetchDownloader : ICloudFetchDownloader private readonly int _retryDelayMs; private readonly int _maxUrlRefreshAttempts; private readonly int _urlExpirationBufferSeconds; + private readonly int _timeoutMinutes; private readonly SemaphoreSlim _downloadSemaphore; private readonly RecyclableMemoryStreamManager? _memoryStreamManager; private readonly ArrayPool? _lz4BufferPool; @@ -99,6 +100,7 @@ public CloudFetchDownloader( _retryDelayMs = config.RetryDelayMs; _maxUrlRefreshAttempts = config.MaxUrlRefreshAttempts; _urlExpirationBufferSeconds = config.UrlExpirationBufferSeconds; + _timeoutMinutes = config.TimeoutMinutes; _memoryStreamManager = config.MemoryStreamManager; _lz4BufferPool = config.Lz4BufferPool; _downloadSemaphore = new SemaphoreSlim(_maxParallelDownloads, _maxParallelDownloads); @@ -146,6 +148,7 @@ internal CloudFetchDownloader( _retryDelayMs = retryDelayMs; _maxUrlRefreshAttempts = CloudFetchConfiguration.DefaultMaxUrlRefreshAttempts; _urlExpirationBufferSeconds = CloudFetchConfiguration.DefaultUrlExpirationBufferSeconds; + _timeoutMinutes = CloudFetchConfiguration.DefaultTimeoutMinutes; _memoryStreamManager = null; _lz4BufferPool = null; _downloadSemaphore = new SemaphoreSlim(_maxParallelDownloads, _maxParallelDownloads); @@ -573,7 +576,7 @@ await _activityTracer.TraceActivityAsync(async activity => response.EnsureSuccessStatusCode(); - // Log the download size if available from response headers + // Log the download size from response headers long? contentLength = response.Content.Headers.ContentLength; if (contentLength.HasValue && contentLength.Value > 0) { @@ -584,9 +587,38 @@ await _activityTracer.TraceActivityAsync(async activity => new("content_length_mb", contentLength.Value / 1024.0 / 1024.0) ]); } + else + { + activity?.AddEvent("cloudfetch.content_length_missing", [ + new("offset", downloadResult.StartRowOffset), + new("sanitized_url", sanitizedUrl) + ]); + } - // Read the file data - fileData = await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false); + // Read the file data with an explicit timeout. + // ReadAsByteArrayAsync() on net472 has no CancellationToken overload, + // and HttpClient.Timeout does not protect body reads when + // HttpCompletionOption.ResponseHeadersRead is used — SendAsync returns + // after headers, and the subsequent body read is a separate call on + // HttpContent with no timeout coverage. + // Using CopyToAsync with an explicit token ensures dead TCP connections + // are detected on every 81920-byte chunk read. + using (var bodyTimeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) + { + bodyTimeoutCts.CancelAfter(TimeSpan.FromMinutes(_timeoutMinutes)); + using (var contentStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false)) + { + // Pre-allocate with Content-Length when available (CloudFetch always provides it). + int capacity = contentLength.HasValue && contentLength.Value > 0 + ? (int)contentLength.Value + : 0; + using (var memoryStream = new MemoryStream(capacity)) + { + await contentStream.CopyToAsync(memoryStream, 81920, bodyTimeoutCts.Token).ConfigureAwait(false); + fileData = memoryStream.ToArray(); + } + } + } break; // Success, exit retry loop } catch (Exception ex) when (!cancellationToken.IsCancellationRequested) diff --git a/csharp/src/Reader/DatabricksCompositeReader.cs b/csharp/src/Reader/DatabricksCompositeReader.cs index f60ac003..4f1f932a 100644 --- a/csharp/src/Reader/DatabricksCompositeReader.cs +++ b/csharp/src/Reader/DatabricksCompositeReader.cs @@ -89,7 +89,7 @@ internal DatabricksCompositeReader( } if (_response.DirectResults?.ResultSet?.HasMoreRows ?? true) { - operationStatusPoller = operationPoller ?? new DatabricksOperationStatusPoller(_statement, response, GetHeartbeatIntervalFromConnection(), GetRequestTimeoutFromConnection()); + operationStatusPoller = operationPoller ?? new DatabricksOperationStatusPoller(_statement, response, GetHeartbeatIntervalFromConnection(), GetRequestTimeoutFromConnection(), activityTracer: this); operationStatusPoller.Start(); } } diff --git a/csharp/src/Reader/DatabricksOperationStatusPoller.cs b/csharp/src/Reader/DatabricksOperationStatusPoller.cs index d5834c11..6a4334b8 100644 --- a/csharp/src/Reader/DatabricksOperationStatusPoller.cs +++ b/csharp/src/Reader/DatabricksOperationStatusPoller.cs @@ -28,6 +28,7 @@ using AdbcDrivers.Databricks.Telemetry.TagDefinitions; using AdbcDrivers.HiveServer2; using AdbcDrivers.HiveServer2.Hive2; +using Apache.Arrow.Adbc.Tracing; using Apache.Hive.Service.Rpc.Thrift; namespace AdbcDrivers.Databricks.Reader @@ -42,6 +43,7 @@ internal class DatabricksOperationStatusPoller : IOperationStatusPoller private readonly int _heartbeatIntervalSeconds; private readonly int _requestTimeoutSeconds; private readonly IResponse _response; + private readonly IActivityTracer? _activityTracer; // internal cancellation token source - won't affect the external token private CancellationTokenSource? _internalCts; private Task? _operationStatusPollingTask; @@ -58,12 +60,14 @@ public DatabricksOperationStatusPoller( IHiveServer2Statement statement, IResponse response, int heartbeatIntervalSeconds = DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds, - int requestTimeoutSeconds = DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds) + int requestTimeoutSeconds = DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds, + IActivityTracer? activityTracer = null) { _statement = statement ?? throw new ArgumentNullException(nameof(statement)); _response = response; _heartbeatIntervalSeconds = heartbeatIntervalSeconds; _requestTimeoutSeconds = requestTimeoutSeconds; + _activityTracer = activityTracer; } public bool IsStarted => _operationStatusPollingTask != null; @@ -82,10 +86,22 @@ public void Start(CancellationToken externalToken = default) _internalCts = new CancellationTokenSource(); // create a linked token to the external token so that the external token can cancel the operation status polling task if needed var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(_internalCts.Token, externalToken).Token; - _operationStatusPollingTask = Task.Run(() => PollOperationStatus(linkedToken)); + _operationStatusPollingTask = Task.Run(async () => + { + if (_activityTracer != null) + { + await _activityTracer.Trace.TraceActivityAsync( + activity => PollOperationStatus(linkedToken, activity), + activityName: "PollOperationStatus").ConfigureAwait(false); + } + else + { + await PollOperationStatus(linkedToken, null).ConfigureAwait(false); + } + }); } - private async Task PollOperationStatus(CancellationToken cancellationToken) + private async Task PollOperationStatus(CancellationToken cancellationToken, Activity? activity) { int consecutiveFailures = 0; @@ -108,6 +124,13 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) // Track poll count for telemetry _pollCount++; + activity?.AddEvent(new ActivityEvent("operation_status_poller.poll_success", + tags: new ActivityTagsCollection + { + { "poll_count", _pollCount }, + { "operation_state", response.OperationState.ToString() } + })); + // end the heartbeat if the command has terminated if (response.OperationState == TOperationState.CANCELED_STATE || response.OperationState == TOperationState.ERROR_STATE || @@ -130,7 +153,7 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) // Log the error but continue polling. Transient errors (e.g. ObjectDisposedException // from TLS connection recycling) should not kill the heartbeat poller, as that would // cause the server-side command inactivity timeout to expire and terminate the query. - Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.poll_error", + activity?.AddEvent(new ActivityEvent("operation_status_poller.poll_error", tags: new ActivityTagsCollection { { "error.type", ex.GetType().Name }, @@ -141,7 +164,7 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) if (consecutiveFailures >= MaxConsecutiveFailures) { - Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.max_failures_reached", + activity?.AddEvent(new ActivityEvent("operation_status_poller.max_failures_reached", tags: new ActivityTagsCollection { { "consecutive_failures", consecutiveFailures }, @@ -151,13 +174,20 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) } } - // Wait before next poll. On cancellation this throws OperationCanceledException - // which propagates up to the caller (Dispose catches it). - await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken).ConfigureAwait(false); + // Wait before next poll. + try + { + await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Normal shutdown — don't let this propagate as an error through TraceActivityAsync + break; + } } // Add telemetry tags to current activity when polling completes - Activity.Current?.SetTag(StatementExecutionEvent.PollCount, _pollCount); + activity?.SetTag(StatementExecutionEvent.PollCount, _pollCount); } public void Stop()