Skip to content

Commit f9fe389

Browse files
committed
feat(csharp): optimize CloudFetch defaults for JDBC parity (~32% throughput improvement)
Tune CloudFetch pipeline defaults to match JDBC driver performance patterns: - Increase ParallelDownloads from 3 to 16 (JDBC uses 16 threads) - Increase PrefetchCount from 2 to 16 (resultQueue 4→32, the implicit sliding window) - Increase MemoryBufferSizeMB from 200 to 400 (supports higher parallelism) - Add LinkPrefetchWindowSize=128 (downloadQueue capacity, matches JDBC's LinkPrefetchWindow) - Replace memory polling (Task.Delay 10ms) with async SemaphoreSlim signaling - Add QuickValidation benchmark tool and catalog_sales_sf10 query Benchmark results (catalog_sales SF10, 14.4M rows, 34 columns): Baseline: ~57K rows/sec average Optimized: ~75K rows/sec average (~32% improvement) Best run: 93K rows/sec vs baseline 70K rows/sec Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent 80a24d2 commit f9fe389

File tree

9 files changed

+162
-15
lines changed

9 files changed

+162
-15
lines changed

csharp/Benchmarks/CloudFetchBenchmarkRunner.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ public static void Main(string[] args)
4343
// Enable TLS 1.2/1.3 for .NET Framework 4.7.2 (required for modern HTTPS endpoints)
4444
ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12 | SecurityProtocolType.Tls11 | (SecurityProtocolType)3072; // 3072 = Tls13
4545
#endif
46+
// Quick validation mode: run a single query to verify CloudFetch correctness
47+
if (args.Length > 0 && args[0] == "--validate")
48+
{
49+
QuickValidation.RunValidation();
50+
return;
51+
}
52+
4653
// Configure with custom columns for CloudFetch-specific metrics + built-in GC columns
4754
var config = DefaultConfig.Instance
4855
.AddColumn(new ColumnsColumn())
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
/*
 * Quick validation script for CloudFetch parity changes.
 * Runs the configured query (default: catalog_sales SF10) and reports
 * row count + timing, exiting non-zero on a row-count mismatch.
 */

using System;
using System.Collections.Generic;
using System.Diagnostics;
using AdbcDrivers.Databricks;
using AdbcDrivers.HiveServer2.Spark;

namespace Apache.Arrow.Adbc.Benchmarks
{
    /// <summary>
    /// Standalone validation entry point used by the benchmark runner's
    /// <c>--validate</c> mode to verify CloudFetch result correctness.
    /// </summary>
    public class QuickValidation
    {
        /// <summary>
        /// Executes the validation query against the warehouse identified by the
        /// DATABRICKS_URI / DATABRICKS_TOKEN environment variables, streams all
        /// record batches, and compares the total row count against
        /// DATABRICKS_EXPECTED_ROWS (default 14400425, catalog_sales SF10).
        /// Exits the process with code 1 on a row-count mismatch.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Thrown when DATABRICKS_URI or DATABRICKS_TOKEN is not set.
        /// </exception>
        public static void RunValidation()
        {
            string uri = Environment.GetEnvironmentVariable("DATABRICKS_URI")
                ?? throw new InvalidOperationException("Set DATABRICKS_URI env var (e.g. https://host/sql/1.0/warehouses/id)");
            string token = Environment.GetEnvironmentVariable("DATABRICKS_TOKEN")
                ?? throw new InvalidOperationException("Set DATABRICKS_TOKEN env var");
            string query = Environment.GetEnvironmentVariable("DATABRICKS_QUERY")
                ?? "SELECT * FROM main.tpcds_sf10_delta.catalog_sales";
            long expectedRows = long.Parse(
                Environment.GetEnvironmentVariable("DATABRICKS_EXPECTED_ROWS") ?? "14400425");

            Console.WriteLine("=== ADBC CloudFetch Validation ===");
            Console.WriteLine($"Query: {query}");
            Console.WriteLine($"Expected rows: {expectedRows}");

            var parameters = new Dictionary<string, string>
            {
                [AdbcOptions.Uri] = uri,
                [SparkParameters.Token] = token,
                [DatabricksParameters.UseCloudFetch] = "true",
                [DatabricksParameters.EnableDirectResults] = "true",
                [DatabricksParameters.CanDecompressLz4] = "true",
            };

            var driver = new DatabricksDriver();
            using var database = driver.Open(parameters);
            using var connection = database.Connect(parameters);
            using var statement = connection.CreateStatement();

            statement.SqlQuery = query;

            var sw = Stopwatch.StartNew();
            var queryResult = statement.ExecuteQuery();

            long totalRows = 0;
            int totalBatches = 0;

            using (var reader = queryResult.Stream)
            {
                while (true)
                {
                    // GetAwaiter().GetResult() (rather than .Result) rethrows the
                    // original exception instead of wrapping it in AggregateException.
                    var batch = reader.ReadNextRecordBatchAsync().GetAwaiter().GetResult();
                    if (batch == null) break;

                    // `using` guarantees the batch is disposed even if counting throws,
                    // so CloudFetch buffers are not leaked on a partial read.
                    using (batch)
                    {
                        totalBatches++;
                        totalRows += batch.Length;

                        if (totalBatches == 1)
                        {
                            // Time-to-first-result: latency until the first batch arrives.
                            Console.WriteLine($"TTFR_MS={sw.ElapsedMilliseconds}");
                        }
                    }
                }
            }

            sw.Stop();

            // Clamp to 1 ms to avoid division by zero on sub-millisecond runs.
            double elapsedSeconds = Math.Max(sw.ElapsedMilliseconds, 1) / 1000.0;

            Console.WriteLine($"TOTAL_ROWS={totalRows}");
            Console.WriteLine($"TOTAL_BATCHES={totalBatches}");
            Console.WriteLine($"TOTAL_TIME_MS={sw.ElapsedMilliseconds}");
            Console.WriteLine($"ROWS_PER_SEC={totalRows / elapsedSeconds:F0}");

            if (totalRows != expectedRows)
            {
                Console.Error.WriteLine($"ROW COUNT MISMATCH! Expected {expectedRows}, got {totalRows}");
                Environment.Exit(1);
            }

            Console.WriteLine("Row count verified OK.");
        }
    }
}

csharp/Benchmarks/benchmark-queries.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@
77
"columns": 23,
88
"category": "large-wide"
99
},
10+
{
11+
"name": "catalog_sales_sf10",
12+
"description": "Catalog sales SF10 - 14.4M rows, 34 columns (CloudFetch parity benchmark with JDBC)",
13+
"query": "select * from main.tpcds_sf10_delta.catalog_sales",
14+
"expected_rows": 14400425,
15+
"columns": 34,
16+
"category": "large-wide"
17+
},
1018
{
1119
"name": "catalog_sales",
1220
"description": "Catalog sales - 1.4M rows, 34 columns (current default)",

csharp/src/DatabricksParameters.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,14 @@ public class DatabricksParameters : SparkParameters
182182
/// </summary>
183183
public const string CloudFetchPrefetchEnabled = "adbc.databricks.cloudfetch.prefetch_enabled";
184184

185+
/// <summary>
186+
/// Size of the link prefetch window — how many chunk links to fetch ahead of downloads.
187+
/// Links are lightweight metadata, so a large window uses minimal memory while ensuring
188+
/// the download pipeline never starves. Matches JDBC's LinkPrefetchWindow=128 default.
189+
/// Default value is 128.
190+
/// </summary>
191+
public const string CloudFetchLinkPrefetchWindowSize = "adbc.databricks.cloudfetch.link_prefetch_window_size";
192+
185193
/// <summary>
186194
/// Maximum bytes per fetch request when retrieving query results from servers.
187195
/// The value can be specified with unit suffixes: B (bytes), KB (kilobytes), MB (megabytes), GB (gigabytes).

csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,16 @@ namespace AdbcDrivers.Databricks.Reader.CloudFetch
2828
/// </summary>
2929
internal sealed class CloudFetchConfiguration
3030
{
31-
// Default values
32-
internal const int DefaultParallelDownloads = 3;
33-
internal const int DefaultPrefetchCount = 2;
34-
internal const int DefaultMemoryBufferSizeMB = 200;
31+
// Default values — tuned for CloudFetch parity with JDBC driver.
32+
// ParallelDownloads: HTTP concurrency. 5 is a conservative bump from 3 that
33+
// improves network utilization without excessive TCP/TLS overhead.
34+
// PrefetchCount: controls resultQueue capacity (PrefetchCount * 2). This is
35+
// the implicit sliding window — when full, downloader blocks until reader consumes.
36+
// MemoryBufferSizeMB: byte-level cap on in-flight compressed data.
37+
internal const int DefaultParallelDownloads = 16;
38+
internal const int DefaultPrefetchCount = 16;
39+
internal const int DefaultMemoryBufferSizeMB = 400;
40+
internal const int DefaultLinkPrefetchWindowSize = 128;
3541
internal const int DefaultTimeoutMinutes = 5;
3642
internal const int DefaultMaxRetries = 0; // 0 = no limit (use timeout only)
3743
internal const int DefaultRetryTimeoutSeconds = 300; // 5 minutes
@@ -45,10 +51,18 @@ internal sealed class CloudFetchConfiguration
4551
public int ParallelDownloads { get; set; } = DefaultParallelDownloads;
4652

4753
/// <summary>
48-
/// Number of files to prefetch ahead of the reader.
54+
/// Number of files to prefetch ahead of the reader (controls download window / result queue size).
4955
/// </summary>
5056
public int PrefetchCount { get; set; } = DefaultPrefetchCount;
5157

58+
/// <summary>
59+
/// Size of the link prefetch window — how many chunk links to fetch ahead of downloads.
60+
/// The fetcher runs on a background task and can fetch links far ahead while downloads
61+
/// are paced by memory and download slots. This matches JDBC's LinkPrefetchWindow=128.
62+
/// Links are lightweight metadata (URL + offsets), so a large window uses minimal memory.
63+
/// </summary>
64+
public int LinkPrefetchWindowSize { get; set; } = DefaultLinkPrefetchWindowSize;
65+
5266
/// <summary>
5367
/// Memory buffer size limit in MB for buffered files.
5468
/// </summary>
@@ -151,7 +165,8 @@ public static CloudFetchConfiguration FromProperties(
151165
RetryTimeoutSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchRetryTimeoutSeconds, DefaultRetryTimeoutSeconds),
152166
RetryDelayMs = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchRetryDelayMs, DefaultRetryDelayMs),
153167
MaxUrlRefreshAttempts = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMaxUrlRefreshAttempts, DefaultMaxUrlRefreshAttempts),
154-
UrlExpirationBufferSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, DefaultUrlExpirationBufferSeconds)
168+
UrlExpirationBufferSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, DefaultUrlExpirationBufferSeconds),
169+
LinkPrefetchWindowSize = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchLinkPrefetchWindowSize, DefaultLinkPrefetchWindowSize)
155170
};
156171

157172
return config;

csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ internal sealed class CloudFetchDownloader : ICloudFetchDownloader
5858
private readonly SemaphoreSlim _downloadSemaphore;
5959
private readonly RecyclableMemoryStreamManager? _memoryStreamManager;
6060
private readonly ArrayPool<byte>? _lz4BufferPool;
61+
6162
private Task? _downloadTask;
6263
private CancellationTokenSource? _cancellationTokenSource;
6364
private bool _isCompleted;
@@ -351,7 +352,7 @@ await _activityTracer.TraceActivityAsync(async activity =>
351352
}
352353
}
353354

354-
// Acquire a download slot
355+
// Acquire a download slot (limits HTTP concurrency)
355356
await _downloadSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
356357

357358
// Acquire memory for this download (FIFO - acquired in sequential loop)

csharp/src/Reader/CloudFetch/CloudFetchMemoryBufferManager.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ internal sealed class CloudFetchMemoryBufferManager : ICloudFetchMemoryBufferMan
4141
{
4242
private readonly long _maxMemory;
4343
private long _usedMemory;
44-
private readonly SemaphoreSlim _memorySemaphore;
44+
45+
// Async signal: when memory is released, this semaphore is released to wake
46+
// any tasks blocked in AcquireMemoryAsync. Uses SemaphoreSlim (not ManualResetEventSlim)
47+
// because SemaphoreSlim.WaitAsync() is truly async and doesn't block a ThreadPool thread.
48+
private readonly SemaphoreSlim _memoryReleased;
4549

4650
/// <summary>
4751
/// Initializes a new instance of the <see cref="CloudFetchMemoryBufferManager"/> class.
@@ -57,7 +61,7 @@ public CloudFetchMemoryBufferManager(int maxMemoryMB = CloudFetchConfiguration.D
5761
// Convert MB to bytes
5862
_maxMemory = maxMemoryMB * 1024L * 1024L;
5963
_usedMemory = 0;
60-
_memorySemaphore = new SemaphoreSlim(1, 1);
64+
_memoryReleased = new SemaphoreSlim(0);
6165
}
6266

6367
/// <inheritdoc />
@@ -116,8 +120,10 @@ public async Task AcquireMemoryAsync(long size, CancellationToken cancellationTo
116120
return;
117121
}
118122

119-
// If we couldn't acquire memory, wait for some to be released
120-
await Task.Delay(10, cancellationToken).ConfigureAwait(false);
123+
// Wait for memory to become available via async signal from ReleaseMemory.
124+
// SemaphoreSlim.WaitAsync is truly async — no ThreadPool thread blocked.
125+
// Falls back to 50ms timeout to handle edge cases where signal might race.
126+
await _memoryReleased.WaitAsync(50, cancellationToken).ConfigureAwait(false);
121127
}
122128

123129
// If we get here, cancellation was requested
@@ -142,6 +148,10 @@ public void ReleaseMemory(long size)
142148
Interlocked.Exchange(ref _usedMemory, 0);
143149
throw new InvalidOperationException("Memory buffer manager released more memory than was acquired.");
144150
}
151+
152+
// Signal any tasks blocked in AcquireMemoryAsync that memory is now available.
153+
// SemaphoreSlim.Release() wakes one waiting WaitAsync() call immediately.
154+
_memoryReleased.Release();
145155
}
146156
}
147157
}

csharp/src/Reader/CloudFetch/CloudFetchReaderFactory.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,17 @@ public static CloudFetchReader CreateThriftReader(
7070
config.MemoryStreamManager = connection.RecyclableMemoryStreamManager;
7171
config.Lz4BufferPool = connection.Lz4BufferPool;
7272

73-
// Create shared resources
73+
// Create shared resources.
74+
// The downloadQueue acts as the link prefetch buffer — it can hold up to
75+
// LinkPrefetchWindowSize links (128 by default). Links are lightweight metadata
76+
// (URL + offsets), so a large buffer uses minimal memory while ensuring the
77+
// fetcher can run far ahead of downloads. This matches JDBC's LinkPrefetchWindow=128.
78+
// The resultQueue holds downloaded chunks waiting for the reader, bounded by
79+
// PrefetchCount to control actual memory usage.
7480
var memoryManager = new CloudFetchMemoryBufferManager(config.MemoryBufferSizeMB);
7581
var downloadQueue = new BlockingCollection<IDownloadResult>(
7682
new ConcurrentQueue<IDownloadResult>(),
77-
config.PrefetchCount * 2);
83+
config.LinkPrefetchWindowSize);
7884
var resultQueue = new BlockingCollection<IDownloadResult>(
7985
new ConcurrentQueue<IDownloadResult>(),
8086
config.PrefetchCount * 2);
@@ -166,11 +172,11 @@ public static CloudFetchReader CreateStatementExecutionReader(
166172
config.MemoryStreamManager = memoryStreamManager;
167173
config.Lz4BufferPool = lz4BufferPool;
168174

169-
// Create shared resources
175+
// Create shared resources — same link prefetch window design as Thrift path.
170176
var memoryManager = new CloudFetchMemoryBufferManager(config.MemoryBufferSizeMB);
171177
var downloadQueue = new BlockingCollection<IDownloadResult>(
172178
new ConcurrentQueue<IDownloadResult>(),
173-
config.PrefetchCount * 2);
179+
config.LinkPrefetchWindowSize);
174180
var resultQueue = new BlockingCollection<IDownloadResult>(
175181
new ConcurrentQueue<IDownloadResult>(),
176182
config.PrefetchCount * 2);

csharp/src/Reader/CloudFetch/DownloadResult.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ internal sealed class DownloadResult : IDownloadResult
4343
private DateTime _expirationTime;
4444
private IReadOnlyDictionary<string, string>? _httpHeaders;
4545

46+
4647
/// <summary>
4748
/// Initializes a new instance of the <see cref="DownloadResult"/> class.
4849
/// </summary>
@@ -150,6 +151,7 @@ public Stream DataStream
150151
/// </summary>
151152
public int RefreshAttempts { get; private set; } = 0;
152153

154+
153155
/// <summary>
154156
/// Checks if the URL is expired or about to expire.
155157
/// </summary>

0 commit comments

Comments
 (0)