Skip to content

Commit 8a9db47

Browse files
authored
[Fix][Connector-V2] assign size for KafkaSource reader cache queue (#9041)
1 parent 44e54c7 commit 8a9db47

File tree

6 files changed

+16
-4
lines changed

6 files changed

+16
-4
lines changed

Diff for: docs/en/connector-v2/source/Kafka.md

+2
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
5656
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details |
5757
| protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name |
5858
| protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition |
59+
| reader_cache_queue_size | Integer | No | 1024 | Capacity of the reader's shard cache queue, which caches the data corresponding to each shard. The space the cache occupies depends on the number of shards assigned to each reader, rather than on the amount of data in each shard. |
60+
| is_native | Boolean | No | false | Supports retaining the source information of the record. |
5961

6062
### debezium_record_table_filter
6163

Diff for: docs/zh/connector-v2/source/Kafka.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ import ChangeLog from '../changelog/connector-kafka.md';
5656
| common-options | | 否 | - | 源插件的常见参数,详情请参考 [Source Common Options](../source-common-options.md)|
5757
| protobuf_message_name | String | 否 | - | 当格式设置为 protobuf 时有效,指定消息名称。 |
5858
| protobuf_schema | String | 否 | - | 当格式设置为 protobuf 时有效,指定 Schema 定义。 |
59-
| is_native | Boolean | No | false | 支持保留record的源信息。 |
59+
| reader_cache_queue_size | Integer | 否 | 1024 | Reader分片缓存队列,用于缓存分片对应的数据。占用大小取决于每个reader得到的分片量,而不是每个分片的数据量。 |
60+
| is_native | Boolean | No | false | 支持保留record的源信息。 |
6061

6162
### debezium_record_table_filter
6263

Diff for: seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java

+6
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ public class KafkaSourceOptions extends KafkaBaseOptions {
4141
.withDescription(
4242
"Kafka consumer group id, used to distinguish different consumer groups.");
4343

44+
public static final Option<Integer> READER_CACHE_QUEUE_SIZE =
45+
Options.key("reader_cache_queue_size")
46+
.intType()
47+
.defaultValue(1024)
48+
.withDescription("The size of reader queue.");
49+
4450
public static final Option<Boolean> COMMIT_ON_CHECKPOINT =
4551
Options.key("commit_on_checkpoint")
4652
.booleanType()

Diff for: seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,8 @@ public List<CatalogTable> getProducedCatalogTables() {
7878
@Override
7979
public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
8080
SourceReader.Context readerContext) {
81-
8281
BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue =
83-
new LinkedBlockingQueue<>();
82+
new LinkedBlockingQueue<>(kafkaSourceConfig.getReaderCacheQueueSize());
8483

8584
Supplier<KafkaPartitionSplitReader> kafkaPartitionSplitReaderSupplier =
8685
() -> new KafkaPartitionSplitReader(kafkaSourceConfig, readerContext);

Diff for: seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java

+3
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PATTERN;
8989
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PROTOBUF_MESSAGE_NAME;
9090
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PROTOBUF_SCHEMA;
91+
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.READER_CACHE_QUEUE_SIZE;
9192
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE;
9293
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_OFFSETS;
9394
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_TIMESTAMP;
@@ -105,6 +106,7 @@ public class KafkaSourceConfig implements Serializable {
105106
@Getter private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;
106107
@Getter private final String consumerGroup;
107108
@Getter private final long pollTimeout;
109+
@Getter private final int readerCacheQueueSize;
108110

109111
public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
110112
this.bootstrap = readonlyConfig.get(BOOTSTRAP_SERVERS);
@@ -116,6 +118,7 @@ public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
116118
readonlyConfig.get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
117119
this.pollTimeout = readonlyConfig.get(KEY_POLL_TIMEOUT);
118120
this.consumerGroup = readonlyConfig.get(CONSUMER_GROUP);
121+
this.readerCacheQueueSize = readonlyConfig.get(READER_CACHE_QUEUE_SIZE);
119122
}
120123

121124
private Properties createKafkaProperties(ReadonlyConfig readonlyConfig) {

Diff for: seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ public OptionRule optionRule() {
5757
KafkaSourceOptions.FORMAT,
5858
KafkaSourceOptions.DEBEZIUM_RECORD_INCLUDE_SCHEMA,
5959
KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER,
60-
KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
60+
KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
61+
KafkaSourceOptions.READER_CACHE_QUEUE_SIZE)
6162
.conditional(
6263
KafkaSourceOptions.START_MODE,
6364
StartMode.TIMESTAMP,

0 commit comments

Comments
 (0)