diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/overwrite/DorisOverwriteManager.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/overwrite/DorisOverwriteManager.java new file mode 100644 index 000000000..96eaae988 --- /dev/null +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/overwrite/DorisOverwriteManager.java @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.overwrite; + +import org.apache.flink.util.Preconditions; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider; +import org.apache.doris.flink.exception.DorisSystemException; +import org.apache.doris.flink.sink.writer.WriteMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Locale; + +/** DDL helper for INSERT OVERWRITE staging writes. */ +public class DorisOverwriteManager { + private static final Logger LOG = LoggerFactory.getLogger(DorisOverwriteManager.class); + private static final int MAX_STAGING_TABLE_LENGTH = 63; + private static final String STAGING_PREFIX = "__doris_flink_overwrite_"; + + private DorisOverwriteManager() {} + + public static DorisPreparedOverwrite prepareOverwrite( + DorisOptions targetOptions, DorisExecutionOptions executionOptions) { + validateOverwriteOptions(targetOptions, executionOptions); + DorisTableIdentifier targetTable = + DorisTableIdentifier.of(targetOptions.getTableIdentifier()); + DorisTableIdentifier stagingTable = + new DorisTableIdentifier( + targetTable.getDatabase(), + buildStagingTableName(targetTable, executionOptions.getLabelPrefix())); + DorisOptions stagingOptions = copyOptions(targetOptions, stagingTable.asString()); + + Long targetTableId; + Long stagingTableId; + SimpleJdbcConnectionProvider jdbcConnectionProvider = + new SimpleJdbcConnectionProvider(targetOptions); + try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) { + targetTableId = requireTableId(connection, targetTable); + if (!tableExists(connection, stagingTable)) { + execute(connection, createTableLikeSql(stagingTable, targetTable)); + } else { + throw new DorisSystemException( + String.format( + "Doris overwrite staging table %s already exists. " + + "Use a unique sink.label-prefix for each INSERT OVERWRITE job " + + "or clean up the leftover staging table after verifying it is unused.", + stagingTable)); + } + stagingTableId = requireTableId(connection, stagingTable); + } catch (Exception e) { + throw new DorisSystemException( + String.format( + "Failed to prepare INSERT OVERWRITE staging table %s for target %s", + stagingTable, targetTable), + e); + } + + DorisOverwriteOptions overwriteOptions = + new DorisOverwriteOptions( + targetOptions, + targetTable, + stagingTable, + targetTableId, + stagingTableId, + executionOptions.getLabelPrefix()); + return new DorisPreparedOverwrite(stagingOptions, overwriteOptions); + } + + public static void finalizeOverwrite(DorisOverwriteOptions overwriteOptions) { + DorisOptions targetOptions = overwriteOptions.getTargetOptions(); + SimpleJdbcConnectionProvider jdbcConnectionProvider = + new SimpleJdbcConnectionProvider(targetOptions); + try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) { + if (isAlreadyFinalized(connection, overwriteOptions)) { + LOG.info("Doris overwrite {} has already been finalized.", overwriteOptions); + return; + } + validateTargetUnchanged(connection, overwriteOptions); + execute(connection, replaceTableSql(overwriteOptions)); + } catch (Exception e) { + if (isFinalizedAfterFailure(jdbcConnectionProvider, overwriteOptions, e)) { + LOG.info( + "Doris overwrite {} was finalized before the failure returned.", + overwriteOptions); + return; + } + throw new DorisSystemException( + String.format("Failed to finalize Doris INSERT OVERWRITE %s", overwriteOptions), + e); + } + } + + public static DorisOptions copyOptions(DorisOptions options, String tableIdentifier) { + return DorisOptions.builder() + .setFenodes(options.getFenodes()) + .setBenodes(options.getBenodes()) + .setUsername(options.getUsername()) + .setPassword(options.getPassword()) + .setJdbcUrl(options.getJdbcUrl()) + .setAutoRedirect(options.isAutoRedirect()) + .setTableIdentifier(tableIdentifier) + .build(); + } + + public static String createTableLikeSql( + DorisTableIdentifier stagingTable, DorisTableIdentifier targetTable) { + return "CREATE TABLE " + stagingTable.toSql() + " LIKE " + targetTable.toSql(); + } + + public static String replaceTableSql(DorisOverwriteOptions overwriteOptions) { + return "ALTER TABLE " + + overwriteOptions.getTargetTable().toSql() + + " REPLACE WITH TABLE " + + DorisTableIdentifier.quote(overwriteOptions.getStagingTable().getTable()) + + " PROPERTIES('swap'='false')"; + } + + public static String buildStagingTableName(DorisTableIdentifier targetTable, String attemptId) { + String digest = + DigestUtils.sha256Hex(targetTable.asString() + "|" + attemptId).substring(0, 16); + String safeAttempt = attemptId.replaceAll("[^A-Za-z0-9_]", "_").toLowerCase(Locale.ROOT); + if (safeAttempt.isEmpty()) { + safeAttempt = "job"; + } + int maxAttemptLength = + MAX_STAGING_TABLE_LENGTH - STAGING_PREFIX.length() - digest.length() - 1; + if (safeAttempt.length() > maxAttemptLength) { + safeAttempt = safeAttempt.substring(0, maxAttemptLength); + } + return STAGING_PREFIX + safeAttempt + "_" + digest; + } + + static void validateOverwriteOptions( + DorisOptions targetOptions, DorisExecutionOptions executionOptions) { + Preconditions.checkArgument( + StringUtils.isNotBlank(targetOptions.getJdbcUrl()), + "jdbc-url is required for INSERT OVERWRITE staging mode."); + Preconditions.checkArgument( + WriteMode.STREAM_LOAD.equals(executionOptions.getWriteMode()), + "INSERT OVERWRITE staging mode only supports STREAM_LOAD write mode."); + Preconditions.checkArgument( + executionOptions.enabled2PC(), + "INSERT OVERWRITE staging mode requires sink.enable-2pc=true."); + Preconditions.checkArgument( + StringUtils.isNotBlank(executionOptions.getLabelPrefix()), + "sink.label-prefix is required for INSERT OVERWRITE staging mode."); + Preconditions.checkArgument( + !executionOptions.ignoreCommitError(), + "INSERT OVERWRITE staging mode does not support sink.ignore.commit-error=true."); + } + + private static void validateTargetUnchanged( + Connection connection, DorisOverwriteOptions overwriteOptions) throws Exception { + Long expectedTargetId = overwriteOptions.getTargetTableId(); + Long currentTargetId = requireTableId(connection, overwriteOptions.getTargetTable()); + if (!expectedTargetId.equals(currentTargetId)) { + throw new DorisSystemException( + String.format( + "Target table %s changed from id %s to id %s before overwrite finalization.", + overwriteOptions.getTargetTable(), expectedTargetId, currentTargetId)); + } + } + + private static boolean isFinalizedAfterFailure( + SimpleJdbcConnectionProvider jdbcConnectionProvider, + DorisOverwriteOptions overwriteOptions, + Exception originalException) { + try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) { + return isAlreadyFinalized(connection, overwriteOptions); + } catch (Exception checkException) { + originalException.addSuppressed(checkException); + return false; + } + } + + private static boolean isAlreadyFinalized( + Connection connection, DorisOverwriteOptions overwriteOptions) throws Exception { + Long stagingTableId = overwriteOptions.getStagingTableId(); + Long currentTargetId = queryTableId(connection, overwriteOptions.getTargetTable()); + return stagingTableId.equals(currentTargetId) + && !tableExists(connection, overwriteOptions.getStagingTable()); + } + + private static boolean tableExists(Connection connection, DorisTableIdentifier table) + throws Exception { + String sql = "SHOW TABLES FROM " + DorisTableIdentifier.quote(table.getDatabase()); + try (Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql)) { + while (resultSet.next()) { + if (table.getTable().equals(resultSet.getString(1))) { + return true; + } + } + return false; + } + } + + private static Long queryTableId(Connection connection, DorisTableIdentifier table) + throws Exception { + String sql = + "SELECT TABLE_ID FROM information_schema.metadata_name_ids " + + "WHERE DATABASE_NAME = ? AND TABLE_NAME = ?"; + try (PreparedStatement statement = connection.prepareStatement(sql)) { + statement.setString(1, table.getDatabase()); + statement.setString(2, table.getTable()); + try (ResultSet resultSet = statement.executeQuery()) { + if (resultSet.next()) { + return resultSet.getLong(1); + } + return null; + } + } catch (Exception e) { + throw new DorisSystemException( + "Failed to query Doris table id for " + + table + + ". INSERT OVERWRITE staging mode requires " + + "information_schema.metadata_name_ids to be available.", + e); + } + } + + private static Long requireTableId(Connection connection, DorisTableIdentifier table) + throws Exception { + Long tableId = queryTableId(connection, table); + if (tableId == null) { + throw new DorisSystemException("Doris table " + table + " does not exist."); + } + return tableId; + } + + private static void execute(Connection connection, String sql) throws Exception { + try (Statement statement = connection.createStatement()) { + LOG.info("Executing Doris overwrite SQL: {}", sql); + statement.execute(sql); + } + } +} diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/overwrite/DorisOverwriteOptions.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/overwrite/DorisOverwriteOptions.java new file mode 100644 index 000000000..23302e0ee --- /dev/null +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/overwrite/DorisOverwriteOptions.java @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.overwrite; + +import org.apache.doris.flink.cfg.DorisOptions; + +import java.io.Serializable; +import java.util.Objects; + +/** Metadata required to finalize an INSERT OVERWRITE staging write. */ +public class DorisOverwriteOptions implements Serializable { + private static final long serialVersionUID = 1L; + + private final DorisOptions targetOptions; + private final DorisTableIdentifier targetTable; + private final DorisTableIdentifier stagingTable; + private final Long targetTableId; + private final Long stagingTableId; + private final String attemptId; + + public DorisOverwriteOptions( + DorisOptions targetOptions, + DorisTableIdentifier targetTable, + DorisTableIdentifier stagingTable, + Long targetTableId, + Long stagingTableId, + String attemptId) { + this.targetOptions = + DorisOverwriteManager.copyOptions(targetOptions, targetTable.asString()); + this.targetTable = targetTable; + this.stagingTable = stagingTable; + this.targetTableId = targetTableId; + this.stagingTableId = stagingTableId; + this.attemptId = attemptId; + } + + public DorisOptions getTargetOptions() { + return DorisOverwriteManager.copyOptions(targetOptions, targetTable.asString()); + } + + public DorisTableIdentifier getTargetTable() { + return targetTable; + } + + public DorisTableIdentifier getStagingTable() { + return stagingTable; + } + + public Long getTargetTableId() { + return targetTableId; + } + + public Long getStagingTableId() { + return stagingTableId; + } + + public String getAttemptId() { + return attemptId; + } + + @Override + public String toString() { + return "DorisOverwriteOptions{" + + "targetTable=" + + targetTable + + ", stagingTable=" + + stagingTable + + ", targetTableId=" + + targetTableId + + ", stagingTableId=" + + stagingTableId + + ", attemptId='" + + attemptId + + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DorisOverwriteOptions that = (DorisOverwriteOptions) o; + return Objects.equals(targetOptions, that.targetOptions) + && Objects.equals(targetTable, that.targetTable) + && Objects.equals(stagingTable, that.stagingTable) + && Objects.equals(targetTableId, that.targetTableId) + && Objects.equals(stagingTableId, that.stagingTableId) + && Objects.equals(attemptId, that.attemptId); + } + + @Override + public int hashCode() { + return Objects.hash( + targetOptions, targetTable, stagingTable, targetTableId, stagingTableId, attemptId); + } +} diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/overwrite/DorisPreparedOverwrite.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/overwrite/DorisPreparedOverwrite.java new file mode 100644 index 000000000..0d7753ace --- /dev/null +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/overwrite/DorisPreparedOverwrite.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.overwrite; + +import org.apache.doris.flink.cfg.DorisOptions; + +/** Prepared INSERT OVERWRITE staging context. */ +public class DorisPreparedOverwrite { + private final DorisOptions sinkOptions; + private final DorisOverwriteOptions overwriteOptions; + + public DorisPreparedOverwrite( + DorisOptions sinkOptions, DorisOverwriteOptions overwriteOptions) { + this.sinkOptions = sinkOptions; + this.overwriteOptions = overwriteOptions; + } + + public DorisOptions getSinkOptions() { + return sinkOptions; + } + + public DorisOverwriteOptions getOverwriteOptions() { + return overwriteOptions; + } +} diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/overwrite/DorisTableIdentifier.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/overwrite/DorisTableIdentifier.java new file mode 100644 index 000000000..99bb9a646 --- /dev/null +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/overwrite/DorisTableIdentifier.java @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.overwrite; + +import java.io.Serializable; +import java.util.Objects; + +/** Doris table identifier for overwrite DDL. */ +public class DorisTableIdentifier implements Serializable { + private static final long serialVersionUID = 1L; + + private final String database; + private final String table; + + public DorisTableIdentifier(String database, String table) { + if (database == null || database.trim().isEmpty()) { + throw new IllegalArgumentException("Doris database name must not be empty."); + } + if (table == null || table.trim().isEmpty()) { + throw new IllegalArgumentException("Doris table name must not be empty."); + } + this.database = database.trim(); + this.table = table.trim(); + } + + public static DorisTableIdentifier of(String tableIdentifier) { + if (tableIdentifier == null || tableIdentifier.trim().isEmpty()) { + throw new IllegalArgumentException( + "table.identifier is required for INSERT OVERWRITE."); + } + String[] parts = tableIdentifier.trim().split("\\.", -1); + if (parts.length != 2) { + throw new IllegalArgumentException( + "INSERT OVERWRITE only supports single-table identifier db.table."); + } + return new DorisTableIdentifier(parts[0], parts[1]); + } + + public String getDatabase() { + return database; + } + + public String getTable() { + return table; + } + + public String asString() { + return database + "." + table; + } + + public String toSql() { + return quote(database) + "." + quote(table); + } + + public static String quote(String identifier) { + return "`" + identifier.replace("`", "``") + "`"; + } + + @Override + public String toString() { + return asString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DorisTableIdentifier that = (DorisTableIdentifier) o; + return Objects.equals(database, that.database) && Objects.equals(table, that.table); + } + + @Override + public int hashCode() { + return Objects.hash(database, table); + } +} diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/overwrite/TestDorisOverwriteManager.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/overwrite/TestDorisOverwriteManager.java new file mode 100644 index 000000000..547eec287 --- /dev/null +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/overwrite/TestDorisOverwriteManager.java @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.overwrite; + +import org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.sink.writer.WriteMode; +import org.junit.Assert; +import org.junit.Test; + +/** Tests for Doris overwrite staging helpers. */ +public class TestDorisOverwriteManager { + + @Test + public void testTableIdentifier() { + DorisTableIdentifier identifier = DorisTableIdentifier.of("db.tbl"); + Assert.assertEquals("db", identifier.getDatabase()); + Assert.assertEquals("tbl", identifier.getTable()); + Assert.assertEquals("db.tbl", identifier.asString()); + Assert.assertEquals("`db`.`tbl`", identifier.toSql()); + } + + @Test + public void testInvalidTableIdentifier() { + assertInvalidIdentifier(null); + assertInvalidIdentifier(""); + assertInvalidIdentifier("tbl"); + assertInvalidIdentifier("catalog.db.tbl"); + assertInvalidIdentifier("db."); + assertInvalidIdentifier(".tbl"); + } + + @Test + public void testStagingTableNameIsStableAndBounded() { + DorisTableIdentifier target = DorisTableIdentifier.of("db.tbl"); + String staging = DorisOverwriteManager.buildStagingTableName(target, "label-prefix"); + Assert.assertEquals( + staging, DorisOverwriteManager.buildStagingTableName(target, "label-prefix")); + Assert.assertTrue(staging.startsWith("__doris_flink_overwrite_label_prefix_")); + Assert.assertTrue(staging.length() <= 63); + } + + @Test + public void testSqlGeneration() { + DorisTableIdentifier target = DorisTableIdentifier.of("db.tbl"); + DorisTableIdentifier staging = DorisTableIdentifier.of("db.__staging"); + DorisOverwriteOptions options = + new DorisOverwriteOptions( + DorisOptions.builder() + .setFenodes("127.0.0.1:8030") + .setJdbcUrl("jdbc:mysql://127.0.0.1:9030") + .setTableIdentifier(target.asString()) + .build(), + target, + staging, + 10L, + 20L, + "label"); + Assert.assertEquals( + "CREATE TABLE `db`.`__staging` LIKE `db`.`tbl`", + DorisOverwriteManager.createTableLikeSql(staging, target)); + Assert.assertEquals( + "ALTER TABLE `db`.`tbl` REPLACE WITH TABLE `__staging` PROPERTIES('swap'='false')", + DorisOverwriteManager.replaceTableSql(options)); + } + + @Test + public void testCopyOptions() { + DorisOptions options = + DorisOptions.builder() + .setFenodes("fe:8030") + .setBenodes("be:8040") + .setUsername("root") + .setPassword("pwd") + .setJdbcUrl("jdbc:mysql://fe:9030") + .setAutoRedirect(false) + .setTableIdentifier("db.tbl") + .build(); + DorisOptions copied = DorisOverwriteManager.copyOptions(options, "db.staging"); + Assert.assertEquals("db.staging", copied.getTableIdentifier()); + Assert.assertEquals(options.getFenodes(), copied.getFenodes()); + Assert.assertEquals(options.getBenodes(), copied.getBenodes()); + Assert.assertEquals(options.getUsername(), copied.getUsername()); + Assert.assertEquals(options.getPassword(), copied.getPassword()); + Assert.assertEquals(options.getJdbcUrl(), copied.getJdbcUrl()); + Assert.assertEquals(options.isAutoRedirect(), copied.isAutoRedirect()); + } + + @Test + public void testValidateOverwriteOptions() { + DorisOverwriteManager.validateOverwriteOptions( + DorisOptions.builder() + .setFenodes("fe:8030") + .setJdbcUrl("jdbc:mysql://fe:9030") + .setTableIdentifier("db.tbl") + .build(), + DorisExecutionOptions.builder().setLabelPrefix("label").build()); + } + + @Test + public void testValidateOverwriteOptionsRejectsIgnoreCommitError() { + assertInvalidOverwriteOptions( + DorisOptions.builder() + .setFenodes("fe:8030") + .setJdbcUrl("jdbc:mysql://fe:9030") + .setTableIdentifier("db.tbl") + .build(), + DorisExecutionOptions.builder() + .setLabelPrefix("label") + .setIgnoreCommitError(true) + .build(), + "sink.ignore.commit-error"); + } + + @Test + public void testValidateOverwriteOptionsRejectsUnsafeModes() { + DorisOptions options = + DorisOptions.builder() + .setFenodes("fe:8030") + .setJdbcUrl("jdbc:mysql://fe:9030") + .setTableIdentifier("db.tbl") + .build(); + assertInvalidOverwriteOptions( + options, + DorisExecutionOptions.builder().disable2PC().setLabelPrefix("label").build(), + "sink.enable-2pc"); + assertInvalidOverwriteOptions( + options, + DorisExecutionOptions.builder() + .setLabelPrefix("label") + .setWriteMode(WriteMode.COPY) + .build(), + "STREAM_LOAD"); + assertInvalidOverwriteOptions( + options, DorisExecutionOptions.builder().build(), "sink.label-prefix"); + } + + private void assertInvalidIdentifier(String tableIdentifier) { + try { + DorisTableIdentifier.of(tableIdentifier); + Assert.fail("Expected invalid identifier: " + tableIdentifier); + } catch (IllegalArgumentException expected) { + } + } + + private void assertInvalidOverwriteOptions( + DorisOptions options, DorisExecutionOptions executionOptions, String messagePart) { + try { + DorisOverwriteManager.validateOverwriteOptions(options, executionOptions); + Assert.fail("Expected invalid overwrite options."); + } catch (IllegalArgumentException expected) { + Assert.assertTrue(expected.getMessage().contains(messagePart)); + } + } +} diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/sink/DorisSink.java index f95a00979..2a86b9ae4 100644 --- a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/sink/DorisSink.java +++ b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/sink/DorisSink.java @@ -19,10 +19,14 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.StatefulSink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.util.Preconditions; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -34,6 +38,8 @@ import org.apache.doris.flink.sink.copy.CopyCommittableSerializer; import org.apache.doris.flink.sink.copy.DorisCopyCommitter; import org.apache.doris.flink.sink.copy.DorisCopyWriterAdapter; +import org.apache.doris.flink.sink.overwrite.DorisOverwriteFinalizerOperator; +import org.apache.doris.flink.sink.overwrite.DorisOverwriteOptions; import org.apache.doris.flink.sink.writer.DorisAbstractWriter; import org.apache.doris.flink.sink.writer.DorisWriter; import org.apache.doris.flink.sink.writer.DorisWriterAdapter; @@ -56,22 +62,34 @@ @PublicEvolving public class DorisSink implements StatefulSink, - TwoPhaseCommittingSink { + TwoPhaseCommittingSink, + SupportsPostCommitTopology { private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class); private final DorisOptions dorisOptions; private final DorisReadOptions dorisReadOptions; private final DorisExecutionOptions dorisExecutionOptions; private final DorisRecordSerializer serializer; + private final DorisOverwriteOptions overwriteOptions; public DorisSink( DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, DorisExecutionOptions dorisExecutionOptions, DorisRecordSerializer serializer) { + this(dorisOptions, dorisReadOptions, dorisExecutionOptions, serializer, null); + } + + public DorisSink( + DorisOptions dorisOptions, + DorisReadOptions dorisReadOptions, + DorisExecutionOptions dorisExecutionOptions, + DorisRecordSerializer serializer, + DorisOverwriteOptions overwriteOptions) { this.dorisOptions = dorisOptions; this.dorisReadOptions = dorisReadOptions; this.dorisExecutionOptions = dorisExecutionOptions; this.serializer = serializer; + this.overwriteOptions = overwriteOptions; checkKeyType(); } @@ -101,6 +119,21 @@ public Committer createCommitter() throws IOException { "Unsupported write mode " + dorisExecutionOptions.getWriteMode()); } + @Override + public void addPostCommitTopology( + DataStream> committables) { + if (overwriteOptions == null) { + return; + } + committables + .global() + .transform( + "Doris INSERT OVERWRITE finalizer", + Types.VOID, + new DorisOverwriteFinalizerOperator(overwriteOptions)) + .setParallelism(1); + } + @Override public DorisAbstractWriter restoreWriter( InitContext initContext, Collection recoveredState) @@ -161,6 +194,7 @@ public static class Builder { private DorisReadOptions dorisReadOptions; private DorisExecutionOptions dorisExecutionOptions; private DorisRecordSerializer serializer; + private DorisOverwriteOptions overwriteOptions; /** * Sets the DorisOptions for the DorisSink. @@ -207,6 +241,11 @@ public Builder setSerializer(DorisRecordSerializer serializer) { return this; } + public Builder setOverwriteOptions(DorisOverwriteOptions overwriteOptions) { + this.overwriteOptions = overwriteOptions; + return this; + } + /** * Build the {@link DorisSink}. * @@ -220,7 +259,11 @@ public DorisSink build() { dorisReadOptions = DorisReadOptions.builder().build(); } return new DorisSink<>( - dorisOptions, dorisReadOptions, dorisExecutionOptions, serializer); + dorisOptions, + dorisReadOptions, + dorisExecutionOptions, + serializer, + overwriteOptions); } } } diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/sink/overwrite/DorisOverwriteFinalizerOperator.java b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/sink/overwrite/DorisOverwriteFinalizerOperator.java new file mode 100644 index 000000000..89acd17b6 --- /dev/null +++ b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/sink/overwrite/DorisOverwriteFinalizerOperator.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.overwrite; + +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import org.apache.doris.flink.sink.DorisAbstractCommittable; + +/** Finalizes bounded INSERT OVERWRITE after all post-commit messages are consumed. */ +public class DorisOverwriteFinalizerOperator extends AbstractStreamOperator + implements OneInputStreamOperator, Void>, + BoundedOneInput { + private static final long serialVersionUID = 1L; + + private final DorisOverwriteOptions overwriteOptions; + + public DorisOverwriteFinalizerOperator(DorisOverwriteOptions overwriteOptions) { + this.overwriteOptions = overwriteOptions; + } + + @Override + public void processElement(StreamRecord> element) { + // The post-commit topology only needs the bounded end-of-input signal. + } + + @Override + public void endInput() { + DorisOverwriteManager.finalizeOverwrite(overwriteOptions); + } +} diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index f43d49bc7..97521b359 100644 --- a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -28,16 +28,16 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; -import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider; -import org.apache.doris.flink.exception.DorisSystemException; import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.sink.DorisSink; +import org.apache.doris.flink.sink.overwrite.DorisOverwriteManager; +import org.apache.doris.flink.sink.overwrite.DorisOverwriteOptions; +import org.apache.doris.flink.sink.overwrite.DorisPreparedOverwrite; +import org.apache.doris.flink.sink.writer.WriteMode; import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.Statement; import java.util.Arrays; import java.util.Objects; import java.util.Properties; @@ -84,10 +84,35 @@ public ChangelogMode getChangelogMode(ChangelogMode changelogMode) { @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + DorisOptions sinkOptions = options; + DorisOverwriteOptions overwriteOptions = null; + Boolean targetUniqueKeyType = null; + if (overwrite) { + if (!context.isBounded()) { + throw new IllegalStateException("Streaming mode not support overwrite."); + } + Preconditions.checkArgument( + WriteMode.STREAM_LOAD.equals(executionOptions.getWriteMode()), + "INSERT OVERWRITE only supports STREAM_LOAD write mode."); + Preconditions.checkArgument( + executionOptions.enabled2PC(), + "INSERT OVERWRITE requires sink.enable-2pc=true."); + targetUniqueKeyType = RestService.isUniqueKeyType(options, readOptions, LOG); + Preconditions.checkArgument( + !targetUniqueKeyType || executionOptions.force2PC(), + "INSERT OVERWRITE on unique key table requires explicitly setting sink.enable-2pc=true."); + DorisPreparedOverwrite preparedOverwrite = + DorisOverwriteManager.prepareOverwrite(options, executionOptions); + sinkOptions = preparedOverwrite.getSinkOptions(); + overwriteOptions = preparedOverwrite.getOverwriteOptions(); + } + Properties loadProperties = executionOptions.getStreamLoadProp(); boolean deletable = executionOptions.getDeletable() - && RestService.isUniqueKeyType(options, readOptions, LOG); + && (targetUniqueKeyType == null + ? RestService.isUniqueKeyType(sinkOptions, readOptions, LOG) + : targetUniqueKeyType); if (!loadProperties.containsKey(COLUMNS_KEY)) { String[] fieldNames = tableSchema.getFieldNames(); Preconditions.checkState(fieldNames != null && fieldNames.length > 0); @@ -117,43 +142,15 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { DorisSink.Builder dorisSinkBuilder = DorisSink.builder(); dorisSinkBuilder - .setDorisOptions(options) + .setDorisOptions(sinkOptions) .setDorisReadOptions(readOptions) .setDorisExecutionOptions(executionOptions) + .setOverwriteOptions(overwriteOptions) .setSerializer(serializerBuilder.build()); DorisSink dorisSink = dorisSinkBuilder.build(); - - // for insert overwrite - if (overwrite) { - if (context.isBounded()) { - // execute jdbc query to truncate table - Preconditions.checkArgument( - options.getJdbcUrl() != null, "jdbc-url is required for Overwrite mode."); - // todo: should be written to a temporary table first, - // and then use GlobalCommitter to perform the rename. - truncateTable(); - } else { - throw new IllegalStateException("Streaming mode not support overwrite."); - } - } return SinkV2Provider.of(dorisSink, sinkParallelism); } - private void truncateTable() { - String truncateQuery = "TRUNCATE TABLE " + options.getTableIdentifier(); - SimpleJdbcConnectionProvider jdbcConnectionProvider = - new SimpleJdbcConnectionProvider(options); - try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection(); - Statement statement = connection.createStatement()) { - LOG.info("Executing truncate query: {}", truncateQuery); - statement.execute(truncateQuery); - } catch (Exception e) { - LOG.error("Failed to execute truncate query: {}", truncateQuery, e); - throw new DorisSystemException( - String.format("Failed to execute truncate query: %s", truncateQuery), e); - } - } - @Override public DynamicTableSink copy() { DorisDynamicTableSink sink = diff --git a/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/sink/DorisSink.java index fe1e920ff..b90cfa041 100644 --- a/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/sink/DorisSink.java +++ b/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/sink/DorisSink.java @@ -19,6 +19,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.connector.sink2.Sink; @@ -26,6 +27,9 @@ import org.apache.flink.api.connector.sink2.SupportsWriterState; import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.util.Preconditions; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -37,6 +41,8 @@ import org.apache.doris.flink.sink.copy.CopyCommittableSerializer; import org.apache.doris.flink.sink.copy.DorisCopyCommitter; import org.apache.doris.flink.sink.copy.DorisCopyWriterAdapter; +import org.apache.doris.flink.sink.overwrite.DorisOverwriteFinalizerOperator; +import org.apache.doris.flink.sink.overwrite.DorisOverwriteOptions; import org.apache.doris.flink.sink.writer.DorisAbstractWriter; import org.apache.doris.flink.sink.writer.DorisWriter; import org.apache.doris.flink.sink.writer.DorisWriterAdapter; @@ -60,22 +66,34 @@ public class DorisSink implements Sink, SupportsWriterState, - SupportsCommitter { + SupportsCommitter, + SupportsPostCommitTopology { private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class); private final DorisOptions dorisOptions; private final DorisReadOptions dorisReadOptions; private final DorisExecutionOptions dorisExecutionOptions; private final DorisRecordSerializer serializer; + private final DorisOverwriteOptions overwriteOptions; public DorisSink( DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, DorisExecutionOptions dorisExecutionOptions, DorisRecordSerializer serializer) { + this(dorisOptions, dorisReadOptions, dorisExecutionOptions, serializer, null); + } + + public DorisSink( + DorisOptions dorisOptions, + DorisReadOptions dorisReadOptions, + DorisExecutionOptions dorisExecutionOptions, + DorisRecordSerializer serializer, + DorisOverwriteOptions overwriteOptions) { this.dorisOptions = dorisOptions; this.dorisReadOptions = dorisReadOptions; this.dorisExecutionOptions = dorisExecutionOptions; this.serializer = serializer; + this.overwriteOptions = overwriteOptions; checkKeyType(); } @@ -105,6 +123,21 @@ public Committer createCommitter(CommitterInitContext committerInitContext) thro "Unsupported write mode " + dorisExecutionOptions.getWriteMode()); } + @Override + public void addPostCommitTopology( + DataStream> committables) { + if (overwriteOptions == null) { + return; + } + committables + .global() + .transform( + "Doris INSERT OVERWRITE finalizer", + Types.VOID, + new DorisOverwriteFinalizerOperator(overwriteOptions)) + .setParallelism(1); + } + @Override public DorisAbstractWriter restoreWriter( WriterInitContext initContext, Collection recoveredState) @@ -165,6 +198,7 @@ public static class Builder { private DorisReadOptions dorisReadOptions; private DorisExecutionOptions dorisExecutionOptions; private DorisRecordSerializer serializer; + private DorisOverwriteOptions overwriteOptions; /** * Sets the DorisOptions for the DorisSink. @@ -211,6 +245,11 @@ public Builder setSerializer(DorisRecordSerializer serializer) { return this; } + public Builder setOverwriteOptions(DorisOverwriteOptions overwriteOptions) { + this.overwriteOptions = overwriteOptions; + return this; + } + /** * Build the {@link DorisSink}. * @@ -224,7 +263,11 @@ public DorisSink build() { dorisReadOptions = DorisReadOptions.builder().build(); } return new DorisSink<>( - dorisOptions, dorisReadOptions, dorisExecutionOptions, serializer); + dorisOptions, + dorisReadOptions, + dorisExecutionOptions, + serializer, + overwriteOptions); } } } diff --git a/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/sink/overwrite/DorisOverwriteFinalizerOperator.java b/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/sink/overwrite/DorisOverwriteFinalizerOperator.java new file mode 100644 index 000000000..89acd17b6 --- /dev/null +++ b/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/sink/overwrite/DorisOverwriteFinalizerOperator.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.overwrite; + +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import org.apache.doris.flink.sink.DorisAbstractCommittable; + +/** Finalizes bounded INSERT OVERWRITE after all post-commit messages are consumed. */ +public class DorisOverwriteFinalizerOperator extends AbstractStreamOperator + implements OneInputStreamOperator, Void>, + BoundedOneInput { + private static final long serialVersionUID = 1L; + + private final DorisOverwriteOptions overwriteOptions; + + public DorisOverwriteFinalizerOperator(DorisOverwriteOptions overwriteOptions) { + this.overwriteOptions = overwriteOptions; + } + + @Override + public void processElement(StreamRecord> element) { + // The post-commit topology only needs the bounded end-of-input signal. + } + + @Override + public void endInput() { + DorisOverwriteManager.finalizeOverwrite(overwriteOptions); + } +} diff --git a/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index b3d75ded5..2de7929fd 100644 --- a/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ b/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -28,16 +28,16 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; -import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider; -import org.apache.doris.flink.exception.DorisSystemException; import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.sink.DorisSink; +import org.apache.doris.flink.sink.overwrite.DorisOverwriteManager; +import org.apache.doris.flink.sink.overwrite.DorisOverwriteOptions; +import org.apache.doris.flink.sink.overwrite.DorisPreparedOverwrite; +import org.apache.doris.flink.sink.writer.WriteMode; import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.Statement; import java.util.Arrays; import java.util.Objects; import java.util.Properties; @@ -84,10 +84,35 @@ public ChangelogMode getChangelogMode(ChangelogMode changelogMode) { @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + DorisOptions sinkOptions = options; + DorisOverwriteOptions overwriteOptions = null; + Boolean targetUniqueKeyType = null; + if (overwrite) { + if (!context.isBounded()) { + throw new IllegalStateException("Streaming mode not support overwrite."); + } + Preconditions.checkArgument( + WriteMode.STREAM_LOAD.equals(executionOptions.getWriteMode()), + "INSERT OVERWRITE only supports STREAM_LOAD write mode."); + Preconditions.checkArgument( + executionOptions.enabled2PC(), + "INSERT OVERWRITE requires sink.enable-2pc=true."); + targetUniqueKeyType = RestService.isUniqueKeyType(options, readOptions, LOG); + Preconditions.checkArgument( + !targetUniqueKeyType || executionOptions.force2PC(), + "INSERT OVERWRITE on unique key table requires explicitly setting sink.enable-2pc=true."); + DorisPreparedOverwrite preparedOverwrite = + DorisOverwriteManager.prepareOverwrite(options, executionOptions); + sinkOptions = preparedOverwrite.getSinkOptions(); + overwriteOptions = preparedOverwrite.getOverwriteOptions(); + } + Properties loadProperties = executionOptions.getStreamLoadProp(); boolean deletable = executionOptions.getDeletable() - && RestService.isUniqueKeyType(options, readOptions, LOG); + && (targetUniqueKeyType == null + ? RestService.isUniqueKeyType(sinkOptions, readOptions, LOG) + : targetUniqueKeyType); if (!loadProperties.containsKey(COLUMNS_KEY)) { String[] fieldNames = tableSchema.getFieldNames(); Preconditions.checkState(fieldNames != null && fieldNames.length > 0); @@ -117,43 +142,15 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { DorisSink.Builder dorisSinkBuilder = DorisSink.builder(); dorisSinkBuilder - .setDorisOptions(options) + .setDorisOptions(sinkOptions) .setDorisReadOptions(readOptions) .setDorisExecutionOptions(executionOptions) + .setOverwriteOptions(overwriteOptions) .setSerializer(serializerBuilder.build()); DorisSink dorisSink = dorisSinkBuilder.build(); - - // for insert overwrite - if (overwrite) { - if (context.isBounded()) { - // execute jdbc query to truncate table - Preconditions.checkArgument( - options.getJdbcUrl() != null, "jdbc-url is required for Overwrite mode."); - // todo: should be written to a temporary table first, - // and then use GlobalCommitter to perform the rename. - truncateTable(); - } else { - throw new IllegalStateException("Streaming mode not support overwrite."); - } - } return SinkV2Provider.of(dorisSink, sinkParallelism); } - private void truncateTable() { - String truncateQuery = "TRUNCATE TABLE " + options.getTableIdentifier(); - SimpleJdbcConnectionProvider jdbcConnectionProvider = - new SimpleJdbcConnectionProvider(options); - try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection(); - Statement statement = connection.createStatement()) { - LOG.info("Executing truncate query: {}", truncateQuery); - statement.execute(truncateQuery); - } catch (Exception e) { - LOG.error("Failed to execute truncate query: {}", truncateQuery, e); - throw new DorisSystemException( - String.format("Failed to execute truncate query: %s", truncateQuery), e); - } - } - @Override public DynamicTableSink copy() { DorisDynamicTableSink sink =