From 48a88ec7284f758097a253126ef12e83ed88cb43 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Sat, 28 Mar 2026 01:02:06 +0530 Subject: [PATCH 01/12] fix(csharp): add configurable timeout for CloudFetch body reads Replace bare ReadAsByteArrayAsync() with CopyToAsync + explicit CancellationToken timeout for CloudFetch body downloads. When HttpCompletionOption.ResponseHeadersRead is used, HttpClient.Timeout may not propagate to body reads on all runtimes (e.g., Mono). This leaves body reads vulnerable to permanent hangs on dead TCP connections. Using CopyToAsync with an explicit cancellation token ensures each 81920-byte chunk read is cancellable. New connection property: adbc.databricks.cloudfetch.body_read_timeout_minutes Default: 15 minutes Closes ES-1778880 Co-authored-by: Isaac --- csharp/src/DatabricksParameters.cs | 14 ++++++++++ .../CloudFetch/CloudFetchConfiguration.cs | 11 +++++++- .../Reader/CloudFetch/CloudFetchDownloader.cs | 28 +++++++++++++++++-- 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/csharp/src/DatabricksParameters.cs b/csharp/src/DatabricksParameters.cs index b4587ec2..0dfdf738 100644 --- a/csharp/src/DatabricksParameters.cs +++ b/csharp/src/DatabricksParameters.cs @@ -73,6 +73,15 @@ public class DatabricksParameters : SparkParameters /// Buffer time in seconds before URL expiration to trigger refresh. /// Default value is 60 seconds if not specified. /// + /// + /// Timeout in minutes for individual CloudFetch body reads. + /// When HttpCompletionOption.ResponseHeadersRead is used, HttpClient.Timeout may not + /// protect the body read on all runtimes. This provides an explicit cancellation timeout + /// on each body download to prevent permanent hangs on dead TCP connections. + /// Default value is 15 minutes if not specified. + /// + public const string CloudFetchBodyReadTimeoutMinutes = "adbc.databricks.cloudfetch.body_read_timeout_minutes"; + public const string CloudFetchUrlExpirationBufferSeconds = "adbc.databricks.cloudfetch.url_expiration_buffer_seconds"; /// @@ -426,6 +435,11 @@ public class DatabricksConstants /// public const int DefaultCloudFetchTimeoutMinutes = 5; + /// + /// Default timeout in minutes for individual CloudFetch body reads. + /// + public const int DefaultCloudFetchBodyReadTimeoutMinutes = 15; + /// /// OAuth grant type constants /// diff --git a/csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs b/csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs index 5de62864..f24aac22 100644 --- a/csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs +++ b/csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs @@ -37,6 +37,7 @@ internal sealed class CloudFetchConfiguration internal const int DefaultRetryDelayMs = 500; internal const int DefaultMaxUrlRefreshAttempts = 3; internal const int DefaultUrlExpirationBufferSeconds = 60; + internal const int DefaultBodyReadTimeoutMinutes = 15; /// /// Number of parallel downloads to perform. @@ -78,6 +79,13 @@ internal sealed class CloudFetchConfiguration /// public int UrlExpirationBufferSeconds { get; set; } = DefaultUrlExpirationBufferSeconds; + /// + /// Timeout in minutes for individual body reads. + /// Provides explicit cancellation on body downloads to prevent permanent hangs + /// on dead TCP connections where HttpClient.Timeout may not propagate. + /// + public int BodyReadTimeoutMinutes { get; set; } = DefaultBodyReadTimeoutMinutes; + /// /// Whether the result data is LZ4 compressed. /// @@ -127,7 +135,8 @@ public static CloudFetchConfiguration FromProperties( MaxRetries = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMaxRetries, DefaultMaxRetries), RetryDelayMs = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchRetryDelayMs, DefaultRetryDelayMs), MaxUrlRefreshAttempts = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMaxUrlRefreshAttempts, DefaultMaxUrlRefreshAttempts), - UrlExpirationBufferSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, DefaultUrlExpirationBufferSeconds) + UrlExpirationBufferSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, DefaultUrlExpirationBufferSeconds), + BodyReadTimeoutMinutes = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchBodyReadTimeoutMinutes, DefaultBodyReadTimeoutMinutes) }; return config; diff --git a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs index 4f1a84a6..5b838c7e 100644 --- a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs +++ b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs @@ -54,6 +54,7 @@ internal sealed class CloudFetchDownloader : ICloudFetchDownloader private readonly int _retryDelayMs; private readonly int _maxUrlRefreshAttempts; private readonly int _urlExpirationBufferSeconds; + private readonly int _bodyReadTimeoutMinutes; private readonly SemaphoreSlim _downloadSemaphore; private readonly RecyclableMemoryStreamManager? _memoryStreamManager; private readonly ArrayPool? _lz4BufferPool; @@ -97,6 +98,7 @@ public CloudFetchDownloader( _retryDelayMs = config.RetryDelayMs; _maxUrlRefreshAttempts = config.MaxUrlRefreshAttempts; _urlExpirationBufferSeconds = config.UrlExpirationBufferSeconds; + _bodyReadTimeoutMinutes = config.BodyReadTimeoutMinutes; _memoryStreamManager = config.MemoryStreamManager; _lz4BufferPool = config.Lz4BufferPool; _downloadSemaphore = new SemaphoreSlim(_maxParallelDownloads, _maxParallelDownloads); @@ -141,6 +143,7 @@ internal CloudFetchDownloader( _retryDelayMs = retryDelayMs; _maxUrlRefreshAttempts = CloudFetchConfiguration.DefaultMaxUrlRefreshAttempts; _urlExpirationBufferSeconds = CloudFetchConfiguration.DefaultUrlExpirationBufferSeconds; + _bodyReadTimeoutMinutes = CloudFetchConfiguration.DefaultBodyReadTimeoutMinutes; _memoryStreamManager = null; _lz4BufferPool = null; _downloadSemaphore = new SemaphoreSlim(_maxParallelDownloads, _maxParallelDownloads); @@ -558,8 +561,29 @@ await _activityTracer.TraceActivityAsync(async activity => ]); } - // Read the file data - fileData = await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false); + // Read the file data with a 15-minute timeout. + // ReadAsByteArrayAsync() on net472 has no CancellationToken overload, + // and HttpClient.Timeout may not propagate to body reads on all runtimes + // (e.g., Mono) when HttpCompletionOption.ResponseHeadersRead is used. + // Using CopyToAsync with an explicit token ensures dead TCP connections + // are detected on every 81920-byte chunk read. + using (var bodyTimeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) + { + bodyTimeoutCts.CancelAfter(TimeSpan.FromMinutes(_bodyReadTimeoutMinutes)); +#if NET5_0_OR_GREATER + fileData = await response.Content.ReadAsByteArrayAsync(bodyTimeoutCts.Token).ConfigureAwait(false); +#else + using (var contentStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false)) + { + int capacity = contentLength.HasValue && contentLength.Value > 0 + ? (int)Math.Min(contentLength.Value, int.MaxValue) + : 0; + var memoryStream = new MemoryStream(capacity); + await contentStream.CopyToAsync(memoryStream, 81920, bodyTimeoutCts.Token).ConfigureAwait(false); + fileData = memoryStream.ToArray(); + } +#endif + } break; // Success, exit retry loop } catch (Exception ex) when (retry < _maxRetries - 1 && !cancellationToken.IsCancellationRequested) From 84435a6db4de10a76bc96d4c52861ee866125392 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Sat, 28 Mar 2026 12:45:37 +0530 Subject: [PATCH 02/12] =?UTF-8?q?fix(csharp):=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20reuse=20CloudFetchTimeoutMinutes,=20remove=20capaci?= =?UTF-8?q?ty=20hint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Reuse existing `adbc.databricks.cloudfetch.timeout_minutes` (default 5 min) for body read timeout instead of introducing a new parameter - Remove MemoryStream capacity pre-allocation to match .NET 5+ behavior (start empty, grow dynamically) — eliminates the int.MaxValue issue - Add `using` on MemoryStream for cleanliness Co-authored-by: Isaac --- csharp/src/DatabricksParameters.cs | 14 -------------- .../CloudFetch/CloudFetchConfiguration.cs | 11 +---------- .../Reader/CloudFetch/CloudFetchDownloader.cs | 17 +++++++---------- 3 files changed, 8 insertions(+), 34 deletions(-) diff --git a/csharp/src/DatabricksParameters.cs b/csharp/src/DatabricksParameters.cs index 0dfdf738..b4587ec2 100644 --- a/csharp/src/DatabricksParameters.cs +++ b/csharp/src/DatabricksParameters.cs @@ -73,15 +73,6 @@ public class DatabricksParameters : SparkParameters /// Buffer time in seconds before URL expiration to trigger refresh. /// Default value is 60 seconds if not specified. /// - /// - /// Timeout in minutes for individual CloudFetch body reads. - /// When HttpCompletionOption.ResponseHeadersRead is used, HttpClient.Timeout may not - /// protect the body read on all runtimes. This provides an explicit cancellation timeout - /// on each body download to prevent permanent hangs on dead TCP connections. - /// Default value is 15 minutes if not specified. - /// - public const string CloudFetchBodyReadTimeoutMinutes = "adbc.databricks.cloudfetch.body_read_timeout_minutes"; - public const string CloudFetchUrlExpirationBufferSeconds = "adbc.databricks.cloudfetch.url_expiration_buffer_seconds"; /// @@ -435,11 +426,6 @@ public class DatabricksConstants /// public const int DefaultCloudFetchTimeoutMinutes = 5; - /// - /// Default timeout in minutes for individual CloudFetch body reads. - /// - public const int DefaultCloudFetchBodyReadTimeoutMinutes = 15; - /// /// OAuth grant type constants /// diff --git a/csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs b/csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs index f24aac22..5de62864 100644 --- a/csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs +++ b/csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs @@ -37,7 +37,6 @@ internal sealed class CloudFetchConfiguration internal const int DefaultRetryDelayMs = 500; internal const int DefaultMaxUrlRefreshAttempts = 3; internal const int DefaultUrlExpirationBufferSeconds = 60; - internal const int DefaultBodyReadTimeoutMinutes = 15; /// /// Number of parallel downloads to perform. @@ -79,13 +78,6 @@ internal sealed class CloudFetchConfiguration /// public int UrlExpirationBufferSeconds { get; set; } = DefaultUrlExpirationBufferSeconds; - /// - /// Timeout in minutes for individual body reads. - /// Provides explicit cancellation on body downloads to prevent permanent hangs - /// on dead TCP connections where HttpClient.Timeout may not propagate. - /// - public int BodyReadTimeoutMinutes { get; set; } = DefaultBodyReadTimeoutMinutes; - /// /// Whether the result data is LZ4 compressed. /// @@ -135,8 +127,7 @@ public static CloudFetchConfiguration FromProperties( MaxRetries = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMaxRetries, DefaultMaxRetries), RetryDelayMs = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchRetryDelayMs, DefaultRetryDelayMs), MaxUrlRefreshAttempts = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMaxUrlRefreshAttempts, DefaultMaxUrlRefreshAttempts), - UrlExpirationBufferSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, DefaultUrlExpirationBufferSeconds), - BodyReadTimeoutMinutes = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchBodyReadTimeoutMinutes, DefaultBodyReadTimeoutMinutes) + UrlExpirationBufferSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, DefaultUrlExpirationBufferSeconds) }; return config; diff --git a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs index 5b838c7e..5feadd1f 100644 --- a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs +++ b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs @@ -54,7 +54,7 @@ internal sealed class CloudFetchDownloader : ICloudFetchDownloader private readonly int _retryDelayMs; private readonly int _maxUrlRefreshAttempts; private readonly int _urlExpirationBufferSeconds; - private readonly int _bodyReadTimeoutMinutes; + private readonly int _timeoutMinutes; private readonly SemaphoreSlim _downloadSemaphore; private readonly RecyclableMemoryStreamManager? _memoryStreamManager; private readonly ArrayPool? _lz4BufferPool; @@ -98,7 +98,7 @@ public CloudFetchDownloader( _retryDelayMs = config.RetryDelayMs; _maxUrlRefreshAttempts = config.MaxUrlRefreshAttempts; _urlExpirationBufferSeconds = config.UrlExpirationBufferSeconds; - _bodyReadTimeoutMinutes = config.BodyReadTimeoutMinutes; + _timeoutMinutes = config.TimeoutMinutes; _memoryStreamManager = config.MemoryStreamManager; _lz4BufferPool = config.Lz4BufferPool; _downloadSemaphore = new SemaphoreSlim(_maxParallelDownloads, _maxParallelDownloads); @@ -143,7 +143,7 @@ internal CloudFetchDownloader( _retryDelayMs = retryDelayMs; _maxUrlRefreshAttempts = CloudFetchConfiguration.DefaultMaxUrlRefreshAttempts; _urlExpirationBufferSeconds = CloudFetchConfiguration.DefaultUrlExpirationBufferSeconds; - _bodyReadTimeoutMinutes = CloudFetchConfiguration.DefaultBodyReadTimeoutMinutes; + _timeoutMinutes = CloudFetchConfiguration.DefaultTimeoutMinutes; _memoryStreamManager = null; _lz4BufferPool = null; _downloadSemaphore = new SemaphoreSlim(_maxParallelDownloads, _maxParallelDownloads); @@ -561,24 +561,21 @@ await _activityTracer.TraceActivityAsync(async activity => ]); } - // Read the file data with a 15-minute timeout. + // Read the file data with an explicit timeout matching HttpClient.Timeout. // ReadAsByteArrayAsync() on net472 has no CancellationToken overload, // and HttpClient.Timeout may not propagate to body reads on all runtimes - // (e.g., Mono) when HttpCompletionOption.ResponseHeadersRead is used. + // when HttpCompletionOption.ResponseHeadersRead is used. // Using CopyToAsync with an explicit token ensures dead TCP connections // are detected on every 81920-byte chunk read. using (var bodyTimeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) { - bodyTimeoutCts.CancelAfter(TimeSpan.FromMinutes(_bodyReadTimeoutMinutes)); + bodyTimeoutCts.CancelAfter(TimeSpan.FromMinutes(_timeoutMinutes)); #if NET5_0_OR_GREATER fileData = await response.Content.ReadAsByteArrayAsync(bodyTimeoutCts.Token).ConfigureAwait(false); #else using (var contentStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false)) + using (var memoryStream = new MemoryStream()) { - int capacity = contentLength.HasValue && contentLength.Value > 0 - ? (int)Math.Min(contentLength.Value, int.MaxValue) - : 0; - var memoryStream = new MemoryStream(capacity); await contentStream.CopyToAsync(memoryStream, 81920, bodyTimeoutCts.Token).ConfigureAwait(false); fileData = memoryStream.ToArray(); } From f32bfaf0bf3ef01113d08a4f320db2a1ba421406 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Sun, 29 Mar 2026 14:05:15 +0530 Subject: [PATCH 03/12] =?UTF-8?q?fix(csharp):=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20bump=20default=20to=2015min,=20fix=20timeout=20canc?= =?UTF-8?q?ellation=20bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Bump DefaultCloudFetchTimeoutMinutes from 5 to 15. HttpClient.Timeout only covers SendAsync (headers) when ResponseHeadersRead is used; body reads are unprotected. 15 min is safer for both phases, especially on slower networks. - Fix OperationCanceledException from body timeout not being handled by download continuation. Convert body timeout cancellation into TimeoutException so it propagates as a fault (t.IsFaulted), not a cancellation (t.IsCanceled) which would leave DownloadCompletedTask never completing and hang the reader. - Update comments to clarify why body reads need explicit timeout protection. Co-authored-by: Isaac --- csharp/src/DatabricksParameters.cs | 5 ++++- .../src/Reader/CloudFetch/CloudFetchDownloader.cs | 15 ++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/csharp/src/DatabricksParameters.cs b/csharp/src/DatabricksParameters.cs index b4587ec2..0349040c 100644 --- a/csharp/src/DatabricksParameters.cs +++ b/csharp/src/DatabricksParameters.cs @@ -423,8 +423,11 @@ public class DatabricksConstants /// /// Default timeout in minutes for CloudFetch HTTP operations. + /// Covers both HttpClient.Timeout (SendAsync/headers) and body read timeout. + /// Set to 15 minutes because ReadAsByteArrayAsync is a separate call from SendAsync + /// and HttpClient.Timeout does not protect body reads when ResponseHeadersRead is used. /// - public const int DefaultCloudFetchTimeoutMinutes = 5; + public const int DefaultCloudFetchTimeoutMinutes = 15; /// /// OAuth grant type constants diff --git a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs index 5feadd1f..50e6b2d4 100644 --- a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs +++ b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs @@ -561,10 +561,12 @@ await _activityTracer.TraceActivityAsync(async activity => ]); } - // Read the file data with an explicit timeout matching HttpClient.Timeout. + // Read the file data with an explicit timeout. // ReadAsByteArrayAsync() on net472 has no CancellationToken overload, - // and HttpClient.Timeout may not propagate to body reads on all runtimes - // when HttpCompletionOption.ResponseHeadersRead is used. + // and HttpClient.Timeout does not protect body reads when + // HttpCompletionOption.ResponseHeadersRead is used — SendAsync returns + // after headers, and the subsequent body read is a separate call on + // HttpContent with no timeout coverage. // Using CopyToAsync with an explicit token ensures dead TCP connections // are detected on every 81920-byte chunk read. using (var bodyTimeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) @@ -583,6 +585,13 @@ await _activityTracer.TraceActivityAsync(async activity => } break; // Success, exit retry loop } + catch (OperationCanceledException oce) when (!cancellationToken.IsCancellationRequested) + { + // Body read timeout (from the linked CTS) fired, not a real cancellation. + // Convert to a fault so the download continuation handles it correctly — + // otherwise t.IsCanceled leaves DownloadCompletedTask never completing. + throw new TimeoutException($"CloudFetch body read timed out after {_timeoutMinutes} minutes for offset {downloadResult.StartRowOffset}.", oce); + } catch (Exception ex) when (retry < _maxRetries - 1 && !cancellationToken.IsCancellationRequested) { // Log the error and retry From 741430f6f80fc74f4edd8cc161daafa959e6e389 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Sun, 29 Mar 2026 14:07:31 +0530 Subject: [PATCH 04/12] fix(csharp): restore MemoryStream pre-allocation with 100MB cap Pre-allocate MemoryStream with Content-Length when available, matching .NET 5+'s LimitArrayPoolWriteStream which also sizes from Content-Length. Cap at 100MB (CloudFetch chunks are ~20MB max) to avoid int.MaxValue allocation failures while keeping allocation efficiency. Co-authored-by: Isaac --- csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs index 50e6b2d4..c2bd6a2c 100644 --- a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs +++ b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs @@ -576,8 +576,15 @@ await _activityTracer.TraceActivityAsync(async activity => fileData = await response.Content.ReadAsByteArrayAsync(bodyTimeoutCts.Token).ConfigureAwait(false); #else using (var contentStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false)) - using (var memoryStream = new MemoryStream()) { + // Pre-allocate with Content-Length when available, matching .NET 5+'s + // LimitArrayPoolWriteStream which also sizes from Content-Length internally. + // Cap at 100MB to avoid int.MaxValue allocation failures. + const int MaxPreAllocBytes = 100 * 1024 * 1024; + int capacity = contentLength.HasValue && contentLength.Value > 0 + ? (int)Math.Min(contentLength.Value, MaxPreAllocBytes) + : 0; + var memoryStream = new MemoryStream(capacity); await contentStream.CopyToAsync(memoryStream, 81920, bodyTimeoutCts.Token).ConfigureAwait(false); fileData = memoryStream.ToArray(); } From 6748d233f5149e22a5636dfabd8dd988f5d5a888 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Sun, 29 Mar 2026 23:08:47 +0530 Subject: [PATCH 05/12] bench(csharp): add store_sales SF10 benchmark query (28.8M rows) Add large-scale benchmark for JDBC comparison: SELECT * FROM main.tpcds_sf10_delta.store_sales (28.8M rows, 23 columns). Co-authored-by: Isaac --- csharp/Benchmarks/benchmark-queries.json | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/csharp/Benchmarks/benchmark-queries.json b/csharp/Benchmarks/benchmark-queries.json index debb60e6..45a180c7 100644 --- a/csharp/Benchmarks/benchmark-queries.json +++ b/csharp/Benchmarks/benchmark-queries.json @@ -1,4 +1,12 @@ [ + { + "name": "store_sales_sf10", + "description": "Store sales SF10 - 28.8M rows, 23 columns (large scale for JDBC comparison)", + "query": "select * from main.tpcds_sf10_delta.store_sales", + "expected_rows": 28800501, + "columns": 23, + "category": "large-wide" + }, { "name": "catalog_sales", "description": "Catalog sales - 1.4M rows, 34 columns (current default)", From 052d9a5d8e0d731ff7ba34a3e7c39f3288936847 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Sun, 29 Mar 2026 23:54:03 +0530 Subject: [PATCH 06/12] fix(csharp): rename oce variable to fix codespell lint failure Co-authored-by: Isaac --- csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs index 88008c33..5e0cb0f5 100644 --- a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs +++ b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs @@ -619,12 +619,12 @@ await _activityTracer.TraceActivityAsync(async activity => } break; // Success, exit retry loop } - catch (OperationCanceledException oce) when (!cancellationToken.IsCancellationRequested) + catch (OperationCanceledException ex) when (!cancellationToken.IsCancellationRequested) { // Body read timeout (from the linked CTS) fired, not a real cancellation. // Convert to a fault so the download continuation handles it correctly — // otherwise t.IsCanceled leaves DownloadCompletedTask never completing. - throw new TimeoutException($"CloudFetch body read timed out after {_timeoutMinutes} minutes for offset {downloadResult.StartRowOffset}.", oce); + throw new TimeoutException($"CloudFetch body read timed out after {_timeoutMinutes} minutes for offset {downloadResult.StartRowOffset}.", ex); } catch (Exception ex) when (!cancellationToken.IsCancellationRequested) { From 4fcead33b07c077f6ad7ae05fd4febc4eee95689 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Mon, 30 Mar 2026 22:03:26 +0530 Subject: [PATCH 07/12] =?UTF-8?q?fix(csharp):=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20remove=20#if,=20add=20using,=20remove=20benchmark?= =?UTF-8?q?=20query?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove #if NET5_0_OR_GREATER — use CopyToAsync with CancellationToken for all runtimes, not just net472 (Jade's feedback) - Add using on MemoryStream (Eric's feedback) - Remove store_sales_sf10 benchmark query — belongs in PR #376 (Curt's feedback) Co-authored-by: Isaac --- csharp/Benchmarks/benchmark-queries.json | 8 -------- .../Reader/CloudFetch/CloudFetchDownloader.cs | 17 +++++++---------- 2 files changed, 7 insertions(+), 18 deletions(-) diff --git a/csharp/Benchmarks/benchmark-queries.json b/csharp/Benchmarks/benchmark-queries.json index 45a180c7..debb60e6 100644 --- a/csharp/Benchmarks/benchmark-queries.json +++ b/csharp/Benchmarks/benchmark-queries.json @@ -1,12 +1,4 @@ [ - { - "name": "store_sales_sf10", - "description": "Store sales SF10 - 28.8M rows, 23 columns (large scale for JDBC comparison)", - "query": "select * from main.tpcds_sf10_delta.store_sales", - "expected_rows": 28800501, - "columns": 23, - "category": "large-wide" - }, { "name": "catalog_sales", "description": "Catalog sales - 1.4M rows, 34 columns (current default)", diff --git a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs index 5e0cb0f5..311d61f8 100644 --- a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs +++ b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs @@ -599,23 +599,20 @@ await _activityTracer.TraceActivityAsync(async activity => using (var bodyTimeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) { bodyTimeoutCts.CancelAfter(TimeSpan.FromMinutes(_timeoutMinutes)); -#if NET5_0_OR_GREATER - fileData = await response.Content.ReadAsByteArrayAsync(bodyTimeoutCts.Token).ConfigureAwait(false); -#else using (var contentStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false)) { - // Pre-allocate with Content-Length when available, matching .NET 5+'s - // LimitArrayPoolWriteStream which also sizes from Content-Length internally. - // Cap at 100MB to avoid int.MaxValue allocation failures. + // Pre-allocate with Content-Length when available (CloudFetch always provides it). + // Cap at 100MB to avoid excessive memory pressure on constrained hosts. const int MaxPreAllocBytes = 100 * 1024 * 1024; int capacity = contentLength.HasValue && contentLength.Value > 0 ? (int)Math.Min(contentLength.Value, MaxPreAllocBytes) : 0; - var memoryStream = new MemoryStream(capacity); - await contentStream.CopyToAsync(memoryStream, 81920, bodyTimeoutCts.Token).ConfigureAwait(false); - fileData = memoryStream.ToArray(); + using (var memoryStream = new MemoryStream(capacity)) + { + await contentStream.CopyToAsync(memoryStream, 81920, bodyTimeoutCts.Token).ConfigureAwait(false); + fileData = memoryStream.ToArray(); + } } -#endif } break; // Success, exit retry loop } From 057aa11b9be1e59febd494a4368516a0eb635403 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Mon, 30 Mar 2026 22:44:22 +0530 Subject: [PATCH 08/12] feat(csharp): add telemetry for successful heartbeat polls Log operation_status_poller.poll_success with poll_count and operation_state on each successful GetOperationStatus call. Co-authored-by: Isaac --- csharp/src/Reader/DatabricksOperationStatusPoller.cs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/csharp/src/Reader/DatabricksOperationStatusPoller.cs b/csharp/src/Reader/DatabricksOperationStatusPoller.cs index d5834c11..36f55b3a 100644 --- a/csharp/src/Reader/DatabricksOperationStatusPoller.cs +++ b/csharp/src/Reader/DatabricksOperationStatusPoller.cs @@ -108,6 +108,13 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) // Track poll count for telemetry _pollCount++; + Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.poll_success", + tags: new ActivityTagsCollection + { + { "poll_count", _pollCount }, + { "operation_state", response.OperationState.ToString() } + })); + // end the heartbeat if the command has terminated if (response.OperationState == TOperationState.CANCELED_STATE || response.OperationState == TOperationState.ERROR_STATE || From 97647646e55f607cc8328e5f0fa504ee93bcb910 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Mon, 30 Mar 2026 23:55:23 +0530 Subject: [PATCH 09/12] fix(csharp): address PR review + fix poller telemetry CloudFetchDownloader: - Remove 100MB pre-allocation cap, use Content-Length directly - Add cloudfetch.content_length_missing trace event when Content-Length absent Poller telemetry fix: - Activity.Current is null inside Task.Run (AsyncLocal doesn't propagate) - Pass IActivityTracer from DatabricksCompositeReader to poller - Wrap poll loop in TraceActivityAsync to create proper Activity span - All poll_success, poll_error, max_failures_reached events now appear in adbcfile trace logs (verified) Co-authored-by: Isaac --- .../Reader/CloudFetch/CloudFetchDownloader.cs | 13 ++++++--- .../src/Reader/DatabricksCompositeReader.cs | 2 +- .../Reader/DatabricksOperationStatusPoller.cs | 28 +++++++++++++++---- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs index 311d61f8..8f38e3fc 100644 --- a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs +++ b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs @@ -576,7 +576,7 @@ await _activityTracer.TraceActivityAsync(async activity => response.EnsureSuccessStatusCode(); - // Log the download size if available from response headers + // Log the download size from response headers long? contentLength = response.Content.Headers.ContentLength; if (contentLength.HasValue && contentLength.Value > 0) { @@ -587,6 +587,13 @@ await _activityTracer.TraceActivityAsync(async activity => new("content_length_mb", contentLength.Value / 1024.0 / 1024.0) ]); } + else + { + activity?.AddEvent("cloudfetch.content_length_missing", [ + new("offset", downloadResult.StartRowOffset), + new("sanitized_url", sanitizedUrl) + ]); + } // Read the file data with an explicit timeout. // ReadAsByteArrayAsync() on net472 has no CancellationToken overload, @@ -602,10 +609,8 @@ await _activityTracer.TraceActivityAsync(async activity => using (var contentStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false)) { // Pre-allocate with Content-Length when available (CloudFetch always provides it). - // Cap at 100MB to avoid excessive memory pressure on constrained hosts. - const int MaxPreAllocBytes = 100 * 1024 * 1024; int capacity = contentLength.HasValue && contentLength.Value > 0 - ? (int)Math.Min(contentLength.Value, MaxPreAllocBytes) + ? (int)contentLength.Value : 0; using (var memoryStream = new MemoryStream(capacity)) { diff --git a/csharp/src/Reader/DatabricksCompositeReader.cs b/csharp/src/Reader/DatabricksCompositeReader.cs index f60ac003..4f1f932a 100644 --- a/csharp/src/Reader/DatabricksCompositeReader.cs +++ b/csharp/src/Reader/DatabricksCompositeReader.cs @@ -89,7 +89,7 @@ internal DatabricksCompositeReader( } if (_response.DirectResults?.ResultSet?.HasMoreRows ?? true) { - operationStatusPoller = operationPoller ?? new DatabricksOperationStatusPoller(_statement, response, GetHeartbeatIntervalFromConnection(), GetRequestTimeoutFromConnection()); + operationStatusPoller = operationPoller ?? new DatabricksOperationStatusPoller(_statement, response, GetHeartbeatIntervalFromConnection(), GetRequestTimeoutFromConnection(), activityTracer: this); operationStatusPoller.Start(); } } diff --git a/csharp/src/Reader/DatabricksOperationStatusPoller.cs b/csharp/src/Reader/DatabricksOperationStatusPoller.cs index 36f55b3a..4c85524e 100644 --- a/csharp/src/Reader/DatabricksOperationStatusPoller.cs +++ b/csharp/src/Reader/DatabricksOperationStatusPoller.cs @@ -28,6 +28,7 @@ using AdbcDrivers.Databricks.Telemetry.TagDefinitions; using AdbcDrivers.HiveServer2; using AdbcDrivers.HiveServer2.Hive2; +using Apache.Arrow.Adbc.Tracing; using Apache.Hive.Service.Rpc.Thrift; namespace AdbcDrivers.Databricks.Reader @@ -42,6 +43,7 @@ internal class DatabricksOperationStatusPoller : IOperationStatusPoller private readonly int _heartbeatIntervalSeconds; private readonly int _requestTimeoutSeconds; private readonly IResponse _response; + private readonly IActivityTracer? _activityTracer; // internal cancellation token source - won't affect the external token private CancellationTokenSource? _internalCts; private Task? _operationStatusPollingTask; @@ -58,12 +60,14 @@ public DatabricksOperationStatusPoller( IHiveServer2Statement statement, IResponse response, int heartbeatIntervalSeconds = DatabricksConstants.DefaultOperationStatusPollingIntervalSeconds, - int requestTimeoutSeconds = DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds) + int requestTimeoutSeconds = DatabricksConstants.DefaultOperationStatusRequestTimeoutSeconds, + IActivityTracer? activityTracer = null) { _statement = statement ?? throw new ArgumentNullException(nameof(statement)); _response = response; _heartbeatIntervalSeconds = heartbeatIntervalSeconds; _requestTimeoutSeconds = requestTimeoutSeconds; + _activityTracer = activityTracer; } public bool IsStarted => _operationStatusPollingTask != null; @@ -86,6 +90,20 @@ public void Start(CancellationToken externalToken = default) } private async Task PollOperationStatus(CancellationToken cancellationToken) + { + if (_activityTracer == null) + { + await PollOperationStatusCore(null, cancellationToken).ConfigureAwait(false); + } + else + { + await _activityTracer.Trace.TraceActivityAsync( + activity => PollOperationStatusCore(activity, cancellationToken), + activityName: "PollOperationStatus").ConfigureAwait(false); + } + } + + private async Task PollOperationStatusCore(Activity? activity, CancellationToken cancellationToken) { int consecutiveFailures = 0; @@ -108,7 +126,7 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) // Track poll count for telemetry _pollCount++; - Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.poll_success", + activity?.AddEvent(new ActivityEvent("operation_status_poller.poll_success", tags: new ActivityTagsCollection { { "poll_count", _pollCount }, @@ -137,7 +155,7 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) // Log the error but continue polling. Transient errors (e.g. ObjectDisposedException // from TLS connection recycling) should not kill the heartbeat poller, as that would // cause the server-side command inactivity timeout to expire and terminate the query. - Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.poll_error", + activity?.AddEvent(new ActivityEvent("operation_status_poller.poll_error", tags: new ActivityTagsCollection { { "error.type", ex.GetType().Name }, @@ -148,7 +166,7 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) if (consecutiveFailures >= MaxConsecutiveFailures) { - Activity.Current?.AddEvent(new ActivityEvent("operation_status_poller.max_failures_reached", + activity?.AddEvent(new ActivityEvent("operation_status_poller.max_failures_reached", tags: new ActivityTagsCollection { { "consecutive_failures", consecutiveFailures }, @@ -164,7 +182,7 @@ private async Task PollOperationStatus(CancellationToken cancellationToken) } // Add telemetry tags to current activity when polling completes - Activity.Current?.SetTag(StatementExecutionEvent.PollCount, _pollCount); + activity?.SetTag(StatementExecutionEvent.PollCount, _pollCount); } public void Stop() From 4efc5d8479d3d8998913d272702d72401440cbb6 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Tue, 31 Mar 2026 00:02:56 +0530 Subject: [PATCH 10/12] fix(csharp): catch cancellation in poller delay to prevent false error trace MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Task.Delay throws OperationCanceledException on normal shutdown. Without catching it inside PollOperationStatusCore, TraceActivityAsync logs it as an error and sets ActivityStatusCode.Error — making normal shutdown look like a failure in trace files. Catch and break instead. Co-authored-by: Isaac --- .../src/Reader/DatabricksOperationStatusPoller.cs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/csharp/src/Reader/DatabricksOperationStatusPoller.cs b/csharp/src/Reader/DatabricksOperationStatusPoller.cs index 4c85524e..74b0afee 100644 --- a/csharp/src/Reader/DatabricksOperationStatusPoller.cs +++ b/csharp/src/Reader/DatabricksOperationStatusPoller.cs @@ -176,9 +176,16 @@ private async Task PollOperationStatusCore(Activity? activity, CancellationToken } } - // Wait before next poll. On cancellation this throws OperationCanceledException - // which propagates up to the caller (Dispose catches it). - await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken).ConfigureAwait(false); + // Wait before next poll. + try + { + await Task.Delay(TimeSpan.FromSeconds(_heartbeatIntervalSeconds), cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Normal shutdown — don't let this propagate as an error through TraceActivityAsync + break; + } } // Add telemetry tags to current activity when polling completes From a964fff03221279e4146c33535666e4860a2b7b6 Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Tue, 31 Mar 2026 00:10:08 +0530 Subject: [PATCH 11/12] refactor(csharp): merge PollOperationStatusCore into PollOperationStatus Move TraceActivityAsync dispatch to Start() and use single method with Activity? parameter. No behavior change. Co-authored-by: Isaac --- .../Reader/DatabricksOperationStatusPoller.cs | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/csharp/src/Reader/DatabricksOperationStatusPoller.cs b/csharp/src/Reader/DatabricksOperationStatusPoller.cs index 74b0afee..6a4334b8 100644 --- a/csharp/src/Reader/DatabricksOperationStatusPoller.cs +++ b/csharp/src/Reader/DatabricksOperationStatusPoller.cs @@ -86,24 +86,22 @@ public void Start(CancellationToken externalToken = default) _internalCts = new CancellationTokenSource(); // create a linked token to the external token so that the external token can cancel the operation status polling task if needed var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(_internalCts.Token, externalToken).Token; - _operationStatusPollingTask = Task.Run(() => PollOperationStatus(linkedToken)); - } - - private async Task PollOperationStatus(CancellationToken cancellationToken) - { - if (_activityTracer == null) - { - await PollOperationStatusCore(null, cancellationToken).ConfigureAwait(false); - } - else + _operationStatusPollingTask = Task.Run(async () => { - await _activityTracer.Trace.TraceActivityAsync( - activity => PollOperationStatusCore(activity, cancellationToken), - activityName: "PollOperationStatus").ConfigureAwait(false); - } + if (_activityTracer != null) + { + await _activityTracer.Trace.TraceActivityAsync( + activity => PollOperationStatus(linkedToken, activity), + activityName: "PollOperationStatus").ConfigureAwait(false); + } + else + { + await PollOperationStatus(linkedToken, null).ConfigureAwait(false); + } + }); } - private async Task PollOperationStatusCore(Activity? activity, CancellationToken cancellationToken) + private async Task PollOperationStatus(CancellationToken cancellationToken, Activity? activity) { int consecutiveFailures = 0; From b20b38d436143056d0524bca718b50cb86d07e7a Mon Sep 17 00:00:00 2001 From: Madhavendra Rathore Date: Tue, 31 Mar 2026 01:06:00 +0530 Subject: [PATCH 12/12] fix(csharp): remove separate OperationCanceledException catch to allow retry The body read timeout fires OperationCanceledException from the linked CTS. The separate catch was converting it to TimeoutException and throwing it outside the retry loop, preventing retries. The general catch (Exception ex) already handles this correctly since cancellationToken.IsCancellationRequested is false (only the linked body timeout CTS cancelled, not the parent token). After all retries exhaust, InvalidOperationException is thrown (a fault), so the download continuation handles it via t.IsFaulted correctly. Fixes TimeoutRetryWithBackoff integration test. Co-authored-by: Isaac --- csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs index 8f38e3fc..4fad9cae 100644 --- a/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs +++ b/csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs @@ -621,13 +621,6 @@ await _activityTracer.TraceActivityAsync(async activity => } break; // Success, exit retry loop } - catch (OperationCanceledException ex) when (!cancellationToken.IsCancellationRequested) - { - // Body read timeout (from the linked CTS) fired, not a real cancellation. - // Convert to a fault so the download continuation handles it correctly — - // otherwise t.IsCanceled leaves DownloadCompletedTask never completing. - throw new TimeoutException($"CloudFetch body read timed out after {_timeoutMinutes} minutes for offset {downloadResult.StartRowOffset}.", ex); - } catch (Exception ex) when (!cancellationToken.IsCancellationRequested) { lastException = ex;