Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -239,17 +239,14 @@ public override Task<EmptyResultOrFailureResponse> CloseQueryResult(CloseQueryRe
var stopwatch = LightweightStopwatch.StartNew();
var initiated = Timestamp.FromDateTime(DateTime.UtcNow);

Exception? exception = null;

// Idempotent close - if the handle was already removed (e.g., by cancel), just succeed.
// This matches the RFC expectation that close is a cleanup operation that should not fail.
if (request.QueryHandle is not null)
{
if (!_ongoingQueries.TryRemove(request.QueryHandle, out var performerQuery))
{
exception = new KeyNotFoundException("Query handle not present in ongoing queries: " + request.QueryHandle);
}
_ongoingQueries.TryRemove(request.QueryHandle, out _);
}

var response = new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated, exception);
var response = new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated);

return Task.FromResult(response);
}
Expand Down
47 changes: 41 additions & 6 deletions fit/Couchbase.Analytics.Performer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,25 @@ public static async Task Main(string[] args)
{
var loggerFactory = LoggingUtils.ConfigureLogging(out var minimumLevel);

bool disableConsoleRead = false;

foreach (var arg in args)
{
var parameter = arg.Split(new[] { '=' }, 2);
if (parameter.Length == 2)
{
switch (parameter[0])
{
case "disableConsoleRead":
if (bool.TryParse(parameter[1], out var parsedDisableConsoleRead))
{
disableConsoleRead = parsedDisableConsoleRead;
}
break;
}
}
}

var (host, port) = ("localhost", 8060);

var server = new Server
Expand All @@ -33,15 +52,31 @@ public static async Task Main(string[] args)
server.Start();
Log.Information(".NET Analytics Performer started on {Host}:{Port} at LogLevel {Level}", host, port, minimumLevel);

Log.Information("Press any key to stop the server");
Console.ReadKey();
if (disableConsoleRead)
{
Log.Information("Running in headless mode, waiting for shutdown signal");
var shutdownEvent = new ManualResetEventSlim(false);
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
shutdownEvent.Set();
};
AppDomain.CurrentDomain.ProcessExit += (_, _) => shutdownEvent.Set();
shutdownEvent.Wait();
}
else
{
Log.Information("Press any key to stop the server");
Console.ReadKey();
}

await server.ShutdownAsync().ConfigureAwait(false);
LoggingUtils.ShutdownLogging();

Log.Information("Press any key to exit");
Console.ReadKey();
if (!disableConsoleRead)
{
Log.Information("Press any key to exit");
Console.ReadKey();
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ namespace Couchbase.AnalyticsClient.Internal.Results;
/// <remarks>This is the default response type.</remarks>
internal class StreamingAnalyticsResult : AnalyticsResultBase
{
private bool _hasReadToResult;
private bool _isInitialized;
private bool _hasResultsArray;
private int _enumerated; // 0 = not started, 1 = started (atomic via Interlocked)
private IJsonStreamReader _jsonReader;
private bool _disposed;
Expand Down Expand Up @@ -63,7 +64,7 @@ public override IAsyncEnumerator<AnalyticsRow> GetAsyncEnumerator(
private async IAsyncEnumerable<AnalyticsRow> EnumerateRows(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (!_hasReadToResult)
if (!_isInitialized)
{
throw new InvalidOperationException(
$"{nameof(StreamingAnalyticsResult)} has not been initialized, call InitializeAsync first");
Expand All @@ -80,12 +81,17 @@ private async IAsyncEnumerable<AnalyticsRow> EnumerateRows(
"Query results can only be enumerated once. The result stream has already been consumed.");
}

await foreach (var token in _jsonReader.ReadTokensAsync(cancellationToken).ConfigureAwait(false))
// Only try to read tokens if the response contained a "results" array.
// DDL responses (CREATE DATABASE, etc.) don't have results and we should yield nothing.
if (_hasResultsArray)
{
yield return new AnalyticsRow(token);
}
await foreach (var token in _jsonReader.ReadTokensAsync(cancellationToken).ConfigureAwait(false))
{
yield return new AnalyticsRow(token);
}

await ReadResponseAttributes(cancellationToken).ConfigureAwait(false);
await ReadResponseAttributes(cancellationToken).ConfigureAwait(false);
}
}

private async Task ReadResponseAttributes(CancellationToken cancellationToken = default)
Expand All @@ -95,9 +101,7 @@ private async Task ReadResponseAttributes(CancellationToken cancellationToken =
throw new InvalidOperationException("_jsonReader is null");
}

MetaData = new QueryMetaData();

_hasReadToResult = false;
MetaData ??= new QueryMetaData();

while (true)
{
Expand All @@ -118,19 +122,24 @@ private async Task ReadResponseAttributes(CancellationToken cancellationToken =
MetaData.Metrics = await _jsonReader.ReadObjectAsync<QueryMetrics>(cancellationToken).ConfigureAwait(false);
break;
case "results":
_hasReadToResult = true;
_hasResultsArray = true;
_isInitialized = true;
return;
case "errors":
var errors = await _jsonReader.ReadObjectAsync<QueryError[]>(cancellationToken).ConfigureAwait(false);
Errors = errors ?? Array.Empty<QueryError>();
break;
case "warnings":
case "status":
//Ignore for now
default:
// Skip unknown attributes (signature, plans, status, warnings, etc.)
// by reading and discarding their values
await _jsonReader.ReadTokenAsync(cancellationToken).ConfigureAwait(false);
break;
}
}

// If we've read through the entire response without finding a "results" array,
// mark as initialized anyway. This handles DDL responses that don't return results.
_isInitialized = true;
}

public override void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@
<None Update="JsonDocuments\error-23000-response.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>

<ItemGroup>
<None Update="JsonDocuments\analyticsResponse.json">
<None Update="JsonDocuments\ddlResponse.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,57 @@ public async Task DisposeAsync_CalledTwice_DisposesOnlyOnce()
// Assert
ownedResource.Verify(r => r.Dispose(), Times.Once);
}

[Fact]
public async Task StreamingAnalyticsResult_DdlResponse_NoResultsArray_InitializesSuccessfully()
{
// Arrange - DDL responses (CREATE DATABASE, DROP SCOPE, etc.) don't have a "results" array
var json = File.ReadAllBytes("JsonDocuments/ddlResponse.json");
var stream = new MemoryStream(json);

var analyticsResult = new StreamingAnalyticsResult(stream, new StjJsonDeserializer());

// Act
await analyticsResult.InitializeAsync(CancellationToken.None);

// Assert - should initialize without throwing
Assert.NotNull(analyticsResult.MetaData);
Assert.Equal("8c090478-269b-4051-b660-c2a5ae2c4aaa", analyticsResult.MetaData.RequestId);
}

[Fact]
public async Task StreamingAnalyticsResult_DdlResponse_EnumerationReturnsEmpty()
{
// Arrange - DDL responses have no results to enumerate
var json = File.ReadAllBytes("JsonDocuments/ddlResponse.json");
var stream = new MemoryStream(json);

var analyticsResult = new StreamingAnalyticsResult(stream, new StjJsonDeserializer());
await analyticsResult.InitializeAsync(CancellationToken.None);

// Act
var rows = await analyticsResult.ToListAsync(CancellationToken.None);

// Assert - should return empty list, not throw
Assert.NotNull(rows);
Assert.Empty(rows);
}

[Fact]
public async Task StreamingAnalyticsResult_DdlResponse_MetricsAvailable()
{
// Arrange
var json = File.ReadAllBytes("JsonDocuments/ddlResponse.json");
var stream = new MemoryStream(json);

var analyticsResult = new StreamingAnalyticsResult(stream, new StjJsonDeserializer());
await analyticsResult.InitializeAsync(CancellationToken.None);

// Act - enumerate to completion (empty)
await analyticsResult.ToListAsync(CancellationToken.None);

// Assert - metrics should still be available
Assert.NotNull(analyticsResult.MetaData.Metrics);
Assert.Equal(0, analyticsResult.MetaData.Metrics.ResultCount);
}
}
19 changes: 19 additions & 0 deletions tests/Couchbase.Analytics.UnitTests/JsonDocuments/ddlResponse.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"requestID": "8c090478-269b-4051-b660-c2a5ae2c4aaa",
"signature": {
"*": "*"
},
"plans": {},
"status": "success",
"metrics": {
"elapsedTime": "268.643981ms",
"executionTime": "259.520732ms",
"compileTime": "0ns",
"queueWaitTime": "0ns",
"resultCount": 0,
"resultSize": 0,
"processedObjects": 0,
"bufferCacheHitRatio": "0.00%",
"bufferCachePageReadCount": 0
}
}
Loading