diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/LoadState.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/LoadState.java new file mode 100644 index 000000000..0741b77c5 --- /dev/null +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/LoadState.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.rest; + +import org.apache.doris.flink.exception.DorisException; + +/** Stream load transaction state returned by Doris get_load_state API. */ +public enum LoadState { + UNKNOWN, + PREPARE, + PRECOMMITTED, + COMMITTED, + VISIBLE, + ABORTED; + + public static LoadState fromString(String value) throws DorisException { + try { + return LoadState.valueOf(value.trim().toUpperCase()); + } catch (Exception e) { + throw new DorisException("Unknown stream load transaction state: " + value, e); + } + } + + public boolean isPending() { + return this == PREPARE || this == PRECOMMITTED; + } + + public boolean isCommitted() { + return this == COMMITTED || this == VISIBLE; + } +} diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java index f9aad4ffd..320e8d501 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -61,8 +61,10 @@ import java.io.InputStream; import java.io.PrintWriter; import java.io.Serializable; +import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; import java.net.URL; +import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -96,6 +98,7 @@ public class RestService implements Serializable { private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema"; private static final String CATALOG_TABLE_SCHEMA_API = "http://%s/api/%s/%s/%s/_schema"; private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan"; + private static final String LOAD_STATE_API = "http://%s/api/%s/get_load_state?label=%s"; private static final String STATEMENT_EXEC_API = "http://%s/api/query/default_cluster/information_schema"; @@ -428,6 +431,51 @@ public static Schema getSchema( return parseSchema(response, logger); } + public static LoadState getLoadState( + DorisOptions options, + DorisReadOptions readOptions, + String db, + String label, + Logger logger) + throws DorisException { + try { + String loadStateUri = + String.format( + LOAD_STATE_API, + randomEndpoint(options.getFenodes(), logger), + db, + URLEncoder.encode(label, StandardCharsets.UTF_8.name())); + HttpGet httpGet = new HttpGet(loadStateUri); + String response = send(options, readOptions, httpGet, logger); + logger.debug("Get stream load state response is '{}'.", response); + return parseLoadState(response, logger); + } catch (UnsupportedEncodingException e) { + throw new DorisException("Encode stream load label failed, label: " + label, e); + } + } + + @VisibleForTesting + static LoadState parseLoadState(String response, Logger logger) throws DorisException { + if (StringUtils.isBlank(response)) { + throw new DorisException("Doris FE's load state response is empty."); + } + String state = response.trim(); + try { + JsonNode node = objectMapper.readTree(state); + if (node.isTextual()) { + state = node.asText(); + } else if (node.has("data") && node.path("data").isTextual()) { + state = node.path("data").asText(); + } else { + throw new DorisException( + "Cannot parse Doris FE's load state response: " + response); + } + } catch (JsonProcessingException e) { + logger.trace("Parse load state response as raw state: {}", response); + } + return LoadState.fromString(state); + } + @VisibleForTesting public static Schema getSchema( DorisOptions dorisOptions, String db, String table, Logger logger) { diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java index c348e7384..b58eb7a30 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java @@ -25,12 +25,15 @@ public class ResponseUtil { Pattern.compile("Label \\[(.*)\\] has already been used, relate to txn \\[(\\d+)\\]"); public static final Pattern COMMITTED_PATTERN = Pattern.compile( - "transaction \\[(\\d+)\\] is already \\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed."); + "transaction \\[(\\d+)\\] is already \\b(COMMITTED|committed|VISIBLE|visible)\\b, (not pre-committed|could not abort)."); public static final Pattern ABORTTED_PATTERN = Pattern.compile( "transaction \\[(\\d+)\\] is already|transaction \\[(\\d+)\\] not found"); + public static final Pattern ALREADY_ABORTED_PATTERN = + Pattern.compile("transaction \\[(\\d+)\\] is already \\b(ABORTED|aborted)\\b"); + public static boolean isCommitted(String msg) { return COMMITTED_PATTERN.matcher(msg).find(); } @@ -39,6 +42,10 @@ public static boolean isAborted(String msg) { return ABORTTED_PATTERN.matcher(msg).find(); } + public static boolean isAlreadyAborted(String msg) { + return ALREADY_ABORTED_PATTERN.matcher(msg).find(); + } + static final Pattern COPY_COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied.*"); diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 18a979598..85791012f 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -26,10 +26,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.exception.LabelAlreadyExistsException; import org.apache.doris.flink.exception.StreamLoadException; +import org.apache.doris.flink.rest.LoadState; +import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.rest.models.RespContent; import org.apache.doris.flink.sink.EscapeHandler; import org.apache.doris.flink.sink.HttpPutBuilder; @@ -57,7 +60,6 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; -import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST; import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN; import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; @@ -77,6 +79,8 @@ public class DorisStreamLoad implements Serializable { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final LabelGenerator labelGenerator; private final byte[] lineDelimiter; + private final DorisOptions dorisOptions; + private final DorisReadOptions dorisReadOptions; private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"; private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc"; public static final String JOB_EXIST_FINISHED = "FINISHED"; @@ -100,6 +104,7 @@ public class DorisStreamLoad implements Serializable { private volatile String currentLabel; private boolean enableGroupCommit; private boolean enableGzCompress; + private transient LoadStateProvider loadStateProvider; public DorisStreamLoad( String hostPort, @@ -107,12 +112,30 @@ public DorisStreamLoad( DorisExecutionOptions executionOptions, LabelGenerator labelGenerator, CloseableHttpClient httpClient) { + this( + hostPort, + dorisOptions, + DorisReadOptions.defaults(), + executionOptions, + labelGenerator, + httpClient); + } + + public DorisStreamLoad( + String hostPort, + DorisOptions dorisOptions, + DorisReadOptions dorisReadOptions, + DorisExecutionOptions executionOptions, + LabelGenerator labelGenerator, + CloseableHttpClient httpClient) { this.hostPort = hostPort; String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); this.db = tableInfo[0]; this.table = tableInfo[1]; this.user = dorisOptions.getUsername(); this.passwd = dorisOptions.getPassword(); + this.dorisOptions = dorisOptions; + this.dorisReadOptions = dorisReadOptions; this.labelGenerator = labelGenerator; this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table); this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db); @@ -191,58 +214,42 @@ public Future getPendingLoadFuture() { * @throws Exception */ public void abortPreCommit(String labelPrefix, long chkID) throws Exception { + abortPreCommit(labelPrefix, chkID, labelGenerator); + } + + public void abortPreCommit(String labelPrefix, long chkID, LabelGenerator abortLabelGenerator) + throws Exception { long startChkID = chkID; LOG.info( "abort for labelPrefix {}, concat labelPrefix {}, start chkId {}.", labelPrefix, - labelGenerator.getConcatLabelPrefix(), + abortLabelGenerator.getConcatLabelPrefix(), chkID); while (true) { try { - // TODO: According to label abort txn. - // Currently, it can only be aborted based on txnid, so we must - // first request a streamload based on the label to get the txnid. - String label = labelGenerator.generateTableLabel(startChkID); - LOG.info("start a check label {} to load.", label); - HttpPutBuilder builder = new HttpPutBuilder(); - builder.setUrl(loadUrlStr) - .baseAuth(user, passwd) - .addCommonHeader() - .enable2PC() - .setLabel(label) - .setEmptyEntity() - .addProperties(streamLoadProp); - RespContent respContent = - handlePreCommitResponse(httpClient.execute(builder.build())); - Preconditions.checkState("true".equals(respContent.getTwoPhaseCommit())); - if (LABEL_ALREADY_EXIST.equals(respContent.getStatus())) { - // label already exist and job finished - if (JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) { - throw new DorisException( - "Load status is " - + LABEL_ALREADY_EXIST - + " and load job finished, " - + "change you label prefix or restore from latest savepoint!"); - } - // job not finished, abort. - Matcher matcher = LABEL_EXIST_PATTERN.matcher(respContent.getMessage()); - if (matcher.find()) { - Preconditions.checkState(label.equals(matcher.group(1))); - long txnId = Long.parseLong(matcher.group(2)); - LOG.info("abort {} for exist label {}", txnId, label); - abortTransaction(txnId); - } else { - LOG.error("response: {}", respContent.toString()); - throw new DorisException( - "Load Status is " - + LABEL_ALREADY_EXIST - + ", but no txnID associated with it!"); - } - } else { - LOG.info("abort {} for check label {}.", respContent.getTxnId(), label); - abortTransaction(respContent.getTxnId()); + String label = abortLabelGenerator.generateTableLabel(startChkID); + LoadState loadState = getLoadState(label); + LOG.info("load state for label {} is {}.", label, loadState); + if (LoadState.UNKNOWN.equals(loadState)) { break; } + if (LoadState.ABORTED.equals(loadState)) { + startChkID++; + continue; + } + if (loadState.isCommitted()) { + throw new DorisException( + "Load label " + + label + + " is already " + + loadState + + ", change your label prefix or restore from latest savepoint!"); + } + if (!loadState.isPending()) { + throw new DorisException( + "Unsupported load state " + loadState + " for label " + label); + } + abortPendingTransactionByLabel(label); startChkID++; } catch (Exception e) { LOG.warn("failed to abort labelPrefix {}", labelPrefix, e); @@ -252,6 +259,43 @@ public void abortPreCommit(String labelPrefix, long chkID) throws Exception { LOG.info("abort for labelPrefix {} finished", labelPrefix); } + private void abortPendingTransactionByLabel(String label) throws Exception { + try { + LOG.info("abort precommitted transaction by label {}.", label); + abortTransactionByLabel(label); + } catch (Exception e) { + LoadState loadState = getLoadState(label); + if (LoadState.ABORTED.equals(loadState)) { + LOG.info( + "transaction for label {} has been aborted after abort failure: {}", + label, + e.getMessage()); + return; + } + if (loadState.isCommitted()) { + throw new DorisException( + "Failed to abort transaction by label " + + label + + " because it is already " + + loadState, + e); + } + throw new DorisException( + "Failed to prove transaction abort success by label " + + label + + ", current load state is " + + loadState, + e); + } + } + + private LoadState getLoadState(String label) throws Exception { + if (loadStateProvider != null) { + return loadStateProvider.getLoadState(db, label); + } + return RestService.getLoadState(dorisOptions, dorisReadOptions, db, label, LOG); + } + /** * write record into stream. * @@ -517,6 +561,13 @@ public void abortTransactionByLabel(String label) throws Exception { "try abort committed transaction by label, " + "do you recover from old savepoint?"); } + if (msg != null && ResponseUtil.isAlreadyAborted(msg)) { + LOG.info( + "transaction with label {} may have already been successfully aborted, skipping, abort response is {}", + label, + msg); + return; + } LOG.error("Fail to abort transaction by label. label: {}, error: {}", label, msg); throw new DorisException("Fail to abort transaction by label, " + loadResult); @@ -547,6 +598,11 @@ public String getCurrentLabel() { return currentLabel; } + @VisibleForTesting + void setLoadStateProvider(LoadStateProvider loadStateProvider) { + this.loadStateProvider = loadStateProvider; + } + public void close() throws IOException { if (null != httpClient) { try { @@ -559,4 +615,9 @@ public void close() throws IOException { executorService.shutdownNow(); } } + + @VisibleForTesting + interface LoadStateProvider { + LoadState getLoadState(String db, String label) throws Exception; + } } diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 4435b360a..6700b999e 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -153,7 +153,14 @@ public void abortLingeringTransactions(Collection recoveredSta } String key = state.getDatabase() + "." + state.getTable(); DorisStreamLoad streamLoader = getStreamLoader(key); - streamLoader.abortPreCommit(state.getLabelPrefix(), curCheckpointId); + LabelGenerator recoveredLabelGenerator = + new LabelGenerator( + state.getLabelPrefix(), + executionOptions.enabled2PC(), + key, + state.getSubtaskId()); + streamLoader.abortPreCommit( + state.getLabelPrefix(), curCheckpointId, recoveredLabelGenerator); alreadyAborts.add(state.getLabelPrefix()); } @@ -305,6 +312,7 @@ public DorisStreamLoad getStreamLoader(String tableKey) { new DorisStreamLoad( backendUtil.getAvailableBackend(subtaskId), dorisOptions, + dorisReadOptions, executionOptions, labelGenerator, new HttpUtil(dorisReadOptions, executionOptions.isHttpUtf8Charset()) diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java index 84c14b793..2a5ab85fb 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java @@ -19,6 +19,9 @@ import org.apache.flink.util.Preconditions; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.UUID; import java.util.regex.Pattern; @@ -26,10 +29,13 @@ public class LabelGenerator { // doris default label regex private static final String LABEL_REGEX = "^[-_A-Za-z0-9:]{1,128}$"; + private static final int MAX_LABEL_LENGTH = 128; + private static final int DIGEST_LENGTH = 32; private static final Pattern LABEL_PATTERN = Pattern.compile(LABEL_REGEX); private String labelPrefix; private boolean enable2PC; private String tableIdentifier; + private String originalTableIdentifier; private int subtaskId; public LabelGenerator(String labelPrefix, boolean enable2PC) { @@ -41,6 +47,7 @@ public LabelGenerator( String labelPrefix, boolean enable2PC, String tableIdentifier, int subtaskId) { this(labelPrefix, enable2PC); // The label of stream load can not contain `.` + this.originalTableIdentifier = tableIdentifier; this.tableIdentifier = tableIdentifier.replace(".", "_"); this.subtaskId = subtaskId; } @@ -65,10 +72,7 @@ public String generateTableLabel(long chkId) { } if (enable2PC) { - // In 2pc, replace uuid with the table name. This will cause some txns to fail to be - // aborted when aborting. - // Later, the label needs to be stored in the state and aborted through label - return String.format("%s_%s_%s_%s", labelPrefix, UUID.randomUUID(), subtaskId, chkId); + return generateStableFallbackLabel(chkId); } else { return String.format("%s_%s_%s_%s", labelPrefix, subtaskId, chkId, UUID.randomUUID()); } @@ -92,6 +96,74 @@ public String getConcatLabelPrefix() { return concatPrefix; } + private String generateStableFallbackLabel(long chkId) { + String digest = stableDigest(chkId); + String suffix = String.format("_%s_%s_%s", digest, subtaskId, chkId); + String safePrefix = sanitizeLabelPart(labelPrefix); + int maxPrefixLength = MAX_LABEL_LENGTH - suffix.length(); + Preconditions.checkState(maxPrefixLength > 0); + if (safePrefix.length() > maxPrefixLength) { + safePrefix = safePrefix.substring(0, maxPrefixLength); + } + String label = safePrefix + suffix; + Preconditions.checkState(LABEL_PATTERN.matcher(label).matches()); + return label; + } + + private String stableDigest(long chkId) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + updateDigest(digest, "flink-doris-stream-load-2pc"); + updateDigest(digest, labelPrefix); + updateDigest( + digest, + originalTableIdentifier == null ? tableIdentifier : originalTableIdentifier); + updateDigest(digest, String.valueOf(subtaskId)); + updateDigest(digest, String.valueOf(chkId)); + return toHex(digest.digest()).substring(0, DIGEST_LENGTH); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("SHA-256 is not available", e); + } + } + + private void updateDigest(MessageDigest digest, String value) { + if (value != null) { + digest.update(value.getBytes(StandardCharsets.UTF_8)); + } + digest.update((byte) 0); + } + + private String toHex(byte[] bytes) { + char[] hexChars = new char[bytes.length * 2]; + char[] hexArray = "0123456789abcdef".toCharArray(); + for (int i = 0; i < bytes.length; i++) { + int v = bytes[i] & 0xFF; + hexChars[i * 2] = hexArray[v >>> 4]; + hexChars[i * 2 + 1] = hexArray[v & 0x0F]; + } + return new String(hexChars); + } + + private String sanitizeLabelPart(String value) { + if (value == null || value.isEmpty()) { + return "label"; + } + StringBuilder builder = new StringBuilder(value.length()); + for (int i = 0; i < value.length(); i++) { + char ch = value.charAt(i); + if (ch == '-' || ch == '_' || ch == ':' || Character.isLetterOrDigit(ch)) { + if (ch < 128) { + builder.append(ch); + } else { + builder.append('_'); + } + } else { + builder.append('_'); + } + } + return builder.length() == 0 ? "label" : builder.toString(); + } + public int getSubtaskId() { return subtaskId; } diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java index 6fe176cc2..683c29d91 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java @@ -128,6 +128,21 @@ public void testParseIdentifierIllegal() throws Exception { RestService.parseIdentifier(invalidIdentifier3, logger); } + @Test + public void testParseLoadState() throws Exception { + Assert.assertEquals( + LoadState.PRECOMMITTED, RestService.parseLoadState("PRECOMMITTED", logger)); + Assert.assertEquals(LoadState.ABORTED, RestService.parseLoadState("\"ABORTED\"", logger)); + Assert.assertEquals( + LoadState.VISIBLE, + RestService.parseLoadState( + "{\"code\":0,\"msg\":\"OK\",\"data\":\"VISIBLE\"}", logger)); + + thrown.expect(DorisException.class); + thrown.expectMessage(startsWith("Unknown stream load transaction state")); + RestService.parseLoadState("NOT_A_STATE", logger); + } + @Test public void testChoiceFe() throws Exception { String validFes = "1,2,3"; diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/TestResponseUtil.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/TestResponseUtil.java index f8d36a731..aca0a8145 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/TestResponseUtil.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/TestResponseUtil.java @@ -33,6 +33,8 @@ public void testIsCommitted() { "errCode = 2, detailMessage = transaction [2] is already VISIBLE, not pre-committed."; String visibleMsgWhenAbort = "errCode = 2, detailMessage = transaction [2] is already COMMITTED, not pre-committed."; + String visibleCouldNotAbort = + "errCode = 2, detailMessage = transaction [2] is already VISIBLE, could not abort."; String commitMsg = "errCode = 2, detailMessage = transaction [2] is already COMMIT, not pre-committed."; String abortedMsg = @@ -41,6 +43,7 @@ public void testIsCommitted() { Assert.assertTrue(ResponseUtil.isCommitted(visibleMsg)); Assert.assertTrue(ResponseUtil.isCommitted(committedMsgWhenAbort)); Assert.assertTrue(ResponseUtil.isCommitted(visibleMsgWhenAbort)); + Assert.assertTrue(ResponseUtil.isCommitted(visibleCouldNotAbort)); Assert.assertFalse(ResponseUtil.isCommitted(commitMsg)); Assert.assertFalse(ResponseUtil.isCommitted(abortedMsg)); } @@ -64,5 +67,8 @@ public void testIsAborted() { Assert.assertTrue(ResponseUtil.isAborted(alreadCommit)); Assert.assertTrue(ResponseUtil.isAborted(alreadVISIBLE)); Assert.assertFalse(ResponseUtil.isAborted(errormsg)); + Assert.assertTrue(ResponseUtil.isAlreadyAborted(alreadyAbort)); + Assert.assertFalse(ResponseUtil.isAlreadyAborted(alreadCommit)); + Assert.assertFalse(ResponseUtil.isAlreadyAborted(alreadVISIBLE)); } } diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java index cda8674ad..c93607bf6 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java @@ -21,27 +21,33 @@ import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisException; +import org.apache.doris.flink.rest.LoadState; import org.apache.doris.flink.rest.models.RespContent; import org.apache.doris.flink.sink.HttpTestUtil; import org.apache.doris.flink.sink.OptionUtils; import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.impl.client.CloseableHttpClient; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.ArgumentCaptor; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.core.StringStartsWith.startsWith; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** test for DorisStreamLoad. */ @@ -64,22 +70,31 @@ public void setUp() throws Exception { @Test public void testAbortPreCommit() throws Exception { CloseableHttpClient httpClient = mock(CloseableHttpClient.class); - CloseableHttpResponse existLabelResponse = - HttpTestUtil.getResponse(HttpTestUtil.LABEL_EXIST_PRE_COMMIT_TABLE_RESPONSE, true); - CloseableHttpResponse preCommitResponse = - HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_TABLE_RESPONSE, true); - when(httpClient.execute(any())).thenReturn(existLabelResponse, preCommitResponse); + CloseableHttpResponse abortSuccessResponse = + HttpTestUtil.getResponse(HttpTestUtil.ABORT_SUCCESS_RESPONSE_BY_LABEL, true); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(abortSuccessResponse); DorisStreamLoad dorisStreamLoad = - spy( - new DorisStreamLoad( - "", - dorisOptions, - executionOptions, - new LabelGenerator("test001", true, "db.table", 0), - httpClient)); - - doNothing().when(dorisStreamLoad).abortTransaction(anyLong()); + new DorisStreamLoad( + "", + dorisOptions, + executionOptions, + new LabelGenerator("test001", true, "db.table", 0), + httpClient); + Map states = new HashMap<>(); + states.put("test001_db_table_0_1", LoadState.PRECOMMITTED); + states.put("test001_db_table_0_2", LoadState.UNKNOWN); + dorisStreamLoad.setLoadStateProvider( + (db, label) -> states.getOrDefault(label, LoadState.UNKNOWN)); dorisStreamLoad.abortPreCommit("test001", 1); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(HttpUriRequest.class); + verify(httpClient).execute(requestCaptor.capture()); + HttpUriRequest abortRequest = requestCaptor.getValue(); + Assert.assertTrue(abortRequest.getURI().toString().endsWith("/api/db/_stream_load_2pc")); + Assert.assertEquals( + "test001_db_table_0_1", abortRequest.getFirstHeader("label").getValue()); + Assert.assertEquals("abort", abortRequest.getFirstHeader("txn_operation").getValue()); } @Test @@ -118,13 +133,9 @@ public void testAbortTransactionError() throws Exception { dorisStreamLoad.abortTransaction(anyLong()); } - @Test(expected = DorisException.class) - public void testAbortTransactionLabelExistNoTxn() throws Exception { + @Test + public void testAbortPreCommitStopOnUnknown() throws Exception { CloseableHttpClient httpClient = mock(CloseableHttpClient.class); - CloseableHttpResponse abortFailedResponse = - HttpTestUtil.getResponse( - HttpTestUtil.LABEL_EXIST_PRECOMMITTED_NO_TXN_TABLE_RESPONSE, true); - when(httpClient.execute(any())).thenReturn(abortFailedResponse); DorisStreamLoad dorisStreamLoad = new DorisStreamLoad( "", @@ -132,7 +143,9 @@ public void testAbortTransactionLabelExistNoTxn() throws Exception { executionOptions, new LabelGenerator("test001", true, "db.tbl", 1), httpClient); - dorisStreamLoad.abortPreCommit("123", anyLong()); + dorisStreamLoad.setLoadStateProvider((db, label) -> LoadState.UNKNOWN); + dorisStreamLoad.abortPreCommit("test001", 1); + verify(httpClient, never()).execute(any(HttpUriRequest.class)); } @Test @@ -169,11 +182,8 @@ public void testAbortTransactionFailed() throws Exception { } @Test(expected = DorisException.class) - public void testAbortTransactionFailedCauseFinished() throws Exception { + public void testAbortPreCommitFailedCauseVisible() throws Exception { CloseableHttpClient httpClient = mock(CloseableHttpClient.class); - CloseableHttpResponse abortFailedResponse = - HttpTestUtil.getResponse(HttpTestUtil.LABEL_EXIST_FINISHED_TABLE_RESPONSE, true); - when(httpClient.execute(any())).thenReturn(abortFailedResponse); DorisStreamLoad dorisStreamLoad = new DorisStreamLoad( "", @@ -181,7 +191,35 @@ public void testAbortTransactionFailedCauseFinished() throws Exception { executionOptions, new LabelGenerator("test001", true, "db.tbl", 1), httpClient); - dorisStreamLoad.abortPreCommit("123", anyLong()); + dorisStreamLoad.setLoadStateProvider((db, label) -> LoadState.VISIBLE); + dorisStreamLoad.abortPreCommit("test001", 1); + } + + @Test + public void testAbortPreCommitSuccessWhenFollowUpStateAborted() throws Exception { + CloseableHttpClient httpClient = mock(CloseableHttpClient.class); + CloseableHttpResponse abortFailedResponse = + HttpTestUtil.getResponse(HttpTestUtil.ABORT_FAILED_RESPONSE_BY_LABEL, true); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(abortFailedResponse); + DorisStreamLoad dorisStreamLoad = + new DorisStreamLoad( + "", + dorisOptions, + executionOptions, + new LabelGenerator("test001", true, "db.table", 0), + httpClient); + AtomicInteger labelOneQueryCount = new AtomicInteger(); + dorisStreamLoad.setLoadStateProvider( + (db, label) -> { + if ("test001_db_table_0_1".equals(label)) { + return labelOneQueryCount.getAndIncrement() == 0 + ? LoadState.PRECOMMITTED + : LoadState.ABORTED; + } + return LoadState.UNKNOWN; + }); + dorisStreamLoad.abortPreCommit("test001", 1); + verify(httpClient).execute(any(HttpUriRequest.class)); } @Test(expected = Exception.class) diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java index 8edb7a589..d8d05f3e7 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java @@ -36,6 +36,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.MockedStatic; import java.io.IOException; @@ -49,6 +50,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** test for DorisWriter. */ @@ -227,6 +229,76 @@ public void testSnapshot() throws Exception { Assert.assertEquals("doris", writerStates.get(0).getLabelPrefix()); } + @Test + public void testAbortLingeringTransactionsUsesRecoveredWriterIdentity() throws Exception { + SinkWriterMetricGroup sinkWriterMetricGroup = mock(SinkWriterMetricGroup.class); + dorisOptions.setTableIdentifier(""); + DorisWriter dorisWriter = + new DorisWriter( + 1, + 3, + sinkWriterMetricGroup, + Collections.emptyList(), + new SimpleStringSerializer(), + dorisOptions, + readOptions, + executionOptions); + Map dorisStreamLoadMap = new ConcurrentHashMap<>(); + DorisStreamLoad streamLoad = mock(DorisStreamLoad.class); + dorisStreamLoadMap.put("db.table", streamLoad); + dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap); + + DorisWriterState recoveredState = new DorisWriterState("old_prefix", "db", "table", 7); + dorisWriter.abortLingeringTransactions(Collections.singletonList(recoveredState)); + + ArgumentCaptor labelGeneratorCaptor = + ArgumentCaptor.forClass(LabelGenerator.class); + verify(streamLoad) + .abortPreCommit( + org.mockito.ArgumentMatchers.eq("old_prefix"), + org.mockito.ArgumentMatchers.eq(2L), + labelGeneratorCaptor.capture()); + Assert.assertEquals( + "old_prefix_db_table_7_2", labelGeneratorCaptor.getValue().generateTableLabel(2)); + } + + @Test + public void testAbortLingeringTransactionsUsesRecoveredIdentityForFallbackLabel() + throws Exception { + SinkWriterMetricGroup sinkWriterMetricGroup = mock(SinkWriterMetricGroup.class); + dorisOptions.setTableIdentifier(""); + DorisWriter dorisWriter = + new DorisWriter( + 1, + 3, + sinkWriterMetricGroup, + Collections.emptyList(), + new SimpleStringSerializer(), + dorisOptions, + readOptions, + executionOptions); + Map dorisStreamLoadMap = new ConcurrentHashMap<>(); + DorisStreamLoad streamLoad = mock(DorisStreamLoad.class); + String table = + "数据表tabletabletabletabletabletabletabletabletabletabletabletabletabletabletable"; + dorisStreamLoadMap.put("数据库." + table, streamLoad); + dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap); + + DorisWriterState recoveredState = new DorisWriterState("old_prefix", "数据库", table, 7); + dorisWriter.abortLingeringTransactions(Collections.singletonList(recoveredState)); + + ArgumentCaptor labelGeneratorCaptor = + ArgumentCaptor.forClass(LabelGenerator.class); + verify(streamLoad) + .abortPreCommit( + org.mockito.ArgumentMatchers.eq("old_prefix"), + org.mockito.ArgumentMatchers.eq(2L), + labelGeneratorCaptor.capture()); + String label = labelGeneratorCaptor.getValue().generateTableLabel(2); + Assert.assertTrue(label.matches("old_prefix_[0-9a-f]{32}_7_2")); + Assert.assertEquals(label, labelGeneratorCaptor.getValue().generateTableLabel(2)); + } + public DorisWriteMetrics getMockWriteMetrics(SinkWriterMetricGroup sinkWriterMetricGroup) { DorisWriteMetrics dorisWriteMetrics = new DorisWriteMetrics(sinkWriterMetricGroup, dorisOptions.getTableIdentifier()); diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java index c06b1966f..f79706cbb 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/TestLabelGenerator.java @@ -24,6 +24,7 @@ public class TestLabelGenerator { private static String UUID_REGEX_WITHOUT_LINE = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; + private static final String LABEL_REGEX = "[-_A-Za-z0-9:]{1,128}"; @Test public void generateTableLabelTest() { @@ -49,7 +50,21 @@ public void generateTableLabelTest() { // mock table name chinese and 2pc labelGenerator = new LabelGenerator("test001", true, "数据库.数据表", 0); label = labelGenerator.generateTableLabel(1); - Assert.assertTrue(label.matches("test001_" + UUID_REGEX_WITHOUT_LINE + "_0_1")); + Assert.assertTrue(label.matches("test001_[0-9a-f]{32}_0_1")); + Assert.assertEquals(label, labelGenerator.generateTableLabel(1)); + Assert.assertTrue(label.matches(LABEL_REGEX)); + + // mock label length more than 128 and 2pc + labelGenerator = + new LabelGenerator( + "test001", + true, + "db.tabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletabletable", + 0); + label = labelGenerator.generateTableLabel(1); + Assert.assertTrue(label.matches("test001_[0-9a-f]{32}_0_1")); + Assert.assertEquals(label, labelGenerator.generateTableLabel(1)); + Assert.assertTrue(label.matches(LABEL_REGEX)); } @Test