diff --git a/src/Couchbase.Analytics/Async/QueryHandle.cs b/src/Couchbase.Analytics/Async/QueryHandle.cs index 50bae13..5e4f14b 100644 --- a/src/Couchbase.Analytics/Async/QueryHandle.cs +++ b/src/Couchbase.Analytics/Async/QueryHandle.cs @@ -21,21 +21,21 @@ using System.Text.Json; using Couchbase.AnalyticsClient.Internal; +using Couchbase.AnalyticsClient.Options; +using Couchbase.AnalyticsClient.Results; namespace Couchbase.AnalyticsClient.Async; /// /// Represents a handle to a server-side asynchronous query. -/// Obtained from or . +/// Obtained from . /// public class QueryHandle { private readonly IAnalyticsService _analyticsService; - private readonly TimeSpan? _requestTimeout; /// - /// The query handle string used to poll status and fetch results. - /// This is the path segment after /api/v1/request/status/. + /// The query handle string used to poll for the result handle. /// public string Handle { get; } @@ -44,75 +44,53 @@ public class QueryHandle /// public string RequestId { get; } - internal QueryHandle(string handle, string requestId, IAnalyticsService analyticsService, TimeSpan? requestTimeout = null) + internal QueryHandle(string handle, string requestId, IAnalyticsService analyticsService) { Handle = handle ?? throw new ArgumentNullException(nameof(handle)); RequestId = requestId ?? throw new ArgumentNullException(nameof(requestId)); _analyticsService = analyticsService ?? throw new ArgumentNullException(nameof(analyticsService)); - _requestTimeout = requestTimeout; } /// - /// Fetches the current status of the asynchronous query from the server. + /// Fetches the result handle of the asynchronous query from the server. /// + /// Options for fetching the result handle. /// A cancellation token. - /// A representing the current state of the query. - public async Task FetchStatusAsync(CancellationToken cancellationToken = default) + /// A if results are ready, otherwise null. + public Task FetchResultHandleAsync(FetchResultHandleOptions? options = null, CancellationToken cancellationToken = default) { - return await _analyticsService.FetchStatusAsync(Handle, _requestTimeout, cancellationToken) - .ConfigureAwait(false); + options ??= new FetchResultHandleOptions(); + return _analyticsService.FetchResultHandleAsync(this, options, cancellationToken); } /// - /// Discards the query results on the server. After this call, the results can no longer be fetched. + /// Fetches the result handle of the asynchronous query from the server. /// - /// A cancellation token. - public async Task DiscardResultsAsync(CancellationToken cancellationToken = default) + public Task FetchResultHandleAsync(Func options, CancellationToken cancellationToken = default) { - await _analyticsService.DiscardResultsAsync(Handle, _requestTimeout, cancellationToken) - .ConfigureAwait(false); + var fetchOptions = new FetchResultHandleOptions(); + fetchOptions = options.Invoke(fetchOptions); + return FetchResultHandleAsync(fetchOptions, cancellationToken); } /// /// Cancels the query on the server. If the query has already completed, this is a no-op. /// + /// Options for cancellation. /// A cancellation token. - public async Task CancelAsync(CancellationToken cancellationToken = default) - { - await _analyticsService.CancelQueryAsync(RequestId, _requestTimeout, cancellationToken) - .ConfigureAwait(false); - } - - /// - /// Serializes this to a JSON string so it can be persisted and - /// later reconstructed via . - /// This method does not perform any network operations. - /// - /// A JSON string containing the handle and request ID. - public string Serialize() + public Task CancelAsync(CancelOptions? options = null, CancellationToken cancellationToken = default) { - var data = new SerializedQueryHandle(Handle, RequestId); - return JsonSerializer.Serialize(data); + options ??= new CancelOptions(); + return _analyticsService.CancelQueryAsync(RequestId, options, cancellationToken); } /// - /// Deserializes a from a JSON string previously produced by . - /// This method does not perform any network operations. + /// Cancels the query on the server. If the query has already completed, this is a no-op. /// - internal static QueryHandle Deserialize(string serializedHandle, IAnalyticsService analyticsService, TimeSpan? requestTimeout = null) + public Task CancelAsync(Func options, CancellationToken cancellationToken = default) { - ArgumentNullException.ThrowIfNull(serializedHandle); - - var data = JsonSerializer.Deserialize(serializedHandle) - ?? throw new ArgumentException("Invalid serialized handle format.", nameof(serializedHandle)); - - if (string.IsNullOrWhiteSpace(data.Handle) || string.IsNullOrWhiteSpace(data.RequestId)) - { - throw new ArgumentException("Serialized handle is missing required fields.", nameof(serializedHandle)); - } - - return new QueryHandle(data.Handle, data.RequestId, analyticsService, requestTimeout); + var cancelOptions = new CancelOptions(); + cancelOptions = options.Invoke(cancelOptions); + return CancelAsync(cancelOptions, cancellationToken); } - - private record SerializedQueryHandle(string Handle, string RequestId); } diff --git a/src/Couchbase.Analytics/Async/QueryHandleResults.cs b/src/Couchbase.Analytics/Async/QueryHandleResults.cs deleted file mode 100644 index e985ad5..0000000 --- a/src/Couchbase.Analytics/Async/QueryHandleResults.cs +++ /dev/null @@ -1,58 +0,0 @@ -#region License -/* ************************************************************ - * - * @author Couchbase - * @copyright 2025 Couchbase, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ************************************************************/ -#endregion - -using Couchbase.AnalyticsClient.Internal; -using Couchbase.AnalyticsClient.Results; -using Couchbase.Core.Json; - -namespace Couchbase.AnalyticsClient.Async; - -/// -/// Provides access to the results of a completed asynchronous query. -/// Obtained from when the query status is "success". -/// -public class QueryHandleResults -{ - private readonly string _handle; - private readonly IAnalyticsService _analyticsService; - private readonly IDeserializer _deserializer; - private readonly TimeSpan? _requestTimeout; - - internal QueryHandleResults(string handle, IAnalyticsService analyticsService, IDeserializer deserializer, TimeSpan? requestTimeout = null) - { - _handle = handle ?? throw new ArgumentNullException(nameof(handle)); - _analyticsService = analyticsService ?? throw new ArgumentNullException(nameof(analyticsService)); - _deserializer = deserializer ?? throw new ArgumentNullException(nameof(deserializer)); - _requestTimeout = requestTimeout; - } - - /// - /// Streams all query result rows from the server. - /// The returned behaves the same as the synchronous query API result. - /// - /// A cancellation token. - /// An that can be used to enumerate the result rows. - public async Task StreamAllAsync(CancellationToken cancellationToken = default) - { - return await _analyticsService.FetchResultsAsync(_handle, _requestTimeout, _deserializer, cancellationToken) - .ConfigureAwait(false); - } -} diff --git a/src/Couchbase.Analytics/Async/QueryPartition.cs b/src/Couchbase.Analytics/Async/QueryPartition.cs deleted file mode 100644 index b5d6291..0000000 --- a/src/Couchbase.Analytics/Async/QueryPartition.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System.Text.Json.Serialization; - -namespace Couchbase.AnalyticsClient.Async; - -/// -/// Represents a partition of a completed asynchronous query's result set, -/// as returned in the FetchStatus response. -/// -public class QueryPartition -{ - /// - /// The handle path for fetching this partition's results. - /// - [JsonPropertyName("handle")] - public string? Handle { get; set; } - - /// - /// The number of result rows in this partition. - /// - [JsonPropertyName("resultCount")] - public long ResultCount { get; set; } -} diff --git a/src/Couchbase.Analytics/Async/QueryResultHandle.cs b/src/Couchbase.Analytics/Async/QueryResultHandle.cs new file mode 100644 index 0000000..308fea4 --- /dev/null +++ b/src/Couchbase.Analytics/Async/QueryResultHandle.cs @@ -0,0 +1,90 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2025 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +using Couchbase.AnalyticsClient.Internal; +using Couchbase.AnalyticsClient.Options; +using Couchbase.AnalyticsClient.Results; + +namespace Couchbase.AnalyticsClient.Async; + +/// +/// Provides access to the results of a completed asynchronous query. +/// +public class QueryResultHandle +{ + private readonly string _handlePath; + private readonly IAnalyticsService _analyticsService; + + /// + /// The request ID assigned by the server when the query was submitted. + /// + public string RequestId { get; } + + internal QueryResultHandle(string handlePath, string requestId, IAnalyticsService analyticsService) + { + _handlePath = handlePath ?? throw new ArgumentNullException(nameof(handlePath)); + RequestId = requestId ?? throw new ArgumentNullException(nameof(requestId)); + _analyticsService = analyticsService ?? throw new ArgumentNullException(nameof(analyticsService)); + } + + /// + /// Fetches the results of the query from the server. + /// + /// Options for fetching the results. + /// A cancellation token. + /// An that can be used to enumerate the result rows. + public Task FetchResultsAsync(FetchResultsOptions? options = null, CancellationToken cancellationToken = default) + { + options ??= new FetchResultsOptions(); + return _analyticsService.FetchResultsAsync(RequestId, _handlePath, options, cancellationToken); + } + + /// + /// Fetches the results of the query from the server. + /// + public Task FetchResultsAsync(Func options, CancellationToken cancellationToken = default) + { + var fetchOptions = new FetchResultsOptions(); + fetchOptions = options.Invoke(fetchOptions); + return FetchResultsAsync(fetchOptions, cancellationToken); + } + + /// + /// Discards the query results on the server. After this call, the results can no longer be fetched. + /// + /// Options for discarding results. + /// A cancellation token. + public Task DiscardResultsAsync(DiscardResultsOptions? options = null, CancellationToken cancellationToken = default) + { + options ??= new DiscardResultsOptions(); + return _analyticsService.DiscardResultsAsync(RequestId, _handlePath, options, cancellationToken); + } + + /// + /// Discards the query results on the server. After this call, the results can no longer be fetched. + /// + public Task DiscardResultsAsync(Func options, CancellationToken cancellationToken = default) + { + var discardOptions = new DiscardResultsOptions(); + discardOptions = options.Invoke(discardOptions); + return DiscardResultsAsync(discardOptions, cancellationToken); + } +} diff --git a/src/Couchbase.Analytics/Async/QueryStatus.cs b/src/Couchbase.Analytics/Async/QueryStatus.cs deleted file mode 100644 index 9ec75f4..0000000 --- a/src/Couchbase.Analytics/Async/QueryStatus.cs +++ /dev/null @@ -1,181 +0,0 @@ -#region License -/* ************************************************************ - * - * @author Couchbase - * @copyright 2025 Couchbase, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ************************************************************/ -#endregion - -using Couchbase.AnalyticsClient.Exceptions; -using Couchbase.AnalyticsClient.Internal; -using Couchbase.AnalyticsClient.Query; -using Couchbase.Core.Json; - -namespace Couchbase.AnalyticsClient.Async; - -/// -/// Represents the status of an asynchronous query, obtained by polling via -/// . -/// -public class QueryStatus -{ - private readonly IAnalyticsService _analyticsService; - private readonly IDeserializer _deserializer; - private readonly TimeSpan? _requestTimeout; - - /// - /// The raw status string from the server (e.g., "queued", "running", "success", "fatal", "timeout"). - /// - public string Status { get; } - - /// - /// The handle path for fetching results, available when is true. - /// This is the full path from the status response (e.g., "/api/v1/request/result/{handle}"). - /// - internal string? ResultHandle { get; } - - /// - /// Errors returned by the server when the query status is "fatal" or "timeout". - /// - internal IReadOnlyList? Errors { get; } - - /// - /// Metrics from the status response. - /// - public QueryMetrics? Metrics { get; } - - /// - /// The total number of result rows, available when the query has completed successfully. - /// - public long? ResultCount { get; } - - /// - /// Partition information from the status response, if available. - /// Each element contains a handle path and result count for that partition. - /// - public IReadOnlyList? Partitions { get; } - - /// - /// Whether the result set is ordered. Available when the query has completed successfully. - /// - public bool? ResultSetOrdered { get; } - - /// - /// The timestamp when the query was created on the server. - /// - public DateTimeOffset? CreatedAt { get; } - - internal QueryStatus( - string status, - string? resultHandle, - IReadOnlyList? errors, - QueryMetrics? metrics, - long? resultCount, - IReadOnlyList? partitions, - bool? resultSetOrdered, - DateTimeOffset? createdAt, - IAnalyticsService analyticsService, - IDeserializer deserializer, - TimeSpan? requestTimeout = null) - { - Status = status ?? throw new ArgumentNullException(nameof(status)); - ResultHandle = resultHandle; - Errors = errors; - Metrics = metrics; - ResultCount = resultCount; - Partitions = partitions; - ResultSetOrdered = resultSetOrdered; - CreatedAt = createdAt; - _analyticsService = analyticsService; - _deserializer = deserializer; - _requestTimeout = requestTimeout; - } - - /// - /// Returns true if the query has completed successfully and results are ready to be streamed. - /// - public bool AreResultsReady => string.Equals(Status, "success", StringComparison.OrdinalIgnoreCase); - - /// - /// Returns true if the query ended with a terminal error status. - /// Per spec: "fatal" or "timeout". - /// Also includes "failed" which is not in the spec but has been observed - /// from the server (e.g., for cancelled queries). - /// - public bool IsError => string.Equals(Status, "fatal", StringComparison.OrdinalIgnoreCase) - || string.Equals(Status, "failed", StringComparison.OrdinalIgnoreCase) - || string.Equals(Status, "timeout", StringComparison.OrdinalIgnoreCase); - - /// - /// If results are ready, returns a that can be used to stream the query rows. - /// If the query ended with an error, returns the error. - /// This method does not perform network calls. - /// - /// - /// A tuple of (?, ?). - /// When results are ready, the first element is populated and the second is null. - /// When the query errored, the first element is null and the second contains the error. - /// - /// - /// Thrown if the query is still in progress (not "success", "fatal", or "timeout"). - /// - public (QueryHandleResults? Results, AnalyticsException? Error) GetResults() - { - if (AreResultsReady) - { - if (string.IsNullOrWhiteSpace(ResultHandle)) - { - throw new InvalidOperationException( - "Query status indicates success but no result handle was provided by the server."); - } - - // Strip the leading path prefix to get just the handle portion - var handle = ResultHandle.StartsWith("/api/v1/request/result/") - ? ResultHandle["/api/v1/request/result/".Length..] - : ResultHandle; - - var handleResults = new QueryHandleResults(handle, _analyticsService, _deserializer, _requestTimeout); - return (handleResults, null); - } - - if (IsError) - { - var error = BuildErrorFromStatus(); - return (null, error); - } - - throw new InvalidOperationException( - $"Cannot get results while query is in status '{Status}'. Poll again using FetchStatusAsync()."); - } - - private AnalyticsException BuildErrorFromStatus() - { - if (Errors is { Count: > 0 }) - { - var firstError = Errors[0]; - return firstError.Code switch - { - 20000 => new InvalidCredentialException(firstError.Message), - 21002 => new AnalyticsTimeoutException($"{firstError.Message}. Error code: {firstError.Code}"), - _ => new QueryException(firstError.Message) { Code = firstError.Code, ServerMessage = firstError.Message } - }; - } - - return string.Equals(Status, "timeout", StringComparison.OrdinalIgnoreCase) - ? new AnalyticsTimeoutException($"Query timed out on the server (status: {Status}).") - : new AnalyticsException($"Query failed with status: {Status}"); - } -} diff --git a/src/Couchbase.Analytics/Cluster.cs b/src/Couchbase.Analytics/Cluster.cs index 67fc2ae..bdf78e6 100644 --- a/src/Couchbase.Analytics/Cluster.cs +++ b/src/Couchbase.Analytics/Cluster.cs @@ -209,17 +209,7 @@ public Task StartQueryAsync(string statement, Func - /// Reconstructs a from a previously serialized handle string. - /// This method does not perform any network operations. - /// - /// A JSON string previously produced by . - /// A that can be used to interact with the query. - public QueryHandle QueryHandleFromSerialized(string serializedHandle) - { - var service = _analyticsService.GetValueOrThrow(); - return QueryHandle.Deserialize(serializedHandle, service); - } + public Database Database(string databaseName) { diff --git a/src/Couchbase.Analytics/Exceptions/QueryNotFoundException.cs b/src/Couchbase.Analytics/Exceptions/QueryNotFoundException.cs new file mode 100644 index 0000000..40f4720 --- /dev/null +++ b/src/Couchbase.Analytics/Exceptions/QueryNotFoundException.cs @@ -0,0 +1,37 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2025 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +namespace Couchbase.AnalyticsClient.Exceptions; + +/// +/// Exception thrown when the analytics service returns an HTTP Status 404 Not Found. +/// Currently only applies to the server async query requests API. +/// +public class QueryNotFoundException : AnalyticsException +{ + public QueryNotFoundException() : base("Query not found.") { } + + public QueryNotFoundException(string message) : base(message) { } + + public QueryNotFoundException(string message, Exception innerException) : base(message, innerException) { } + + internal QueryNotFoundException(string message, Exception innerException, Couchbase.AnalyticsClient.Internal.Retry.ErrorContext errorContext) : base(message, innerException, errorContext) { } +} diff --git a/src/Couchbase.Analytics/Internal/AnalyticsService.cs b/src/Couchbase.Analytics/Internal/AnalyticsService.cs index 036fe02..48f8dbd 100644 --- a/src/Couchbase.Analytics/Internal/AnalyticsService.cs +++ b/src/Couchbase.Analytics/Internal/AnalyticsService.cs @@ -214,8 +214,7 @@ public async Task StartQueryAsync(string statement, StartQueryOptio { var stopwatch = LightweightStopwatch.StartNew(); var queryTimeout = options.QueryTimeout ?? _clusterOptions.TimeoutOptions.QueryTimeout; - var requestTimeout = options.RequestTimeout ?? _clusterOptions.TimeoutOptions.DispatchTimeout; - var deserializer = options.Deserializer ?? _clusterOptions.Deserializer; + var requestTimeout = _clusterOptions.TimeoutOptions.DispatchTimeout; var errorContext = new ErrorContext(options.ClientContextId, stopwatch, queryTimeout); Exception? lastException = null; @@ -289,13 +288,8 @@ public async Task StartQueryAsync(string statement, StartQueryOptio throw new AnalyticsException("Server response is missing required 'requestID' or 'handle' fields.", errorContext); } - // Strip the leading path prefix from the handle - var handle = handlePath.StartsWith("/api/v1/request/status/") - ? handlePath["/api/v1/request/status/".Length..] - : handlePath; - - LogAsyncStartQuerySucceeded(_logger, options.ClientContextId, _redactor.SystemData(handle), _redactor.SystemData(requestId), (int)response.StatusCode); - return new QueryHandle(handle, requestId, this, requestTimeout); + LogAsyncStartQuerySucceeded(_logger, options.ClientContextId, _redactor.SystemData(handlePath), _redactor.SystemData(requestId), (int)response.StatusCode); + return new QueryHandle(handlePath, requestId, this); } catch (HttpRequestException httpRequestException) { @@ -331,16 +325,18 @@ public async Task StartQueryAsync(string statement, StartQueryOptio throw lastException ?? ThrowTooManyRetries(errorContext); } - public async Task FetchStatusAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + public async Task FetchResultHandleAsync(QueryHandle handle, FetchResultHandleOptions options, CancellationToken cancellationToken = default) { - var timeout = requestTimeout ?? _clusterOptions.TimeoutOptions.DispatchTimeout; + var timeout = _clusterOptions.TimeoutOptions.DispatchTimeout; var httpClient = CreateHttpClient(timeout); - var deserializer = _clusterOptions.Deserializer; - var statusUri = new Uri(_baseUri, $"api/v1/request/status/{handle}"); + var statusUri = Uri.TryCreate(handle.Handle, UriKind.Absolute, out var absUri) && (absUri.Scheme == Uri.UriSchemeHttp || absUri.Scheme == Uri.UriSchemeHttps) + ? absUri + : new Uri(_baseUri, handle.Handle); + var request = new HttpRequestMessage(HttpMethod.Get, statusUri); - LogFetchStatusRequest(_logger, _redactor.SystemData(statusUri), _redactor.SystemData(handle)); + LogFetchResultHandleRequest(_logger, _redactor.SystemData(statusUri), _redactor.SystemData(handle.Handle)); try { @@ -349,7 +345,7 @@ public async Task FetchStatusAsync(string handle, TimeSpan? request if (response.StatusCode == HttpStatusCode.NotFound) { - throw new AnalyticsException("Query has been discarded or canceled (404 Not Found)."); + throw new QueryNotFoundException("Query has been discarded or canceled (404 Not Found)."); } var responseBody = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false); @@ -363,72 +359,103 @@ public async Task FetchStatusAsync(string handle, TimeSpan? request throw new AnalyticsException("Server response is missing required 'status' field."); } - if (!response.IsSuccessStatusCode - && response.StatusCode != HttpStatusCode.NotFound) - { - LogFetchStatusUnexpectedHttp(_logger, _redactor.SystemData(handle), (int)response.StatusCode); - } - else + if (!response.IsSuccessStatusCode) { - LogFetchStatusResponse(_logger, _redactor.SystemData(handle), status, (int)response.StatusCode); + LogFetchResultHandleUnexpectedHttp(_logger, _redactor.SystemData(handle.Handle), (int)response.StatusCode); } - // Parse optional fields - var resultHandle = root.TryGetProperty("handle", out var handleProp) ? handleProp.GetString() : null; - IReadOnlyList? errors = null; if (root.TryGetProperty("errors", out var errorsElement)) { - errors = JsonSerializer.Deserialize(errorsElement.GetRawText()) - ?? Array.Empty(); + errors = JsonSerializer.Deserialize(errorsElement.GetRawText()) ?? Array.Empty(); } - QueryMetrics? metrics = null; - if (root.TryGetProperty("metrics", out var metricsElement)) + if (!response.IsSuccessStatusCode) { - metrics = JsonSerializer.Deserialize(metricsElement.GetRawText()); + var errorContext = new ErrorContext(string.Empty, LightweightStopwatch.StartNew(), timeout); + errorContext.StatusCode = response.StatusCode; + if (errors is { Count: > 0 }) + { + throw AnalyticsErrorMapper.MapServiceErrors(errors, errorContext); + } + throw new AnalyticsException($"Query status fetch failed with HTTP {(int)response.StatusCode} and status: {status}", errorContext); } - // Parse additional status response fields per spec - long? resultCount = root.TryGetProperty("resultCount", out var rcProp) && rcProp.TryGetInt64(out var rc) ? rc : null; - bool? resultSetOrdered = root.TryGetProperty("resultSetOrdered", out var rsoProp) ? rsoProp.GetBoolean() : null; - DateTimeOffset? createdAt = root.TryGetProperty("createdAt", out var caProp) && caProp.TryGetDateTimeOffset(out var ca) ? ca : null; + LogFetchResultHandleResponse(_logger, _redactor.SystemData(handle.Handle), status, (int)response.StatusCode); - IReadOnlyList? partitions = null; - if (root.TryGetProperty("partitions", out var partitionsElement)) + if (string.Equals(status, "running", StringComparison.OrdinalIgnoreCase)) { - partitions = JsonSerializer.Deserialize(partitionsElement.GetRawText()) - ?? Array.Empty(); + return null; } - return new QueryStatus(status, resultHandle, errors, metrics, resultCount, partitions, resultSetOrdered, createdAt, this, deserializer, requestTimeout); + if (!string.Equals(status, "success", StringComparison.OrdinalIgnoreCase)) + { + if (string.Equals(status, "stopped", StringComparison.OrdinalIgnoreCase) || + string.Equals(status, "aborted", StringComparison.OrdinalIgnoreCase) || + string.Equals(status, "closed", StringComparison.OrdinalIgnoreCase)) + { + throw new QueryNotFoundException($"Query has been discarded or canceled (status: {status})."); + } + + if (string.Equals(status, "timeout", StringComparison.OrdinalIgnoreCase)) + { + throw new AnalyticsTimeoutException("The query evaluation timed out on the server."); + } + + if (string.Equals(status, "fatal", StringComparison.OrdinalIgnoreCase) || + string.Equals(status, "failed", StringComparison.OrdinalIgnoreCase) || + string.Equals(status, "errors", StringComparison.OrdinalIgnoreCase)) + { + if (errors is { Count: > 0 }) + { + var errorContext = new ErrorContext(string.Empty, LightweightStopwatch.StartNew(), timeout); + errorContext.StatusCode = response.StatusCode; + throw AnalyticsErrorMapper.MapServiceErrors(errors, errorContext); + } + throw new AnalyticsException($"Query execution failed on the server (status: {status})."); + } + + throw new AnalyticsException($"Query status fetch failed with unrecognized status: {status}"); + } + + var resultHandle = root.TryGetProperty("handle", out var handleProp) ? handleProp.GetString() : null; + if (string.IsNullOrWhiteSpace(resultHandle)) + { + throw new InvalidOperationException("Query status indicates success but no result handle was provided by the server."); + } + + return new QueryResultHandle(resultHandle, handle.RequestId, this); } catch (TaskCanceledException taskCanceledEx) { - throw new AnalyticsTimeoutException("The FetchStatus request was canceled.", taskCanceledEx); + throw new AnalyticsTimeoutException("The FetchResultHandle request was canceled.", taskCanceledEx); } } - public async Task FetchResultsAsync(string handle, TimeSpan? requestTimeout, IDeserializer deserializer, CancellationToken cancellationToken = default) + public async Task FetchResultsAsync(string requestId, string handlePath, FetchResultsOptions options, CancellationToken cancellationToken = default) { - var timeout = requestTimeout ?? _clusterOptions.TimeoutOptions.DispatchTimeout; + var timeout = _clusterOptions.TimeoutOptions.DispatchTimeout; var httpClient = CreateHttpClient(timeout); + var deserializer = options.Deserializer ?? _clusterOptions.Deserializer; - var resultUri = new Uri(_baseUri, $"api/v1/request/result/{handle}"); + var resultUri = Uri.TryCreate(handlePath, UriKind.Absolute, out var absUri) && (absUri.Scheme == Uri.UriSchemeHttp || absUri.Scheme == Uri.UriSchemeHttps) + ? absUri + : new Uri(_baseUri, handlePath); + var request = new HttpRequestMessage(HttpMethod.Get, resultUri); - LogFetchResultsRequest(_logger, _redactor.SystemData(resultUri), _redactor.SystemData(handle)); + LogFetchResultsRequest(_logger, _redactor.SystemData(resultUri), _redactor.SystemData(handlePath)); try { var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken) .ConfigureAwait(false); - LogFetchResultsResponse(_logger, _redactor.SystemData(handle), (int)response.StatusCode); + LogFetchResultsResponse(_logger, _redactor.SystemData(handlePath), (int)response.StatusCode); if (response.StatusCode == HttpStatusCode.NotFound) { - throw new AnalyticsException("Query results have been discarded or canceled (404 Not Found)."); + throw new QueryNotFoundException("Query results have been discarded or canceled (404 Not Found)."); } var stream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); @@ -454,15 +481,18 @@ public async Task FetchResultsAsync(string handle, TimeSpan? reque } } - public async Task DiscardResultsAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + public async Task DiscardResultsAsync(string requestId, string handlePath, DiscardResultsOptions options, CancellationToken cancellationToken = default) { - var timeout = requestTimeout ?? _clusterOptions.TimeoutOptions.DispatchTimeout; + var timeout = _clusterOptions.TimeoutOptions.DispatchTimeout; var httpClient = CreateHttpClient(timeout); - var resultUri = new Uri(_baseUri, $"api/v1/request/result/{handle}"); + var resultUri = Uri.TryCreate(handlePath, UriKind.Absolute, out var absUri) && (absUri.Scheme == Uri.UriSchemeHttp || absUri.Scheme == Uri.UriSchemeHttps) + ? absUri + : new Uri(_baseUri, handlePath); + var request = new HttpRequestMessage(HttpMethod.Delete, resultUri); - LogDiscardResultsRequest(_logger, _redactor.SystemData(resultUri), _redactor.SystemData(handle)); + LogDiscardResultsRequest(_logger, _redactor.SystemData(resultUri), _redactor.SystemData(handlePath)); try { @@ -472,11 +502,11 @@ public async Task DiscardResultsAsync(string handle, TimeSpan? requestTimeout, C if (response.StatusCode == HttpStatusCode.NotFound) { // Per spec: 404 means already discarded or canceled — not an error - LogDiscardResults404(_logger, _redactor.SystemData(handle)); + LogDiscardResults404(_logger, _redactor.SystemData(handlePath)); return; } - LogDiscardResultsResponse(_logger, _redactor.SystemData(handle), (int)response.StatusCode); + LogDiscardResultsResponse(_logger, _redactor.SystemData(handlePath), (int)response.StatusCode); if (response.StatusCode != HttpStatusCode.Accepted) { @@ -489,9 +519,9 @@ public async Task DiscardResultsAsync(string handle, TimeSpan? requestTimeout, C } } - public async Task CancelQueryAsync(string requestId, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + public async Task CancelQueryAsync(string requestId, CancelOptions options, CancellationToken cancellationToken = default) { - var timeout = requestTimeout ?? _clusterOptions.TimeoutOptions.DispatchTimeout; + var timeout = _clusterOptions.TimeoutOptions.DispatchTimeout; var httpClient = CreateHttpClient(timeout); var cancelUri = new Uri(_baseUri, "api/v1/active_requests"); @@ -578,14 +608,14 @@ private static Exception ThrowTooManyRetries(ErrorContext errorContext) [LoggerMessage(9, LogLevel.Debug, "Async StartQuery succeeded for {ClientContextId}. Handle={Handle}, RequestId={RequestId} (HTTP {StatusCode})")] private static partial void LogAsyncStartQuerySucceeded(ILogger logger, string? clientContextId, Redacted handle, Redacted requestId, int statusCode); - [LoggerMessage(10, LogLevel.Debug, "FetchStatus sending GET to {Uri} for handle {Handle}")] - private static partial void LogFetchStatusRequest(ILogger logger, Redacted uri, Redacted handle); + [LoggerMessage(10, LogLevel.Debug, "FetchResultHandle sending GET to {Uri} for handle {Handle}")] + private static partial void LogFetchResultHandleRequest(ILogger logger, Redacted uri, Redacted handle); - [LoggerMessage(11, LogLevel.Debug, "FetchStatus for handle {Handle} returned status={Status} (HTTP {StatusCode})")] - private static partial void LogFetchStatusResponse(ILogger logger, Redacted handle, string status, int statusCode); + [LoggerMessage(11, LogLevel.Debug, "FetchResultHandle for handle {Handle} returned status={Status} (HTTP {StatusCode})")] + private static partial void LogFetchResultHandleResponse(ILogger logger, Redacted handle, string status, int statusCode); - [LoggerMessage(12, LogLevel.Warning, "FetchStatus for handle {Handle} returned unexpected HTTP {StatusCode}")] - private static partial void LogFetchStatusUnexpectedHttp(ILogger logger, Redacted handle, int statusCode); + [LoggerMessage(12, LogLevel.Warning, "FetchResultHandle for handle {Handle} returned unexpected HTTP {StatusCode}")] + private static partial void LogFetchResultHandleUnexpectedHttp(ILogger logger, Redacted handle, int statusCode); [LoggerMessage(13, LogLevel.Debug, "FetchResults sending GET to {Uri} for handle {Handle}")] private static partial void LogFetchResultsRequest(ILogger logger, Redacted uri, Redacted handle); diff --git a/src/Couchbase.Analytics/Internal/IAnalyticsService.cs b/src/Couchbase.Analytics/Internal/IAnalyticsService.cs index 376411f..6201737 100644 --- a/src/Couchbase.Analytics/Internal/IAnalyticsService.cs +++ b/src/Couchbase.Analytics/Internal/IAnalyticsService.cs @@ -42,25 +42,25 @@ internal interface IAnalyticsService /// /// Fetches the status of an async query from the server. - /// Sends GET to /api/v1/request/status/{handle}. + /// Sends GET to /api/v1/request/status/{requestID}/{handleID}. /// - Task FetchStatusAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default); + Task FetchResultHandleAsync(QueryHandle handle, FetchResultHandleOptions options, CancellationToken cancellationToken = default); /// /// Fetches the results of a completed async query from the server. - /// Sends GET to /api/v1/request/result/{handle}. + /// Sends GET to /api/v1/request/result/{requestID}/{handleID}. /// - Task FetchResultsAsync(string handle, TimeSpan? requestTimeout, IDeserializer deserializer, CancellationToken cancellationToken = default); + Task FetchResultsAsync(string requestId, string handlePath, FetchResultsOptions options, CancellationToken cancellationToken = default); /// /// Discards the results of an async query on the server. - /// Sends DELETE to /api/v1/request/result/{handle}. + /// Sends DELETE to /api/v1/request/result/{requestID}/{handleID}. /// - Task DiscardResultsAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default); + Task DiscardResultsAsync(string requestId, string handlePath, DiscardResultsOptions options, CancellationToken cancellationToken = default); /// /// Cancels an active async query on the server. /// Sends DELETE to /api/v1/active_requests with the request_id. /// - Task CancelQueryAsync(string requestId, TimeSpan? requestTimeout, CancellationToken cancellationToken = default); + Task CancelQueryAsync(string requestId, CancelOptions options, CancellationToken cancellationToken = default); } diff --git a/src/Couchbase.Analytics/Options/CancelOptions.cs b/src/Couchbase.Analytics/Options/CancelOptions.cs new file mode 100644 index 0000000..b029aef --- /dev/null +++ b/src/Couchbase.Analytics/Options/CancelOptions.cs @@ -0,0 +1,30 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2025 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +namespace Couchbase.AnalyticsClient.Options; + +/// +/// Options for canceling an asynchronous server-side query. +/// +public record CancelOptions +{ + // Reserved for future use. +} diff --git a/src/Couchbase.Analytics/Options/DiscardResultsOptions.cs b/src/Couchbase.Analytics/Options/DiscardResultsOptions.cs new file mode 100644 index 0000000..27ec2fa --- /dev/null +++ b/src/Couchbase.Analytics/Options/DiscardResultsOptions.cs @@ -0,0 +1,30 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2025 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +namespace Couchbase.AnalyticsClient.Options; + +/// +/// Options for discarding results of an asynchronous server-side query. +/// +public record DiscardResultsOptions +{ + // Reserved for future use. +} diff --git a/src/Couchbase.Analytics/Options/FetchResultHandleOptions.cs b/src/Couchbase.Analytics/Options/FetchResultHandleOptions.cs new file mode 100644 index 0000000..f7ade57 --- /dev/null +++ b/src/Couchbase.Analytics/Options/FetchResultHandleOptions.cs @@ -0,0 +1,30 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2025 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +namespace Couchbase.AnalyticsClient.Options; + +/// +/// Options for fetching a result handle from an asynchronous server-side query. +/// +public record FetchResultHandleOptions +{ + // Reserved for future use. +} diff --git a/src/Couchbase.Analytics/Options/FetchResultsOptions.cs b/src/Couchbase.Analytics/Options/FetchResultsOptions.cs new file mode 100644 index 0000000..4dade0a --- /dev/null +++ b/src/Couchbase.Analytics/Options/FetchResultsOptions.cs @@ -0,0 +1,42 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2025 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +using Couchbase.Core.Json; + +namespace Couchbase.AnalyticsClient.Options; + +/// +/// Options for fetching results of an asynchronous server-side query. +/// +public record FetchResultsOptions +{ + /// + /// The deserializer to use when parsing the results. + /// + public IDeserializer? Deserializer { get; init; } + + /// + /// Sets the deserializer to use when parsing the results. + /// + /// The deserializer to use. + /// A new with the deserializer set. + public FetchResultsOptions WithDeserializer(IDeserializer deserializer) => this with { Deserializer = deserializer }; +} diff --git a/src/Couchbase.Analytics/Options/StartQueryOptions.cs b/src/Couchbase.Analytics/Options/StartQueryOptions.cs index e4250ad..783fe99 100644 --- a/src/Couchbase.Analytics/Options/StartQueryOptions.cs +++ b/src/Couchbase.Analytics/Options/StartQueryOptions.cs @@ -45,20 +45,6 @@ public record StartQueryOptions /// public TimeSpan? QueryTimeout { get; init; } - /// - /// The per-HTTP-request timeout for SDK operations related to this async query - /// (e.g., starting the query, polling status, fetching results, cancelling). - /// If unset, defaults to . - /// - public TimeSpan? RequestTimeout { get; init; } - - /// - /// Optional result TTL that overrides the cluster's default (1 hour) for the query's result set. - /// Once the TTL expires, the server discards the result set. - /// The value should be a duration string (e.g., "30m", "2h"). - /// - public string? ResultTTL { get; init; } - /// /// The ClientContextId to be used for the query request. Used to identify the query in logs and profiles. /// If none is provided, a new GUID will be generated. @@ -80,11 +66,6 @@ public record StartQueryOptions /// public QueryScanConsistency? ScanConsistency { get; init; } - /// - /// Used to deserialize query rows. - /// - public IDeserializer? Deserializer { get; init; } - /// /// Whether the query is read-only. /// @@ -144,11 +125,6 @@ internal IDictionary GetFormValues(string statement) formValues["query_context"] = QueryContext.ToString(); } - if (!string.IsNullOrWhiteSpace(ResultTTL)) - { - formValues["result_ttl"] = ResultTTL; - } - foreach (var parameter in NamedParameters) { formValues.Add(parameter.Key.StartsWith('$') ? parameter.Key : $"${parameter.Key}", parameter.Value); @@ -181,10 +157,6 @@ private static string CleanStatement(string statement) public StartQueryOptions WithQueryTimeout(TimeSpan? queryTimeout) => this with { QueryTimeout = queryTimeout }; - public StartQueryOptions WithRequestTimeout(TimeSpan? requestTimeout) => this with { RequestTimeout = requestTimeout }; - - public StartQueryOptions WithResultTTL(string? resultTTL) => this with { ResultTTL = resultTTL }; - public StartQueryOptions WithClientContextId(string clientContextId) => this with { ClientContextId = clientContextId }; public StartQueryOptions WithScanConsistency(QueryScanConsistency? scanConsistency) => this with { ScanConsistency = scanConsistency }; @@ -221,7 +193,5 @@ public StartQueryOptions WithRaw(string name, object value) return this with { Raw = copy }; } - public StartQueryOptions WithDeserializer(IDeserializer deserializer) => this with { Deserializer = deserializer }; - internal StartQueryOptions WithQueryContext(QueryContext queryContext) => this with { QueryContext = queryContext }; } diff --git a/src/Couchbase.Analytics/Scope.cs b/src/Couchbase.Analytics/Scope.cs index 3f1c38e..edccdd7 100644 --- a/src/Couchbase.Analytics/Scope.cs +++ b/src/Couchbase.Analytics/Scope.cs @@ -22,6 +22,7 @@ using Couchbase.AnalyticsClient.Options; using Couchbase.AnalyticsClient.Query; using Couchbase.AnalyticsClient.Results; +using Couchbase.AnalyticsClient.Async; namespace Couchbase.AnalyticsClient; @@ -53,4 +54,18 @@ public Task ExecuteQueryAsync(string statement, Func StartQueryAsync(string statement, StartQueryOptions? options = null, CancellationToken cancellationToken = default) + { + options ??= new StartQueryOptions(); + options = options.WithQueryContext(new QueryContext(_database.Name, Name)); + return _cluster.StartQueryAsync(statement, options, cancellationToken); + } + + public Task StartQueryAsync(string statement, Func options, CancellationToken cancellationToken = default) + { + var startQueryOptions = new StartQueryOptions().WithQueryContext(new QueryContext(_database.Name, Name)); + startQueryOptions = options.Invoke(startQueryOptions); + return _cluster.StartQueryAsync(statement, startQueryOptions, cancellationToken); + } } diff --git a/tests/Couchbase.Analytics.FunctionalTests/AsyncAnalyticsTests.cs b/tests/Couchbase.Analytics.FunctionalTests/AsyncAnalyticsTests.cs new file mode 100644 index 0000000..e4720ee --- /dev/null +++ b/tests/Couchbase.Analytics.FunctionalTests/AsyncAnalyticsTests.cs @@ -0,0 +1,140 @@ +using System.Text.Json; +using Couchbase.AnalyticsClient.Exceptions; +using Couchbase.AnalyticsClient.FunctionalTests.Fixtures; +using Couchbase.AnalyticsClient.Options; +using Couchbase.AnalyticsClient.Async; +using Xunit; +using Xunit.Abstractions; + +namespace Couchbase.AnalyticsClient.FunctionalTests; + +[Collection(SimpleCollection.Name)] +public class AsyncAnalyticsTests +{ + private readonly SimpleFixture _simpleFixture; + private readonly ITestOutputHelper _outputHelper; + + public AsyncAnalyticsTests(SimpleFixture fixture, ITestOutputHelper outputHelper) + { + _simpleFixture = fixture; + _outputHelper = outputHelper; + } + + [Fact] + public async Task Test_AsyncAnalytics_EndToEnd_Cluster() + { + var statement = "select i from array_range(1, 100) as i;"; + var queryOptions = new StartQueryOptions() + { + QueryTimeout = TimeSpan.FromSeconds(30) + }; + + // 1. Start the query + var handle = await _simpleFixture.Cluster.StartQueryAsync(statement, queryOptions); + Assert.NotNull(handle); + Assert.NotNull(handle.Handle); + Assert.NotNull(handle.RequestId); + + _outputHelper.WriteLine($"Handle: {handle.Handle}"); + _outputHelper.WriteLine($"RequestId: {handle.RequestId}"); + + // 2. Poll for the result handle + QueryResultHandle? resultHandle = null; + for (int i = 0; i < 20; i++) + { + resultHandle = await handle.FetchResultHandleAsync(new FetchResultHandleOptions()); + if (resultHandle != null) + { + break; + } + await Task.Delay(500); + } + + Assert.NotNull(resultHandle); + + // 3. Fetch the results + var results = await resultHandle!.FetchResultsAsync(new FetchResultsOptions()); + Assert.NotNull(results); + + var count = 0; + await foreach (var row in results.ConfigureAwait(false)) + { + count++; + } + + Assert.Equal(99, count); + Assert.Equal(99, results.MetaData.Metrics?.ResultCount); + } + + [Fact] + public async Task Test_AsyncAnalytics_Cancellation_Cluster() + { + // Use a statement that takes some time to execute (Cartesian product to delay) + var statement = "select * from array_range(1, 5000) as a, array_range(1, 5000) as b;"; + var queryOptions = new StartQueryOptions() + { + QueryTimeout = TimeSpan.FromSeconds(30) + }; + + var handle = await _simpleFixture.Cluster.StartQueryAsync(statement, queryOptions); + Assert.NotNull(handle); + + // Immediately cancel + await handle.CancelAsync(new CancelOptions()); + + // Attempting to fetch the handle afterwards should return 404 because the job is killed. + // It's possible the cancel takes a brief moment to process gracefully on the server. + var ex = await Record.ExceptionAsync(async () => + { + for (int i = 0; i < 20; i++) + { + var resultHandle = await handle.FetchResultHandleAsync(new FetchResultHandleOptions()); + if (resultHandle != null) + { + // If it somehow completed, we're not testing cancellation properly, but let's break + break; + } + await Task.Delay(500); + } + }); + + // The query should have been killed, resulting in a QueryNotFoundException when it's purged, + // or a cleanly mapped QueryException ("Job Killed") if the server responds gracefully before purging. + Assert.NotNull(ex); + Assert.True(ex is QueryNotFoundException || ex is QueryException, + $"Expected QueryNotFoundException or QueryException upon cancellation, but received: {ex.GetType().FullName}"); + } + + [Fact] + public async Task Test_AsyncAnalytics_DiscardResults_Cluster() + { + var statement = "select i from array_range(1, 5) as i;"; + var handle = await _simpleFixture.Cluster.StartQueryAsync(statement, new StartQueryOptions()); + + // Poll for the result handle + QueryResultHandle? resultHandle = null; + for (int i = 0; i < 20; i++) + { + resultHandle = await handle.FetchResultHandleAsync(new FetchResultHandleOptions()); + if (resultHandle != null) + { + break; + } + await Task.Delay(500); + } + + Assert.NotNull(resultHandle); + + // Discard the results + await resultHandle!.DiscardResultsAsync(new DiscardResultsOptions()); + + // Discarding again should succeed seamlessly due to 404 handling + await resultHandle!.DiscardResultsAsync(new DiscardResultsOptions()); + + // Attempting to fetch the results after discarding should throw QueryNotFoundException + await Assert.ThrowsAsync(async () => + { + await resultHandle.FetchResultsAsync(new FetchResultsOptions()); + }); + } +} diff --git a/tests/Couchbase.Analytics.UnitTests/Async/QueryHandleTests.cs b/tests/Couchbase.Analytics.UnitTests/Async/QueryHandleTests.cs new file mode 100644 index 0000000..5475d66 --- /dev/null +++ b/tests/Couchbase.Analytics.UnitTests/Async/QueryHandleTests.cs @@ -0,0 +1,94 @@ +using Couchbase.AnalyticsClient.Async; +using Couchbase.AnalyticsClient.Internal; +using Couchbase.AnalyticsClient.Options; +using Couchbase.AnalyticsClient.Results; +using Moq; +using Xunit; + +namespace Couchbase.AnalyticsClient.UnitTests.Async; + +public class QueryHandleTests +{ + [Fact] + public void Constructor_InitializesProperties() + { + var serviceMock = new Mock(); + var handle = new QueryHandle("test-handle", "test-req", serviceMock.Object); + + Assert.Equal("test-handle", handle.Handle); + Assert.Equal("test-req", handle.RequestId); + } + + [Fact] + public async Task FetchResultHandleAsync_DelegatesToService() + { + var serviceMock = new Mock(); + var handle = new QueryHandle("test-handle", "test-req", serviceMock.Object); + var expectedResult = new Mock("path", "req", serviceMock.Object).Object; + + serviceMock.Setup(x => x.FetchResultHandleAsync(handle, It.IsAny(), It.IsAny())) + .ReturnsAsync(expectedResult); + + var result = await handle.FetchResultHandleAsync(new FetchResultHandleOptions()); + Assert.Same(expectedResult, result); + + serviceMock.Verify(x => x.FetchResultHandleAsync(handle, It.IsAny(), default), Times.Once); + } + + [Fact] + public async Task CancelAsync_DelegatesToService() + { + var serviceMock = new Mock(); + var handle = new QueryHandle("test-handle", "test-req", serviceMock.Object); + var options = new CancelOptions(); + + serviceMock.Setup(x => x.CancelQueryAsync("test-req", options, It.IsAny())) + .Returns(Task.CompletedTask); + + await handle.CancelAsync(options); + + serviceMock.Verify(x => x.CancelQueryAsync("test-req", options, default), Times.Once); + } + + [Fact] + public void Constructor_NullArguments_ThrowsArgumentNullException() + { + var serviceMock = new Mock(); + + Assert.Throws(() => new QueryHandle(null!, "req", serviceMock.Object)); + Assert.Throws(() => new QueryHandle("handle", null!, serviceMock.Object)); + Assert.Throws(() => new QueryHandle("handle", "req", null!)); + } + + [Fact] + public async Task FetchResultHandleAsync_FluentOptions_DelegatesProperly() + { + var serviceMock = new Mock(); + var handle = new QueryHandle("test-handle", "test-req", serviceMock.Object); + var expectedResult = new Mock("path", "req", serviceMock.Object).Object; + + serviceMock.Setup(x => x.FetchResultHandleAsync(handle, It.IsAny(), It.IsAny())) + .ReturnsAsync(expectedResult); + + // Act using the fluent Options builder + var result = await handle.FetchResultHandleAsync(opt => opt); + + Assert.Same(expectedResult, result); + serviceMock.Verify(x => x.FetchResultHandleAsync(handle, It.IsAny(), default), Times.Once); + } + + [Fact] + public async Task CancelAsync_FluentOptions_DelegatesProperly() + { + var serviceMock = new Mock(); + var handle = new QueryHandle("test-handle", "test-req", serviceMock.Object); + + serviceMock.Setup(x => x.CancelQueryAsync("test-req", It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + // Act using the fluent Options builder + await handle.CancelAsync(opt => opt); + + serviceMock.Verify(x => x.CancelQueryAsync("test-req", It.IsAny(), default), Times.Once); + } +} diff --git a/tests/Couchbase.Analytics.UnitTests/Async/QueryResultHandleTests.cs b/tests/Couchbase.Analytics.UnitTests/Async/QueryResultHandleTests.cs new file mode 100644 index 0000000..c913e8f --- /dev/null +++ b/tests/Couchbase.Analytics.UnitTests/Async/QueryResultHandleTests.cs @@ -0,0 +1,94 @@ +using Couchbase.AnalyticsClient.Async; +using Couchbase.AnalyticsClient.Internal; +using Couchbase.AnalyticsClient.Options; +using Couchbase.AnalyticsClient.Results; +using Moq; +using Xunit; + +namespace Couchbase.AnalyticsClient.UnitTests.Async; + +public class QueryResultHandleTests +{ + [Fact] + public void Constructor_InitializesProperties() + { + var serviceMock = new Mock(); + var handle = new QueryResultHandle("test-path", "test-req", serviceMock.Object); + + Assert.Equal("test-req", handle.RequestId); + } + + [Fact] + public async Task FetchResultsAsync_DelegatesToService() + { + var serviceMock = new Mock(); + var handle = new QueryResultHandle("test-path", "test-req", serviceMock.Object); + var expectedResult = new Mock().Object; + var options = new FetchResultsOptions(); + + serviceMock.Setup(x => x.FetchResultsAsync("test-req", "test-path", options, It.IsAny())) + .ReturnsAsync(expectedResult); + + var result = await handle.FetchResultsAsync(options); + Assert.Same(expectedResult, result); + + serviceMock.Verify(x => x.FetchResultsAsync("test-req", "test-path", options, default), Times.Once); + } + + [Fact] + public async Task DiscardResultsAsync_DelegatesToService() + { + var serviceMock = new Mock(); + var handle = new QueryResultHandle("test-path", "test-req", serviceMock.Object); + var options = new DiscardResultsOptions(); + + serviceMock.Setup(x => x.DiscardResultsAsync("test-req", "test-path", options, It.IsAny())) + .Returns(Task.CompletedTask); + + await handle.DiscardResultsAsync(options); + + serviceMock.Verify(x => x.DiscardResultsAsync("test-req", "test-path", options, default), Times.Once); + } + + [Fact] + public void Constructor_NullArguments_ThrowsArgumentNullException() + { + var serviceMock = new Mock(); + + Assert.Throws(() => new QueryResultHandle(null!, "req", serviceMock.Object)); + Assert.Throws(() => new QueryResultHandle("path", null!, serviceMock.Object)); + Assert.Throws(() => new QueryResultHandle("path", "req", null!)); + } + + [Fact] + public async Task FetchResultsAsync_FluentOptions_DelegatesProperly() + { + var serviceMock = new Mock(); + var handle = new QueryResultHandle("test-path", "test-req", serviceMock.Object); + var expectedResult = new Mock().Object; + + serviceMock.Setup(x => x.FetchResultsAsync("test-req", "test-path", It.IsAny(), It.IsAny())) + .ReturnsAsync(expectedResult); + + // Act using the fluent Options builder + var result = await handle.FetchResultsAsync(opt => opt); + + Assert.Same(expectedResult, result); + serviceMock.Verify(x => x.FetchResultsAsync("test-req", "test-path", It.IsAny(), default), Times.Once); + } + + [Fact] + public async Task DiscardResultsAsync_FluentOptions_DelegatesProperly() + { + var serviceMock = new Mock(); + var handle = new QueryResultHandle("test-path", "test-req", serviceMock.Object); + + serviceMock.Setup(x => x.DiscardResultsAsync("test-req", "test-path", It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + // Act using the fluent Options builder + await handle.DiscardResultsAsync(opt => opt); + + serviceMock.Verify(x => x.DiscardResultsAsync("test-req", "test-path", It.IsAny(), default), Times.Once); + } +} diff --git a/tests/Couchbase.Analytics.UnitTests/Async/QueryStatusTests.cs b/tests/Couchbase.Analytics.UnitTests/Async/QueryStatusTests.cs deleted file mode 100644 index ed0aced..0000000 --- a/tests/Couchbase.Analytics.UnitTests/Async/QueryStatusTests.cs +++ /dev/null @@ -1,209 +0,0 @@ -#region License -/* ************************************************************ - * - * @author Couchbase - * @copyright 2026 Couchbase, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ************************************************************/ -#endregion - -using System.Text.Json; -using Couchbase.AnalyticsClient.Async; -using Couchbase.AnalyticsClient.Exceptions; -using Couchbase.AnalyticsClient.Internal; -using Couchbase.AnalyticsClient.Options; -using Couchbase.AnalyticsClient.Query; -using Couchbase.AnalyticsClient.Results; -using Couchbase.Core.Json; -using Xunit; - -namespace Couchbase.AnalyticsClient.UnitTests.Async; - -public class QueryStatusTests -{ - private static readonly IAnalyticsService StubService = new StubAnalyticsService(); - private static readonly IDeserializer StubDeserializer = new StjJsonDeserializer(new JsonSerializerOptions()); - - private static QueryStatus Create(string status, string? resultHandle = null) => - new(status, resultHandle, errors: null, metrics: null, - resultCount: null, partitions: null, resultSetOrdered: null, createdAt: null, - StubService, StubDeserializer); - - // ─── AreResultsReady ─── - - [Fact] - public void AreResultsReady_Success_ReturnsTrue() - { - Assert.True(Create("success", "/api/v1/request/result/abc/1-0").AreResultsReady); - } - - [Theory] - [InlineData("queued")] - [InlineData("running")] - [InlineData("fatal")] - [InlineData("failed")] - [InlineData("timeout")] - public void AreResultsReady_NonSuccess_ReturnsFalse(string status) - { - Assert.False(Create(status).AreResultsReady); - } - - // ─── IsError ─── - - [Theory] - [InlineData("fatal")] - [InlineData("timeout")] - [InlineData("failed")] // undocumented but observed from server (e.g., cancelled queries) - public void IsError_TerminalErrorStatuses_ReturnsTrue(string status) - { - Assert.True(Create(status).IsError); - } - - [Theory] - [InlineData("queued")] - [InlineData("running")] - [InlineData("success")] - public void IsError_NonErrorStatuses_ReturnsFalse(string status) - { - Assert.False(Create(status).IsError); - } - - // ─── Case insensitivity ─── - // One test proves all status comparisons are case-insensitive by checking - // every known status in UPPER and Title case against its lowercase behavior. - - [Theory] - [InlineData("success")] - [InlineData("fatal")] - [InlineData("failed")] - [InlineData("timeout")] - [InlineData("queued")] - [InlineData("running")] - public void StatusComparisons_AreCaseInsensitive(string lowercase) - { - var upper = Create(lowercase.ToUpperInvariant()); - var title = Create(char.ToUpperInvariant(lowercase[0]) + lowercase[1..]); - var lower = Create(lowercase); - - Assert.Equal(lower.AreResultsReady, upper.AreResultsReady); - Assert.Equal(lower.AreResultsReady, title.AreResultsReady); - Assert.Equal(lower.IsError, upper.IsError); - Assert.Equal(lower.IsError, title.IsError); - } - - // ─── GetResults ─── - - [Fact] - public void GetResults_WhenSuccess_ReturnsHandleResults() - { - var (results, error) = Create("success", "/api/v1/request/result/abc/1-0").GetResults(); - Assert.NotNull(results); - Assert.Null(error); - } - - [Theory] - [InlineData("fatal")] - [InlineData("failed")] - [InlineData("timeout")] - public void GetResults_WhenError_ReturnsException(string status) - { - var (results, error) = Create(status).GetResults(); - Assert.Null(results); - Assert.IsAssignableFrom(error); - } - - [Theory] - [InlineData("queued")] - [InlineData("running")] - public void GetResults_WhenInProgress_ThrowsInvalidOperation(string status) - { - Assert.Throws(() => Create(status).GetResults()); - } - - // ─── FetchStatus stores errors without throwing ─── - - [Theory] - [InlineData("fatal")] - [InlineData("timeout")] - public void FatalOrTimeout_WithErrors_StoresErrorsWithoutThrowing(string status) - { - var errors = new[] { new QueryError(23034, "Insufficient memory", false) }; - var qs = new QueryStatus(status, null, errors, null, - null, null, null, null, - StubService, StubDeserializer); - - // QueryStatus itself should never throw — errors surface via GetResults - Assert.True(qs.IsError); - var (results, error) = qs.GetResults(); - Assert.Null(results); - Assert.NotNull(error); - Assert.Contains("Insufficient memory", error.Message); - } - - // ─── Additional response fields ─── - - [Fact] - public void ResponseFields_AreExposed() - { - var partitions = new[] { new QueryPartition { Handle = "/api/v1/request/result/abc/1-0/0", ResultCount = 100 } }; - var createdAt = new DateTimeOffset(2026, 3, 16, 17, 15, 40, 850, TimeSpan.Zero); - - var qs = new QueryStatus("success", "/api/v1/request/result/abc/1-0", null, null, - resultCount: 100, partitions: partitions, resultSetOrdered: false, createdAt: createdAt, - StubService, StubDeserializer); - - Assert.Equal(100, qs.ResultCount); - Assert.Single(qs.Partitions!); - Assert.Equal("/api/v1/request/result/abc/1-0/0", qs.Partitions![0].Handle); - Assert.Equal(100, qs.Partitions![0].ResultCount); - Assert.False(qs.ResultSetOrdered); - Assert.Equal(createdAt, qs.CreatedAt); - } - - [Fact] - public void ResponseFields_AreNullWhenNotProvided() - { - var qs = Create("queued"); - Assert.Null(qs.ResultCount); - Assert.Null(qs.Partitions); - Assert.Null(qs.ResultSetOrdered); - Assert.Null(qs.CreatedAt); - } - - // ─── Stub service ─── - - private sealed class StubAnalyticsService : IAnalyticsService - { - public Uri Uri { get; } = new("http://localhost"); - - public Task SendAsync(string statement, QueryOptions options, CancellationToken cancellationToken = default) - => throw new NotImplementedException(); - - public Task StartQueryAsync(string statement, StartQueryOptions options, CancellationToken cancellationToken = default) - => throw new NotImplementedException(); - - public Task FetchStatusAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) - => throw new NotImplementedException(); - - public Task FetchResultsAsync(string handle, TimeSpan? requestTimeout, IDeserializer deserializer, CancellationToken cancellationToken = default) - => throw new NotImplementedException(); - - public Task DiscardResultsAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) - => throw new NotImplementedException(); - - public Task CancelQueryAsync(string requestId, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) - => throw new NotImplementedException(); - } -} diff --git a/tests/Couchbase.Analytics.UnitTests/Internal/AnalyticsServiceTests.cs b/tests/Couchbase.Analytics.UnitTests/Internal/AnalyticsServiceTests.cs index 8ceaeac..82534ef 100644 --- a/tests/Couchbase.Analytics.UnitTests/Internal/AnalyticsServiceTests.cs +++ b/tests/Couchbase.Analytics.UnitTests/Internal/AnalyticsServiceTests.cs @@ -6,6 +6,8 @@ using Couchbase.AnalyticsClient.Logging; using Couchbase.AnalyticsClient.Options; using Couchbase.Core.Json; +using Couchbase.AnalyticsClient.Async; +using Couchbase.AnalyticsClient.Exceptions; using Microsoft.Extensions.Logging; using Moq; using Moq.Protected; @@ -158,4 +160,110 @@ public async Task SendAsync_WithStreaming_ReturnsStreamingAnalyticsResult() Assert.IsType(result); _httpClientFactoryMock.Verify(f => f.Create(), Times.Once); } + + [Fact] + public async Task FetchResultHandleAsync_When404_ThrowsQueryNotFoundException() + { + // Arrange + var httpClientMock = new Mock(); + httpClientMock.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync(new HttpResponseMessage(HttpStatusCode.NotFound)); + + var httpClient = new HttpClient(httpClientMock.Object); + _httpClientFactoryMock.Setup(f => f.Create()).Returns(httpClient); + var service = new AnalyticsService( + _clusterOptions, + _httpClientFactoryMock.Object, + _loggerMock.Object, + new TypedRedactor(RedactionLevel.None)); + + var handle = new QueryHandle("mock-handle", "mock-req", service); + + // Act & Assert + await Assert.ThrowsAsync(() => + service.FetchResultHandleAsync(handle, new FetchResultHandleOptions())); + } + + [Fact] + public async Task FetchResultsAsync_When404_ThrowsQueryNotFoundException() + { + // Arrange + var httpClientMock = new Mock(); + httpClientMock.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync(new HttpResponseMessage(HttpStatusCode.NotFound)); + + var httpClient = new HttpClient(httpClientMock.Object); + _httpClientFactoryMock.Setup(f => f.Create()).Returns(httpClient); + var service = new AnalyticsService( + _clusterOptions, + _httpClientFactoryMock.Object, + _loggerMock.Object, + new TypedRedactor(RedactionLevel.None)); + + // Act & Assert + await Assert.ThrowsAsync(() => + service.FetchResultsAsync("mock-req", "mock-path", new FetchResultsOptions())); + } + + [Fact] + public async Task DiscardResultsAsync_When404_CompletesSuccessfully() + { + // Arrange + var httpClientMock = new Mock(); + httpClientMock.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync(new HttpResponseMessage(HttpStatusCode.NotFound)); + + var httpClient = new HttpClient(httpClientMock.Object); + _httpClientFactoryMock.Setup(f => f.Create()).Returns(httpClient); + var service = new AnalyticsService( + _clusterOptions, + _httpClientFactoryMock.Object, + _loggerMock.Object, + new TypedRedactor(RedactionLevel.None)); + + // Act & Assert + var exception = await Record.ExceptionAsync(() => + service.DiscardResultsAsync("mock-req", "mock-path", new DiscardResultsOptions())); + + Assert.Null(exception); + } + + [Fact] + public async Task CancelQueryAsync_When404_CompletesSuccessfully() + { + // Arrange + var httpClientMock = new Mock(); + httpClientMock.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync(new HttpResponseMessage(HttpStatusCode.NotFound)); + + var httpClient = new HttpClient(httpClientMock.Object); + _httpClientFactoryMock.Setup(f => f.Create()).Returns(httpClient); + var service = new AnalyticsService( + _clusterOptions, + _httpClientFactoryMock.Object, + _loggerMock.Object, + new TypedRedactor(RedactionLevel.None)); + + // Act & Assert + var exception = await Record.ExceptionAsync(() => + service.CancelQueryAsync("mock-req", new CancelOptions())); + + Assert.Null(exception); + } } diff --git a/tests/Couchbase.Analytics.UnitTests/Internal/ExecuteQueryTests.cs b/tests/Couchbase.Analytics.UnitTests/Internal/ExecuteQueryTests.cs index ff267a5..861002a 100644 --- a/tests/Couchbase.Analytics.UnitTests/Internal/ExecuteQueryTests.cs +++ b/tests/Couchbase.Analytics.UnitTests/Internal/ExecuteQueryTests.cs @@ -47,6 +47,7 @@ private sealed class FakeAnalyticsService : IAnalyticsService public Uri Uri { get; } = new Uri("http://localhost"); public QueryOptions? LastOptions { get; private set; } + public StartQueryOptions? LastStartOptions { get; private set; } public Task SendAsync(string statement, QueryOptions options, CancellationToken cancellationToken = default) { @@ -55,18 +56,21 @@ public Task SendAsync(string statement, QueryOptions options, Canc } public Task StartQueryAsync(string statement, StartQueryOptions options, CancellationToken cancellationToken = default) - => throw new NotImplementedException(); + { + LastStartOptions = options; + return Task.FromResult(new QueryHandle("handle", "reqId", this)); + } - public Task FetchStatusAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + public Task FetchResultHandleAsync(QueryHandle handle, FetchResultHandleOptions options, CancellationToken cancellationToken = default) => throw new NotImplementedException(); - public Task FetchResultsAsync(string handle, TimeSpan? requestTimeout, IDeserializer deserializer, CancellationToken cancellationToken = default) + public Task FetchResultsAsync(string requestId, string handlePath, FetchResultsOptions options, CancellationToken cancellationToken = default) => throw new NotImplementedException(); - public Task DiscardResultsAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + public Task DiscardResultsAsync(string requestId, string handlePath, DiscardResultsOptions options, CancellationToken cancellationToken = default) => throw new NotImplementedException(); - public Task CancelQueryAsync(string requestId, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + public Task CancelQueryAsync(string requestId, CancelOptions options, CancellationToken cancellationToken = default) => throw new NotImplementedException(); } @@ -82,6 +86,17 @@ private static string FormatOptions(QueryOptions o) $"QueryContext={(o.QueryContext is null ? "" : o.QueryContext.ToString())}"; } + private static string FormatStartOptions(StartQueryOptions o) + { + string FormatDict(Dictionary d) => string.Join(", ", d.Select(kv => $"{kv.Key}={kv.Value}")); + string FormatList(List l) => string.Join(", ", l.Select(x => x?.ToString())); + + return $"QueryTimeout={o.QueryTimeout}; ClientContextId={o.ClientContextId}; " + + $"ScanConsistency={o.ScanConsistency}; ReadOnly={o.ReadOnly}; MaxRetries={o.MaxRetries}; " + + $"Named=[{FormatDict(o.NamedParameters)}]; Positional=[{FormatList(o.PositionalParameters)}]; Raw=[{FormatDict(o.Raw)}]; " + + $"QueryContext={(o.QueryContext is null ? "" : o.QueryContext.ToString())}"; + } + [Fact] public async Task ExecuteQuery_WithFunc_AppliesOptions_OnCluster() { @@ -173,4 +188,165 @@ public async Task ExecuteQuery_WithFunc_AppliesOptions_OnScope_AndPreservesQuery Assert.Equal("sc1", applied.QueryContext.Scope); Assert.Equal("default:`db1`.`sc1`", applied.QueryContext.ToString()); } + + [Fact] + public async Task StartQueryAsync_WithFunc_AppliesOptions_OnCluster() + { + var fakeService = new FakeAnalyticsService(); + + var cluster = Cluster.Create( + "http://localhost:18095", + Credential.Create("user", "pass"), + opts => opts + .AddClusterService(fakeService) + ); + + StartQueryOptions? before = null; + var func = (Func)(o => + { + // Capture the options BEFORE the func applies changes + before = o; + return o + .WithQueryTimeout(TimeSpan.FromSeconds(12)) + .WithClientContextId("ctx-xyz") + .WithScanConsistency(QueryScanConsistency.RequestPlus) + .WithReadOnly(true) + .WithMaxRetries(5) + .WithNamedParameters(new Dictionary { ["k1"] = 1 }) + .WithNamedParameter("k2", 2) + .WithPositionalParameters(["p1"]) + .WithPositionalParameter("p2") + .WithRaw("raw1", 123); + }); + + await cluster.StartQueryAsync("SELECT 1;", func); + + var applied = fakeService.LastStartOptions; + Assert.NotNull(applied); + + _outputHelper.WriteLine($"Before (Cluster): {FormatStartOptions(before!)}"); + _outputHelper.WriteLine($"After (Cluster): {FormatStartOptions(applied)}"); + + Assert.Equal(TimeSpan.FromSeconds(12), applied!.QueryTimeout); + Assert.Equal("ctx-xyz", applied.ClientContextId); + Assert.Equal(QueryScanConsistency.RequestPlus, applied.ScanConsistency); + Assert.True(applied.ReadOnly); + Assert.Equal((uint)5, applied.MaxRetries); + Assert.Equal(1, applied.NamedParameters["k1"]); + Assert.Equal(2, applied.NamedParameters["k2"]); + Assert.Equal(["p1", "p2"], applied.PositionalParameters); + Assert.Equal(123, applied.Raw["raw1"]); + } + + [Fact] + public async Task StartQueryAsync_WithFunc_AppliesOptions_OnScope_AndPreservesQueryContext() + { + var fakeService = new FakeAnalyticsService(); + + var cluster = Cluster.Create( + "http://localhost:18095", + Credential.Create("user", "pass"), + opts => opts.AddClusterService(fakeService) + ); + + var scope = cluster.Database("db1").Scope("sc1"); + + StartQueryOptions? before = null; + var func = (Func)(o => + { + // Capture the options before the func applies changes + before = o; + return o + .WithClientContextId("scope-ctx"); + }); + + await scope.StartQueryAsync("SELECT 1;", func); + + var applied = fakeService.LastStartOptions; + Assert.NotNull(applied); + + _outputHelper.WriteLine($"Before (Scope): {FormatStartOptions(before!)}"); + _outputHelper.WriteLine($"After (Scope): {FormatStartOptions(applied!)}"); + + Assert.Equal("scope-ctx", applied!.ClientContextId); + + // QueryContext should be set by Scope before invoking func + Assert.NotNull(applied.QueryContext); + Assert.Equal("db1", applied.QueryContext!.Database); + Assert.Equal("sc1", applied.QueryContext.Scope); + Assert.Equal("default:`db1`.`sc1`", applied.QueryContext.ToString()); + } + + [Fact] + public async Task StartQueryAsync_WithOptionsObject_AppliesOptions_OnCluster() + { + var fakeService = new FakeAnalyticsService(); + + var cluster = Cluster.Create( + "http://localhost:18095", + Credential.Create("user", "pass"), + opts => opts + .AddClusterService(fakeService) + ); + + var options = new StartQueryOptions() + .WithQueryTimeout(TimeSpan.FromSeconds(12)) + .WithClientContextId("ctx-xyz") + .WithScanConsistency(QueryScanConsistency.RequestPlus) + .WithReadOnly(true) + .WithMaxRetries(5) + .WithNamedParameters(new Dictionary { ["k1"] = 1 }) + .WithNamedParameter("k2", 2) + .WithPositionalParameters(["p1"]) + .WithPositionalParameter("p2") + .WithRaw("raw1", 123); + + await cluster.StartQueryAsync("SELECT 1;", options); + + var applied = fakeService.LastStartOptions; + Assert.NotNull(applied); + + Assert.Equal(TimeSpan.FromSeconds(12), applied.QueryTimeout); + Assert.Equal("ctx-xyz", applied.ClientContextId); + Assert.Equal(QueryScanConsistency.RequestPlus, applied.ScanConsistency); + Assert.True(applied.ReadOnly); + Assert.Equal((uint)5, applied.MaxRetries); + Assert.Equal(1, applied.NamedParameters["k1"]); + Assert.Equal(2, applied.NamedParameters["k2"]); + Assert.Equal(["p1", "p2"], applied.PositionalParameters); + Assert.Equal(123, applied.Raw["raw1"]); + } + + [Fact] + public async Task StartQueryAsync_WithOptionsObject_AppliesOptions_OnScope_AndPreservesQueryContext() + { + var fakeService = new FakeAnalyticsService(); + + var cluster = Cluster.Create( + "http://localhost:18095", + Credential.Create("user", "pass"), + opts => opts.AddClusterService(fakeService) + ); + + var scope = cluster.Database("db1").Scope("sc1"); + + var options = new StartQueryOptions() + .WithClientContextId("scope-ctx"); + + await scope.StartQueryAsync("SELECT 1;", options); + + var applied = fakeService.LastStartOptions; + Assert.NotNull(applied); + + Assert.Equal("scope-ctx", applied.ClientContextId); + + // QueryContext should be set by Scope without mutating the original options + Assert.NotNull(applied.QueryContext); + Assert.Equal("db1", applied.QueryContext!.Database); + Assert.Equal("sc1", applied.QueryContext.Scope); + Assert.Equal("default:`db1`.`sc1`", applied.QueryContext.ToString()); + + // The original options should not be mutated + Assert.Null(options.QueryContext); + } }