Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.rest;

import org.apache.doris.flink.exception.DorisException;

/**
 * Stream load transaction state returned by Doris get_load_state API.
 *
 * <p>Use {@link #fromString(String)} to convert the textual state found in an FE response into
 * one of these constants.
 */
public enum LoadState {
    UNKNOWN,
    PREPARE,
    PRECOMMITTED,
    COMMITTED,
    VISIBLE,
    ABORTED;

    /**
     * Converts a textual state into the matching constant.
     *
     * <p>Surrounding whitespace is ignored and the comparison is case-insensitive.
     *
     * @param value the state text, for example {@code "visible"} or {@code " ABORTED "}
     * @return the constant whose name equals the trimmed, upper-cased {@code value}
     * @throws DorisException if {@code value} is {@code null} or does not name a constant
     */
    public static LoadState fromString(String value) throws DorisException {
        if (value == null) {
            throw new DorisException("Unknown stream load transaction state: " + value);
        }
        try {
            // Locale.ROOT keeps the mapping independent of the JVM default locale; with a
            // Turkish default locale "visible" would otherwise upper-case to "VİSİBLE".
            return LoadState.valueOf(value.trim().toUpperCase(java.util.Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new DorisException("Unknown stream load transaction state: " + value, e);
        }
    }

    /** @return {@code true} when this state is {@link #PREPARE} or {@link #PRECOMMITTED} */
    public boolean isPending() {
        return this == PREPARE || this == PRECOMMITTED;
    }

    /** @return {@code true} when this state is {@link #COMMITTED} or {@link #VISIBLE} */
    public boolean isCommitted() {
        return this == COMMITTED || this == VISIBLE;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -96,6 +98,7 @@ public class RestService implements Serializable {
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
private static final String CATALOG_TABLE_SCHEMA_API = "http://%s/api/%s/%s/%s/_schema";
private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan";
private static final String LOAD_STATE_API = "http://%s/api/%s/get_load_state?label=%s";
private static final String STATEMENT_EXEC_API =
"http://%s/api/query/default_cluster/information_schema";

Expand Down Expand Up @@ -428,6 +431,51 @@ public static Schema getSchema(
return parseSchema(response, logger);
}

/**
 * Asks a Doris FE for the state of the load identified by {@code label} in database {@code db}.
 *
 * <p>The label is URL-encoded as UTF-8 before it is placed in the query string of the
 * {@code get_load_state} request.
 *
 * @param options connection options; the FE node is picked from {@code options.getFenodes()}
 * @param readOptions read options forwarded to the HTTP send helper
 * @param db database name used in the request path
 * @param label stream load label to look up
 * @param logger logger used for endpoint selection and response tracing
 * @return the state parsed from the FE response
 * @throws DorisException if the label cannot be encoded, or if sending or parsing fails
 */
public static LoadState getLoadState(
        DorisOptions options,
        DorisReadOptions readOptions,
        String db,
        String label,
        Logger logger)
        throws DorisException {
    try {
        // Keep the original evaluation order: choose the endpoint first, then encode the label.
        final String feNode = randomEndpoint(options.getFenodes(), logger);
        final String encodedLabel = URLEncoder.encode(label, StandardCharsets.UTF_8.name());
        final String requestUri = String.format(LOAD_STATE_API, feNode, db, encodedLabel);

        final String body = send(options, readOptions, new HttpGet(requestUri), logger);
        logger.debug("Get stream load state response is '{}'.", body);
        return parseLoadState(body, logger);
    } catch (UnsupportedEncodingException e) {
        throw new DorisException("Encode stream load label failed, label: " + label, e);
    }
}

/**
 * Extracts a {@link LoadState} from an FE {@code get_load_state} response body.
 *
 * <p>Three shapes are accepted: a JSON string, a JSON object whose {@code data} field is a
 * string, or text that is not valid JSON at all, in which case the trimmed text itself is
 * treated as the state.
 *
 * @param response raw response body
 * @param logger logger used to trace the raw-text fallback
 * @return the parsed state
 * @throws DorisException if the response is blank, is JSON of another shape, or names a state
 *     that {@link LoadState#fromString(String)} rejects
 */
@VisibleForTesting
static LoadState parseLoadState(String response, Logger logger) throws DorisException {
    if (StringUtils.isBlank(response)) {
        throw new DorisException("Doris FE's load state response is empty.");
    }
    final String trimmed = response.trim();
    String stateText;
    try {
        final JsonNode root = objectMapper.readTree(trimmed);
        // path() yields a missing node (never textual) when "data" is absent.
        final JsonNode data = root.path("data");
        if (root.isTextual()) {
            stateText = root.asText();
        } else if (data.isTextual()) {
            stateText = data.asText();
        } else {
            throw new DorisException(
                    "Cannot parse Doris FE's load state response: " + response);
        }
    } catch (JsonProcessingException e) {
        // Not JSON: fall back to interpreting the trimmed body as the state name.
        logger.trace("Parse load state response as raw state: {}", response);
        stateText = trimmed;
    }
    return LoadState.fromString(stateText);
}

@VisibleForTesting
public static Schema getSchema(
DorisOptions dorisOptions, String db, String table, Logger logger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ public class ResponseUtil {
Pattern.compile("Label \\[(.*)\\] has already been used, relate to txn \\[(\\d+)\\]");
public static final Pattern COMMITTED_PATTERN =
Pattern.compile(
"transaction \\[(\\d+)\\] is already \\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed.");
"transaction \\[(\\d+)\\] is already \\b(COMMITTED|committed|VISIBLE|visible)\\b, (not pre-committed|could not abort).");

/**
 * Matches a message containing either {@code transaction [<digits>] is already} or
 * {@code transaction [<digits>] not found}; the digits are captured in group 1 or group 2
 * respectively. Note that the first alternative does not constrain the word that follows
 * "is already".
 */
public static final Pattern ABORTTED_PATTERN =
Pattern.compile(
"transaction \\[(\\d+)\\] is already|transaction \\[(\\d+)\\] not found");

/**
 * Matches a message containing {@code transaction [<digits>] is already ABORTED} (or the
 * lower-case {@code aborted}); group 1 captures the digits and group 2 the state word.
 */
public static final Pattern ALREADY_ABORTED_PATTERN =
Pattern.compile("transaction \\[(\\d+)\\] is already \\b(ABORTED|aborted)\\b");

/**
 * Reports whether {@code msg} contains a match for {@link #COMMITTED_PATTERN}.
 *
 * @param msg response message to inspect; must not be {@code null}
 * @return {@code true} if the pattern is found anywhere in {@code msg}
 */
public static boolean isCommitted(String msg) {
    final boolean committedMessageFound = COMMITTED_PATTERN.matcher(msg).find();
    return committedMessageFound;
}
Expand All @@ -39,6 +42,10 @@ public static boolean isAborted(String msg) {
return ABORTTED_PATTERN.matcher(msg).find();
}

/**
 * Reports whether {@code msg} contains a match for {@link #ALREADY_ABORTED_PATTERN}.
 *
 * @param msg response message to inspect; must not be {@code null}
 * @return {@code true} if the pattern is found anywhere in {@code msg}
 */
public static boolean isAlreadyAborted(String msg) {
    final boolean abortedMessageFound = ALREADY_ABORTED_PATTERN.matcher(msg).find();
    return abortedMessageFound;
}

/**
 * Matches text that starts with {@code errCode = 2, detailMessage = No files can be copied}
 * followed by any trailing characters on the same line.
 */
static final Pattern COPY_COMMITTED_PATTERN =
Pattern.compile("errCode = 2, detailMessage = No files can be copied.*");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.exception.LabelAlreadyExistsException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.LoadState;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpPutBuilder;
Expand Down Expand Up @@ -57,7 +60,6 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;

import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
Expand All @@ -77,6 +79,8 @@ public class DorisStreamLoad implements Serializable {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final LabelGenerator labelGenerator;
private final byte[] lineDelimiter;
private final DorisOptions dorisOptions;
private final DorisReadOptions dorisReadOptions;
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
public static final String JOB_EXIST_FINISHED = "FINISHED";
Expand All @@ -100,19 +104,38 @@ public class DorisStreamLoad implements Serializable {
private volatile String currentLabel;
private boolean enableGroupCommit;
private boolean enableGzCompress;
private transient LoadStateProvider loadStateProvider;

/**
 * Creates a stream loader without explicit read options.
 *
 * <p>Delegates to the six-argument constructor, supplying {@code DorisReadOptions.defaults()}
 * as the read options.
 *
 * @param hostPort host and port used to build the load and abort URLs
 * @param dorisOptions connection options
 * @param executionOptions sink execution options
 * @param labelGenerator generator for stream load labels
 * @param httpClient HTTP client used for requests
 */
public DorisStreamLoad(
        String hostPort,
        DorisOptions dorisOptions,
        DorisExecutionOptions executionOptions,
        LabelGenerator labelGenerator,
        CloseableHttpClient httpClient) {
    // Only the read options differ from the full constructor; everything else is forwarded.
    this(hostPort, dorisOptions, DorisReadOptions.defaults(),
            executionOptions, labelGenerator, httpClient);
}

public DorisStreamLoad(
String hostPort,
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
DorisExecutionOptions executionOptions,
LabelGenerator labelGenerator,
CloseableHttpClient httpClient) {
this.hostPort = hostPort;
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
this.db = tableInfo[0];
this.table = tableInfo[1];
this.user = dorisOptions.getUsername();
this.passwd = dorisOptions.getPassword();
this.dorisOptions = dorisOptions;
this.dorisReadOptions = dorisReadOptions;
this.labelGenerator = labelGenerator;
this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table);
this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
Expand Down Expand Up @@ -191,58 +214,42 @@ public Future<RespContent> getPendingLoadFuture() {
* @throws Exception
*/
public void abortPreCommit(String labelPrefix, long chkID) throws Exception {
    // Use this loader's own label generator when the caller does not supply one.
    final LabelGenerator ownGenerator = labelGenerator;
    abortPreCommit(labelPrefix, chkID, ownGenerator);
}

public void abortPreCommit(String labelPrefix, long chkID, LabelGenerator abortLabelGenerator)
throws Exception {
long startChkID = chkID;
LOG.info(
"abort for labelPrefix {}, concat labelPrefix {}, start chkId {}.",
labelPrefix,
labelGenerator.getConcatLabelPrefix(),
abortLabelGenerator.getConcatLabelPrefix(),
chkID);
while (true) {
try {
// TODO: According to label abort txn.
// Currently, it can only be aborted based on txnid, so we must
// first request a streamload based on the label to get the txnid.
String label = labelGenerator.generateTableLabel(startChkID);
LOG.info("start a check label {} to load.", label);
HttpPutBuilder builder = new HttpPutBuilder();
builder.setUrl(loadUrlStr)
.baseAuth(user, passwd)
.addCommonHeader()
.enable2PC()
.setLabel(label)
.setEmptyEntity()
.addProperties(streamLoadProp);
RespContent respContent =
handlePreCommitResponse(httpClient.execute(builder.build()));
Preconditions.checkState("true".equals(respContent.getTwoPhaseCommit()));
if (LABEL_ALREADY_EXIST.equals(respContent.getStatus())) {
// label already exist and job finished
if (JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) {
throw new DorisException(
"Load status is "
+ LABEL_ALREADY_EXIST
+ " and load job finished, "
+ "change you label prefix or restore from latest savepoint!");
}
// job not finished, abort.
Matcher matcher = LABEL_EXIST_PATTERN.matcher(respContent.getMessage());
if (matcher.find()) {
Preconditions.checkState(label.equals(matcher.group(1)));
long txnId = Long.parseLong(matcher.group(2));
LOG.info("abort {} for exist label {}", txnId, label);
abortTransaction(txnId);
} else {
LOG.error("response: {}", respContent.toString());
throw new DorisException(
"Load Status is "
+ LABEL_ALREADY_EXIST
+ ", but no txnID associated with it!");
}
} else {
LOG.info("abort {} for check label {}.", respContent.getTxnId(), label);
abortTransaction(respContent.getTxnId());
String label = abortLabelGenerator.generateTableLabel(startChkID);
LoadState loadState = getLoadState(label);
LOG.info("load state for label {} is {}.", label, loadState);
if (LoadState.UNKNOWN.equals(loadState)) {
break;
}
if (LoadState.ABORTED.equals(loadState)) {
startChkID++;
continue;
}
if (loadState.isCommitted()) {
throw new DorisException(
"Load label "
+ label
+ " is already "
+ loadState
+ ", change your label prefix or restore from latest savepoint!");
}
if (!loadState.isPending()) {
throw new DorisException(
"Unsupported load state " + loadState + " for label " + label);
}
abortPendingTransactionByLabel(label);
startChkID++;
} catch (Exception e) {
LOG.warn("failed to abort labelPrefix {}", labelPrefix, e);
Expand All @@ -252,6 +259,43 @@ public void abortPreCommit(String labelPrefix, long chkID) throws Exception {
LOG.info("abort for labelPrefix {} finished", labelPrefix);
}

/**
 * Aborts the transaction identified by {@code label} and, if the abort call fails, re-reads the
 * load state to decide whether the failure can be tolerated.
 *
 * <p>After a failed abort: an {@code ABORTED} state is treated as success; a committed state or
 * any other state results in a {@link DorisException} that carries the abort failure as its
 * cause.
 *
 * @param label stream load label whose transaction should be aborted
 * @throws Exception if the abort fails and the transaction is not confirmed as aborted, or if
 *     the follow-up state lookup itself fails (the abort failure is then attached as a
 *     suppressed exception)
 */
private void abortPendingTransactionByLabel(String label) throws Exception {
    try {
        LOG.info("abort precommitted transaction by label {}.", label);
        abortTransactionByLabel(label);
    } catch (Exception e) {
        final LoadState loadState;
        try {
            loadState = getLoadState(label);
        } catch (Exception lookupFailure) {
            // Keep the original abort failure visible instead of silently replacing it.
            // Self-suppression is rejected by Throwable, hence the identity check.
            if (lookupFailure != e) {
                lookupFailure.addSuppressed(e);
            }
            throw lookupFailure;
        }
        if (LoadState.ABORTED.equals(loadState)) {
            LOG.info(
                    "transaction for label {} has been aborted after abort failure: {}",
                    label,
                    e.getMessage());
            return;
        }
        if (loadState.isCommitted()) {
            throw new DorisException(
                    "Failed to abort transaction by label "
                            + label
                            + " because it is already "
                            + loadState,
                    e);
        }
        throw new DorisException(
                "Failed to prove transaction abort success by label "
                        + label
                        + ", current load state is "
                        + loadState,
                e);
    }
}

/**
 * Looks up the load state for {@code label} in this loader's database.
 *
 * <p>An installed {@code loadStateProvider} is consulted when present; otherwise the state is
 * fetched through {@code RestService.getLoadState}.
 *
 * @param label stream load label to look up
 * @return the state reported for the label
 * @throws Exception if the provider or the REST lookup fails
 */
private LoadState getLoadState(String label) throws Exception {
    final LoadStateProvider provider = loadStateProvider;
    return provider != null
            ? provider.getLoadState(db, label)
            : RestService.getLoadState(dorisOptions, dorisReadOptions, db, label, LOG);
}

/**
* write record into stream.
*
Expand Down Expand Up @@ -517,6 +561,13 @@ public void abortTransactionByLabel(String label) throws Exception {
"try abort committed transaction by label, "
+ "do you recover from old savepoint?");
}
if (msg != null && ResponseUtil.isAlreadyAborted(msg)) {
LOG.info(
"transaction with label {} may have already been successfully aborted, skipping, abort response is {}",
label,
msg);
return;
}

LOG.error("Fail to abort transaction by label. label: {}, error: {}", label, msg);
throw new DorisException("Fail to abort transaction by label, " + loadResult);
Expand Down Expand Up @@ -547,6 +598,11 @@ public String getCurrentLabel() {
return currentLabel;
}

/**
 * Installs the provider that the private load-state lookup consults before falling back to the
 * REST service. Passing {@code null} restores the REST lookup.
 *
 * @param loadStateProvider provider to use, or {@code null}
 */
@VisibleForTesting
void setLoadStateProvider(LoadStateProvider loadStateProvider) {
this.loadStateProvider = loadStateProvider;
}

public void close() throws IOException {
if (null != httpClient) {
try {
Expand All @@ -559,4 +615,9 @@ public void close() throws IOException {
executorService.shutdownNow();
}
}

/** Source of load states keyed by database and label; substitutable via the test-only setter. */
@VisibleForTesting
interface LoadStateProvider {
/**
 * Returns the load state for the given label.
 *
 * @param db database name
 * @param label stream load label
 * @return the state for the label
 * @throws Exception if the state cannot be obtained
 */
LoadState getLoadState(String db, String label) throws Exception;
}
}
Loading
Loading