Skip to content

Commit 3f57888

Browse files
committed
[Improve][DynamoDb] Dynamodb client supports endpointOverride and create credentials from container or system env.
1 parent 1e57e39 commit 3f57888

File tree

9 files changed

+117
-78
lines changed

9 files changed

+117
-78
lines changed

docs/en/connector-v2/sink/AmazonDynamoDB.md

+12-8
Original file line numberDiff line numberDiff line change
@@ -14,34 +14,38 @@ Write data to Amazon DynamoDB
1414

1515
| Name | Type | Required | Default value |
1616
|-------------------|--------|----------|---------------|
17-
| url | string | yes | - |
18-
| region | string | yes | - |
19-
| access_key_id | string | yes | - |
20-
| secret_access_key | string | yes | - |
17+
| url | string | no | - |
18+
| region | string | no | - |
19+
| access_key_id | string | no | - |
20+
| secret_access_key | string | no | - |
2121
| table | string | yes | - |
2222
| batch_size | string | no | 25 |
2323
| common-options | | no | - |
2424

2525
### url [string]
2626

27-
The URL to write to Amazon DynamoDB.
27+
The URL to write to Amazon DynamoDB. It will override the `endpoint` of AWS DynamoDB. Note: the `url` and `region` parameters can only be set one. When setting the url, the `region` parameter will be ignored.
2828

2929
### region [string]
3030

31-
The region of Amazon DynamoDB.
31+
The region of Amazon DynamoDB. The DynamoDB client will use the region to determine the service endpoint.
3232

3333
### accessKeyId [string]
3434

35-
The access id of Amazon DynamoDB.
35+
The access id of Amazon DynamoDB. If you don't set it, the plugin will use the container credential provider chain to get the access id.
3636

3737
### secretAccessKey [string]
3838

39-
The access secret of Amazon DynamoDB.
39+
The access secret of Amazon DynamoDB. If you don't set it, the plugin will use the container credential provider chain to get the access secret.
4040

4141
### table [string]
4242

4343
The table of Amazon DynamoDB.
4444

45+
### batch_size [string]
46+
47+
The number of records to write to Amazon DynamoDB in a batch. The default value is 25, and the maximum value is 25 because the Amazon DynamoDB batch write item limit is 25.
48+
4549
### common options
4650

4751
Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details.

docs/en/connector-v2/source/AmazonDynamoDB.md

+9-9
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ Read data from Amazon DynamoDB.
1919

2020
| name | type | required | default value |
2121
|-----------------------|--------|----------|---------------|
22-
| url | string | yes | - |
23-
| region | string | yes | - |
24-
| access_key_id | string | yes | - |
25-
| secret_access_key | string | yes | - |
22+
| url | string | no | - |
23+
| region | string | no | - |
24+
| access_key_id | string | no | - |
25+
| secret_access_key | string | no | - |
2626
| table | string | yes | - |
2727
| schema | config | yes | - |
2828
| common-options | | yes | - |
@@ -31,19 +31,19 @@ Read data from Amazon DynamoDB.
3131

3232
### url [string]
3333

34-
The URL to read to Amazon Dynamodb.
34+
The URL to read to Amazon Dynamodb. It will override the `endpoint` of AWS DynamoDB. Note: the `url` and `region` parameters can only be set one. When setting the url, the `region` parameter will be ignored.
3535

3636
### region [string]
3737

38-
The region of Amazon Dynamodb.
38+
The region of Amazon Dynamodb. The DynamoDB client will use the region to determine the service endpoint.
3939

4040
### accessKeyId [string]
4141

42-
The access id of Amazon DynamoDB.
42+
The access id of Amazon DynamoDB. If you don't set it, the plugin will use the container credential provider chain to get the access id.
4343

4444
### secretAccessKey [string]
4545

46-
The access secret of Amazon DynamoDB.
46+
The access secret of Amazon DynamoDB. If you don't set it, the plugin will use the container credential provider chain to get the access secret.
4747

4848
### table [string]
4949

@@ -53,7 +53,7 @@ The table of Amazon DynamoDB.
5353

5454
#### fields [config]
5555

56-
Amazon Dynamodb is a NOSQL database service of support keys-value storage and document data structure,there is no way to get the data type.Therefore, we must configure schema.
56+
Amazon Dynamodb is a NOSQL database service of support keys-value storage and document data structure, there is no way to get the data type. Therefore, we must configure schema.
5757

5858
such as:
5959

docs/zh/connector-v2/sink/AmazonDynamoDB.md

+9-9
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,29 @@
1414

1515
| 名称 | 类型 | 必需 | 默认值 |
1616
|-------------------|--------|----|---------------|
17-
| url | string | | - |
18-
| region | string | | - |
19-
| access_key_id | string | | - |
20-
| secret_access_key | string | | - |
17+
| url | string | | - |
18+
| region | string | | - |
19+
| access_key_id | string | | - |
20+
| secret_access_key | string | | - |
2121
| table | string || - |
2222
| batch_size | string || 25 |
23-
| common-options | || - |
23+
| common-options | | | - |
2424

2525
### url [string]
2626

27-
要写入Amazon DynamoDB的URL.
27+
要写入Amazon DynamoDB的URL. 它将覆盖AWS DynamoDB的`endpoint`. 注意,url 和 region 两个参数只能设置一个. 当设置url时,region参数将被忽略.
2828

2929
### region [string]
3030

31-
Amazon DynamoDB 的分区.
31+
Amazon DynamoDB 的分区. DynamoDB客户端将使用该区域确定服务端点.
3232

3333
### accessKeyId [string]
3434

35-
Amazon DynamoDB的访问id.
35+
Amazon DynamoDB的访问id. 如果未设置,插件将使用容器凭证提供程序链获取访问id.
3636

3737
### secretAccessKey [string]
3838

39-
Amazon DynamoDB的访问密钥.
39+
Amazon DynamoDB的访问密钥. 如果未设置,插件将使用容器凭证提供程序链获取访问密钥.
4040

4141
### table [string]
4242

docs/zh/connector-v2/source/AmazonDynamoDB.md

+15-15
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,33 @@
1717

1818
## 选项
1919

20-
| 名称 | 类型 | 必需 | 默认值 |
21-
|-----------------------|--------|-------|---------------|
22-
| url | string | | - |
23-
| region | string | | - |
24-
| access_key_id | string | | - |
25-
| secret_access_key | string | | - |
26-
| table | string | | - |
27-
| schema | config | | - |
28-
| common-options | | | - |
29-
| scan_item_limit | | | - |
30-
| parallel_scan_threads | || - |
20+
| 名称 | 类型 | 必需 | 默认值 |
21+
|-----------------------|--------|----|---------------|
22+
| url | string | | - |
23+
| region | string | | - |
24+
| access_key_id | string | | - |
25+
| secret_access_key | string | | - |
26+
| table | string || - |
27+
| schema | config || - |
28+
| common-options | || - |
29+
| scan_item_limit | || - |
30+
| parallel_scan_threads | | | - |
3131

3232
### url [string]
3333

34-
读取Amazon Dynamodb的URL.
34+
读取Amazon Dynamodb的URL. 它将覆盖AWS DynamoDB的`endpoint`. 注意,url 和 region 两个参数只能设置一个. 当设置url时,region参数将被忽略.
3535

3636
### region [string]
3737

38-
Amazon DynamoDB 的分区.
38+
Amazon DynamoDB 的分区. DynamoDB客户端将使用该区域确定服务端点.
3939

4040
### accessKeyId [string]
4141

42-
Amazon DynamoDB的访问id.
42+
Amazon DynamoDB的访问id. 如果未设置,插件将使用容器凭证提供程序链获取访问id.
4343

4444
### secretAccessKey [string]
4545

46-
Amazon DynamoDB的访问密钥.
46+
Amazon DynamoDB的访问密钥. 如果未设置,插件将使用容器凭证提供程序链获取访问密钥.
4747

4848
### table [string]
4949

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.client;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
21+
22+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
23+
import software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider;
24+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
25+
import software.amazon.awssdk.regions.Region;
26+
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
27+
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
28+
29+
import java.net.URI;
30+
31+
public class DynamoDbClientProvider {
32+
33+
public static DynamoDbClient createDynamoDBClient(AmazonDynamoDBConfig config) {
34+
DynamoDbClientBuilder builder = DynamoDbClient.builder();
35+
36+
if (config.getUrl() != null) {
37+
builder.endpointOverride(URI.create(config.getUrl()));
38+
}
39+
40+
if (config.getRegion() != null) {
41+
builder.region(Region.of(config.getRegion()));
42+
}
43+
44+
if (config.getAccessKeyId() != null && config.getSecretAccessKey() != null) {
45+
// Set up AWS credentials with accessKeyId and secretAccessKey
46+
builder.credentialsProvider(
47+
StaticCredentialsProvider.create(
48+
AwsBasicCredentials.create(
49+
config.getAccessKeyId(), config.getSecretAccessKey())));
50+
} else {
51+
// If accessKeyId and secretAccessKey are not provided, use container credentials.
52+
builder.credentialsProvider(ContainerCredentialsProvider.builder().build());
53+
}
54+
return builder.build();
55+
}
56+
}

seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ public String factoryIdentifier() {
4343
@Override
4444
public OptionRule optionRule() {
4545
return OptionRule.builder()
46-
.required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE)
47-
.optional(BATCH_SIZE)
46+
.required(REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE)
47+
.optional(BATCH_SIZE, URL)
4848
.build();
4949
}
5050

seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java

+2-16
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,15 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
1919

20+
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.client.DynamoDbClientProvider;
2021
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
2122

22-
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
23-
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
24-
import software.amazon.awssdk.regions.Region;
2523
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
2624
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
2725
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
2826
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
2927
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
3028

31-
import java.net.URI;
3229
import java.util.ArrayList;
3330
import java.util.HashMap;
3431
import java.util.List;
@@ -49,18 +46,7 @@ private void tryInit() {
4946
if (initialize) {
5047
return;
5148
}
52-
dynamoDbClient =
53-
DynamoDbClient.builder()
54-
.endpointOverride(URI.create(amazondynamodbConfig.getUrl()))
55-
// The region is meaningless for local DynamoDb but required for client
56-
// builder validation
57-
.region(Region.of(amazondynamodbConfig.getRegion()))
58-
.credentialsProvider(
59-
StaticCredentialsProvider.create(
60-
AwsBasicCredentials.create(
61-
amazondynamodbConfig.getAccessKeyId(),
62-
amazondynamodbConfig.getSecretAccessKey())))
63-
.build();
49+
dynamoDbClient = DynamoDbClientProvider.createDynamoDBClient(amazondynamodbConfig);
6450
initialize = true;
6551
}
6652

seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,14 @@ public String factoryIdentifier() {
5050
@Override
5151
public OptionRule optionRule() {
5252
return OptionRule.builder()
53-
.required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE, SCHEMA)
54-
.optional(SCAN_ITEM_LIMIT, PARALLEL_SCAN_THREADS)
53+
.required(TABLE, SCHEMA)
54+
.optional(
55+
SCAN_ITEM_LIMIT,
56+
PARALLEL_SCAN_THREADS,
57+
URL,
58+
REGION,
59+
ACCESS_KEY_ID,
60+
SECRET_ACCESS_KEY)
5561
.build();
5662
}
5763

seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java

+4-17
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,20 @@
2121
import org.apache.seatunnel.api.source.SourceReader;
2222
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2323
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
24+
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.client.DynamoDbClientProvider;
2425
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
2526
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer;
2627
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer;
2728

2829
import lombok.extern.slf4j.Slf4j;
29-
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
30-
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
31-
import software.amazon.awssdk.regions.Region;
3230
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
3331
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
3432
import software.amazon.awssdk.services.dynamodb.paginators.ScanIterable;
3533

36-
import java.net.URI;
3734
import java.util.ArrayList;
3835
import java.util.List;
3936
import java.util.Objects;
37+
import java.util.Optional;
4038
import java.util.Queue;
4139
import java.util.concurrent.ConcurrentLinkedDeque;
4240

@@ -63,23 +61,12 @@ public AmazonDynamoDBSourceReader(
6361

6462
@Override
6563
public void open() {
66-
dynamoDbClient =
67-
DynamoDbClient.builder()
68-
.endpointOverride(URI.create(amazondynamodbConfig.getUrl()))
69-
// The region is meaningless for local DynamoDb but required for client
70-
// builder validation
71-
.region(Region.of(amazondynamodbConfig.getRegion()))
72-
.credentialsProvider(
73-
StaticCredentialsProvider.create(
74-
AwsBasicCredentials.create(
75-
amazondynamodbConfig.getAccessKeyId(),
76-
amazondynamodbConfig.getSecretAccessKey())))
77-
.build();
64+
dynamoDbClient = DynamoDbClientProvider.createDynamoDBClient(amazondynamodbConfig);
7865
}
7966

8067
@Override
8168
public void close() {
82-
dynamoDbClient.close();
69+
Optional.ofNullable(dynamoDbClient).ifPresent(DynamoDbClient::close);
8370
}
8471

8572
@Override

0 commit comments

Comments
 (0)