Skip to content

Commit b1cf255

Browse files
fix(csharp): prevent Power BI hang when warehouse stops mid-query (PECO-2971)
Root cause: When the CloudFetch fetcher encounters an error (e.g. warehouse stopped), the downloader's download loop exits but never called CompleteAdding() on the result queue. This caused _resultQueue.Take() to block forever, deadlocking PBI's Mashup container thread. Changes: - Validate Thrift response status in ThriftResultFetcher, throw HiveServer2Exception on non-SUCCESS status codes - Always call CompleteAdding() on result queue when download loop exits in CloudFetchDownloader (both error and happy paths) - Add AdbcException throw in CloudFetchDownloadManager when downloader returns null but fetcher has error (PBI cannot handle AggregateException) - Enhanced tracing for error propagation and completion paths - Added tests for status validation and error propagation Co-authored-by: Isaac
1 parent 7c2e308 commit b1cf255

File tree

7 files changed

+361
-11
lines changed

7 files changed

+361
-11
lines changed

csharp/src/Reader/CloudFetch/CloudFetchDownloadManager.cs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323

2424
using System;
2525
using System.Collections.Concurrent;
26+
using System.Diagnostics;
2627
using System.Threading;
2728
using System.Threading.Tasks;
29+
using Apache.Arrow.Adbc;
30+
using Apache.Arrow.Adbc.Tracing;
2831

2932
namespace AdbcDrivers.Databricks.Reader.CloudFetch
3033
{
@@ -84,14 +87,36 @@ public CloudFetchDownloadManager(
8487
throw new InvalidOperationException("Download manager has not been started.");
8588
}
8689

90+
IDownloadResult? result;
8791
try
8892
{
89-
return await _downloader.GetNextDownloadedFileAsync(cancellationToken).ConfigureAwait(false);
93+
result = await _downloader.GetNextDownloadedFileAsync(cancellationToken).ConfigureAwait(false);
94+
Activity.Current?.AddEvent("cloudfetch.download_manager_result", [
95+
new("result_is_null", result == null),
96+
new("fetcher_has_error", _resultFetcher.HasError),
97+
new("fetcher_error_message", _resultFetcher.Error?.Message ?? "(none)"),
98+
new("fetcher_is_completed", _resultFetcher.IsCompleted),
99+
new("downloader_is_completed", _downloader.IsCompleted)
100+
]);
90101
}
91102
catch (Exception ex) when (_resultFetcher.HasError)
92103
{
93104
throw new AggregateException("Errors in download pipeline", new[] { ex, _resultFetcher.Error! });
94105
}
106+
107+
// If the downloader returned null (end of results) but the fetcher
108+
// has a stored error, the fetcher failed mid-stream and the error
109+
// was not propagated through the download queue. Surface it now
110+
// instead of silently returning partial data.
111+
if (result == null && _resultFetcher.HasError)
112+
{
113+
throw new AdbcException(
114+
$"Query execution failed: {_resultFetcher.Error!.Message}",
115+
AdbcStatusCode.IOError,
116+
_resultFetcher.Error!);
117+
}
118+
119+
return result;
95120
}
96121

97122
/// <inheritdoc />

csharp/src/Reader/CloudFetch/CloudFetchDownloader.cs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -437,11 +437,24 @@ await _activityTracer.TraceActivityAsync(async activity =>
437437
new("total_time_sec", overallStopwatch.ElapsedMilliseconds / 1000.0)
438438
]);
439439

440-
// If there's an error, add the error to the result queue
440+
// Always mark the result queue as complete when the download
441+
// loop exits. Without this, a subsequent Take() call would
442+
// block forever on an empty, non-completed queue if the caller
443+
// retries after an exception (e.g. a fetcher error that the
444+
// downloader doesn't know about).
441445
if (HasError)
442446
{
443447
CompleteWithError(activity);
444448
}
449+
else
450+
{
451+
_isCompleted = true;
452+
try { _resultQueue.CompleteAdding(); }
453+
catch (InvalidOperationException ex)
454+
{
455+
activity?.AddException(ex, [new("error.context", "cloudfetch.result_queue_already_completed")]);
456+
}
457+
}
445458
}
446459
});
447460
}
@@ -671,18 +684,15 @@ private void SetError(Exception ex, Activity? activity = null)
671684

672685
private void CompleteWithError(Activity? activity = null)
673686
{
687+
// Mark the download as completed with error
688+
_isCompleted = true;
689+
674690
try
675691
{
676692
// Mark the result queue as completed to prevent further additions
677693
_resultQueue.CompleteAdding();
678-
679-
// Mark the download as completed with error
680-
_isCompleted = true;
681-
}
682-
catch (Exception ex)
683-
{
684-
activity?.AddException(ex, [new("error.context", "cloudfetch.complete_with_error_failed")]);
685694
}
695+
catch (InvalidOperationException) { /* already completed */ }
686696
}
687697

688698
// Helper method to sanitize URLs for logging (to avoid exposing sensitive information)

csharp/src/Reader/CloudFetch/CloudFetchReader.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,11 @@ public CloudFetchReader(
153153
this.currentDownloadResult = await this.downloadManager.GetNextDownloadedFileAsync(cancellationToken);
154154
if (this.currentDownloadResult == null)
155155
{
156-
Activity.Current?.AddEvent("cloudfetch.reader_no_more_files");
156+
Activity.Current?.AddEvent("cloudfetch.reader_no_more_files", [
157+
new("rows_read_so_far", _rowsRead),
158+
new("total_expected_rows", _totalExpectedRows),
159+
new("current_chunk_rows_read", _currentChunkRowsRead)
160+
]);
157161
this.downloadManager.Dispose();
158162
this.downloadManager = null;
159163
// No more files

csharp/src/Reader/CloudFetch/CloudFetchResultFetcher.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,13 +204,19 @@ private async Task FetchResultsAsync(CancellationToken cancellationToken)
204204
{
205205
Activity.Current?.AddEvent("cloudfetch.fetcher_unhandled_error", [
206206
new("error_message", ex.Message),
207-
new("error_type", ex.GetType().Name)
207+
new("error_type", ex.GetType().Name),
208+
new("error_stack", ex.StackTrace ?? "(null)")
208209
]);
209210
_error = ex;
210211
_hasMoreResults = false;
211212
}
212213
finally
213214
{
215+
Activity.Current?.AddEvent("cloudfetch.fetcher_completing", [
216+
new("has_error", _error != null),
217+
new("error_message", _error?.Message ?? "(none)"),
218+
new("error_type", _error?.GetType().Name ?? "(none)")
219+
]);
214220
// Always add the end of results guard to signal completion to the downloader.
215221
// Use Add with cancellation token to exit promptly when cancelled.
216222
try

csharp/src/Reader/CloudFetch/ThriftResultFetcher.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,36 @@ private async Task FetchNextResultBatchAsync(long? offset, CancellationToken can
174174
throw;
175175
}
176176

177+
// Validate the response status before processing results.
178+
// Without this check, a non-SUCCESS response (e.g., warehouse stopped mid-query)
179+
// with no result links would be silently treated as end-of-results,
180+
// causing partial data to be returned to callers like Power BI.
181+
Activity.Current?.AddEvent("cloudfetch.fetch_response_status", [
182+
new("status_code", response.Status.StatusCode.ToString()),
183+
new("has_result_links", response.Results?.__isset.resultLinks == true),
184+
new("result_links_count", response.Results?.ResultLinks?.Count ?? 0),
185+
new("has_more_rows", response.HasMoreRows),
186+
new("error_message", response.Status.ErrorMessage ?? "(null)"),
187+
new("start_offset", startOffset)
188+
]);
189+
if (response.Status.StatusCode != TStatusCode.SUCCESS_STATUS &&
190+
response.Status.StatusCode != TStatusCode.SUCCESS_WITH_INFO_STATUS)
191+
{
192+
Activity.Current?.AddEvent("cloudfetch.fetch_error_status_detected", [
193+
new("status_code", response.Status.StatusCode.ToString()),
194+
new("error_message", response.Status.ErrorMessage ?? "(null)"),
195+
new("error_code", response.Status.ErrorCode),
196+
new("sql_state", response.Status.SqlState ?? "(null)")
197+
]);
198+
_hasMoreResults = false;
199+
var errorMessage = !string.IsNullOrWhiteSpace(response.Status.ErrorMessage)
200+
? response.Status.ErrorMessage
201+
: $"Thrift server error: {response.Status.StatusCode} (ErrorCode={response.Status.ErrorCode}, SqlState={response.Status.SqlState ?? "null"})";
202+
throw new DatabricksException(errorMessage)
203+
.SetNativeError(response.Status.ErrorCode)
204+
.SetSqlState(response.Status.SqlState);
205+
}
206+
177207
// Check if we have URL-based results
178208
if (response.Results.__isset.resultLinks &&
179209
response.Results.ResultLinks != null &&

csharp/test/E2E/CloudFetch/CloudFetchResultFetcherTest.cs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
using System.Linq;
2828
using System.Threading;
2929
using System.Threading.Tasks;
30+
using AdbcDrivers.Databricks;
3031
using AdbcDrivers.Databricks.Reader.CloudFetch;
3132
using AdbcDrivers.HiveServer2.Hive2;
3233
using Apache.Hive.Service.Rpc.Thrift;
@@ -335,6 +336,93 @@ public async Task FetchResultsAsync_WithServerError_SetsErrorState()
335336
await _resultFetcher.StopAsync();
336337
}
337338

339+
[Fact]
340+
public async Task FetchResultsAsync_WithErrorStatusCode_SetsErrorState()
341+
{
342+
// Arrange - simulate a warehouse stopping mid-query by returning
343+
// a response with ERROR_STATUS and no result links
344+
var errorResults = new TRowSet { __isset = { resultLinks = false } };
345+
var errorResponse = new TFetchResultsResp
346+
{
347+
Status = new TStatus
348+
{
349+
StatusCode = TStatusCode.ERROR_STATUS,
350+
ErrorMessage = "Query failed: warehouse stopped",
351+
ErrorCode = 500,
352+
SqlState = "HY000"
353+
},
354+
HasMoreRows = false,
355+
Results = errorResults,
356+
__isset = { results = true, hasMoreRows = true }
357+
};
358+
359+
_mockClient.Reset();
360+
_mockClient.Setup(c => c.FetchResults(It.IsAny<TFetchResultsReq>(), It.IsAny<CancellationToken>()))
361+
.ReturnsAsync(errorResponse);
362+
363+
// Act
364+
await _resultFetcher.StartAsync(CancellationToken.None);
365+
366+
// Wait for the fetcher to process the error
367+
await Task.Delay(200);
368+
369+
// Assert - the fetcher should report an error, not silently succeed
370+
Assert.False(_resultFetcher.HasMoreResults);
371+
Assert.True(_resultFetcher.IsCompleted);
372+
Assert.True(_resultFetcher.HasError, "Fetcher should have error state when server returns ERROR_STATUS");
373+
Assert.NotNull(_resultFetcher.Error);
374+
Assert.IsType<DatabricksException>(_resultFetcher.Error);
375+
Assert.Contains("warehouse stopped", _resultFetcher.Error.Message);
376+
377+
// Cleanup
378+
await _resultFetcher.StopAsync();
379+
}
380+
381+
[Fact]
382+
public async Task FetchResultsAsync_WithErrorAfterFirstBatch_SetsErrorState()
383+
{
384+
// Arrange - first batch succeeds, second batch returns error (warehouse stopped)
385+
var firstBatchLinks = new List<TSparkArrowResultLink>
386+
{
387+
CreateTestResultLink(0, 100, "http://test.com/file1", 3600),
388+
CreateTestResultLink(100, 100, "http://test.com/file2", 3600)
389+
};
390+
391+
var errorResults = new TRowSet { __isset = { resultLinks = false } };
392+
var errorResponse = new TFetchResultsResp
393+
{
394+
Status = new TStatus
395+
{
396+
StatusCode = TStatusCode.ERROR_STATUS,
397+
ErrorMessage = "Query failed: warehouse stopped",
398+
ErrorCode = 500,
399+
SqlState = "HY000"
400+
},
401+
HasMoreRows = false,
402+
Results = errorResults,
403+
__isset = { results = true, hasMoreRows = true }
404+
};
405+
406+
_mockClient.SetupSequence(c => c.FetchResults(It.IsAny<TFetchResultsReq>(), It.IsAny<CancellationToken>()))
407+
.ReturnsAsync(CreateFetchResultsResponse(firstBatchLinks, true)) // First batch: success, more rows
408+
.ReturnsAsync(errorResponse); // Second batch: error (warehouse stopped)
409+
410+
// Act
411+
await _resultFetcher.StartAsync(CancellationToken.None);
412+
413+
// Wait for the fetcher to process
414+
await Task.Delay(300);
415+
416+
// Assert - should have error state even though first batch succeeded
417+
Assert.True(_resultFetcher.IsCompleted);
418+
Assert.True(_resultFetcher.HasError, "Fetcher should have error state when second batch returns ERROR_STATUS");
419+
Assert.NotNull(_resultFetcher.Error);
420+
Assert.IsType<DatabricksException>(_resultFetcher.Error);
421+
422+
// Cleanup
423+
await _resultFetcher.StopAsync();
424+
}
425+
338426
[Fact]
339427
public async Task StopAsync_CancelsFetching()
340428
{

0 commit comments

Comments
 (0)