Skip to content

Commit 521c7aa

Browse files
committed
Update RetryHttpHandlerTest.cs
Add retry after handling in ADBC Spark driver fix pre commit check failures
1 parent 9ba6bdb commit 521c7aa

File tree

5 files changed

+562
-1
lines changed

5 files changed

+562
-1
lines changed

Diff for: csharp/src/Drivers/Apache/Spark/RetryHttpHandler.cs

+204
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
using System;
19+
using System.Net;
20+
using System.Net.Http;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using System.IO;
24+
using System.Text;
25+
using Thrift;
26+
using Thrift.Protocol;
27+
using Thrift.Transport;
28+
using Apache.Hive.Service.Rpc.Thrift;
29+
30+
namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
31+
{
32+
/// <summary>
33+
/// HTTP handler that implements retry behavior for 503 responses with Retry-After headers.
34+
/// </summary>
35+
internal class RetryHttpHandler : DelegatingHandler
36+
{
37+
private readonly bool _retryEnabled;
38+
private readonly int _retryTimeoutSeconds;
39+
40+
/// <summary>
41+
/// Initializes a new instance of the <see cref="RetryHttpHandler"/> class.
42+
/// </summary>
43+
/// <param name="innerHandler">The inner handler to delegate to.</param>
44+
/// <param name="retryEnabled">Whether retry behavior is enabled.</param>
45+
/// <param name="retryTimeoutSeconds">Maximum total time in seconds to retry before failing.</param>
46+
public RetryHttpHandler(HttpMessageHandler innerHandler, bool retryEnabled, int retryTimeoutSeconds)
47+
: base(innerHandler)
48+
{
49+
_retryEnabled = retryEnabled;
50+
_retryTimeoutSeconds = retryTimeoutSeconds;
51+
}
52+
53+
/// <summary>
54+
/// Sends an HTTP request to the inner handler with retry logic for 503 responses.
55+
/// </summary>
56+
protected override async Task<HttpResponseMessage> SendAsync(
57+
HttpRequestMessage request,
58+
CancellationToken cancellationToken)
59+
{
60+
// If retry is disabled, just pass through to the inner handler
61+
if (!_retryEnabled)
62+
{
63+
return await base.SendAsync(request, cancellationToken);
64+
}
65+
66+
// Clone the request content if it's not null so we can reuse it for retries
67+
var requestContentClone = request.Content != null
68+
? await CloneHttpContentAsync(request.Content)
69+
: null;
70+
71+
HttpResponseMessage response;
72+
string? lastErrorMessage = null;
73+
DateTime startTime = DateTime.UtcNow;
74+
int totalRetrySeconds = 0;
75+
76+
do
77+
{
78+
// Set the content for each attempt (if needed)
79+
if (requestContentClone != null && request.Content == null)
80+
{
81+
request.Content = await CloneHttpContentAsync(requestContentClone);
82+
}
83+
84+
response = await base.SendAsync(request, cancellationToken);
85+
86+
// If it's not a 503 response, return immediately
87+
if (response.StatusCode != HttpStatusCode.ServiceUnavailable)
88+
{
89+
return response;
90+
}
91+
92+
// Check for Retry-After header
93+
if (!response.Headers.TryGetValues("Retry-After", out var retryAfterValues))
94+
{
95+
// No Retry-After header, so return the response as is
96+
return response;
97+
}
98+
99+
// Parse the Retry-After value
100+
string retryAfterValue = string.Join(",", retryAfterValues);
101+
if (!int.TryParse(retryAfterValue, out int retryAfterSeconds) || retryAfterSeconds <= 0)
102+
{
103+
// Invalid Retry-After value, return the response as is
104+
return response;
105+
}
106+
107+
// Extract error message from response if possible
108+
try
109+
{
110+
lastErrorMessage = await ExtractErrorMessageAsync(response);
111+
}
112+
catch
113+
{
114+
// If we can't extract the error message, just use a generic one
115+
lastErrorMessage = $"Service temporarily unavailable (HTTP 503). Retry after {retryAfterSeconds} seconds.";
116+
}
117+
118+
// Check if we've exceeded the timeout
119+
totalRetrySeconds += retryAfterSeconds;
120+
if (_retryTimeoutSeconds > 0 && totalRetrySeconds > _retryTimeoutSeconds)
121+
{
122+
// We've exceeded the timeout, so break out of the loop
123+
break;
124+
}
125+
126+
// Dispose the response before retrying
127+
response.Dispose();
128+
129+
// Wait for the specified retry time
130+
await Task.Delay(TimeSpan.FromSeconds(retryAfterSeconds), cancellationToken);
131+
132+
// Reset the request content for the next attempt
133+
request.Content = null;
134+
135+
} while (!cancellationToken.IsCancellationRequested);
136+
137+
// If we get here, we've either exceeded the timeout or been cancelled
138+
if (cancellationToken.IsCancellationRequested)
139+
{
140+
throw new OperationCanceledException("Request cancelled during retry wait", cancellationToken);
141+
}
142+
143+
// Create a custom exception with the SQL state code and last error message
144+
var exception = new AdbcException(
145+
lastErrorMessage ?? "Service temporarily unavailable and retry timeout exceeded",
146+
AdbcStatusCode.IOError);
147+
148+
// Add SQL state as part of the message since we can't set it directly
149+
throw new AdbcException(
150+
$"[SQLState: 08001] {exception.Message}",
151+
AdbcStatusCode.IOError);
152+
}
153+
154+
/// <summary>
155+
/// Clones an HttpContent object so it can be reused for retries.
156+
/// </summary>
157+
private static async Task<HttpContent> CloneHttpContentAsync(HttpContent content)
158+
{
159+
var ms = new MemoryStream();
160+
await content.CopyToAsync(ms);
161+
ms.Position = 0;
162+
163+
var clone = new StreamContent(ms);
164+
if (content.Headers != null)
165+
{
166+
foreach (var header in content.Headers)
167+
{
168+
clone.Headers.Add(header.Key, header.Value);
169+
}
170+
}
171+
return clone;
172+
}
173+
174+
/// <summary>
175+
/// Attempts to extract the error message from a Thrift TApplicationException in the response body.
176+
/// </summary>
177+
private static async Task<string?> ExtractErrorMessageAsync(HttpResponseMessage response)
178+
{
179+
if (response.Content == null)
180+
{
181+
return null;
182+
}
183+
184+
// Check if the content type is application/x-thrift
185+
if (response.Content.Headers.ContentType?.MediaType != "application/x-thrift")
186+
{
187+
// If it's not Thrift, just return the content as a string
188+
return await response.Content.ReadAsStringAsync();
189+
}
190+
191+
try
192+
{
193+
// For Thrift content, just return a generic message
194+
// We can't easily parse the Thrift message without access to the specific methods
195+
return await response.Content.ReadAsStringAsync();
196+
}
197+
catch
198+
{
199+
// If we can't read the content, return null
200+
return null;
201+
}
202+
}
203+
}
204+
}

Diff for: csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs

+45-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ public SparkHttpConnection(IReadOnlyDictionary<string, string> properties) : bas
4343
{
4444
}
4545

46+
/// <summary>
47+
/// Gets a value indicating whether to retry requests that receive a 503 response with a Retry-After header.
48+
/// </summary>
49+
protected bool TemporarilyUnavailableRetry { get; private set; } = true;
50+
51+
/// <summary>
52+
/// Gets the maximum total time in seconds to retry 503 responses before failing.
53+
/// </summary>
54+
protected int TemporarilyUnavailableRetryTimeout { get; private set; } = 900;
55+
4656
protected override void ValidateAuthentication()
4757
{
4858
// Validate authentication parameters
@@ -135,6 +145,33 @@ protected override void ValidateOptions()
135145
? connectTimeoutMsValue
136146
: throw new ArgumentOutOfRangeException(SparkParameters.ConnectTimeoutMilliseconds, connectTimeoutMs, $"must be a value of 0 (infinite) or between 1 .. {int.MaxValue}. default is 30000 milliseconds.");
137147
}
148+
149+
// Parse retry configuration parameters
150+
Properties.TryGetValue(SparkParameters.TemporarilyUnavailableRetry, out string? tempUnavailableRetryStr);
151+
int tempUnavailableRetryValue = 1; // Default to enabled
152+
if (tempUnavailableRetryStr != null && !int.TryParse(tempUnavailableRetryStr, out tempUnavailableRetryValue))
153+
{
154+
throw new ArgumentOutOfRangeException(SparkParameters.TemporarilyUnavailableRetry, tempUnavailableRetryStr,
155+
$"must be a value of 0 (disabled) or 1 (enabled). Default is 1.");
156+
}
157+
TemporarilyUnavailableRetry = tempUnavailableRetryValue != 0;
158+
159+
Properties.TryGetValue(SparkParameters.TemporarilyUnavailableRetryTimeout, out string? tempUnavailableRetryTimeoutStr);
160+
if (tempUnavailableRetryTimeoutStr != null)
161+
{
162+
if (!int.TryParse(tempUnavailableRetryTimeoutStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out int tempUnavailableRetryTimeoutValue) ||
163+
tempUnavailableRetryTimeoutValue < 0)
164+
{
165+
throw new ArgumentOutOfRangeException(SparkParameters.TemporarilyUnavailableRetryTimeout, tempUnavailableRetryTimeoutStr,
166+
$"must be a value of 0 (retry indefinitely) or a positive integer representing seconds. Default is 900 seconds (15 minutes).");
167+
}
168+
TemporarilyUnavailableRetryTimeout = tempUnavailableRetryTimeoutValue;
169+
}
170+
else
171+
{
172+
TemporarilyUnavailableRetryTimeout = 900; // Default to 15 minutes
173+
}
174+
138175
TlsOptions = HiveServer2TlsImpl.GetHttpTlsOptions(Properties);
139176
}
140177

@@ -161,7 +198,14 @@ protected override TTransport CreateTransport()
161198
AuthenticationHeaderValue? authenticationHeaderValue = GetAuthenticationHeaderValue(authTypeValue, token, username, password, access_token);
162199

163200
HttpClientHandler httpClientHandler = HiveServer2TlsImpl.NewHttpClientHandler(TlsOptions);
164-
HttpClient httpClient = new(httpClientHandler);
201+
202+
// Create a RetryHttpHandler that wraps the HttpClientHandler to handle 503 responses
203+
var retryHandler = new RetryHttpHandler(
204+
httpClientHandler,
205+
TemporarilyUnavailableRetry,
206+
TemporarilyUnavailableRetryTimeout);
207+
208+
HttpClient httpClient = new(retryHandler);
165209
httpClient.BaseAddress = baseAddress;
166210
httpClient.DefaultRequestHeaders.Authorization = authenticationHeaderValue;
167211
httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(s_userAgent);

Diff for: csharp/src/Drivers/Apache/Spark/SparkParameters.cs

+31
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,37 @@ public class SparkParameters
3333
public const string Type = "adbc.spark.type";
3434
public const string DataTypeConv = "adbc.spark.data_type_conv";
3535
public const string ConnectTimeoutMilliseconds = "adbc.spark.connect_timeout_ms";
36+
37+
// CloudFetch configuration parameters
38+
/// <summary>
39+
/// Maximum number of retry attempts for CloudFetch downloads.
40+
/// Default value is 3 if not specified.
41+
/// </summary>
42+
public const string CloudFetchMaxRetries = "adbc.spark.cloudfetch.max_retries";
43+
44+
/// <summary>
45+
/// Delay in milliseconds between CloudFetch retry attempts.
46+
/// Default value is 500ms if not specified.
47+
/// </summary>
48+
public const string CloudFetchRetryDelayMs = "adbc.spark.cloudfetch.retry_delay_ms";
49+
50+
/// <summary>
51+
/// Timeout in minutes for CloudFetch HTTP operations.
52+
/// Default value is 5 minutes if not specified.
53+
/// </summary>
54+
public const string CloudFetchTimeoutMinutes = "adbc.spark.cloudfetch.timeout_minutes";
55+
56+
/// <summary>
57+
/// Controls whether to retry requests that receive a 503 response with a Retry-After header.
58+
/// Default value is 1 (enabled). Set to 0 to disable retry behavior.
59+
/// </summary>
60+
public const string TemporarilyUnavailableRetry = "adbc.spark.temporarily_unavailable_retry";
61+
62+
/// <summary>
63+
/// Maximum total time in seconds to retry 503 responses before failing.
64+
/// Default value is 900 seconds (15 minutes). Set to 0 to retry indefinitely.
65+
/// </summary>
66+
public const string TemporarilyUnavailableRetryTimeout = "adbc.spark.temporarily_unavailable_retry_timeout";
3667
}
3768

3869
public static class SparkAuthTypeConstants

0 commit comments

Comments
 (0)