Skip to content

Commit bc03cf8

Browse files
committed
Merge branch 'master' into support-flink2.x
2 parents 6e67156 + e10a47c commit bc03cf8

2 files changed

Lines changed: 78 additions & 1 deletion

File tree

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,9 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
547547
// get available backend retry
548548
refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
549549
putBuilder.setUrl(loadUrl);
550-
putBuilder.setLabel(label + "_" + retry);
550+
if (!enableGroupCommit && label != null) {
551+
putBuilder.setLabel(label + "_" + retry);
552+
}
551553

552554
try {
553555
Thread.sleep(retry * 1000);

flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.doris.flink.sink.writer.LabelGenerator;
2929
import org.apache.doris.flink.sink.writer.LoadConstants;
3030
import org.apache.http.client.methods.CloseableHttpResponse;
31+
import org.apache.http.client.methods.HttpUriRequest;
3132
import org.apache.http.impl.client.CloseableHttpClient;
3233
import org.apache.http.impl.client.HttpClientBuilder;
3334
import org.junit.After;
@@ -38,6 +39,7 @@
3839
import org.junit.Test;
3940
import org.junit.rules.ExpectedException;
4041
import org.junit.runners.MethodSorters;
42+
import org.mockito.ArgumentCaptor;
4143
import org.mockito.MockedStatic;
4244
import org.slf4j.Logger;
4345
import org.slf4j.LoggerFactory;
@@ -50,6 +52,7 @@
5052

5153
import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
5254
import static org.mockito.ArgumentMatchers.any;
55+
import static org.mockito.ArgumentMatchers.anyInt;
5356
import static org.mockito.Mockito.mock;
5457
import static org.mockito.Mockito.mockStatic;
5558
import static org.mockito.Mockito.when;
@@ -175,6 +178,78 @@ public void testLoadError() throws Exception {
175178
loader.checkpointFlush();
176179
}
177180

181+
@Test
182+
public void testGroupCommitRetryShouldNotSetLabel() throws Exception {
183+
LOG.info("testGroupCommitRetryShouldNotSetLabel start");
184+
DorisReadOptions readOptions = DorisReadOptions.builder().build();
185+
Properties streamProperties = new Properties();
186+
streamProperties.setProperty(LoadConstants.GROUP_COMMIT, "sync_mode");
187+
DorisExecutionOptions executionOptions =
188+
DorisExecutionOptions.builder()
189+
.setBufferFlushIntervalMs(1000)
190+
.setMaxRetries(1)
191+
.setStreamLoadProp(streamProperties)
192+
.build();
193+
DorisOptions options =
194+
DorisOptions.builder()
195+
.setFenodes("127.0.0.1:1")
196+
.setBenodes("127.0.0.1:1")
197+
.setTableIdentifier("db.tbl")
198+
.build();
199+
200+
DorisBatchStreamLoad loader =
201+
new DorisBatchStreamLoad(
202+
options,
203+
readOptions,
204+
executionOptions,
205+
new LabelGenerator("label", false),
206+
0);
207+
208+
try {
209+
TestUtil.waitUntilCondition(
210+
() -> loader.isLoadThreadAlive(),
211+
Deadline.fromNow(Duration.ofSeconds(10)),
212+
100L,
213+
"testGroupCommitRetryShouldNotSetLabel wait loader start failed.");
214+
Assert.assertTrue(loader.isLoadThreadAlive());
215+
216+
BackendUtil backendUtil = mock(BackendUtil.class);
217+
HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class);
218+
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
219+
CloseableHttpResponse failResponse =
220+
HttpTestUtil.getResponse("server error 404", false);
221+
CloseableHttpResponse successResponse =
222+
HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_TABLE_RESPONSE, true);
223+
ArgumentCaptor<HttpUriRequest> requestCaptor =
224+
ArgumentCaptor.forClass(HttpUriRequest.class);
225+
226+
loader.setBackendUtil(backendUtil);
227+
loader.setHttpClientBuilder(httpClientBuilder);
228+
when(backendUtil.getAvailableBackend(anyInt())).thenReturn("127.0.0.1:1");
229+
when(httpClientBuilder.build()).thenReturn(httpClient);
230+
when(httpClient.execute(requestCaptor.capture()))
231+
.thenReturn(failResponse, successResponse);
232+
233+
loader.writeRecord("db", "tbl", "1,data".getBytes(StandardCharsets.UTF_8));
234+
loader.checkpointFlush();
235+
236+
List<HttpUriRequest> requests = requestCaptor.getAllValues();
237+
Assert.assertEquals(2, requests.size());
238+
Assert.assertNull(requests.get(0).getFirstHeader("label"));
239+
Assert.assertNull(requests.get(1).getFirstHeader("label"));
240+
Assert.assertNotNull(requests.get(0).getFirstHeader(LoadConstants.GROUP_COMMIT));
241+
Assert.assertNotNull(requests.get(1).getFirstHeader(LoadConstants.GROUP_COMMIT));
242+
Assert.assertEquals(
243+
"sync_mode",
244+
requests.get(0).getFirstHeader(LoadConstants.GROUP_COMMIT).getValue());
245+
Assert.assertEquals(
246+
"sync_mode",
247+
requests.get(1).getFirstHeader(LoadConstants.GROUP_COMMIT).getValue());
248+
} finally {
249+
loader.close();
250+
}
251+
}
252+
178253
@After
179254
public void after() {
180255
if (backendUtilMockedStatic != null) {

0 commit comments

Comments
 (0)