builder()
+ .setKafkaMetadataService(new MyKafkaMetadataService())
+ .setStreamIds(Collections.singleton("input-stream"))
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ .setProperties(properties)
+ .build();
+
+ env.fromSource(source, WatermarkStrategy.noWatermarks(), "Dynamic Kafka Source");
+```
+{{< /tab >}}
+{{< /tabs >}}
+The following properties are **required** for building a DynamicKafkaSource:
+
+* The Kafka metadata service, configured by setKafkaMetadataService(KafkaMetadataService).
+* The stream ids to subscribe to, see the following Kafka stream subscription section for more details.
+* The deserializer to parse Kafka messages, see the [Kafka Source Documentation]({{< ref "docs/connectors/datastream/kafka" >}}#deserializer) for more details.
+
+### Kafka Stream Subscription
+The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
+* A set of Kafka stream ids. For example:
+ {{< tabs "DynamicKafkaSource#setStreamIds" >}}
+ {{< tab "Java" >}}
+ ```java
+ DynamicKafkaSource.builder().setStreamIds(Set.of("stream-a", "stream-b"));
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+* A regex pattern that subscribes to all Kafka stream ids that match the provided regex. For example:
+ {{< tabs "DynamicKafkaSource#setStreamPattern" >}}
+ {{< tab "Java" >}}
+ ```java
+ DynamicKafkaSource.builder().setStreamPattern(Pattern.compile("stream.*"));
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+
+### Kafka Metadata Service
+
+An interface is provided to resolve the logical Kafka stream(s) into the corresponding physical
+topic(s) and cluster(s). Typically, these implementations are based on services that align well
+with internal Kafka infrastructure; if no such service is available, an in-memory implementation
+would also work. An example of an in-memory implementation can be found in our tests.
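+
+As an illustration only, the sketch below shows what a static in-memory implementation could look like. The class and
+method names (`KafkaMetadataService`, `KafkaStream`, `getAllStreams()`, `describeStreams()`, `isClusterActive()`) follow
+the interface described in FLIP-246 and may differ slightly from the released API, so treat this as a starting point
+rather than a reference implementation:
+
+```java
+// Package locations and signatures below are assumptions based on FLIP-246; check the connector for the exact API.
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** A static, in-memory metadata service backed by a fixed set of streams. */
+public class InMemoryKafkaMetadataService implements KafkaMetadataService {
+
+    private final Map<String, KafkaStream> streams = new HashMap<>();
+
+    public InMemoryKafkaMetadataService(Set<KafkaStream> kafkaStreams) {
+        for (KafkaStream stream : kafkaStreams) {
+            streams.put(stream.getStreamId(), stream);
+        }
+    }
+
+    @Override
+    public Set<KafkaStream> getAllStreams() {
+        return new HashSet<>(streams.values());
+    }
+
+    @Override
+    public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
+        // Return metadata only for the stream ids known to this service.
+        Map<String, KafkaStream> result = new HashMap<>();
+        for (String streamId : streamIds) {
+            if (streams.containsKey(streamId)) {
+                result.put(streamId, streams.get(streamId));
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public boolean isClusterActive(String kafkaClusterId) {
+        // A cluster is considered active as long as at least one stream still references it.
+        return streams.values().stream()
+                .anyMatch(stream -> stream.getClusterMetadataMap().containsKey(kafkaClusterId));
+    }
+
+    @Override
+    public void close() {}
+}
+```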
+
+This source achieves its dynamic characteristic by periodically polling this Kafka metadata service
+for any changes to the Kafka stream(s) and reconciling the reader tasks to subscribe to the new
+Kafka metadata returned by the service. For example, in the case of a Kafka migration, the source would
+swap from one cluster to the new cluster when the service makes that change in the Kafka stream metadata.
+
+### Additional Properties
+The following configuration options from DynamicKafkaSourceOptions can be set on the properties passed to the builder:
+
+| Option | Required | Default | Type | Description |
+|--------|----------|---------|------|-------------|
+| stream-metadata-discovery-interval-ms | required | -1 | Long | The interval in milliseconds for the source to discover the changes in stream metadata. A non-positive value disables the stream metadata discovery. |
+| stream-metadata-discovery-failure-threshold | required | 1 | Integer | The number of consecutive failures before letting the exception from Kafka metadata service discovery trigger jobmanager failure and global failover. The default is one to at least catch startup failures. |
+
+In addition to this list, see the [regular Kafka connector]({{< ref "docs/connectors/datastream/kafka" >}}#additional-properties) for
+a list of applicable properties.
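+
+For example, these options can be set on the `Properties` object that is passed to the builder via
+`setProperties(properties)`, as in the Usage section above; the interval and threshold values below are only illustrative:
+
+```java
+Properties properties = new Properties();
+// Check the Kafka metadata service for stream metadata changes every 60 seconds.
+properties.setProperty("stream-metadata-discovery-interval-ms", "60000");
+// Fail the job after 3 consecutive metadata discovery failures.
+properties.setProperty("stream-metadata-discovery-failure-threshold", "3");
+```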
+
+### Metrics
+
+| Scope | Metrics | User Variables | Description | Type |
+|-------|---------|----------------|-------------|------|
+| Operator | currentEmitEventTimeLag | n/a | The time span from the record event timestamp to the time the record is emitted by the source connector: currentEmitEventTimeLag = EmitTime - EventTime. | Gauge |
+| Operator | watermarkLag | n/a | The time span that the watermark lags behind the wall clock time: watermarkLag = CurrentTime - Watermark. | Gauge |
+| Operator | sourceIdleTime | n/a | The time span that the source has not processed any record: sourceIdleTime = CurrentTime - LastRecordProcessTime. | Gauge |
+| Operator | pendingRecords | n/a | The number of records that have not been fetched by the source, e.g. the available records after the consumer offset in a Kafka partition. | Gauge |
+| Operator | kafkaClustersCount | n/a | The total number of Kafka clusters read by this reader. | Gauge |
+
+In addition to this list, see the [regular Kafka connector]({{< ref "docs/connectors/datastream/kafka" >}}#monitoring) for
+the KafkaSourceReader metrics that are also reported.
+
+### Additional Details
+
+For additional details on deserialization, event time and watermarks, idleness, consumer offset
+committing, security, and more, you can refer to the [Kafka Source documentation]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-source). This is possible because the
+Dynamic Kafka Source leverages components of the Kafka Source, and the implementation will be
+discussed in the next section.
+
+### Behind the Scene
+{{< hint info >}}
+If you are interested in how the Dynamic Kafka Source works under the design of the new data source API, you may
+want to read this part as a reference. For details about the new data source API, the
+[documentation of data sources]({{< ref "docs/dev/datastream/sources.md" >}}) and
+FLIP-27
+provide more descriptive discussions.
+{{< /hint >}}
+
+
+Under the abstraction of the new data source API, Dynamic Kafka Source consists of the following components:
+#### Source Split
+A source split in Dynamic Kafka Source represents a partition of a Kafka topic, with cluster information. It
+consists of:
+* A Kafka cluster id that can be resolved by the Kafka metadata service.
+* A Kafka Source Split (TopicPartition, starting offset, stopping offset).
+
+You can check the class `DynamicKafkaSourceSplit` for more details.
+
+#### Split Enumerator
+
+This enumerator is responsible for discovering and assigning splits from one or more clusters. At startup, the
+enumerator discovers the metadata belonging to the configured Kafka stream ids. Using that metadata, it
+initializes KafkaSourceEnumerators, which handle assigning splits to the readers. In addition,
+source events are sent to the source readers to reconcile the metadata. The enumerator can also poll the
+KafkaMetadataService periodically for stream discovery. Furthermore, restarting enumerators when metadata changes
+involves clearing outdated metrics, since clusters may be removed and their metrics should be removed with them.
+
+#### Source Reader
+
+This reader is responsible for reading from one or more clusters and using the KafkaSourceReader to fetch
+records from topics and clusters based on the metadata. When new metadata is discovered by the enumerator,
+the reader reconciles the metadata changes and, if needed, restarts the KafkaSourceReader to read from the new
+set of topics and clusters.
+
+#### Kafka Metadata Service
+
+This interface represents the source of truth for the current metadata for the configured Kafka stream ids.
+Metadata that is removed in between polls is considered non-active (e.g. removing a cluster from the
+return value means that the cluster is non-active and should not be read from). The cluster metadata
+contains an immutable Kafka cluster id, the set of topics, and properties needed to connect to the
+Kafka cluster.
+
+#### FLIP 246
+
+To understand more behind the scenes, please read [FLIP-246](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320)
+for more details and discussion.
+
+{{< top >}}
diff --git a/docs/content.zh/docs/connectors/datastream/kafka.md b/docs/content.zh/docs/connectors/datastream/kafka.md
index 039bdeca8..4a90ece31 100644
--- a/docs/content.zh/docs/connectors/datastream/kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/kafka.md
@@ -36,7 +36,7 @@ Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client
当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。
有关 Kafka 兼容性的更多细节,请参考 [Kafka 官方文档](https://kafka.apache.org/protocol.html#protocol_compatibility)。
-{{< connector_artifact flink-connector-kafka 3.0.0 >}}
+{{< connector_artifact flink-connector-kafka kafka >}}
如果使用 Kafka source,```flink-connector-base``` 也需要包含在依赖中:
@@ -45,7 +45,7 @@ Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client
Flink 目前的流连接器还不是二进制发行版的一部分。
[在此处]({{< ref "docs/dev/configuration/overview" >}})可以了解到如何链接它们,从而在集群中运行。
-{{< py_download_link "kafka" >}}
+{{< py_connector_download_link "kafka" >}}
## Kafka Source
{{< hint info >}}
@@ -222,14 +222,12 @@ Kafka Source 支持流式和批式两种运行模式。默认情况下,KafkaSo
Kafka consumer 的配置可以参考 [Apache Kafka 文档](http://kafka.apache.org/documentation/#consumerconfigs)。
请注意,即使指定了以下配置项,构建器也会将其覆盖:
-- ```key.deserializer``` 始终设置为 ByteArrayDeserializer
-- ```value.deserializer``` 始终设置为 ByteArrayDeserializer
- ```auto.offset.reset.strategy``` 被 OffsetsInitializer#getAutoOffsetResetStrategy() 覆盖
- ```partition.discovery.interval.ms``` 会在批模式下被覆盖为 -1
### 动态分区检查
为了在不重启 Flink 作业的情况下处理 Topic 扩容或新建 Topic 等场景,可以将 Kafka Source 配置为在提供的 Topic / Partition
-订阅模式下定期检查新分区。要启用动态分区检查,请将 ```partition.discovery.interval.ms``` 设置为非负值:
+订阅模式下定期检查新分区。要启用动态分区检查,请将 ```partition.discovery.interval.ms``` 设置为正值:
{{< tabs "KafkaSource#PartitionDiscovery" >}}
@@ -248,7 +246,7 @@ KafkaSource.builder() \
{{< /tabs >}}
{{< hint warning >}}
-分区检查功能默认**不开启**。需要显式地设置分区检查间隔才能启用此功能。
+分区检查间隔默认为5分钟。需要显式地设置分区检查间隔为非正数才能关闭此功能。
{{< /hint >}}
### 事件时间和水印
diff --git a/docs/content.zh/docs/connectors/table/kafka.md b/docs/content.zh/docs/connectors/table/kafka.md
index 408cb1a2f..9df680df8 100644
--- a/docs/content.zh/docs/connectors/table/kafka.md
+++ b/docs/content.zh/docs/connectors/table/kafka.md
@@ -48,7 +48,7 @@ CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
- `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
@@ -81,7 +81,7 @@ CREATE TABLE KafkaTable (
topic |
STRING NOT NULL |
Kafka 记录的 Topic 名。 |
- R |
+ R/W |
partition |
@@ -127,7 +127,7 @@ CREATE TABLE KafkaTable (
```sql
CREATE TABLE KafkaTable (
- `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
+ `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
@@ -151,7 +151,7 @@ CREATE TABLE KafkaTable (
```sql
CREATE TABLE KafkaTable (
- `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format
+ `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format
`origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- from Kafka connector
`offset` BIGINT METADATA VIRTUAL, -- from Kafka connector
@@ -191,17 +191,17 @@ CREATE TABLE KafkaTable (
topic |
- required for sink |
+ 可选 |
(无) |
String |
- 当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 'topic-1;topic-2'。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。 |
+ 当表用作 source 时读取数据的 topic 名,或当表用作 sink 时写入的 topic 名。它还支持通过分号分隔的 topic 列表,如 'topic-1;topic-2' 来作为 source 的 topic 列表。注意,“topic-pattern”和“topic”只能指定其中一个。对于 sink 来说,topic 名是写入数据的 topic。它还支持 sink 的 topic 列表。提供的 topic 列表被视为 `topic` 元数据列的有效值的允许列表。如果提供了列表,对于 sink 表,“topic”元数据列是可写的并且必须指定。 |
topic-pattern |
可选 |
(无) |
String |
- 匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。 |
+ 用于读取或写入的 topic 名称模式的正则表达式。所有匹配指定正则表达式的 topic 名称将在作业开始运行时被消费者订阅。对于 sink 来说,`topic` 元数据列是可写的,必须提供并且与 `topic-pattern` 正则表达式匹配。注意,“topic-pattern”和“topic”只能指定其中一个。 |
properties.bootstrap.servers |
@@ -338,9 +338,17 @@ CREATE TABLE KafkaTable (
scan.topic-partition-discovery.interval |
可选 |
- (无) |
+ 5分钟 |
Duration |
- Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。 |
+ Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。需要显式地设置'scan.topic-partition-discovery.interval'为0才能关闭此功能 |
+
+
+ scan.parallelism |
+ optional |
+ no |
+ (none) |
+ Integer |
+ 定义 Kafka source 算子的并行度。默认情况下会使用全局默认并行度。 |
sink.partitioner |
@@ -389,7 +397,7 @@ Kafka 消息的消息键和消息体部分都可以使用某种 [格式]({{< ref
```sql
CREATE TABLE KafkaTable (
- `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
@@ -415,7 +423,7 @@ ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
```sql
CREATE TABLE KafkaTable (
- `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
@@ -572,29 +580,29 @@ Source 输出的 watermark 由读取的分区中最小的 watermark 决定。
请参阅 [Kafka watermark 策略]({{< ref "docs/dev/datastream/event-time/generating_watermarks" >}}#watermark-策略和-kafka-连接器) 以获取更多细节。
### 安全
-要启用加密和认证相关的安全配置,只需将安全配置加上 "properties." 前缀配置在 Kafka 表上即可。下面的代码片段展示了如何配置 Kafka 表以使用
-PLAIN 作为 SASL 机制并提供 JAAS 配置:
+要启用加密和认证相关的安全配置,只需将安全配置加上 "properties." 前缀配置在 Kafka 表上即可。下面的代码片段展示了当依赖 SQL client JAR 时, 如何配置 Kafka 表
+以使用 PLAIN 作为 SASL 机制并提供 JAAS 配置:
```sql
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
- `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
- 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
+ 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
)
```
-另一个更复杂的例子,使用 SASL_SSL 作为安全协议并使用 SCRAM-SHA-256 作为 SASL 机制:
+另一个更复杂的例子,当依赖 SQL client JAR 时,使用 SASL_SSL 作为安全协议并使用 SCRAM-SHA-256 作为 SASL 机制:
```sql
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
- `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
@@ -610,13 +618,13 @@ CREATE TABLE KafkaTable (
/* 将 SASL 机制配置为 as SCRAM-SHA-256 */
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/* 配置 JAAS */
- 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
+ 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
)
```
-如果在作业 JAR 中 Kafka 客户端依赖的类路径被重置了(relocate class),登录模块(login module)的类路径可能会不同,因此请根据登录模块在
-JAR 中实际的类路径来改写以上配置。例如在 SQL client JAR 中,Kafka client 依赖被重置在了 `org.apache.flink.kafka.shaded.org.apache.kafka` 路径下,
-因此 plain 登录模块的类路径应写为 `org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule`。
+在作业 JAR 中 Kafka 客户端依赖的类路径被重置了(relocate class),登录模块(login module)的类路径可能会不同,因此需要根据登录模块在
+JAR 中实际的类路径来改写以上配置。在 SQL client JAR 中,Kafka client 依赖被重置在了 `org.apache.flink.kafka.shaded.org.apache.kafka`
+路径下,因此以上的代码片段中 plain 登录模块的类路径写为 `org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule`。
关于安全配置的详细描述,请参阅 Apache Kafka 文档中的"安全"一节。
diff --git a/docs/content.zh/docs/connectors/table/upsert-kafka.md b/docs/content.zh/docs/connectors/table/upsert-kafka.md
index 40df1fa20..bacaae52b 100644
--- a/docs/content.zh/docs/connectors/table/upsert-kafka.md
+++ b/docs/content.zh/docs/connectors/table/upsert-kafka.md
@@ -38,7 +38,7 @@ Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并
依赖
------------
-{{< sql_download_table "upsert-kafka" >}}
+{{< sql_connector_download_table "kafka" >}}
Upsert Kafka 连接器不是二进制发行版的一部分,请查阅[这里]({{< ref "docs/dev/configuration/overview" >}})了解如何在集群运行中引用 Upsert Kafka 连接器。
@@ -119,7 +119,7 @@ of all available metadata fields.
必选 |
(none) |
String |
- 用于读取和写入的 Kafka topic 名称。 |
+ 当表用作 source 时读取数据的 topic 名,或当表用作 sink 时写入的 topic 名。它还支持通过分号分隔的 topic 列表,如 'topic-1;topic-2' 来作为 source 的 topic 列表。注意,“topic-pattern”和“topic”只能指定其中一个。对于 sink 来说,topic 名是写入数据的 topic。它还支持 sink 的 topic 列表。提供的 topic 列表被视为 `topic` 元数据列的有效值的允许列表。如果提供了列表,对于 sink 表,“topic”元数据列是可写的并且必须指定。 |
properties.bootstrap.servers |
@@ -136,7 +136,7 @@ of all available metadata fields.
该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。
Flink 会自动移除 选项名中的 "properties." 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 'properties.allow.auto.create.topics' = 'false'
- 来禁止自动创建 topic。 但是,某些选项,例如'key.deserializer' 和 'value.deserializer' 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
+ 来禁止自动创建 topic。 但是,某些选项,例如'auto.offset.reset' 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
|
@@ -180,6 +180,14 @@ of all available metadata fields.
+
+ scan.parallelism |
+ optional |
+ no |
+ (none) |
+ Integer |
+ 定义 upsert-kafka source 算子的并行度。默认情况下会使用全局默认并行度。 |
+
sink.parallelism |
可选 |
@@ -221,7 +229,7 @@ prefixed with either the `'key'` or `'value'` plus format identifier.
```sql
CREATE TABLE KafkaTable (
- `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
diff --git a/docs/content/docs/connectors/datastream/dynamic-kafka.md b/docs/content/docs/connectors/datastream/dynamic-kafka.md
new file mode 100644
index 000000000..e64b93e67
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/dynamic-kafka.md
@@ -0,0 +1,250 @@
+---
+title: Dynamic Kafka
+weight: 3
+type: docs
+aliases:
+ - /dev/connectors/dynamic-kafka.html
+---
+
+
+# Dynamic Kafka Source _`Experimental`_
+
+Flink provides an [Apache Kafka](https://kafka.apache.org) connector for reading data from Kafka topics across one or more Kafka clusters.
+The Dynamic Kafka connector discovers the clusters and topics using a Kafka metadata service and can adapt to changes in
+topics and/or clusters without requiring a job restart. This is especially useful when you need to start reading from a new Kafka cluster/topic and/or stop reading
+from an existing Kafka cluster/topic (cluster migration/failover/other infrastructure changes), and when you need direct integration with the Hybrid Source. The solution
+automates these operations so that they are transparent to Kafka consumers.
+
+## Dependency
+
+For details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
+
+{{< connector_artifact flink-connector-kafka kafka >}}
+
+Flink's streaming connectors are not part of the binary distribution.
+See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+
+## Dynamic Kafka Source
+{{< hint info >}}
+This part describes the Dynamic Kafka Source based on the new
+[data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
+{{< /hint >}}
+
+### Usage
+
+Dynamic Kafka Source provides a builder class to initialize the DynamicKafkaSource. The code snippet
+below shows how to build a DynamicKafkaSource to consume messages from the earliest offset of the
+stream "input-stream" and deserialize only the value of the
+ConsumerRecord as a string, using "MyKafkaMetadataService" to resolve the cluster(s) and topic(s)
+corresponding to "input-stream".
+
+{{< tabs "DynamicKafkaSource" >}}
+{{< tab "Java" >}}
+```java
+
+DynamicKafkaSource source = DynamicKafkaSource.builder()
+ .setKafkaMetadataService(new MyKafkaMetadataService())
+ .setStreamIds(Collections.singleton("input-stream"))
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ .setProperties(properties)
+ .build();
+
+env.fromSource(source, WatermarkStrategy.noWatermarks(), "Dynamic Kafka Source");
+```
+{{< /tab >}}
+{{< /tabs >}}
+The following properties are **required** for building a DynamicKafkaSource:
+
+* The Kafka metadata service, configured by setKafkaMetadataService(KafkaMetadataService).
+* The stream ids to subscribe to, see the following Kafka stream subscription section for more details.
+* The deserializer to parse Kafka messages, see the [Kafka Source Documentation]({{< ref "docs/connectors/datastream/kafka" >}}#deserializer) for more details.
+
+### Kafka Stream Subscription
+The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
+* A set of Kafka stream ids. For example:
+ {{< tabs "DynamicKafkaSource#setStreamIds" >}}
+ {{< tab "Java" >}}
+ ```java
+ DynamicKafkaSource.builder().setStreamIds(Set.of("stream-a", "stream-b"));
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+* A regex pattern that subscribes to all Kafka stream ids that match the provided regex. For example:
+ {{< tabs "DynamicKafkaSource#setStreamPattern" >}}
+ {{< tab "Java" >}}
+ ```java
+ DynamicKafkaSource.builder().setStreamPattern(Pattern.compile("stream.*"));
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+
+### Kafka Metadata Service
+
+An interface is provided to resolve the logical Kafka stream(s) into the corresponding physical
+topic(s) and cluster(s). Typically, these implementations are based on services that align well
+with internal Kafka infrastructure; if no such service is available, an in-memory implementation
+would also work. An example of an in-memory implementation can be found in our tests.
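+
+As an illustration only, the sketch below shows what a static in-memory implementation could look like. The class and
+method names (`KafkaMetadataService`, `KafkaStream`, `getAllStreams()`, `describeStreams()`, `isClusterActive()`) follow
+the interface described in FLIP-246 and may differ slightly from the released API, so treat this as a starting point
+rather than a reference implementation:
+
+```java
+// Package locations and signatures below are assumptions based on FLIP-246; check the connector for the exact API.
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** A static, in-memory metadata service backed by a fixed set of streams. */
+public class InMemoryKafkaMetadataService implements KafkaMetadataService {
+
+    private final Map<String, KafkaStream> streams = new HashMap<>();
+
+    public InMemoryKafkaMetadataService(Set<KafkaStream> kafkaStreams) {
+        for (KafkaStream stream : kafkaStreams) {
+            streams.put(stream.getStreamId(), stream);
+        }
+    }
+
+    @Override
+    public Set<KafkaStream> getAllStreams() {
+        return new HashSet<>(streams.values());
+    }
+
+    @Override
+    public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
+        // Return metadata only for the stream ids known to this service.
+        Map<String, KafkaStream> result = new HashMap<>();
+        for (String streamId : streamIds) {
+            if (streams.containsKey(streamId)) {
+                result.put(streamId, streams.get(streamId));
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public boolean isClusterActive(String kafkaClusterId) {
+        // A cluster is considered active as long as at least one stream still references it.
+        return streams.values().stream()
+                .anyMatch(stream -> stream.getClusterMetadataMap().containsKey(kafkaClusterId));
+    }
+
+    @Override
+    public void close() {}
+}
+```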
+
+This source achieves its dynamic characteristic by periodically polling this Kafka metadata service
+for any changes to the Kafka stream(s) and reconciling the reader tasks to subscribe to the new
+Kafka metadata returned by the service. For example, in the case of a Kafka migration, the source would
+swap from one cluster to the new cluster when the service makes that change in the Kafka stream metadata.
+
+### Additional Properties
+The following configuration options from DynamicKafkaSourceOptions can be set on the properties passed to the builder:
+
+| Option | Required | Default | Type | Description |
+|--------|----------|---------|------|-------------|
+| stream-metadata-discovery-interval-ms | required | -1 | Long | The interval in milliseconds for the source to discover the changes in stream metadata. A non-positive value disables the stream metadata discovery. |
+| stream-metadata-discovery-failure-threshold | required | 1 | Integer | The number of consecutive failures before letting the exception from Kafka metadata service discovery trigger jobmanager failure and global failover. The default is one to at least catch startup failures. |
+
+In addition to this list, see the [regular Kafka connector]({{< ref "docs/connectors/datastream/kafka" >}}#additional-properties) for
+a list of applicable properties.
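+
+For example, these options can be set on the `Properties` object that is passed to the builder via
+`setProperties(properties)`, as in the Usage section above; the interval and threshold values below are only illustrative:
+
+```java
+Properties properties = new Properties();
+// Check the Kafka metadata service for stream metadata changes every 60 seconds.
+properties.setProperty("stream-metadata-discovery-interval-ms", "60000");
+// Fail the job after 3 consecutive metadata discovery failures.
+properties.setProperty("stream-metadata-discovery-failure-threshold", "3");
+```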
+
+### Metrics
+
+| Scope | Metrics | User Variables | Description | Type |
+|-------|---------|----------------|-------------|------|
+| Operator | currentEmitEventTimeLag | n/a | The time span from the record event timestamp to the time the record is emitted by the source connector: currentEmitEventTimeLag = EmitTime - EventTime. | Gauge |
+| Operator | watermarkLag | n/a | The time span that the watermark lags behind the wall clock time: watermarkLag = CurrentTime - Watermark. | Gauge |
+| Operator | sourceIdleTime | n/a | The time span that the source has not processed any record: sourceIdleTime = CurrentTime - LastRecordProcessTime. | Gauge |
+| Operator | pendingRecords | n/a | The number of records that have not been fetched by the source, e.g. the available records after the consumer offset in a Kafka partition. | Gauge |
+| Operator | kafkaClustersCount | n/a | The total number of Kafka clusters read by this reader. | Gauge |
+
+In addition to this list, see the [regular Kafka connector]({{< ref "docs/connectors/datastream/kafka" >}}#monitoring) for
+the KafkaSourceReader metrics that are also reported.
+
+### Additional Details
+
+For additional details on deserialization, event time and watermarks, idleness, consumer offset
+committing, security, and more, you can refer to the [Kafka Source documentation]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-source). This is possible because the
+Dynamic Kafka Source leverages components of the Kafka Source, and the implementation will be
+discussed in the next section.
+
+### Behind the Scene
+{{< hint info >}}
+If you are interested in how the Dynamic Kafka Source works under the design of the new data source API, you may
+want to read this part as a reference. For details about the new data source API, the
+[documentation of data sources]({{< ref "docs/dev/datastream/sources.md" >}}) and
+FLIP-27
+provide more descriptive discussions.
+{{< /hint >}}
+
+
+Under the abstraction of the new data source API, Dynamic Kafka Source consists of the following components:
+#### Source Split
+A source split in Dynamic Kafka Source represents a partition of a Kafka topic, with cluster information. It
+consists of:
+* A Kafka cluster id that can be resolved by the Kafka metadata service.
+* A Kafka Source Split (TopicPartition, starting offset, stopping offset).
+
+You can check the class `DynamicKafkaSourceSplit` for more details.
+
+#### Split Enumerator
+
+This enumerator is responsible for discovering and assigning splits from one or more clusters. At startup, the
+enumerator discovers the metadata belonging to the configured Kafka stream ids. Using that metadata, it
+initializes KafkaSourceEnumerators, which handle assigning splits to the readers. In addition,
+source events are sent to the source readers to reconcile the metadata. The enumerator can also poll the
+KafkaMetadataService periodically for stream discovery. Furthermore, restarting enumerators when metadata changes
+involves clearing outdated metrics, since clusters may be removed and their metrics should be removed with them.
+
+#### Source Reader
+
+This reader is responsible for reading from one or more clusters and using the KafkaSourceReader to fetch
+records from topics and clusters based on the metadata. When new metadata is discovered by the enumerator,
+the reader reconciles the metadata changes and, if needed, restarts the KafkaSourceReader to read from the new
+set of topics and clusters.
+
+#### Kafka Metadata Service
+
+This interface represents the source of truth for the current metadata for the configured Kafka stream ids.
+Metadata that is removed in between polls is considered non-active (e.g. removing a cluster from the
+return value means that the cluster is non-active and should not be read from). The cluster metadata
+contains an immutable Kafka cluster id, the set of topics, and properties needed to connect to the
+Kafka cluster.
+
+#### FLIP 246
+
+To understand more behind the scenes, please read [FLIP-246](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320)
+for more details and discussion.
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md
index 7589b8d3c..bddcefec9 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -36,12 +36,12 @@ The version of the client it uses may change between Flink releases.
Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
For details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
-{{< connector_artifact flink-connector-kafka 3.0.0 >}}
+{{< connector_artifact flink-connector-kafka kafka >}}
Flink's streaming connectors are not part of the binary distribution.
See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
-{{< py_download_link "kafka" >}}
+{{< py_connector_download_link "kafka" >}}
## Kafka Source
{{< hint info >}}
@@ -235,8 +235,6 @@ for more details.
Please note that the following keys will be overridden by the builder even if
it is configured:
-- ```key.deserializer``` is always set to ```ByteArrayDeserializer```
-- ```value.deserializer``` is always set to ```ByteArrayDeserializer```
- ```auto.offset.reset.strategy``` is overridden by ```OffsetsInitializer#getAutoOffsetResetStrategy()```
for the starting offsets
- ```partition.discovery.interval.ms``` is overridden to -1 when
@@ -245,7 +243,7 @@ it is configured:
### Dynamic Partition Discovery
In order to handle scenarios like topic scaling-out or topic creation without restarting the Flink
job, Kafka source can be configured to periodically discover new partitions under provided
-topic-partition subscribing pattern. To enable partition discovery, set a non-negative value for
+topic-partition subscribing pattern. To enable partition discovery, set a positive value for
property ```partition.discovery.interval.ms```:
{{< tabs "KafkaSource#PartitionDiscovery" >}}
@@ -264,8 +262,7 @@ KafkaSource.builder() \
{{< /tabs >}}
{{< hint warning >}}
-Partition discovery is **disabled** by default. You need to explicitly set the partition discovery
-interval to enable this feature.
+The partition discovery interval is 5 minutes by default. To **disable** this feature, you need to explicitly set the partition discovery interval to a non-positive value.
{{< /hint >}}
### Event Time and Watermarks
@@ -466,6 +463,25 @@ client dependencies in the job JAR, so you may need to rewrite it with the actua
For detailed explanations of security configurations, please refer to
the "Security" section in Apache Kafka documentation.
+## Kafka Rack Awareness
+
+Kafka rack awareness allows Flink to select and control the cloud region and availability zone that Kafka consumers read from, based on the Rack ID. This feature reduces network costs and latency since it allows consumers to connect to the closest Kafka brokers, possibly colocated in the same cloud region and availability zone.
+A client's rack is indicated using the `client.rack` config, and should correspond to a broker's `broker.rack` config.
+
+See the Kafka consumer [`client.rack` configuration](https://kafka.apache.org/documentation/#consumerconfigs_client.rack) for more details.
+
+### RackId
+
+setRackIdSupplier() is the builder method that allows us to determine the consumer's rack. If provided, the supplier will be run when the consumer is set up on the TaskManager, and the consumer's `client.rack` configuration will be set to the supplied value.
+
+One way to implement this is to supply the value of an environment variable available in your TaskManager, for instance:
+
+```java
+.setRackIdSupplier(() -> System.getenv("TM_NODE_AZ"))
+```
+
+Here "TM_NODE_AZ" is the name of the environment variable in the TaskManager container that contains the zone we want to use.
+
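+For context, here is a sketch of how the supplier could fit into a full `KafkaSource` builder chain; the broker address,
+topic, group id and environment variable name are placeholders:
+
+```java
+KafkaSource<String> source =
+        KafkaSource.<String>builder()
+                .setBootstrapServers("broker-1:9092")
+                .setTopics("input-topic")
+                .setGroupId("my-group")
+                .setStartingOffsets(OffsetsInitializer.earliest())
+                .setValueOnlyDeserializer(new SimpleStringSchema())
+                // client.rack is resolved on the TaskManager when the consumer is created.
+                .setRackIdSupplier(() -> System.getenv("TM_NODE_AZ"))
+                .build();
+```
+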
### Behind the Scene
{{< hint info >}}
If you are interested in how Kafka source works under the design of new data source API, you may
diff --git a/docs/content/docs/connectors/table/kafka.md b/docs/content/docs/connectors/table/kafka.md
index 3c9e739c2..12b0821c3 100644
--- a/docs/content/docs/connectors/table/kafka.md
+++ b/docs/content/docs/connectors/table/kafka.md
@@ -35,7 +35,7 @@ The Kafka connector allows for reading data from and writing data into Kafka top
Dependencies
------------
-{{< sql_download_table "kafka" >}}
+{{< sql_connector_download_table "kafka" >}}
The Kafka connector is not part of the binary distribution.
See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
@@ -50,7 +50,7 @@ CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
- `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
@@ -83,7 +83,7 @@ Read-only columns must be declared `VIRTUAL` to exclude them during an `INSERT I
topic |
STRING NOT NULL |
Topic name of the Kafka record. |
- R |
+ R/W |
partition |
@@ -129,7 +129,7 @@ The extended `CREATE TABLE` example demonstrates the syntax for exposing these m
```sql
CREATE TABLE KafkaTable (
- `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
+ `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
@@ -154,7 +154,7 @@ The following example shows how to access both Kafka and Debezium metadata field
```sql
CREATE TABLE KafkaTable (
- `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format
+ `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format
`origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- from Kafka connector
`offset` BIGINT METADATA VIRTUAL, -- from Kafka connector
@@ -196,11 +196,11 @@ Connector Options
topic |
- required for sink |
+ optional |
yes |
(none) |
String |
- Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks. |
+ Topic name(s) to read data from when the table is used as source, or topics for writing when the table is used as sink. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" can be specified. For sinks, the topic name is the topic to write data to. It also supports topic list for sinks. The provided topic list is treated as an allow list of valid values for the `topic` metadata column. If a list is provided, for sink tables, the 'topic' metadata column is writable and must be specified. |
topic-pattern |
@@ -208,7 +208,7 @@ Connector Options
yes |
(none) |
String |
- The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of "topic-pattern" and "topic" can be specified for sources. |
+ The regular expression for a pattern of topic names to read from or write to. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. For sinks, the `topic` metadata column is writable, must be provided and match the `topic-pattern` regex. Note, only one of "topic-pattern" and "topic" can be specified. |
properties.bootstrap.servers |
@@ -233,7 +233,7 @@ Connector Options
(none) |
String |
- This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in Kafka Configuration documentation. Flink will remove the "properties." key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via 'properties.allow.auto.create.topics' = 'false'. But there are some configurations that do not support to set, because Flink will override them, e.g. 'key.deserializer' and 'value.deserializer'.
+ This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in Kafka Configuration documentation. Flink will remove the "properties." key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via 'properties.allow.auto.create.topics' = 'false'. But there are some configurations that cannot be set, because Flink will override them, e.g. 'auto.offset.reset'.
|
@@ -337,6 +337,7 @@ Connector Options
scan.bounded.mode |
optional |
+ no |
unbounded |
Enum |
Bounded mode for Kafka consumer, valid values are 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'.
@@ -364,9 +365,17 @@ Connector Options
| scan.topic-partition-discovery.interval |
optional |
yes |
- (none) |
+ 5 minutes |
Duration |
- Interval for consumer to discover dynamically created Kafka topics and partitions periodically. |
+ Interval for consumer to discover dynamically created Kafka topics and partitions periodically. To disable this feature, you need to explicitly set the 'scan.topic-partition-discovery.interval' value to 0. |
+
+
+ scan.parallelism |
+ optional |
+ no |
+ (none) |
+ Integer |
+ Defines the parallelism of the Kafka source operator. If not set, the global default parallelism is used. |
sink.partitioner |
@@ -435,7 +444,7 @@ options are prefixed with the format identifier.
```sql
CREATE TABLE KafkaTable (
- `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
@@ -461,7 +470,7 @@ prefixed with either the `'key'` or `'value'` plus format identifier.
```sql
CREATE TABLE KafkaTable (
- `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
@@ -608,8 +617,8 @@ Besides enabling Flink's checkpointing, you can also choose three different mode
* `none`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
* `at-least-once` (default setting): This guarantees that no records will be lost (although they can be duplicated).
* `exactly-once`: Kafka transactions will be used to provide exactly-once semantic. Whenever you write
- to Kafka using transactions, do not forget about setting desired `isolation.level` (`read_committed`
- or `read_uncommitted` - the latter one is the default value) for any application consuming records
+ to Kafka using transactions, do not forget about setting desired `isolation.level` (`read_uncommitted`
+ or `read_committed` - the latter one is the default value) for any application consuming records
from Kafka.
Please refer to [Kafka documentation]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-producers-and-fault-tolerance) for more caveats about delivery guarantees.
@@ -629,13 +638,13 @@ for more details.
### Security
In order to enable security configurations including encryption and authentication, you just need to setup security
configurations with "properties." prefix in table options. The code snippet below shows configuring Kafka table to
-use PLAIN as SASL mechanism and provide JAAS configuration:
+use PLAIN as SASL mechanism and provide JAAS configuration when using SQL client JAR :
```sql
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
- `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
@@ -644,13 +653,13 @@ CREATE TABLE KafkaTable (
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
)
```
-For a more complex example, use SASL_SSL as the security protocol and use SCRAM-SHA-256 as SASL mechanism:
+For a more complex example, use SASL_SSL as the security protocol and use SCRAM-SHA-256 as SASL mechanism when using SQL client JAR :
```sql
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
- `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
@@ -672,8 +681,9 @@ CREATE TABLE KafkaTable (
Please note that the class path of the login module in `sasl.jaas.config` might be different if you relocate Kafka
client dependencies, so you may need to rewrite it with the actual class path of the module in the JAR.
-For example if you are using SQL client JAR, which has relocate Kafka client dependencies to `org.apache.flink.kafka.shaded.org.apache.kafka`,
-the path of plain login module should be `org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule` instead.
+SQL client JAR has relocated Kafka client dependencies to `org.apache.flink.kafka.shaded.org.apache.kafka`,
+then the path of plain login module in code snippets above need to be
+`org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule` when using SQL client JAR.
For detailed explanations of security configurations, please refer to
the "Security" section in Apache Kafka documentation.
diff --git a/docs/content/docs/connectors/table/upsert-kafka.md b/docs/content/docs/connectors/table/upsert-kafka.md
index 12a23c5c2..db75309a2 100644
--- a/docs/content/docs/connectors/table/upsert-kafka.md
+++ b/docs/content/docs/connectors/table/upsert-kafka.md
@@ -47,7 +47,7 @@ key will fall into the same partition.
Dependencies
------------
-{{< sql_download_table "upsert-kafka" >}}
+{{< sql_connector_download_table "kafka" >}}
The Upsert Kafka connector is not part of the binary distribution.
See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
@@ -129,7 +129,7 @@ Connector Options
required |
(none) |
String |
- The Kafka topic name to read from and write to. |
+ Topic name(s) to read data from when the table is used as source, or topics for writing when the table is used as sink. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" can be specified. For sinks, the topic name is the topic to write data to. It also supports topic list for sinks. The provided topic list is treated as an allow list of valid values for the `topic` metadata column. If a list is provided, for sink tables, the 'topic' metadata column is writable and must be specified. |
properties.bootstrap.servers |
@@ -144,7 +144,7 @@ Connector Options
(none) |
String |
- This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in Kafka Configuration documentation. Flink will remove the "properties." key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via 'properties.allow.auto.create.topics' = 'false'. But there are some configurations that do not support to set, because Flink will override them, e.g. 'key.deserializer' and 'value.deserializer'.
+ This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in Kafka Configuration documentation. Flink will remove the "properties." key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via 'properties.allow.auto.create.topics' = 'false'. But there are some configurations that cannot be set, because Flink will override them, e.g. 'auto.offset.reset'.
|
@@ -192,6 +192,14 @@ Connector Options
format which means that key columns appear in the data type for both the key and value format.
+
+ scan.parallelism |
+ optional |
+ no |
+ (none) |
+ Integer |
+ Defines the parallelism of the upsert-kafka source operator. If not set, the global default parallelism is used. |
+
sink.parallelism |
optional |
@@ -221,6 +229,22 @@ Connector Options
By default, this is disabled. Note both 'sink.buffer-flush.max-rows' and
'sink.buffer-flush.interval' must be set to be greater than zero to enable sink buffer flushing.
+
+ sink.delivery-guarantee |
+ optional |
+ no |
+ at-least-once |
+ String |
+ Defines the delivery semantic for the upsert-kafka sink. Valid enumerations are 'at-least-once', 'exactly-once' and 'none'. See Consistency guarantees for more details. |
+
+
+ sink.transactional-id-prefix |
+ optional |
+ yes |
+ (none) |
+ String |
+ If the delivery guarantee is configured as 'exactly-once', this value must be set and is used as a prefix for the identifier of all opened Kafka transactions. |
+
@@ -240,7 +264,7 @@ prefixed with either the `'key'` or `'value'` plus format identifier.
{{< tab "SQL" >}}
```sql
CREATE TABLE KafkaTable (
- `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
@@ -277,6 +301,19 @@ connector is working in the upsert mode, the last record on the same key will ta
reading back as a source. Therefore, the upsert-kafka connector achieves idempotent writes just like
the [HBase sink]({{< ref "docs/connectors/table/hbase" >}}).
+With Flink's checkpointing enabled, the `upsert-kafka` connector can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose among three different modes of operation by passing the appropriate `sink.delivery-guarantee` option:
+
+* `none`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
+* `at-least-once` (default setting): This guarantees that no records will be lost (although they can be duplicated).
+* `exactly-once`: Kafka transactions will be used to provide exactly-once semantic. Whenever you write
+ to Kafka using transactions, do not forget about setting desired `isolation.level` (`read_uncommitted`
+ or `read_committed` - the latter one is the default value) for any application consuming records
+ from Kafka.
+
+Please refer to [Kafka connector documentation]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-producers-and-fault-tolerance) for more caveats about delivery guarantees.
+
### Source Per-Partition Watermarks
Flink supports to emit per-partition watermarks for Upsert Kafka. Watermarks are generated inside the Kafka
diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
index 4cb2c19ad..e414db962 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
@@ -23,7 +23,7 @@ under the License.
org.apache.flink
flink-connector-kafka-e2e-tests
- 4.0-SNAPSHOT
+ 3.4-SNAPSHOT
4.0.0
@@ -37,10 +37,6 @@ under the License.
-
- 7.2.2
-
-
org.apache.flink
@@ -71,6 +67,12 @@ under the License.
+
+ org.apache.flink
+ flink-connector-base
+ ${flink.version}
+ test
+
org.apache.kafka
@@ -110,6 +112,12 @@ under the License.
kafka-avro-serializer
${confluent.version}
test
+
+
+ com.google.guava
+ guava
+
+
@@ -122,6 +130,10 @@ under the License.
org.apache.kafka
kafka-clients
+
+ com.google.guava
+ guava
+
@@ -159,7 +171,7 @@ under the License.
com.google.guava
guava
- 30.1.1-jre
+ ${guava.version}
@@ -238,7 +250,7 @@ under the License.
com.google.guava
guava
- 30.1.1-jre
+ ${guava.version}
guava.jar
jar
${project.build.directory}/dependencies
diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
index b22e8a382..e18c035b0 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
@@ -19,6 +19,7 @@
package org.apache.flink.tests.util.kafka;
import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
+import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
@@ -28,7 +29,6 @@
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.test.resources.ResourceTestUtils;
-import org.apache.flink.util.DockerImageVersions;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
index 4a036df20..1a2ac1f24 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
@@ -18,6 +18,7 @@
package org.apache.flink.tests.util.kafka;
+import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
import org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContextFactory;
import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
@@ -28,7 +29,6 @@
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.test.resources.ResourceTestUtils;
-import org.apache.flink.util.DockerImageVersions;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
index a18976b18..e3b18194a 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
@@ -19,12 +19,13 @@
package org.apache.flink.tests.util.kafka;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
+import org.apache.flink.connector.kafka.testutils.KafkaUtil;
import org.apache.flink.connector.testframe.container.FlinkContainers;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.kafka.containers.SchemaRegistryContainer;
-import org.apache.flink.tests.util.kafka.test.DockerImageVersions;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
@@ -40,11 +41,8 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
import java.nio.file.Path;
@@ -60,9 +58,6 @@
/** End-to-end test for SQL client using Avro Confluent Registry format. */
public class SQLClientSchemaRegistryITCase {
- private static final Logger LOG = LoggerFactory.getLogger(SQLClientSchemaRegistryITCase.class);
- private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
-
public static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
public static final String INTER_CONTAINER_REGISTRY_ALIAS = "registry";
private static final Path sqlAvroJar = ResourceTestUtils.getResource(".*avro.jar");
@@ -78,10 +73,9 @@ public class SQLClientSchemaRegistryITCase {
@ClassRule
public static final KafkaContainer KAFKA =
- new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+ KafkaUtil.createKafkaContainer(SQLClientSchemaRegistryITCase.class)
.withNetwork(NETWORK)
- .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
- .withLogConsumer(LOG_CONSUMER);
+ .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
@ClassRule
public static final SchemaRegistryContainer REGISTRY =
@@ -92,7 +86,11 @@ public class SQLClientSchemaRegistryITCase {
.dependsOn(KAFKA);
public final TestcontainersSettings testcontainersSettings =
- TestcontainersSettings.builder().network(NETWORK).logger(LOG).dependsOn(KAFKA).build();
+ TestcontainersSettings.builder()
+ .network(NETWORK)
+ .logger(KafkaUtil.getLogger("flink", SQLClientSchemaRegistryITCase.class))
+ .dependsOn(KAFKA)
+ .build();
public final FlinkContainers flink =
FlinkContainers.builder().withTestcontainersSettings(testcontainersSettings).build();
diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
index 726eceea9..a8c416b0b 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
@@ -29,8 +29,6 @@
import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.util.TestLoggerExtension;
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
@@ -47,8 +45,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;
@@ -56,6 +52,7 @@
import java.nio.ByteBuffer;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,7 +61,6 @@
import java.util.stream.Collectors;
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
-import static org.apache.flink.util.DockerImageVersions.KAFKA;
import static org.assertj.core.api.Assertions.assertThat;
/** smoke test for the kafka connectors. */
@@ -72,20 +68,22 @@
@Testcontainers
class SmokeKafkaITCase {
- private static final Logger LOG = LoggerFactory.getLogger(SmokeKafkaITCase.class);
private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
private static final Network NETWORK = Network.newNetwork();
private static final String EXAMPLE_JAR_MATCHER = "flink-streaming-kafka-test.*";
@Container
public static final KafkaContainer KAFKA_CONTAINER =
- createKafkaContainer(KAFKA, LOG)
+ createKafkaContainer(SmokeKafkaITCase.class)
.withEmbeddedZookeeper()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
- TestcontainersSettings.builder().logger(LOG).dependsOn(KAFKA_CONTAINER).build();
+ TestcontainersSettings.builder()
+ .logger(KafkaUtil.getLogger("flink", SmokeKafkaITCase.class))
+ .dependsOn(KAFKA_CONTAINER)
+ .build();
@RegisterExtension
public static final FlinkContainers FLINK =
@@ -103,6 +101,10 @@ private static Configuration getConfiguration() {
final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
flinkConfig.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+ // Workaround for FLINK-36454 ; default config is entirely overwritten
+ flinkConfig.setString(
+ "env.java.opts.all",
+ "--add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED");
return flinkConfig;
}
@@ -137,7 +139,7 @@ public void testKafka() throws Exception {
// create the required topics
final short replicationFactor = 1;
admin.createTopics(
- Lists.newArrayList(
+ Arrays.asList(
new NewTopic(inputTopic, 1, replicationFactor),
new NewTopic(outputTopic, 1, replicationFactor)))
.all()
diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties
index 358fd81ef..9c49ae58a 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties
@@ -32,3 +32,27 @@ appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
#logger.yarn.name = org.testcontainers.shaded.com.github.dockerjava.core
#logger.yarn.level = WARN
#logger.yarn.appenderRef.console.ref = TestLogger
+
+# Logger configuration for containers, by default this is off
+# If you want to investigate test failures, overwrite the level as above
+logger.container.name = container
+logger.container.level = OFF
+# Additivity is disabled so container messages are not also logged by the root logger
+logger.container.additivity = false
+logger.container.appenderRef.containerappender.ref = ContainerLogger
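+# For example, to see container output while investigating a failure, you could
+# locally raise the level (illustrative value, not part of the default setup):
+# logger.container.level = INFO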
+
+logger.kafkacontainer.name = container.kafka
+logger.kafkacontainer.level = OFF
+
+logger.flinkcontainer.name = container.flink
+logger.flinkcontainer.level = OFF
+
+logger.flinkenv.name = org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment
+logger.flinkenv.level = OFF
+# Additivity is disabled so these messages are not also logged by the root logger
+logger.flinkenv.additivity = false
+logger.flinkenv.appenderRef.containerappender.ref = ContainerLogger
+
+appender.containerappender.name = ContainerLogger
+appender.containerappender.type = CONSOLE
+appender.containerappender.target = SYSTEM_ERR
+appender.containerappender.layout.type = PatternLayout
+appender.containerappender.layout.pattern = [%c{1}] %m%n
diff --git a/flink-connector-kafka-e2e-tests/flink-streaming-kafka-test-base/pom.xml b/flink-connector-kafka-e2e-tests/flink-streaming-kafka-test-base/pom.xml
index d4b7dfbcd..974e9d421 100644
--- a/flink-connector-kafka-e2e-tests/flink-streaming-kafka-test-base/pom.xml
+++ b/flink-connector-kafka-e2e-tests/flink-streaming-kafka-test-base/pom.xml
@@ -23,7 +23,7 @@ under the License.
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-kafka-e2e-tests</artifactId>
-		<version>4.0-SNAPSHOT</version>
+		<version>3.4-SNAPSHOT</version>
	</parent>

	<modelVersion>4.0.0</modelVersion>
diff --git a/flink-connector-kafka-e2e-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java b/flink-connector-kafka-e2e-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
index 2d05380b8..a6fa83e61 100644
--- a/flink-connector-kafka-e2e-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
+++ b/flink-connector-kafka-e2e-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
@@ -27,7 +27,7 @@ public class KafkaExampleUtil {
public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool parameterTool)
throws Exception {
- if (parameterTool.getNumberOfParameters() < 5) {
+ if (parameterTool.getNumberOfParameters() < 4) {
System.out.println(
"Missing parameters!\n"
+ "Usage: Kafka --input-topic --output-topic "
diff --git a/flink-connector-kafka-e2e-tests/flink-streaming-kafka-test/pom.xml b/flink-connector-kafka-e2e-tests/flink-streaming-kafka-test/pom.xml
index f2d9a2cc1..87498bea7 100644
--- a/flink-connector-kafka-e2e-tests/flink-streaming-kafka-test/pom.xml
+++ b/flink-connector-kafka-e2e-tests/flink-streaming-kafka-test/pom.xml
@@ -23,7 +23,7 @@ under the License.
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-kafka-e2e-tests</artifactId>
-		<version>4.0-SNAPSHOT</version>
+		<version>3.4-SNAPSHOT</version>
	</parent>

	<modelVersion>4.0.0</modelVersion>
diff --git a/flink-connector-kafka-e2e-tests/pom.xml b/flink-connector-kafka-e2e-tests/pom.xml
index ddc83d60d..12c881197 100644
--- a/flink-connector-kafka-e2e-tests/pom.xml
+++ b/flink-connector-kafka-e2e-tests/pom.xml
@@ -25,7 +25,7 @@ under the License.
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-kafka-parent</artifactId>
-		<version>4.0-SNAPSHOT</version>
+		<version>3.4-SNAPSHOT</version>
	</parent>

	<packaging>pom</packaging>
diff --git a/flink-connector-kafka/archunit-violations/27a0a5e4-29c2-4069-b381-952746c90862 b/flink-connector-kafka/archunit-violations/27a0a5e4-29c2-4069-b381-952746c90862
index c7abba76e..109ecf836 100644
--- a/flink-connector-kafka/archunit-violations/27a0a5e4-29c2-4069-b381-952746c90862
+++ b/flink-connector-kafka/archunit-violations/27a0a5e4-29c2-4069-b381-952746c90862
@@ -1 +1 @@
-Method calls method in (FlinkKafkaProducer.java:1327)
\ No newline at end of file
+Method calls method in (FlinkKafkaProducer.java:1320)
diff --git a/flink-connector-kafka/archunit-violations/86dfd459-67a9-4b26-9b5c-0b0bbf22681a b/flink-connector-kafka/archunit-violations/86dfd459-67a9-4b26-9b5c-0b0bbf22681a
index c1e656185..8d8514add 100644
--- a/flink-connector-kafka/archunit-violations/86dfd459-67a9-4b26-9b5c-0b0bbf22681a
+++ b/flink-connector-kafka/archunit-violations/86dfd459-67a9-4b26-9b5c-0b0bbf22681a
@@ -1,78 +1,90 @@
-org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducerITCase does not satisfy: only one of the following predicates match:\
+org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriterITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.sink.KafkaSinkITCase does not satisfy: only one of the following predicates match:\
+org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducerITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.kafka.sink.KafkaWriterFaultToleranceITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.kafka.sink.KafkaWriterITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.kafka.sink.internal.ProducerPoolImplITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.streaming.connectors.kafka.KafkaITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.streaming.connectors.kafka.KafkaProducerAtLeastOnceITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.streaming.connectors.kafka.KafkaProducerExactlyOnceITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
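The rule descriptions above are dense; as a rough, hypothetical sketch (not part of this patch, class name and configuration values are illustrative only), an ITCase outside `org.apache.flink.runtime.*` satisfies the first matching predicate by declaring a static, final `MiniClusterExtension` field registered as a JUnit 5 extension:

```java
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/** Hypothetical example; not taken from this patch. */
class ExampleKafkaITCase {

    // Static, final MiniClusterExtension annotated with @RegisterExtension,
    // which is the field shape the violation messages above describe.
    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(1)
                            .build());

    @Test
    void smoke() {
        // Test body omitted; the rule only constrains how the MiniCluster is declared.
    }
}
```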
diff --git a/flink-connector-kafka/archunit-violations/984f05c0-ec82-405e-9bcc-d202dbe7202e b/flink-connector-kafka/archunit-violations/984f05c0-ec82-405e-9bcc-d202dbe7202e
new file mode 100644
index 000000000..c27f7c3d4
--- /dev/null
+++ b/flink-connector-kafka/archunit-violations/984f05c0-ec82-405e-9bcc-d202dbe7202e
@@ -0,0 +1,354 @@
+Class extends class in (YamlFileMetadataService.java:0)
+Class is annotated with in (KafkaSourceEnumerator.java:0)
+Class is annotated with in (KafkaSourceEnumerator.java:0)
+Class extends class in (FlinkKafkaProducer.java:0)
+Class is annotated with in (FlinkKafkaProducer.java:0)
+Class is annotated with in (FlinkKafkaProducer.java:0)
+Class is annotated with in (FlinkKafkaProducer.java:0)
+Class extends class in (FlinkKafkaProducer.java:0)
+Class is annotated with in (FlinkKafkaProducer.java:0)
+Class extends class in (FlinkKafkaProducer.java:0)
+Class is annotated with in (FlinkKafkaProducer.java:0)
+Class is annotated with in (KafkaShuffleFetcher.java:0)
+Class is annotated with in (KafkaShuffleFetcher.java:0)
+Class is annotated with in (KafkaShuffleFetcher.java:0)
+Class is annotated with in (KafkaShuffleFetcher.java:0)
+Class extends class in (StreamKafkaShuffleSink.java:0)
+Constructor (java.util.function.Function, org.apache.flink.api.common.serialization.SerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.connector.kafka.sink.HeaderProvider)> calls method in (KafkaRecordSerializationSchemaBuilder.java:308)
+Constructor (java.util.function.Function, org.apache.flink.api.common.serialization.SerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.connector.kafka.sink.HeaderProvider)> calls method in (KafkaRecordSerializationSchemaBuilder.java:309)
+Constructor (java.lang.Class, boolean, java.util.Map, java.util.function.Function)> calls method in (KafkaSerializerWrapper.java:51)
+Constructor (java.lang.Class, boolean, java.util.Map, java.util.function.Function)> calls method in (KafkaSerializerWrapper.java:53)
+Constructor (java.lang.Class, boolean, java.util.Map, java.util.function.Function)> calls method in (KafkaSerializerWrapper.java:54)
+Constructor (org.apache.flink.connector.base.DeliveryGuarantee, java.util.Properties, java.lang.String, org.apache.flink.api.connector.sink2.Sink$InitContext, org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema$InitializationContext, java.util.Collection)> calls method in (KafkaWriter.java:134)
+Constructor (org.apache.flink.connector.base.DeliveryGuarantee, java.util.Properties, java.lang.String, org.apache.flink.api.connector.sink2.Sink$InitContext, org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema$InitializationContext, java.util.Collection)> calls method in (KafkaWriter.java:135)
+Constructor (org.apache.flink.connector.base.DeliveryGuarantee, java.util.Properties, java.lang.String, org.apache.flink.api.connector.sink2.Sink$InitContext, org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema$InitializationContext, java.util.Collection)> calls method in (KafkaWriter.java:136)
+Constructor (org.apache.flink.connector.base.DeliveryGuarantee, java.util.Properties, java.lang.String, org.apache.flink.api.connector.sink2.Sink$InitContext, org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema$InitializationContext, java.util.Collection)> calls method in (KafkaWriter.java:137)
+Constructor (org.apache.flink.connector.base.DeliveryGuarantee, java.util.Properties, java.lang.String, org.apache.flink.api.connector.sink2.Sink$InitContext, org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema$InitializationContext, java.util.Collection)> calls method in (KafkaWriter.java:138)
+Constructor (org.apache.flink.connector.base.DeliveryGuarantee, java.util.Properties, java.lang.String, org.apache.flink.api.connector.sink2.Sink$InitContext, org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema$InitializationContext, java.util.Collection)> calls method in (KafkaWriter.java:173)
+Constructor (java.lang.String)> calls method in (KafkaWriterState.java:28)
+Constructor (int, int, java.util.function.Function, java.util.function.Consumer)> calls method in (TransactionAborter.java:60)
+Constructor (java.lang.Class)> calls constructor ()> in (YamlFileMetadataService.java:270)
+Constructor (org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber, org.apache.flink.connector.kafka.source.enumerator.metadata.KafkaMetadataService, org.apache.flink.api.connector.source.SplitEnumeratorContext, org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer, org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer, java.util.Properties, org.apache.flink.api.connector.source.Boundedness, org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumState, org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy$StoppableKafkaEnumContextProxyFactory)> is annotated with in (DynamicKafkaSourceEnumerator.java:0)
+Constructor (java.lang.String, org.apache.flink.connector.kafka.source.enumerator.metadata.KafkaMetadataService, org.apache.flink.api.connector.source.SplitEnumeratorContext)> calls constructor (java.lang.String)> in (StoppableKafkaEnumContextProxy.java:90)
+Constructor (org.apache.flink.api.connector.source.SourceReaderContext, org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema, java.util.Properties)> calls constructor (int)> in (DynamicKafkaSourceReader.java:111)
+Constructor (java.util.List, java.util.regex.Pattern, org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema, long, boolean)> calls method in (FlinkKafkaConsumerBase.java:253)
+Constructor (java.util.List, java.util.regex.Pattern, org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema, long, boolean)> calls method in (FlinkKafkaConsumerBase.java:251)
+Constructor ()> calls constructor ()> in (FlinkKafkaProducer.java:1733)
+Constructor (java.util.Set)> calls method in (FlinkKafkaProducer.java:1599)
+Constructor (java.util.Set)> is annotated with in (FlinkKafkaProducer.java:0)
+Constructor (java.lang.String, long, short, org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer)> is annotated with in (FlinkKafkaProducer.java:0)
+Constructor (java.lang.String, org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer)> is annotated with in (FlinkKafkaProducer.java:0)
+Constructor (org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer)> is annotated with in (FlinkKafkaProducer.java:0)
+Constructor ()> calls constructor ()> in (FlinkKafkaProducer.java:1879)
+Constructor ()> calls constructor ()> in (FlinkKafkaProducer.java:1630)
+Constructor (java.lang.String, org.apache.flink.streaming.util.serialization.KeyedSerializationSchema, java.util.Properties, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner)> calls constructor ()> in (FlinkKafkaProducerBase.java:120)
+Constructor (java.lang.String, org.apache.flink.streaming.util.serialization.KeyedSerializationSchema, java.util.Properties, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner)> calls method in (FlinkKafkaProducerBase.java:144)
+Constructor (java.lang.String, org.apache.flink.streaming.util.serialization.KeyedSerializationSchema, java.util.Properties, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner)> calls method in (FlinkKafkaProducerBase.java:146)
+Constructor (java.lang.Object, java.util.List, org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> calls method in (AbstractFetcher.java:584)
+Constructor (java.lang.Object, java.util.List, org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> calls method in (AbstractFetcher.java:586)
+Constructor (java.lang.Object, java.util.List, org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> has parameter of type in (AbstractFetcher.java:0)
+Constructor (java.lang.Object, java.util.List, org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> has parameter of type in (AbstractFetcher.java:0)
+Constructor (org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long, java.lang.ClassLoader, org.apache.flink.metrics.MetricGroup, boolean)> calls constructor (org.apache.flink.api.common.eventtime.WatermarkOutput)> in (AbstractFetcher.java:154)
+Constructor (org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long, java.lang.ClassLoader, org.apache.flink.metrics.MetricGroup, boolean)> calls method in (AbstractFetcher.java:152)
+Constructor (org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long, java.lang.ClassLoader, org.apache.flink.metrics.MetricGroup, boolean)> calls method in (AbstractFetcher.java:156)
+Constructor (org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long, java.lang.ClassLoader, org.apache.flink.metrics.MetricGroup, boolean)> calls method in (AbstractFetcher.java:159)
+Constructor (org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long, java.lang.ClassLoader, org.apache.flink.metrics.MetricGroup, boolean)> has generic parameter type >> with type argument depending on in (AbstractFetcher.java:0)
+Constructor (org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long, java.lang.ClassLoader, org.apache.flink.metrics.MetricGroup, boolean)> has parameter of type in (AbstractFetcher.java:0)
+Constructor (org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long, java.lang.ClassLoader, org.apache.flink.metrics.MetricGroup, boolean)> has parameter of type in (AbstractFetcher.java:0)
+Constructor (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor, int, int)> calls method in (AbstractPartitionDiscoverer.java:81)
+Constructor (org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread, org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback)> calls method in (KafkaConsumerThread.java:539)
+Constructor (org.slf4j.Logger, org.apache.flink.streaming.connectors.kafka.internals.Handover, java.util.Properties, org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue, java.lang.String, long, boolean, org.apache.flink.metrics.MetricGroup, org.apache.flink.metrics.MetricGroup)> calls method in (KafkaConsumerThread.java:136)
+Constructor (org.slf4j.Logger, org.apache.flink.streaming.connectors.kafka.internals.Handover, java.util.Properties, org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue, java.lang.String, long, boolean, org.apache.flink.metrics.MetricGroup, org.apache.flink.metrics.MetricGroup)> calls method in (KafkaConsumerThread.java:137)
+Constructor (org.slf4j.Logger, org.apache.flink.streaming.connectors.kafka.internals.Handover, java.util.Properties, org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue, java.lang.String, long, boolean, org.apache.flink.metrics.MetricGroup, org.apache.flink.metrics.MetricGroup)> calls method in (KafkaConsumerThread.java:138)
+Constructor (org.slf4j.Logger, org.apache.flink.streaming.connectors.kafka.internals.Handover, java.util.Properties, org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue, java.lang.String, long, boolean, org.apache.flink.metrics.MetricGroup, org.apache.flink.metrics.MetricGroup)> calls method in (KafkaConsumerThread.java:139)
+Constructor (org.slf4j.Logger, org.apache.flink.streaming.connectors.kafka.internals.Handover, java.util.Properties, org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue, java.lang.String, long, boolean, org.apache.flink.metrics.MetricGroup, org.apache.flink.metrics.MetricGroup)> calls method in (KafkaConsumerThread.java:140)
+Constructor (org.slf4j.Logger, org.apache.flink.streaming.connectors.kafka.internals.Handover, java.util.Properties, org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue, java.lang.String, long, boolean, org.apache.flink.metrics.MetricGroup, org.apache.flink.metrics.MetricGroup)> calls method in (KafkaConsumerThread.java:142)
+Constructor (org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long, java.lang.ClassLoader, java.lang.String, org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema, java.util.Properties, long, org.apache.flink.metrics.MetricGroup, org.apache.flink.metrics.MetricGroup, boolean)> has generic parameter type >> with type argument depending on in (KafkaFetcher.java:0)
+Constructor (org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long, java.lang.ClassLoader, java.lang.String, org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema, java.util.Properties, long, org.apache.flink.metrics.MetricGroup, org.apache.flink.metrics.MetricGroup, boolean)> has parameter of type in (KafkaFetcher.java:0)
+Constructor (org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long, java.lang.ClassLoader, java.lang.String, org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema, java.util.Properties, long, org.apache.flink.metrics.MetricGroup, org.apache.flink.metrics.MetricGroup, boolean)> has parameter of type in (KafkaFetcher.java:0)
+Constructor (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor, int, int, java.util.Properties)> calls method in (KafkaPartitionDiscoverer.java:50)
+Constructor (org.apache.flink.api.common.typeutils.TypeSerializer)> is annotated with in (KafkaShuffleFetcher.java:0)
+Constructor (org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long, java.lang.ClassLoader, java.lang.String, org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema, java.util.Properties, long, org.apache.flink.metrics.MetricGroup, org.apache.flink.metrics.MetricGroup, boolean, org.apache.flink.api.common.typeutils.TypeSerializer, int)> has generic parameter type >> with type argument depending on in (KafkaShuffleFetcher.java:0)
+Constructor (org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long, java.lang.ClassLoader, java.lang.String, org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema, java.util.Properties, long, org.apache.flink.metrics.MetricGroup, org.apache.flink.metrics.MetricGroup, boolean, org.apache.flink.api.common.typeutils.TypeSerializer, int)> has parameter of type in (KafkaShuffleFetcher.java:0)
+Constructor (org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long, java.lang.ClassLoader, java.lang.String, org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema, java.util.Properties, long, org.apache.flink.metrics.MetricGroup, org.apache.flink.metrics.MetricGroup, boolean, org.apache.flink.api.common.typeutils.TypeSerializer, int)> has parameter of type in (KafkaShuffleFetcher.java:0)
+Constructor (java.util.List, java.util.regex.Pattern)> calls method in (KafkaTopicsDescriptor.java:45)
+Constructor (java.util.List, java.util.regex.Pattern)> calls method in (KafkaTopicsDescriptor.java:51)
+Constructor (java.lang.String, int, int, int, int)> calls method in (TransactionalIdsGenerator.java:56)
+Constructor (java.lang.String, int, int, int, int)> calls method in (TransactionalIdsGenerator.java:57)
+Constructor (java.lang.String, int, int, int, int)> calls method in (TransactionalIdsGenerator.java:58)
+Constructor (java.lang.String, int, int, int, int)> calls method in (TransactionalIdsGenerator.java:59)
+Constructor (java.lang.String, int, int, int, int)> calls method in (TransactionalIdsGenerator.java:61)
+Constructor (java.lang.String, org.apache.flink.api.common.serialization.TypeInformationSerializationSchema, org.apache.flink.api.common.typeutils.TypeSerializer, java.util.Properties)> calls method in (FlinkKafkaShuffleConsumer.java:56)
+Constructor (java.lang.String, org.apache.flink.api.common.serialization.TypeInformationSerializationSchema, org.apache.flink.api.common.typeutils.TypeSerializer, java.util.Properties)> calls method in (FlinkKafkaShuffleConsumer.java:59)
+Constructor (java.lang.String, org.apache.flink.api.common.typeutils.TypeSerializer, java.util.Properties, org.apache.flink.api.java.functions.KeySelector, org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer$Semantic, int)> calls method in (FlinkKafkaShuffleProducer.java:71)
+Constructor (java.lang.String, org.apache.flink.api.common.typeutils.TypeSerializer, java.util.Properties, org.apache.flink.api.java.functions.KeySelector, org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer$Semantic, int)> calls method in (FlinkKafkaShuffleProducer.java:74)
+Constructor (org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer)> calls constructor (org.apache.flink.streaming.api.functions.sink.SinkFunction)> in (StreamKafkaShuffleSink.java:35)
+Constructor (int, org.apache.flink.api.common.serialization.DeserializationSchema, [I, org.apache.flink.api.common.serialization.DeserializationSchema, [I, boolean, [Lorg.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema$MetadataConverter;, org.apache.flink.api.common.typeinfo.TypeInformation, boolean)> calls method in (DynamicKafkaDeserializationSchema.java:70)
+Constructor (java.lang.String, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.api.common.serialization.SerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, [Lorg.apache.flink.table.data.RowData$FieldGetter;, boolean, [I, boolean)> calls method in (DynamicKafkaRecordSerializationSchema.java:59)
+Constructor (java.lang.String, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.api.common.serialization.SerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, [Lorg.apache.flink.table.data.RowData$FieldGetter;, boolean, [I, boolean)> calls method in (DynamicKafkaRecordSerializationSchema.java:63)
+Constructor (java.lang.String, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.api.common.serialization.SerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, [Lorg.apache.flink.table.data.RowData$FieldGetter;, boolean, [I, boolean)> calls method in (DynamicKafkaRecordSerializationSchema.java:66)
+Constructor (java.lang.String, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.api.common.serialization.SerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, [Lorg.apache.flink.table.data.RowData$FieldGetter;, boolean, [I, boolean)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (DynamicKafkaRecordSerializationSchema.java:0)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.EncodingFormat, org.apache.flink.table.connector.format.EncodingFormat, [I, [I, java.lang.String, java.lang.String, java.util.Properties, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.connector.base.DeliveryGuarantee, boolean, org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode, java.lang.Integer, java.lang.String)> calls method in (KafkaDynamicSink.java:175)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.EncodingFormat, org.apache.flink.table.connector.format.EncodingFormat, [I, [I, java.lang.String, java.lang.String, java.util.Properties, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.connector.base.DeliveryGuarantee, boolean, org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode, java.lang.Integer, java.lang.String)> calls method in (KafkaDynamicSink.java:156)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.EncodingFormat, org.apache.flink.table.connector.format.EncodingFormat, [I, [I, java.lang.String, java.lang.String, java.util.Properties, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.connector.base.DeliveryGuarantee, boolean, org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode, java.lang.Integer, java.lang.String)> calls method in (KafkaDynamicSink.java:158)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.EncodingFormat, org.apache.flink.table.connector.format.EncodingFormat, [I, [I, java.lang.String, java.lang.String, java.util.Properties, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.connector.base.DeliveryGuarantee, boolean, org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode, java.lang.Integer, java.lang.String)> calls method in (KafkaDynamicSink.java:161)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.EncodingFormat, org.apache.flink.table.connector.format.EncodingFormat, [I, [I, java.lang.String, java.lang.String, java.util.Properties, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.connector.base.DeliveryGuarantee, boolean, org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode, java.lang.Integer, java.lang.String)> calls method in (KafkaDynamicSink.java:162)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.EncodingFormat, org.apache.flink.table.connector.format.EncodingFormat, [I, [I, java.lang.String, java.lang.String, java.util.Properties, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.connector.base.DeliveryGuarantee, boolean, org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode, java.lang.Integer, java.lang.String)> calls method in (KafkaDynamicSink.java:163)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.EncodingFormat, org.apache.flink.table.connector.format.EncodingFormat, [I, [I, java.lang.String, java.lang.String, java.util.Properties, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.connector.base.DeliveryGuarantee, boolean, org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode, java.lang.Integer, java.lang.String)> calls method in (KafkaDynamicSink.java:169)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.EncodingFormat, org.apache.flink.table.connector.format.EncodingFormat, [I, [I, java.lang.String, java.lang.String, java.util.Properties, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.connector.base.DeliveryGuarantee, boolean, org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode, java.lang.Integer, java.lang.String)> calls method in (KafkaDynamicSink.java:170)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.EncodingFormat, org.apache.flink.table.connector.format.EncodingFormat, [I, [I, java.lang.String, java.lang.String, java.util.Properties, org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner, org.apache.flink.connector.base.DeliveryGuarantee, boolean, org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode, java.lang.Integer, java.lang.String)> calls method in (KafkaDynamicSink.java:173)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.format.DecodingFormat, [I, [I, java.lang.String, java.util.List, java.util.regex.Pattern, java.util.Properties, org.apache.flink.streaming.connectors.kafka.config.StartupMode, java.util.Map, long, org.apache.flink.streaming.connectors.kafka.config.BoundedMode, java.util.Map, long, boolean, java.lang.String)> calls method in (KafkaDynamicSource.java:210)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.format.DecodingFormat, [I, [I, java.lang.String, java.util.List, java.util.regex.Pattern, java.util.Properties, org.apache.flink.streaming.connectors.kafka.config.StartupMode, java.util.Map, long, org.apache.flink.streaming.connectors.kafka.config.BoundedMode, java.util.Map, long, boolean, java.lang.String)> calls method in (KafkaDynamicSource.java:194)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.format.DecodingFormat, [I, [I, java.lang.String, java.util.List, java.util.regex.Pattern, java.util.Properties, org.apache.flink.streaming.connectors.kafka.config.StartupMode, java.util.Map, long, org.apache.flink.streaming.connectors.kafka.config.BoundedMode, java.util.Map, long, boolean, java.lang.String)> calls method in (KafkaDynamicSource.java:198)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.format.DecodingFormat, [I, [I, java.lang.String, java.util.List, java.util.regex.Pattern, java.util.Properties, org.apache.flink.streaming.connectors.kafka.config.StartupMode, java.util.Map, long, org.apache.flink.streaming.connectors.kafka.config.BoundedMode, java.util.Map, long, boolean, java.lang.String)> calls method in (KafkaDynamicSource.java:201)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.format.DecodingFormat, [I, [I, java.lang.String, java.util.List, java.util.regex.Pattern, java.util.Properties, org.apache.flink.streaming.connectors.kafka.config.StartupMode, java.util.Map, long, org.apache.flink.streaming.connectors.kafka.config.BoundedMode, java.util.Map, long, boolean, java.lang.String)> calls method in (KafkaDynamicSource.java:203)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.format.DecodingFormat, [I, [I, java.lang.String, java.util.List, java.util.regex.Pattern, java.util.Properties, org.apache.flink.streaming.connectors.kafka.config.StartupMode, java.util.Map, long, org.apache.flink.streaming.connectors.kafka.config.BoundedMode, java.util.Map, long, boolean, java.lang.String)> calls method in (KafkaDynamicSource.java:216)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.format.DecodingFormat, [I, [I, java.lang.String, java.util.List, java.util.regex.Pattern, java.util.Properties, org.apache.flink.streaming.connectors.kafka.config.StartupMode, java.util.Map, long, org.apache.flink.streaming.connectors.kafka.config.BoundedMode, java.util.Map, long, boolean, java.lang.String)> calls method in (KafkaDynamicSource.java:218)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.format.DecodingFormat, [I, [I, java.lang.String, java.util.List, java.util.regex.Pattern, java.util.Properties, org.apache.flink.streaming.connectors.kafka.config.StartupMode, java.util.Map, long, org.apache.flink.streaming.connectors.kafka.config.BoundedMode, java.util.Map, long, boolean, java.lang.String)> calls method in (KafkaDynamicSource.java:220)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.format.DecodingFormat, [I, [I, java.lang.String, java.util.List, java.util.regex.Pattern, java.util.Properties, org.apache.flink.streaming.connectors.kafka.config.StartupMode, java.util.Map, long, org.apache.flink.streaming.connectors.kafka.config.BoundedMode, java.util.Map, long, boolean, java.lang.String)> calls method in (KafkaDynamicSource.java:224)
+Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.format.DecodingFormat, [I, [I, java.lang.String, java.util.List, java.util.regex.Pattern, java.util.Properties, org.apache.flink.streaming.connectors.kafka.config.StartupMode, java.util.Map, long, org.apache.flink.streaming.connectors.kafka.config.BoundedMode, java.util.Map, long, boolean, java.lang.String)> calls method in (KafkaDynamicSource.java:226)
+Constructor (org.apache.flink.connector.kafka.sink.TwoPhaseCommittingStatefulSink$PrecommittingStatefulSinkWriter, org.apache.flink.table.types.DataType, [I, org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode, org.apache.flink.api.common.operators.ProcessingTimeService, java.util.function.Function)> calls method in (ReducingUpsertWriter.java:70)
+Constructor (org.apache.flink.connector.kafka.sink.TwoPhaseCommittingStatefulSink$PrecommittingStatefulSinkWriter, org.apache.flink.table.types.DataType, [I, org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode, org.apache.flink.api.common.operators.ProcessingTimeService, java.util.function.Function)> calls method in (ReducingUpsertWriter.java:71)
+Constructor (org.apache.flink.connector.kafka.sink.TwoPhaseCommittingStatefulSink$PrecommittingStatefulSinkWriter, org.apache.flink.table.types.DataType, [I, org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode, org.apache.flink.api.common.operators.ProcessingTimeService, java.util.function.Function)> calls method in (ReducingUpsertWriter.java:72)
+Field has type in (YamlFileMetadataService.java:0)
+Field has generic type > with type argument depending on in (KafkaClusterMetricGroupManager.java:0)
+Field has type in (DynamicKafkaSourceReader.java:0)
+Field has type in (FlinkKafkaConsumerBase.java:0)
+Field has type in (FlinkKafkaProducerBase.java:0)
+Field has type in (AbstractFetcher.java:0)
+Field has type in (AbstractFetcher.java:0)
+Field has type in (AbstractFetcher.java:0)
+Field has type in (AbstractFetcher.java:0)
+Field