Skip to content

Commit a49a19f

Browse files
committed
[fix] abort stream load 2pc by label
1 parent 0044826 commit a49a19f

11 files changed

Lines changed: 467 additions & 79 deletions

File tree

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.flink.rest;
19+
20+
import org.apache.doris.flink.exception.DorisException;
21+
22+
/** Stream load transaction state returned by Doris get_load_state API. */
23+
public enum LoadState {
24+
UNKNOWN,
25+
PREPARE,
26+
PRECOMMITTED,
27+
COMMITTED,
28+
VISIBLE,
29+
ABORTED;
30+
31+
public static LoadState fromString(String value) throws DorisException {
32+
try {
33+
return LoadState.valueOf(value.trim().toUpperCase());
34+
} catch (Exception e) {
35+
throw new DorisException("Unknown stream load transaction state: " + value, e);
36+
}
37+
}
38+
39+
public boolean isPending() {
40+
return this == PREPARE || this == PRECOMMITTED;
41+
}
42+
43+
public boolean isCommitted() {
44+
return this == COMMITTED || this == VISIBLE;
45+
}
46+
}

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,10 @@
6161
import java.io.InputStream;
6262
import java.io.PrintWriter;
6363
import java.io.Serializable;
64+
import java.io.UnsupportedEncodingException;
6465
import java.net.HttpURLConnection;
6566
import java.net.URL;
67+
import java.net.URLEncoder;
6668
import java.nio.charset.StandardCharsets;
6769
import java.util.ArrayList;
6870
import java.util.Arrays;
@@ -96,6 +98,7 @@ public class RestService implements Serializable {
9698
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
9799
private static final String CATALOG_TABLE_SCHEMA_API = "http://%s/api/%s/%s/%s/_schema";
98100
private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan";
101+
private static final String LOAD_STATE_API = "http://%s/api/%s/get_load_state?label=%s";
99102
private static final String STATEMENT_EXEC_API =
100103
"http://%s/api/query/default_cluster/information_schema";
101104

@@ -428,6 +431,51 @@ public static Schema getSchema(
428431
return parseSchema(response, logger);
429432
}
430433

434+
public static LoadState getLoadState(
435+
DorisOptions options,
436+
DorisReadOptions readOptions,
437+
String db,
438+
String label,
439+
Logger logger)
440+
throws DorisException {
441+
try {
442+
String loadStateUri =
443+
String.format(
444+
LOAD_STATE_API,
445+
randomEndpoint(options.getFenodes(), logger),
446+
db,
447+
URLEncoder.encode(label, StandardCharsets.UTF_8.name()));
448+
HttpGet httpGet = new HttpGet(loadStateUri);
449+
String response = send(options, readOptions, httpGet, logger);
450+
logger.debug("Get stream load state response is '{}'.", response);
451+
return parseLoadState(response, logger);
452+
} catch (UnsupportedEncodingException e) {
453+
throw new DorisException("Encode stream load label failed, label: " + label, e);
454+
}
455+
}
456+
457+
@VisibleForTesting
458+
static LoadState parseLoadState(String response, Logger logger) throws DorisException {
459+
if (StringUtils.isBlank(response)) {
460+
throw new DorisException("Doris FE's load state response is empty.");
461+
}
462+
String state = response.trim();
463+
try {
464+
JsonNode node = objectMapper.readTree(state);
465+
if (node.isTextual()) {
466+
state = node.asText();
467+
} else if (node.has("data") && node.path("data").isTextual()) {
468+
state = node.path("data").asText();
469+
} else {
470+
throw new DorisException(
471+
"Cannot parse Doris FE's load state response: " + response);
472+
}
473+
} catch (JsonProcessingException e) {
474+
logger.trace("Parse load state response as raw state: {}", response);
475+
}
476+
return LoadState.fromString(state);
477+
}
478+
431479
@VisibleForTesting
432480
public static Schema getSchema(
433481
DorisOptions dorisOptions, String db, String table, Logger logger) {

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@ public class ResponseUtil {
2525
Pattern.compile("Label \\[(.*)\\] has already been used, relate to txn \\[(\\d+)\\]");
2626
public static final Pattern COMMITTED_PATTERN =
2727
Pattern.compile(
28-
"transaction \\[(\\d+)\\] is already \\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed.");
28+
"transaction \\[(\\d+)\\] is already \\b(COMMITTED|committed|VISIBLE|visible)\\b, (not pre-committed|could not abort).");
2929

3030
public static final Pattern ABORTTED_PATTERN =
3131
Pattern.compile(
3232
"transaction \\[(\\d+)\\] is already|transaction \\[(\\d+)\\] not found");
3333

34+
public static final Pattern ALREADY_ABORTED_PATTERN =
35+
Pattern.compile("transaction \\[(\\d+)\\] is already \\b(ABORTED|aborted)\\b");
36+
3437
public static boolean isCommitted(String msg) {
3538
return COMMITTED_PATTERN.matcher(msg).find();
3639
}
@@ -39,6 +42,10 @@ public static boolean isAborted(String msg) {
3942
return ABORTTED_PATTERN.matcher(msg).find();
4043
}
4144

45+
public static boolean isAlreadyAborted(String msg) {
46+
return ALREADY_ABORTED_PATTERN.matcher(msg).find();
47+
}
48+
4249
static final Pattern COPY_COMMITTED_PATTERN =
4350
Pattern.compile("errCode = 2, detailMessage = No files can be copied.*");
4451

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java

Lines changed: 105 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,13 @@
2626
import com.fasterxml.jackson.databind.ObjectMapper;
2727
import org.apache.doris.flink.cfg.DorisExecutionOptions;
2828
import org.apache.doris.flink.cfg.DorisOptions;
29+
import org.apache.doris.flink.cfg.DorisReadOptions;
2930
import org.apache.doris.flink.exception.DorisException;
3031
import org.apache.doris.flink.exception.DorisRuntimeException;
3132
import org.apache.doris.flink.exception.LabelAlreadyExistsException;
3233
import org.apache.doris.flink.exception.StreamLoadException;
34+
import org.apache.doris.flink.rest.LoadState;
35+
import org.apache.doris.flink.rest.RestService;
3336
import org.apache.doris.flink.rest.models.RespContent;
3437
import org.apache.doris.flink.sink.EscapeHandler;
3538
import org.apache.doris.flink.sink.HttpPutBuilder;
@@ -57,7 +60,6 @@
5760
import java.util.concurrent.TimeUnit;
5861
import java.util.regex.Matcher;
5962

60-
import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
6163
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
6264
import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
6365
import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
@@ -77,6 +79,8 @@ public class DorisStreamLoad implements Serializable {
7779
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
7880
private final LabelGenerator labelGenerator;
7981
private final byte[] lineDelimiter;
82+
private final DorisOptions dorisOptions;
83+
private final DorisReadOptions dorisReadOptions;
8084
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
8185
private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
8286
public static final String JOB_EXIST_FINISHED = "FINISHED";
@@ -100,19 +104,38 @@ public class DorisStreamLoad implements Serializable {
100104
private volatile String currentLabel;
101105
private boolean enableGroupCommit;
102106
private boolean enableGzCompress;
107+
private transient LoadStateProvider loadStateProvider;
103108

104109
public DorisStreamLoad(
105110
String hostPort,
106111
DorisOptions dorisOptions,
107112
DorisExecutionOptions executionOptions,
108113
LabelGenerator labelGenerator,
109114
CloseableHttpClient httpClient) {
115+
this(
116+
hostPort,
117+
dorisOptions,
118+
DorisReadOptions.defaults(),
119+
executionOptions,
120+
labelGenerator,
121+
httpClient);
122+
}
123+
124+
public DorisStreamLoad(
125+
String hostPort,
126+
DorisOptions dorisOptions,
127+
DorisReadOptions dorisReadOptions,
128+
DorisExecutionOptions executionOptions,
129+
LabelGenerator labelGenerator,
130+
CloseableHttpClient httpClient) {
110131
this.hostPort = hostPort;
111132
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
112133
this.db = tableInfo[0];
113134
this.table = tableInfo[1];
114135
this.user = dorisOptions.getUsername();
115136
this.passwd = dorisOptions.getPassword();
137+
this.dorisOptions = dorisOptions;
138+
this.dorisReadOptions = dorisReadOptions;
116139
this.labelGenerator = labelGenerator;
117140
this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table);
118141
this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
@@ -191,58 +214,42 @@ public Future<RespContent> getPendingLoadFuture() {
191214
* @throws Exception
192215
*/
193216
public void abortPreCommit(String labelPrefix, long chkID) throws Exception {
217+
abortPreCommit(labelPrefix, chkID, labelGenerator);
218+
}
219+
220+
public void abortPreCommit(String labelPrefix, long chkID, LabelGenerator abortLabelGenerator)
221+
throws Exception {
194222
long startChkID = chkID;
195223
LOG.info(
196224
"abort for labelPrefix {}, concat labelPrefix {}, start chkId {}.",
197225
labelPrefix,
198-
labelGenerator.getConcatLabelPrefix(),
226+
abortLabelGenerator.getConcatLabelPrefix(),
199227
chkID);
200228
while (true) {
201229
try {
202-
// TODO: According to label abort txn.
203-
// Currently, it can only be aborted based on txnid, so we must
204-
// first request a streamload based on the label to get the txnid.
205-
String label = labelGenerator.generateTableLabel(startChkID);
206-
LOG.info("start a check label {} to load.", label);
207-
HttpPutBuilder builder = new HttpPutBuilder();
208-
builder.setUrl(loadUrlStr)
209-
.baseAuth(user, passwd)
210-
.addCommonHeader()
211-
.enable2PC()
212-
.setLabel(label)
213-
.setEmptyEntity()
214-
.addProperties(streamLoadProp);
215-
RespContent respContent =
216-
handlePreCommitResponse(httpClient.execute(builder.build()));
217-
Preconditions.checkState("true".equals(respContent.getTwoPhaseCommit()));
218-
if (LABEL_ALREADY_EXIST.equals(respContent.getStatus())) {
219-
// label already exist and job finished
220-
if (JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) {
221-
throw new DorisException(
222-
"Load status is "
223-
+ LABEL_ALREADY_EXIST
224-
+ " and load job finished, "
225-
+ "change you label prefix or restore from latest savepoint!");
226-
}
227-
// job not finished, abort.
228-
Matcher matcher = LABEL_EXIST_PATTERN.matcher(respContent.getMessage());
229-
if (matcher.find()) {
230-
Preconditions.checkState(label.equals(matcher.group(1)));
231-
long txnId = Long.parseLong(matcher.group(2));
232-
LOG.info("abort {} for exist label {}", txnId, label);
233-
abortTransaction(txnId);
234-
} else {
235-
LOG.error("response: {}", respContent.toString());
236-
throw new DorisException(
237-
"Load Status is "
238-
+ LABEL_ALREADY_EXIST
239-
+ ", but no txnID associated with it!");
240-
}
241-
} else {
242-
LOG.info("abort {} for check label {}.", respContent.getTxnId(), label);
243-
abortTransaction(respContent.getTxnId());
230+
String label = abortLabelGenerator.generateTableLabel(startChkID);
231+
LoadState loadState = getLoadState(label);
232+
LOG.info("load state for label {} is {}.", label, loadState);
233+
if (LoadState.UNKNOWN.equals(loadState)) {
244234
break;
245235
}
236+
if (LoadState.ABORTED.equals(loadState)) {
237+
startChkID++;
238+
continue;
239+
}
240+
if (loadState.isCommitted()) {
241+
throw new DorisException(
242+
"Load label "
243+
+ label
244+
+ " is already "
245+
+ loadState
246+
+ ", change your label prefix or restore from latest savepoint!");
247+
}
248+
if (!loadState.isPending()) {
249+
throw new DorisException(
250+
"Unsupported load state " + loadState + " for label " + label);
251+
}
252+
abortPendingTransactionByLabel(label);
246253
startChkID++;
247254
} catch (Exception e) {
248255
LOG.warn("failed to abort labelPrefix {}", labelPrefix, e);
@@ -252,6 +259,43 @@ public void abortPreCommit(String labelPrefix, long chkID) throws Exception {
252259
LOG.info("abort for labelPrefix {} finished", labelPrefix);
253260
}
254261

262+
private void abortPendingTransactionByLabel(String label) throws Exception {
263+
try {
264+
LOG.info("abort precommitted transaction by label {}.", label);
265+
abortTransactionByLabel(label);
266+
} catch (Exception e) {
267+
LoadState loadState = getLoadState(label);
268+
if (LoadState.ABORTED.equals(loadState)) {
269+
LOG.info(
270+
"transaction for label {} has been aborted after abort failure: {}",
271+
label,
272+
e.getMessage());
273+
return;
274+
}
275+
if (loadState.isCommitted()) {
276+
throw new DorisException(
277+
"Failed to abort transaction by label "
278+
+ label
279+
+ " because it is already "
280+
+ loadState,
281+
e);
282+
}
283+
throw new DorisException(
284+
"Failed to prove transaction abort success by label "
285+
+ label
286+
+ ", current load state is "
287+
+ loadState,
288+
e);
289+
}
290+
}
291+
292+
private LoadState getLoadState(String label) throws Exception {
293+
if (loadStateProvider != null) {
294+
return loadStateProvider.getLoadState(db, label);
295+
}
296+
return RestService.getLoadState(dorisOptions, dorisReadOptions, db, label, LOG);
297+
}
298+
255299
/**
256300
* write record into stream.
257301
*
@@ -517,6 +561,13 @@ public void abortTransactionByLabel(String label) throws Exception {
517561
"try abort committed transaction by label, "
518562
+ "do you recover from old savepoint?");
519563
}
564+
if (msg != null && ResponseUtil.isAlreadyAborted(msg)) {
565+
LOG.info(
566+
"transaction with label {} may have already been successfully aborted, skipping, abort response is {}",
567+
label,
568+
msg);
569+
return;
570+
}
520571

521572
LOG.error("Fail to abort transaction by label. label: {}, error: {}", label, msg);
522573
throw new DorisException("Fail to abort transaction by label, " + loadResult);
@@ -547,6 +598,11 @@ public String getCurrentLabel() {
547598
return currentLabel;
548599
}
549600

601+
@VisibleForTesting
602+
void setLoadStateProvider(LoadStateProvider loadStateProvider) {
603+
this.loadStateProvider = loadStateProvider;
604+
}
605+
550606
public void close() throws IOException {
551607
if (null != httpClient) {
552608
try {
@@ -559,4 +615,9 @@ public void close() throws IOException {
559615
executorService.shutdownNow();
560616
}
561617
}
618+
619+
@VisibleForTesting
620+
interface LoadStateProvider {
621+
LoadState getLoadState(String db, String label) throws Exception;
622+
}
562623
}

0 commit comments

Comments
 (0)