Skip to content

Commit aded562

Browse files
[Improve][Connector-V2] Support TableSourceFactory on StarRocks (#6498)
1 parent c15340c commit aded562

File tree

11 files changed

+88
-167
lines changed

11 files changed

+88
-167
lines changed

Diff for: docs/en/connector-v2/source/StarRocks.md

+7
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ delivers the query plan as a parameter to BE nodes, and then obtains data result
3535
| scan_batch_rows | int | no | 1024 |
3636
| scan_mem_limit | long | no | 2147483648 |
3737
| max_retries | int | no | 3 |
38+
| scan.params.* | string | no | - |
3839

3940
### node_urls [list]
4041

@@ -136,6 +137,10 @@ The maximum memory space allowed for a single query in the BE node, in bytes. Th
136137

137138
number of retry requests sent to StarRocks
138139

140+
### scan.params. [string]
141+
142+
The parameter of the scan data from be
143+
139144
## Example
140145

141146
```
@@ -164,6 +169,8 @@ source {
164169
DATETIME_COL = TIMESTAMP
165170
DATE_COL = DATE
166171
}
172+
scan.params.scanner_thread_pool_thread_num = "3"
173+
167174
}
168175
}
169176
```

Diff for: seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java

-33
Original file line numberDiff line numberDiff line change
@@ -18,51 +18,18 @@
1818
package org.apache.seatunnel.common.config;
1919

2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
2221
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
2322

2423
import lombok.NonNull;
2524

2625
import java.util.HashMap;
27-
import java.util.LinkedHashMap;
2826
import java.util.List;
2927
import java.util.Map;
3028

3129
public final class TypesafeConfigUtils {
3230

3331
private TypesafeConfigUtils() {}
3432

35-
/**
36-
* Extract sub config with fixed prefix
37-
*
38-
* @param source config source
39-
* @param prefix config prefix
40-
* @param keepPrefix true if keep prefix
41-
* @deprecated use org.apache.seatunnel.api.configuration.Option interface instead
42-
*/
43-
@Deprecated
44-
public static Config extractSubConfig(Config source, String prefix, boolean keepPrefix) {
45-
46-
// use LinkedHashMap to keep insertion order
47-
Map<String, String> values = new LinkedHashMap<>();
48-
49-
for (Map.Entry<String, ConfigValue> entry : source.entrySet()) {
50-
final String key = entry.getKey();
51-
final String value = String.valueOf(entry.getValue().unwrapped());
52-
53-
if (key.startsWith(prefix)) {
54-
55-
if (keepPrefix) {
56-
values.put(key, value);
57-
} else {
58-
values.put(key.substring(prefix.length()), value);
59-
}
60-
}
61-
}
62-
63-
return ConfigFactory.parseMap(values);
64-
}
65-
6633
/**
6734
* Check if config with specific prefix exists
6835
*

Diff for: seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java

-17
Original file line numberDiff line numberDiff line change
@@ -26,27 +26,10 @@
2626
import java.util.HashMap;
2727
import java.util.Map;
2828

29-
import static org.apache.seatunnel.common.config.TypesafeConfigUtils.extractSubConfig;
3029
import static org.apache.seatunnel.common.config.TypesafeConfigUtils.hasSubConfig;
3130

3231
public class TypesafeConfigUtilsTest {
3332

34-
@Test
35-
public void testExtractSubConfig() {
36-
Config config = getConfig();
37-
Config subConfig = extractSubConfig(config, "test.", true);
38-
Map<String, String> configMap = new HashMap<>();
39-
configMap.put("test.t0", "v0");
40-
configMap.put("test.t1", "v1");
41-
Assertions.assertEquals(ConfigFactory.parseMap(configMap), subConfig);
42-
43-
subConfig = extractSubConfig(config, "test.", false);
44-
configMap = new HashMap<>();
45-
configMap.put("t0", "v0");
46-
configMap.put("t1", "v1");
47-
Assertions.assertEquals(ConfigFactory.parseMap(configMap), subConfig);
48-
}
49-
5033
@Test
5134
public void testHasSubConfig() {
5235
Config config = getConfig();

Diff for: seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,15 @@
2222
import org.apache.seatunnel.api.table.catalog.Catalog;
2323
import org.apache.seatunnel.api.table.factory.CatalogFactory;
2424
import org.apache.seatunnel.api.table.factory.Factory;
25+
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
2526
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
2627
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
2728

2829
import com.google.auto.service.AutoService;
2930

3031
@AutoService(Factory.class)
3132
public class StarRocksCatalogFactory implements CatalogFactory {
32-
public static final String IDENTIFIER = "StarRocks";
33+
public static final String IDENTIFIER = CommonConfig.CONNECTOR_IDENTITY;
3334

3435
@Override
3536
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {

Diff for: seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java

+12
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
22+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2223

2324
import lombok.AllArgsConstructor;
2425
import lombok.Getter;
@@ -31,6 +32,9 @@
3132
@ToString
3233
@AllArgsConstructor
3334
public class CommonConfig implements Serializable {
35+
36+
public static final String CONNECTOR_IDENTITY = "StarRocks";
37+
3438
public static final Option<List<String>> NODE_URLS =
3539
Options.key("nodeUrls")
3640
.listType()
@@ -67,4 +71,12 @@ public class CommonConfig implements Serializable {
6771
private String password;
6872
private String database;
6973
private String table;
74+
75+
public CommonConfig(ReadonlyConfig config) {
76+
this.nodeUrls = config.get(NODE_URLS);
77+
this.username = config.get(USERNAME);
78+
this.password = config.get(PASSWORD);
79+
this.database = config.get(DATABASE);
80+
this.table = config.get(TABLE);
81+
}
7082
}

Diff for: seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java

+22-65
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,14 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
2220
import org.apache.seatunnel.api.configuration.Option;
2321
import org.apache.seatunnel.api.configuration.Options;
24-
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
22+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2523

2624
import lombok.Getter;
27-
import lombok.NonNull;
2825
import lombok.Setter;
2926

3027
import java.util.HashMap;
31-
import java.util.List;
3228
import java.util.Map;
3329

3430
@Setter
@@ -37,13 +33,26 @@ public class SourceConfig extends CommonConfig {
3733

3834
private static final long DEFAULT_SCAN_MEM_LIMIT = 1024 * 1024 * 1024L;
3935

40-
public SourceConfig(
41-
@NonNull List<String> nodeUrls,
42-
@NonNull String username,
43-
@NonNull String password,
44-
@NonNull String database,
45-
@NonNull String table) {
46-
super(nodeUrls, username, password, database, table);
36+
public SourceConfig(ReadonlyConfig config) {
37+
super(config);
38+
this.maxRetries = config.get(MAX_RETRIES);
39+
this.requestTabletSize = config.get(QUERY_TABLET_SIZE);
40+
this.scanFilter = config.get(SCAN_FILTER);
41+
this.connectTimeoutMs = config.get(SCAN_CONNECT_TIMEOUT);
42+
this.batchRows = config.get(SCAN_BATCH_ROWS);
43+
this.keepAliveMin = config.get(SCAN_KEEP_ALIVE_MIN);
44+
this.queryTimeoutSec = config.get(SCAN_QUERY_TIMEOUT_SEC);
45+
this.memLimit = config.get(SCAN_MEM_LIMIT);
46+
47+
String prefix = STARROCKS_SCAN_CONFIG_PREFIX.key();
48+
config.toMap()
49+
.forEach(
50+
(key, value) -> {
51+
if (key.startsWith(prefix)) {
52+
this.sourceOptionProps.put(
53+
key.substring(prefix.length()).toLowerCase(), value);
54+
}
55+
});
4756
}
4857

4958
public static final Option<Integer> MAX_RETRIES =
@@ -105,57 +114,5 @@ public SourceConfig(
105114
private int keepAliveMin = SCAN_KEEP_ALIVE_MIN.defaultValue();
106115
private int batchRows = SCAN_BATCH_ROWS.defaultValue();
107116
private int connectTimeoutMs = SCAN_CONNECT_TIMEOUT.defaultValue();
108-
private final Map<String, String> sourceOptionProps = new HashMap<>();
109-
110-
public static SourceConfig loadConfig(Config pluginConfig) {
111-
SourceConfig sourceConfig =
112-
new SourceConfig(
113-
pluginConfig.getStringList(NODE_URLS.key()),
114-
pluginConfig.getString(USERNAME.key()),
115-
pluginConfig.getString(PASSWORD.key()),
116-
pluginConfig.getString(DATABASE.key()),
117-
pluginConfig.getString(TABLE.key()));
118-
119-
if (pluginConfig.hasPath(MAX_RETRIES.key())) {
120-
sourceConfig.setMaxRetries(pluginConfig.getInt(MAX_RETRIES.key()));
121-
}
122-
if (pluginConfig.hasPath(QUERY_TABLET_SIZE.key())) {
123-
sourceConfig.setRequestTabletSize(pluginConfig.getInt(QUERY_TABLET_SIZE.key()));
124-
}
125-
if (pluginConfig.hasPath(SCAN_FILTER.key())) {
126-
sourceConfig.setScanFilter(pluginConfig.getString(SCAN_FILTER.key()));
127-
}
128-
if (pluginConfig.hasPath(SCAN_CONNECT_TIMEOUT.key())) {
129-
sourceConfig.setConnectTimeoutMs(pluginConfig.getInt(SCAN_CONNECT_TIMEOUT.key()));
130-
}
131-
if (pluginConfig.hasPath(SCAN_BATCH_ROWS.key())) {
132-
sourceConfig.setBatchRows(pluginConfig.getInt(SCAN_BATCH_ROWS.key()));
133-
}
134-
if (pluginConfig.hasPath(SCAN_KEEP_ALIVE_MIN.key())) {
135-
sourceConfig.setKeepAliveMin(pluginConfig.getInt(SCAN_KEEP_ALIVE_MIN.key()));
136-
}
137-
if (pluginConfig.hasPath(SCAN_QUERY_TIMEOUT_SEC.key())) {
138-
sourceConfig.setQueryTimeoutSec(pluginConfig.getInt(SCAN_QUERY_TIMEOUT_SEC.key()));
139-
}
140-
if (pluginConfig.hasPath(SCAN_MEM_LIMIT.key())) {
141-
sourceConfig.setMemLimit(pluginConfig.getLong(SCAN_MEM_LIMIT.key()));
142-
}
143-
parseSourceOptionProperties(pluginConfig, sourceConfig);
144-
return sourceConfig;
145-
}
146-
147-
private static void parseSourceOptionProperties(
148-
Config pluginConfig, SourceConfig sourceConfig) {
149-
Config sourceOptionConfig =
150-
TypesafeConfigUtils.extractSubConfig(
151-
pluginConfig, STARROCKS_SCAN_CONFIG_PREFIX.key(), false);
152-
sourceOptionConfig
153-
.entrySet()
154-
.forEach(
155-
entry -> {
156-
final String configKey = entry.getKey().toLowerCase();
157-
sourceConfig.sourceOptionProps.put(
158-
configKey, (String) entry.getValue().unwrapped());
159-
});
160-
}
117+
private Map<String, String> sourceOptionProps = new HashMap<>();
161118
}

Diff for: seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.seatunnel.api.table.factory.Factory;
2626
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
2727
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
28+
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
2829
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
2930
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
3031
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
@@ -42,7 +43,7 @@
4243
public class StarRocksSinkFactory implements TableSinkFactory {
4344
@Override
4445
public String factoryIdentifier() {
45-
return "StarRocks";
46+
return CommonConfig.CONNECTOR_IDENTITY;
4647
}
4748

4849
@Override

0 commit comments

Comments
 (0)