Skip to content

Commit b0d70ce

Browse files
authored
[Improve] Add SaveMode log of process detail (#6375)
1 parent eb46c48 commit b0d70ce

File tree

26 files changed

+1331
-50
lines changed

26 files changed

+1331
-50
lines changed

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java

+76-7
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,26 @@
2323
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
2424

2525
import lombok.AllArgsConstructor;
26+
import lombok.extern.slf4j.Slf4j;
27+
28+
import javax.annotation.Nonnull;
29+
import javax.annotation.Nullable;
30+
31+
import java.util.Optional;
2632

2733
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SINK_TABLE_NOT_EXIST;
2834
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SOURCE_ALREADY_HAS_DATA;
2935

3036
@AllArgsConstructor
37+
@Slf4j
3138
public class DefaultSaveModeHandler implements SaveModeHandler {
3239

33-
public SchemaSaveMode schemaSaveMode;
34-
public DataSaveMode dataSaveMode;
35-
public Catalog catalog;
36-
public TablePath tablePath;
37-
public CatalogTable catalogTable;
38-
public String customSql;
40+
@Nonnull public SchemaSaveMode schemaSaveMode;
41+
@Nonnull public DataSaveMode dataSaveMode;
42+
@Nonnull public Catalog catalog;
43+
@Nonnull public TablePath tablePath;
44+
@Nullable public CatalogTable catalogTable;
45+
@Nullable public String customSql;
3946

4047
public DefaultSaveModeHandler(
4148
SchemaSaveMode schemaSaveMode,
@@ -132,17 +139,58 @@ protected boolean tableExists() {
132139
}
133140

134141
protected void dropTable() {
142+
try {
143+
log.info(
144+
"Dropping table {} with action {}",
145+
tablePath,
146+
catalog.previewAction(
147+
Catalog.ActionType.DROP_TABLE, tablePath, Optional.empty()));
148+
} catch (UnsupportedOperationException ignore) {
149+
log.info("Dropping table {}", tablePath);
150+
}
135151
catalog.dropTable(tablePath, true);
136152
}
137153

138154
protected void createTable() {
139155
if (!catalog.databaseExists(tablePath.getDatabaseName())) {
140-
catalog.createDatabase(TablePath.of(tablePath.getDatabaseName(), ""), true);
156+
TablePath databasePath = TablePath.of(tablePath.getDatabaseName(), "");
157+
try {
158+
log.info(
159+
"Creating database {} with action {}",
160+
tablePath.getDatabaseName(),
161+
catalog.previewAction(
162+
Catalog.ActionType.CREATE_DATABASE,
163+
databasePath,
164+
Optional.empty()));
165+
} catch (UnsupportedOperationException ignore) {
166+
log.info("Creating database {}", tablePath.getDatabaseName());
167+
}
168+
catalog.createDatabase(databasePath, true);
169+
}
170+
try {
171+
log.info(
172+
"Creating table {} with action {}",
173+
tablePath,
174+
catalog.previewAction(
175+
Catalog.ActionType.CREATE_TABLE,
176+
tablePath,
177+
Optional.ofNullable(catalogTable)));
178+
} catch (UnsupportedOperationException ignore) {
179+
log.info("Creating table {}", tablePath);
141180
}
142181
catalog.createTable(tablePath, catalogTable, true);
143182
}
144183

145184
protected void truncateTable() {
185+
try {
186+
log.info(
187+
"Truncating table {} with action {}",
188+
tablePath,
189+
catalog.previewAction(
190+
Catalog.ActionType.TRUNCATE_TABLE, tablePath, Optional.empty()));
191+
} catch (UnsupportedOperationException ignore) {
192+
log.info("Truncating table {}", tablePath);
193+
}
146194
catalog.truncateTable(tablePath, true);
147195
}
148196

@@ -151,9 +199,30 @@ protected boolean dataExists() {
151199
}
152200

153201
protected void executeCustomSql() {
202+
log.info("Executing custom SQL for table {} with SQL: {}", tablePath, customSql);
154203
catalog.executeSql(tablePath, customSql);
155204
}
156205

206+
@Override
207+
public TablePath getHandleTablePath() {
208+
return tablePath;
209+
}
210+
211+
@Override
212+
public Catalog getHandleCatalog() {
213+
return catalog;
214+
}
215+
216+
@Override
217+
public SchemaSaveMode getSchemaSaveMode() {
218+
return schemaSaveMode;
219+
}
220+
221+
@Override
222+
public DataSaveMode getDataSaveMode() {
223+
return dataSaveMode;
224+
}
225+
157226
@Override
158227
public void close() throws Exception {
159228
catalog.close();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.sink;
19+
20+
import lombok.extern.slf4j.Slf4j;
21+
22+
@Slf4j
23+
public class SaveModeExecuteWrapper {
24+
25+
public SaveModeExecuteWrapper(SaveModeHandler handler) {
26+
this.handler = handler;
27+
}
28+
29+
public void execute() {
30+
log.info(
31+
"Executing save mode for table: {}, with SchemaSaveMode: {}, DataSaveMode: {} using Catalog: {}",
32+
handler.getHandleTablePath(),
33+
handler.getSchemaSaveMode(),
34+
handler.getDataSaveMode(),
35+
handler.getHandleCatalog().name());
36+
handler.handleSaveMode();
37+
}
38+
39+
private final SaveModeHandler handler;
40+
}

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java

+11
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,23 @@
1717

1818
package org.apache.seatunnel.api.sink;
1919

20+
import org.apache.seatunnel.api.table.catalog.Catalog;
21+
import org.apache.seatunnel.api.table.catalog.TablePath;
22+
2023
public interface SaveModeHandler extends AutoCloseable {
2124

2225
void handleSchemaSaveMode();
2326

2427
void handleDataSaveMode();
2528

29+
SchemaSaveMode getSchemaSaveMode();
30+
31+
DataSaveMode getDataSaveMode();
32+
33+
TablePath getHandleTablePath();
34+
35+
Catalog getHandleCatalog();
36+
2637
default void handleSaveMode() {
2738
handleSchemaSaveMode();
2839
handleDataSaveMode();

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java

+13
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,19 @@ default boolean isExistsData(TablePath tablePath) {
262262

263263
default void executeSql(TablePath tablePath, String sql) {}
264264

265+
default PreviewResult previewAction(
266+
ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
267+
throw new UnsupportedOperationException("Preview action is not supported");
268+
}
269+
270+
enum ActionType {
271+
CREATE_TABLE,
272+
CREATE_DATABASE,
273+
DROP_TABLE,
274+
DROP_DATABASE,
275+
TRUNCATE_TABLE
276+
}
277+
265278
// todo: Support for update table metadata
266279

267280
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.table.catalog;
19+
20+
public class InfoPreviewResult extends PreviewResult {
21+
private final String info;
22+
23+
public String getInfo() {
24+
return info;
25+
}
26+
27+
public InfoPreviewResult(String info) {
28+
super(Type.INFO);
29+
this.info = info;
30+
}
31+
32+
@Override
33+
public String toString() {
34+
return info;
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.table.catalog;
19+
20+
/** The result of a SQL preview action in {@link Catalog#previewAction}. */
21+
public abstract class PreviewResult {
22+
23+
private final Type type;
24+
25+
public PreviewResult(Type type) {
26+
this.type = type;
27+
}
28+
29+
public Type getType() {
30+
return type;
31+
}
32+
33+
public enum Type {
34+
SQL,
35+
INFO,
36+
OTHER
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.table.catalog;
19+
20+
public class SQLPreviewResult extends PreviewResult {
21+
22+
private final String sql;
23+
24+
public String getSql() {
25+
return sql;
26+
}
27+
28+
public SQLPreviewResult(String sql) {
29+
super(Type.SQL);
30+
this.sql = sql;
31+
}
32+
33+
@Override
34+
public String toString() {
35+
return sql;
36+
}
37+
}

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import org.apache.seatunnel.api.table.catalog.Catalog;
2121
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2222
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
23+
import org.apache.seatunnel.api.table.catalog.PreviewResult;
2324
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
25+
import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
2426
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
2527
import org.apache.seatunnel.api.table.catalog.TablePath;
2628
import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -49,6 +51,9 @@
4951
import java.util.HashMap;
5052
import java.util.List;
5153
import java.util.Map;
54+
import java.util.Optional;
55+
56+
import static com.google.common.base.Preconditions.checkArgument;
5257

5358
public class DorisCatalog implements Catalog {
5459

@@ -339,8 +344,7 @@ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
339344
throws TableNotExistException, CatalogException {
340345
try {
341346
if (ignoreIfNotExists) {
342-
conn.createStatement()
343-
.execute(String.format("TRUNCATE TABLE %s", tablePath.getFullName()));
347+
conn.createStatement().execute(DorisCatalogUtil.getTruncateTableQuery(tablePath));
344348
}
345349
} catch (Exception e) {
346350
throw new CatalogException(
@@ -359,4 +363,27 @@ public boolean isExistsData(TablePath tablePath) {
359363
throw new CatalogException(String.format("Failed executeSql error %s", sql), e);
360364
}
361365
}
366+
367+
@Override
368+
public PreviewResult previewAction(
369+
ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
370+
if (actionType == ActionType.CREATE_TABLE) {
371+
checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null");
372+
return new SQLPreviewResult(
373+
DorisCatalogUtil.getCreateTableStatement(
374+
dorisConfig.getCreateTableTemplate(), tablePath, catalogTable.get()));
375+
} else if (actionType == ActionType.DROP_TABLE) {
376+
return new SQLPreviewResult(DorisCatalogUtil.getDropTableQuery(tablePath, true));
377+
} else if (actionType == ActionType.TRUNCATE_TABLE) {
378+
return new SQLPreviewResult(DorisCatalogUtil.getTruncateTableQuery(tablePath));
379+
} else if (actionType == ActionType.CREATE_DATABASE) {
380+
return new SQLPreviewResult(
381+
DorisCatalogUtil.getCreateDatabaseQuery(tablePath.getDatabaseName(), true));
382+
} else if (actionType == ActionType.DROP_DATABASE) {
383+
return new SQLPreviewResult(
384+
DorisCatalogUtil.getDropDatabaseQuery(tablePath.getDatabaseName(), true));
385+
} else {
386+
throw new UnsupportedOperationException("Unsupported action type: " + actionType);
387+
}
388+
}
362389
}

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java

+4
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ public static String getDropTableQuery(TablePath tablePath, boolean ignoreIfNotE
9595
return "DROP TABLE " + (ignoreIfNotExists ? "IF EXISTS " : "") + tablePath.getFullName();
9696
}
9797

98+
public static String getTruncateTableQuery(TablePath tablePath) {
99+
return "TRUNCATE TABLE " + tablePath.getFullName();
100+
}
101+
98102
/**
99103
* @param createTableTemplate create table template
100104
* @param catalogTable catalog table

0 commit comments

Comments
 (0)