Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions src/Couchbase.Analytics/Async/QueryHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
using System.Text.Json;
using Couchbase.AnalyticsClient.Internal;
using Couchbase.AnalyticsClient.Options;
using Couchbase.AnalyticsClient.Results;
using Couchbase.AnalyticsClient.Query;

namespace Couchbase.AnalyticsClient.Async;

Expand All @@ -34,6 +34,10 @@ public class QueryHandle
{
private readonly IAnalyticsService _analyticsService;

internal string? Status { get; }

internal AsyncQueryMetrics? Metrics { get; }

/// <summary>
/// The query handle string used to poll for the result handle.
/// </summary>
Expand All @@ -44,11 +48,22 @@ public class QueryHandle
/// </summary>
public string RequestId { get; }

internal QueryHandle(string handle, string requestId, IAnalyticsService analyticsService)
internal QueryHandle(string handle, string requestId, JsonElement root, IAnalyticsService analyticsService)
{
Handle = handle ?? throw new ArgumentNullException(nameof(handle));
RequestId = requestId ?? throw new ArgumentNullException(nameof(requestId));
_analyticsService = analyticsService ?? throw new ArgumentNullException(nameof(analyticsService));

if (root.ValueKind is JsonValueKind.Undefined or JsonValueKind.Null)
{
throw new ArgumentException("The JSON response element must not be empty or null.", nameof(root));
}

Status = root.TryGetProperty("status", out var statusProp) ? statusProp.GetString() : null;
if (root.TryGetProperty("metrics", out var metricsElement))
{
Metrics = JsonSerializer.Deserialize<AsyncQueryMetrics>(metricsElement.GetRawText());
}
}

/// <summary>
Expand Down Expand Up @@ -93,4 +108,12 @@ public Task CancelAsync(Func<CancelOptions, CancelOptions> options, Cancellation
cancelOptions = options.Invoke(cancelOptions);
return CancelAsync(cancelOptions, cancellationToken);
}

/// <inheritdoc />
public override string ToString()
{
var elapsed = Metrics?.ElapsedTime?.TotalMilliseconds;
var metricsStr = elapsed.HasValue ? $"{elapsed}ms elapsed" : "none";
return $"QueryHandle [RequestId={RequestId}, Handle={Handle}, Status={Status ?? "unknown"}, Metrics={{{metricsStr}}}]";
}
}
36 changes: 35 additions & 1 deletion src/Couchbase.Analytics/Async/QueryResultHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
* ************************************************************/
#endregion

using System.Text.Json;
using Couchbase.AnalyticsClient.Internal;
using Couchbase.AnalyticsClient.Options;
using Couchbase.AnalyticsClient.Query;
using Couchbase.AnalyticsClient.Results;

namespace Couchbase.AnalyticsClient.Async;
Expand All @@ -33,16 +35,38 @@ public class QueryResultHandle
private readonly string _handlePath;
private readonly IAnalyticsService _analyticsService;

internal string? Status { get; }

internal AsyncQueryMetrics? Metrics { get; }

internal int? ResultCount { get; }

/// <summary>
/// The request ID assigned by the server when the query was submitted.
/// </summary>
public string RequestId { get; }

internal QueryResultHandle(string handlePath, string requestId, IAnalyticsService analyticsService)
internal QueryResultHandle(string handlePath, string requestId, JsonElement root, IAnalyticsService analyticsService)
{
_handlePath = handlePath ?? throw new ArgumentNullException(nameof(handlePath));
RequestId = requestId ?? throw new ArgumentNullException(nameof(requestId));
_analyticsService = analyticsService ?? throw new ArgumentNullException(nameof(analyticsService));

if (root.ValueKind is JsonValueKind.Undefined or JsonValueKind.Null)
{
throw new ArgumentException("The JSON response element must not be empty or null.", nameof(root));
}

Status = root.TryGetProperty("status", out var statusProp) ? statusProp.GetString() : null;
if (root.TryGetProperty("metrics", out var metricsElement))
{
Metrics = JsonSerializer.Deserialize<AsyncQueryMetrics>(metricsElement.GetRawText());
}

if (root.TryGetProperty("resultCount", out var resultCountProp) && resultCountProp.TryGetInt32(out var resultCount))
{
ResultCount = resultCount;
}
}

/// <summary>
Expand Down Expand Up @@ -87,4 +111,14 @@ public Task DiscardResultsAsync(Func<DiscardResultsOptions, DiscardResultsOption
discardOptions = options.Invoke(discardOptions);
return DiscardResultsAsync(discardOptions, cancellationToken);
}

/// <inheritdoc />
public override string ToString()
{
var elapsed = Metrics?.ElapsedTime?.TotalMilliseconds;
var metricsStr = elapsed.HasValue ? $"{elapsed}ms elapsed" : "none";
var countStr = ResultCount.HasValue ? $", ResultCount={ResultCount}" : "";

return $"QueryResultHandle [RequestId={RequestId}, Status={Status ?? "unknown"}{countStr}, Metrics={{{metricsStr}}}]";
}
}
19 changes: 18 additions & 1 deletion src/Couchbase.Analytics/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

namespace Couchbase.AnalyticsClient;

public class Cluster : IDisposable
public partial class Cluster : IDisposable
{
private volatile ICredential _credential;
private readonly ClusterOptions _clusterOptions;
Expand All @@ -52,6 +52,8 @@ private Cluster(ICredential credential, ClusterOptions clusterOptions)

_logger = _serviceProvider.GetRequiredService<ILogger<Cluster>>();
_analyticsService = new LazyService<IAnalyticsService>(_serviceProvider);

LogClusterCreated(_logger, clusterOptions.ConnectionString);
}

/// <summary>
Expand Down Expand Up @@ -164,6 +166,7 @@ public void UpdateCredential(ICredential newCredential)
throw new InvalidOperationException(
$"Cannot change credential type from {current.GetType().Name} to {newCredential.GetType().Name}.");
_credential = newCredential;
LogCredentialUpdated(_logger, current.GetType().Name);
}

public Task<IQueryResult> ExecuteQueryAsync(string statement, Func<QueryOptions, QueryOptions> options, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -222,5 +225,19 @@ public void Dispose()
{
disposableProvider.Dispose();
}
LogClusterDisposed(_logger);
}

#region Logging

[LoggerMessage(1, LogLevel.Information, "Analytics Cluster initialized for connection: {ConnectionString}")]
private static partial void LogClusterCreated(ILogger logger, string connectionString);

[LoggerMessage(2, LogLevel.Information, "Analytics Cluster credentials dynamically updated (Type: {CredentialType})")]
private static partial void LogCredentialUpdated(ILogger logger, string credentialType);

[LoggerMessage(3, LogLevel.Information, "Analytics Cluster disposed. Releasing managed resources.")]
private static partial void LogClusterDisposed(ILogger logger);

#endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ public QueryNotFoundException(string message) : base(message) { }

public QueryNotFoundException(string message, Exception innerException) : base(message, innerException) { }

internal QueryNotFoundException(string message, Exception innerException, Couchbase.AnalyticsClient.Internal.Retry.ErrorContext errorContext) : base(message, innerException, errorContext) { }
internal QueryNotFoundException(string message, Exception innerException, Internal.Retry.ErrorContext errorContext) : base(message, innerException, errorContext) { }
}
6 changes: 1 addition & 5 deletions src/Couchbase.Analytics/HTTP/CertificateCredential.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,9 @@ public static CertificateCredential FromPkcs12(string path, string? password = n
public static CertificateCredential FromPem(string certPath, string keyPath) =>
new(X509Certificate2.CreateFromPemFile(certPath, keyPath));

/// <summary>
/// Excludes sensitive certificate details from the record's ToString output.
/// Only Subject and Thumbprint are included.
/// </summary>
private bool PrintMembers(StringBuilder builder)
{
builder.Append($"{nameof(Certificate.Subject)} = {Certificate.Subject}, {nameof(Certificate.Thumbprint)} = {Certificate.Thumbprint}");
builder.Append($"{nameof(Certificate)} = [Subject = {Certificate.Subject}, Thumbprint = {Certificate.Thumbprint}]");
return true;
}
}
2 changes: 1 addition & 1 deletion src/Couchbase.Analytics/HTTP/Credential.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static Credential Create(string username, string password)
/// Excludes <see cref="AuthorizationHeader"/> from the record's ToString output
/// to prevent leaking encoded credentials into logs.
/// </summary>
protected virtual bool PrintMembers(System.Text.StringBuilder builder)
protected virtual bool PrintMembers(StringBuilder builder)
{
builder.Append($"{nameof(Username)} = {Username}");
return true;
Expand Down
7 changes: 2 additions & 5 deletions src/Couchbase.Analytics/HTTP/JwtCredential.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#endregion

using System.Net.Http.Headers;
using System.Text;

namespace Couchbase.AnalyticsClient.HTTP;

Expand All @@ -43,11 +44,7 @@ public static JwtCredential Create(string token)
return new(token);
}

/// <summary>
/// Excludes the full token and <see cref="AuthorizationHeader"/> from the record's
/// ToString output to prevent leaking credentials into logs.
/// </summary>
private bool PrintMembers(System.Text.StringBuilder builder)
private bool PrintMembers(StringBuilder builder)
{
builder.Append($"{nameof(Token)} = <{Token.Length} chars>");
return true;
Expand Down
44 changes: 22 additions & 22 deletions src/Couchbase.Analytics/Internal/AnalyticsService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private async Task<IQueryResult> ExecuteWithRetryAsync(string statement, QueryOp
}
try
{
LogQueryAttemptStarting(_logger, attempt + 1, options.ClientContextId, _redactor.UserData(statement), stopwatch.Elapsed.TotalMilliseconds);
LogQueryAttemptStarting(_logger, attempt + 1, options.ClientContextId, options.QueryContext?.ToString() ?? "<cluster>", _redactor.UserData(statement), stopwatch.Elapsed.TotalMilliseconds);

var result = await ExecuteQueryAsync(content, httpClient, options.AsStreaming, deserializer, errorContext, cancellationToken).ConfigureAwait(false);

Expand All @@ -172,7 +172,7 @@ private async Task<IQueryResult> ExecuteWithRetryAsync(string statement, QueryOp
}
catch (HttpRequestException httpRequestException)
{
LogQueryAttemptFailed(_logger, httpRequestException, attempt + 1, options.ClientContextId, _redactor.UserData(statement),
LogQueryAttemptFailed(_logger, httpRequestException, attempt + 1, options.ClientContextId, options.QueryContext?.ToString() ?? "<cluster>", _redactor.UserData(statement),
httpRequestException.Message, stopwatch.Elapsed.TotalMilliseconds);

// "No successful connection(s)" is retryable
Expand Down Expand Up @@ -238,7 +238,7 @@ public async Task<QueryHandle> StartQueryAsync(string statement, StartQueryOptio

try
{
LogAsyncStartQueryAttempt(_logger, attempt + 1, _redactor.SystemData(Uri), options.ClientContextId, _redactor.UserData(statement), stopwatch.Elapsed.TotalMilliseconds);
LogAsyncStartQueryAttempt(_logger, attempt + 1, _redactor.SystemData(Uri), options.ClientContextId, options.QueryContext?.ToString() ?? "<cluster>", _redactor.UserData(statement), stopwatch.Elapsed.TotalMilliseconds);

var request = new HttpRequestMessage(HttpMethod.Post, Uri) { Content = content };
var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken)
Expand Down Expand Up @@ -289,11 +289,11 @@ public async Task<QueryHandle> StartQueryAsync(string statement, StartQueryOptio
}

LogAsyncStartQuerySucceeded(_logger, options.ClientContextId, _redactor.SystemData(handlePath), _redactor.SystemData(requestId), (int)response.StatusCode);
return new QueryHandle(handlePath, requestId, this);
return new QueryHandle(handlePath, requestId, root, this);
}
catch (HttpRequestException httpRequestException)
{
LogAsyncStartQueryFailed(_logger, httpRequestException, attempt + 1, options.ClientContextId, _redactor.UserData(statement), httpRequestException.Message);
LogAsyncStartQueryFailed(_logger, httpRequestException, attempt + 1, options.ClientContextId, options.QueryContext?.ToString() ?? "<cluster>", _redactor.UserData(statement), httpRequestException.Message);

if (httpRequestException.InnerException is AggregateException aggregateEx)
{
Expand Down Expand Up @@ -333,7 +333,7 @@ public async Task<QueryHandle> StartQueryAsync(string statement, StartQueryOptio
var statusUri = Uri.TryCreate(handle.Handle, UriKind.Absolute, out var absUri) && (absUri.Scheme == Uri.UriSchemeHttp || absUri.Scheme == Uri.UriSchemeHttps)
? absUri
: new Uri(_baseUri, handle.Handle);

var request = new HttpRequestMessage(HttpMethod.Get, statusUri);

LogFetchResultHandleRequest(_logger, _redactor.SystemData(statusUri), _redactor.SystemData(handle.Handle));
Expand Down Expand Up @@ -390,19 +390,19 @@ public async Task<QueryHandle> StartQueryAsync(string statement, StartQueryOptio

if (!string.Equals(status, "success", StringComparison.OrdinalIgnoreCase))
{
if (string.Equals(status, "stopped", StringComparison.OrdinalIgnoreCase) ||
if (string.Equals(status, "stopped", StringComparison.OrdinalIgnoreCase) ||
string.Equals(status, "aborted", StringComparison.OrdinalIgnoreCase) ||
string.Equals(status, "closed", StringComparison.OrdinalIgnoreCase))
{
throw new QueryNotFoundException($"Query has been discarded or canceled (status: {status}).");
}

if (string.Equals(status, "timeout", StringComparison.OrdinalIgnoreCase))
{
throw new AnalyticsTimeoutException("The query evaluation timed out on the server.");
}
if (string.Equals(status, "fatal", StringComparison.OrdinalIgnoreCase) ||

if (string.Equals(status, "fatal", StringComparison.OrdinalIgnoreCase) ||
string.Equals(status, "failed", StringComparison.OrdinalIgnoreCase) ||
string.Equals(status, "errors", StringComparison.OrdinalIgnoreCase))
{
Expand All @@ -414,7 +414,7 @@ public async Task<QueryHandle> StartQueryAsync(string statement, StartQueryOptio
}
throw new AnalyticsException($"Query execution failed on the server (status: {status}).");
}

throw new AnalyticsException($"Query status fetch failed with unrecognized status: {status}");
}

Expand All @@ -424,7 +424,7 @@ public async Task<QueryHandle> StartQueryAsync(string statement, StartQueryOptio
throw new InvalidOperationException("Query status indicates success but no result handle was provided by the server.");
}

return new QueryResultHandle(resultHandle, handle.RequestId, this);
return new QueryResultHandle(resultHandle, handle.RequestId, root, this);
}
Comment thread
jeffrymorris marked this conversation as resolved.
catch (TaskCanceledException taskCanceledEx)
{
Expand All @@ -441,7 +441,7 @@ public async Task<IQueryResult> FetchResultsAsync(string requestId, string handl
var resultUri = Uri.TryCreate(handlePath, UriKind.Absolute, out var absUri) && (absUri.Scheme == Uri.UriSchemeHttp || absUri.Scheme == Uri.UriSchemeHttps)
? absUri
: new Uri(_baseUri, handlePath);

var request = new HttpRequestMessage(HttpMethod.Get, resultUri);

LogFetchResultsRequest(_logger, _redactor.SystemData(resultUri), _redactor.SystemData(handlePath));
Expand Down Expand Up @@ -489,7 +489,7 @@ public async Task DiscardResultsAsync(string requestId, string handlePath, Disca
var resultUri = Uri.TryCreate(handlePath, UriKind.Absolute, out var absUri) && (absUri.Scheme == Uri.UriSchemeHttp || absUri.Scheme == Uri.UriSchemeHttps)
? absUri
: new Uri(_baseUri, handlePath);

var request = new HttpRequestMessage(HttpMethod.Delete, resultUri);

LogDiscardResultsRequest(_logger, _redactor.SystemData(resultUri), _redactor.SystemData(handlePath));
Expand Down Expand Up @@ -579,23 +579,23 @@ private static Exception ThrowTooManyRetries(ErrorContext errorContext)

#region Logging

[LoggerMessage(1, LogLevel.Debug, "Analytics query attempt {Attempt} starting for {ClientContextId}: {Statement} (elapsed: {Elapsed}ms)")]
private static partial void LogQueryAttemptStarting(ILogger logger, int attempt, string? clientContextId, Redacted<string> statement, double elapsed);
[LoggerMessage(1, LogLevel.Debug, "Analytics query attempt {Attempt} starting for {ClientContextId} on context [{QueryContext}]: {Statement} (elapsed: {Elapsed}ms)")]
private static partial void LogQueryAttemptStarting(ILogger logger, int attempt, string? clientContextId, string queryContext, Redacted<string> statement, double elapsed);

[LoggerMessage(2, LogLevel.Debug, "Received retriable server errors for ClientContextId {ClientContextId}, retrying...")]
private static partial void LogRetriableServerErrors(ILogger logger, string? clientContextId);

[LoggerMessage(3, LogLevel.Debug, "HttpRequestException is not retriable, failing immediately")]
private static partial void LogNonRetriableHttpException(ILogger logger);

[LoggerMessage(4, LogLevel.Debug, "Analytics query attempt {Attempt} for ClientContextId {ClientContextId}: {Statement} failed: {Error} (elapsed: {Elapsed}ms)")]
private static partial void LogQueryAttemptFailed(ILogger logger, Exception ex, int attempt, string? clientContextId, Redacted<string> statement, string error, double elapsed);
[LoggerMessage(4, LogLevel.Debug, "Analytics query attempt {Attempt} for ClientContextId {ClientContextId} on context [{QueryContext}]: {Statement} failed: {Error} (elapsed: {Elapsed}ms)")]
private static partial void LogQueryAttemptFailed(ILogger logger, Exception ex, int attempt, string? clientContextId, string queryContext, Redacted<string> statement, string error, double elapsed);

[LoggerMessage(5, LogLevel.Debug, "Async StartQuery attempt {Attempt} sending POST to {Uri} for {ClientContextId}: {Statement} (elapsed: {Elapsed}ms)")]
private static partial void LogAsyncStartQueryAttempt(ILogger logger, int attempt, Redacted<Uri> uri, string? clientContextId, Redacted<string> statement, double elapsed);
[LoggerMessage(5, LogLevel.Debug, "Async StartQuery attempt {Attempt} sending POST to {Uri} for {ClientContextId} on context [{QueryContext}]: {Statement} (elapsed: {Elapsed}ms)")]
private static partial void LogAsyncStartQueryAttempt(ILogger logger, int attempt, Redacted<Uri> uri, string? clientContextId, string queryContext, Redacted<string> statement, double elapsed);

[LoggerMessage(6, LogLevel.Debug, "Async StartQuery attempt {Attempt} for {ClientContextId}: {Statement} failed: {Error}")]
private static partial void LogAsyncStartQueryFailed(ILogger logger, Exception ex, int attempt, string? clientContextId, Redacted<string> statement, string error);
[LoggerMessage(6, LogLevel.Debug, "Async StartQuery attempt {Attempt} for {ClientContextId} on context [{QueryContext}]: {Statement} failed: {Error}")]
private static partial void LogAsyncStartQueryFailed(ILogger logger, Exception ex, int attempt, string? clientContextId, string queryContext, Redacted<string> statement, string error);

[LoggerMessage(7, LogLevel.Debug, "DiscardResults returned 404 for handle {Handle} — already discarded or canceled.")]
private static partial void LogDiscardResults404(ILogger logger, Redacted<string> handle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
* ************************************************************/
#endregion

using System;
using System.Collections.ObjectModel;

namespace Couchbase.AnalyticsClient.Internal.DI;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
* ************************************************************/
#endregion

using System;
using System.Diagnostics.CodeAnalysis;
using Couchbase.AnalyticsClient.Utils;

Expand Down
Loading
Loading