diff --git a/fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerCrossService.cs b/fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerCrossService.cs index b04d295..06b71c1 100644 --- a/fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerCrossService.cs +++ b/fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerCrossService.cs @@ -239,17 +239,14 @@ public override Task CloseQueryResult(CloseQueryRe var stopwatch = LightweightStopwatch.StartNew(); var initiated = Timestamp.FromDateTime(DateTime.UtcNow); - Exception? exception = null; - + // Idempotent close - if the handle was already removed (e.g., by cancel), just succeed. + // This matches the RFC expectation that close is a cleanup operation that should not fail. if (request.QueryHandle is not null) { - if (!_ongoingQueries.TryRemove(request.QueryHandle, out var performerQuery)) - { - exception = new KeyNotFoundException("Query handle not present in ongoing queries: " + request.QueryHandle); - } + _ongoingQueries.TryRemove(request.QueryHandle, out _); } - var response = new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated, exception); + var response = new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated); return Task.FromResult(response); } diff --git a/fit/Couchbase.Analytics.Performer/Program.cs b/fit/Couchbase.Analytics.Performer/Program.cs index 9e626ec..528b268 100644 --- a/fit/Couchbase.Analytics.Performer/Program.cs +++ b/fit/Couchbase.Analytics.Performer/Program.cs @@ -15,6 +15,25 @@ public static async Task Main(string[] args) { var loggerFactory = LoggingUtils.ConfigureLogging(out var minimumLevel); + bool disableConsoleRead = false; + + foreach (var arg in args) + { + var parameter = arg.Split(new[] { '=' }, 2); + if (parameter.Length == 2) + { + switch (parameter[0]) + { + case "disableConsoleRead": + if (bool.TryParse(parameter[1], out var parsedDisableConsoleRead)) + { + disableConsoleRead = parsedDisableConsoleRead; + } + break; + } + } + } + var (host, port) = ("localhost", 8060); var server = new Server @@ -33,15 +52,31 @@ public static async Task Main(string[] args) server.Start(); Log.Information(".NET Analytics Performer started on {Host}:{Port} at LogLevel {Level}", host, port, minimumLevel); - Log.Information("Press any key to stop the server"); - Console.ReadKey(); + if (disableConsoleRead) + { + Log.Information("Running in headless mode, waiting for shutdown signal"); + var shutdownEvent = new ManualResetEventSlim(false); + Console.CancelKeyPress += (_, e) => + { + e.Cancel = true; + shutdownEvent.Set(); + }; + AppDomain.CurrentDomain.ProcessExit += (_, _) => shutdownEvent.Set(); + shutdownEvent.Wait(); + } + else + { + Log.Information("Press any key to stop the server"); + Console.ReadKey(); + } await server.ShutdownAsync().ConfigureAwait(false); LoggingUtils.ShutdownLogging(); - Log.Information("Press any key to exit"); - Console.ReadKey(); + if (!disableConsoleRead) + { + Log.Information("Press any key to exit"); + Console.ReadKey(); + } } - - } diff --git a/src/Couchbase.Analytics/Internal/Results/StreamingAnalyticsResult.cs b/src/Couchbase.Analytics/Internal/Results/StreamingAnalyticsResult.cs index a2db4dc..11b41cb 100644 --- a/src/Couchbase.Analytics/Internal/Results/StreamingAnalyticsResult.cs +++ b/src/Couchbase.Analytics/Internal/Results/StreamingAnalyticsResult.cs @@ -33,7 +33,8 @@ namespace Couchbase.AnalyticsClient.Internal.Results; /// This is the default response type. internal class StreamingAnalyticsResult : AnalyticsResultBase { - private bool _hasReadToResult; + private bool _isInitialized; + private bool _hasResultsArray; private int _enumerated; // 0 = not started, 1 = started (atomic via Interlocked) private IJsonStreamReader _jsonReader; private bool _disposed; @@ -63,7 +64,7 @@ public override IAsyncEnumerator GetAsyncEnumerator( private async IAsyncEnumerable EnumerateRows( [EnumeratorCancellation] CancellationToken cancellationToken = default) { - if (!_hasReadToResult) + if (!_isInitialized) { throw new InvalidOperationException( $"{nameof(StreamingAnalyticsResult)} has not been initialized, call InitializeAsync first"); @@ -80,12 +81,17 @@ private async IAsyncEnumerable EnumerateRows( "Query results can only be enumerated once. The result stream has already been consumed."); } - await foreach (var token in _jsonReader.ReadTokensAsync(cancellationToken).ConfigureAwait(false)) + // Only try to read tokens if the response contained a "results" array. + // DDL responses (CREATE DATABASE, etc.) don't have results and we should yield nothing. + if (_hasResultsArray) { - yield return new AnalyticsRow(token); - } + await foreach (var token in _jsonReader.ReadTokensAsync(cancellationToken).ConfigureAwait(false)) + { + yield return new AnalyticsRow(token); + } - await ReadResponseAttributes(cancellationToken).ConfigureAwait(false); + await ReadResponseAttributes(cancellationToken).ConfigureAwait(false); + } } private async Task ReadResponseAttributes(CancellationToken cancellationToken = default) @@ -95,9 +101,7 @@ private async Task ReadResponseAttributes(CancellationToken cancellationToken = throw new InvalidOperationException("_jsonReader is null"); } - MetaData = new QueryMetaData(); - - _hasReadToResult = false; + MetaData ??= new QueryMetaData(); while (true) { @@ -118,19 +122,24 @@ private async Task ReadResponseAttributes(CancellationToken cancellationToken = MetaData.Metrics = await _jsonReader.ReadObjectAsync(cancellationToken).ConfigureAwait(false); break; case "results": - _hasReadToResult = true; + _hasResultsArray = true; + _isInitialized = true; return; case "errors": var errors = await _jsonReader.ReadObjectAsync(cancellationToken).ConfigureAwait(false); Errors = errors ?? Array.Empty(); break; - case "warnings": - case "status": - //Ignore for now + default: + // Skip unknown attributes (signature, plans, status, warnings, etc.) + // by reading and discarding their values + await _jsonReader.ReadTokenAsync(cancellationToken).ConfigureAwait(false); break; } } + // If we've read through the entire response without finding a "results" array, + // mark as initialized anyway. This handles DDL responses that don't return results. + _isInitialized = true; } public override void Dispose() diff --git a/tests/Couchbase.Analytics.UnitTests/Couchbase.Analytics.UnitTests.csproj b/tests/Couchbase.Analytics.UnitTests/Couchbase.Analytics.UnitTests.csproj index 3bf3de7..66e8573 100644 --- a/tests/Couchbase.Analytics.UnitTests/Couchbase.Analytics.UnitTests.csproj +++ b/tests/Couchbase.Analytics.UnitTests/Couchbase.Analytics.UnitTests.csproj @@ -31,10 +31,7 @@ PreserveNewest - - - - + PreserveNewest diff --git a/tests/Couchbase.Analytics.UnitTests/Internal/StreamingAnalyticsResultTests.cs b/tests/Couchbase.Analytics.UnitTests/Internal/StreamingAnalyticsResultTests.cs index f52c525..e1ee4f4 100644 --- a/tests/Couchbase.Analytics.UnitTests/Internal/StreamingAnalyticsResultTests.cs +++ b/tests/Couchbase.Analytics.UnitTests/Internal/StreamingAnalyticsResultTests.cs @@ -134,4 +134,57 @@ public async Task DisposeAsync_CalledTwice_DisposesOnlyOnce() // Assert ownedResource.Verify(r => r.Dispose(), Times.Once); } + + [Fact] + public async Task StreamingAnalyticsResult_DdlResponse_NoResultsArray_InitializesSuccessfully() + { + // Arrange - DDL responses (CREATE DATABASE, DROP SCOPE, etc.) don't have a "results" array + var json = File.ReadAllBytes("JsonDocuments/ddlResponse.json"); + var stream = new MemoryStream(json); + + var analyticsResult = new StreamingAnalyticsResult(stream, new StjJsonDeserializer()); + + // Act + await analyticsResult.InitializeAsync(CancellationToken.None); + + // Assert - should initialize without throwing + Assert.NotNull(analyticsResult.MetaData); + Assert.Equal("8c090478-269b-4051-b660-c2a5ae2c4aaa", analyticsResult.MetaData.RequestId); + } + + [Fact] + public async Task StreamingAnalyticsResult_DdlResponse_EnumerationReturnsEmpty() + { + // Arrange - DDL responses have no results to enumerate + var json = File.ReadAllBytes("JsonDocuments/ddlResponse.json"); + var stream = new MemoryStream(json); + + var analyticsResult = new StreamingAnalyticsResult(stream, new StjJsonDeserializer()); + await analyticsResult.InitializeAsync(CancellationToken.None); + + // Act + var rows = await analyticsResult.ToListAsync(CancellationToken.None); + + // Assert - should return empty list, not throw + Assert.NotNull(rows); + Assert.Empty(rows); + } + + [Fact] + public async Task StreamingAnalyticsResult_DdlResponse_MetricsAvailable() + { + // Arrange + var json = File.ReadAllBytes("JsonDocuments/ddlResponse.json"); + var stream = new MemoryStream(json); + + var analyticsResult = new StreamingAnalyticsResult(stream, new StjJsonDeserializer()); + await analyticsResult.InitializeAsync(CancellationToken.None); + + // Act - enumerate to completion (empty) + await analyticsResult.ToListAsync(CancellationToken.None); + + // Assert - metrics should still be available + Assert.NotNull(analyticsResult.MetaData.Metrics); + Assert.Equal(0, analyticsResult.MetaData.Metrics.ResultCount); + } } diff --git a/tests/Couchbase.Analytics.UnitTests/JsonDocuments/ddlResponse.json b/tests/Couchbase.Analytics.UnitTests/JsonDocuments/ddlResponse.json new file mode 100644 index 0000000..2fe6413 --- /dev/null +++ b/tests/Couchbase.Analytics.UnitTests/JsonDocuments/ddlResponse.json @@ -0,0 +1,19 @@ +{ + "requestID": "8c090478-269b-4051-b660-c2a5ae2c4aaa", + "signature": { + "*": "*" + }, + "plans": {}, + "status": "success", + "metrics": { + "elapsedTime": "268.643981ms", + "executionTime": "259.520732ms", + "compileTime": "0ns", + "queueWaitTime": "0ns", + "resultCount": 0, + "resultSize": 0, + "processedObjects": 0, + "bufferCacheHitRatio": "0.00%", + "bufferCachePageReadCount": 0 + } +}