diff --git a/src/Couchbase.Analytics/Async/QueryHandle.cs b/src/Couchbase.Analytics/Async/QueryHandle.cs new file mode 100644 index 0000000..50bae13 --- /dev/null +++ b/src/Couchbase.Analytics/Async/QueryHandle.cs @@ -0,0 +1,118 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2025 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +using System.Text.Json; +using Couchbase.AnalyticsClient.Internal; + +namespace Couchbase.AnalyticsClient.Async; + +/// +/// Represents a handle to a server-side asynchronous query. +/// Obtained from or . +/// +public class QueryHandle +{ + private readonly IAnalyticsService _analyticsService; + private readonly TimeSpan? _requestTimeout; + + /// + /// The query handle string used to poll status and fetch results. + /// This is the path segment after /api/v1/request/status/. + /// + public string Handle { get; } + + /// + /// The request ID assigned by the server when the query was submitted. + /// + public string RequestId { get; } + + internal QueryHandle(string handle, string requestId, IAnalyticsService analyticsService, TimeSpan? requestTimeout = null) + { + Handle = handle ?? throw new ArgumentNullException(nameof(handle)); + RequestId = requestId ?? throw new ArgumentNullException(nameof(requestId)); + _analyticsService = analyticsService ?? throw new ArgumentNullException(nameof(analyticsService)); + _requestTimeout = requestTimeout; + } + + /// + /// Fetches the current status of the asynchronous query from the server. + /// + /// A cancellation token. + /// A representing the current state of the query. + public async Task FetchStatusAsync(CancellationToken cancellationToken = default) + { + return await _analyticsService.FetchStatusAsync(Handle, _requestTimeout, cancellationToken) + .ConfigureAwait(false); + } + + /// + /// Discards the query results on the server. After this call, the results can no longer be fetched. + /// + /// A cancellation token. + public async Task DiscardResultsAsync(CancellationToken cancellationToken = default) + { + await _analyticsService.DiscardResultsAsync(Handle, _requestTimeout, cancellationToken) + .ConfigureAwait(false); + } + + /// + /// Cancels the query on the server. If the query has already completed, this is a no-op. + /// + /// A cancellation token. + public async Task CancelAsync(CancellationToken cancellationToken = default) + { + await _analyticsService.CancelQueryAsync(RequestId, _requestTimeout, cancellationToken) + .ConfigureAwait(false); + } + + /// + /// Serializes this to a JSON string so it can be persisted and + /// later reconstructed via . + /// This method does not perform any network operations. + /// + /// A JSON string containing the handle and request ID. + public string Serialize() + { + var data = new SerializedQueryHandle(Handle, RequestId); + return JsonSerializer.Serialize(data); + } + + /// + /// Deserializes a from a JSON string previously produced by . + /// This method does not perform any network operations. + /// + internal static QueryHandle Deserialize(string serializedHandle, IAnalyticsService analyticsService, TimeSpan? requestTimeout = null) + { + ArgumentNullException.ThrowIfNull(serializedHandle); + + var data = JsonSerializer.Deserialize(serializedHandle) + ?? throw new ArgumentException("Invalid serialized handle format.", nameof(serializedHandle)); + + if (string.IsNullOrWhiteSpace(data.Handle) || string.IsNullOrWhiteSpace(data.RequestId)) + { + throw new ArgumentException("Serialized handle is missing required fields.", nameof(serializedHandle)); + } + + return new QueryHandle(data.Handle, data.RequestId, analyticsService, requestTimeout); + } + + private record SerializedQueryHandle(string Handle, string RequestId); +} diff --git a/src/Couchbase.Analytics/Async/QueryHandleResults.cs b/src/Couchbase.Analytics/Async/QueryHandleResults.cs new file mode 100644 index 0000000..e985ad5 --- /dev/null +++ b/src/Couchbase.Analytics/Async/QueryHandleResults.cs @@ -0,0 +1,58 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2025 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +using Couchbase.AnalyticsClient.Internal; +using Couchbase.AnalyticsClient.Results; +using Couchbase.Core.Json; + +namespace Couchbase.AnalyticsClient.Async; + +/// +/// Provides access to the results of a completed asynchronous query. +/// Obtained from when the query status is "success". +/// +public class QueryHandleResults +{ + private readonly string _handle; + private readonly IAnalyticsService _analyticsService; + private readonly IDeserializer _deserializer; + private readonly TimeSpan? _requestTimeout; + + internal QueryHandleResults(string handle, IAnalyticsService analyticsService, IDeserializer deserializer, TimeSpan? requestTimeout = null) + { + _handle = handle ?? throw new ArgumentNullException(nameof(handle)); + _analyticsService = analyticsService ?? throw new ArgumentNullException(nameof(analyticsService)); + _deserializer = deserializer ?? throw new ArgumentNullException(nameof(deserializer)); + _requestTimeout = requestTimeout; + } + + /// + /// Streams all query result rows from the server. + /// The returned behaves the same as the synchronous query API result. + /// + /// A cancellation token. + /// An that can be used to enumerate the result rows. + public async Task StreamAllAsync(CancellationToken cancellationToken = default) + { + return await _analyticsService.FetchResultsAsync(_handle, _requestTimeout, _deserializer, cancellationToken) + .ConfigureAwait(false); + } +} diff --git a/src/Couchbase.Analytics/Async/QueryPartition.cs b/src/Couchbase.Analytics/Async/QueryPartition.cs new file mode 100644 index 0000000..b5d6291 --- /dev/null +++ b/src/Couchbase.Analytics/Async/QueryPartition.cs @@ -0,0 +1,22 @@ +using System.Text.Json.Serialization; + +namespace Couchbase.AnalyticsClient.Async; + +/// +/// Represents a partition of a completed asynchronous query's result set, +/// as returned in the FetchStatus response. +/// +public class QueryPartition +{ + /// + /// The handle path for fetching this partition's results. + /// + [JsonPropertyName("handle")] + public string? Handle { get; set; } + + /// + /// The number of result rows in this partition. + /// + [JsonPropertyName("resultCount")] + public long ResultCount { get; set; } +} diff --git a/src/Couchbase.Analytics/Async/QueryStatus.cs b/src/Couchbase.Analytics/Async/QueryStatus.cs new file mode 100644 index 0000000..9ec75f4 --- /dev/null +++ b/src/Couchbase.Analytics/Async/QueryStatus.cs @@ -0,0 +1,181 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2025 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +using Couchbase.AnalyticsClient.Exceptions; +using Couchbase.AnalyticsClient.Internal; +using Couchbase.AnalyticsClient.Query; +using Couchbase.Core.Json; + +namespace Couchbase.AnalyticsClient.Async; + +/// +/// Represents the status of an asynchronous query, obtained by polling via +/// . +/// +public class QueryStatus +{ + private readonly IAnalyticsService _analyticsService; + private readonly IDeserializer _deserializer; + private readonly TimeSpan? _requestTimeout; + + /// + /// The raw status string from the server (e.g., "queued", "running", "success", "fatal", "timeout"). + /// + public string Status { get; } + + /// + /// The handle path for fetching results, available when is true. + /// This is the full path from the status response (e.g., "/api/v1/request/result/{handle}"). + /// + internal string? ResultHandle { get; } + + /// + /// Errors returned by the server when the query status is "fatal" or "timeout". + /// + internal IReadOnlyList? Errors { get; } + + /// + /// Metrics from the status response. + /// + public QueryMetrics? Metrics { get; } + + /// + /// The total number of result rows, available when the query has completed successfully. + /// + public long? ResultCount { get; } + + /// + /// Partition information from the status response, if available. + /// Each element contains a handle path and result count for that partition. + /// + public IReadOnlyList? Partitions { get; } + + /// + /// Whether the result set is ordered. Available when the query has completed successfully. + /// + public bool? ResultSetOrdered { get; } + + /// + /// The timestamp when the query was created on the server. + /// + public DateTimeOffset? CreatedAt { get; } + + internal QueryStatus( + string status, + string? resultHandle, + IReadOnlyList? errors, + QueryMetrics? metrics, + long? resultCount, + IReadOnlyList? partitions, + bool? resultSetOrdered, + DateTimeOffset? createdAt, + IAnalyticsService analyticsService, + IDeserializer deserializer, + TimeSpan? requestTimeout = null) + { + Status = status ?? throw new ArgumentNullException(nameof(status)); + ResultHandle = resultHandle; + Errors = errors; + Metrics = metrics; + ResultCount = resultCount; + Partitions = partitions; + ResultSetOrdered = resultSetOrdered; + CreatedAt = createdAt; + _analyticsService = analyticsService; + _deserializer = deserializer; + _requestTimeout = requestTimeout; + } + + /// + /// Returns true if the query has completed successfully and results are ready to be streamed. + /// + public bool AreResultsReady => string.Equals(Status, "success", StringComparison.OrdinalIgnoreCase); + + /// + /// Returns true if the query ended with a terminal error status. + /// Per spec: "fatal" or "timeout". + /// Also includes "failed" which is not in the spec but has been observed + /// from the server (e.g., for cancelled queries). + /// + public bool IsError => string.Equals(Status, "fatal", StringComparison.OrdinalIgnoreCase) + || string.Equals(Status, "failed", StringComparison.OrdinalIgnoreCase) + || string.Equals(Status, "timeout", StringComparison.OrdinalIgnoreCase); + + /// + /// If results are ready, returns a that can be used to stream the query rows. + /// If the query ended with an error, returns the error. + /// This method does not perform network calls. + /// + /// + /// A tuple of (?, ?). + /// When results are ready, the first element is populated and the second is null. + /// When the query errored, the first element is null and the second contains the error. + /// + /// + /// Thrown if the query is still in progress (not "success", "fatal", or "timeout"). + /// + public (QueryHandleResults? Results, AnalyticsException? Error) GetResults() + { + if (AreResultsReady) + { + if (string.IsNullOrWhiteSpace(ResultHandle)) + { + throw new InvalidOperationException( + "Query status indicates success but no result handle was provided by the server."); + } + + // Strip the leading path prefix to get just the handle portion + var handle = ResultHandle.StartsWith("/api/v1/request/result/") + ? ResultHandle["/api/v1/request/result/".Length..] + : ResultHandle; + + var handleResults = new QueryHandleResults(handle, _analyticsService, _deserializer, _requestTimeout); + return (handleResults, null); + } + + if (IsError) + { + var error = BuildErrorFromStatus(); + return (null, error); + } + + throw new InvalidOperationException( + $"Cannot get results while query is in status '{Status}'. Poll again using FetchStatusAsync()."); + } + + private AnalyticsException BuildErrorFromStatus() + { + if (Errors is { Count: > 0 }) + { + var firstError = Errors[0]; + return firstError.Code switch + { + 20000 => new InvalidCredentialException(firstError.Message), + 21002 => new AnalyticsTimeoutException($"{firstError.Message}. Error code: {firstError.Code}"), + _ => new QueryException(firstError.Message) { Code = firstError.Code, ServerMessage = firstError.Message } + }; + } + + return string.Equals(Status, "timeout", StringComparison.OrdinalIgnoreCase) + ? new AnalyticsTimeoutException($"Query timed out on the server (status: {Status}).") + : new AnalyticsException($"Query failed with status: {Status}"); + } +} diff --git a/src/Couchbase.Analytics/Cluster.cs b/src/Couchbase.Analytics/Cluster.cs index 7b7446c..6291bbc 100644 --- a/src/Couchbase.Analytics/Cluster.cs +++ b/src/Couchbase.Analytics/Cluster.cs @@ -20,6 +20,7 @@ #endregion using System.Collections.Concurrent; +using Couchbase.AnalyticsClient.Async; using Couchbase.AnalyticsClient.HTTP; using Couchbase.AnalyticsClient.Internal; using Couchbase.AnalyticsClient.Internal.DI; @@ -138,6 +139,48 @@ public async Task ExecuteQueryAsync(string statement, QueryOptions return await service.SendAsync(statement, options ?? new QueryOptions(), cancellationToken).ConfigureAwait(false); } + /// + /// Starts an asynchronous server-side query. The query is submitted to the server and a + /// is returned that can be used to poll status, fetch results, + /// cancel, or discard the query. + /// + /// The analytics query statement to execute. + /// Options for the async query, including timeouts and result TTL. + /// A cancellation token. + /// A for tracking and interacting with the query. + public async Task StartQueryAsync(string statement, StartQueryOptions? options = null, CancellationToken cancellationToken = default) + { + var service = _analyticsService.GetValueOrThrow(); + return await service.StartQueryAsync(statement, options ?? new StartQueryOptions(), cancellationToken) + .ConfigureAwait(false); + } + + /// + /// Starts an asynchronous server-side query with a fluent options builder. + /// + /// The analytics query statement to execute. + /// A function to configure the . + /// A cancellation token. + /// A for tracking and interacting with the query. + public Task StartQueryAsync(string statement, Func options, CancellationToken cancellationToken = default) + { + var startQueryOptions = new StartQueryOptions(); + startQueryOptions = options.Invoke(startQueryOptions); + return StartQueryAsync(statement, startQueryOptions, cancellationToken); + } + + /// + /// Reconstructs a from a previously serialized handle string. + /// This method does not perform any network operations. + /// + /// A JSON string previously produced by . + /// A that can be used to interact with the query. + public QueryHandle QueryHandleFromSerialized(string serializedHandle) + { + var service = _analyticsService.GetValueOrThrow(); + return QueryHandle.Deserialize(serializedHandle, service); + } + public Database Database(string databaseName) { return _databases.GetOrAdd(databaseName, database => new Database(this, database)); diff --git a/src/Couchbase.Analytics/Internal/AnalyticsService.cs b/src/Couchbase.Analytics/Internal/AnalyticsService.cs index 5aeaab2..036fe02 100644 --- a/src/Couchbase.Analytics/Internal/AnalyticsService.cs +++ b/src/Couchbase.Analytics/Internal/AnalyticsService.cs @@ -19,13 +19,18 @@ * ************************************************************/ #endregion +using System.Net; using System.Text; +using System.Text.Json; +using Couchbase.AnalyticsClient.Async; using Couchbase.AnalyticsClient.Exceptions; using Couchbase.AnalyticsClient.HTTP; using Couchbase.AnalyticsClient.Internal.HTTP; using Couchbase.AnalyticsClient.Internal.Results; using Couchbase.AnalyticsClient.Internal.Retry; +using Couchbase.AnalyticsClient.Logging; using Couchbase.AnalyticsClient.Options; +using Couchbase.AnalyticsClient.Query; using Couchbase.AnalyticsClient.Results; using Couchbase.Core.Json; using Couchbase.Core.Utils; @@ -37,18 +42,26 @@ internal sealed partial class AnalyticsService : HttpServiceBase, IAnalyticsServ { private readonly ClusterOptions _clusterOptions; private readonly ILogger _logger; + private readonly TypedRedactor _redactor; + private readonly Uri _baseUri; public AnalyticsService(ClusterOptions clusterOptions, ICouchbaseHttpClientFactory httpClientFactory, - ILogger logger) : base(httpClientFactory) + ILogger logger, TypedRedactor redactor) : base(httpClientFactory) { _clusterOptions = clusterOptions; _logger = logger; + _redactor = redactor; HttpClientFactory = httpClientFactory; Uri = clusterOptions.ConnectionStringValue!.GetAnalyticsServiceUri(); + _baseUri = clusterOptions.ConnectionStringValue!.GetBaseServiceUri(); } public Uri Uri { get; } + // ──────────────────────────────────────────────────────────── + // Synchronous query API (existing) + // ──────────────────────────────────────────────────────────── + public async Task SendAsync(string statement, QueryOptions options, CancellationToken cancellationToken = default) { return await ExecuteWithRetryAsync(statement, options, cancellationToken).ConfigureAwait(false); @@ -133,7 +146,7 @@ private async Task ExecuteWithRetryAsync(string statement, QueryOp } try { - LogQueryAttemptStarting(_logger, attempt + 1, options.ClientContextId, stopwatch.Elapsed.TotalMilliseconds); + LogQueryAttemptStarting(_logger, attempt + 1, options.ClientContextId, _redactor.UserData(statement), stopwatch.Elapsed.TotalMilliseconds); var result = await ExecuteQueryAsync(content, httpClient, options.AsStreaming, deserializer, errorContext, cancellationToken).ConfigureAwait(false); @@ -159,7 +172,7 @@ private async Task ExecuteWithRetryAsync(string statement, QueryOp } catch (HttpRequestException httpRequestException) { - LogQueryAttemptFailed(_logger, httpRequestException, attempt + 1, options.ClientContextId, + LogQueryAttemptFailed(_logger, httpRequestException, attempt + 1, options.ClientContextId, _redactor.UserData(statement), httpRequestException.Message, stopwatch.Elapsed.TotalMilliseconds); // "No successful connection(s)" is retryable @@ -193,6 +206,335 @@ private async Task ExecuteWithRetryAsync(string statement, QueryOp throw lastException ?? ThrowTooManyRetries(errorContext); } + // ──────────────────────────────────────────────────────────── + // Async server request API (new) + // ──────────────────────────────────────────────────────────── + + public async Task StartQueryAsync(string statement, StartQueryOptions options, CancellationToken cancellationToken = default) + { + var stopwatch = LightweightStopwatch.StartNew(); + var queryTimeout = options.QueryTimeout ?? _clusterOptions.TimeoutOptions.QueryTimeout; + var requestTimeout = options.RequestTimeout ?? _clusterOptions.TimeoutOptions.DispatchTimeout; + var deserializer = options.Deserializer ?? _clusterOptions.Deserializer; + + var errorContext = new ErrorContext(options.ClientContextId, stopwatch, queryTimeout); + Exception? lastException = null; + + var body = options.GetFormValuesAsJson(statement); + using var content = new StringContent(body, Encoding.UTF8, MediaType.Json); + var httpClient = CreateHttpClient(requestTimeout); + + var maxRetries = options.MaxRetries ?? _clusterOptions.MaxRetries; + var attempt = -1; + + while (attempt < maxRetries) + { + attempt++; + errorContext.RetryAttempts = attempt; + + if (stopwatch.Elapsed > queryTimeout) + { + ThrowGlobalTimeout(lastException, stopwatch.Elapsed, errorContext); + } + + try + { + LogAsyncStartQueryAttempt(_logger, attempt + 1, _redactor.SystemData(Uri), options.ClientContextId, _redactor.UserData(statement), stopwatch.Elapsed.TotalMilliseconds); + + var request = new HttpRequestMessage(HttpMethod.Post, Uri) { Content = content }; + var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken) + .ConfigureAwait(false); + + errorContext.StatusCode = response.StatusCode; + + var responseBody = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false); + var json = JsonDocument.Parse(responseBody); + var root = json.RootElement; + + if (!response.IsSuccessStatusCode) + { + // Try to parse errors from the response + if (root.TryGetProperty("errors", out var errorsElement)) + { + var errors = JsonSerializer.Deserialize(errorsElement.GetRawText()) + ?? Array.Empty(); + + if (AnalyticsErrorMapper.AreErrorsRetriable(errors)) + { + lastException = AnalyticsErrorMapper.MapServiceErrors(errors, errorContext); + await RetryUtils.BackoffAsync(attempt, cancellationToken).ConfigureAwait(false); + continue; + } + + throw AnalyticsErrorMapper.MapServiceErrors(errors, errorContext); + } + + // 503 is retriable + if (response.StatusCode == HttpStatusCode.ServiceUnavailable) + { + lastException = new AnalyticsException("Service temporarily unavailable.", errorContext); + await RetryUtils.BackoffAsync(attempt, cancellationToken).ConfigureAwait(false); + continue; + } + + throw new AnalyticsException($"HTTP {(int)response.StatusCode} {response.StatusCode}", errorContext); + } + + // Parse the successful response + var requestId = root.TryGetProperty("requestID", out var reqIdProp) ? reqIdProp.GetString() : null; + var handlePath = root.TryGetProperty("handle", out var handleProp) ? handleProp.GetString() : null; + + if (string.IsNullOrWhiteSpace(requestId) || string.IsNullOrWhiteSpace(handlePath)) + { + throw new AnalyticsException("Server response is missing required 'requestID' or 'handle' fields.", errorContext); + } + + // Strip the leading path prefix from the handle + var handle = handlePath.StartsWith("/api/v1/request/status/") + ? handlePath["/api/v1/request/status/".Length..] + : handlePath; + + LogAsyncStartQuerySucceeded(_logger, options.ClientContextId, _redactor.SystemData(handle), _redactor.SystemData(requestId), (int)response.StatusCode); + return new QueryHandle(handle, requestId, this, requestTimeout); + } + catch (HttpRequestException httpRequestException) + { + LogAsyncStartQueryFailed(_logger, httpRequestException, attempt + 1, options.ClientContextId, _redactor.UserData(statement), httpRequestException.Message); + + if (httpRequestException.InnerException is AggregateException aggregateEx) + { + lastException = new AnalyticsException("No connections could be established to any of the endpoints.", aggregateEx, errorContext); + } + + if (!AnalyticsErrorMapper.IsRetriableHttpException(httpRequestException)) + { + throw; + } + + await RetryUtils.BackoffAsync(attempt, cancellationToken).ConfigureAwait(false); + } + catch (TaskCanceledException taskCanceledEx) + { + throw new AnalyticsTimeoutException("The async StartQuery request was canceled.", taskCanceledEx, errorContext); + } + catch (OperationCanceledException) + { + if (stopwatch.Elapsed > queryTimeout) + { + ThrowGlobalTimeout(lastException, stopwatch.Elapsed, errorContext); + } + + await RetryUtils.BackoffAsync(attempt, cancellationToken).ConfigureAwait(false); + } + } + + throw lastException ?? ThrowTooManyRetries(errorContext); + } + + public async Task FetchStatusAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + { + var timeout = requestTimeout ?? _clusterOptions.TimeoutOptions.DispatchTimeout; + var httpClient = CreateHttpClient(timeout); + var deserializer = _clusterOptions.Deserializer; + + var statusUri = new Uri(_baseUri, $"api/v1/request/status/{handle}"); + var request = new HttpRequestMessage(HttpMethod.Get, statusUri); + + LogFetchStatusRequest(_logger, _redactor.SystemData(statusUri), _redactor.SystemData(handle)); + + try + { + var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken) + .ConfigureAwait(false); + + if (response.StatusCode == HttpStatusCode.NotFound) + { + throw new AnalyticsException("Query has been discarded or canceled (404 Not Found)."); + } + + var responseBody = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false); + var json = JsonDocument.Parse(responseBody); + var root = json.RootElement; + + var status = root.TryGetProperty("status", out var statusProp) ? statusProp.GetString() : null; + + if (string.IsNullOrWhiteSpace(status)) + { + throw new AnalyticsException("Server response is missing required 'status' field."); + } + + if (!response.IsSuccessStatusCode + && response.StatusCode != HttpStatusCode.NotFound) + { + LogFetchStatusUnexpectedHttp(_logger, _redactor.SystemData(handle), (int)response.StatusCode); + } + else + { + LogFetchStatusResponse(_logger, _redactor.SystemData(handle), status, (int)response.StatusCode); + } + + // Parse optional fields + var resultHandle = root.TryGetProperty("handle", out var handleProp) ? handleProp.GetString() : null; + + IReadOnlyList? errors = null; + if (root.TryGetProperty("errors", out var errorsElement)) + { + errors = JsonSerializer.Deserialize(errorsElement.GetRawText()) + ?? Array.Empty(); + } + + QueryMetrics? metrics = null; + if (root.TryGetProperty("metrics", out var metricsElement)) + { + metrics = JsonSerializer.Deserialize(metricsElement.GetRawText()); + } + + // Parse additional status response fields per spec + long? resultCount = root.TryGetProperty("resultCount", out var rcProp) && rcProp.TryGetInt64(out var rc) ? rc : null; + bool? resultSetOrdered = root.TryGetProperty("resultSetOrdered", out var rsoProp) ? rsoProp.GetBoolean() : null; + DateTimeOffset? createdAt = root.TryGetProperty("createdAt", out var caProp) && caProp.TryGetDateTimeOffset(out var ca) ? ca : null; + + IReadOnlyList? partitions = null; + if (root.TryGetProperty("partitions", out var partitionsElement)) + { + partitions = JsonSerializer.Deserialize(partitionsElement.GetRawText()) + ?? Array.Empty(); + } + + return new QueryStatus(status, resultHandle, errors, metrics, resultCount, partitions, resultSetOrdered, createdAt, this, deserializer, requestTimeout); + } + catch (TaskCanceledException taskCanceledEx) + { + throw new AnalyticsTimeoutException("The FetchStatus request was canceled.", taskCanceledEx); + } + } + + public async Task FetchResultsAsync(string handle, TimeSpan? requestTimeout, IDeserializer deserializer, CancellationToken cancellationToken = default) + { + var timeout = requestTimeout ?? _clusterOptions.TimeoutOptions.DispatchTimeout; + var httpClient = CreateHttpClient(timeout); + + var resultUri = new Uri(_baseUri, $"api/v1/request/result/{handle}"); + var request = new HttpRequestMessage(HttpMethod.Get, resultUri); + + LogFetchResultsRequest(_logger, _redactor.SystemData(resultUri), _redactor.SystemData(handle)); + + try + { + var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken) + .ConfigureAwait(false); + + LogFetchResultsResponse(_logger, _redactor.SystemData(handle), (int)response.StatusCode); + + if (response.StatusCode == HttpStatusCode.NotFound) + { + throw new AnalyticsException("Query results have been discarded or canceled (404 Not Found)."); + } + + var stream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); + + // Reuse the existing StreamingAnalyticsResult - the response format is identical to sync queries + var result = new StreamingAnalyticsResult(stream, deserializer, httpClient); + result.StatusCode = response.StatusCode; + + await result.InitializeAsync(cancellationToken).ConfigureAwait(false); + + if (!response.IsSuccessStatusCode) + { + var errorContext = new ErrorContext(string.Empty, LightweightStopwatch.StartNew(), timeout); + errorContext.StatusCode = response.StatusCode; + throw AnalyticsErrorMapper.MapHttpErrorCode(result, errorContext); + } + + return result; + } + catch (TaskCanceledException taskCanceledEx) + { + throw new AnalyticsTimeoutException("The FetchResults request was canceled.", taskCanceledEx); + } + } + + public async Task DiscardResultsAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + { + var timeout = requestTimeout ?? _clusterOptions.TimeoutOptions.DispatchTimeout; + var httpClient = CreateHttpClient(timeout); + + var resultUri = new Uri(_baseUri, $"api/v1/request/result/{handle}"); + var request = new HttpRequestMessage(HttpMethod.Delete, resultUri); + + LogDiscardResultsRequest(_logger, _redactor.SystemData(resultUri), _redactor.SystemData(handle)); + + try + { + var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken) + .ConfigureAwait(false); + + if (response.StatusCode == HttpStatusCode.NotFound) + { + // Per spec: 404 means already discarded or canceled — not an error + LogDiscardResults404(_logger, _redactor.SystemData(handle)); + return; + } + + LogDiscardResultsResponse(_logger, _redactor.SystemData(handle), (int)response.StatusCode); + + if (response.StatusCode != HttpStatusCode.Accepted) + { + throw new AnalyticsException($"Unexpected response from DiscardResults: HTTP {(int)response.StatusCode} {response.StatusCode}"); + } + } + catch (TaskCanceledException taskCanceledEx) + { + throw new AnalyticsTimeoutException("The DiscardResults request was canceled.", taskCanceledEx); + } + } + + public async Task CancelQueryAsync(string requestId, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + { + var timeout = requestTimeout ?? _clusterOptions.TimeoutOptions.DispatchTimeout; + var httpClient = CreateHttpClient(timeout); + + var cancelUri = new Uri(_baseUri, "api/v1/active_requests"); + var request = new HttpRequestMessage(HttpMethod.Delete, cancelUri) + { + // Per spec: content-type must be application/x-www-form-urlencoded + Content = new FormUrlEncodedContent(new[] + { + new KeyValuePair("request_id", requestId) + }) + }; + + LogCancelQueryRequest(_logger, _redactor.SystemData(cancelUri), _redactor.SystemData(requestId)); + + try + { + var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken) + .ConfigureAwait(false); + + if (response.StatusCode == HttpStatusCode.NotFound) + { + // Per spec: 404 means already discarded or canceled — not an error + LogCancelQuery404(_logger, _redactor.SystemData(requestId)); + return; + } + + LogCancelQueryResponse(_logger, _redactor.SystemData(requestId), (int)response.StatusCode); + + if (!response.IsSuccessStatusCode) + { + throw new AnalyticsException($"Unexpected response from CancelQuery: HTTP {(int)response.StatusCode} {response.StatusCode}"); + } + } + catch (TaskCanceledException taskCanceledEx) + { + throw new AnalyticsTimeoutException("The CancelQuery request was canceled.", taskCanceledEx); + } + } + + // ──────────────────────────────────────────────────────────── + // Shared helpers + // ──────────────────────────────────────────────────────────── + private static void ThrowGlobalTimeout(Exception? lastException, TimeSpan elapsed, ErrorContext? errorContext = null) { var timeoutException = new AnalyticsTimeoutException( @@ -207,8 +549,8 @@ private static Exception ThrowTooManyRetries(ErrorContext errorContext) #region Logging - [LoggerMessage(1, LogLevel.Debug, "Analytics query attempt {Attempt} starting for {ClientContextId} (elapsed: {Elapsed}ms)")] - private static partial void LogQueryAttemptStarting(ILogger logger, int attempt, string? clientContextId, double elapsed); + [LoggerMessage(1, LogLevel.Debug, "Analytics query attempt {Attempt} starting for {ClientContextId}: {Statement} (elapsed: {Elapsed}ms)")] + private static partial void LogQueryAttemptStarting(ILogger logger, int attempt, string? clientContextId, Redacted statement, double elapsed); [LoggerMessage(2, LogLevel.Debug, "Received retriable server errors for ClientContextId {ClientContextId}, retrying...")] private static partial void LogRetriableServerErrors(ILogger logger, string? clientContextId); @@ -216,8 +558,52 @@ private static Exception ThrowTooManyRetries(ErrorContext errorContext) [LoggerMessage(3, LogLevel.Debug, "HttpRequestException is not retriable, failing immediately")] private static partial void LogNonRetriableHttpException(ILogger logger); - [LoggerMessage(4, LogLevel.Debug, "Analytics query attempt {Attempt} for ClientContextId {ClientContextId} failed: {Error} (elapsed: {Elapsed}ms)")] - private static partial void LogQueryAttemptFailed(ILogger logger, Exception ex, int attempt, string? clientContextId, string error, double elapsed); + [LoggerMessage(4, LogLevel.Debug, "Analytics query attempt {Attempt} for ClientContextId {ClientContextId}: {Statement} failed: {Error} (elapsed: {Elapsed}ms)")] + private static partial void LogQueryAttemptFailed(ILogger logger, Exception ex, int attempt, string? clientContextId, Redacted statement, string error, double elapsed); + + [LoggerMessage(5, LogLevel.Debug, "Async StartQuery attempt {Attempt} sending POST to {Uri} for {ClientContextId}: {Statement} (elapsed: {Elapsed}ms)")] + private static partial void LogAsyncStartQueryAttempt(ILogger logger, int attempt, Redacted uri, string? clientContextId, Redacted statement, double elapsed); + + [LoggerMessage(6, LogLevel.Debug, "Async StartQuery attempt {Attempt} for {ClientContextId}: {Statement} failed: {Error}")] + private static partial void LogAsyncStartQueryFailed(ILogger logger, Exception ex, int attempt, string? clientContextId, Redacted statement, string error); + + [LoggerMessage(7, LogLevel.Debug, "DiscardResults returned 404 for handle {Handle} — already discarded or canceled.")] + private static partial void LogDiscardResults404(ILogger logger, Redacted handle); + + [LoggerMessage(8, LogLevel.Debug, "CancelQuery returned 404 for requestId {RequestId} — already discarded or canceled.")] + private static partial void LogCancelQuery404(ILogger logger, Redacted requestId); + + // ── Async API: request-sent / response-received pairs ── + + [LoggerMessage(9, LogLevel.Debug, "Async StartQuery succeeded for {ClientContextId}. Handle={Handle}, RequestId={RequestId} (HTTP {StatusCode})")] + private static partial void LogAsyncStartQuerySucceeded(ILogger logger, string? clientContextId, Redacted handle, Redacted requestId, int statusCode); + + [LoggerMessage(10, LogLevel.Debug, "FetchStatus sending GET to {Uri} for handle {Handle}")] + private static partial void LogFetchStatusRequest(ILogger logger, Redacted uri, Redacted handle); + + [LoggerMessage(11, LogLevel.Debug, "FetchStatus for handle {Handle} returned status={Status} (HTTP {StatusCode})")] + private static partial void LogFetchStatusResponse(ILogger logger, Redacted handle, string status, int statusCode); + + [LoggerMessage(12, LogLevel.Warning, "FetchStatus for handle {Handle} returned unexpected HTTP {StatusCode}")] + private static partial void LogFetchStatusUnexpectedHttp(ILogger logger, Redacted handle, int statusCode); + + [LoggerMessage(13, LogLevel.Debug, "FetchResults sending GET to {Uri} for handle {Handle}")] + private static partial void LogFetchResultsRequest(ILogger logger, Redacted uri, Redacted handle); + + [LoggerMessage(14, LogLevel.Debug, "FetchResults for handle {Handle} received HTTP {StatusCode}")] + private static partial void LogFetchResultsResponse(ILogger logger, Redacted handle, int statusCode); + + [LoggerMessage(15, LogLevel.Debug, "DiscardResults sending DELETE to {Uri} for handle {Handle}")] + private static partial void LogDiscardResultsRequest(ILogger logger, Redacted uri, Redacted handle); + + [LoggerMessage(16, LogLevel.Debug, "DiscardResults for handle {Handle} completed with HTTP {StatusCode}")] + private static partial void LogDiscardResultsResponse(ILogger logger, Redacted handle, int statusCode); + + [LoggerMessage(17, LogLevel.Debug, "CancelQuery sending DELETE to {Uri} for requestId {RequestId}")] + private static partial void LogCancelQueryRequest(ILogger logger, Redacted uri, Redacted requestId); + + [LoggerMessage(18, LogLevel.Debug, "CancelQuery for requestId {RequestId} completed with HTTP {StatusCode}")] + private static partial void LogCancelQueryResponse(ILogger logger, Redacted requestId, int statusCode); #endregion } diff --git a/src/Couchbase.Analytics/Internal/ConnectionString.cs b/src/Couchbase.Analytics/Internal/ConnectionString.cs index 4da3996..3b0cb2a 100644 --- a/src/Couchbase.Analytics/Internal/ConnectionString.cs +++ b/src/Couchbase.Analytics/Internal/ConnectionString.cs @@ -151,6 +151,20 @@ internal Uri GetAnalyticsServiceUri() }.Uri; } + /// + /// Returns the base service URI (scheme + host + port) without any path. + /// Used for building dynamic paths for the async query API endpoints. + /// + internal Uri GetBaseServiceUri() + { + return new UriBuilder + { + Scheme = Scheme.ToString(), + Host = Hosts.First().Host, + Port = Hosts.First().Port ?? (Scheme == Scheme.Https ? HttpsPort : HttpPort), + }.Uri; + } + public bool TryGetParameter(string key, out object parameter) { if (Parameters.TryGetValue(key, out var value)) diff --git a/src/Couchbase.Analytics/Internal/IAnalyticsService.cs b/src/Couchbase.Analytics/Internal/IAnalyticsService.cs index 09ff995..376411f 100644 --- a/src/Couchbase.Analytics/Internal/IAnalyticsService.cs +++ b/src/Couchbase.Analytics/Internal/IAnalyticsService.cs @@ -19,8 +19,10 @@ * ************************************************************/ #endregion +using Couchbase.AnalyticsClient.Async; using Couchbase.AnalyticsClient.Options; using Couchbase.AnalyticsClient.Results; +using Couchbase.Core.Json; namespace Couchbase.AnalyticsClient.Internal; @@ -29,4 +31,36 @@ internal interface IAnalyticsService Uri Uri { get; } Task SendAsync(string statement, QueryOptions options, CancellationToken cancellationToken = default); + + // Async server request API methods + + /// + /// Starts an asynchronous query on the server. + /// Sends POST to /api/v1/request with mode=async. + /// + Task StartQueryAsync(string statement, StartQueryOptions options, CancellationToken cancellationToken = default); + + /// + /// Fetches the status of an async query from the server. + /// Sends GET to /api/v1/request/status/{handle}. + /// + Task FetchStatusAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default); + + /// + /// Fetches the results of a completed async query from the server. + /// Sends GET to /api/v1/request/result/{handle}. + /// + Task FetchResultsAsync(string handle, TimeSpan? requestTimeout, IDeserializer deserializer, CancellationToken cancellationToken = default); + + /// + /// Discards the results of an async query on the server. + /// Sends DELETE to /api/v1/request/result/{handle}. + /// + Task DiscardResultsAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default); + + /// + /// Cancels an active async query on the server. + /// Sends DELETE to /api/v1/active_requests with the request_id. + /// + Task CancelQueryAsync(string requestId, TimeSpan? requestTimeout, CancellationToken cancellationToken = default); } diff --git a/src/Couchbase.Analytics/Logging/IRedactor.cs b/src/Couchbase.Analytics/Logging/IRedactor.cs new file mode 100644 index 0000000..319afd6 --- /dev/null +++ b/src/Couchbase.Analytics/Logging/IRedactor.cs @@ -0,0 +1,48 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2026 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +using System.Diagnostics.CodeAnalysis; + +namespace Couchbase.AnalyticsClient.Logging; + +/// +/// An interface used for redacting specific log information. +/// +public interface IRedactor +{ + /// + /// Redact user data like query statements, document keys, usernames. + /// + [return: NotNullIfNotNull(nameof(message))] + object? UserData(object? message); + + /// + /// Redact metadata like bucket names, dataset names, index names. + /// + [return: NotNullIfNotNull(nameof(message))] + object? MetaData(object? message); + + /// + /// Redact system data like hostnames, endpoints, URIs. + /// + [return: NotNullIfNotNull(nameof(message))] + object? SystemData(object? message); +} diff --git a/src/Couchbase.Analytics/Logging/Redacted.cs b/src/Couchbase.Analytics/Logging/Redacted.cs new file mode 100644 index 0000000..101419a --- /dev/null +++ b/src/Couchbase.Analytics/Logging/Redacted.cs @@ -0,0 +1,87 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2026 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +using System.Runtime.InteropServices; + +namespace Couchbase.AnalyticsClient.Logging; + +/// +/// Wraps a value in an optional pair of redaction tags when written to a string. +/// String formatting is delayed to the call to . +/// This avoids the string formatting cost for disabled log levels. +/// +/// +/// +/// Since this type is a structure, it avoids heap allocations so long as we're using strongly typed +/// logging mechanisms to avoid boxing (i.e. [LoggerMessage] source generators). +/// +/// +/// Because this type implements it also avoids string allocations when +/// used in string interpolation expressions. +/// +/// +[StructLayout(LayoutKind.Auto)] +internal readonly struct Redacted : ISpanFormattable +{ + private readonly T _value; + private readonly string? _redactionType; + + /// + /// Creates a no-op redaction — the value is not marked for redaction. + /// + public Redacted(T value) : this(value, null) + { + } + + /// + /// Creates a redaction of the given type. + /// + /// Value to wrap. + /// The type of redaction (e.g. "ud", "md", "sd"), or null to not redact. + public Redacted(T value, string? redactionType) + { + _value = value; + _redactionType = redactionType; + } + + public override string ToString() => ToString(null, null); + + public string ToString(string? format, IFormatProvider? formatProvider) + { + if (_redactionType is null) + { + return _value is IFormattable formattable + ? formattable.ToString(null, formatProvider) + : _value?.ToString() ?? ""; + } + + Span buffer = stackalloc char[128]; + return string.Create(formatProvider, buffer, $"<{_redactionType}>{_value}"); + } + + /// + public bool TryFormat(Span destination, out int charsWritten, ReadOnlySpan format, IFormatProvider? provider) + { + return _redactionType is not null + ? destination.TryWrite(provider, $"<{_redactionType}>{_value}", out charsWritten) + : destination.TryWrite(provider, $"{_value}", out charsWritten); + } +} diff --git a/src/Couchbase.Analytics/Logging/RedactionLevel.cs b/src/Couchbase.Analytics/Logging/RedactionLevel.cs new file mode 100644 index 0000000..9ac7037 --- /dev/null +++ b/src/Couchbase.Analytics/Logging/RedactionLevel.cs @@ -0,0 +1,43 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2026 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +namespace Couchbase.AnalyticsClient.Logging; + +/// +/// Specifies the level of log redaction. +/// +public enum RedactionLevel +{ + /// + /// No redaction is performed; this is the default. + /// + None, + + /// + /// Only user data is redacted; system and metadata are not. + /// + Partial, + + /// + /// User, system, and metadata are all redacted. + /// + Full +} diff --git a/src/Couchbase.Analytics/Logging/TypedRedactor.cs b/src/Couchbase.Analytics/Logging/TypedRedactor.cs new file mode 100644 index 0000000..a787174 --- /dev/null +++ b/src/Couchbase.Analytics/Logging/TypedRedactor.cs @@ -0,0 +1,87 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2026 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +using System.Runtime.CompilerServices; + +namespace Couchbase.AnalyticsClient.Logging; + +/// +/// Provides strongly-typed log redaction. Returns structs +/// to avoid boxing when used with [LoggerMessage] source-generated logging. +/// +/// +/// This type doesn't have an interface and is injected by class so that methods may be inlined. +/// For the public API, use . +/// +internal sealed class TypedRedactor +{ + private const string User = "ud"; + private const string Meta = "md"; + private const string System = "sd"; + + public TypedRedactor(RedactionLevel redactionLevel) + { + RedactionLevel = redactionLevel; + } + + public RedactionLevel RedactionLevel { get; } + + /// + /// Redact user data like query statements, document keys, usernames. + /// + public Redacted UserData(T message) => RedactMessage(message, User); + + /// + /// Redact metadata like bucket names, dataset names, index names. + /// + public Redacted MetaData(T message) => RedactMessage(message, Meta); + + /// + /// Redact system data like hostnames, endpoints, URIs. + /// + public Redacted SystemData(T message) => RedactMessage(message, System); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private Redacted RedactMessage(T message, string redactionType) + { + switch (RedactionLevel) + { + case RedactionLevel.None: + return new Redacted(message); + + case RedactionLevel.Full: + break; + + case RedactionLevel.Partial: + if (!ReferenceEquals(redactionType, User)) + { + return new Redacted(message); + } + break; + + default: + throw new ArgumentOutOfRangeException(nameof(RedactionLevel), + $"Unexpected redaction level: {RedactionLevel}"); + } + + return new Redacted(message, redactionType); + } +} diff --git a/src/Couchbase.Analytics/Options/ClusterOptions.cs b/src/Couchbase.Analytics/Options/ClusterOptions.cs index 859ecee..229c8c1 100644 --- a/src/Couchbase.Analytics/Options/ClusterOptions.cs +++ b/src/Couchbase.Analytics/Options/ClusterOptions.cs @@ -26,6 +26,7 @@ using Couchbase.AnalyticsClient.Internal; using Couchbase.AnalyticsClient.Internal.DI; using Couchbase.AnalyticsClient.Internal.Utils; +using Couchbase.AnalyticsClient.Logging; using Couchbase.Core.Json; using Couchbase.Core.Utils; using Microsoft.Extensions.Logging; @@ -46,6 +47,11 @@ public record ClusterOptions internal ConnectionString? ConnectionStringValue { get; private set; } + /// + /// The level of log redaction to apply. Default is . + /// + public RedactionLevel RedactionLevel { get; private set; } = RedactionLevel.None; + private ILoggerFactory? Logging { get; set; } public ClusterOptions WithSecurityOptions(SecurityOptions securityOptions) @@ -99,12 +105,23 @@ public ClusterOptions WithDeserializer(IDeserializer deserializer) return this with { Deserializer = deserializer }; } + /// + /// Set the to use for log redaction. + /// + /// The redaction level. + /// A copy of this object for method chaining. + public ClusterOptions WithRedactionLevel(RedactionLevel redactionLevel) + { + return this with { RedactionLevel = redactionLevel }; + } + private readonly IDictionary _services = DefaultServices.GetDefaultServices(); internal ICouchbaseServiceProvider BuildServiceProvider(ICredential? credential = null) { this.AddClusterService(this); this.AddClusterService(Logging ??= new NullLoggerFactory()); + this.AddClusterService(new TypedRedactor(RedactionLevel)); if (credential is not null) this.AddClusterService(credential); return new CouchbaseServiceProvider(_services); } diff --git a/src/Couchbase.Analytics/Options/StartQueryOptions.cs b/src/Couchbase.Analytics/Options/StartQueryOptions.cs new file mode 100644 index 0000000..e4250ad --- /dev/null +++ b/src/Couchbase.Analytics/Options/StartQueryOptions.cs @@ -0,0 +1,227 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2025 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +using System.Text.Json; +using Couchbase.AnalyticsClient.Query; +using Couchbase.Core.Json; +using Couchbase.Core.Utils; + +namespace Couchbase.AnalyticsClient.Options; + +/// +/// Options for starting an asynchronous server-side query via . +/// +/// +/// Similar to with these differences: +/// +/// is renamed to to disambiguate from . +/// controls the per-HTTP-request timeout for SDK calls (status polls, result fetch, etc.). +/// overrides the server's default TTL for the result set. +/// +/// +public record StartQueryOptions +{ + /// + /// The server-side query timeout. If unset, the default 's QueryTimeout will be used. + /// This is sent to the server to control how long the query is allowed to run. + /// + public TimeSpan? QueryTimeout { get; init; } + + /// + /// The per-HTTP-request timeout for SDK operations related to this async query + /// (e.g., starting the query, polling status, fetching results, cancelling). + /// If unset, defaults to . + /// + public TimeSpan? RequestTimeout { get; init; } + + /// + /// Optional result TTL that overrides the cluster's default (1 hour) for the query's result set. + /// Once the TTL expires, the server discards the result set. + /// The value should be a duration string (e.g., "30m", "2h"). + /// + public string? ResultTTL { get; init; } + + /// + /// The ClientContextId to be used for the query request. Used to identify the query in logs and profiles. + /// If none is provided, a new GUID will be generated. + /// + public string ClientContextId { get; init; } = Guid.NewGuid().ToString(); + + /// + /// Named parameters for the query request. + /// + public Dictionary NamedParameters { get; init; } = new(); + + /// + /// Positional parameters for the query request. + /// + public List PositionalParameters { get; init; } = new(); + + /// + /// The scan consistency for the query request. + /// + public QueryScanConsistency? ScanConsistency { get; init; } + + /// + /// Used to deserialize query rows. + /// + public IDeserializer? Deserializer { get; init; } + + /// + /// Whether the query is read-only. + /// + public bool? ReadOnly { get; init; } + + /// + /// Maximum number of times to retry a request (when the error is retryable). + /// Overrides when provided. + /// + [InterfaceStability(StabilityLevel.Volatile)] + public uint? MaxRetries { get; init; } + + /// + /// Raw parameters passed directly to the analytics service for advanced options. + /// + public Dictionary Raw { get; init; } = new(); + + /// + /// The query context (database and scope) applied to the query. Internal use. + /// + internal QueryContext? QueryContext { get; init; } + + internal string GetFormValuesAsJson(string statement) + { + return JsonSerializer.Serialize(GetFormValues(statement)); + } + + internal IDictionary GetFormValues(string statement) + { + statement = CleanStatement(statement); + var formValues = new Dictionary + { + { "statement", statement }, + { "client_context_id", ClientContextId }, + { "mode", "async" } + }; + + if (QueryTimeout.HasValue) + { + var formTimeout = QueryTimeout.Value.Add(TimeSpan.FromSeconds(5)); + formValues["timeout"] = $"{(int)formTimeout.TotalMilliseconds}ms"; + } + + if (ScanConsistency.HasValue) + { + formValues["scan_consistency"] = + ScanConsistency == QueryScanConsistency.NotBounded ? "not_bounded" : "request_plus"; + } + + if (ReadOnly.HasValue) + { + formValues["readonly"] = ReadOnly; + } + + if (QueryContext is not null) + { + formValues["query_context"] = QueryContext.ToString(); + } + + if (!string.IsNullOrWhiteSpace(ResultTTL)) + { + formValues["result_ttl"] = ResultTTL; + } + + foreach (var parameter in NamedParameters) + { + formValues.Add(parameter.Key.StartsWith('$') ? parameter.Key : $"${parameter.Key}", parameter.Value); + } + + if (PositionalParameters.Count != 0) + { + formValues.Add("args", PositionalParameters.ToArray()); + } + + foreach (var rawParameter in Raw) + { + formValues.Add(rawParameter.Key, rawParameter.Value); + } + + return formValues; + } + + private static string CleanStatement(string statement) + { + if (string.IsNullOrWhiteSpace(statement)) + { + throw new ArgumentException("statement cannot be null or empty"); + } + + return statement.Trim(); + } + + // Fluent builder methods + + public StartQueryOptions WithQueryTimeout(TimeSpan? queryTimeout) => this with { QueryTimeout = queryTimeout }; + + public StartQueryOptions WithRequestTimeout(TimeSpan? requestTimeout) => this with { RequestTimeout = requestTimeout }; + + public StartQueryOptions WithResultTTL(string? resultTTL) => this with { ResultTTL = resultTTL }; + + public StartQueryOptions WithClientContextId(string clientContextId) => this with { ClientContextId = clientContextId }; + + public StartQueryOptions WithScanConsistency(QueryScanConsistency? scanConsistency) => this with { ScanConsistency = scanConsistency }; + + public StartQueryOptions WithReadOnly(bool? readOnly) => this with { ReadOnly = readOnly }; + + [InterfaceStability(StabilityLevel.Volatile)] + public StartQueryOptions WithMaxRetries(uint? maxRetries) => this with { MaxRetries = maxRetries }; + + public StartQueryOptions WithNamedParameters(Dictionary namedParameters) => this with { NamedParameters = namedParameters }; + + public StartQueryOptions WithNamedParameter(string name, object value) + { + var copy = new Dictionary(NamedParameters); + copy[name] = value; + return this with { NamedParameters = copy }; + } + + public StartQueryOptions WithPositionalParameters(IEnumerable positionalParameters) => + this with { PositionalParameters = new List(positionalParameters) }; + + public StartQueryOptions WithPositionalParameter(object parameter) + { + var copy = new List(PositionalParameters) { parameter }; + return this with { PositionalParameters = copy }; + } + + public StartQueryOptions WithRawParameters(Dictionary rawParameters) => this with { Raw = rawParameters }; + + public StartQueryOptions WithRaw(string name, object value) + { + var copy = new Dictionary(Raw); + copy[name] = value; + return this with { Raw = copy }; + } + + public StartQueryOptions WithDeserializer(IDeserializer deserializer) => this with { Deserializer = deserializer }; + + internal StartQueryOptions WithQueryContext(QueryContext queryContext) => this with { QueryContext = queryContext }; +} diff --git a/src/Couchbase.Core/Internal/JsonElementExtensions.cs b/src/Couchbase.Core/Internal/JsonElementExtensions.cs index ac50ff2..f1f45e9 100644 --- a/src/Couchbase.Core/Internal/JsonElementExtensions.cs +++ b/src/Couchbase.Core/Internal/JsonElementExtensions.cs @@ -150,7 +150,7 @@ public static bool TryGetValue(this JsonElement element, out T? value) if (typeof(T) == typeof(char) || typeof(T) == typeof(char?)) { - string str = element.GetString()!; + var str = element.GetString()!; if (str.Length == 1) { value = (T)(object)str[0]; diff --git a/src/Couchbase.Core/Json/JsonStreamReader.cs b/src/Couchbase.Core/Json/JsonStreamReader.cs index d785f97..9f46c68 100644 --- a/src/Couchbase.Core/Json/JsonStreamReader.cs +++ b/src/Couchbase.Core/Json/JsonStreamReader.cs @@ -561,7 +561,7 @@ private async Task ReadFromStreamAsync(CancellationToken cancellationToken) _buffer.Buffer.AsMemory(writeIndex), cancellationToken).ConfigureAwait(false); #else - int readBytes = await _stream.ReadAsync( + var readBytes = await _stream.ReadAsync( _buffer.Buffer, writeIndex, _buffer.Buffer.Length - writeIndex, @@ -637,8 +637,8 @@ public void EnsureBufferSpace() { // We've used more than half of the buffer, grow it to make more room and shift to the beginning - byte[] oldBuffer = Buffer; - byte[] newBuffer = ArrayPool.Shared.Rent(oldBuffer.Length * 2); + var oldBuffer = Buffer; + var newBuffer = ArrayPool.Shared.Rent(oldBuffer.Length * 2); System.Buffer.BlockCopy(oldBuffer, Offset, newBuffer, 0, UsedBytes); Buffer = newBuffer; diff --git a/tests/Couchbase.Analytics.UnitTests/Async/QueryStatusTests.cs b/tests/Couchbase.Analytics.UnitTests/Async/QueryStatusTests.cs new file mode 100644 index 0000000..ed0aced --- /dev/null +++ b/tests/Couchbase.Analytics.UnitTests/Async/QueryStatusTests.cs @@ -0,0 +1,209 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2026 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +using System.Text.Json; +using Couchbase.AnalyticsClient.Async; +using Couchbase.AnalyticsClient.Exceptions; +using Couchbase.AnalyticsClient.Internal; +using Couchbase.AnalyticsClient.Options; +using Couchbase.AnalyticsClient.Query; +using Couchbase.AnalyticsClient.Results; +using Couchbase.Core.Json; +using Xunit; + +namespace Couchbase.AnalyticsClient.UnitTests.Async; + +public class QueryStatusTests +{ + private static readonly IAnalyticsService StubService = new StubAnalyticsService(); + private static readonly IDeserializer StubDeserializer = new StjJsonDeserializer(new JsonSerializerOptions()); + + private static QueryStatus Create(string status, string? resultHandle = null) => + new(status, resultHandle, errors: null, metrics: null, + resultCount: null, partitions: null, resultSetOrdered: null, createdAt: null, + StubService, StubDeserializer); + + // ─── AreResultsReady ─── + + [Fact] + public void AreResultsReady_Success_ReturnsTrue() + { + Assert.True(Create("success", "/api/v1/request/result/abc/1-0").AreResultsReady); + } + + [Theory] + [InlineData("queued")] + [InlineData("running")] + [InlineData("fatal")] + [InlineData("failed")] + [InlineData("timeout")] + public void AreResultsReady_NonSuccess_ReturnsFalse(string status) + { + Assert.False(Create(status).AreResultsReady); + } + + // ─── IsError ─── + + [Theory] + [InlineData("fatal")] + [InlineData("timeout")] + [InlineData("failed")] // undocumented but observed from server (e.g., cancelled queries) + public void IsError_TerminalErrorStatuses_ReturnsTrue(string status) + { + Assert.True(Create(status).IsError); + } + + [Theory] + [InlineData("queued")] + [InlineData("running")] + [InlineData("success")] + public void IsError_NonErrorStatuses_ReturnsFalse(string status) + { + Assert.False(Create(status).IsError); + } + + // ─── Case insensitivity ─── + // One test proves all status comparisons are case-insensitive by checking + // every known status in UPPER and Title case against its lowercase behavior. + + [Theory] + [InlineData("success")] + [InlineData("fatal")] + [InlineData("failed")] + [InlineData("timeout")] + [InlineData("queued")] + [InlineData("running")] + public void StatusComparisons_AreCaseInsensitive(string lowercase) + { + var upper = Create(lowercase.ToUpperInvariant()); + var title = Create(char.ToUpperInvariant(lowercase[0]) + lowercase[1..]); + var lower = Create(lowercase); + + Assert.Equal(lower.AreResultsReady, upper.AreResultsReady); + Assert.Equal(lower.AreResultsReady, title.AreResultsReady); + Assert.Equal(lower.IsError, upper.IsError); + Assert.Equal(lower.IsError, title.IsError); + } + + // ─── GetResults ─── + + [Fact] + public void GetResults_WhenSuccess_ReturnsHandleResults() + { + var (results, error) = Create("success", "/api/v1/request/result/abc/1-0").GetResults(); + Assert.NotNull(results); + Assert.Null(error); + } + + [Theory] + [InlineData("fatal")] + [InlineData("failed")] + [InlineData("timeout")] + public void GetResults_WhenError_ReturnsException(string status) + { + var (results, error) = Create(status).GetResults(); + Assert.Null(results); + Assert.IsAssignableFrom(error); + } + + [Theory] + [InlineData("queued")] + [InlineData("running")] + public void GetResults_WhenInProgress_ThrowsInvalidOperation(string status) + { + Assert.Throws(() => Create(status).GetResults()); + } + + // ─── FetchStatus stores errors without throwing ─── + + [Theory] + [InlineData("fatal")] + [InlineData("timeout")] + public void FatalOrTimeout_WithErrors_StoresErrorsWithoutThrowing(string status) + { + var errors = new[] { new QueryError(23034, "Insufficient memory", false) }; + var qs = new QueryStatus(status, null, errors, null, + null, null, null, null, + StubService, StubDeserializer); + + // QueryStatus itself should never throw — errors surface via GetResults + Assert.True(qs.IsError); + var (results, error) = qs.GetResults(); + Assert.Null(results); + Assert.NotNull(error); + Assert.Contains("Insufficient memory", error.Message); + } + + // ─── Additional response fields ─── + + [Fact] + public void ResponseFields_AreExposed() + { + var partitions = new[] { new QueryPartition { Handle = "/api/v1/request/result/abc/1-0/0", ResultCount = 100 } }; + var createdAt = new DateTimeOffset(2026, 3, 16, 17, 15, 40, 850, TimeSpan.Zero); + + var qs = new QueryStatus("success", "/api/v1/request/result/abc/1-0", null, null, + resultCount: 100, partitions: partitions, resultSetOrdered: false, createdAt: createdAt, + StubService, StubDeserializer); + + Assert.Equal(100, qs.ResultCount); + Assert.Single(qs.Partitions!); + Assert.Equal("/api/v1/request/result/abc/1-0/0", qs.Partitions![0].Handle); + Assert.Equal(100, qs.Partitions![0].ResultCount); + Assert.False(qs.ResultSetOrdered); + Assert.Equal(createdAt, qs.CreatedAt); + } + + [Fact] + public void ResponseFields_AreNullWhenNotProvided() + { + var qs = Create("queued"); + Assert.Null(qs.ResultCount); + Assert.Null(qs.Partitions); + Assert.Null(qs.ResultSetOrdered); + Assert.Null(qs.CreatedAt); + } + + // ─── Stub service ─── + + private sealed class StubAnalyticsService : IAnalyticsService + { + public Uri Uri { get; } = new("http://localhost"); + + public Task SendAsync(string statement, QueryOptions options, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); + + public Task StartQueryAsync(string statement, StartQueryOptions options, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); + + public Task FetchStatusAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); + + public Task FetchResultsAsync(string handle, TimeSpan? requestTimeout, IDeserializer deserializer, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); + + public Task DiscardResultsAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); + + public Task CancelQueryAsync(string requestId, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); + } +} diff --git a/tests/Couchbase.Analytics.UnitTests/Internal/AnalyticsServiceTests.cs b/tests/Couchbase.Analytics.UnitTests/Internal/AnalyticsServiceTests.cs index d1d3827..8ceaeac 100644 --- a/tests/Couchbase.Analytics.UnitTests/Internal/AnalyticsServiceTests.cs +++ b/tests/Couchbase.Analytics.UnitTests/Internal/AnalyticsServiceTests.cs @@ -3,6 +3,7 @@ using Couchbase.AnalyticsClient.Internal; using Couchbase.AnalyticsClient.Internal.HTTP; using Couchbase.AnalyticsClient.Internal.Results; +using Couchbase.AnalyticsClient.Logging; using Couchbase.AnalyticsClient.Options; using Couchbase.Core.Json; using Microsoft.Extensions.Logging; @@ -42,7 +43,8 @@ public void Constructor_InitializesCorrectly() var service = new AnalyticsService( _clusterOptions, _httpClientFactoryMock.Object, - _loggerMock.Object); + _loggerMock.Object, + new TypedRedactor(RedactionLevel.None)); const string ExecuteQueryPath = "/api/v1/request"; var expected = new UriBuilder(_endPoint); @@ -74,7 +76,8 @@ public async Task SendAsync_ValidQuery_ReturnsBlockingAnalyticsResult() var service = new AnalyticsService( _clusterOptions, _httpClientFactoryMock.Object, - _loggerMock.Object); + _loggerMock.Object, + new TypedRedactor(RedactionLevel.None)); var queryOptions = new QueryOptions { AsStreaming = false }; @@ -110,7 +113,8 @@ public async Task SendAsync_WithPriority_AddsPriorityHeader() var service = new AnalyticsService( _clusterOptions, _httpClientFactoryMock.Object, - _loggerMock.Object); + _loggerMock.Object, + new TypedRedactor(RedactionLevel.None)); var queryOptions = new QueryOptions { AsStreaming = false }; @@ -142,7 +146,8 @@ public async Task SendAsync_WithStreaming_ReturnsStreamingAnalyticsResult() var service = new AnalyticsService( _clusterOptions, _httpClientFactoryMock.Object, - _loggerMock.Object); + _loggerMock.Object, + new TypedRedactor(RedactionLevel.None)); var queryOptions = new QueryOptions { AsStreaming = true }; diff --git a/tests/Couchbase.Analytics.UnitTests/Internal/ExecuteQueryTests.cs b/tests/Couchbase.Analytics.UnitTests/Internal/ExecuteQueryTests.cs index 8dff93d..ff267a5 100644 --- a/tests/Couchbase.Analytics.UnitTests/Internal/ExecuteQueryTests.cs +++ b/tests/Couchbase.Analytics.UnitTests/Internal/ExecuteQueryTests.cs @@ -1,9 +1,11 @@ +using Couchbase.AnalyticsClient.Async; using Couchbase.AnalyticsClient.DI; using Couchbase.AnalyticsClient.HTTP; using Couchbase.AnalyticsClient.Internal; using Couchbase.AnalyticsClient.Options; using Couchbase.AnalyticsClient.Query; using Couchbase.AnalyticsClient.Results; +using Couchbase.Core.Json; using Xunit; using Xunit.Abstractions; @@ -51,6 +53,21 @@ public Task SendAsync(string statement, QueryOptions options, Canc LastOptions = options; return Task.FromResult(new FakeQueryResult()); } + + public Task StartQueryAsync(string statement, StartQueryOptions options, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); + + public Task FetchStatusAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); + + public Task FetchResultsAsync(string handle, TimeSpan? requestTimeout, IDeserializer deserializer, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); + + public Task DiscardResultsAsync(string handle, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); + + public Task CancelQueryAsync(string requestId, TimeSpan? requestTimeout, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); } // Helper method to print out QueryOptions for debugging diff --git a/tests/Couchbase.Analytics.UnitTests/Internal/RetryUtilsTests.cs b/tests/Couchbase.Analytics.UnitTests/Internal/RetryUtilsTests.cs index 45fb055..39c218a 100644 --- a/tests/Couchbase.Analytics.UnitTests/Internal/RetryUtilsTests.cs +++ b/tests/Couchbase.Analytics.UnitTests/Internal/RetryUtilsTests.cs @@ -28,7 +28,7 @@ public RetryUtilsTests(ITestOutputHelper outputHelper) public void CalculateBackoffDelay_ReturnsExpectedRange(int attemptNumber, double expectedMinMs, double expectedMaxMs) { var delays = new List(); - for (int i = 0; i < 100; i++) + for (var i = 0; i < 100; i++) { var delay = RetryUtils.CalculateBackoffDelay(attemptNumber); delays.Add(delay.TotalMilliseconds); @@ -65,7 +65,7 @@ public void CalculateBackoffDelay_ExponentialGrowth_FollowsExpectedPattern() _outputHelper.WriteLine("Attempt | Base Delay | Expected Range (±25% jitter) | Actual Sample"); _outputHelper.WriteLine("--------|------------|-------------------------------|---------------"); - for (int attempt = 0; attempt <= 10; attempt++) + for (var attempt = 0; attempt <= 10; attempt++) { // Calculate the theoretical base delay (before jitter) const uint baseDelayMs = 100; @@ -106,7 +106,7 @@ public void CalculateBackoffDelay_MaxDelayIsCapped() public void CalculateBackoffDelay_MinDelayIsNonNegative() { // Test that delays are never negative (even with jitter) - for (int attempt = 0; attempt <= 5; attempt++) + for (var attempt = 0; attempt <= 5; attempt++) { var delay = RetryUtils.CalculateBackoffDelay(attempt); diff --git a/tests/Couchbase.Analytics.UnitTests/Logging/RedactionTests.cs b/tests/Couchbase.Analytics.UnitTests/Logging/RedactionTests.cs new file mode 100644 index 0000000..96e7edb --- /dev/null +++ b/tests/Couchbase.Analytics.UnitTests/Logging/RedactionTests.cs @@ -0,0 +1,191 @@ +#region License +/* ************************************************************ + * + * @author Couchbase + * @copyright 2026 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion + +using Couchbase.AnalyticsClient.Logging; +using Xunit; + +namespace Couchbase.AnalyticsClient.UnitTests.Logging; + +public class RedactionTests +{ + // ─── RedactionLevel.None ─── + + [Fact] + public void None_UserData_NotRedacted() + { + var redactor = new TypedRedactor(RedactionLevel.None); + Assert.Equal("user-value", redactor.UserData("user-value").ToString()); + } + + [Fact] + public void None_MetaData_NotRedacted() + { + var redactor = new TypedRedactor(RedactionLevel.None); + Assert.Equal("meta-value", redactor.MetaData("meta-value").ToString()); + } + + [Fact] + public void None_SystemData_NotRedacted() + { + var redactor = new TypedRedactor(RedactionLevel.None); + Assert.Equal("system-value", redactor.SystemData("system-value").ToString()); + } + + // ─── RedactionLevel.Partial — only UserData gets tagged ─── + + [Fact] + public void Partial_UserData_Redacted() + { + var redactor = new TypedRedactor(RedactionLevel.Partial); + Assert.Equal("user-value", redactor.UserData("user-value").ToString()); + } + + [Fact] + public void Partial_MetaData_NotRedacted() + { + var redactor = new TypedRedactor(RedactionLevel.Partial); + Assert.Equal("meta-value", redactor.MetaData("meta-value").ToString()); + } + + [Fact] + public void Partial_SystemData_NotRedacted() + { + var redactor = new TypedRedactor(RedactionLevel.Partial); + Assert.Equal("system-value", redactor.SystemData("system-value").ToString()); + } + + // ─── RedactionLevel.Full — everything gets tagged ─── + + [Fact] + public void Full_UserData_Redacted() + { + var redactor = new TypedRedactor(RedactionLevel.Full); + Assert.Equal("user-value", redactor.UserData("user-value").ToString()); + } + + [Fact] + public void Full_MetaData_Redacted() + { + var redactor = new TypedRedactor(RedactionLevel.Full); + Assert.Equal("meta-value", redactor.MetaData("meta-value").ToString()); + } + + [Fact] + public void Full_SystemData_Redacted() + { + var redactor = new TypedRedactor(RedactionLevel.Full); + Assert.Equal("system-value", redactor.SystemData("system-value").ToString()); + } + + // ─── Typed values (Uri, int) — verifies generic works ─── + + [Fact] + public void Full_SystemData_Uri_Redacted() + { + var redactor = new TypedRedactor(RedactionLevel.Full); + var uri = new Uri("http://localhost:8095/api/v1/request"); + Assert.Equal("http://localhost:8095/api/v1/request", redactor.SystemData(uri).ToString()); + } + + [Fact] + public void None_SystemData_Uri_NotRedacted() + { + var redactor = new TypedRedactor(RedactionLevel.None); + var uri = new Uri("http://localhost:8095/api/v1/request"); + Assert.Equal("http://localhost:8095/api/v1/request", redactor.SystemData(uri).ToString()); + } + + [Fact] + public void Full_UserData_Int_Redacted() + { + var redactor = new TypedRedactor(RedactionLevel.Full); + Assert.Equal("42", redactor.UserData(42).ToString()); + } + + // ─── Null handling ─── + + [Fact] + public void None_NullValue_ReturnsEmptyString() + { + var redactor = new TypedRedactor(RedactionLevel.None); + Assert.Equal("", redactor.UserData((string?)null).ToString()); + } + + [Fact] + public void Full_NullValue_ReturnsRedactedEmpty() + { + var redactor = new TypedRedactor(RedactionLevel.Full); + Assert.Equal("", redactor.UserData((string?)null).ToString()); + } + + // ─── ISpanFormattable (string interpolation path) ─── + + [Fact] + public void Full_SpanFormattable_WorksInInterpolation() + { + var redactor = new TypedRedactor(RedactionLevel.Full); + var result = $"host={redactor.SystemData("192.168.1.1")} port=8095"; + Assert.Equal("host=192.168.1.1 port=8095", result); + } + + [Fact] + public void None_SpanFormattable_WorksInInterpolation() + { + var redactor = new TypedRedactor(RedactionLevel.None); + var result = $"host={redactor.SystemData("192.168.1.1")} port=8095"; + Assert.Equal("host=192.168.1.1 port=8095", result); + } + + // ─── TryFormat directly ─── + + [Fact] + public void Full_TryFormat_WritesToSpan() + { + var redactor = new TypedRedactor(RedactionLevel.Full); + var redacted = redactor.SystemData("test"); + + Span buffer = stackalloc char[64]; + Assert.True(redacted.TryFormat(buffer, out var written, default, null)); + Assert.Equal("test", buffer[..written].ToString()); + } + + [Fact] + public void TryFormat_BufferTooSmall_ReturnsFalse() + { + var redactor = new TypedRedactor(RedactionLevel.Full); + var redacted = redactor.SystemData("a-value-that-needs-tags"); + + Span buffer = stackalloc char[2]; // way too small + Assert.False(redacted.TryFormat(buffer, out _, default, null)); + } + + // ─── RedactionLevel property ─── + + [Theory] + [InlineData(RedactionLevel.None)] + [InlineData(RedactionLevel.Partial)] + [InlineData(RedactionLevel.Full)] + public void RedactionLevel_IsExposed(RedactionLevel level) + { + var redactor = new TypedRedactor(level); + Assert.Equal(level, redactor.RedactionLevel); + } +}