Skip to content

Commit adf4853

Browse files
feat(csharp): convert cloudfetch retry from limit by # of retries to limit by time (#375)
## Summary - Replace fixed 3-attempt retry with **5-minute time-budget** approach using exponential backoff with jitter (matching RetryHttpHandler's pattern) - Retry all exceptions (502 Bad Gateway from proxy, connection drops, TCP resets, HttpClient.Timeout) — catch-all, not just transport errors - Add new `adbc.databricks.cloudfetch.retry_timeout_seconds` parameter (default 300s / 5 min) - Keep `adbc.databricks.cloudfetch.max_retries` parameter for PowerBI backward compatibility (default 0 = no limit, use timeout only). When set to a positive value, the retry loop exits if either max retries or timeout is reached first - Add detailed retry tracing (attempt count, backoff delay, timeout budget remaining) ### Problem CloudFetch downloads used a fixed 3-retry loop with linear backoff (~1.5s total). When a proxy returns 502 or a firewall blocks cloud storage, all 3 retries exhaust within seconds — not enough time for transient issues to resolve. ### Solution Switch to a time-budget approach (same as RetryHttpHandler from #373): keep retrying with exponential backoff + jitter until the 5-minute budget is exhausted. This gives transient issues (proxy errors, firewall rules, connection drops) enough time to resolve. | | Before | After | |---|---|---| | Retry limit | 3 attempts | 5-minute time budget (+ optional max_retries) | | Backoff | Linear (500ms, 1s, 1.5s) | Exponential with jitter (500ms → 1s → 2s → ... → 32s cap) | | Total retry window | ~3 seconds | Up to 5 minutes | | Error classification | Retry all exceptions | Retry all exceptions (same) | | max_retries param | Required (default 3) | Optional (default 0 = no limit) | ## Test plan - [x] All 9 CloudFetch downloader unit tests pass - [x] Full build succeeds (netstandard2.0 + net8.0) - [ ] Manual test: block cloud storage via MITM firewall, verify retries continue for 5 min - [ ] Manual test: unblock within 5 min, verify download resumes successfully This pull request was AI-assisted by Isaac.
1 parent 38305b5 commit adf4853

File tree

4 files changed

+132
-35
lines changed

4 files changed

+132
-35
lines changed

csharp/src/DatabricksParameters.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,19 @@ public class DatabricksParameters : SparkParameters
5252
public const string MaxBytesPerFile = "adbc.databricks.cloudfetch.max_bytes_per_file";
5353

5454
/// <summary>
55-
/// Maximum number of retry attempts for CloudFetch downloads.
56-
/// Default value is 3 if not specified.
55+
/// Maximum number of retry attempts for CloudFetch downloads (total attempts, including first try).
56+
/// Default value is 0 (no limit, use timeout only).
57+
/// When set to a positive value, the retry loop exits if either this count or the timeout is reached.
5758
/// </summary>
5859
public const string CloudFetchMaxRetries = "adbc.databricks.cloudfetch.max_retries";
5960

61+
/// <summary>
62+
/// Maximum time in seconds to retry failed CloudFetch downloads.
63+
/// Uses exponential backoff with jitter within this time budget.
64+
/// Default value is 300 (5 minutes) if not specified.
65+
/// </summary>
66+
public const string CloudFetchRetryTimeoutSeconds = "adbc.databricks.cloudfetch.retry_timeout_seconds";
67+
6068
/// <summary>
6169
/// Delay in milliseconds between CloudFetch retry attempts.
6270
/// Default value is 500ms if not specified.

csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ internal sealed class CloudFetchConfiguration
3333
internal const int DefaultPrefetchCount = 2;
3434
internal const int DefaultMemoryBufferSizeMB = 200;
3535
internal const int DefaultTimeoutMinutes = 5;
36-
internal const int DefaultMaxRetries = 3;
36+
internal const int DefaultMaxRetries = 0; // 0 = no limit (use timeout only)
37+
internal const int DefaultRetryTimeoutSeconds = 300; // 5 minutes
3738
internal const int DefaultRetryDelayMs = 500;
3839
internal const int DefaultMaxUrlRefreshAttempts = 3;
3940
internal const int DefaultUrlExpirationBufferSeconds = 60;
@@ -59,10 +60,18 @@ internal sealed class CloudFetchConfiguration
5960
public int TimeoutMinutes { get; set; } = DefaultTimeoutMinutes;
6061

6162
/// <summary>
62-
/// Maximum retry attempts for failed downloads.
63+
/// Maximum retry attempts for failed downloads (total attempts, including first try).
64+
/// 0 means no limit (use timeout only). When set to a positive value,
65+
/// the retry loop exits if either this count or the timeout is reached.
6366
/// </summary>
6467
public int MaxRetries { get; set; } = DefaultMaxRetries;
6568

69+
/// <summary>
70+
/// Maximum time in seconds to retry failed downloads before giving up.
71+
/// Uses exponential backoff with jitter within this time budget.
72+
/// </summary>
73+
public int RetryTimeoutSeconds { get; set; } = DefaultRetryTimeoutSeconds;
74+
6675
/// <summary>
6776
/// Delay between retry attempts (in milliseconds).
6877
/// </summary>
@@ -116,6 +125,20 @@ public static CloudFetchConfiguration FromProperties(
116125
Schema schema,
117126
bool isLz4Compressed)
118127
{
128+
// Parse MaxRetries separately: 0 (default) = no limit, >0 = total attempt count.
129+
// Throw on non-integer or negative values to surface misconfiguration.
130+
int parsedMaxRetries = DefaultMaxRetries;
131+
if (properties.TryGetValue(DatabricksParameters.CloudFetchMaxRetries, out string? maxRetriesStr))
132+
{
133+
if (!int.TryParse(maxRetriesStr, out int maxRetries) || maxRetries < 0)
134+
{
135+
throw new ArgumentException(
136+
$"Invalid value '{maxRetriesStr}' for {DatabricksParameters.CloudFetchMaxRetries}. " +
137+
$"Expected 0 (no limit) or a positive integer.");
138+
}
139+
parsedMaxRetries = maxRetries;
140+
}
141+
119142
var config = new CloudFetchConfiguration
120143
{
121144
Schema = schema ?? throw new ArgumentNullException(nameof(schema)),
@@ -124,7 +147,8 @@ public static CloudFetchConfiguration FromProperties(
124147
PrefetchCount = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchPrefetchCount, DefaultPrefetchCount),
125148
MemoryBufferSizeMB = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMemoryBufferSize, DefaultMemoryBufferSizeMB),
126149
TimeoutMinutes = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchTimeoutMinutes, DefaultTimeoutMinutes),
127-
MaxRetries = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMaxRetries, DefaultMaxRetries),
150+
MaxRetries = parsedMaxRetries,
151+
RetryTimeoutSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchRetryTimeoutSeconds, DefaultRetryTimeoutSeconds),
128152
RetryDelayMs = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchRetryDelayMs, DefaultRetryDelayMs),
129153
MaxUrlRefreshAttempts = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMaxUrlRefreshAttempts, DefaultMaxUrlRefreshAttempts),
130154
UrlExpirationBufferSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, DefaultUrlExpirationBufferSeconds)

csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs

Lines changed: 71 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ internal sealed class CloudFetchDownloader : ICloudFetchDownloader
5151
private readonly int _maxParallelDownloads;
5252
private readonly bool _isLz4Compressed;
5353
private readonly int _maxRetries;
54+
private readonly int _retryTimeoutSeconds;
5455
private readonly int _retryDelayMs;
5556
private readonly int _maxUrlRefreshAttempts;
5657
private readonly int _urlExpirationBufferSeconds;
@@ -94,6 +95,7 @@ public CloudFetchDownloader(
9495
_maxParallelDownloads = config.ParallelDownloads;
9596
_isLz4Compressed = config.IsLz4Compressed;
9697
_maxRetries = config.MaxRetries;
98+
_retryTimeoutSeconds = config.RetryTimeoutSeconds;
9799
_retryDelayMs = config.RetryDelayMs;
98100
_maxUrlRefreshAttempts = config.MaxUrlRefreshAttempts;
99101
_urlExpirationBufferSeconds = config.UrlExpirationBufferSeconds;
@@ -114,8 +116,9 @@ public CloudFetchDownloader(
114116
/// <param name="resultFetcher">The result fetcher that manages URLs.</param>
115117
/// <param name="maxParallelDownloads">Maximum parallel downloads.</param>
116118
/// <param name="isLz4Compressed">Whether results are LZ4 compressed.</param>
117-
/// <param name="maxRetries">Maximum retry attempts (optional, default 3).</param>
118-
/// <param name="retryDelayMs">Delay between retries in ms (optional, default 1000).</param>
119+
/// <param name="maxRetries">Total number of attempts. 0 = no limit (use timeout only), positive = max total attempts.</param>
120+
/// <param name="retryTimeoutSeconds">Time budget for retries in seconds (optional, default 300).</param>
121+
/// <param name="retryDelayMs">Initial delay between retries in ms (optional, default 500).</param>
119122
internal CloudFetchDownloader(
120123
IActivityTracer activityTracer,
121124
BlockingCollection<IDownloadResult> downloadQueue,
@@ -125,8 +128,9 @@ internal CloudFetchDownloader(
125128
ICloudFetchResultFetcher resultFetcher,
126129
int maxParallelDownloads,
127130
bool isLz4Compressed,
128-
int maxRetries = 3,
129-
int retryDelayMs = 1000)
131+
int maxRetries = CloudFetchConfiguration.DefaultMaxRetries,
132+
int retryTimeoutSeconds = CloudFetchConfiguration.DefaultRetryTimeoutSeconds,
133+
int retryDelayMs = CloudFetchConfiguration.DefaultRetryDelayMs)
130134
{
131135
_activityTracer = activityTracer ?? throw new ArgumentNullException(nameof(activityTracer));
132136
_downloadQueue = downloadQueue ?? throw new ArgumentNullException(nameof(downloadQueue));
@@ -138,6 +142,7 @@ internal CloudFetchDownloader(
138142
_maxParallelDownloads = maxParallelDownloads;
139143
_isLz4Compressed = isLz4Compressed;
140144
_maxRetries = maxRetries;
145+
_retryTimeoutSeconds = retryTimeoutSeconds;
141146
_retryDelayMs = retryDelayMs;
142147
_maxUrlRefreshAttempts = CloudFetchConfiguration.DefaultMaxUrlRefreshAttempts;
143148
_urlExpirationBufferSeconds = CloudFetchConfiguration.DefaultUrlExpirationBufferSeconds;
@@ -486,9 +491,31 @@ await _activityTracer.TraceActivityAsync(async activity =>
486491
new("expected_size_kb", size / 1024.0)
487492
]);
488493

489-
// Retry logic for downloading files
490-
for (int retry = 0; retry < _maxRetries; retry++)
494+
// Retry logic with time-budget approach and exponential backoff with jitter.
495+
// Same pattern as RetryHttpHandler: tracks cumulative backoff sleep time against
496+
// the budget. This gives transient issues (firewall, proxy 502, connection drops)
497+
// enough time to resolve.
498+
int currentBackoffMs = (int)Math.Min(Math.Max(0L, (long)_retryDelayMs), 32_000L);
499+
long retryTimeoutMs = Math.Min((long)_retryTimeoutSeconds, int.MaxValue / 1000L) * 1000L;
500+
long totalRetryWaitMs = 0;
501+
int attemptCount = 0;
502+
Exception? lastException = null;
503+
504+
while (!cancellationToken.IsCancellationRequested)
491505
{
506+
// Check max retry count before each attempt (0 = no limit, >0 = total attempts)
507+
if (_maxRetries > 0 && attemptCount >= _maxRetries)
508+
{
509+
activity?.AddEvent("cloudfetch.download_max_retries_exceeded", [
510+
new("offset", downloadResult.StartRowOffset),
511+
new("sanitized_url", sanitizedUrl),
512+
new("total_attempts", attemptCount),
513+
new("max_retries", _maxRetries)
514+
]);
515+
break;
516+
}
517+
518+
attemptCount++;
492519
try
493520
{
494521
// Create HTTP request with optional custom headers
@@ -562,18 +589,41 @@ await _activityTracer.TraceActivityAsync(async activity =>
562589
fileData = await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
563590
break; // Success, exit retry loop
564591
}
565-
catch (Exception ex) when (retry < _maxRetries - 1 && !cancellationToken.IsCancellationRequested)
592+
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
566593
{
567-
// Log the error and retry
568-
activity?.AddException(ex, [
569-
new("error.context", "cloudfetch.download_retry"),
594+
lastException = ex;
595+
596+
// Exponential backoff with jitter (80-120% of base)
597+
int waitMs = (int)Math.Max(100, currentBackoffMs * (0.8 + new Random().NextDouble() * 0.4));
598+
599+
// Check if we would exceed the time budget
600+
if (retryTimeoutMs > 0 && totalRetryWaitMs + waitMs > retryTimeoutMs)
601+
{
602+
activity?.AddEvent("cloudfetch.download_retry_timeout_exceeded", [
603+
new("offset", downloadResult.StartRowOffset),
604+
new("sanitized_url", sanitizedUrl),
605+
new("total_attempts", attemptCount),
606+
new("total_retry_wait_ms", totalRetryWaitMs),
607+
new("retry_timeout_seconds", _retryTimeoutSeconds),
608+
new("last_error", ex.GetType().Name)
609+
]);
610+
break;
611+
}
612+
613+
totalRetryWaitMs += waitMs;
614+
615+
activity?.AddEvent("cloudfetch.download_retry", [
570616
new("offset", downloadResult.StartRowOffset),
571-
new("sanitized_url", SanitizeUrl(url)),
572-
new("attempt", retry + 1),
573-
new("max_retries", _maxRetries)
617+
new("sanitized_url", sanitizedUrl),
618+
new("attempt", attemptCount),
619+
new("total_retry_wait_ms", totalRetryWaitMs),
620+
new("retry_timeout_seconds", _retryTimeoutSeconds),
621+
new("error_type", ex.GetType().Name),
622+
new("backoff_ms", waitMs)
574623
]);
575624

576-
await Task.Delay(_retryDelayMs * (retry + 1), cancellationToken).ConfigureAwait(false);
625+
await Task.Delay(waitMs, cancellationToken).ConfigureAwait(false);
626+
currentBackoffMs = (int)Math.Min((long)currentBackoffMs * 2L, 32_000L);
577627
}
578628
}
579629

@@ -583,13 +633,18 @@ await _activityTracer.TraceActivityAsync(async activity =>
583633
activity?.AddEvent("cloudfetch.download_failed_all_retries", [
584634
new("offset", downloadResult.StartRowOffset),
585635
new("sanitized_url", sanitizedUrl),
586-
new("max_retries", _maxRetries),
636+
new("total_attempts", attemptCount),
637+
new("total_retry_wait_ms", totalRetryWaitMs),
587638
new("elapsed_time_ms", stopwatch.ElapsedMilliseconds)
588639
]);
589640

590641
// Release the memory we acquired
591642
_memoryManager.ReleaseMemory(size);
592-
throw new InvalidOperationException($"Failed to download file from {url} after {_maxRetries} attempts.");
643+
string retryLimits = _maxRetries > 0
644+
? $"max_retries: {_maxRetries}, timeout: {_retryTimeoutSeconds}s"
645+
: $"timeout: {_retryTimeoutSeconds}s";
646+
throw new InvalidOperationException(
647+
$"Failed to download file from {sanitizedUrl} after {attemptCount} attempts over {stopwatch.Elapsed.TotalSeconds:F1}s ({retryLimits}). Last error: {lastException?.GetType().Name ?? "unknown"}");
593648
}
594649

595650
// Process the downloaded file data

csharp/test/E2E/CloudFetch/CloudFetchDownloaderTest.cs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,9 @@ public async Task DownloadFileAsync_ProcessesFile_AndAddsToResultQueue()
178178
_mockResultFetcher.Object,
179179
1, // maxParallelDownloads
180180
false, // isLz4Compressed
181-
1, // maxRetries
182-
10); // retryDelayMs
181+
maxRetries: 0,
182+
retryTimeoutSeconds: 5,
183+
retryDelayMs: 10);
183184

184185
// Act
185186
await downloader.StartAsync(CancellationToken.None);
@@ -262,24 +263,31 @@ public async Task DownloadFileAsync_HandlesHttpError_AndSetsFailedOnDownloadResu
262263
_mockResultFetcher.Object,
263264
1, // maxParallelDownloads
264265
false, // isLz4Compressed
265-
1, // maxRetries
266-
10); // retryDelayMs
266+
maxRetries: 0,
267+
retryTimeoutSeconds: 1,
268+
retryDelayMs: 10);
267269

268270
// Act
269271
await downloader.StartAsync(CancellationToken.None);
270272
_downloadQueue.Add(mockDownloadResult.Object);
271273

272-
// Wait for the download to be processed
273-
await Task.Delay(100);
274-
275274
// Add the end of results guard to complete the downloader
276275
_downloadQueue.Add(EndOfResultsGuard.Instance);
277276

277+
// Wait for the download to fail - poll deterministically instead of fixed delay
278+
int maxWaitMs = 5000;
279+
int waitedMs = 0;
280+
while (capturedException == null && waitedMs < maxWaitMs)
281+
{
282+
await Task.Delay(50);
283+
waitedMs += 50;
284+
}
285+
278286
// Assert
279287
// Verify SetFailed was called
288+
Assert.True(capturedException != null, $"SetFailed was not called within {maxWaitMs}ms");
280289
mockDownloadResult.Verify(r => r.SetFailed(It.IsAny<Exception>()), Times.Once);
281-
Assert.NotNull(capturedException);
282-
Assert.IsType<HttpRequestException>(capturedException);
290+
Assert.IsType<InvalidOperationException>(capturedException);
283291

284292
// Verify the downloader has an error
285293
Assert.True(downloader.HasError);
@@ -351,8 +359,9 @@ public async Task DownloadFileAsync_WithError_StopsProcessingRemainingFiles()
351359
_mockResultFetcher.Object,
352360
1, // maxParallelDownloads
353361
false, // isLz4Compressed
354-
1, // maxRetries
355-
10); // retryDelayMs
362+
maxRetries: 0,
363+
retryTimeoutSeconds: 1,
364+
retryDelayMs: 10);
356365

357366
// Act
358367
await downloader.StartAsync(CancellationToken.None);
@@ -365,7 +374,7 @@ public async Task DownloadFileAsync_WithError_StopsProcessingRemainingFiles()
365374
Console.WriteLine("Added end guard");
366375

367376
// Wait for the download to fail - use a timeout to avoid hanging
368-
int maxWaitMs = 2000;
377+
int maxWaitMs = 5000;
369378
int waitedMs = 0;
370379
while (!downloader.HasError && !setFailedCalled && waitedMs < maxWaitMs)
371380
{
@@ -684,8 +693,9 @@ public async Task DownloadFileAsync_RefreshesExpiredUrl_WhenHttpErrorOccurs()
684693
_mockResultFetcher.Object,
685694
1, // maxParallelDownloads
686695
false, // isLz4Compressed
687-
2, // maxRetries
688-
10); // retryDelayMs
696+
maxRetries: 0,
697+
retryTimeoutSeconds: 5,
698+
retryDelayMs: 10);
689699

690700
// Act
691701
await downloader.StartAsync(CancellationToken.None);

0 commit comments

Comments
 (0)