From a7335aa40f577f41632d4fc7fc008434c28c6741 Mon Sep 17 00:00:00 2001 From: Emilien Bevierre Date: Fri, 1 May 2026 18:20:42 +0200 Subject: [PATCH] fit: implement Async query and add Certificate Revocation option Signed-off-by: Emilien Bevierre --- .../Internal/Connections/ClusterConnection.cs | 19 +- .../Internal/Logging/LoggingUtils.cs | 2 +- .../AnalyticsPerformerCrossService.cs | 205 +++++++++++++++++- .../Services/AnalyticsPerformerService.cs | 54 ++++- .../ClusterNewInstanceRequestExtensions.cs | 42 +++- .../Internal/Utils/ExceptionExtensions.cs | 11 + .../Internal/Utils/OptionsExtensions.cs | 41 +++- .../Internal/Utils/PerformerQuery.cs | 31 ++- .../gRPC/columnar/columnar.caps.proto | 11 + .../columnar.cluster_management.proto | 24 ++ .../gRPC/columnar/columnar.errors.proto | 6 + .../gRPC/columnar/columnar.query.proto | 110 ++++++++++ .../gRPC/columnar/columnar.services.proto | 12 + .../Certificates/CertificateValidation.cs | 12 +- .../Options/SecurityOptions.cs | 20 ++ .../Couchbase.Analytics.UnitTests.csproj | 4 + 16 files changed, 583 insertions(+), 21 deletions(-) diff --git a/fit/Couchbase.Analytics.Performer/Internal/Connections/ClusterConnection.cs b/fit/Couchbase.Analytics.Performer/Internal/Connections/ClusterConnection.cs index 5202d4f..c5afebe 100644 --- a/fit/Couchbase.Analytics.Performer/Internal/Connections/ClusterConnection.cs +++ b/fit/Couchbase.Analytics.Performer/Internal/Connections/ClusterConnection.cs @@ -1,4 +1,6 @@ using Couchbase.AnalyticsClient; +using Couchbase.AnalyticsClient.Async; +using Couchbase.AnalyticsClient.HTTP; using Couchbase.AnalyticsClient.Options; using Couchbase.AnalyticsClient.Results; using Couchbase.Grpc.Protocol.Columnar; @@ -8,7 +10,7 @@ namespace Couchbase.Analytics.Performer.Internal.Connections; internal class ClusterConnection : IDisposable { private readonly ClusterNewInstanceRequest _clusterNewInstanceRequest; - private Cluster _cluster; + private readonly Cluster _cluster; private volatile bool _disposed; public ClusterConnection( @@ -32,6 +34,21 @@ public Task ExecuteScopeQuery(string database, string scope, strin ExecuteQueryAsync(statement, options); } + public Task StartClusterQuery(string statement, StartQueryOptions? options = null, CancellationToken cancellationToken = default) + { + return _cluster.StartQueryAsync(statement, options, cancellationToken); + } + + public Task StartScopeQuery(string database, string scope, string statement, StartQueryOptions? options = null, CancellationToken cancellationToken = default) + { + return _cluster.Database(database).Scope(scope).StartQueryAsync(statement, options, cancellationToken); + } + + public void UpdateCredential(ICredential credential) + { + _cluster.UpdateCredential(credential); + } + public void Dispose() { if (!_disposed) diff --git a/fit/Couchbase.Analytics.Performer/Internal/Logging/LoggingUtils.cs b/fit/Couchbase.Analytics.Performer/Internal/Logging/LoggingUtils.cs index ec679eb..472e8d3 100644 --- a/fit/Couchbase.Analytics.Performer/Internal/Logging/LoggingUtils.cs +++ b/fit/Couchbase.Analytics.Performer/Internal/Logging/LoggingUtils.cs @@ -66,4 +66,4 @@ public static LogEventLevel ParseLogLevelOrDefault(string? value, LogEventLevel return defaultLevel; } -} +} \ No newline at end of file diff --git a/fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerCrossService.cs b/fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerCrossService.cs index b04d295..2e47339 100644 --- a/fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerCrossService.cs +++ b/fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerCrossService.cs @@ -3,6 +3,7 @@ using Couchbase.Analytics.Performer.Internal.Connections; using Couchbase.Analytics.Performer.Internal.Modes; using Couchbase.Analytics.Performer.Internal.Utils; +using Couchbase.AnalyticsClient.Async; using Couchbase.AnalyticsClient.Options; using Couchbase.AnalyticsClient.Results; using Couchbase.Core.Utils; @@ -146,7 +147,7 @@ public override async Task QueryRow(QueryRowRequest request, S try { - response = await performerQuery.GetNextRow().ConfigureAwait(false); + response = await performerQuery.GetNextRow(request.ContentAs).ConfigureAwait(false); response.Metadata = new ResponseMetadata { ElapsedNanos = (long)stopwatch.Elapsed.TotalNanoseconds, @@ -254,6 +255,206 @@ public override Task CloseQueryResult(CloseQueryRe return Task.FromResult(response); } + public override async Task StartQuery(StartQueryRequest request, ServerCallContext context) + { + Serilog.Log.Information("Executing startQuery"); + + var response = new StartQueryResponse(); + + try + { + var options = request.Options.ToStartQueryOptions(); + var queryHandle = Guid.NewGuid().ToString(); + + QueryHandle sdkHandle; + switch (request.LevelCase) + { + case StartQueryRequest.LevelOneofCase.ClusterLevel: + var clusterConn = _clusterConnections[request.ClusterLevel.ClusterId]; + sdkHandle = await clusterConn.StartClusterQuery(request.Statement, options).ConfigureAwait(false); + break; + case StartQueryRequest.LevelOneofCase.ScopeLevel: + var scopeConn = _clusterConnections[request.ScopeLevel.ClusterId]; + sdkHandle = await scopeConn.StartScopeQuery( + request.ScopeLevel.DatabaseName, + request.ScopeLevel.ScopeName, + request.Statement, + options).ConfigureAwait(false); + break; + case StartQueryRequest.LevelOneofCase.None: + default: + throw new ArgumentException("No level specified for query"); + } + + var performerQuery = new PerformerQuery + { + AsyncHandle = sdkHandle, + CancellationTokenSource = new CancellationTokenSource(), + }; + _ongoingQueries[queryHandle] = performerQuery; + + response.QueryHandle = queryHandle; + } + catch (Exception ex) + { + response.Failure = ex.ToProtoError(); + } + + return response; + } + + public override async Task AsyncFetchStatus(AsyncFetchStatusRequest request, + ServerCallContext context) + { + Serilog.Log.Information("Executing asyncFetchStatus"); + + var response = new AsyncFetchStatusResponse(); + + try + { + if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery) + || performerQuery.AsyncHandle is null) + { + throw new KeyNotFoundException( + "Query handle not present in ongoing async queries: " + request.QueryHandle); + } + + var status = await performerQuery.AsyncHandle.FetchStatusAsync().ConfigureAwait(false); + performerQuery.AsyncStatus = status; + + response.QueryStatus = new AsyncFetchStatusResponse.Types.QueryStatusResult + { + ResultsReady = status.ResultsReady, + ToString_ = status.ToString() ?? string.Empty, + }; + } + catch (Exception ex) + { + response.Failure = ex.ToProtoError(); + } + + return response; + } + + public override Task AsyncQueryStatusResultHandle( + AsyncQueryStatusResultHandleRequest request, ServerCallContext context) + { + Serilog.Log.Information("Executing asyncQueryStatusResultHandle"); + var stopwatch = LightweightStopwatch.StartNew(); + var initiated = Timestamp.FromDateTime(DateTime.UtcNow); + + Exception? exception = null; + try + { + if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery) + || performerQuery.AsyncStatus is null) + { + throw new KeyNotFoundException( + "QueryStatus not present for query: " + request.QueryHandle); + } + + performerQuery.AsyncResultHandle = performerQuery.AsyncStatus.ResultHandle(); + } + catch (Exception ex) + { + exception = ex; + } + + var response = new EmptyResultOrFailureResponse() + .GetResponseMetaData(stopwatch, initiated, exception); + return Task.FromResult(response); + } + + public override async Task AsyncCancelHandle(AsyncCancelHandleRequest request, + ServerCallContext context) + { + Serilog.Log.Information("Executing asyncCancelHandle"); + var stopwatch = LightweightStopwatch.StartNew(); + var initiated = Timestamp.FromDateTime(DateTime.UtcNow); + + Exception? exception = null; + try + { + if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery) + || performerQuery.AsyncHandle is null) + { + throw new KeyNotFoundException( + "Query handle not present in ongoing async queries: " + request.QueryHandle); + } + + await performerQuery.AsyncHandle.CancelAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + exception = ex; + } + + return new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated, exception); + } + + public override async Task AsyncFetchResults(AsyncFetchResultsRequest request, + ServerCallContext context) + { + Serilog.Log.Information("Executing asyncFetchResults"); + var stopwatch = LightweightStopwatch.StartNew(); + var initiated = Timestamp.FromDateTime(DateTime.UtcNow); + + Exception? exception = null; + try + { + if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery) + || performerQuery.AsyncResultHandle is null) + { + throw new KeyNotFoundException( + "Result handle not present for query: " + request.QueryHandle); + } + + var fetchOptions = new FetchResultsOptions(); + if (request.Options?.Deserializer is not null) + { + fetchOptions = fetchOptions.WithDeserializer(request.Options.Deserializer.ToCore()); + } + + var queryResult = await performerQuery.AsyncResultHandle + .FetchResultsAsync(fetchOptions).ConfigureAwait(false); + + performerQuery.QueryTask = Task.FromResult(queryResult); + } + catch (Exception ex) + { + exception = ex; + } + + return new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated, exception); + } + + public override async Task AsyncDiscardResults(AsyncDiscardResultsRequest request, + ServerCallContext context) + { + Serilog.Log.Information("Executing asyncDiscardResults"); + var stopwatch = LightweightStopwatch.StartNew(); + var initiated = Timestamp.FromDateTime(DateTime.UtcNow); + + Exception? exception = null; + try + { + if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery) + || performerQuery.AsyncResultHandle is null) + { + throw new KeyNotFoundException( + "Result handle not present for query: " + request.QueryHandle); + } + + await performerQuery.AsyncResultHandle.DiscardResultsAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + exception = ex; + } + + return new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated, exception); + } + public override Task CloseAllQueryResults(CloseAllQueryResultsRequest request, ServerCallContext context) { @@ -268,4 +469,4 @@ public override Task CloseAllQueryResults(CloseAll return Task.FromResult(response); } -} +} \ No newline at end of file diff --git a/fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerService.cs b/fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerService.cs index 2798956..a416df5 100644 --- a/fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerService.cs +++ b/fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerService.cs @@ -9,18 +9,30 @@ using Couchbase.Grpc.Protocol.Shared; using Google.Protobuf.WellKnownTypes; using Grpc.Core; +using Microsoft.Extensions.Logging; +using Serilog; using Exception = System.Exception; namespace Couchbase.Analytics.Performer.Internal.Services; internal class AnalyticsPerformerService : ColumnarService.ColumnarServiceBase { - public AnalyticsPerformerService(ConcurrentDictionary clusters) + public AnalyticsPerformerService( + ConcurrentDictionary clusters) { Clusters = clusters; } private ConcurrentDictionary Clusters { get; } + // The Cluster owns its ILoggerFactory and disposes it on cluster Dispose, so we hand each + // cluster its own factory that bridges to the global Serilog logger. + private static ILoggerFactory CreateBridgedLoggerFactory() => + LoggerFactory.Create(builder => + { + builder.ClearProviders(); + builder.AddSerilog(); + }); + public override Task ClusterNewInstance(ClusterNewInstanceRequest request, ServerCallContext context) { @@ -46,7 +58,8 @@ public override Task ClusterNewInstance(ClusterNew request.ConnectionString); var cluster = Cluster.Create(request.ConnectionString, - request.ToSdkCredential(), request.ToSdkQueryOptions()); + request.ToSdkCredential(), + request.ToSdkQueryOptions().WithLogging(CreateBridgedLoggerFactory())); return new(request, cluster); }); @@ -84,7 +97,14 @@ public override Task FetchPerformerCaps(FetchPerform var response = new FetchPerformerCapsResponse { AnalyticsProduct = AnalyticsProduct.Analytics, - Sdk = SDK.Net + Sdk = SDK.Net, + SupportsServerAsyncQueries = true, + CredentialSupport = new CredentialSupport + { + SupportsJwtCredential = true, + SupportsCertificateCredential = true, + SupportsSetCredential = true, + } }; try @@ -230,6 +250,34 @@ public override Task CloseAllClusters(CloseAllColu return Task.FromResult(response); } + public override Task SetCredential(SetCredentialRequest request, + ServerCallContext context) + { + Serilog.Log.Information("Calling SetCredential for {ClusterId}", request.ExecutionContext.ClusterId); + var stopWatch = LightweightStopwatch.StartNew(); + var initiated = Timestamp.FromDateTime(DateTime.UtcNow); + + var response = new EmptyResultOrFailureResponse(); + try + { + if (!Clusters.TryGetValue(request.ExecutionContext.ClusterId, out var connection)) + { + throw new KeyNotFoundException( + "Cluster id not present in active clusters: " + request.ExecutionContext.ClusterId); + } + + connection.UpdateCredential(request.Credential.ToSdkCredential()); + response.GetResponseMetaData(stopWatch, initiated); + } + catch (Exception ex) + { + response.GetResponseMetaData(stopWatch, initiated, ex); + Serilog.Log.Error(ex, "SetCredential failed for {ClusterId}", request.ExecutionContext.ClusterId); + } + + return Task.FromResult(response); + } + public override Task Echo(EchoRequest request, ServerCallContext context) { Serilog.Log.Information("Calling Echo - {TestName} | {Message}", request.TestName, request.Message); diff --git a/fit/Couchbase.Analytics.Performer/Internal/Utils/ClusterNewInstanceRequestExtensions.cs b/fit/Couchbase.Analytics.Performer/Internal/Utils/ClusterNewInstanceRequestExtensions.cs index b1461ea..c6773a0 100644 --- a/fit/Couchbase.Analytics.Performer/Internal/Utils/ClusterNewInstanceRequestExtensions.cs +++ b/fit/Couchbase.Analytics.Performer/Internal/Utils/ClusterNewInstanceRequestExtensions.cs @@ -1,4 +1,5 @@ using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; using Couchbase.AnalyticsClient.HTTP; using Couchbase.AnalyticsClient.Options; using Couchbase.Grpc.Protocol.Columnar; @@ -7,11 +8,31 @@ namespace Couchbase.Analytics.Performer.Internal.Utils; public static class ClusterNewInstanceRequestExtensions { - public static Credential ToSdkCredential( + public static ICredential ToSdkCredential( this ClusterNewInstanceRequest request) { - return new Credential(request.Credential.UsernameAndPassword.Username, - request.Credential.UsernameAndPassword.Password); + return request.Credential.ToSdkCredential(); + } + + public static ICredential ToSdkCredential( + this ClusterNewInstanceRequest.Types.Credential credential) + { + switch (credential.TypeCase) + { + case ClusterNewInstanceRequest.Types.Credential.TypeOneofCase.UsernameAndPassword: + return new Credential(credential.UsernameAndPassword.Username, + credential.UsernameAndPassword.Password); + case ClusterNewInstanceRequest.Types.Credential.TypeOneofCase.JwtAuth: + return new JwtCredential(credential.JwtAuth.Jwt); + case ClusterNewInstanceRequest.Types.Credential.TypeOneofCase.CertificateAuth: + var x509 = X509Certificate2.CreateFromPem( + credential.CertificateAuth.Cert, + credential.CertificateAuth.Key); + return new CertificateCredential(x509); + case ClusterNewInstanceRequest.Types.Credential.TypeOneofCase.None: + default: + throw new ArgumentException("No credential type specified"); + } } public static ClusterOptions ToSdkQueryOptions(this ClusterNewInstanceRequest request) @@ -61,13 +82,16 @@ private static TimeoutOptions ToCore(this ClusterNewInstanceRequest.Types.Option { timeoutOptions = timeoutOptions.WithQueryTimeout(protoTimeout.QueryTimeout.ToTimeSpan()); } + // handle_timeout is not supported by the .NET SDK; intentionally ignored. return timeoutOptions; } private static SecurityOptions ToCore( this ClusterNewInstanceRequest.Types.Options.Types.SecurityOptions? protoSecurity) { - var securityOptions = new SecurityOptions(); + // FIT clusters (e.g. dinocluster) issue server certs without reachable OCSP/CRL endpoints, + // so revocation checking is always disabled in the performer. + var securityOptions = new SecurityOptions().WithEnableCertificateRevocationCheck(false); if (protoSecurity is null) return securityOptions; if (protoSecurity.TrustOnlyPlatform) @@ -80,7 +104,13 @@ private static SecurityOptions ToCore( } else if (protoSecurity.HasTrustOnlyPemString) { - securityOptions = securityOptions.WithTrustOnlyPemString(protoSecurity.TrustOnlyPemString); + // The driver may send a bundle of multiple PEM-encoded certificates concatenated + // together (e.g. server cert + intermediate). + var collection = new X509Certificate2Collection(); + collection.ImportFromPem(protoSecurity.TrustOnlyPemString); + securityOptions = collection.Count > 1 + ? securityOptions.WithTrustOnlyCertificates(collection) + : securityOptions.WithTrustOnlyPemString(protoSecurity.TrustOnlyPemString); } if (protoSecurity.HasDisableServerCertificateVerification) { @@ -105,4 +135,4 @@ private static SecurityOptions ToCore( } -} +} \ No newline at end of file diff --git a/fit/Couchbase.Analytics.Performer/Internal/Utils/ExceptionExtensions.cs b/fit/Couchbase.Analytics.Performer/Internal/Utils/ExceptionExtensions.cs index c6945c6..de9d421 100644 --- a/fit/Couchbase.Analytics.Performer/Internal/Utils/ExceptionExtensions.cs +++ b/fit/Couchbase.Analytics.Performer/Internal/Utils/ExceptionExtensions.cs @@ -2,6 +2,7 @@ using Couchbase.Grpc.Protocol.Columnar; using InvalidCredentialException = Couchbase.AnalyticsClient.Exceptions.InvalidCredentialException; using QueryException = Couchbase.AnalyticsClient.Exceptions.QueryException; +using QueryNotFoundException = Couchbase.AnalyticsClient.Exceptions.QueryNotFoundException; namespace Couchbase.Analytics.Performer.Internal.Utils; @@ -42,6 +43,15 @@ public static Error ToProtoError(this Exception exception) }; } + if (exception is QueryNotFoundException) + { + columnarError.SubException = new SubColumnarError + { + QueryNotFoundException = + new Grpc.Protocol.Columnar.QueryNotFoundException() + }; + } + if (exception is AnalyticsTimeoutException timeoutEx) { columnarError.SubException = new SubColumnarError @@ -87,6 +97,7 @@ private static bool IsColumnarError(this Exception exception) return false; case InvalidCredentialException invalidCredentialException: case QueryException queryException: + case QueryNotFoundException queryNotFoundException: case AnalyticsTimeoutException timeoutException: case AnalyticsException analyticsException: return true; diff --git a/fit/Couchbase.Analytics.Performer/Internal/Utils/OptionsExtensions.cs b/fit/Couchbase.Analytics.Performer/Internal/Utils/OptionsExtensions.cs index f066b03..213c8ff 100644 --- a/fit/Couchbase.Analytics.Performer/Internal/Utils/OptionsExtensions.cs +++ b/fit/Couchbase.Analytics.Performer/Internal/Utils/OptionsExtensions.cs @@ -10,6 +10,45 @@ namespace Couchbase.Analytics.Performer.Internal.Utils; internal static class OptionsExtensions { + public static StartQueryOptions ToStartQueryOptions(this StartQueryRequest.Types.Options? protoOptions) + { + var options = new StartQueryOptions(); + if (protoOptions is null) return options; + + if (protoOptions.HasReadonly) options = options.WithReadOnly(protoOptions.Readonly); + if (protoOptions.HasScanConsistency) options = options.WithScanConsistency(protoOptions.ScanConsistency.ToCore()); + if (protoOptions.Timeout is not null) options = options.WithQueryTimeout(protoOptions.Timeout.ToTimeSpan()); + if (protoOptions.HasMaxRetries) options = options.WithMaxRetries((uint)protoOptions.MaxRetries); + + if (protoOptions.ParametersPositional is not null) + { + options = options.WithPositionalParameters(protoOptions.ParametersPositional.Values.ToCore()); + } + + if (protoOptions.ParametersNamed is not null) + { + options = options.WithNamedParameters(protoOptions.ParametersNamed.Fields.ToCore()); + } + + if (protoOptions.Raw is not null) + { + options = options.WithRawParameters(protoOptions.Raw.Fields.ToCore()); + } + + return options; + } + + private static QueryScanConsistency ToCore( + this StartQueryRequest.Types.Options.Types.ScanConsistency protoScanConsistency) + { + return protoScanConsistency switch + { + StartQueryRequest.Types.Options.Types.ScanConsistency.NotBounded => QueryScanConsistency.NotBounded, + StartQueryRequest.Types.Options.Types.ScanConsistency.RequestPlus => QueryScanConsistency.RequestPlus, + _ => throw new ArgumentOutOfRangeException(nameof(protoScanConsistency), "Could not parse ScanConsistency") + }; + } + public static QueryOptions ToQueryOptions(this ExecuteQueryRequest.Types.Options? protoOptions) { var queryOptions = new QueryOptions(); @@ -40,7 +79,7 @@ public static QueryOptions ToQueryOptions(this ExecuteQueryRequest.Types.Options return queryOptions; } - private static IDeserializer ToCore(this Deserializer protoDeserializer) + internal static IDeserializer ToCore(this Deserializer protoDeserializer) { return protoDeserializer.TypeCase switch { diff --git a/fit/Couchbase.Analytics.Performer/Internal/Utils/PerformerQuery.cs b/fit/Couchbase.Analytics.Performer/Internal/Utils/PerformerQuery.cs index 0ed3547..787cc2e 100644 --- a/fit/Couchbase.Analytics.Performer/Internal/Utils/PerformerQuery.cs +++ b/fit/Couchbase.Analytics.Performer/Internal/Utils/PerformerQuery.cs @@ -1,3 +1,4 @@ +using Couchbase.AnalyticsClient.Async; using Couchbase.AnalyticsClient.Results; using Couchbase.Grpc.Protocol.Columnar; @@ -12,22 +13,39 @@ public PerformerQuery(Task queryTask, ContentAs contentAs, Cancell CancellationTokenSource = cancellationTokenSource; } - private Task QueryTask { get; } + public PerformerQuery() + { + } + + public Task? QueryTask { get; set; } private IQueryResult? QueryResult { get; set; } - public CancellationTokenSource? CancellationTokenSource { get; } - public ContentAs ContentAs { get; set; } + public CancellationTokenSource? CancellationTokenSource { get; set; } + public ContentAs? ContentAs { get; set; } + + // Server-async fields + public QueryHandle? AsyncHandle { get; set; } + public QueryStatus? AsyncStatus { get; set; } + public QueryResultHandle? AsyncResultHandle { get; set; } private IAsyncEnumerator? _cachedEnumerator; public async Task GetQueryResult() { + if (QueryTask is null) + { + throw new InvalidOperationException("No query task associated with this query handle."); + } return QueryResult ??= await QueryTask.ConfigureAwait(false); } - public async Task GetNextRow() + public async Task GetNextRow(ContentAs? rowContentAs = null) { if (QueryResult is null) { + if (QueryTask is null) + { + throw new InvalidOperationException("No query task associated with this query handle."); + } // Do not wrap in try/catch since we want to bubble the Exception to the caller // so it can convert it appropriately (e.g. "Invalid Credentials" and such) QueryResult = await QueryTask.ConfigureAwait(false); @@ -46,6 +64,9 @@ public async Task GetNextRow() } } + var effectiveContentAs = rowContentAs ?? ContentAs + ?? throw new InvalidOperationException("No ContentAs available for row deserialization."); + QueryRowResponse response; if (await _cachedEnumerator!.MoveNextAsync().ConfigureAwait(false)) @@ -56,7 +77,7 @@ public async Task GetNextRow() { Row = new QueryRowResponse.Types.Row() { - RowContent = _cachedEnumerator.Current.ContentAsToAnalyticsRow(ContentAs) + RowContent = _cachedEnumerator.Current.ContentAsToAnalyticsRow(effectiveContentAs) } } }; diff --git a/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.caps.proto b/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.caps.proto index 7fd48df..579b78a 100644 --- a/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.caps.proto +++ b/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.caps.proto @@ -10,6 +10,12 @@ message PerApiElementGeneric { // Currently empty. } +message CredentialSupport { + bool supports_jwt_credential = 1; + bool supports_certificate_credential = 2; + bool supports_set_credential = 3; +} + message PerApiElementClusterNewInstance { // The SDK implements a dispatch timeout, which is optional. The SDK may or may not implement this feature. bool supports_dispatch_timeout = 1; @@ -261,4 +267,9 @@ message FetchPerformerCapsResponse { map sdk_connection_error = 7; AnalyticsProduct analytics_product = 8; + + // WWhether the SDK supports async queries, that is startQuery. + bool supports_server_async_queries = 9; + + CredentialSupport credential_support = 10; } diff --git a/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.cluster_management.proto b/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.cluster_management.proto index 2407261..fa3f51c 100644 --- a/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.cluster_management.proto +++ b/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.cluster_management.proto @@ -27,6 +27,7 @@ message ClusterNewInstanceRequest { google.protobuf.Duration connect_timeout = 1; google.protobuf.Duration dispatch_timeout = 2; google.protobuf.Duration query_timeout = 3; + google.protobuf.Duration handle_timeout = 4; } optional Deserializer deserializer = 1; @@ -41,8 +42,25 @@ message ClusterNewInstanceRequest { string password = 2; } + // Authenticates using a JSON Web Token (JWT). + // The SDK sets an "Authorization: Bearer " header on every HTTP request. + message JwtAuth { + string jwt = 1; + } + + // Authenticates using a client certificate (mTLS). + // The SDK authenticates the client during the TLS handshake. + message CertificateAuth { + // PEM-encoded X509 client certificate + string cert = 1; + // PEM-encoded private key + string key = 2; + } + oneof type { UsernameAndPassword username_and_password = 1; + JwtAuth jwt_auth = 2; + CertificateAuth certificate_auth = 3; } } @@ -81,3 +99,9 @@ message ClusterCloseRequest { message CloseAllColumnarClustersRequest { ExecutionContext execution_context = 1; } + +// Requests the performer to update the credential on an existing cluster connection. +message SetCredentialRequest { + ExecutionContextClusterLevel execution_context = 1; + ClusterNewInstanceRequest.Credential credential = 2; +} diff --git a/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.errors.proto b/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.errors.proto index a6d9bc0..3d32aae 100644 --- a/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.errors.proto +++ b/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.errors.proto @@ -55,12 +55,18 @@ message InvalidCredentialException { // No additional info here yet } +// Used to represent the detailed QueryNotFoundException sub-ColumnarError type +message QueryNotFoundException { + // No additional info here yet +} + // Represents the ColumnarError's sub types message SubColumnarError { oneof sub_error { QueryException query_exception = 1; TimeoutException timeout_exception = 2; InvalidCredentialException invalid_credential_exception = 3; + QueryNotFoundException query_not_found_exception = 4; } } diff --git a/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.query.proto b/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.query.proto index c925c13..676af88 100644 --- a/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.query.proto +++ b/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.query.proto @@ -257,6 +257,10 @@ message QueryResultMetadataResponse { // Executes nothing on the SDK, and is purely for cleanup purposes. Tells the performer a query_handle is no longer needed. // The performer should not cancel anything as a result of this call, or block until row iteration or metadata is complete, or // similar. It may only do memory cleanup, remove the query_handle from internal collections, and do similar "tidyup". +// +// This should clean up ALL state associated with the query_handle, regardless of whether it originated from +// executeQuery() or startQuery(). For startQuery() handles, this includes any stored QueryHandle, QueryStatus, +// QueryResultHandle, and QueryResult. message CloseQueryResultRequest { string query_handle = 1; } @@ -264,3 +268,109 @@ message CloseQueryResultRequest { // Same as CloseQueryResultRequest except for all query_handles. message CloseAllQueryResultsRequest { } + +// StartQueryRequest +// ================== +// Tells the performer it should **synchronously** call [instance|scope].startQuery(). +// +// Unlike ExecuteQueryRequest, this is a synchronous call that should return only once startQuery() has returned, +// and it should return a UUID representing the query_handle from that call. +// All subsequent calls from the driver that reference that query_handle. +message StartQueryRequest { + message Options { + enum ScanConsistency { + SCAN_CONSISTENCY_NOT_BOUNDED = 0; + SCAN_CONSISTENCY_REQUEST_PLUS = 1; + } + + optional google.protobuf.ListValue parameters_positional = 2; + optional google.protobuf.Struct parameters_named = 3; + optional bool readonly = 4; + optional ScanConsistency scan_consistency = 5; + optional google.protobuf.Struct raw = 6; + google.protobuf.Duration timeout = 7; + optional int32 max_retries = 8; + } + + oneof level { + ExecutionContextClusterLevel cluster_level = 1; + ExecutionContextScopeLevel scope_level = 2; + } + string statement = 3; + optional Options options = 4; +} + +message StartQueryResponse { + oneof result { + string query_handle = 1; + Error failure = 2; + } +} + +// AsyncFetchStatusRequest +// ================== +// The driver is requesting the SDK call the FetchStatus request on QueryHandle. +// The performer should block until this (or failure) has been returned from that call. +// The performer must hold onto the QueryStatus object keyed by the query_handle, regardless of ResultsReady(). +message AsyncFetchStatusRequest { + string query_handle = 1; +} + +message AsyncFetchStatusResponse { + message QueryStatusResult { + // The result of calling ResultsReady() on the QueryStatus. + bool results_ready = 1; + // The string representation of the QueryStatus (e.g. via toString()). + string to_string = 2; + } + + oneof result { + QueryStatusResult query_status = 1; + Error failure = 2; + } +} + +// AsyncQueryStatusResultHandleRequest +// ================== +// Calls ResultHandle() on the QueryStatus returned from the latest AsyncFetchStatus call. +// The driver may call this when ResultsReady() was false, to test error handling. +// If the driver calls this before an AsyncFetchStatus call, this can be treated as a driver bug. +// +// On success, the performer stores the SDK's QueryResultHandle internally, keyed by query_handle. +message AsyncQueryStatusResultHandleRequest { + string query_handle = 1; +} + +// AsyncCancelHandleRequest +// ================== +// Calls the cancel function on the QueryHandle returned from startQuery(). +message AsyncCancelHandleRequest { + string query_handle = 1; +} + +// AsyncFetchResultsRequest +// ================== +// The driver is requesting the performer call FetchResults() on the internally-stored QueryResultHandle. +// The performer should block until this (or failure) has been returned from that call. +// +// On success, the performer should store the resulting QueryResult in the same way it would store one from +// executeQuery(). I.e. keyed by the query_handle and available for subsequent QueryRow, QueryMetadata, +// QueryCancel, and CloseQueryResult calls, exactly as if it came from the executeQuery() path. + +// Some modes of some SDKs will not have a QueryResult, in which case this should be surfaced in the performer caps, +// and the driver should not be sending it. +message AsyncFetchResultsRequest { + message Options { + optional Deserializer deserializer = 1; + } + + string query_handle = 1; + optional Options options = 2; +} + +// AsyncDiscardResultsRequest +// ================== +// Calls the discard function on the QueryResultHandle. +message AsyncDiscardResultsRequest { + string query_handle = 1; +} diff --git a/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.services.proto b/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.services.proto index 9e0da63..fc4ba59 100644 --- a/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.services.proto +++ b/fit/Couchbase.Analytics.Performer/gRPC/columnar/columnar.services.proto @@ -24,11 +24,16 @@ service ColumnarService { rpc ClusterNewInstance (ClusterNewInstanceRequest) returns (EmptyResultOrFailureResponse); rpc ClusterClose (ClusterCloseRequest) returns (EmptyResultOrFailureResponse); rpc CloseAllClusters (CloseAllColumnarClustersRequest) returns (EmptyResultOrFailureResponse); + + // Updates the credential on a previously-created cluster connection. + // Maps to the EA RFC's `cluster.credential(newCredential)`. + rpc SetCredential (SetCredentialRequest) returns (EmptyResultOrFailureResponse); } // To better permit DRY, instance.executeQuery() and scope.executeQuery() are here rather than in their respective services. service ColumnarCrossService { rpc ExecuteQuery (ExecuteQueryRequest) returns (ExecuteQueryResponse); + rpc StartQuery (StartQueryRequest) returns (StartQueryResponse); // Get the QueryResult from the executeQuery() rpc QueryResult (QueryResultRequest) returns (EmptyResultOrFailureResponse); @@ -36,6 +41,13 @@ service ColumnarCrossService { rpc QueryCancel (QueryCancelRequest) returns (EmptyResultOrFailureResponse); rpc QueryMetadata (QueryMetadataRequest) returns (QueryResultMetadataResponse); + // RPCs pertaining to server async query execution via StartQuery() + rpc AsyncFetchStatus (AsyncFetchStatusRequest) returns (AsyncFetchStatusResponse); + rpc AsyncCancelHandle (AsyncCancelHandleRequest) returns (EmptyResultOrFailureResponse); + rpc AsyncQueryStatusResultHandle (AsyncQueryStatusResultHandleRequest) returns (EmptyResultOrFailureResponse); + rpc AsyncFetchResults (AsyncFetchResultsRequest) returns (EmptyResultOrFailureResponse); + rpc AsyncDiscardResults (AsyncDiscardResultsRequest) returns (EmptyResultOrFailureResponse); + rpc CloseQueryResult (CloseQueryResultRequest) returns (EmptyResultOrFailureResponse); rpc CloseAllQueryResults (CloseAllQueryResultsRequest) returns (EmptyResultOrFailureResponse); } diff --git a/src/Couchbase.Analytics/Certificates/CertificateValidation.cs b/src/Couchbase.Analytics/Certificates/CertificateValidation.cs index 00c6c6a..3562654 100644 --- a/src/Couchbase.Analytics/Certificates/CertificateValidation.cs +++ b/src/Couchbase.Analytics/Certificates/CertificateValidation.cs @@ -127,7 +127,8 @@ public static RemoteCertificateValidationCallback CreateRemoteCertificateValidat return false; // Error loading custom certificates } - return ValidateAgainstCustomTrust(certificate, chain, trustedCertificates, logger); + return ValidateAgainstCustomTrust(certificate, chain, trustedCertificates, + securityOptions.EnableCertificateRevocationCheck, logger); } }; } @@ -179,7 +180,8 @@ private static bool ValidateWithPlatformAndCapellaTrust( // Second check: Validate against Capella CA var capellaTrust = new X509Certificate2Collection { CapellaCaCert }; - var capellaValid = ValidateAgainstCustomTrust(certificate, chain, capellaTrust, logger); + var capellaValid = ValidateAgainstCustomTrust( + certificate, chain, capellaTrust, enableRevocationCheck: true, logger); if (capellaValid) { @@ -249,6 +251,7 @@ private static bool ValidateAgainstCustomTrust( X509Certificate? certificate, X509Chain? chain, X509Certificate2Collection trustedCertificates, + bool enableRevocationCheck, ILogger logger) { if (certificate == null) @@ -277,6 +280,11 @@ private static bool ValidateAgainstCustomTrust( // Allow unknown certificate authority since we're using a custom trust validationChain.ChainPolicy.VerificationFlags = X509VerificationFlags.AllowUnknownCertificateAuthority; + if (!enableRevocationCheck) + { + validationChain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck; + } + // Build and validate the certificate chain var chainValid = validationChain.Build(serverCert); diff --git a/src/Couchbase.Analytics/Options/SecurityOptions.cs b/src/Couchbase.Analytics/Options/SecurityOptions.cs index 4a10aeb..3f0d1be 100644 --- a/src/Couchbase.Analytics/Options/SecurityOptions.cs +++ b/src/Couchbase.Analytics/Options/SecurityOptions.cs @@ -58,6 +58,17 @@ public record SecurityOptions /// internal SslProtocols SslProtocols { get; init; } = SslProtocols.Tls13 | SslProtocols.Tls12; + /// + /// If true, the SDK performs revocation checking (OCSP/CRL) on the server certificate chain + /// during custom-trust validation. Defaults to true. + /// + /// + /// Disable this when connecting to deployments whose certificates do not publish + /// reachable OCSP/CRL endpoints (e.g. internal test clusters). When disabled, the chain + /// is still validated against the configured trust anchors; only revocation is skipped. + /// + internal bool EnableCertificateRevocationCheck { get; init; } = true; + internal string? PathToPemFileValue => PemFilePath; internal string? CertificateValue => PemString; internal X509Certificate2Collection? CertificatesValue => Certificates; @@ -146,4 +157,13 @@ public SecurityOptions WithSslProtocols(SslProtocols protocols) { return this with { SslProtocols = protocols }; } + + /// + /// Enables or disables OCSP/CRL revocation checking during custom-trust certificate + /// validation. Defaults to true. + /// + public SecurityOptions WithEnableCertificateRevocationCheck(bool enable) + { + return this with { EnableCertificateRevocationCheck = enable }; + } } diff --git a/tests/Couchbase.Analytics.UnitTests/Couchbase.Analytics.UnitTests.csproj b/tests/Couchbase.Analytics.UnitTests/Couchbase.Analytics.UnitTests.csproj index 3bf3de7..a69537a 100644 --- a/tests/Couchbase.Analytics.UnitTests/Couchbase.Analytics.UnitTests.csproj +++ b/tests/Couchbase.Analytics.UnitTests/Couchbase.Analytics.UnitTests.csproj @@ -39,5 +39,9 @@ + + + + \ No newline at end of file