Skip to content

Commit 64de75b

Browse files
committed
chore: apply suggestions
Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
1 parent 2c4ef3f commit 64de75b

3 files changed

Lines changed: 56 additions & 41 deletions

File tree

src/GreptimeDB.Ingester/Client/BulkWriter.cs

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public sealed partial class BulkWriter : IBulkWriter
2828
private Task? _recvTask;
2929
private uint _serverAffectedRows;
3030
private volatile Exception? _recvError;
31+
private readonly CancellationTokenSource _cts = new();
3132
private int _completed;
3233
private int _disposed;
3334

@@ -54,6 +55,11 @@ public async ValueTask WriteAsync(Table.Table table, CancellationToken cancellat
5455

5556
if (_recvError != null)
5657
{
58+
if (_recvError is GreptimeException)
59+
{
60+
throw _recvError;
61+
}
62+
5763
throw new GreptimeException($"Stream already failed: {_recvError.Message}", _recvError);
5864
}
5965

@@ -102,11 +108,16 @@ public async ValueTask<uint> CompleteAsync(CancellationToken cancellationToken =
102108

103109
if (_recvTask != null)
104110
{
105-
await _recvTask.ConfigureAwait(false);
111+
await _recvTask.WaitAsync(cancellationToken).ConfigureAwait(false);
106112
}
107113

108114
if (_recvError != null)
109115
{
116+
if (_recvError is GreptimeException)
117+
{
118+
throw _recvError;
119+
}
120+
110121
throw new GreptimeException($"Bulk write failed: {_recvError.Message}", _recvError);
111122
}
112123

@@ -132,6 +143,12 @@ public async ValueTask DisposeAsync()
132143
return;
133144
}
134145

146+
#if NET8_0_OR_GREATER
147+
await _cts.CancelAsync().ConfigureAwait(false);
148+
#else
149+
_cts.Cancel();
150+
#endif
151+
135152
_putCall?.Dispose();
136153
_recordBatchBuilder.Dispose();
137154

@@ -141,12 +158,14 @@ public async ValueTask DisposeAsync()
141158
{
142159
await _recvTask.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
143160
}
144-
catch
161+
catch (Exception ex)
145162
{
146-
// Ignore exceptions during disposal
163+
LogBulkWriteError(_logger, ex.Message);
147164
}
148165
}
149166

167+
_cts.Dispose();
168+
150169
LogBulkWriterDisposed(_logger);
151170
}
152171

@@ -169,16 +188,28 @@ private async Task InitializeStreamAsync(
169188
_putCall = await _flightClient.StartPut(descriptor, schema, headers, deadline: null, cancellationToken)
170189
.ConfigureAwait(false);
171190

172-
_recvTask = DrainResponsesAsync(_putCall.ResponseStream);
191+
_recvTask = RunRecvLoopAsync(_putCall.ResponseStream);
173192

174193
LogStreamInitialized(_logger, tableName);
175194
}
176195

177-
internal async Task DrainResponsesAsync(IAsyncStreamReader<FlightPutResult> responseStream)
196+
private async Task RunRecvLoopAsync(IAsyncStreamReader<FlightPutResult> responseStream)
197+
{
198+
var (affectedRows, error) = await DrainResponsesAsync(responseStream, _cts.Token).ConfigureAwait(false);
199+
_serverAffectedRows = affectedRows;
200+
_recvError = error;
201+
}
202+
203+
internal static async Task<(uint AffectedRows, Exception? Error)> DrainResponsesAsync(
204+
IAsyncStreamReader<FlightPutResult> responseStream,
205+
CancellationToken cancellationToken = default)
178206
{
207+
uint affectedRows = 0;
208+
Exception? error = null;
209+
179210
try
180211
{
181-
while (await responseStream.MoveNext(CancellationToken.None).ConfigureAwait(false))
212+
while (await responseStream.MoveNext(cancellationToken).ConfigureAwait(false))
182213
{
183214
var result = responseStream.Current;
184215
if (result.ApplicationMetadata == null || result.ApplicationMetadata.IsEmpty)
@@ -191,24 +222,30 @@ internal async Task DrainResponsesAsync(IAsyncStreamReader<FlightPutResult> resp
191222
var resp = JsonSerializer.Deserialize<DoPutResponse>(result.ApplicationMetadata.Span);
192223
if (resp != null)
193224
{
194-
_serverAffectedRows += resp.AffectedRows;
225+
affectedRows += resp.AffectedRows;
195226
}
196227
}
197228
catch (JsonException ex)
198229
{
199-
_recvError ??= new GreptimeException(
230+
error ??= new GreptimeException(
200231
$"Failed to deserialize PutResult metadata: {ex.Message}", ex);
201232
}
202233
}
203234
}
235+
catch (OperationCanceledException ex)
236+
{
237+
error ??= ex;
238+
}
204239
catch (RpcException ex)
205240
{
206-
_recvError ??= ex;
241+
error ??= ex;
207242
}
208-
catch (ObjectDisposedException)
243+
catch (ObjectDisposedException ex)
209244
{
210-
// Stream disposed during shutdown, expected
245+
error ??= ex;
211246
}
247+
248+
return (affectedRows, error);
212249
}
213250

214251
private void ThrowIfDisposed()

src/GreptimeDB.Ingester/Client/StreamIngestWriter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,9 @@ public async ValueTask DisposeAsync()
150150
{
151151
await _sendTask.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
152152
}
153-
catch
153+
catch (Exception ex)
154154
{
155-
// Ignore exceptions during disposal
155+
LogStreamError(_logger, ex.Message);
156156
}
157157

158158
_cts.Dispose();
Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
using Apache.Arrow.Flight;
22
using FluentAssertions;
3-
using Google.Protobuf;
43
using GreptimeDB.Ingester.Client;
4+
using GreptimeDB.Ingester.Exceptions;
55
using Grpc.Core;
66
using Moq;
77
using Xunit;
@@ -21,12 +21,11 @@ public async Task DrainResponsesAsync_AccumulatesAffectedRows()
2121
};
2222

2323
var mockStream = CreateMockStream(responses);
24-
var writer = CreateWriter();
2524

26-
await writer.DrainResponsesAsync(mockStream.Object);
25+
var (affectedRows, error) = await BulkWriter.DrainResponsesAsync(mockStream.Object);
2726

28-
var rows = GetServerAffectedRows(writer);
29-
rows.Should().Be(40);
27+
affectedRows.Should().Be(40);
28+
error.Should().BeNull();
3029
}
3130

3231
[Fact]
@@ -36,19 +35,12 @@ public async Task DrainResponsesAsync_CapturesRpcException()
3635
mockStream.Setup(s => s.MoveNext(It.IsAny<CancellationToken>()))
3736
.ThrowsAsync(new RpcException(new Status(StatusCode.Internal, "server error")));
3837

39-
var writer = CreateWriter();
38+
var (affectedRows, error) = await BulkWriter.DrainResponsesAsync(mockStream.Object);
4039

41-
await writer.DrainResponsesAsync(mockStream.Object);
42-
43-
var error = GetRecvError(writer);
40+
affectedRows.Should().Be(0);
4441
error.Should().BeOfType<RpcException>();
4542
}
4643

47-
private static BulkWriter CreateWriter()
48-
{
49-
return new BulkWriter(null!, "test-db", null);
50-
}
51-
5244
private static Mock<IAsyncStreamReader<FlightPutResult>> CreateMockStream(FlightPutResult[] responses)
5345
{
5446
var mock = new Mock<IAsyncStreamReader<FlightPutResult>>();
@@ -66,18 +58,4 @@ private static Mock<IAsyncStreamReader<FlightPutResult>> CreateMockStream(Flight
6658

6759
return mock;
6860
}
69-
70-
private static uint GetServerAffectedRows(BulkWriter writer)
71-
{
72-
var field = typeof(BulkWriter).GetField("_serverAffectedRows",
73-
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
74-
return (uint)field!.GetValue(writer)!;
75-
}
76-
77-
private static Exception? GetRecvError(BulkWriter writer)
78-
{
79-
var field = typeof(BulkWriter).GetField("_recvError",
80-
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
81-
return (Exception?)field!.GetValue(writer);
82-
}
8361
}

0 commit comments

Comments
 (0)