Skip to content

Commit 162f7df

Browse files
fix(csharp): address PR #375 review feedback
- Fix maxRetries semantics: -1=unlimited, 0=no retries, >0=max retries. Move check to catch block so first attempt always runs.
- Use real elapsed time (stopwatch) for retry budget instead of summing backoff delays, so slow/hanging attempts count against the budget.
- Use long for retryTimeoutMs to prevent int overflow on large values.
- Remove raw exception messages from trace events to avoid leaking pre-signed URL credentials in telemetry.
- Throw on invalid max_retries property values instead of silent fallback.
- Replace flaky Task.Delay(3000) in test with deterministic polling.

Co-authored-by: Isaac
1 parent b4163cd commit 162f7df

File tree

3 files changed

+49
-37
lines changed

3 files changed

+49
-37
lines changed

csharp/src/Reader/CloudFetch/CloudFetchConfiguration.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,20 @@ public static CloudFetchConfiguration FromProperties(
125125
Schema schema,
126126
bool isLz4Compressed)
127127
{
128+
// Parse MaxRetries separately: -1 (default) = no limit (time budget only), 0 = no retries (single attempt), > 0 = max retry attempts.
129+
// Throw on non-integer values to surface misconfiguration.
130+
int parsedMaxRetries = DefaultMaxRetries;
131+
if (properties.TryGetValue(DatabricksParameters.CloudFetchMaxRetries, out string? maxRetriesStr))
132+
{
133+
if (!int.TryParse(maxRetriesStr, out int maxRetries) || maxRetries < -1)
134+
{
135+
throw new ArgumentException(
136+
$"Invalid value '{maxRetriesStr}' for {DatabricksParameters.CloudFetchMaxRetries}. " +
137+
$"Expected -1 (no limit) or a non-negative integer.");
138+
}
139+
parsedMaxRetries = maxRetries;
140+
}
141+
128142
var config = new CloudFetchConfiguration
129143
{
130144
Schema = schema ?? throw new ArgumentNullException(nameof(schema)),
@@ -133,7 +147,7 @@ public static CloudFetchConfiguration FromProperties(
133147
PrefetchCount = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchPrefetchCount, DefaultPrefetchCount),
134148
MemoryBufferSizeMB = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMemoryBufferSize, DefaultMemoryBufferSizeMB),
135149
TimeoutMinutes = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchTimeoutMinutes, DefaultTimeoutMinutes),
136-
MaxRetries = properties.TryGetValue(DatabricksParameters.CloudFetchMaxRetries, out string? maxRetriesStr) && int.TryParse(maxRetriesStr, out int maxRetries) && maxRetries >= 0 ? maxRetries : DefaultMaxRetries,
150+
MaxRetries = parsedMaxRetries,
137151
RetryTimeoutSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchRetryTimeoutSeconds, DefaultRetryTimeoutSeconds),
138152
RetryDelayMs = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchRetryDelayMs, DefaultRetryDelayMs),
139153
MaxUrlRefreshAttempts = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMaxUrlRefreshAttempts, DefaultMaxUrlRefreshAttempts),

csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public CloudFetchDownloader(
116116
/// <param name="resultFetcher">The result fetcher that manages URLs.</param>
117117
/// <param name="maxParallelDownloads">Maximum parallel downloads.</param>
118118
/// <param name="isLz4Compressed">Whether results are LZ4 compressed.</param>
119-
/// <param name="maxRetries">Maximum retry attempts (0 = no limit, use timeout only).</param>
119+
/// <param name="maxRetries">Maximum retry attempts. -1 = no limit (use timeout only), 0 = no retries (single attempt), positive = max retry attempts.</param>
120120
/// <param name="retryTimeoutSeconds">Time budget for retries in seconds (optional, default 300).</param>
121121
/// <param name="retryDelayMs">Initial delay between retries in ms (optional, default 500).</param>
122122
internal CloudFetchDownloader(
@@ -492,30 +492,16 @@ await _activityTracer.TraceActivityAsync(async activity =>
492492
]);
493493

494494
// Retry logic with time-budget approach and exponential backoff with jitter.
495-
// Similar to RetryHttpHandler: keeps retrying until the time budget is exhausted,
495+
// Keeps retrying until the time budget (real elapsed time) is exhausted,
496496
// rather than a fixed retry count. This gives transient issues (firewall, proxy 502,
497497
// connection drops) enough time to resolve.
498498
int currentBackoffMs = _retryDelayMs;
499-
int totalRetryWaitMs = 0;
499+
long retryTimeoutMs = Math.Min((long)_retryTimeoutSeconds, int.MaxValue / 1000L) * 1000L;
500500
int attemptCount = 0;
501501
Exception? lastException = null;
502502

503503
while (!cancellationToken.IsCancellationRequested)
504504
{
505-
// Check if we've exceeded the max retry count (if set)
506-
if (_maxRetries >= 0 && attemptCount >= _maxRetries)
507-
{
508-
activity?.AddEvent("cloudfetch.download_max_retries_exceeded", [
509-
new("offset", downloadResult.StartRowOffset),
510-
new("sanitized_url", SanitizeUrl(url)),
511-
new("total_attempts", attemptCount),
512-
new("max_retries", _maxRetries),
513-
new("last_error", lastException?.GetType().Name ?? "none"),
514-
new("last_error_message", lastException?.Message ?? "none")
515-
]);
516-
break;
517-
}
518-
519505
attemptCount++;
520506
try
521507
{
@@ -594,36 +580,43 @@ await _activityTracer.TraceActivityAsync(async activity =>
594580
{
595581
lastException = ex;
596582

597-
// Exponential backoff with jitter (80-120% of base)
598-
int waitMs = (int)Math.Max(100, currentBackoffMs * (0.8 + new Random().NextDouble() * 0.4));
583+
// Check if we've exceeded the max retry count (if set)
584+
// -1 = unlimited, 0 = no retries (single attempt), >0 = max retry attempts
585+
if (_maxRetries >= 0 && attemptCount > _maxRetries)
586+
{
587+
activity?.AddEvent("cloudfetch.download_max_retries_exceeded", [
588+
new("offset", downloadResult.StartRowOffset),
589+
new("sanitized_url", SanitizeUrl(url)),
590+
new("total_attempts", attemptCount),
591+
new("max_retries", _maxRetries)
592+
]);
593+
break;
594+
}
599595

600-
// Check if we would exceed the retry time budget
601-
int retryTimeoutMs = _retryTimeoutSeconds * 1000;
602-
if (retryTimeoutMs > 0 && totalRetryWaitMs + waitMs > retryTimeoutMs)
596+
// Check if we've exceeded the time budget (real elapsed time)
597+
if (retryTimeoutMs > 0 && stopwatch.ElapsedMilliseconds >= retryTimeoutMs)
603598
{
604599
activity?.AddEvent("cloudfetch.download_retry_timeout_exceeded", [
605600
new("offset", downloadResult.StartRowOffset),
606601
new("sanitized_url", SanitizeUrl(url)),
607602
new("total_attempts", attemptCount),
608-
new("total_retry_wait_ms", totalRetryWaitMs),
603+
new("elapsed_ms", stopwatch.ElapsedMilliseconds),
609604
new("retry_timeout_seconds", _retryTimeoutSeconds),
610-
new("last_error", ex.GetType().Name),
611-
new("last_error_message", ex.Message)
605+
new("last_error", ex.GetType().Name)
612606
]);
613-
break; // Exceeded time budget
607+
break;
614608
}
615609

616-
totalRetryWaitMs += waitMs;
610+
// Exponential backoff with jitter (80-120% of base)
611+
int waitMs = (int)Math.Max(100, currentBackoffMs * (0.8 + new Random().NextDouble() * 0.4));
617612

618613
activity?.AddEvent("cloudfetch.download_retry", [
619-
new("error.context", "cloudfetch.download_retry"),
620614
new("offset", downloadResult.StartRowOffset),
621615
new("sanitized_url", SanitizeUrl(url)),
622616
new("attempt", attemptCount),
617+
new("elapsed_ms", stopwatch.ElapsedMilliseconds),
623618
new("retry_timeout_seconds", _retryTimeoutSeconds),
624-
new("total_retry_wait_ms", totalRetryWaitMs),
625619
new("error_type", ex.GetType().Name),
626-
new("error_message", ex.Message),
627620
new("backoff_ms", waitMs)
628621
]);
629622

@@ -639,7 +632,6 @@ await _activityTracer.TraceActivityAsync(async activity =>
639632
new("offset", downloadResult.StartRowOffset),
640633
new("sanitized_url", sanitizedUrl),
641634
new("total_attempts", attemptCount),
642-
new("total_retry_wait_ms", totalRetryWaitMs),
643635
new("elapsed_time_ms", stopwatch.ElapsedMilliseconds)
644636
]);
645637

@@ -649,7 +641,7 @@ await _activityTracer.TraceActivityAsync(async activity =>
649641
? $"max_retries: {_maxRetries}, timeout: {_retryTimeoutSeconds}s"
650642
: $"timeout: {_retryTimeoutSeconds}s";
651643
throw new InvalidOperationException(
652-
$"Failed to download file from {sanitizedUrl} after {attemptCount} attempts over {totalRetryWaitMs / 1000}s ({retryLimits}).",
644+
$"Failed to download file from {sanitizedUrl} after {attemptCount} attempts over {stopwatch.ElapsedMilliseconds / 1000}s ({retryLimits}).",
653645
lastException);
654646
}
655647

csharp/test/E2E/CloudFetch/CloudFetchDownloaderTest.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -271,16 +271,22 @@ public async Task DownloadFileAsync_HandlesHttpError_AndSetsFailedOnDownloadResu
271271
await downloader.StartAsync(CancellationToken.None);
272272
_downloadQueue.Add(mockDownloadResult.Object);
273273

274-
// Wait for retries to exhaust the time budget
275-
await Task.Delay(3000);
276-
277274
// Add the end of results guard to complete the downloader
278275
_downloadQueue.Add(EndOfResultsGuard.Instance);
279276

277+
// Wait for the download to fail - poll deterministically instead of fixed delay
278+
int maxWaitMs = 5000;
279+
int waitedMs = 0;
280+
while (capturedException == null && waitedMs < maxWaitMs)
281+
{
282+
await Task.Delay(50);
283+
waitedMs += 50;
284+
}
285+
280286
// Assert
281287
// Verify SetFailed was called
288+
Assert.True(capturedException != null, $"SetFailed was not called within {maxWaitMs}ms");
282289
mockDownloadResult.Verify(r => r.SetFailed(It.IsAny<Exception>()), Times.Once);
283-
Assert.NotNull(capturedException);
284290
Assert.IsType<InvalidOperationException>(capturedException);
285291

286292
// Verify the downloader has an error

0 commit comments

Comments
 (0)