Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ private static TimeoutOptions ToCore(this ClusterNewInstanceRequest.Types.Option
{
timeoutOptions = timeoutOptions.WithQueryTimeout(protoTimeout.QueryTimeout.ToTimeSpan());
}
// handle_timeout is not supported by the .NET SDK; intentionally ignored.
if (protoTimeout.HandleTimeout is not null)
{
timeoutOptions = timeoutOptions.WithHandleRequestTimeout(protoTimeout.HandleTimeout.ToTimeSpan());
}
return timeoutOptions;
}

Expand Down
45 changes: 26 additions & 19 deletions src/Couchbase.Analytics/Internal/AnalyticsService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ public async Task<QueryHandle> StartQueryAsync(string statement, StartQueryOptio
{
var stopwatch = LightweightStopwatch.StartNew();
var queryTimeout = options.QueryTimeout ?? _clusterOptions.TimeoutOptions.QueryTimeout;
// Ensure the resolved query timeout is serialized into the request body.
options = options with { QueryTimeout = queryTimeout };
var requestTimeout = _clusterOptions.TimeoutOptions.DispatchTimeout;

var errorContext = new ErrorContext(options.ClientContextId, stopwatch, queryTimeout);
Expand Down Expand Up @@ -262,24 +264,29 @@ public async Task<QueryHandle> StartQueryAsync(string statement, StartQueryOptio
ex, errorContext);
}

if (!response.IsSuccessStatusCode)
// Errors may be present alongside a 2xx response (e.g. HTTP 202 with status:"fatal"
// and a retriable error array). Inspect them before any other branching so the retry
// logic applies uniformly to success and failure HTTP codes.
QueryError[]? errors = null;
if (root.TryGetProperty("errors", out var errorsElement) && errorsElement.ValueKind == JsonValueKind.Array)
{
// Try to parse errors from the response
if (root.TryGetProperty("errors", out var errorsElement))
{
var errors = JsonSerializer.Deserialize<QueryError[]>(errorsElement.GetRawText())
?? Array.Empty<QueryError>();

if (AnalyticsErrorMapper.AreErrorsRetriable(errors))
{
lastException = AnalyticsErrorMapper.MapServiceErrors(errors, errorContext);
await RetryUtils.BackoffAsync(attempt, cancellationToken).ConfigureAwait(false);
continue;
}
errors = JsonSerializer.Deserialize<QueryError[]>(errorsElement.GetRawText());
}

throw AnalyticsErrorMapper.MapServiceErrors(errors, errorContext);
if (errors is { Length: > 0 })
{
if (AnalyticsErrorMapper.AreErrorsRetriable(errors))
{
lastException = AnalyticsErrorMapper.MapServiceErrors(errors, errorContext);
await RetryUtils.BackoffAsync(attempt, cancellationToken).ConfigureAwait(false);
continue;
}

throw AnalyticsErrorMapper.MapServiceErrors(errors, errorContext);
}

if (!response.IsSuccessStatusCode)
{
// 503 is retriable
if (response.StatusCode == HttpStatusCode.ServiceUnavailable)
{
Expand Down Expand Up @@ -339,7 +346,7 @@ public async Task<QueryHandle> StartQueryAsync(string statement, StartQueryOptio

public async Task<QueryStatus> FetchStatusAsync(QueryHandle handle, FetchStatusOptions options, CancellationToken cancellationToken = default)
{
var timeout = _clusterOptions.TimeoutOptions.DispatchTimeout;
var timeout = _clusterOptions.TimeoutOptions.HandleRequestTimeout;
using var httpClient = CreateHttpClient(timeout);

var statusUri = Uri.TryCreate(handle.Handle, UriKind.Absolute, out var absUri) && (absUri.Scheme == Uri.UriSchemeHttp || absUri.Scheme == Uri.UriSchemeHttps)
Expand Down Expand Up @@ -456,7 +463,7 @@ public async Task<QueryStatus> FetchStatusAsync(QueryHandle handle, FetchStatusO

public async Task<IQueryResult> FetchResultsAsync(string requestId, string handlePath, FetchResultsOptions options, CancellationToken cancellationToken = default)
{
var timeout = _clusterOptions.TimeoutOptions.DispatchTimeout;
var timeout = _clusterOptions.TimeoutOptions.HandleRequestTimeout;
var httpClient = CreateHttpClient(timeout);
var deserializer = options.Deserializer ?? _clusterOptions.Deserializer;

Expand Down Expand Up @@ -505,7 +512,7 @@ public async Task<IQueryResult> FetchResultsAsync(string requestId, string handl

public async Task DiscardResultsAsync(string requestId, string handlePath, DiscardResultsOptions options, CancellationToken cancellationToken = default)
{
var timeout = _clusterOptions.TimeoutOptions.DispatchTimeout;
var timeout = _clusterOptions.TimeoutOptions.HandleRequestTimeout;
using var httpClient = CreateHttpClient(timeout);

var resultUri = Uri.TryCreate(handlePath, UriKind.Absolute, out var absUri) && (absUri.Scheme == Uri.UriSchemeHttp || absUri.Scheme == Uri.UriSchemeHttps)
Expand Down Expand Up @@ -543,7 +550,7 @@ public async Task DiscardResultsAsync(string requestId, string handlePath, Disca

public async Task CancelQueryAsync(string requestId, CancelOptions options, CancellationToken cancellationToken = default)
{
var timeout = _clusterOptions.TimeoutOptions.DispatchTimeout;
var timeout = _clusterOptions.TimeoutOptions.HandleRequestTimeout;
using var httpClient = CreateHttpClient(timeout);

var cancelUri = new Uri(_baseUri, "api/v1/active_requests");
Expand Down Expand Up @@ -658,4 +665,4 @@ private static Exception ThrowTooManyRetries(ErrorContext errorContext)
private static partial void LogCancelQueryResponse(ILogger logger, Redacted<string> requestId, int statusCode);

#endregion
}
}
16 changes: 14 additions & 2 deletions src/Couchbase.Analytics/Options/TimeoutOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public record TimeoutOptions
internal TimeSpan ConnectTimeout = TimeSpan.FromSeconds(10);
internal TimeSpan DispatchTimeout = TimeSpan.FromSeconds(30);
internal TimeSpan QueryTimeout = TimeSpan.FromMinutes(10);
internal TimeSpan HandleRequestTimeout = TimeSpan.FromSeconds(10);

/// <summary>
/// Socket connection timeout, or more broadly the timeout
Expand All @@ -52,12 +53,23 @@ public TimeoutOptions WithDispatchTimeout(TimeSpan dispatchTimeout)
}

/// <summary>
/// Columnar query timeout.
/// Cluster-level query timeout. Can be overriden with <see cref="QueryOptions"/>'s Timeout.
/// <remarks>The default is 10m.</remarks>
/// </summary>
public TimeoutOptions WithQueryTimeout(TimeSpan queryTimeout)
{
QueryTimeout = queryTimeout;
return this;
}
}

/// <summary>
/// Per-request timeout for async query handle operations
/// (FetchStatus, FetchResults, DiscardResults, CancelHandle).
/// <remarks>The default is 10s.</remarks>
/// </summary>
public TimeoutOptions WithHandleRequestTimeout(TimeSpan handleTimeout)
{
HandleRequestTimeout = handleTimeout;
return this;
}
}
Loading