Skip to content

Commit a24fa8f

Browse files
authored
[Fix][Connector-V2] Fix MaxCompute cannot get project and tableName when use schema (#8865)
1 parent e56f06c commit a24fa8f

File tree

2 files changed

+57
-10
lines changed
  • seatunnel-connectors-v2/connector-maxcompute/src

2 files changed

+57
-10
lines changed

seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java

+18-6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.seatunnel.api.source.SupportParallelism;
2828
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2929
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
30+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
3031
import org.apache.seatunnel.api.table.catalog.TablePath;
3132
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3233
import org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
@@ -69,6 +70,13 @@ private Map<TablePath, SourceTableInfo> getSourceTableInfos(ReadonlyConfig reado
6970

7071
if (readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
7172
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
73+
catalogTable =
74+
CatalogTable.of(
75+
TableIdentifier.of(
76+
"maxcompute",
77+
readonlyConfig.get(PROJECT),
78+
readonlyConfig.get(TABLE_NAME)),
79+
catalogTable);
7280
tables.put(
7381
catalogTable.getTablePath(),
7482
new SourceTableInfo(
@@ -81,28 +89,32 @@ private Map<TablePath, SourceTableInfo> getSourceTableInfos(ReadonlyConfig reado
8189
if (readonlyConfig.getOptional(TABLE_LIST).isPresent()) {
8290
for (Map<String, Object> subConfig : readonlyConfig.get(TABLE_LIST)) {
8391
ReadonlyConfig subReadonlyConfig = ReadonlyConfig.fromMap(subConfig);
92+
String project =
93+
subReadonlyConfig
94+
.getOptional(PROJECT)
95+
.orElse(readonlyConfig.get(PROJECT));
96+
TablePath tablePath =
97+
TablePath.of(project, subReadonlyConfig.get(TABLE_NAME));
8498
if (subReadonlyConfig
8599
.getOptional(ConnectorCommonOptions.SCHEMA)
86100
.isPresent()) {
87101
CatalogTable catalogTable =
88102
CatalogTableUtil.buildWithConfig(subReadonlyConfig);
103+
catalogTable =
104+
CatalogTable.of(
105+
TableIdentifier.of("maxcompute", tablePath),
106+
catalogTable);
89107
tables.put(
90108
catalogTable.getTablePath(),
91109
new SourceTableInfo(
92110
catalogTable,
93111
subReadonlyConfig.get(PARTITION_SPEC),
94112
subReadonlyConfig.get(SPLIT_ROW)));
95113
} else {
96-
String project =
97-
subReadonlyConfig
98-
.getOptional(PROJECT)
99-
.orElse(readonlyConfig.get(PROJECT));
100114
Integer splitRow =
101115
subReadonlyConfig
102116
.getOptional(SPLIT_ROW)
103117
.orElse(readonlyConfig.get(SPLIT_ROW));
104-
TablePath tablePath =
105-
TablePath.of(project, subReadonlyConfig.get(TABLE_NAME));
106118
tables.put(
107119
tablePath,
108120
new SourceTableInfo(

seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java

+39-4
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,21 @@
2222
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
2323

2424
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
25+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2526
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2627
import org.apache.seatunnel.api.table.type.SqlType;
2728

2829
import org.junit.jupiter.api.Assertions;
2930
import org.junit.jupiter.api.Test;
3031

32+
import java.util.Collections;
33+
import java.util.HashMap;
34+
import java.util.Map;
35+
3136
public class MaxcomputeSourceTest {
3237

3338
@Test
34-
public void prepare() {
39+
public void testParseSchema() {
3540
Config fields =
3641
ConfigFactory.empty()
3742
.withValue("id", ConfigValueFactory.fromAnyRef("int"))
@@ -40,10 +45,40 @@ public void prepare() {
4045

4146
Config schema = fields.atKey("fields").atKey("schema");
4247

43-
MaxcomputeSource maxcomputeSource = new MaxcomputeSource(ReadonlyConfig.fromConfig(schema));
48+
Config root =
49+
schema.withValue("project", ConfigValueFactory.fromAnyRef("project"))
50+
.withValue("table_name", ConfigValueFactory.fromAnyRef("test_table"));
51+
52+
MaxcomputeSource maxcomputeSource = new MaxcomputeSource(ReadonlyConfig.fromConfig(root));
4453

45-
SeaTunnelRowType seaTunnelRowType =
46-
maxcomputeSource.getProducedCatalogTables().get(0).getSeaTunnelRowType();
54+
CatalogTable table = maxcomputeSource.getProducedCatalogTables().get(0);
55+
Assertions.assertEquals("project.test_table", table.getTablePath().toString());
56+
SeaTunnelRowType seaTunnelRowType = table.getSeaTunnelRowType();
4757
Assertions.assertEquals(SqlType.INT, seaTunnelRowType.getFieldType(0).getSqlType());
58+
59+
Map<String, Object> tableList = new HashMap<>();
60+
Map<String, Object> schemaMap = new HashMap<>();
61+
Map<String, Object> fieldsMap = new HashMap<>();
62+
fieldsMap.put("id", "int");
63+
fieldsMap.put("name", "string");
64+
fieldsMap.put("age", "int");
65+
schemaMap.put("fields", fieldsMap);
66+
tableList.put("schema", schemaMap);
67+
tableList.put("table_name", "test_table2");
68+
69+
root =
70+
ConfigFactory.empty()
71+
.withValue("project", ConfigValueFactory.fromAnyRef("project"))
72+
.withValue("accessId", ConfigValueFactory.fromAnyRef("accessId"))
73+
.withValue("accesskey", ConfigValueFactory.fromAnyRef("accessKey"))
74+
.withValue(
75+
"table_list",
76+
ConfigValueFactory.fromIterable(
77+
Collections.singletonList(tableList)));
78+
79+
maxcomputeSource = new MaxcomputeSource(ReadonlyConfig.fromConfig(root));
80+
81+
table = maxcomputeSource.getProducedCatalogTables().get(0);
82+
Assertions.assertEquals("project.test_table2", table.getTablePath().toString());
4883
}
4984
}

0 commit comments

Comments
 (0)