fix(csharp): add configurable timeout for CloudFetch body reads #374
msrathore-db merged 14 commits into main
Conversation
Replace bare `ReadAsByteArrayAsync()` with `CopyToAsync` + an explicit `CancellationToken` timeout for CloudFetch body downloads. When `HttpCompletionOption.ResponseHeadersRead` is used, `HttpClient.Timeout` may not propagate to body reads on all runtimes (e.g., Mono). This leaves body reads vulnerable to permanent hangs on dead TCP connections. Using `CopyToAsync` with an explicit cancellation token ensures each 81920-byte chunk read is cancellable.

New connection property: `adbc.databricks.cloudfetch.body_read_timeout_minutes`
Default: 15 minutes

Closes ES-1778880

Co-authored-by: Isaac
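The pattern the description refers to — a per-download linked `CancellationTokenSource` with `CancelAfter` guarding each `CopyToAsync` chunk read — can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code; the method name `DownloadBodyAsync` and its parameters are hypothetical.

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

static class BodyReadSketch
{
    // Illustrative sketch of the timeout pattern described in this PR.
    static async Task<byte[]> DownloadBodyAsync(
        HttpClient client, string url, TimeSpan bodyReadTimeout, CancellationToken cancellationToken)
    {
        // ResponseHeadersRead returns as soon as headers arrive; the body is read later,
        // outside the scope that HttpClient.Timeout reliably covers on all runtimes.
        using (var response = await client.GetAsync(
            url, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false))
        {
            response.EnsureSuccessStatusCode();

            // Link a per-download token so the timeout cancels only this file,
            // not every in-flight download sharing the outer token.
            using (var bodyTimeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
            {
                bodyTimeoutCts.CancelAfter(bodyReadTimeout);

                using (var contentStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
                using (var memoryStream = new MemoryStream())
                {
                    // Each 81920-byte chunk read observes the token, so a dead
                    // TCP connection is detected within the timeout window.
                    await contentStream.CopyToAsync(memoryStream, 81920, bodyTimeoutCts.Token)
                        .ConfigureAwait(false);
                    return memoryStream.ToArray();
                }
            }
        }
    }
}
```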
csharp/src/DatabricksParameters.cs
Outdated
```csharp
/// on each body download to prevent permanent hangs on dead TCP connections.
/// Default value is 15 minutes if not specified.
/// </summary>
public const string CloudFetchBodyReadTimeoutMinutes = "adbc.databricks.cloudfetch.body_read_timeout_minutes";
```
Can we just reuse the existing param?
```csharp
public const string CloudFetchTimeoutMinutes = "adbc.databricks.cloudfetch.timeout_minutes";
```
Updated. Thanks
```csharp
// (e.g., Mono) when HttpCompletionOption.ResponseHeadersRead is used.
// Using CopyToAsync with an explicit token ensures dead TCP connections
// are detected on every 81920-byte chunk read.
using (var bodyTimeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
```
Can we just reuse the existing cancellationToken and extend that to 15mins?
That cancellation token is global and will cancel all the downloads. We want a per-file cancellation token for this use case.
```csharp
using (var contentStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
{
    int capacity = contentLength.HasValue && contentLength.Value > 0
        ? (int)Math.Min(contentLength.Value, int.MaxValue)
```
FWIW, it's not actually possible to allocate an array of size int.MaxValue. I don't remember the failure mode and there's a constant for this -- but only in .NET 11+. Copilot claims this value is "~2,147,483,591". Can one of these streams be larger than that size?
No, max chunk size is around 20MB so this won't be exceeded. However, I'll change int.MaxValue to a smaller value.
The .NET 5+ implementation does not preallocate any size for this. Following that pattern now.
The .NET 5+ path (ReadAsByteArrayAsync) does pre-allocate internally — LoadIntoBufferAsync uses LimitArrayPoolWriteStream which sizes from Content-Length. It's just hidden inside the framework.
The net472 path with new MemoryStream() (no capacity) means the MemoryStream starts at 256 bytes and doubles repeatedly during CopyToAsync writes. For a 20MB CloudFetch file, that's ~17 internal resize+copy operations (256 → 512 → ... → 32MB), then ToArray() copies 20MB one more time from the 32MB internal buffer to a new 20MB array.
The fix for the int.MaxValue issue should be a reasonable cap, not removing pre-allocation entirely:
```csharp
int capacity = contentLength.HasValue && contentLength.Value > 0
    ? (int)Math.Min(contentLength.Value, 100 * 1024 * 1024)
    : 0;
using (var memoryStream = new MemoryStream(capacity))
```

CloudFetch chunks are ~20MB max, so a 100MB cap is safe. This avoids the int.MaxValue problem while keeping the same allocation efficiency as the original code.
Makes sense. But since it was configurable by the user, I didn't preallocate. Changed to 100 MB. It can still grow if the user configures it that way.
You're right. Restored pre-allocation with Math.Min(contentLength, 100MB) cap. Thanks for the .NET runtime reference.
…ve capacity hint

- Reuse existing `adbc.databricks.cloudfetch.timeout_minutes` (default 5 min) for body read timeout instead of introducing a new parameter
- Remove MemoryStream capacity pre-allocation to match .NET 5+ behavior (start empty, grow dynamically) — eliminates the int.MaxValue issue
- Add `using` on MemoryStream for cleanliness

Co-authored-by: Isaac
```csharp
// when HttpCompletionOption.ResponseHeadersRead is used.
// Using CopyToAsync with an explicit token ensures dead TCP connections
// are detected on every 81920-byte chunk read.
using (var bodyTimeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
```
The body read timeout should reuse the same config.TimeoutMinutes that HttpClient.Timeout uses (from CloudFetchTimeoutMinutes). No need for a separate field — just use the same config parameter.
However, bump the default from 5 to 15 minutes. The 5-min HttpClient.Timeout only covers SendAsync (header phase with ResponseHeadersRead). ReadAsByteArrayAsync() is a separate call on HttpContent and is NOT covered by HttpClient.Timeout at all — which is the gap this PR correctly fills.
15 min is a safer default for body reads since large CloudFetch files can take longer to transfer, especially on slower networks.
Done. Bumped DefaultCloudFetchTimeoutMinutes from 5 to 15. Single parameter controls both HttpClient.Timeout and body read CancelAfter. No separate field needed.
The 15-min default is important because we confirmed via testing on .NET Framework 4.7.2 that HttpClient.Timeout does NOT protect ReadAsByteArrayAsync body reads when ResponseHeadersRead is used — the body read is a separate call on HttpContent with no timeout coverage.
csharp/src/DatabricksParameters.cs
Outdated
```csharp
/// <summary>
/// Default timeout in minutes for individual CloudFetch body reads.
/// </summary>
public const int DefaultCloudFetchBodyReadTimeoutMinutes = 15;
```
I just mean this DefaultCloudFetchBodyReadTimeoutMinutes is probably not needed; just use DefaultCloudFetchTimeoutMinutes.
Done — removed DefaultCloudFetchBodyReadTimeoutMinutes entirely. Using DefaultCloudFetchTimeoutMinutes (now 15 min) for both.
Pull request overview
This PR updates the CloudFetch download path to enforce an explicit timeout during HTTP response body reads, addressing scenarios where HttpClient.Timeout may not reliably apply when using HttpCompletionOption.ResponseHeadersRead (notably on some runtimes).
Changes:
- Plumbs CloudFetch timeout minutes from configuration into `CloudFetchDownloader`.
- Replaces `ReadAsByteArrayAsync()` with a body-read implementation that uses a linked `CancellationTokenSource` with `CancelAfter(...)`.
- Uses a framework-conditional path: token-aware `ReadAsByteArrayAsync(token)` on NET5+; `ReadAsStreamAsync` + `CopyToAsync(..., token)` on older targets.
```csharp
_maxUrlRefreshAttempts = config.MaxUrlRefreshAttempts;
_urlExpirationBufferSeconds = config.UrlExpirationBufferSeconds;
_timeoutMinutes = config.TimeoutMinutes;
_memoryStreamManager = config.MemoryStreamManager;
```
PR description mentions a new connection property adbc.databricks.cloudfetch.body_read_timeout_minutes (default 15), but this change wires the body-read timeout to the existing CloudFetchConfiguration.TimeoutMinutes (default 5) via _timeoutMinutes = config.TimeoutMinutes. Either the implementation should use the new dedicated property (and its intended default), or the PR description/commentary should be updated to reflect that the existing timeout_minutes setting controls body reads as well.
Fixed — removed the separate body_read_timeout_minutes parameter. Now using timeout_minutes (default bumped to 15 min) for both HttpClient.Timeout and body read CancelAfter.
```csharp
// Read the file data with an explicit timeout matching HttpClient.Timeout.
// ReadAsByteArrayAsync() on net472 has no CancellationToken overload,
// and HttpClient.Timeout may not propagate to body reads on all runtimes
// when HttpCompletionOption.ResponseHeadersRead is used.
// Using CopyToAsync with an explicit token ensures dead TCP connections
// are detected on every 81920-byte chunk read.
using (var bodyTimeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
{
    bodyTimeoutCts.CancelAfter(TimeSpan.FromMinutes(_timeoutMinutes));
#if NET5_0_OR_GREATER
```
This change introduces new timeout behavior for body reads (linked CTS + CancelAfter) but there doesn't appear to be an automated test covering the timeout path (e.g., a handler/content stream that stalls and verifies the download completes with a failure and the result task is completed rather than hanging). Adding a unit/E2E test for the body-read timeout would help prevent regressions, especially around cancellation vs. failure semantics and retries.
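A test along those lines could be sketched with a handler whose body stalls forever, simulating a dead TCP connection. This is an illustrative xUnit-style sketch — `StallingHandler`, `StallingStream`, and the test class are hypothetical names, and the short 200 ms timeout stands in for the real configured timeout:

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

// Hypothetical handler whose response body never produces data, simulating a dead connection.
class StallingHandler : HttpMessageHandler
{
    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
    {
        var response = new HttpResponseMessage(HttpStatusCode.OK)
        {
            Content = new StreamContent(new StallingStream())
        };
        return Task.FromResult(response);
    }

    // A stream whose async reads never complete until cancelled.
    class StallingStream : Stream
    {
        public override async Task<int> ReadAsync(
            byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            await Task.Delay(Timeout.Infinite, cancellationToken);
            return 0;
        }

        public override bool CanRead => true;
        public override bool CanSeek => false;
        public override bool CanWrite => false;
        public override long Length => throw new NotSupportedException();
        public override long Position { get => 0; set => throw new NotSupportedException(); }
        public override void Flush() { }
        public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
        public override void SetLength(long value) => throw new NotSupportedException();
        public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    }
}

public class BodyReadTimeoutTests
{
    [Fact]
    public async Task BodyRead_TimesOut_InsteadOfHanging()
    {
        using var client = new HttpClient(new StallingHandler());
        using var response = await client.GetAsync(
            "http://example.test/file", HttpCompletionOption.ResponseHeadersRead);

        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
        using var contentStream = await response.Content.ReadAsStreamAsync();
        using var memoryStream = new MemoryStream();

        // The copy should be cancelled by the timeout token rather than hang forever.
        await Assert.ThrowsAnyAsync<OperationCanceledException>(
            () => contentStream.CopyToAsync(memoryStream, 81920, cts.Token));
    }
}
```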
```csharp
#endif
    }
    break; // Success, exit retry loop
}
```
The new body-read timeout uses a linked CTS, which can cause ReadAsByteArrayAsync/CopyToAsync to throw OperationCanceledException when the timeout elapses. If that exception escapes the retry loop on the final attempt, DownloadFileAsync will complete in the Canceled state, but the download continuation only handles t.IsFaulted and will not call downloadResult.SetFailed(...). That leaves DownloadCompletedTask never completing (CloudFetchReader awaits it) and can also leak acquired memory. Suggest ensuring timeout-triggered cancellations are converted into a fault (e.g., catch OperationCanceledException when !cancellationToken.IsCancellationRequested and throw a timeout-specific exception), or handle t.IsCanceled in the continuation by completing the result with a failure/cancellation signal.
```diff
     }
 }
+catch (OperationCanceledException oce) when (!cancellationToken.IsCancellationRequested)
+{
+    // Treat a timeout of the body read (via the linked CTS) as a fault, not a cancellation.
+    throw new TimeoutException("Timed out while reading the response body.", oce);
+}
```
Good catch. Fixed — added catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) that rethrows as TimeoutException. This ensures the download continuation sees t.IsFaulted (not t.IsCanceled) and properly completes the DownloadCompletedTask.
```csharp
using (var memoryStream = new MemoryStream())
{
    await contentStream.CopyToAsync(memoryStream, 81920, bodyTimeoutCts.Token).ConfigureAwait(false);
    fileData = memoryStream.ToArray();
```
In the net472 path, CopyToAsync into a default MemoryStream and then calling ToArray() adds at least one extra buffer copy and may cause repeated reallocations for large downloads. Since Content-Length is already read above, consider pre-sizing the stream when it fits in int (or using the configured RecyclableMemoryStreamManager when available) to reduce allocations/copies.
```diff
-using (var memoryStream = new MemoryStream())
-{
-    await contentStream.CopyToAsync(memoryStream, 81920, bodyTimeoutCts.Token).ConfigureAwait(false);
-    fileData = memoryStream.ToArray();
+{
+    int capacity = (size > 0 && size <= int.MaxValue) ? (int)size : 0;
+    using (var memoryStream = capacity > 0 ? new MemoryStream(capacity) : new MemoryStream())
+    {
+        await contentStream.CopyToAsync(memoryStream, 81920, bodyTimeoutCts.Token).ConfigureAwait(false);
+        fileData = memoryStream.ToArray();
+    }
```
Restored pre-allocation with a 100MB cap: Math.Min(contentLength, 100 * 1024 * 1024). This matches the .NET 5+ LimitArrayPoolWriteStream behavior (which also uses Content-Length for sizing) while avoiding the int.MaxValue allocation issue. CloudFetch chunks are ~20MB max so 100MB cap is safe.
…ancellation bug

- Bump DefaultCloudFetchTimeoutMinutes from 5 to 15. HttpClient.Timeout only covers SendAsync (headers) when ResponseHeadersRead is used; body reads are unprotected. 15 min is safer for both phases, especially on slower networks.
- Fix OperationCanceledException from body timeout not being handled by download continuation. Convert body timeout cancellation into TimeoutException so it propagates as a fault (t.IsFaulted), not a cancellation (t.IsCanceled), which would leave DownloadCompletedTask never completing and hang the reader.
- Update comments to clarify why body reads need explicit timeout protection.

Co-authored-by: Isaac
Pre-allocate MemoryStream with Content-Length when available, matching .NET 5+'s LimitArrayPoolWriteStream which also sizes from Content-Length. Cap at 100MB (CloudFetch chunks are ~20MB max) to avoid int.MaxValue allocation failures while keeping allocation efficiency. Co-authored-by: Isaac
```csharp
int capacity = contentLength.HasValue && contentLength.Value > 0
    ? (int)Math.Min(contentLength.Value, MaxPreAllocBytes)
    : 0;
var memoryStream = new MemoryStream(capacity);
```
Should we wrap the memoryStream usage with a using?
eric-wang-1990 left a comment
Get at least one stamp from @jadewang-db or @CurtHagenlocher. Make sure to run a sanity test with a large CloudFetch download and verify:
- correct data/number of rows returned
- performance should be comparable
Add large-scale benchmark for JDBC comparison: SELECT * FROM main.tpcds_sf10_delta.store_sales (28.8M rows, 23 columns). Co-authored-by: Isaac
Keep OperationCanceledException catch for body read timeout conversion, merge with main's new catch (Exception ex) when (!cancelled) pattern. Co-authored-by: Isaac
Co-authored-by: Isaac
📊 CloudFetch Benchmark Results — .NET 8.0 and .NET Framework 4.7.2
🟢 Improvement | 🔴 Regression | ⚪ No change
[Benchmark tables not captured.]
```json
"expected_rows": 28800501,
"columns": 23,
"category": "large-wide"
},
```
Did you want to add this in the current PR? It looks like crossover from #376
```csharp
// LimitArrayPoolWriteStream which also sizes from Content-Length internally.
// Cap at 100MB to avoid int.MaxValue allocation failures.
const int MaxPreAllocBytes = 100 * 1024 * 1024;
int capacity = contentLength.HasValue && contentLength.Value > 0
```
How often will we not know the Content-Length in advance (i.e. use a chunked transfer)? For Direct Query in the service, I believe there's a commit limit set and I'm not sure what it's set to. I'll try to get an answer on that.
Agree on this — we should always be able to get the contentLength from the HTTP response header.
Okay, I've looked at four different potential hosts and there's only one case where this value is set low enough for it to be a possible concern, and that's setting the value to 512 MB. Direct Query datasets are capped at one million rows and are generally not huge, but if there are five concurrent downloads of 20 MB chunks then we would blow through that with the preallocations.
One possible mitigation would be to use p/Invoke to check for the current commit limit and be more conservative if that limit is set conservatively.
I agree. There wouldn't be a case without content length; I just added this as a safety-net fallback. For the scope of this PR, I can change it to 0 when content length isn't available (which is never actually the case). Does that sound good?
Yes, that seems like a better option for now. Does this code have access to tracing? If we always expect content-length then it might be worth tracing something when it's not present.
```csharp
using (var bodyTimeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
{
    bodyTimeoutCts.CancelAfter(TimeSpan.FromMinutes(_timeoutMinutes));
#if NET5_0_OR_GREATER
```
Let's remove the #if here and use the else branch for all runtime versions.
```csharp
    : 0;
var memoryStream = new MemoryStream(capacity);
await contentStream.CopyToAsync(memoryStream, 81920, bodyTimeoutCts.Token).ConfigureAwait(false);
fileData = memoryStream.ToArray();
```
FWIW, if the overall architecture could be changed to return this as a Memory<byte> or ArraySegment<byte> instead of a byte[], then the original MemoryStream's buffer could just be reused (via GetBuffer or TryGetBuffer) and an allocation and copy could be avoided.
Interesting. I'll add this as a todo to investigate in a broader fix.
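For reference, the zero-copy idea suggested above could look roughly like this. It is a sketch only — the method name `ReadBodyAsync` is hypothetical, and the real change would ripple through the download-result types as noted:

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

static class ZeroCopySketch
{
    // Illustrative: return the MemoryStream's internal buffer as an
    // ArraySegment<byte> instead of copying it out with ToArray().
    static async Task<ArraySegment<byte>> ReadBodyAsync(Stream contentStream, CancellationToken token)
    {
        // Not disposed here on purpose: the returned segment aliases the stream's
        // backing array, and MemoryStream holds no unmanaged resources.
        var memoryStream = new MemoryStream();
        await contentStream.CopyToAsync(memoryStream, 81920, token).ConfigureAwait(false);

        // TryGetBuffer exposes the backing array without a copy; the segment's
        // Count is the written length, which may be smaller than the array.
        if (memoryStream.TryGetBuffer(out ArraySegment<byte> buffer))
        {
            return buffer;
        }

        // Fallback for streams whose buffer is not publicly exposable
        // (e.g., a MemoryStream constructed over a caller-provided array).
        return new ArraySegment<byte>(memoryStream.ToArray());
    }
}
```

The trade-off is lifetime management: the segment may reference an array larger than the data (the stream's grown capacity), and callers must not assume `Array.Length == Count`.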
…ark query

- Remove #if NET5_0_OR_GREATER — use CopyToAsync with CancellationToken for all runtimes, not just net472 (Jade's feedback)
- Add using on MemoryStream (Eric's feedback)
- Remove store_sales_sf10 benchmark query — belongs in PR #376 (Curt's feedback)

Co-authored-by: Isaac
Log operation_status_poller.poll_success with poll_count and operation_state on each successful GetOperationStatus call. Co-authored-by: Isaac
CloudFetchDownloader:
- Remove 100MB pre-allocation cap, use Content-Length directly
- Add cloudfetch.content_length_missing trace event when Content-Length absent

Poller telemetry fix:
- Activity.Current is null inside Task.Run (AsyncLocal doesn't propagate)
- Pass IActivityTracer from DatabricksCompositeReader to poller
- Wrap poll loop in TraceActivityAsync to create proper Activity span
- All poll_success, poll_error, max_failures_reached events now appear in adbcfile trace logs (verified)

Co-authored-by: Isaac
…r trace Task.Delay throws OperationCanceledException on normal shutdown. Without catching it inside PollOperationStatusCore, TraceActivityAsync logs it as an error and sets ActivityStatusCode.Error — making normal shutdown look like a failure in trace files. Catch and break instead. Co-authored-by: Isaac
Move TraceActivityAsync dispatch to Start() and use single method with Activity? parameter. No behavior change. Co-authored-by: Isaac
…w retry The body read timeout fires OperationCanceledException from the linked CTS. The separate catch was converting it to TimeoutException and throwing it outside the retry loop, preventing retries. The general catch (Exception ex) already handles this correctly since cancellationToken.IsCancellationRequested is false (only the linked body timeout CTS cancelled, not the parent token). After all retries exhaust, InvalidOperationException is thrown (a fault), so the download continuation handles it via t.IsFaulted correctly. Fixes TimeoutRetryWithBackoff integration test. Co-authored-by: Isaac
Summary

- Replace bare `ReadAsByteArrayAsync()` with `CopyToAsync` + explicit `CancellationToken` timeout for CloudFetch body downloads
- When `HttpCompletionOption.ResponseHeadersRead` is used, `HttpClient.Timeout` may not propagate to body reads on all runtimes (e.g., Mono), leaving body reads vulnerable to permanent hangs on dead TCP connections
- New connection property: `adbc.databricks.cloudfetch.body_read_timeout_minutes` (default: 15 minutes)

Test plan
This pull request was AI-assisted by Isaac.