Skip to content

Commit 177b71b

Browse files
committed
NCO-63: Fix DDL query handling and add headless mode support
SDK: - Handle responses without "results" array (DDL statements like CREATE/DROP DATABASE) - Skip unknown JSON attributes (signature, plans, etc.) to avoid stream corruption - Add unit tests for DDL response handling Performer: - Make CloseQueryResult idempotent to handle close-after-cancel - Add disableConsoleRead argument for headless Docker operation
1 parent d359fe5 commit 177b71b

6 files changed

Lines changed: 137 additions & 30 deletions

File tree

fit/Couchbase.Analytics.Performer/Internal/Services/AnalyticsPerformerCrossService.cs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -239,17 +239,14 @@ public override Task<EmptyResultOrFailureResponse> CloseQueryResult(CloseQueryRe
239239
var stopwatch = LightweightStopwatch.StartNew();
240240
var initiated = Timestamp.FromDateTime(DateTime.UtcNow);
241241

242-
Exception? exception = null;
243-
242+
// Idempotent close - if the handle was already removed (e.g., by cancel), just succeed.
243+
// This matches the RFC expectation that close is a cleanup operation that should not fail.
244244
if (request.QueryHandle is not null)
245245
{
246-
if (!_ongoingQueries.TryRemove(request.QueryHandle, out var performerQuery))
247-
{
248-
exception = new KeyNotFoundException("Query handle not present in ongoing queries: " + request.QueryHandle);
249-
}
246+
_ongoingQueries.TryRemove(request.QueryHandle, out _);
250247
}
251248

252-
var response = new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated, exception);
249+
var response = new EmptyResultOrFailureResponse().GetResponseMetaData(stopwatch, initiated);
253250

254251
return Task.FromResult(response);
255252
}

fit/Couchbase.Analytics.Performer/Program.cs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,22 @@ public static async Task Main(string[] args)
1515
{
1616
var loggerFactory = LoggingUtils.ConfigureLogging(out var minimumLevel);
1717

18+
bool disableConsoleRead = false;
19+
20+
foreach (var arg in args)
21+
{
22+
var parameter = arg.Split('=');
23+
if (parameter.Length == 2)
24+
{
25+
switch (parameter[0])
26+
{
27+
case "disableConsoleRead":
28+
disableConsoleRead = bool.Parse(parameter[1]);
29+
break;
30+
}
31+
}
32+
}
33+
1834
var (host, port) = ("localhost", 8060);
1935

2036
var server = new Server
@@ -33,15 +49,31 @@ public static async Task Main(string[] args)
3349
server.Start();
3450
Log.Information(".NET Analytics Performer started on {Host}:{Port} at LogLevel {Level}", host, port, minimumLevel);
3551

36-
Log.Information("Press any key to stop the server");
37-
Console.ReadKey();
52+
if (disableConsoleRead)
53+
{
54+
Log.Information("Running in headless mode, waiting for shutdown signal");
55+
var shutdownEvent = new ManualResetEventSlim(false);
56+
Console.CancelKeyPress += (_, e) =>
57+
{
58+
e.Cancel = true;
59+
shutdownEvent.Set();
60+
};
61+
AppDomain.CurrentDomain.ProcessExit += (_, _) => shutdownEvent.Set();
62+
shutdownEvent.Wait();
63+
}
64+
else
65+
{
66+
Log.Information("Press any key to stop the server");
67+
Console.ReadKey();
68+
}
3869

3970
await server.ShutdownAsync().ConfigureAwait(false);
4071
LoggingUtils.ShutdownLogging();
4172

42-
Log.Information("Press any key to exit");
43-
Console.ReadKey();
73+
if (!disableConsoleRead)
74+
{
75+
Log.Information("Press any key to exit");
76+
Console.ReadKey();
77+
}
4478
}
45-
46-
4779
}

src/Couchbase.Analytics/Internal/Results/StreamingAnalyticsResult.cs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ namespace Couchbase.AnalyticsClient.Internal.Results;
3333
/// <remarks>This is the default response type.</remarks>
3434
internal class StreamingAnalyticsResult : AnalyticsResultBase
3535
{
36-
private bool _hasReadToResult;
36+
private bool _isInitialized;
37+
private bool _hasResultsArray;
3738
private int _enumerated; // 0 = not started, 1 = started (atomic via Interlocked)
3839
private IJsonStreamReader _jsonReader;
3940
private bool _disposed;
@@ -63,7 +64,7 @@ public override IAsyncEnumerator<AnalyticsRow> GetAsyncEnumerator(
6364
private async IAsyncEnumerable<AnalyticsRow> EnumerateRows(
6465
[EnumeratorCancellation] CancellationToken cancellationToken = default)
6566
{
66-
if (!_hasReadToResult)
67+
if (!_isInitialized)
6768
{
6869
throw new InvalidOperationException(
6970
$"{nameof(StreamingAnalyticsResult)} has not been initialized, call InitializeAsync first");
@@ -80,12 +81,17 @@ private async IAsyncEnumerable<AnalyticsRow> EnumerateRows(
8081
"Query results can only be enumerated once. The result stream has already been consumed.");
8182
}
8283

83-
await foreach (var token in _jsonReader.ReadTokensAsync(cancellationToken).ConfigureAwait(false))
84+
// Only try to read tokens if the response contained a "results" array.
85+
// DDL responses (CREATE DATABASE, etc.) don't have results and we should yield nothing.
86+
if (_hasResultsArray)
8487
{
85-
yield return new AnalyticsRow(token);
86-
}
88+
await foreach (var token in _jsonReader.ReadTokensAsync(cancellationToken).ConfigureAwait(false))
89+
{
90+
yield return new AnalyticsRow(token);
91+
}
8792

88-
await ReadResponseAttributes(cancellationToken).ConfigureAwait(false);
93+
await ReadResponseAttributes(cancellationToken).ConfigureAwait(false);
94+
}
8995
}
9096

9197
private async Task ReadResponseAttributes(CancellationToken cancellationToken = default)
@@ -95,9 +101,7 @@ private async Task ReadResponseAttributes(CancellationToken cancellationToken =
95101
throw new InvalidOperationException("_jsonReader is null");
96102
}
97103

98-
MetaData = new QueryMetaData();
99-
100-
_hasReadToResult = false;
104+
MetaData ??= new QueryMetaData();
101105

102106
while (true)
103107
{
@@ -118,19 +122,24 @@ private async Task ReadResponseAttributes(CancellationToken cancellationToken =
118122
MetaData.Metrics = await _jsonReader.ReadObjectAsync<QueryMetrics>(cancellationToken).ConfigureAwait(false);
119123
break;
120124
case "results":
121-
_hasReadToResult = true;
125+
_hasResultsArray = true;
126+
_isInitialized = true;
122127
return;
123128
case "errors":
124129
var errors = await _jsonReader.ReadObjectAsync<QueryError[]>(cancellationToken).ConfigureAwait(false);
125130
Errors = errors ?? Array.Empty<QueryError>();
126131
break;
127-
case "warnings":
128-
case "status":
129-
//Ignore for now
132+
default:
133+
// Skip unknown attributes (signature, plans, status, warnings, etc.)
134+
// by reading and discarding their values
135+
await _jsonReader.ReadTokenAsync(cancellationToken).ConfigureAwait(false);
130136
break;
131137
}
132138
}
133139

140+
// If we've read through the entire response without finding a "results" array,
141+
// mark as initialized anyway. This handles DDL responses that don't return results.
142+
_isInitialized = true;
134143
}
135144

136145
public override void Dispose()

tests/Couchbase.Analytics.UnitTests/Couchbase.Analytics.UnitTests.csproj

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,7 @@
3131
<None Update="JsonDocuments\error-23000-response.json">
3232
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
3333
</None>
34-
</ItemGroup>
35-
36-
<ItemGroup>
37-
<None Update="JsonDocuments\analyticsResponse.json">
34+
<None Update="JsonDocuments\ddlResponse.json">
3835
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
3936
</None>
4037
</ItemGroup>

tests/Couchbase.Analytics.UnitTests/Internal/StreamingAnalyticsResultTests.cs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,57 @@ public async Task DisposeAsync_CalledTwice_DisposesOnlyOnce()
134134
// Assert
135135
ownedResource.Verify(r => r.Dispose(), Times.Once);
136136
}
137+
138+
[Fact]
139+
public async Task StreamingAnalyticsResult_DdlResponse_NoResultsArray_InitializesSuccessfully()
140+
{
141+
// Arrange - DDL responses (CREATE DATABASE, DROP SCOPE, etc.) don't have a "results" array
142+
var json = File.ReadAllBytes("JsonDocuments/ddlResponse.json");
143+
var stream = new MemoryStream(json);
144+
145+
var analyticsResult = new StreamingAnalyticsResult(stream, new StjJsonDeserializer());
146+
147+
// Act
148+
await analyticsResult.InitializeAsync(CancellationToken.None);
149+
150+
// Assert - should initialize without throwing
151+
Assert.NotNull(analyticsResult.MetaData);
152+
Assert.Equal("8c090478-269b-4051-b660-c2a5ae2c4aaa", analyticsResult.MetaData.RequestId);
153+
}
154+
155+
[Fact]
156+
public async Task StreamingAnalyticsResult_DdlResponse_EnumerationReturnsEmpty()
157+
{
158+
// Arrange - DDL responses have no results to enumerate
159+
var json = File.ReadAllBytes("JsonDocuments/ddlResponse.json");
160+
var stream = new MemoryStream(json);
161+
162+
var analyticsResult = new StreamingAnalyticsResult(stream, new StjJsonDeserializer());
163+
await analyticsResult.InitializeAsync(CancellationToken.None);
164+
165+
// Act
166+
var rows = await analyticsResult.ToListAsync(CancellationToken.None);
167+
168+
// Assert - should return empty list, not throw
169+
Assert.NotNull(rows);
170+
Assert.Empty(rows);
171+
}
172+
173+
[Fact]
174+
public async Task StreamingAnalyticsResult_DdlResponse_MetricsAvailable()
175+
{
176+
// Arrange
177+
var json = File.ReadAllBytes("JsonDocuments/ddlResponse.json");
178+
var stream = new MemoryStream(json);
179+
180+
var analyticsResult = new StreamingAnalyticsResult(stream, new StjJsonDeserializer());
181+
await analyticsResult.InitializeAsync(CancellationToken.None);
182+
183+
// Act - enumerate to completion (empty)
184+
await analyticsResult.ToListAsync(CancellationToken.None);
185+
186+
// Assert - metrics should still be available
187+
Assert.NotNull(analyticsResult.MetaData.Metrics);
188+
Assert.Equal(0, analyticsResult.MetaData.Metrics.ResultCount);
189+
}
137190
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"requestID": "8c090478-269b-4051-b660-c2a5ae2c4aaa",
3+
"signature": {
4+
"*": "*"
5+
},
6+
"plans": {},
7+
"status": "success",
8+
"metrics": {
9+
"elapsedTime": "268.643981ms",
10+
"executionTime": "259.520732ms",
11+
"compileTime": "0ns",
12+
"queueWaitTime": "0ns",
13+
"resultCount": 0,
14+
"resultSize": 0,
15+
"processedObjects": 0,
16+
"bufferCacheHitRatio": "0.00%",
17+
"bufferCachePageReadCount": 0
18+
}
19+
}

0 commit comments

Comments
 (0)