Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion csharp/src/DatabricksParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,18 @@ public class DatabricksParameters : SparkParameters

/// <summary>
/// Maximum number of retry attempts for CloudFetch downloads.
/// Default value is 3 if not specified.
/// Default value is -1 (not set, use timeout only).
/// When set to a non-negative value, the retry loop exits if either this count or the timeout is reached.
/// </summary>
public const string CloudFetchMaxRetries = "adbc.databricks.cloudfetch.max_retries";

/// <summary>
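/// Maximum cumulative backoff wait, in seconds, spent retrying a failed CloudFetch download.
/// Retries use exponential backoff with jitter; the budget counts the delays between attempts,
/// not the time spent inside the download requests themselves.
/// Default value is 300 (5 minutes) if not specified.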
/// </summary>
public const string CloudFetchRetryTimeoutSeconds = "adbc.databricks.cloudfetch.retry_timeout_seconds";

/// <summary>
/// Delay in milliseconds between CloudFetch retry attempts.
/// Default value is 500ms if not specified.
Expand Down
14 changes: 12 additions & 2 deletions csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ internal sealed class CloudFetchConfiguration
internal const int DefaultPrefetchCount = 2;
internal const int DefaultMemoryBufferSizeMB = 200;
internal const int DefaultTimeoutMinutes = 5;
internal const int DefaultMaxRetries = 3;
internal const int DefaultMaxRetries = -1; // -1 = not set (use timeout only)
internal const int DefaultRetryTimeoutSeconds = 300; // 5 minutes
internal const int DefaultRetryDelayMs = 500;
internal const int DefaultMaxUrlRefreshAttempts = 3;
internal const int DefaultUrlExpirationBufferSeconds = 60;
Expand All @@ -60,9 +61,17 @@ internal sealed class CloudFetchConfiguration

/// <summary>
/// Maximum retry attempts for failed downloads.
/// -1 means not set (use timeout only). When set to a non-negative value,
/// the retry loop exits if either this count or the timeout is reached.
/// </summary>
public int MaxRetries { get; set; } = DefaultMaxRetries;

/// <summary>
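/// Maximum cumulative backoff wait, in seconds, spent retrying a failed download before giving up.
/// Retries use exponential backoff with jitter; the budget counts the delays between attempts,
/// not the time spent inside the download requests themselves.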
/// </summary>
public int RetryTimeoutSeconds { get; set; } = DefaultRetryTimeoutSeconds;

/// <summary>
/// Delay between retry attempts (in milliseconds).
/// </summary>
Expand Down Expand Up @@ -124,7 +133,8 @@ public static CloudFetchConfiguration FromProperties(
PrefetchCount = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchPrefetchCount, DefaultPrefetchCount),
MemoryBufferSizeMB = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMemoryBufferSize, DefaultMemoryBufferSizeMB),
TimeoutMinutes = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchTimeoutMinutes, DefaultTimeoutMinutes),
MaxRetries = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMaxRetries, DefaultMaxRetries),
MaxRetries = properties.TryGetValue(DatabricksParameters.CloudFetchMaxRetries, out string? maxRetriesStr) && int.TryParse(maxRetriesStr, out int maxRetries) && maxRetries >= 0 ? maxRetries : DefaultMaxRetries,
RetryTimeoutSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchRetryTimeoutSeconds, DefaultRetryTimeoutSeconds),
RetryDelayMs = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchRetryDelayMs, DefaultRetryDelayMs),
MaxUrlRefreshAttempts = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMaxUrlRefreshAttempts, DefaultMaxUrlRefreshAttempts),
UrlExpirationBufferSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, DefaultUrlExpirationBufferSeconds)
Expand Down
89 changes: 75 additions & 14 deletions csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ internal sealed class CloudFetchDownloader : ICloudFetchDownloader
private readonly int _maxParallelDownloads;
private readonly bool _isLz4Compressed;
private readonly int _maxRetries;
private readonly int _retryTimeoutSeconds;
private readonly int _retryDelayMs;
private readonly int _maxUrlRefreshAttempts;
private readonly int _urlExpirationBufferSeconds;
Expand Down Expand Up @@ -94,6 +95,7 @@ public CloudFetchDownloader(
_maxParallelDownloads = config.ParallelDownloads;
_isLz4Compressed = config.IsLz4Compressed;
_maxRetries = config.MaxRetries;
_retryTimeoutSeconds = config.RetryTimeoutSeconds;
_retryDelayMs = config.RetryDelayMs;
_maxUrlRefreshAttempts = config.MaxUrlRefreshAttempts;
_urlExpirationBufferSeconds = config.UrlExpirationBufferSeconds;
Expand All @@ -114,8 +116,9 @@ public CloudFetchDownloader(
/// <param name="resultFetcher">The result fetcher that manages URLs.</param>
/// <param name="maxParallelDownloads">Maximum parallel downloads.</param>
/// <param name="isLz4Compressed">Whether results are LZ4 compressed.</param>
/// <param name="maxRetries">Maximum retry attempts (optional, default 3).</param>
/// <param name="retryDelayMs">Delay between retries in ms (optional, default 1000).</param>
/// <param name="maxRetries">Maximum retry attempts (optional, default -1 = no limit, use timeout only).</param>
/// <param name="retryTimeoutSeconds">Time budget for retries in seconds (optional, default 300).</param>
/// <param name="retryDelayMs">Initial delay between retries in ms (optional, default 500).</param>
internal CloudFetchDownloader(
IActivityTracer activityTracer,
BlockingCollection<IDownloadResult> downloadQueue,
Expand All @@ -125,8 +128,9 @@ internal CloudFetchDownloader(
ICloudFetchResultFetcher resultFetcher,
int maxParallelDownloads,
bool isLz4Compressed,
int maxRetries = 3,
int retryDelayMs = 1000)
int maxRetries = CloudFetchConfiguration.DefaultMaxRetries,
int retryTimeoutSeconds = CloudFetchConfiguration.DefaultRetryTimeoutSeconds,
int retryDelayMs = CloudFetchConfiguration.DefaultRetryDelayMs)
{
_activityTracer = activityTracer ?? throw new ArgumentNullException(nameof(activityTracer));
_downloadQueue = downloadQueue ?? throw new ArgumentNullException(nameof(downloadQueue));
Expand All @@ -138,6 +142,7 @@ internal CloudFetchDownloader(
_maxParallelDownloads = maxParallelDownloads;
_isLz4Compressed = isLz4Compressed;
_maxRetries = maxRetries;
_retryTimeoutSeconds = retryTimeoutSeconds;
_retryDelayMs = retryDelayMs;
_maxUrlRefreshAttempts = CloudFetchConfiguration.DefaultMaxUrlRefreshAttempts;
_urlExpirationBufferSeconds = CloudFetchConfiguration.DefaultUrlExpirationBufferSeconds;
Expand Down Expand Up @@ -486,9 +491,32 @@ await _activityTracer.TraceActivityAsync(async activity =>
new("expected_size_kb", size / 1024.0)
]);

// Retry logic for downloading files
for (int retry = 0; retry < _maxRetries; retry++)
// Retry logic with time-budget approach and exponential backoff with jitter.
// Similar to RetryHttpHandler: keeps retrying until the time budget is exhausted,
// rather than a fixed retry count. This gives transient issues (firewall, proxy 502,
// connection drops) enough time to resolve.
int currentBackoffMs = _retryDelayMs;
int totalRetryWaitMs = 0;
int attemptCount = 0;
Exception? lastException = null;

while (!cancellationToken.IsCancellationRequested)
{
// Check if we've exceeded the max retry count (if set)
if (_maxRetries >= 0 && attemptCount >= _maxRetries)
{
activity?.AddEvent("cloudfetch.download_max_retries_exceeded", [
new("offset", downloadResult.StartRowOffset),
new("sanitized_url", SanitizeUrl(url)),
new("total_attempts", attemptCount),
new("max_retries", _maxRetries),
new("last_error", lastException?.GetType().Name ?? "none"),
new("last_error_message", lastException?.Message ?? "none")
]);
break;
}

attemptCount++;
try
{
// Create HTTP request with optional custom headers
Expand Down Expand Up @@ -562,18 +590,45 @@ await _activityTracer.TraceActivityAsync(async activity =>
fileData = await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
break; // Success, exit retry loop
}
catch (Exception ex) when (retry < _maxRetries - 1 && !cancellationToken.IsCancellationRequested)
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
{
// Log the error and retry
activity?.AddException(ex, [
lastException = ex;

// Exponential backoff with jitter (80-120% of base)
int waitMs = (int)Math.Max(100, currentBackoffMs * (0.8 + new Random().NextDouble() * 0.4));

// Check if we would exceed the retry time budget
int retryTimeoutMs = _retryTimeoutSeconds * 1000;
if (retryTimeoutMs > 0 && totalRetryWaitMs + waitMs > retryTimeoutMs)
{
activity?.AddEvent("cloudfetch.download_retry_timeout_exceeded", [
new("offset", downloadResult.StartRowOffset),
new("sanitized_url", SanitizeUrl(url)),
new("total_attempts", attemptCount),
new("total_retry_wait_ms", totalRetryWaitMs),
new("retry_timeout_seconds", _retryTimeoutSeconds),
new("last_error", ex.GetType().Name),
new("last_error_message", ex.Message)
]);
break; // Exceeded time budget
}

totalRetryWaitMs += waitMs;

activity?.AddEvent("cloudfetch.download_retry", [
new("error.context", "cloudfetch.download_retry"),
new("offset", downloadResult.StartRowOffset),
new("sanitized_url", SanitizeUrl(url)),
new("attempt", retry + 1),
new("max_retries", _maxRetries)
new("attempt", attemptCount),
new("retry_timeout_seconds", _retryTimeoutSeconds),
new("total_retry_wait_ms", totalRetryWaitMs),
new("error_type", ex.GetType().Name),
new("error_message", ex.Message),
new("backoff_ms", waitMs)
]);

await Task.Delay(_retryDelayMs * (retry + 1), cancellationToken).ConfigureAwait(false);
await Task.Delay(waitMs, cancellationToken).ConfigureAwait(false);
currentBackoffMs = Math.Min(currentBackoffMs * 2, 32_000);
}
}

Expand All @@ -583,13 +638,19 @@ await _activityTracer.TraceActivityAsync(async activity =>
activity?.AddEvent("cloudfetch.download_failed_all_retries", [
new("offset", downloadResult.StartRowOffset),
new("sanitized_url", sanitizedUrl),
new("max_retries", _maxRetries),
new("total_attempts", attemptCount),
new("total_retry_wait_ms", totalRetryWaitMs),
new("elapsed_time_ms", stopwatch.ElapsedMilliseconds)
]);

// Release the memory we acquired
_memoryManager.ReleaseMemory(size);
throw new InvalidOperationException($"Failed to download file from {url} after {_maxRetries} attempts.");
string retryLimits = _maxRetries >= 0
? $"max_retries: {_maxRetries}, timeout: {_retryTimeoutSeconds}s"
: $"timeout: {_retryTimeoutSeconds}s";
throw new InvalidOperationException(
$"Failed to download file from {sanitizedUrl} after {attemptCount} attempts over {totalRetryWaitMs / 1000}s ({retryLimits}).",
lastException);
}

// Process the downloaded file data
Expand Down
28 changes: 16 additions & 12 deletions csharp/test/E2E/CloudFetch/CloudFetchDownloaderTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,9 @@ public async Task DownloadFileAsync_ProcessesFile_AndAddsToResultQueue()
_mockResultFetcher.Object,
1, // maxParallelDownloads
false, // isLz4Compressed
1, // maxRetries
10); // retryDelayMs
maxRetries: -1,
retryTimeoutSeconds: 5,
retryDelayMs: 10);

// Act
await downloader.StartAsync(CancellationToken.None);
Expand Down Expand Up @@ -262,15 +263,16 @@ public async Task DownloadFileAsync_HandlesHttpError_AndSetsFailedOnDownloadResu
_mockResultFetcher.Object,
1, // maxParallelDownloads
false, // isLz4Compressed
1, // maxRetries
10); // retryDelayMs
maxRetries: -1,
retryTimeoutSeconds: 1,
retryDelayMs: 10);

// Act
await downloader.StartAsync(CancellationToken.None);
_downloadQueue.Add(mockDownloadResult.Object);

// Wait for the download to be processed
await Task.Delay(100);
// Wait for retries to exhaust the time budget
await Task.Delay(3000);

// Add the end of results guard to complete the downloader
_downloadQueue.Add(EndOfResultsGuard.Instance);
Expand All @@ -279,7 +281,7 @@ public async Task DownloadFileAsync_HandlesHttpError_AndSetsFailedOnDownloadResu
// Verify SetFailed was called
mockDownloadResult.Verify(r => r.SetFailed(It.IsAny<Exception>()), Times.Once);
Assert.NotNull(capturedException);
Assert.IsType<HttpRequestException>(capturedException);
Assert.IsType<InvalidOperationException>(capturedException);

// Verify the downloader has an error
Assert.True(downloader.HasError);
Expand Down Expand Up @@ -351,8 +353,9 @@ public async Task DownloadFileAsync_WithError_StopsProcessingRemainingFiles()
_mockResultFetcher.Object,
1, // maxParallelDownloads
false, // isLz4Compressed
1, // maxRetries
10); // retryDelayMs
maxRetries: -1,
retryTimeoutSeconds: 1,
retryDelayMs: 10);

// Act
await downloader.StartAsync(CancellationToken.None);
Expand All @@ -365,7 +368,7 @@ public async Task DownloadFileAsync_WithError_StopsProcessingRemainingFiles()
Console.WriteLine("Added end guard");

// Wait for the download to fail - use a timeout to avoid hanging
int maxWaitMs = 2000;
int maxWaitMs = 5000;
int waitedMs = 0;
while (!downloader.HasError && !setFailedCalled && waitedMs < maxWaitMs)
{
Expand Down Expand Up @@ -684,8 +687,9 @@ public async Task DownloadFileAsync_RefreshesExpiredUrl_WhenHttpErrorOccurs()
_mockResultFetcher.Object,
1, // maxParallelDownloads
false, // isLz4Compressed
2, // maxRetries
10); // retryDelayMs
maxRetries: -1,
retryTimeoutSeconds: 5,
retryDelayMs: 10);

// Act
await downloader.StartAsync(CancellationToken.None);
Expand Down
Loading