Skip to content

Commit a69efca

Browse files
authored
[improve] update amazondynamodb connector (#8601)
1 parent c747e02 commit a69efca

14 files changed

+240
-260
lines changed

Diff for: seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,6 @@ private Set<String> buildWhiteList() {
224224
whiteList.add("RocketMqSourceOptions");
225225
whiteList.add("TablestoreSinkOptions");
226226
whiteList.add("TableStoreDBSourceOptions");
227-
whiteList.add("AmazonDynamoDBSinkOptions");
228227
whiteList.add("KuduSinkOptions");
229228
whiteList.add("TDengineSinkOptions");
230229
whiteList.add("Neo4jSourceOptions");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
23+
import java.io.Serializable;
24+
25+
public class AmazonDynamoDBBaseOptions implements Serializable {
26+
public static final Option<String> URL =
27+
Options.key("url")
28+
.stringType()
29+
.noDefaultValue()
30+
.withDescription("url to read to Amazon DynamoDB");
31+
public static final Option<String> REGION =
32+
Options.key("region")
33+
.stringType()
34+
.noDefaultValue()
35+
.withDescription("The region of Amazon DynamoDB");
36+
public static final Option<String> ACCESS_KEY_ID =
37+
Options.key("access_key_id")
38+
.stringType()
39+
.noDefaultValue()
40+
.withDescription("The access id of Amazon DynamoDB");
41+
public static final Option<String> SECRET_ACCESS_KEY =
42+
Options.key("secret_access_key")
43+
.stringType()
44+
.noDefaultValue()
45+
.withDescription("The access secret key of Amazon DynamoDB");
46+
public static final Option<String> TABLE =
47+
Options.key("table")
48+
.stringType()
49+
.noDefaultValue()
50+
.withDescription("The table of Amazon DynamoDB");
51+
}

Diff for: seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java

+35-49
Original file line numberDiff line numberDiff line change
@@ -17,61 +17,47 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
1919

20-
import org.apache.seatunnel.api.configuration.Option;
21-
import org.apache.seatunnel.api.configuration.Options;
20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21+
22+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
23+
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
24+
25+
import lombok.AllArgsConstructor;
26+
import lombok.Data;
2227

2328
import java.io.Serializable;
2429

30+
@Data
31+
@AllArgsConstructor
2532
public class AmazonDynamoDBConfig implements Serializable {
26-
public static final Option<String> URL =
27-
Options.key("url")
28-
.stringType()
29-
.noDefaultValue()
30-
.withDescription("url to read to Amazon DynamoDB");
31-
public static final Option<String> REGION =
32-
Options.key("region")
33-
.stringType()
34-
.noDefaultValue()
35-
.withDescription("The region of Amazon DynamoDB");
36-
public static final Option<String> ACCESS_KEY_ID =
37-
Options.key("access_key_id")
38-
.stringType()
39-
.noDefaultValue()
40-
.withDescription("The access id of Amazon DynamoDB");
41-
public static final Option<String> SECRET_ACCESS_KEY =
42-
Options.key("secret_access_key")
43-
.stringType()
44-
.noDefaultValue()
45-
.withDescription("The access secret key of Amazon DynamoDB");
46-
public static final Option<String> TABLE =
47-
Options.key("table")
48-
.stringType()
49-
.noDefaultValue()
50-
.withDescription("The table of Amazon DynamoDB");
5133

52-
public static final Option<Integer> BATCH_SIZE =
53-
Options.key("batch_size")
54-
.intType()
55-
.defaultValue(25)
56-
.withDescription("The batch size of Amazon DynamoDB");
34+
private String url;
35+
36+
private String region;
37+
38+
private String accessKeyId;
39+
40+
private String secretAccessKey;
41+
42+
private String table;
5743

58-
public static final Option<Integer> BATCH_INTERVAL_MS =
59-
Options.key("batch_interval_ms")
60-
.intType()
61-
.defaultValue(1000)
62-
.withDescription("The batch interval of Amazon DynamoDB");
44+
private Config schema;
6345

64-
@SuppressWarnings("checkstyle:MagicNumber")
65-
public static final Option<Integer> SCAN_ITEM_LIMIT =
66-
Options.key("scan_item_limit")
67-
.intType()
68-
.defaultValue(1)
69-
.withDescription("number of item each scan request should return");
46+
public int batchSize;
47+
public int scanItemLimit;
48+
public int parallelScanThreads;
7049

71-
@SuppressWarnings("checkstyle:MagicNumber")
72-
public static final Option<Integer> PARALLEL_SCAN_THREADS =
73-
Options.key("parallel_scan_threads")
74-
.intType()
75-
.defaultValue(2)
76-
.withDescription("number of logical segments for parallel scan");
50+
public AmazonDynamoDBConfig(ReadonlyConfig config) {
51+
this.url = config.get(AmazonDynamoDBBaseOptions.URL);
52+
this.region = config.get(AmazonDynamoDBBaseOptions.REGION);
53+
this.accessKeyId = config.get(AmazonDynamoDBBaseOptions.ACCESS_KEY_ID);
54+
this.secretAccessKey = config.get(AmazonDynamoDBBaseOptions.SECRET_ACCESS_KEY);
55+
this.table = config.get(AmazonDynamoDBBaseOptions.TABLE);
56+
if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
57+
this.schema = ReadonlyConfig.fromMap(config.get(TableSchemaOptions.SCHEMA)).toConfig();
58+
}
59+
this.batchSize = config.get(AmazonDynamoDBSinkOptions.BATCH_SIZE);
60+
this.scanItemLimit = config.get(AmazonDynamoDBSourceOptions.SCAN_ITEM_LIMIT);
61+
this.parallelScanThreads = config.get(AmazonDynamoDBSourceOptions.PARALLEL_SCAN_THREADS);
62+
}
7763
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
23+
public class AmazonDynamoDBSinkOptions extends AmazonDynamoDBBaseOptions {
24+
25+
public static final Option<Integer> BATCH_SIZE =
26+
Options.key("batch_size")
27+
.intType()
28+
.defaultValue(25)
29+
.withDescription("The batch size of Amazon DynamoDB");
30+
}

Diff for: seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java

+15-47
Original file line numberDiff line numberDiff line change
@@ -17,57 +17,25 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
2222
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
2323

24-
import lombok.AllArgsConstructor;
25-
import lombok.Data;
26-
27-
import java.io.Serializable;
28-
29-
@Data
30-
@AllArgsConstructor
31-
public class AmazonDynamoDBSourceOptions implements Serializable {
32-
33-
private String url;
34-
35-
private String region;
36-
37-
private String accessKeyId;
38-
39-
private String secretAccessKey;
24+
import java.util.Map;
4025

41-
private String table;
26+
public class AmazonDynamoDBSourceOptions extends AmazonDynamoDBBaseOptions {
4227

43-
private Config schema;
28+
public static final Option<Integer> SCAN_ITEM_LIMIT =
29+
Options.key("scan_item_limit")
30+
.intType()
31+
.defaultValue(1)
32+
.withDescription("number of item each scan request should return");
4433

45-
public int batchSize = AmazonDynamoDBConfig.BATCH_SIZE.defaultValue();
46-
public int batchIntervalMs = AmazonDynamoDBConfig.BATCH_INTERVAL_MS.defaultValue();
47-
public int scanItemLimit = AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.defaultValue();
48-
public int parallelScanThreads = AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.defaultValue();
34+
public static final Option<Integer> PARALLEL_SCAN_THREADS =
35+
Options.key("parallel_scan_threads")
36+
.intType()
37+
.defaultValue(2)
38+
.withDescription("number of logical segments for parallel scan");
4939

50-
public AmazonDynamoDBSourceOptions(Config config) {
51-
this.url = config.getString(AmazonDynamoDBConfig.URL.key());
52-
this.region = config.getString(AmazonDynamoDBConfig.REGION.key());
53-
this.accessKeyId = config.getString(AmazonDynamoDBConfig.ACCESS_KEY_ID.key());
54-
this.secretAccessKey = config.getString(AmazonDynamoDBConfig.SECRET_ACCESS_KEY.key());
55-
this.table = config.getString(AmazonDynamoDBConfig.TABLE.key());
56-
if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
57-
this.schema = config.getConfig(TableSchemaOptions.SCHEMA.key());
58-
}
59-
if (config.hasPath(AmazonDynamoDBConfig.BATCH_SIZE.key())) {
60-
this.batchSize = config.getInt(AmazonDynamoDBConfig.BATCH_SIZE.key());
61-
}
62-
if (config.hasPath(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key())) {
63-
this.batchIntervalMs = config.getInt(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key());
64-
}
65-
if (config.hasPath(AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.key())) {
66-
this.scanItemLimit = config.getInt(AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.key());
67-
}
68-
if (config.hasPath(AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.key())) {
69-
this.parallelScanThreads =
70-
config.getInt(AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.key());
71-
}
72-
}
40+
public static final Option<Map<String, Object>> SCHEMA = TableSchemaOptions.SCHEMA;
7341
}

Diff for: seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2424
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2525
import org.apache.seatunnel.common.exception.CommonError;
26-
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
26+
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
2727

2828
import software.amazon.awssdk.core.SdkBytes;
2929
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -40,14 +40,13 @@
4040
public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
4141

4242
private final SeaTunnelRowType seaTunnelRowType;
43-
private final AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
43+
private final AmazonDynamoDBConfig amazondynamodbConfig;
4444
private final List<AttributeValue.Type> measurementsType;
4545

4646
public DefaultSeaTunnelRowSerializer(
47-
SeaTunnelRowType seaTunnelRowType,
48-
AmazonDynamoDBSourceOptions amazondynamodbSourceOptions) {
47+
SeaTunnelRowType seaTunnelRowType, AmazonDynamoDBConfig amazondynamodbConfig) {
4948
this.seaTunnelRowType = seaTunnelRowType;
50-
this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
49+
this.amazondynamodbConfig = amazondynamodbConfig;
5150
this.measurementsType = convertTypes(seaTunnelRowType);
5251
}
5352

@@ -65,7 +64,7 @@ public PutItemRequest serialize(SeaTunnelRow seaTunnelRow) {
6564
measurementsType.get(index)));
6665
}
6766
return PutItemRequest.builder()
68-
.tableName(amazondynamodbSourceOptions.getTable())
67+
.tableName(amazondynamodbConfig.getTable())
6968
.item(itemValues)
7069
.build();
7170
}

Diff for: seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java

+12-50
Original file line numberDiff line numberDiff line change
@@ -17,79 +17,41 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.api.common.PrepareFailException;
23-
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
24-
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2520
import org.apache.seatunnel.api.sink.SinkWriter;
2621
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2722
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
28-
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
29-
import org.apache.seatunnel.common.config.CheckConfigUtil;
30-
import org.apache.seatunnel.common.config.CheckResult;
31-
import org.apache.seatunnel.common.constants.PluginType;
32-
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
33-
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.exception.AmazonDynamoDBConnectorException;
23+
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
3424
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
3525
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
3626

37-
import com.google.auto.service.AutoService;
38-
3927
import java.io.IOException;
4028
import java.util.Optional;
4129

42-
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
43-
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
44-
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
45-
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
46-
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
47-
48-
@AutoService(SeaTunnelSink.class)
4930
public class AmazonDynamoDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
5031

51-
private SeaTunnelRowType rowType;
32+
private CatalogTable catalogTable;
5233

53-
private AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
34+
private AmazonDynamoDBConfig amazondynamodbConfig;
5435

55-
@Override
56-
public String getPluginName() {
57-
return "AmazonDynamodb";
36+
public AmazonDynamoDBSink(
37+
CatalogTable catalogTable, AmazonDynamoDBConfig amazondynamodbConfig) {
38+
this.catalogTable = catalogTable;
39+
this.amazondynamodbConfig = amazondynamodbConfig;
5840
}
5941

6042
@Override
61-
public void prepare(Config pluginConfig) throws PrepareFailException {
62-
CheckResult result =
63-
CheckConfigUtil.checkAllExists(
64-
pluginConfig,
65-
URL.key(),
66-
TABLE.key(),
67-
REGION.key(),
68-
ACCESS_KEY_ID.key(),
69-
SECRET_ACCESS_KEY.key());
70-
if (!result.isSuccess()) {
71-
throw new AmazonDynamoDBConnectorException(
72-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
73-
String.format(
74-
"PluginName: %s, PluginType: %s, Message: %s",
75-
getPluginName(), PluginType.SINK, result.getMsg()));
76-
}
77-
amazondynamodbSourceOptions = new AmazonDynamoDBSourceOptions(pluginConfig);
43+
public Optional<CatalogTable> getWriteCatalogTable() {
44+
return Optional.of(catalogTable);
7845
}
7946

8047
@Override
81-
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
82-
this.rowType = seaTunnelRowType;
48+
public String getPluginName() {
49+
return "AmazonDynamodb";
8350
}
8451

8552
@Override
8653
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
8754
throws IOException {
88-
return new AmazonDynamoDBWriter(amazondynamodbSourceOptions, rowType);
89-
}
90-
91-
@Override
92-
public Optional<CatalogTable> getWriteCatalogTable() {
93-
return super.getWriteCatalogTable();
55+
return new AmazonDynamoDBWriter(amazondynamodbConfig, catalogTable.getSeaTunnelRowType());
9456
}
9557
}

0 commit comments

Comments
 (0)