Skip to content

Commit 3e6e0ab

Browse files
authored
[Fix](batch-stream-load) handle types that do not have line delimiters (#619)
1 parent 65d8c00 commit 3e6e0ab

2 files changed

Lines changed: 52 additions & 1 deletion

File tree

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ private boolean merge(BatchRecordBuffer mergeBuffer, BatchRecordBuffer buffer) {
380380
if (buffer.getBuffer().isEmpty()) {
381381
return false;
382382
}
383-
if (!mergeBuffer.getBuffer().isEmpty()) {
383+
if (!mergeBuffer.getBuffer().isEmpty() && mergeBuffer.getLineDelimiter() != null) {
384384
mergeBuffer.getBuffer().add(mergeBuffer.getLineDelimiter());
385385
mergeBuffer.setBufferSizeBytes(
386386
mergeBuffer.getBufferSizeBytes() + mergeBuffer.getLineDelimiter().length);

flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.doris.flink.sink.HttpTestUtil;
2727
import org.apache.doris.flink.sink.TestUtil;
2828
import org.apache.doris.flink.sink.writer.LabelGenerator;
29+
import org.apache.doris.flink.sink.writer.LoadConstants;
2930
import org.apache.http.client.methods.CloseableHttpResponse;
3031
import org.apache.http.impl.client.CloseableHttpClient;
3132
import org.apache.http.impl.client.HttpClientBuilder;
@@ -45,6 +46,7 @@
4546
import java.time.Duration;
4647
import java.util.ArrayList;
4748
import java.util.List;
49+
import java.util.Properties;
4850

4951
import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
5052
import static org.mockito.ArgumentMatchers.any;
@@ -226,4 +228,53 @@ public void mergeBufferTest() {
226228
flag = loader.mergeBuffer(bufferList, buffer);
227229
Assert.assertEquals(false, flag);
228230
}
231+
232+
@Test
233+
public void mergeBufferNullDelimiterTest() {
234+
DorisReadOptions readOptions = DorisReadOptions.builder().build();
235+
Properties streamProperties = new Properties();
236+
streamProperties.setProperty(
237+
LoadConstants.FORMAT_KEY, LoadConstants.ARROW); // this makes lineDelimiter null
238+
DorisExecutionOptions executionOptions =
239+
DorisExecutionOptions.builder().setStreamLoadProp(streamProperties).build();
240+
DorisOptions options =
241+
DorisOptions.builder()
242+
.setFenodes("127.0.0.1:8030")
243+
.setBenodes("127.0.0.1:9030")
244+
.setTableIdentifier("db.tbl")
245+
.build();
246+
247+
DorisBatchStreamLoad loader =
248+
new DorisBatchStreamLoad(
249+
options, readOptions, executionOptions, new LabelGenerator("xx", false), 0);
250+
251+
List<BatchRecordBuffer> bufferList = new ArrayList<>();
252+
BatchRecordBuffer recordBuffer = new BatchRecordBuffer("db", "tbl", null, 0);
253+
recordBuffer.insert("111".getBytes(StandardCharsets.UTF_8));
254+
recordBuffer.setLabelName("label2");
255+
BatchRecordBuffer buffer = new BatchRecordBuffer("db", "tbl", null, 0);
256+
buffer.insert("222".getBytes(StandardCharsets.UTF_8));
257+
buffer.setLabelName("label1");
258+
259+
boolean flag = loader.mergeBuffer(bufferList, buffer);
260+
Assert.assertEquals(false, flag);
261+
262+
bufferList.add(buffer);
263+
bufferList.add(recordBuffer);
264+
flag = loader.mergeBuffer(bufferList, buffer);
265+
Assert.assertEquals(true, flag);
266+
byte[] bytes = mergeByteArrays(buffer.getBuffer());
267+
Assert.assertArrayEquals(bytes, "222111".getBytes(StandardCharsets.UTF_8));
268+
269+
// multi table
270+
bufferList.clear();
271+
bufferList.add(buffer);
272+
BatchRecordBuffer recordBuffer2 =
273+
new BatchRecordBuffer("db", "tbl2", "\n".getBytes(StandardCharsets.UTF_8), 0);
274+
recordBuffer2.insert("333".getBytes(StandardCharsets.UTF_8));
275+
recordBuffer2.setLabelName("label3");
276+
bufferList.add(recordBuffer2);
277+
flag = loader.mergeBuffer(bufferList, buffer);
278+
Assert.assertEquals(false, flag);
279+
}
229280
}

0 commit comments

Comments
 (0)