Skip to content

Commit 7cf0d9d

Browse files
committed
fit: implement Async query and add Certificate Revocation option
Signed-off-by: Emilien Bevierre <emilien.bevierre@couchbase.com>
1 parent ba913b5 commit 7cf0d9d

16 files changed

Lines changed: 583 additions & 21 deletions

fit/Couchbase.Analytics.Performer/Internal/Connections/ClusterConnection.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
using Couchbase.AnalyticsClient;
2+
using Couchbase.AnalyticsClient.Async;
3+
using Couchbase.AnalyticsClient.HTTP;
24
using Couchbase.AnalyticsClient.Options;
35
using Couchbase.AnalyticsClient.Results;
46
using Couchbase.Grpc.Protocol.Columnar;
@@ -8,7 +10,7 @@ namespace Couchbase.Analytics.Performer.Internal.Connections;
810
internal class ClusterConnection : IDisposable
911
{
1012
private readonly ClusterNewInstanceRequest _clusterNewInstanceRequest;
11-
private Cluster _cluster;
13+
private readonly Cluster _cluster;
1214
private volatile bool _disposed;
1315

1416
public ClusterConnection(
@@ -32,6 +34,21 @@ public Task<IQueryResult> ExecuteScopeQuery(string database, string scope, strin
3234
ExecuteQueryAsync(statement, options);
3335
}
3436

37+
public Task<QueryHandle> StartClusterQuery(string statement, StartQueryOptions? options = null, CancellationToken cancellationToken = default)
38+
{
39+
return _cluster.StartQueryAsync(statement, options, cancellationToken);
40+
}
41+
42+
public Task<QueryHandle> StartScopeQuery(string database, string scope, string statement, StartQueryOptions? options = null, CancellationToken cancellationToken = default)
43+
{
44+
return _cluster.Database(database).Scope(scope).StartQueryAsync(statement, options, cancellationToken);
45+
}
46+
47+
public void UpdateCredential(ICredential credential)
48+
{
49+
_cluster.UpdateCredential(credential);
50+
}
51+
3552
public void Dispose()
3653
{
3754
if (!_disposed)

fit/Couchbase.Analytics.Performer/Internal/Logging/LoggingUtils.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,4 @@ public static LogEventLevel ParseLogLevelOrDefault(string? value, LogEventLevel
6666

6767
return defaultLevel;
6868
}
69-
}
69+
}

fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerCrossService.cs

Lines changed: 203 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using Couchbase.Analytics.Performer.Internal.Connections;
44
using Couchbase.Analytics.Performer.Internal.Modes;
55
using Couchbase.Analytics.Performer.Internal.Utils;
6+
using Couchbase.AnalyticsClient.Async;
67
using Couchbase.AnalyticsClient.Options;
78
using Couchbase.AnalyticsClient.Results;
89
using Couchbase.Core.Utils;
@@ -146,7 +147,7 @@ public override async Task<QueryRowResponse> QueryRow(QueryRowRequest request, S
146147

147148
try
148149
{
149-
response = await performerQuery.GetNextRow().ConfigureAwait(false);
150+
response = await performerQuery.GetNextRow(request.ContentAs).ConfigureAwait(false);
150151
response.Metadata = new ResponseMetadata
151152
{
152153
ElapsedNanos = (long)stopwatch.Elapsed.TotalNanoseconds,
@@ -251,6 +252,206 @@ public override Task<EmptyResultOrFailureResponse> CloseQueryResult(CloseQueryRe
251252
return Task.FromResult(response);
252253
}
253254

255+
public override async Task<StartQueryResponse> StartQuery(StartQueryRequest request, ServerCallContext context)
256+
{
257+
Serilog.Log.Information("Executing startQuery");
258+
259+
var response = new StartQueryResponse();
260+
261+
try
262+
{
263+
var options = request.Options.ToStartQueryOptions();
264+
var queryHandle = Guid.NewGuid().ToString();
265+
266+
QueryHandle sdkHandle;
267+
switch (request.LevelCase)
268+
{
269+
case StartQueryRequest.LevelOneofCase.ClusterLevel:
270+
var clusterConn = _clusterConnections[request.ClusterLevel.ClusterId];
271+
sdkHandle = await clusterConn.StartClusterQuery(request.Statement, options).ConfigureAwait(false);
272+
break;
273+
case StartQueryRequest.LevelOneofCase.ScopeLevel:
274+
var scopeConn = _clusterConnections[request.ScopeLevel.ClusterId];
275+
sdkHandle = await scopeConn.StartScopeQuery(
276+
request.ScopeLevel.DatabaseName,
277+
request.ScopeLevel.ScopeName,
278+
request.Statement,
279+
options).ConfigureAwait(false);
280+
break;
281+
case StartQueryRequest.LevelOneofCase.None:
282+
default:
283+
throw new ArgumentException("No level specified for query");
284+
}
285+
286+
var performerQuery = new PerformerQuery
287+
{
288+
AsyncHandle = sdkHandle,
289+
CancellationTokenSource = new CancellationTokenSource(),
290+
};
291+
_ongoingQueries[queryHandle] = performerQuery;
292+
293+
response.QueryHandle = queryHandle;
294+
}
295+
catch (Exception ex)
296+
{
297+
response.Failure = ex.ToProtoError();
298+
}
299+
300+
return response;
301+
}
302+
303+
public override async Task<AsyncFetchStatusResponse> AsyncFetchStatus(AsyncFetchStatusRequest request,
304+
ServerCallContext context)
305+
{
306+
Serilog.Log.Information("Executing asyncFetchStatus");
307+
308+
var response = new AsyncFetchStatusResponse();
309+
310+
try
311+
{
312+
if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery)
313+
|| performerQuery.AsyncHandle is null)
314+
{
315+
throw new KeyNotFoundException(
316+
"Query handle not present in ongoing async queries: " + request.QueryHandle);
317+
}
318+
319+
var status = await performerQuery.AsyncHandle.FetchStatusAsync().ConfigureAwait(false);
320+
performerQuery.AsyncStatus = status;
321+
322+
response.QueryStatus = new AsyncFetchStatusResponse.Types.QueryStatusResult
323+
{
324+
ResultsReady = status.ResultsReady,
325+
ToString_ = status.ToString() ?? string.Empty,
326+
};
327+
}
328+
catch (Exception ex)
329+
{
330+
response.Failure = ex.ToProtoError();
331+
}
332+
333+
return response;
334+
}
335+
336+
public override Task<EmptyResultOrFailureResponse> AsyncQueryStatusResultHandle(
337+
AsyncQueryStatusResultHandleRequest request, ServerCallContext context)
338+
{
339+
Serilog.Log.Information("Executing asyncQueryStatusResultHandle");
340+
var stopwatch = LightweightStopwatch.StartNew();
341+
var initiated = Timestamp.FromDateTime(DateTime.UtcNow);
342+
343+
Exception? exception = null;
344+
try
345+
{
346+
if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery)
347+
|| performerQuery.AsyncStatus is null)
348+
{
349+
throw new KeyNotFoundException(
350+
"QueryStatus not present for query: " + request.QueryHandle);
351+
}
352+
353+
performerQuery.AsyncResultHandle = performerQuery.AsyncStatus.ResultHandle();
354+
}
355+
catch (Exception ex)
356+
{
357+
exception = ex;
358+
}
359+
360+
var response = new EmptyResultOrFailureResponse()
361+
.GetResponseMetaData(stopwatch, initiated, exception);
362+
return Task.FromResult(response);
363+
}
364+
365+
public override async Task<EmptyResultOrFailureResponse> AsyncCancelHandle(AsyncCancelHandleRequest request,
366+
ServerCallContext context)
367+
{
368+
Serilog.Log.Information("Executing asyncCancelHandle");
369+
var stopwatch = LightweightStopwatch.StartNew();
370+
var initiated = Timestamp.FromDateTime(DateTime.UtcNow);
371+
372+
Exception? exception = null;
373+
try
374+
{
375+
if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery)
376+
|| performerQuery.AsyncHandle is null)
377+
{
378+
throw new KeyNotFoundException(
379+
"Query handle not present in ongoing async queries: " + request.QueryHandle);
380+
}
381+
382+
await performerQuery.AsyncHandle.CancelAsync().ConfigureAwait(false);
383+
}
384+
catch (Exception ex)
385+
{
386+
exception = ex;
387+
}
388+
389+
return new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated, exception);
390+
}
391+
392+
public override async Task<EmptyResultOrFailureResponse> AsyncFetchResults(AsyncFetchResultsRequest request,
393+
ServerCallContext context)
394+
{
395+
Serilog.Log.Information("Executing asyncFetchResults");
396+
var stopwatch = LightweightStopwatch.StartNew();
397+
var initiated = Timestamp.FromDateTime(DateTime.UtcNow);
398+
399+
Exception? exception = null;
400+
try
401+
{
402+
if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery)
403+
|| performerQuery.AsyncResultHandle is null)
404+
{
405+
throw new KeyNotFoundException(
406+
"Result handle not present for query: " + request.QueryHandle);
407+
}
408+
409+
var fetchOptions = new FetchResultsOptions();
410+
if (request.Options?.Deserializer is not null)
411+
{
412+
fetchOptions = fetchOptions.WithDeserializer(request.Options.Deserializer.ToCore());
413+
}
414+
415+
var queryResult = await performerQuery.AsyncResultHandle
416+
.FetchResultsAsync(fetchOptions).ConfigureAwait(false);
417+
418+
performerQuery.QueryTask = Task.FromResult(queryResult);
419+
}
420+
catch (Exception ex)
421+
{
422+
exception = ex;
423+
}
424+
425+
return new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated, exception);
426+
}
427+
428+
public override async Task<EmptyResultOrFailureResponse> AsyncDiscardResults(AsyncDiscardResultsRequest request,
429+
ServerCallContext context)
430+
{
431+
Serilog.Log.Information("Executing asyncDiscardResults");
432+
var stopwatch = LightweightStopwatch.StartNew();
433+
var initiated = Timestamp.FromDateTime(DateTime.UtcNow);
434+
435+
Exception? exception = null;
436+
try
437+
{
438+
if (!_ongoingQueries.TryGetValue(request.QueryHandle, out var performerQuery)
439+
|| performerQuery.AsyncResultHandle is null)
440+
{
441+
throw new KeyNotFoundException(
442+
"Result handle not present for query: " + request.QueryHandle);
443+
}
444+
445+
await performerQuery.AsyncResultHandle.DiscardResultsAsync().ConfigureAwait(false);
446+
}
447+
catch (Exception ex)
448+
{
449+
exception = ex;
450+
}
451+
452+
return new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated, exception);
453+
}
454+
254455
public override Task<EmptyResultOrFailureResponse> CloseAllQueryResults(CloseAllQueryResultsRequest request,
255456
ServerCallContext context)
256457
{
@@ -265,4 +466,4 @@ public override Task<EmptyResultOrFailureResponse> CloseAllQueryResults(CloseAll
265466

266467
return Task.FromResult(response);
267468
}
268-
}
469+
}

fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerService.cs

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,30 @@
99
using Couchbase.Grpc.Protocol.Shared;
1010
using Google.Protobuf.WellKnownTypes;
1111
using Grpc.Core;
12+
using Microsoft.Extensions.Logging;
13+
using Serilog;
1214
using Exception = System.Exception;
1315

1416
namespace Couchbase.Analytics.Performer.Internal.Services;
1517

1618
internal class AnalyticsPerformerService : ColumnarService.ColumnarServiceBase
1719
{
18-
public AnalyticsPerformerService(ConcurrentDictionary<string, ClusterConnection> clusters)
20+
public AnalyticsPerformerService(
21+
ConcurrentDictionary<string, ClusterConnection> clusters)
1922
{
2023
Clusters = clusters;
2124
}
2225
private ConcurrentDictionary<string, ClusterConnection> Clusters { get; }
2326

27+
// The Cluster owns its ILoggerFactory and disposes it on cluster Dispose, so we hand each
28+
// cluster its own factory that bridges to the global Serilog logger.
29+
private static ILoggerFactory CreateBridgedLoggerFactory() =>
30+
LoggerFactory.Create(builder =>
31+
{
32+
builder.ClearProviders();
33+
builder.AddSerilog();
34+
});
35+
2436
public override Task<EmptyResultOrFailureResponse> ClusterNewInstance(ClusterNewInstanceRequest request,
2537
ServerCallContext context)
2638
{
@@ -46,7 +58,8 @@ public override Task<EmptyResultOrFailureResponse> ClusterNewInstance(ClusterNew
4658
request.ConnectionString);
4759

4860
var cluster = Cluster.Create(request.ConnectionString,
49-
request.ToSdkCredential(), request.ToSdkQueryOptions());
61+
request.ToSdkCredential(),
62+
request.ToSdkQueryOptions().WithLogging(CreateBridgedLoggerFactory()));
5063

5164
return new(request, cluster);
5265
});
@@ -84,7 +97,14 @@ public override Task<FetchPerformerCapsResponse> FetchPerformerCaps(FetchPerform
8497
var response = new FetchPerformerCapsResponse
8598
{
8699
AnalyticsProduct = AnalyticsProduct.Analytics,
87-
Sdk = SDK.Net
100+
Sdk = SDK.Net,
101+
SupportsServerAsyncQueries = true,
102+
CredentialSupport = new CredentialSupport
103+
{
104+
SupportsJwtCredential = true,
105+
SupportsCertificateCredential = true,
106+
SupportsSetCredential = true,
107+
}
88108
};
89109

90110
try
@@ -230,6 +250,34 @@ public override Task<EmptyResultOrFailureResponse> CloseAllClusters(CloseAllColu
230250
return Task.FromResult(response);
231251
}
232252

253+
public override Task<EmptyResultOrFailureResponse> SetCredential(SetCredentialRequest request,
254+
ServerCallContext context)
255+
{
256+
Serilog.Log.Information("Calling SetCredential for {ClusterId}", request.ExecutionContext.ClusterId);
257+
var stopWatch = LightweightStopwatch.StartNew();
258+
var initiated = Timestamp.FromDateTime(DateTime.UtcNow);
259+
260+
var response = new EmptyResultOrFailureResponse();
261+
try
262+
{
263+
if (!Clusters.TryGetValue(request.ExecutionContext.ClusterId, out var connection))
264+
{
265+
throw new KeyNotFoundException(
266+
"Cluster id not present in active clusters: " + request.ExecutionContext.ClusterId);
267+
}
268+
269+
connection.UpdateCredential(request.Credential.ToSdkCredential());
270+
response.GetResponseMetaData(stopWatch, initiated);
271+
}
272+
catch (Exception ex)
273+
{
274+
response.GetResponseMetaData(stopWatch, initiated, ex);
275+
Serilog.Log.Error(ex, "SetCredential failed for {ClusterId}", request.ExecutionContext.ClusterId);
276+
}
277+
278+
return Task.FromResult(response);
279+
}
280+
233281
public override Task<EchoResponse> Echo(EchoRequest request, ServerCallContext context)
234282
{
235283
Serilog.Log.Information("Calling Echo - {TestName} | {Message}", request.TestName, request.Message);

0 commit comments

Comments
 (0)