Skip to content

Commit 0a2de13

Browse files
authored
[Fix][Connector-V2] Fix Doris sink retry backoff and scheduler leak (#10772)
1 parent 527e2e3 commit 0a2de13

3 files changed

Lines changed: 181 additions & 43 deletions

File tree

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java

Lines changed: 81 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,25 @@ public class DorisCommitter implements SinkCommitter<DorisCommitInfo> {
5353
private static final int HTTP_TEMPORARY_REDIRECT = 307;
5454
private final CloseableHttpClient httpClient;
5555
private final DorisSinkConfig dorisSinkConfig;
56-
int maxRetry;
56+
private final int maxRetry;
57+
private final RetrySleeper retrySleeper;
5758

5859
public DorisCommitter(DorisSinkConfig dorisSinkConfig) {
59-
this(dorisSinkConfig, new HttpUtil().getHttpClient());
60+
this(dorisSinkConfig, new HttpUtil().getHttpClient(), DorisCommitter::sleepBeforeRetry);
6061
}
6162

6263
public DorisCommitter(DorisSinkConfig dorisSinkConfig, CloseableHttpClient client) {
64+
this(dorisSinkConfig, client, DorisCommitter::sleepBeforeRetry);
65+
}
66+
67+
DorisCommitter(
68+
DorisSinkConfig dorisSinkConfig,
69+
CloseableHttpClient client,
70+
RetrySleeper retrySleeper) {
6371
this.dorisSinkConfig = dorisSinkConfig;
6472
this.httpClient = client;
6573
this.maxRetry = dorisSinkConfig.getMaxRetries();
74+
this.retrySleeper = retrySleeper;
6675
}
6776

6877
@Override
@@ -86,7 +95,7 @@ private void commitTransaction(DorisCommitInfo committable)
8695
IOException lastIOException = null;
8796
DorisConnectorException lastRedirectException = null;
8897
List<String> retryHosts = resolveFrontendRetryHosts(committable.getHostPort());
89-
for (int attempt = 0; attempt <= dorisSinkConfig.getMaxRetries(); attempt++) {
98+
for (int attempt = 0; attempt <= maxRetry; attempt++) {
9099
String hostPort = retryHosts.get(attempt % retryHosts.size());
91100
String requestUrl = String.format(COMMIT_PATTERN, hostPort, committable.getDb());
92101
HttpPutBuilder putBuilder = new HttpPutBuilder();
@@ -110,31 +119,36 @@ private void commitTransaction(DorisCommitInfo committable)
110119
} catch (DorisConnectorException e) {
111120
lastRedirectException = e;
112121
log.error("commit transaction redirect follow-up failed on {}: ", hostPort, e);
122+
sleepBeforeNextAttempt(attempt);
113123
continue;
114124
} catch (IOException e) {
115125
lastIOException = e;
116126
log.error("commit transaction failed on {}: ", hostPort, e);
127+
sleepBeforeNextAttempt(attempt);
117128
continue;
118129
}
119-
int statusCode = response.getStatusLine().getStatusCode();
120-
reasonPhrase = response.getStatusLine().getReasonPhrase();
121-
if (statusCode == HTTP_TEMPORARY_REDIRECT) {
122-
Header location = response.getFirstHeader("Location");
123-
throw new DorisConnectorException(
124-
DorisConnectorErrorCode.STREAM_LOAD_FAILED,
125-
DorisRedirectExceptionBuilder.build(
126-
requestUrl,
127-
location == null ? null : location.getValue(),
128-
dorisSinkConfig.isDirectToBe(),
129-
dorisSinkConfig.getEnable2PC(),
130-
"commit"));
131-
}
132-
if (statusCode != HTTP_OK) {
133-
log.warn("commit failed with {}, reason {}", hostPort, reasonPhrase);
134-
continue;
130+
try (CloseableHttpResponse closeableResponse = response) {
131+
int statusCode = closeableResponse.getStatusLine().getStatusCode();
132+
reasonPhrase = closeableResponse.getStatusLine().getReasonPhrase();
133+
if (statusCode == HTTP_TEMPORARY_REDIRECT) {
134+
Header location = closeableResponse.getFirstHeader("Location");
135+
throw new DorisConnectorException(
136+
DorisConnectorErrorCode.STREAM_LOAD_FAILED,
137+
DorisRedirectExceptionBuilder.build(
138+
requestUrl,
139+
location == null ? null : location.getValue(),
140+
dorisSinkConfig.isDirectToBe(),
141+
dorisSinkConfig.getEnable2PC(),
142+
"commit"));
143+
}
144+
if (statusCode != HTTP_OK) {
145+
log.warn("commit failed with {}, reason {}", hostPort, reasonPhrase);
146+
sleepBeforeNextAttempt(attempt);
147+
continue;
148+
}
149+
handleCommitSuccess(committable, hostPort, closeableResponse);
150+
return;
135151
}
136-
handleCommitSuccess(committable, hostPort, response);
137-
return;
138152
}
139153

140154
if (lastRedirectException != null) {
@@ -175,31 +189,38 @@ private void abortTransaction(DorisCommitInfo committable)
175189
} catch (DorisConnectorException e) {
176190
lastRedirectException = e;
177191
log.error("abort transaction redirect follow-up failed on {}: ", hostPort, e);
192+
sleepBeforeNextAttempt(attempt);
178193
continue;
179194
} catch (IOException e) {
180195
lastIOException = e;
181196
log.error("abort transaction failed on {}: ", hostPort, e);
197+
sleepBeforeNextAttempt(attempt);
182198
continue;
183199
}
184-
int statusCode = response.getStatusLine().getStatusCode();
185-
responseStatus = response.getStatusLine().toString();
186-
if (statusCode == HTTP_TEMPORARY_REDIRECT) {
187-
Header location = response.getFirstHeader("Location");
188-
throw new DorisConnectorException(
189-
DorisConnectorErrorCode.STREAM_LOAD_FAILED,
190-
DorisRedirectExceptionBuilder.build(
191-
requestUrl,
192-
location == null ? null : location.getValue(),
193-
dorisSinkConfig.isDirectToBe(),
194-
dorisSinkConfig.getEnable2PC(),
195-
"abort"));
196-
}
197-
if (statusCode != HTTP_OK || response.getEntity() == null) {
198-
log.warn("abort transaction response: {}", response.getStatusLine().toString());
199-
continue;
200+
try (CloseableHttpResponse closeableResponse = response) {
201+
int statusCode = closeableResponse.getStatusLine().getStatusCode();
202+
responseStatus = closeableResponse.getStatusLine().toString();
203+
if (statusCode == HTTP_TEMPORARY_REDIRECT) {
204+
Header location = closeableResponse.getFirstHeader("Location");
205+
throw new DorisConnectorException(
206+
DorisConnectorErrorCode.STREAM_LOAD_FAILED,
207+
DorisRedirectExceptionBuilder.build(
208+
requestUrl,
209+
location == null ? null : location.getValue(),
210+
dorisSinkConfig.isDirectToBe(),
211+
dorisSinkConfig.getEnable2PC(),
212+
"abort"));
213+
}
214+
if (statusCode != HTTP_OK || closeableResponse.getEntity() == null) {
215+
log.warn(
216+
"abort transaction response: {}",
217+
closeableResponse.getStatusLine().toString());
218+
sleepBeforeNextAttempt(attempt);
219+
continue;
220+
}
221+
handleAbortSuccess(committable, closeableResponse);
222+
return;
200223
}
201-
handleAbortSuccess(committable, response);
202-
return;
203224
}
204225
if (lastRedirectException != null) {
205226
throw lastRedirectException;
@@ -240,6 +261,22 @@ private void handleCommitSuccess(
240261
}
241262
}
242263

264+
private void sleepBeforeNextAttempt(int attempt) throws IOException {
265+
if (attempt < maxRetry) {
266+
retrySleeper.sleep(attempt + 1);
267+
}
268+
}
269+
270+
private static void sleepBeforeRetry(int retry) throws IOException {
271+
try {
272+
int shift = Math.min(Math.max(retry - 1, 0), 4);
273+
Thread.sleep(1000L * (1L << shift));
274+
} catch (InterruptedException ie) {
275+
Thread.currentThread().interrupt();
276+
throw new IOException("Interrupted during Doris retry backoff", ie);
277+
}
278+
}
279+
243280
private void handleAbortSuccess(DorisCommitInfo committable, CloseableHttpResponse response)
244281
throws IOException {
245282
ObjectMapper mapper = new ObjectMapper();
@@ -269,4 +306,9 @@ private List<String> resolveFrontendRetryHosts(String preferredHostPort) {
269306
.forEach(hosts::add);
270307
return hosts;
271308
}
309+
310+
/**
 * Strategy for pausing between retry attempts.
 *
 * <p>Kept package-private so that tests can substitute a recording or no-op implementation in
 * place of a real sleep.
 */
@FunctionalInterface
interface RetrySleeper {
    /**
     * Pauses before the given retry.
     *
     * @param retry number of the retry that is about to be made
     * @throws IOException if the pause cannot be completed
     */
    void sleep(int retry) throws IOException;
}
272314
}

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,10 @@ private void checkDone() {
275275
loadException =
276276
new DorisConnectorException(
277277
DorisConnectorErrorCode.STREAM_LOAD_FAILED, errorMsg);
278+
// Stop the scheduler to prevent repeated error logging when downstream is unavailable.
279+
// Once loadException is set, write() will throw on the next call via
280+
// checkLoadException().
281+
scheduledExecutorService.shutdownNow();
278282
}
279283
}
280284

seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitterTest.java

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@
4444
import java.io.OutputStream;
4545
import java.net.InetSocketAddress;
4646
import java.net.ServerSocket;
47+
import java.util.ArrayList;
48+
import java.util.Arrays;
4749
import java.util.Collections;
4850
import java.util.HashMap;
51+
import java.util.List;
4952
import java.util.Map;
5053
import java.util.concurrent.Executors;
5154

@@ -177,13 +180,95 @@ void testAbortRetriesNextFrontendOnIOException() throws IOException {
177180
requestCaptor.getAllValues().get(1).getURI().toString());
178181
}
179182

183+
@Test
184+
void testCommitBackoffSkipsFinalSleep() throws IOException {
185+
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
186+
when(httpClient.execute(any(HttpUriRequest.class), any(HttpContext.class)))
187+
.thenThrow(new IOException("attempt-1"))
188+
.thenThrow(new IOException("attempt-2"))
189+
.thenThrow(new IOException("attempt-3"));
190+
191+
List<Integer> sleepRetries = new ArrayList<>();
192+
DorisCommitter committer =
193+
new DorisCommitter(createSinkConfig(true, true, 2), httpClient, sleepRetries::add);
194+
195+
Assertions.assertThrows(
196+
IOException.class,
197+
() ->
198+
committer.commit(
199+
Collections.singletonList(
200+
new DorisCommitInfo("fe1:8030", "test_db", 21L))));
201+
Assertions.assertEquals(Arrays.asList(1, 2), sleepRetries);
202+
}
203+
204+
@Test
205+
void testAbortBackoffSkipsFinalSleep() throws IOException {
206+
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
207+
when(httpClient.execute(any(HttpUriRequest.class), any(HttpContext.class)))
208+
.thenThrow(new IOException("attempt-1"))
209+
.thenThrow(new IOException("attempt-2"))
210+
.thenThrow(new IOException("attempt-3"));
211+
212+
List<Integer> sleepRetries = new ArrayList<>();
213+
DorisCommitter committer =
214+
new DorisCommitter(createSinkConfig(true, true, 2), httpClient, sleepRetries::add);
215+
216+
Assertions.assertThrows(
217+
IOException.class,
218+
() ->
219+
committer.abort(
220+
Collections.singletonList(
221+
new DorisCommitInfo("fe1:8030", "test_db", 22L))));
222+
Assertions.assertEquals(Arrays.asList(1, 2), sleepRetries);
223+
}
224+
225+
@Test
226+
void testCommitClosesFailedResponseBeforeRetry() throws IOException {
227+
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
228+
CloseableHttpResponse failedResponse =
229+
responseWithStatus(500, "Internal Server Error", null);
230+
CloseableHttpResponse successResponse = successResponse();
231+
when(httpClient.execute(any(HttpUriRequest.class), any(HttpContext.class)))
232+
.thenReturn(failedResponse)
233+
.thenReturn(successResponse);
234+
235+
DorisCommitter committer = new DorisCommitter(createSinkConfig(true, true), httpClient);
236+
committer.commit(
237+
Collections.singletonList(new DorisCommitInfo("fe1:8030", "test_db", 23L)));
238+
239+
verify(failedResponse, times(1)).close();
240+
verify(successResponse, times(1)).close();
241+
}
242+
243+
@Test
244+
void testAbortClosesFailedResponseBeforeRetry() throws IOException {
245+
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
246+
CloseableHttpResponse failedResponse =
247+
responseWithStatus(500, "Internal Server Error", null);
248+
CloseableHttpResponse successResponse = successResponse();
249+
when(httpClient.execute(any(HttpUriRequest.class), any(HttpContext.class)))
250+
.thenReturn(failedResponse)
251+
.thenReturn(successResponse);
252+
253+
DorisCommitter committer = new DorisCommitter(createSinkConfig(true, true), httpClient);
254+
committer.abort(Collections.singletonList(new DorisCommitInfo("fe1:8030", "test_db", 24L)));
255+
256+
verify(failedResponse, times(1)).close();
257+
verify(successResponse, times(1)).close();
258+
}
259+
180260
private DorisSinkConfig createSinkConfig(boolean directToBe, boolean enable2PC) {
261+
return createSinkConfig(directToBe, enable2PC, 3);
262+
}
263+
264+
private DorisSinkConfig createSinkConfig(
265+
boolean directToBe, boolean enable2PC, int maxRetries) {
181266
Map<String, Object> options = new HashMap<>();
182267
options.put("fenodes", "fe1:8030");
183268
options.put("benodes", "be1:8040");
184269
options.put("direct_to_be", directToBe);
185270
options.put("sink.enable-2pc", enable2PC);
186-
options.put("sink.max-retries", 3);
271+
options.put("sink.max-retries", maxRetries);
187272
options.put("username", "root");
188273
options.put("password", "");
189274
options.put("database", "test_db");
@@ -233,11 +318,18 @@ private Map<String, String> createStreamLoadProperties() {
233318
}
234319

235320
private CloseableHttpResponse successResponse() throws IOException {
321+
return responseWithStatus(
322+
200, "OK", new StringEntity("{\"status\":\"Success\",\"msg\":\"\"}"));
323+
}
324+
325+
private CloseableHttpResponse responseWithStatus(
326+
int statusCode, String reasonPhrase, StringEntity entity) throws IOException {
236327
CloseableHttpResponse response = mock(CloseableHttpResponse.class);
237328
when(response.getStatusLine())
238-
.thenReturn(new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 200, "OK"));
239-
when(response.getEntity())
240-
.thenReturn(new StringEntity("{\"status\":\"Success\",\"msg\":\"\"}"));
329+
.thenReturn(
330+
new BasicStatusLine(
331+
new ProtocolVersion("HTTP", 1, 1), statusCode, reasonPhrase));
332+
when(response.getEntity()).thenReturn(entity);
241333
return response;
242334
}
243335

0 commit comments

Comments
 (0)