|
28 | 28 | import org.apache.doris.flink.sink.writer.LabelGenerator; |
29 | 29 | import org.apache.doris.flink.sink.writer.LoadConstants; |
30 | 30 | import org.apache.http.client.methods.CloseableHttpResponse; |
| 31 | +import org.apache.http.client.methods.HttpUriRequest; |
31 | 32 | import org.apache.http.impl.client.CloseableHttpClient; |
32 | 33 | import org.apache.http.impl.client.HttpClientBuilder; |
33 | 34 | import org.junit.After; |
|
38 | 39 | import org.junit.Test; |
39 | 40 | import org.junit.rules.ExpectedException; |
40 | 41 | import org.junit.runners.MethodSorters; |
| 42 | +import org.mockito.ArgumentCaptor; |
41 | 43 | import org.mockito.MockedStatic; |
42 | 44 | import org.slf4j.Logger; |
43 | 45 | import org.slf4j.LoggerFactory; |
|
50 | 52 |
|
51 | 53 | import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays; |
52 | 54 | import static org.mockito.ArgumentMatchers.any; |
| 55 | +import static org.mockito.ArgumentMatchers.anyInt; |
53 | 56 | import static org.mockito.Mockito.mock; |
54 | 57 | import static org.mockito.Mockito.mockStatic; |
55 | 58 | import static org.mockito.Mockito.when; |
@@ -175,6 +178,78 @@ public void testLoadError() throws Exception { |
175 | 178 | loader.checkpointFlush(); |
176 | 179 | } |
177 | 180 |
|
| 181 | + @Test |
| 182 | + public void testGroupCommitRetryShouldNotSetLabel() throws Exception { |
| 183 | + LOG.info("testGroupCommitRetryShouldNotSetLabel start"); |
| 184 | + DorisReadOptions readOptions = DorisReadOptions.builder().build(); |
| 185 | + Properties streamProperties = new Properties(); |
| 186 | + streamProperties.setProperty(LoadConstants.GROUP_COMMIT, "sync_mode"); |
| 187 | + DorisExecutionOptions executionOptions = |
| 188 | + DorisExecutionOptions.builder() |
| 189 | + .setBufferFlushIntervalMs(1000) |
| 190 | + .setMaxRetries(1) |
| 191 | + .setStreamLoadProp(streamProperties) |
| 192 | + .build(); |
| 193 | + DorisOptions options = |
| 194 | + DorisOptions.builder() |
| 195 | + .setFenodes("127.0.0.1:1") |
| 196 | + .setBenodes("127.0.0.1:1") |
| 197 | + .setTableIdentifier("db.tbl") |
| 198 | + .build(); |
| 199 | + |
| 200 | + DorisBatchStreamLoad loader = |
| 201 | + new DorisBatchStreamLoad( |
| 202 | + options, |
| 203 | + readOptions, |
| 204 | + executionOptions, |
| 205 | + new LabelGenerator("label", false), |
| 206 | + 0); |
| 207 | + |
| 208 | + try { |
| 209 | + TestUtil.waitUntilCondition( |
| 210 | + () -> loader.isLoadThreadAlive(), |
| 211 | + Deadline.fromNow(Duration.ofSeconds(10)), |
| 212 | + 100L, |
| 213 | + "testGroupCommitRetryShouldNotSetLabel wait loader start failed."); |
| 214 | + Assert.assertTrue(loader.isLoadThreadAlive()); |
| 215 | + |
| 216 | + BackendUtil backendUtil = mock(BackendUtil.class); |
| 217 | + HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class); |
| 218 | + CloseableHttpClient httpClient = mock(CloseableHttpClient.class); |
| 219 | + CloseableHttpResponse failResponse = |
| 220 | + HttpTestUtil.getResponse("server error 404", false); |
| 221 | + CloseableHttpResponse successResponse = |
| 222 | + HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_TABLE_RESPONSE, true); |
| 223 | + ArgumentCaptor<HttpUriRequest> requestCaptor = |
| 224 | + ArgumentCaptor.forClass(HttpUriRequest.class); |
| 225 | + |
| 226 | + loader.setBackendUtil(backendUtil); |
| 227 | + loader.setHttpClientBuilder(httpClientBuilder); |
| 228 | + when(backendUtil.getAvailableBackend(anyInt())).thenReturn("127.0.0.1:1"); |
| 229 | + when(httpClientBuilder.build()).thenReturn(httpClient); |
| 230 | + when(httpClient.execute(requestCaptor.capture())) |
| 231 | + .thenReturn(failResponse, successResponse); |
| 232 | + |
| 233 | + loader.writeRecord("db", "tbl", "1,data".getBytes(StandardCharsets.UTF_8)); |
| 234 | + loader.checkpointFlush(); |
| 235 | + |
| 236 | + List<HttpUriRequest> requests = requestCaptor.getAllValues(); |
| 237 | + Assert.assertEquals(2, requests.size()); |
| 238 | + Assert.assertNull(requests.get(0).getFirstHeader("label")); |
| 239 | + Assert.assertNull(requests.get(1).getFirstHeader("label")); |
| 240 | + Assert.assertNotNull(requests.get(0).getFirstHeader(LoadConstants.GROUP_COMMIT)); |
| 241 | + Assert.assertNotNull(requests.get(1).getFirstHeader(LoadConstants.GROUP_COMMIT)); |
| 242 | + Assert.assertEquals( |
| 243 | + "sync_mode", |
| 244 | + requests.get(0).getFirstHeader(LoadConstants.GROUP_COMMIT).getValue()); |
| 245 | + Assert.assertEquals( |
| 246 | + "sync_mode", |
| 247 | + requests.get(1).getFirstHeader(LoadConstants.GROUP_COMMIT).getValue()); |
| 248 | + } finally { |
| 249 | + loader.close(); |
| 250 | + } |
| 251 | + } |
| 252 | + |
178 | 253 | @After |
179 | 254 | public void after() { |
180 | 255 | if (backendUtilMockedStatic != null) { |
|
0 commit comments