Skip to content

Commit 007a994

Browse files
authored
[Improve] Improve doris stream load client side error message (apache#6688)
1 parent 584c898 commit 007a994

File tree

6 files changed

+83
-52
lines changed

6 files changed

+83
-52
lines changed

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java

+5-28
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,13 @@ public class DorisSinkWriter
5858
new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS, LoadStatus.PUBLISH_TIMEOUT));
5959
private long lastCheckpointId;
6060
private DorisStreamLoad dorisStreamLoad;
61-
volatile boolean loading;
6261
private final DorisConfig dorisConfig;
6362
private final String labelPrefix;
6463
private final LabelGenerator labelGenerator;
6564
private final int intervalTime;
6665
private final DorisSerializer serializer;
6766
private final CatalogTable catalogTable;
6867
private final ScheduledExecutorService scheduledExecutorService;
69-
private Thread executorThread;
7068
private volatile Exception loadException = null;
7169

7270
public DorisSinkWriter(
@@ -94,7 +92,6 @@ public DorisSinkWriter(
9492
1, new ThreadFactoryBuilder().setNameFormat("stream-load-check").build());
9593
this.serializer = createSerializer(dorisConfig, catalogTable.getSeaTunnelRowType());
9694
this.intervalTime = dorisConfig.getCheckInterval();
97-
this.loading = false;
9895
this.initializeLoad();
9996
}
10097

@@ -123,7 +120,7 @@ private void initializeLoad() {
123120

124121
@Override
125122
public void write(SeaTunnelRow element) throws IOException {
126-
checkLoadExceptionAndResetThread();
123+
checkLoadException();
127124
byte[] serialize =
128125
serializer.serialize(
129126
dorisConfig.isNeedsUnsupportedTypeCasting()
@@ -154,7 +151,6 @@ public Optional<DorisCommitInfo> prepareCommit() throws IOException {
154151

155152
private RespContent flush() throws IOException {
156153
// disable exception checker before stop load.
157-
loading = false;
158154
checkState(dorisStreamLoad != null);
159155
RespContent respContent = dorisStreamLoad.stopLoad();
160156
if (respContent != null && !DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
@@ -177,7 +173,6 @@ public List<DorisSinkState> snapshotState(long checkpointId) {
177173

178174
private void startLoad(String label) {
179175
this.dorisStreamLoad.startLoad(label);
180-
this.loading = true;
181176
}
182177

183178
@Override
@@ -194,37 +189,19 @@ public void abortPrepare() {
194189
private void checkDone() {
195190
// the load future is done and checked in prepareCommit().
196191
// this will check error while loading.
192+
String errorMsg;
197193
log.debug("start timer checker, interval {} ms", intervalTime);
198-
if (dorisStreamLoad.getPendingLoadFuture() != null
199-
&& dorisStreamLoad.getPendingLoadFuture().isDone()) {
200-
if (!loading) {
201-
log.debug("not loading, skip timer checker");
202-
return;
203-
}
204-
String errorMsg;
205-
try {
206-
RespContent content =
207-
dorisStreamLoad.handlePreCommitResponse(
208-
dorisStreamLoad.getPendingLoadFuture().get());
209-
errorMsg = content.getMessage();
210-
} catch (Exception e) {
211-
errorMsg = e.getMessage();
212-
}
213-
194+
if ((errorMsg = dorisStreamLoad.getLoadFailedMsg()) != null) {
195+
log.error("stream load finished unexpectedly: {}", errorMsg);
214196
loadException =
215197
new DorisConnectorException(
216198
DorisConnectorErrorCode.STREAM_LOAD_FAILED, errorMsg);
217-
log.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg);
218-
// set the executor thread interrupted in case blocking in write data.
219-
executorThread.interrupt();
220199
}
221200
}
222201

223-
private void checkLoadExceptionAndResetThread() {
202+
private void checkLoadException() {
224203
if (loadException != null) {
225204
throw new RuntimeException("error while loading data.", loadException);
226-
} else {
227-
executorThread = Thread.currentThread();
228205
}
229206
}
230207

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public class DorisStreamLoad implements Serializable {
7878
private final CloseableHttpClient httpClient;
7979
private final ExecutorService executorService;
8080
private volatile boolean loadBatchFirstRecord;
81+
private volatile boolean loading = false;
8182
private String label;
8283
private long recordCount = 0;
8384

@@ -199,7 +200,25 @@ public long getRecordCount() {
199200
return recordCount;
200201
}
201202

202-
public RespContent handlePreCommitResponse(CloseableHttpResponse response) throws Exception {
203+
public String getLoadFailedMsg() {
204+
if (!loading) {
205+
return null;
206+
}
207+
if (this.getPendingLoadFuture() != null && this.getPendingLoadFuture().isDone()) {
208+
String errorMessage;
209+
try {
210+
errorMessage = handlePreCommitResponse(pendingLoadFuture.get()).getMessage();
211+
} catch (Exception e) {
212+
errorMessage = e.getMessage();
213+
}
214+
recordStream.setErrorMessageByStreamLoad(errorMessage);
215+
return errorMessage;
216+
} else {
217+
return null;
218+
}
219+
}
220+
221+
private RespContent handlePreCommitResponse(CloseableHttpResponse response) throws Exception {
203222
final int statusCode = response.getStatusLine().getStatusCode();
204223
if (statusCode == HTTP_TEMPORARY_REDIRECT && response.getEntity() != null) {
205224
String loadResult = EntityUtils.toString(response.getEntity());
@@ -211,6 +230,7 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw
211230
}
212231

213232
public RespContent stopLoad() throws IOException {
233+
loading = false;
214234
if (pendingLoadFuture != null) {
215235
log.info("stream load stopped.");
216236
recordStream.endInput();
@@ -230,6 +250,7 @@ public void startLoad(String label) {
230250
loadBatchFirstRecord = true;
231251
recordCount = 0;
232252
this.label = label;
253+
this.loading = true;
233254
}
234255

235256
private void startStreamLoad() {

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java

+35-21
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717

1818
package org.apache.seatunnel.connectors.doris.sink.writer;
1919

20+
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
21+
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
22+
23+
import lombok.Setter;
2024
import lombok.extern.slf4j.Slf4j;
2125

2226
import java.io.IOException;
@@ -25,18 +29,21 @@
2529
import java.util.concurrent.ArrayBlockingQueue;
2630
import java.util.concurrent.BlockingQueue;
2731
import java.util.concurrent.LinkedBlockingDeque;
32+
import java.util.concurrent.TimeUnit;
2833

2934
import static com.google.common.base.Preconditions.checkState;
3035

3136
/** Channel of record stream and HTTP data stream. */
3237
@Slf4j
3338
public class RecordBuffer {
34-
BlockingQueue<ByteBuffer> writeQueue;
35-
BlockingQueue<ByteBuffer> readQueue;
36-
int bufferCapacity;
37-
int queueSize;
38-
ByteBuffer currentWriteBuffer;
39-
ByteBuffer currentReadBuffer;
39+
private final BlockingQueue<ByteBuffer> writeQueue;
40+
private final BlockingQueue<ByteBuffer> readQueue;
41+
private final int bufferCapacity;
42+
private final int queueSize;
43+
private ByteBuffer currentWriteBuffer;
44+
private ByteBuffer currentReadBuffer;
45+
// used to check stream load error by stream load thread
46+
@Setter private volatile String errorMessageByStreamLoad;
4047

4148
public RecordBuffer(int capacity, int queueSize) {
4249
log.info("init RecordBuffer capacity {}, count {}", capacity, queueSize);
@@ -76,7 +83,11 @@ public void stopBufferData() throws IOException {
7683
currentWriteBuffer = null;
7784
}
7885
if (!isEmpty) {
79-
ByteBuffer byteBuffer = writeQueue.take();
86+
ByteBuffer byteBuffer = null;
87+
while (byteBuffer == null) {
88+
checkErrorMessageByStreamLoad();
89+
byteBuffer = writeQueue.poll(100, TimeUnit.MILLISECONDS);
90+
}
8091
((Buffer) byteBuffer).flip();
8192
checkState(byteBuffer.limit() == 0);
8293
readQueue.put(byteBuffer);
@@ -89,8 +100,9 @@ public void stopBufferData() throws IOException {
89100
public void write(byte[] buf) throws InterruptedException {
90101
int wPos = 0;
91102
do {
92-
if (currentWriteBuffer == null) {
93-
currentWriteBuffer = writeQueue.take();
103+
while (currentWriteBuffer == null) {
104+
checkErrorMessageByStreamLoad();
105+
currentWriteBuffer = writeQueue.poll(100, TimeUnit.MILLISECONDS);
94106
}
95107
int available = currentWriteBuffer.remaining();
96108
int nWrite = Math.min(available, buf.length - wPos);
@@ -105,14 +117,15 @@ public void write(byte[] buf) throws InterruptedException {
105117
}
106118

107119
public int read(byte[] buf) throws InterruptedException {
108-
if (currentReadBuffer == null) {
109-
currentReadBuffer = readQueue.take();
120+
while (currentReadBuffer == null) {
121+
checkErrorMessageByStreamLoad();
122+
currentReadBuffer = readQueue.poll(100, TimeUnit.MILLISECONDS);
110123
}
111124
// add empty buffer as end flag
112125
if (currentReadBuffer.limit() == 0) {
113126
recycleBuffer(currentReadBuffer);
114127
currentReadBuffer = null;
115-
checkState(readQueue.size() == 0);
128+
checkState(readQueue.isEmpty());
116129
return -1;
117130
}
118131
int available = currentReadBuffer.remaining();
@@ -125,16 +138,17 @@ public int read(byte[] buf) throws InterruptedException {
125138
return nRead;
126139
}
127140

128-
private void recycleBuffer(ByteBuffer buffer) throws InterruptedException {
129-
((Buffer) buffer).clear();
130-
writeQueue.put(buffer);
131-
}
132-
133-
public int getWriteQueueSize() {
134-
return writeQueue.size();
141+
private void checkErrorMessageByStreamLoad() {
142+
if (errorMessageByStreamLoad != null) {
143+
throw new DorisConnectorException(
144+
DorisConnectorErrorCode.STREAM_LOAD_FAILED, errorMessageByStreamLoad);
145+
}
135146
}
136147

137-
public int getReadQueueSize() {
138-
return readQueue.size();
148+
private void recycleBuffer(ByteBuffer buffer) throws InterruptedException {
149+
((Buffer) buffer).clear();
150+
while (!writeQueue.offer(buffer, 100, TimeUnit.MILLISECONDS)) {
151+
checkErrorMessageByStreamLoad();
152+
}
139153
}
140154
}

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordStream.java

+4
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,8 @@ public void write(byte[] buff) throws IOException {
5757
throw new RuntimeException(e);
5858
}
5959
}
60+
61+
public void setErrorMessageByStreamLoad(String errorMessageByStreamLoad) {
62+
recordBuffer.setErrorMessageByStreamLoad(errorMessageByStreamLoad);
63+
}
6064
}

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java

+11
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,17 @@ public void testDoris(TestContainer container) throws InterruptedException, Exec
7979
Thread.sleep(10 * 1000);
8080
super.container.stop();
8181
Assertions.assertNotEquals(0, future.get().getExitCode());
82+
Assertions.assertTrue(
83+
future.get()
84+
.getStderr()
85+
.contains(
86+
"Caused by: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error]"));
87+
Assertions.assertTrue(
88+
future.get()
89+
.getStderr()
90+
.contains(
91+
"at org.apache.seatunnel.connectors.doris.sink.writer.RecordBuffer.checkErrorMessageByStreamLoad"));
92+
log.info("doris error log: \n" + future.get().getStderr());
8293
super.container.start();
8394
// wait for the container to restart
8495
given().ignoreExceptions()

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf

+6-2
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818
env{
1919
parallelism = 1
2020
job.mode = "BATCH"
21+
job.retry.times = 0
2122
}
2223

2324
source{
2425
FakeSource {
25-
row.num = 100
26+
row.num = 1000
2627
split.num = 10
27-
split.read-interval = 10000
2828
string.length = 1
2929
schema = {
3030
fields {
@@ -58,6 +58,10 @@ sink{
5858
password = ""
5959
table.identifier = "e2e_sink.doris_e2e_table"
6060
sink.enable-2pc = "true"
61+
// stuck in get RecordBuffer
62+
sink.buffer-size = 2
63+
sink.buffer-count = 2
64+
6165
sink.label-prefix = "test_json"
6266
doris.config = {
6367
format="json"

0 commit comments

Comments
 (0)