Skip to content

Commit a70d696

Browse files
committed
[Bufgix][Paimon] change catalog by paimon sink config
1 parent 7b29d10 commit a70d696

File tree

3 files changed

+44
-18
lines changed

3 files changed

+44
-18
lines changed

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.google.auto.service.AutoService;
3737
import lombok.extern.slf4j.Slf4j;
3838

39+
import java.util.Objects;
3940
import java.util.concurrent.atomic.AtomicInteger;
4041

4142
@Slf4j
@@ -141,7 +142,10 @@ public DataType reconvert(Column column) {
141142
return DataTypes.DATE();
142143
case TIME:
143144
Integer timeScale = column.getScale();
144-
if (timeScale != null && timeScale > TimeType.MAX_PRECISION) {
145+
if (Objects.isNull(timeScale)) {
146+
timeScale = TimeType.DEFAULT_PRECISION;
147+
}
148+
if (timeScale > TimeType.MAX_PRECISION) {
145149
timeScale = TimeType.MAX_PRECISION;
146150
log.warn(
147151
"The time column {} type time({}) is out of range, "
@@ -155,8 +159,10 @@ public DataType reconvert(Column column) {
155159
return DataTypes.TIME(timeScale);
156160
case TIMESTAMP:
157161
Integer timestampScale = column.getScale();
158-
if (timestampScale != null
159-
&& timestampScale > LocalZonedTimestampType.MAX_PRECISION) {
162+
if (Objects.isNull(timestampScale)) {
163+
timestampScale = LocalZonedTimestampType.DEFAULT_PRECISION;
164+
}
165+
if (timestampScale > LocalZonedTimestampType.MAX_PRECISION) {
160166
timestampScale = LocalZonedTimestampType.MAX_PRECISION;
161167
log.warn(
162168
"The timestamp column {} type timestamp({}) is out of range, "
@@ -193,6 +199,8 @@ public DataType reconvert(Column column) {
193199
id, field, reconvert(getPhysicalColumn(column, fieldType)));
194200
}
195201
return DataTypes.ROW(dataFields);
202+
case NULL:
203+
return DataTypes.VARBINARY(0);
196204
default:
197205
throw CommonError.convertToConnectorTypeError(
198206
identifier(), column.getDataType().getSqlType().name(), column.getName());

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java

-14
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

2222
import org.apache.seatunnel.api.common.JobContext;
23-
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2423
import org.apache.seatunnel.api.serialization.DefaultSerializer;
2524
import org.apache.seatunnel.api.serialization.Serializer;
2625
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -30,9 +29,6 @@
3029
import org.apache.seatunnel.api.table.catalog.TableSchema;
3130
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3231
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
33-
import org.apache.seatunnel.common.config.CheckConfigUtil;
34-
import org.apache.seatunnel.common.config.CheckResult;
35-
import org.apache.seatunnel.common.constants.PluginType;
3632
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
3733
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
3834
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitInfo;
@@ -80,16 +76,6 @@ public class PaimonSink
8076
private JobContext jobContext;
8177

8278
public PaimonSink(Config pluginConfig, CatalogTable catalogTable) {
83-
CheckResult result =
84-
CheckConfigUtil.checkAllExists(
85-
pluginConfig, WAREHOUSE.key(), DATABASE.key(), TABLE.key());
86-
if (!result.isSuccess()) {
87-
throw new PaimonConnectorException(
88-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
89-
String.format(
90-
"PluginName: %s, PluginType: %s, Message: %s",
91-
getPluginName(), PluginType.SINK, result.getMsg()));
92-
}
9379
// initialize paimon table
9480
final String warehouse = pluginConfig.getString(WAREHOUSE.key());
9581
final String database = pluginConfig.getString(DATABASE.key());

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java

+33-1
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,26 @@
1919

2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

22+
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2223
import org.apache.seatunnel.api.configuration.util.OptionRule;
2324
import org.apache.seatunnel.api.table.catalog.CatalogTable;
25+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
2426
import org.apache.seatunnel.api.table.connector.TableSink;
2527
import org.apache.seatunnel.api.table.factory.Factory;
2628
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
2729
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
30+
import org.apache.seatunnel.common.config.CheckConfigUtil;
31+
import org.apache.seatunnel.common.config.CheckResult;
32+
import org.apache.seatunnel.common.constants.PluginType;
2833
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
34+
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
2935

3036
import com.google.auto.service.AutoService;
3137

38+
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.DATABASE;
39+
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.TABLE;
40+
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;
41+
3242
@AutoService(Factory.class)
3343
public class PaimonSinkFactory implements TableSinkFactory {
3444

@@ -50,7 +60,29 @@ public OptionRule optionRule() {
5060
@Override
5161
public TableSink createSink(TableSinkFactoryContext context) {
5262
Config pluginConfig = context.getOptions().toConfig();
53-
CatalogTable catalogTable = context.getCatalogTable();
63+
CatalogTable catalogTable = renameCatalogTable(pluginConfig, context.getCatalogTable());
5464
return () -> new PaimonSink(pluginConfig, catalogTable);
5565
}
66+
67+
private CatalogTable renameCatalogTable(Config pluginConfig, CatalogTable catalogTable) {
68+
CheckResult result =
69+
CheckConfigUtil.checkAllExists(
70+
pluginConfig, WAREHOUSE.key(), DATABASE.key(), TABLE.key());
71+
if (!result.isSuccess()) {
72+
throw new PaimonConnectorException(
73+
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
74+
String.format(
75+
"PluginName: %s, PluginType: %s, Message: %s",
76+
factoryIdentifier(), PluginType.SINK, result.getMsg()));
77+
}
78+
TableIdentifier tableId = catalogTable.getTableId();
79+
String tableName = pluginConfig.getString(TABLE.key());
80+
String namespace = pluginConfig.getString(DATABASE.key());
81+
82+
TableIdentifier newTableId =
83+
TableIdentifier.of(
84+
tableId.getCatalogName(), namespace, tableId.getSchemaName(), tableName);
85+
86+
return CatalogTable.of(newTableId, catalogTable);
87+
}
5688
}

0 commit comments

Comments
 (0)