Skip to content

Commit 4ff76d9

Browse files
committed
Support staged Doris insert overwrite
1 parent 0044826 commit 4ff76d9

11 files changed

Lines changed: 933 additions & 74 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.flink.sink.overwrite;
19+
20+
import org.apache.flink.util.Preconditions;
21+
22+
import org.apache.commons.codec.digest.DigestUtils;
23+
import org.apache.commons.lang3.StringUtils;
24+
import org.apache.doris.flink.cfg.DorisExecutionOptions;
25+
import org.apache.doris.flink.cfg.DorisOptions;
26+
import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
27+
import org.apache.doris.flink.exception.DorisSystemException;
28+
import org.apache.doris.flink.sink.writer.WriteMode;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.sql.Connection;
33+
import java.sql.PreparedStatement;
34+
import java.sql.ResultSet;
35+
import java.sql.Statement;
36+
import java.util.Locale;
37+
38+
/** DDL helper for INSERT OVERWRITE staging writes. */
39+
public class DorisOverwriteManager {
40+
private static final Logger LOG = LoggerFactory.getLogger(DorisOverwriteManager.class);
41+
private static final int MAX_STAGING_TABLE_LENGTH = 63;
42+
private static final String STAGING_PREFIX = "__doris_flink_overwrite_";
43+
44+
private DorisOverwriteManager() {}
45+
46+
public static DorisPreparedOverwrite prepareOverwrite(
47+
DorisOptions targetOptions, DorisExecutionOptions executionOptions) {
48+
validateOverwriteOptions(targetOptions, executionOptions);
49+
DorisTableIdentifier targetTable =
50+
DorisTableIdentifier.of(targetOptions.getTableIdentifier());
51+
DorisTableIdentifier stagingTable =
52+
new DorisTableIdentifier(
53+
targetTable.getDatabase(),
54+
buildStagingTableName(targetTable, executionOptions.getLabelPrefix()));
55+
DorisOptions stagingOptions = copyOptions(targetOptions, stagingTable.asString());
56+
57+
Long targetTableId;
58+
Long stagingTableId;
59+
SimpleJdbcConnectionProvider jdbcConnectionProvider =
60+
new SimpleJdbcConnectionProvider(targetOptions);
61+
try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) {
62+
targetTableId = requireTableId(connection, targetTable);
63+
if (!tableExists(connection, stagingTable)) {
64+
execute(connection, createTableLikeSql(stagingTable, targetTable));
65+
} else {
66+
throw new DorisSystemException(
67+
String.format(
68+
"Doris overwrite staging table %s already exists. "
69+
+ "Use a unique sink.label-prefix for each INSERT OVERWRITE job "
70+
+ "or clean up the leftover staging table after verifying it is unused.",
71+
stagingTable));
72+
}
73+
stagingTableId = requireTableId(connection, stagingTable);
74+
} catch (Exception e) {
75+
throw new DorisSystemException(
76+
String.format(
77+
"Failed to prepare INSERT OVERWRITE staging table %s for target %s",
78+
stagingTable, targetTable),
79+
e);
80+
}
81+
82+
DorisOverwriteOptions overwriteOptions =
83+
new DorisOverwriteOptions(
84+
targetOptions,
85+
targetTable,
86+
stagingTable,
87+
targetTableId,
88+
stagingTableId,
89+
executionOptions.getLabelPrefix());
90+
return new DorisPreparedOverwrite(stagingOptions, overwriteOptions);
91+
}
92+
93+
public static void finalizeOverwrite(DorisOverwriteOptions overwriteOptions) {
94+
DorisOptions targetOptions = overwriteOptions.getTargetOptions();
95+
SimpleJdbcConnectionProvider jdbcConnectionProvider =
96+
new SimpleJdbcConnectionProvider(targetOptions);
97+
try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) {
98+
if (isAlreadyFinalized(connection, overwriteOptions)) {
99+
LOG.info("Doris overwrite {} has already been finalized.", overwriteOptions);
100+
return;
101+
}
102+
validateTargetUnchanged(connection, overwriteOptions);
103+
execute(connection, replaceTableSql(overwriteOptions));
104+
} catch (Exception e) {
105+
if (isFinalizedAfterFailure(jdbcConnectionProvider, overwriteOptions, e)) {
106+
LOG.info(
107+
"Doris overwrite {} was finalized before the failure returned.",
108+
overwriteOptions);
109+
return;
110+
}
111+
throw new DorisSystemException(
112+
String.format("Failed to finalize Doris INSERT OVERWRITE %s", overwriteOptions),
113+
e);
114+
}
115+
}
116+
117+
public static DorisOptions copyOptions(DorisOptions options, String tableIdentifier) {
118+
return DorisOptions.builder()
119+
.setFenodes(options.getFenodes())
120+
.setBenodes(options.getBenodes())
121+
.setUsername(options.getUsername())
122+
.setPassword(options.getPassword())
123+
.setJdbcUrl(options.getJdbcUrl())
124+
.setAutoRedirect(options.isAutoRedirect())
125+
.setTableIdentifier(tableIdentifier)
126+
.build();
127+
}
128+
129+
public static String createTableLikeSql(
130+
DorisTableIdentifier stagingTable, DorisTableIdentifier targetTable) {
131+
return "CREATE TABLE " + stagingTable.toSql() + " LIKE " + targetTable.toSql();
132+
}
133+
134+
public static String replaceTableSql(DorisOverwriteOptions overwriteOptions) {
135+
return "ALTER TABLE "
136+
+ overwriteOptions.getTargetTable().toSql()
137+
+ " REPLACE WITH TABLE "
138+
+ DorisTableIdentifier.quote(overwriteOptions.getStagingTable().getTable())
139+
+ " PROPERTIES('swap'='false')";
140+
}
141+
142+
public static String buildStagingTableName(DorisTableIdentifier targetTable, String attemptId) {
143+
String digest =
144+
DigestUtils.sha256Hex(targetTable.asString() + "|" + attemptId).substring(0, 16);
145+
String safeAttempt = attemptId.replaceAll("[^A-Za-z0-9_]", "_").toLowerCase(Locale.ROOT);
146+
if (safeAttempt.isEmpty()) {
147+
safeAttempt = "job";
148+
}
149+
int maxAttemptLength =
150+
MAX_STAGING_TABLE_LENGTH - STAGING_PREFIX.length() - digest.length() - 1;
151+
if (safeAttempt.length() > maxAttemptLength) {
152+
safeAttempt = safeAttempt.substring(0, maxAttemptLength);
153+
}
154+
return STAGING_PREFIX + safeAttempt + "_" + digest;
155+
}
156+
157+
static void validateOverwriteOptions(
158+
DorisOptions targetOptions, DorisExecutionOptions executionOptions) {
159+
Preconditions.checkArgument(
160+
StringUtils.isNotBlank(targetOptions.getJdbcUrl()),
161+
"jdbc-url is required for INSERT OVERWRITE staging mode.");
162+
Preconditions.checkArgument(
163+
WriteMode.STREAM_LOAD.equals(executionOptions.getWriteMode()),
164+
"INSERT OVERWRITE staging mode only supports STREAM_LOAD write mode.");
165+
Preconditions.checkArgument(
166+
executionOptions.enabled2PC(),
167+
"INSERT OVERWRITE staging mode requires sink.enable-2pc=true.");
168+
Preconditions.checkArgument(
169+
StringUtils.isNotBlank(executionOptions.getLabelPrefix()),
170+
"sink.label-prefix is required for INSERT OVERWRITE staging mode.");
171+
Preconditions.checkArgument(
172+
!executionOptions.ignoreCommitError(),
173+
"INSERT OVERWRITE staging mode does not support sink.ignore.commit-error=true.");
174+
}
175+
176+
private static void validateTargetUnchanged(
177+
Connection connection, DorisOverwriteOptions overwriteOptions) throws Exception {
178+
Long expectedTargetId = overwriteOptions.getTargetTableId();
179+
Long currentTargetId = requireTableId(connection, overwriteOptions.getTargetTable());
180+
if (!expectedTargetId.equals(currentTargetId)) {
181+
throw new DorisSystemException(
182+
String.format(
183+
"Target table %s changed from id %s to id %s before overwrite finalization.",
184+
overwriteOptions.getTargetTable(), expectedTargetId, currentTargetId));
185+
}
186+
}
187+
188+
private static boolean isFinalizedAfterFailure(
189+
SimpleJdbcConnectionProvider jdbcConnectionProvider,
190+
DorisOverwriteOptions overwriteOptions,
191+
Exception originalException) {
192+
try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) {
193+
return isAlreadyFinalized(connection, overwriteOptions);
194+
} catch (Exception checkException) {
195+
originalException.addSuppressed(checkException);
196+
return false;
197+
}
198+
}
199+
200+
private static boolean isAlreadyFinalized(
201+
Connection connection, DorisOverwriteOptions overwriteOptions) throws Exception {
202+
Long stagingTableId = overwriteOptions.getStagingTableId();
203+
Long currentTargetId = queryTableId(connection, overwriteOptions.getTargetTable());
204+
return stagingTableId.equals(currentTargetId)
205+
&& !tableExists(connection, overwriteOptions.getStagingTable());
206+
}
207+
208+
private static boolean tableExists(Connection connection, DorisTableIdentifier table)
209+
throws Exception {
210+
String sql = "SHOW TABLES FROM " + DorisTableIdentifier.quote(table.getDatabase());
211+
try (Statement statement = connection.createStatement();
212+
ResultSet resultSet = statement.executeQuery(sql)) {
213+
while (resultSet.next()) {
214+
if (table.getTable().equals(resultSet.getString(1))) {
215+
return true;
216+
}
217+
}
218+
return false;
219+
}
220+
}
221+
222+
private static Long queryTableId(Connection connection, DorisTableIdentifier table)
223+
throws Exception {
224+
String sql =
225+
"SELECT TABLE_ID FROM information_schema.metadata_name_ids "
226+
+ "WHERE DATABASE_NAME = ? AND TABLE_NAME = ?";
227+
try (PreparedStatement statement = connection.prepareStatement(sql)) {
228+
statement.setString(1, table.getDatabase());
229+
statement.setString(2, table.getTable());
230+
try (ResultSet resultSet = statement.executeQuery()) {
231+
if (resultSet.next()) {
232+
return resultSet.getLong(1);
233+
}
234+
return null;
235+
}
236+
} catch (Exception e) {
237+
throw new DorisSystemException(
238+
"Failed to query Doris table id for "
239+
+ table
240+
+ ". INSERT OVERWRITE staging mode requires "
241+
+ "information_schema.metadata_name_ids to be available.",
242+
e);
243+
}
244+
}
245+
246+
private static Long requireTableId(Connection connection, DorisTableIdentifier table)
247+
throws Exception {
248+
Long tableId = queryTableId(connection, table);
249+
if (tableId == null) {
250+
throw new DorisSystemException("Doris table " + table + " does not exist.");
251+
}
252+
return tableId;
253+
}
254+
255+
private static void execute(Connection connection, String sql) throws Exception {
256+
try (Statement statement = connection.createStatement()) {
257+
LOG.info("Executing Doris overwrite SQL: {}", sql);
258+
statement.execute(sql);
259+
}
260+
}
261+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.flink.sink.overwrite;
19+
20+
import org.apache.doris.flink.cfg.DorisOptions;
21+
22+
import java.io.Serializable;
23+
import java.util.Objects;
24+
25+
/** Metadata required to finalize an INSERT OVERWRITE staging write. */
26+
public class DorisOverwriteOptions implements Serializable {
27+
private static final long serialVersionUID = 1L;
28+
29+
private final DorisOptions targetOptions;
30+
private final DorisTableIdentifier targetTable;
31+
private final DorisTableIdentifier stagingTable;
32+
private final Long targetTableId;
33+
private final Long stagingTableId;
34+
private final String attemptId;
35+
36+
public DorisOverwriteOptions(
37+
DorisOptions targetOptions,
38+
DorisTableIdentifier targetTable,
39+
DorisTableIdentifier stagingTable,
40+
Long targetTableId,
41+
Long stagingTableId,
42+
String attemptId) {
43+
this.targetOptions =
44+
DorisOverwriteManager.copyOptions(targetOptions, targetTable.asString());
45+
this.targetTable = targetTable;
46+
this.stagingTable = stagingTable;
47+
this.targetTableId = targetTableId;
48+
this.stagingTableId = stagingTableId;
49+
this.attemptId = attemptId;
50+
}
51+
52+
public DorisOptions getTargetOptions() {
53+
return DorisOverwriteManager.copyOptions(targetOptions, targetTable.asString());
54+
}
55+
56+
public DorisTableIdentifier getTargetTable() {
57+
return targetTable;
58+
}
59+
60+
public DorisTableIdentifier getStagingTable() {
61+
return stagingTable;
62+
}
63+
64+
public Long getTargetTableId() {
65+
return targetTableId;
66+
}
67+
68+
public Long getStagingTableId() {
69+
return stagingTableId;
70+
}
71+
72+
public String getAttemptId() {
73+
return attemptId;
74+
}
75+
76+
@Override
77+
public String toString() {
78+
return "DorisOverwriteOptions{"
79+
+ "targetTable="
80+
+ targetTable
81+
+ ", stagingTable="
82+
+ stagingTable
83+
+ ", targetTableId="
84+
+ targetTableId
85+
+ ", stagingTableId="
86+
+ stagingTableId
87+
+ ", attemptId='"
88+
+ attemptId
89+
+ '\''
90+
+ '}';
91+
}
92+
93+
@Override
94+
public boolean equals(Object o) {
95+
if (this == o) {
96+
return true;
97+
}
98+
if (o == null || getClass() != o.getClass()) {
99+
return false;
100+
}
101+
DorisOverwriteOptions that = (DorisOverwriteOptions) o;
102+
return Objects.equals(targetOptions, that.targetOptions)
103+
&& Objects.equals(targetTable, that.targetTable)
104+
&& Objects.equals(stagingTable, that.stagingTable)
105+
&& Objects.equals(targetTableId, that.targetTableId)
106+
&& Objects.equals(stagingTableId, that.stagingTableId)
107+
&& Objects.equals(attemptId, that.attemptId);
108+
}
109+
110+
@Override
111+
public int hashCode() {
112+
return Objects.hash(
113+
targetOptions, targetTable, stagingTable, targetTableId, stagingTableId, attemptId);
114+
}
115+
}

0 commit comments

Comments
 (0)