Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,25 @@ namespace Couchbase.Analytics.Performer.Internal.Connections;
internal class ClusterConnection : IDisposable
{
private readonly ClusterNewInstanceRequest _clusterNewInstanceRequest;
private Cluster _cluster;
internal Cluster Cluster;
private volatile bool _disposed;

public ClusterConnection(
ClusterNewInstanceRequest clusterNewInstanceRequest, Cluster cluster)
{
_clusterNewInstanceRequest = clusterNewInstanceRequest;
_cluster = cluster;
Cluster = cluster;
}

public Task<IQueryResult> ExecuteClusterQuery(string statement, QueryOptions? options = null, CancellationToken? cancellationToken = null)
{
return _cluster.
return Cluster.
ExecuteQueryAsync(statement, options);
}

public Task<IQueryResult> ExecuteScopeQuery(string database, string scope, string statement, QueryOptions? options = null, CancellationToken? cancellationToken = null)
{
return _cluster.
return Cluster.
Database(database).
Scope(scope).
ExecuteQueryAsync(statement, options);
Expand All @@ -37,7 +37,7 @@ public void Dispose()
if (!_disposed)
{
_disposed = true;
_cluster?.Dispose();
Cluster?.Dispose();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@ public static LogEventLevel ParseLogLevelOrDefault(string? value, LogEventLevel

return defaultLevel;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,30 @@ public AnalyticsPerformerService(ConcurrentDictionary<string, ClusterConnection>
}
private ConcurrentDictionary<string, ClusterConnection> Clusters { get; }

public override Task<EmptyResultOrFailureResponse> SetCredential(SetCredentialRequest request, ServerCallContext context)
{
var response = new EmptyResultOrFailureResponse();
var stopWatch = LightweightStopwatch.StartNew();
var initiated = Timestamp.FromDateTime(DateTime.UtcNow);

try
{
var newCredential = request.Credential.ToCore();

if (Clusters.TryGetValue(request.ExecutionContext.ClusterId, out var clusterConnection))
{
clusterConnection.Cluster.UpdateCredential(newCredential);
}
response = response.GetResponseMetaData(stopWatch, initiated);
}
catch (Exception ex)
{
response.GetResponseMetaData(stopWatch, initiated, ex);
}

return Task.FromResult(response);
}

public override Task<EmptyResultOrFailureResponse> ClusterNewInstance(ClusterNewInstanceRequest request,
ServerCallContext context)
{
Expand All @@ -32,7 +56,6 @@ public override Task<EmptyResultOrFailureResponse> ClusterNewInstance(ClusterNew
var response = new EmptyResultOrFailureResponse();
try
{
//how to make this work in C#? or does this work?
foreach (var tunable in request.Tunables)
{
Environment.SetEnvironmentVariable(tunable.Key, tunable.Value);
Expand All @@ -46,21 +69,11 @@ public override Task<EmptyResultOrFailureResponse> ClusterNewInstance(ClusterNew
request.ConnectionString);

var cluster = Cluster.Create(request.ConnectionString,
request.ToSdkCredential(), request.ToSdkQueryOptions());
request.Credential.ToCore(), request.ToSdkQueryOptions());

return new(request, cluster);
});

/* context.GetHttpContext().Response.OnCompleted(() =>
{
foreach (var tunable in request.Tunables)
{
Environment.SetEnvironmentVariable(tunable.Key, null);
}

return Task.CompletedTask;
});*/

Serilog.Log.Information(
"Created or using new cluster instance in {Seconds}: {ConnectionString}",
stopWatch.Elapsed, request.ConnectionString);
Expand Down Expand Up @@ -89,6 +102,13 @@ public override Task<FetchPerformerCapsResponse> FetchPerformerCaps(FetchPerform

try
{
response.CredentialSupport = new CredentialSupport
{
SupportsJwtCredential = true,
SupportsCertificateCredential = true,
SupportsSetCredential = true
};

response.ClusterNewInstance.Add((int)Mode.PushBasedStreaming, new PerApiElementClusterNewInstance { SupportsDispatchTimeout = true });
response.ClusterNewInstance.Add((int)Mode.Buffered, new PerApiElementClusterNewInstance { SupportsDispatchTimeout = true });

Expand Down Expand Up @@ -158,13 +178,9 @@ public override Task<FetchPerformerCapsResponse> FetchPerformerCaps(FetchPerform
{
statusCode = StatusCode.Unimplemented;
}
response.SdkConnectionError.Add((int)statusCode, new SdkConnectionError());// ¯\_(ツ)_/¯
response.SdkConnectionError.Add((int)statusCode, new SdkConnectionError());
}

//java has observer.onNext and observer.onCompleted here
//C# has context.GetHttpContext().Response.OnCompleted and context.GetHttpContext().Response.OnStarting
//Do we have to provide a func here for each or just use the defaults?

return Task.FromResult(response);
}

Expand Down Expand Up @@ -235,4 +251,4 @@ public override Task<EchoResponse> Echo(EchoRequest request, ServerCallContext c
Serilog.Log.Information("Calling Echo - {TestName} | {Message}", request.TestName, request.Message);
return Task.FromResult(new EchoResponse());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
using System.Security.Authentication;
using Couchbase.AnalyticsClient.HTTP;
using Couchbase.AnalyticsClient.Options;
using Couchbase.Grpc.Protocol.Columnar;

namespace Couchbase.Analytics.Performer.Internal.Utils;

public static class ClusterNewInstanceRequestExtensions
{
public static Credential ToSdkCredential(
this ClusterNewInstanceRequest request)
{
return new Credential(request.Credential.UsernameAndPassword.Username,
request.Credential.UsernameAndPassword.Password);
}

public static ClusterOptions ToSdkQueryOptions(this ClusterNewInstanceRequest request)
{
var protoOptions = request.Options;
Expand Down Expand Up @@ -72,7 +64,8 @@ private static SecurityOptions ToCore(

if (protoSecurity.TrustOnlyPlatform)
{
// what is this?
// Trust only certificates installed on the machine's trust store,
// nothing to do here
}
else if (protoSecurity.HasTrustOnlyCapella)
{
Expand Down Expand Up @@ -105,4 +98,4 @@ private static SecurityOptions ToCore(
}


}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System.Security.Cryptography.X509Certificates;
using Couchbase.AnalyticsClient.HTTP;
using Couchbase.Grpc.Protocol.Columnar;

namespace Couchbase.Analytics.Performer.Internal.Utils;

internal static class CertificateUtils
{
internal static X509Certificate2 CreateCertificateFromStrings(string certPem, string keyPem)
{
var certPath = Path.GetTempFileName();
var keyPath = Path.GetTempFileName();
var pfxPath = Path.GetTempFileName();
try
{
File.WriteAllText(certPath, certPem);
File.WriteAllText(keyPath, keyPem);
using var pemCert = X509Certificate2.CreateFromPemFile(certPath, keyPath);
var pfxBytes = pemCert.Export(X509ContentType.Pfx);
File.WriteAllBytes(pfxPath, pfxBytes);
return X509CertificateLoader.LoadPkcs12FromFile(pfxPath, password: null);
}
finally
{
File.Delete(certPath);
File.Delete(keyPath);
File.Delete(pfxPath);
}
}

internal static ICredential ToCore(this ClusterNewInstanceRequest.Types.Credential protoCredential)
{
return protoCredential.TypeCase switch
{
ClusterNewInstanceRequest.Types.Credential.TypeOneofCase.UsernameAndPassword =>
Credential.Create(
protoCredential.UsernameAndPassword.Username,
protoCredential.UsernameAndPassword.Password),
ClusterNewInstanceRequest.Types.Credential.TypeOneofCase.JwtAuth =>
JwtCredential.Create(protoCredential.JwtAuth.Jwt),
ClusterNewInstanceRequest.Types.Credential.TypeOneofCase.CertificateAuth =>
CertificateCredential.Create(
CreateCertificateFromStrings(
protoCredential.CertificateAuth.Cert,
protoCredential.CertificateAuth.Key)),
_ => throw new ArgumentOutOfRangeException()
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ message PerApiElementGeneric {
// Currently empty.
}

message CredentialSupport {
bool supports_jwt_credential = 1;
bool supports_certificate_credential = 2;
bool supports_set_credential = 3;
}

message PerApiElementClusterNewInstance {
// The SDK implements a dispatch timeout, which is optional. The SDK may or may not implement this feature.
bool supports_dispatch_timeout = 1;
Expand Down Expand Up @@ -261,4 +267,6 @@ message FetchPerformerCapsResponse {
map<int32, SdkConnectionError> sdk_connection_error = 7;

AnalyticsProduct analytics_product = 8;
}

CredentialSupport credential_support = 9;
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,25 @@ message ClusterNewInstanceRequest {
string password = 2;
}

// Authenticates using a JSON Web Token (JWT).
// The SDK sets an "Authorization: Bearer <jwt>" header on every HTTP request.
message JwtAuth {
string jwt = 1;
}

// Authenticates using a client certificate (mTLS).
// The SDK authenticates the client during the TLS handshake.
message CertificateAuth {
// PEM-encoded X509 client certificate
string cert = 1;
// PEM-encoded private key
string key = 2;
}

oneof type {
UsernameAndPassword username_and_password = 1;
JwtAuth jwt_auth = 2;
CertificateAuth certificate_auth = 3;
}
}

Expand Down Expand Up @@ -81,3 +98,11 @@ message ClusterCloseRequest {
message CloseAllColumnarClustersRequest {
ExecutionContext execution_context = 1;
}

// Requests the performer to update the credential on an existing cluster connection.
// Per the EA RFC, `cluster.credential(newCredential)` replaces the credential used for subsequent requests.
// The new credential must be the same type as the old one (e.g. JWT -> JWT, cert -> cert).
message SetCredentialRequest {
ExecutionContextClusterLevel execution_context = 1;
ClusterNewInstanceRequest.Credential credential = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ service ColumnarService {
rpc ClusterNewInstance (ClusterNewInstanceRequest) returns (EmptyResultOrFailureResponse);
rpc ClusterClose (ClusterCloseRequest) returns (EmptyResultOrFailureResponse);
rpc CloseAllClusters (CloseAllColumnarClustersRequest) returns (EmptyResultOrFailureResponse);

// Updates the credential on a previously-created cluster connection.
// Maps to the EA RFC's `cluster.credential(newCredential)`.
rpc SetCredential (SetCredentialRequest) returns (EmptyResultOrFailureResponse);
}

// To better permit DRY, instance.executeQuery() and scope.executeQuery() are here rather than in their respective services.
Expand Down
24 changes: 19 additions & 5 deletions src/Couchbase.Analytics/Certificates/CertificateValidation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public static RemoteCertificateValidationCallback CreateRemoteCertificateValidat
return false; // Error loading custom certificates
}

return ValidateAgainstCustomTrust(certificate, chain, trustedCertificates, logger);
return ValidateAgainstCustomTrust(certificate, chain, trustedCertificates, logger, securityOptions.RevocationMode);
}
};
}
Expand Down Expand Up @@ -213,8 +213,9 @@ private static bool ValidateWithPlatformAndCapellaTrust(
}
else if (securityOptions.TrustMode == CertificateTrustMode.PemString)
{
trustedCertificates.Add(X509CertificateLoader.LoadCertificate(
System.Text.Encoding.ASCII.GetBytes(securityOptions.CertificateValue!)));
var certs = new X509Certificate2Collection();
certs.ImportFromPem(securityOptions.CertificateValue!);
trustedCertificates.AddRange(certs);
}
else if (securityOptions.TrustMode == CertificateTrustMode.CertificatesOnly)
{
Expand Down Expand Up @@ -249,7 +250,8 @@ private static bool ValidateAgainstCustomTrust(
X509Certificate? certificate,
X509Chain? chain,
X509Certificate2Collection trustedCertificates,
ILogger logger)
ILogger logger,
X509RevocationMode revocationMode = X509RevocationMode.NoCheck)
{
if (certificate == null)
{
Expand Down Expand Up @@ -277,6 +279,18 @@ private static bool ValidateAgainstCustomTrust(
// Allow unknown certificate authority since we're using a custom trust
validationChain.ChainPolicy.VerificationFlags = X509VerificationFlags.AllowUnknownCertificateAuthority;

validationChain.ChainPolicy.RevocationMode = revocationMode;

// Copy intermediate certificates from the TLS chain into ExtraStore so the
// custom chain builder can locate them even if they are not in CustomTrustStore.
if (chain != null)
{
foreach (var element in chain.ChainElements)
{
validationChain.ChainPolicy.ExtraStore.Add(element.Certificate);
}
}

// Build and validate the certificate chain
var chainValid = validationChain.Build(serverCert);

Expand Down Expand Up @@ -421,4 +435,4 @@ private static void LogChainValidationErrors(X509Chain chain, X509Certificate2 s
private static partial void LogChainElementError(ILogger logger, string subject, X509ChainStatusFlags status, string statusInformation);

#endregion
}
}
25 changes: 24 additions & 1 deletion src/Couchbase.Analytics/Options/SecurityOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ public record SecurityOptions
/// </summary>
internal SslProtocols SslProtocols { get; init; } = SslProtocols.Tls13 | SslProtocols.Tls12;

/// <summary>
/// The certificate revocation checking mode used during chain validation.
/// </summary>
/// <remarks>
/// The default is <see cref="X509RevocationMode.NoCheck"/> because custom and private CAs
/// typically do not provide CRL distribution points or OCSP responders. Set to
/// <see cref="X509RevocationMode.Online"/> or <see cref="X509RevocationMode.Offline"/>
/// if your CA infrastructure supports revocation checking.
/// </remarks>
internal X509RevocationMode RevocationMode { get; init; } = X509RevocationMode.NoCheck;

internal string? PathToPemFileValue => PemFilePath;
internal string? CertificateValue => PemString;
internal X509Certificate2Collection? CertificatesValue => Certificates;
Expand Down Expand Up @@ -146,4 +157,16 @@ public SecurityOptions WithSslProtocols(SslProtocols protocols)
{
return this with { SslProtocols = protocols };
}
}

/// <summary>
/// Sets the certificate revocation mode used when validating server certificate chains.
/// </summary>
/// <remarks>
/// The default is <see cref="X509RevocationMode.NoCheck"/>. Use <see cref="X509RevocationMode.Online"/>
/// or <see cref="X509RevocationMode.Offline"/> if your CA infrastructure provides CRL/OCSP endpoints.
/// </remarks>
public SecurityOptions WithRevocationMode(X509RevocationMode revocationMode)
{
return this with { RevocationMode = revocationMode };
}
}
Loading