Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Couchbase.AnalyticsClient;
using Couchbase.AnalyticsClient.Async;
using Couchbase.AnalyticsClient.HTTP;
using Couchbase.AnalyticsClient.Options;
using Couchbase.AnalyticsClient.Results;
using Couchbase.Grpc.Protocol.Columnar;
Expand All @@ -8,7 +10,7 @@ namespace Couchbase.Analytics.Performer.Internal.Connections;
internal class ClusterConnection : IDisposable
{
private readonly ClusterNewInstanceRequest _clusterNewInstanceRequest;
private Cluster _cluster;
private readonly Cluster _cluster;
private volatile bool _disposed;

public ClusterConnection(
Expand All @@ -32,6 +34,21 @@ public Task<IQueryResult> ExecuteScopeQuery(string database, string scope, strin
ExecuteQueryAsync(statement, options);
}

public Task<QueryHandle> StartClusterQuery(string statement, StartQueryOptions? options = null, CancellationToken cancellationToken = default)
{
return _cluster.StartQueryAsync(statement, options, cancellationToken);
}

public Task<QueryHandle> StartScopeQuery(string database, string scope, string statement, StartQueryOptions? options = null, CancellationToken cancellationToken = default)
{
return _cluster.Database(database).Scope(scope).StartQueryAsync(statement, options, cancellationToken);
}

public void UpdateCredential(ICredential credential)
{
_cluster.UpdateCredential(credential);
}

public void Dispose()
{
if (!_disposed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@ public static LogEventLevel ParseLogLevelOrDefault(string? value, LogEventLevel

return defaultLevel;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Couchbase.Analytics.Performer.Internal.Connections;
using Couchbase.Analytics.Performer.Internal.Modes;
using Couchbase.Analytics.Performer.Internal.Utils;
using Couchbase.AnalyticsClient.Async;
using Couchbase.AnalyticsClient.Options;
using Couchbase.AnalyticsClient.Results;
using Couchbase.Core.Utils;
Expand Down Expand Up @@ -146,7 +147,7 @@ public override async Task<QueryRowResponse> QueryRow(QueryRowRequest request, S

try
{
response = await performerQuery.GetNextRow().ConfigureAwait(false);
response = await performerQuery.GetNextRow(request.ContentAs).ConfigureAwait(false);
response.Metadata = new ResponseMetadata
{
ElapsedNanos = (long)stopwatch.Elapsed.TotalNanoseconds,
Expand Down Expand Up @@ -254,6 +255,206 @@ public override Task<EmptyResultOrFailureResponse> CloseQueryResult(CloseQueryRe
return Task.FromResult(response);
}

public override async Task<StartQueryResponse> StartQuery(StartQueryRequest request, ServerCallContext context)
{
Serilog.Log.Information("Executing startQuery");

var response = new StartQueryResponse();

try
{
var options = request.Options.ToStartQueryOptions();
var queryHandle = Guid.NewGuid().ToString();

QueryHandle sdkHandle;
switch (request.LevelCase)
{
case StartQueryRequest.LevelOneofCase.ClusterLevel:
var clusterConn = _clusterConnections[request.ClusterLevel.ClusterId];
sdkHandle = await clusterConn.StartClusterQuery(request.Statement, options).ConfigureAwait(false);
break;
case StartQueryRequest.LevelOneofCase.ScopeLevel:
var scopeConn = _clusterConnections[request.ScopeLevel.ClusterId];
sdkHandle = await scopeConn.StartScopeQuery(
request.ScopeLevel.DatabaseName,
request.ScopeLevel.ScopeName,
request.Statement,
options).ConfigureAwait(false);
break;
case StartQueryRequest.LevelOneofCase.None:
default:
throw new ArgumentException("No level specified for query");
}

var performerQuery = new PerformerQuery
{
AsyncHandle = sdkHandle,
CancellationTokenSource = new CancellationTokenSource(),
};
_ongoingQueries[queryHandle] = performerQuery;

response.QueryHandle = queryHandle;
}
catch (Exception ex)
{
response.Failure = ex.ToProtoError();
}

return response;
}

public override async Task<AsyncFetchStatusResponse> AsyncFetchStatus(AsyncFetchStatusRequest request,
ServerCallContext context)
{
Serilog.Log.Information("Executing asyncFetchStatus");

var response = new AsyncFetchStatusResponse();

try
{
if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery)
|| performerQuery.AsyncHandle is null)
{
throw new KeyNotFoundException(
"Query handle not present in ongoing async queries: " + request.QueryHandle);
}

var status = await performerQuery.AsyncHandle.FetchStatusAsync().ConfigureAwait(false);
performerQuery.AsyncStatus = status;

response.QueryStatus = new AsyncFetchStatusResponse.Types.QueryStatusResult
{
ResultsReady = status.ResultsReady,
ToString_ = status.ToString() ?? string.Empty,
};
}
catch (Exception ex)
{
response.Failure = ex.ToProtoError();
}

return response;
}

public override Task<EmptyResultOrFailureResponse> AsyncQueryStatusResultHandle(
AsyncQueryStatusResultHandleRequest request, ServerCallContext context)
{
Serilog.Log.Information("Executing asyncQueryStatusResultHandle");
var stopwatch = LightweightStopwatch.StartNew();
var initiated = Timestamp.FromDateTime(DateTime.UtcNow);

Exception? exception = null;
try
{
if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery)
|| performerQuery.AsyncStatus is null)
{
throw new KeyNotFoundException(
"QueryStatus not present for query: " + request.QueryHandle);
}

performerQuery.AsyncResultHandle = performerQuery.AsyncStatus.ResultHandle();
}
catch (Exception ex)
{
exception = ex;
}

var response = new EmptyResultOrFailureResponse()
.GetResponseMetaData(stopwatch, initiated, exception);
return Task.FromResult(response);
}

public override async Task<EmptyResultOrFailureResponse> AsyncCancelHandle(AsyncCancelHandleRequest request,
ServerCallContext context)
{
Serilog.Log.Information("Executing asyncCancelHandle");
var stopwatch = LightweightStopwatch.StartNew();
var initiated = Timestamp.FromDateTime(DateTime.UtcNow);

Exception? exception = null;
try
{
if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery)
|| performerQuery.AsyncHandle is null)
{
throw new KeyNotFoundException(
"Query handle not present in ongoing async queries: " + request.QueryHandle);
}

await performerQuery.AsyncHandle.CancelAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
exception = ex;
}

return new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated, exception);
}

public override async Task<EmptyResultOrFailureResponse> AsyncFetchResults(AsyncFetchResultsRequest request,
ServerCallContext context)
{
Serilog.Log.Information("Executing asyncFetchResults");
var stopwatch = LightweightStopwatch.StartNew();
var initiated = Timestamp.FromDateTime(DateTime.UtcNow);

Exception? exception = null;
try
{
if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery)
|| performerQuery.AsyncResultHandle is null)
{
throw new KeyNotFoundException(
"Result handle not present for query: " + request.QueryHandle);
}

var fetchOptions = new FetchResultsOptions();
if (request.Options?.Deserializer is not null)
{
fetchOptions = fetchOptions.WithDeserializer(request.Options.Deserializer.ToCore());
}

var queryResult = await performerQuery.AsyncResultHandle
.FetchResultsAsync(fetchOptions).ConfigureAwait(false);

performerQuery.QueryTask = Task.FromResult(queryResult);
}
catch (Exception ex)
{
exception = ex;
}

return new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated, exception);
}

public override async Task<EmptyResultOrFailureResponse> AsyncDiscardResults(AsyncDiscardResultsRequest request,
ServerCallContext context)
{
Serilog.Log.Information("Executing asyncDiscardResults");
var stopwatch = LightweightStopwatch.StartNew();
var initiated = Timestamp.FromDateTime(DateTime.UtcNow);

Exception? exception = null;
try
{
if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery)
|| performerQuery.AsyncResultHandle is null)
{
throw new KeyNotFoundException(
"Result handle not present for query: " + request.QueryHandle);
}

await performerQuery.AsyncResultHandle.DiscardResultsAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
exception = ex;
}

return new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated, exception);
}

public override Task<EmptyResultOrFailureResponse> CloseAllQueryResults(CloseAllQueryResultsRequest request,
ServerCallContext context)
{
Expand All @@ -268,4 +469,4 @@ public override Task<EmptyResultOrFailureResponse> CloseAllQueryResults(CloseAll

return Task.FromResult(response);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,30 @@
using Couchbase.Grpc.Protocol.Shared;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Serilog;
using Exception = System.Exception;

namespace Couchbase.Analytics.Performer.Internal.Services;

internal class AnalyticsPerformerService : ColumnarService.ColumnarServiceBase
{
public AnalyticsPerformerService(ConcurrentDictionary<string, ClusterConnection> clusters)
public AnalyticsPerformerService(
ConcurrentDictionary<string, ClusterConnection> clusters)
{
Clusters = clusters;
}
private ConcurrentDictionary<string, ClusterConnection> Clusters { get; }

// The Cluster owns its ILoggerFactory and disposes it on cluster Dispose, so we hand each
// cluster its own factory that bridges to the global Serilog logger.
private static ILoggerFactory CreateBridgedLoggerFactory() =>
LoggerFactory.Create(builder =>
{
builder.ClearProviders();
builder.AddSerilog();
});

public override Task<EmptyResultOrFailureResponse> ClusterNewInstance(ClusterNewInstanceRequest request,
ServerCallContext context)
{
Expand All @@ -46,7 +58,8 @@ public override Task<EmptyResultOrFailureResponse> ClusterNewInstance(ClusterNew
request.ConnectionString);

var cluster = Cluster.Create(request.ConnectionString,
request.ToSdkCredential(), request.ToSdkQueryOptions());
request.ToSdkCredential(),
request.ToSdkQueryOptions().WithLogging(CreateBridgedLoggerFactory()));

return new(request, cluster);
});
Expand Down Expand Up @@ -84,7 +97,14 @@ public override Task<FetchPerformerCapsResponse> FetchPerformerCaps(FetchPerform
var response = new FetchPerformerCapsResponse
{
AnalyticsProduct = AnalyticsProduct.Analytics,
Sdk = SDK.Net
Sdk = SDK.Net,
SupportsServerAsyncQueries = true,
CredentialSupport = new CredentialSupport
{
SupportsJwtCredential = true,
SupportsCertificateCredential = true,
SupportsSetCredential = true,
}
};

try
Expand Down Expand Up @@ -230,6 +250,34 @@ public override Task<EmptyResultOrFailureResponse> CloseAllClusters(CloseAllColu
return Task.FromResult(response);
}

public override Task<EmptyResultOrFailureResponse> SetCredential(SetCredentialRequest request,
ServerCallContext context)
{
Serilog.Log.Information("Calling SetCredential for {ClusterId}", request.ExecutionContext.ClusterId);
var stopWatch = LightweightStopwatch.StartNew();
var initiated = Timestamp.FromDateTime(DateTime.UtcNow);

var response = new EmptyResultOrFailureResponse();
try
{
if (!Clusters.TryGetValue(request.ExecutionContext.ClusterId, out var connection))
{
throw new KeyNotFoundException(
"Cluster id not present in active clusters: " + request.ExecutionContext.ClusterId);
}

connection.UpdateCredential(request.Credential.ToSdkCredential());
response.GetResponseMetaData(stopWatch, initiated);
}
catch (Exception ex)
{
response.GetResponseMetaData(stopWatch, initiated, ex);
Serilog.Log.Error(ex, "SetCredential failed for {ClusterId}", request.ExecutionContext.ClusterId);
}

return Task.FromResult(response);
}

public override Task<EchoResponse> Echo(EchoRequest request, ServerCallContext context)
{
Serilog.Log.Information("Calling Echo - {TestName} | {Message}", request.TestName, request.Message);
Expand Down
Loading
Loading