11---
22title : Flink源码 - 从Kafka Connector看Source接口重构
33tags : flink
4+ outline : deep
45---
56
67# ** Flink源码 - 从Kafka Connector看Source接口重构**
@@ -85,7 +86,7 @@ Split分配流程由`KafkaSourceEnumerator`实现, 它是`SplitEnumerator`的实
8586
8687可以看到` KafkaSourceEnumerator ` 的Partition分配流程还是比较复杂的, 不过在把握整体流程之后再阅读各个函数的代码, 其实也不难理解.
8788
88- ## 数据读取流程
89+ ### 数据读取流程
8990
9091当` KafkaSourceReader ` 接收到来自` KafkaSourceEnumerator ` 分配的Partition之后, 就会开始真正进行数据读取了. 数据读取的整体流程还是比较简单的, ` KafkaSourceReader ` 运行在Task主线程中, 非阻塞地从` FutureCompletingBlockingQueue ` 中获取数据, 如果有数据就使用` KafkaRecordEmitter ` 向下游发送数据. ` SplitFetcher ` 是真正的IO线程, 通过` KafkaPartitionSplitReader ` 从Kafka读取数据后放入` FutureCompletingBlockingQueue ` .
9192
@@ -106,7 +107,7 @@ Split分配流程由`KafkaSourceEnumerator`实现, 它是`SplitEnumerator`的实
106107 ![ kafka-source-reader-io-3.png] ( ./img/flink-connector-kafka/kafka-source-reader-io-3.png )
107108
108109
109- ## ** SplitFetcher生命周期**
110+ ### ** SplitFetcher生命周期**
110111
111112通过上文的分析我们已经知道, ` SplitFetcher ` 是真正从数据源读取数据的任务, 它继承自` Runnable ` 并运行在` SplitFetcherManager ` 的线程池中. ` SplitFetcher ` 任务的状态在运行过程中会不断发生变化, 笔者将` SplitFetcher ` 的生命周期总结为以下四个状态(** 需要注意的是这几个状态是笔者总结的逻辑上的状态, 并不与Java线程的状态完全对应** ):
112113
@@ -138,7 +139,7 @@ Split分配流程由`KafkaSourceEnumerator`实现, 它是`SplitEnumerator`的实
138139
139140` SpliteFetcher ` 是真正实现数据读取的任务, 理解它的生命周期和执行流程是理解新版Source接口的核心之一.
140141
141- ## ** Watermark对齐流程**
142+ ### ** Watermark对齐流程**
142143
143144新Source接口中一个十分重要的特性就是Watermark对齐, 用于解决[ Event Time倾斜问题] ( https://liebing.org.cn/flink-watermark.html#Event-Time%E5%80%BE%E6%96%9C%E7%9A%84%E9%97%AE%E9%A2%98 ) . 在Flink 1.15及之后的版本中可以通过如下方式指定对齐参数.
144145
@@ -233,7 +234,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
233234
234235Split级别的Watermark对齐在` checkSplitWatermarkAlignment() ` 中实现, 其调用链较长, 如上图所示. 最终的实现在` KafkaPartitionSplitReader.pauseOrResumeSplits() ` 中, 通过` KafkaConsumer ` 提供的` pause() ` 和` resume() ` 两个方法分别用于停止和继续读取相应的Partition. 此外, 对于Multi-split Multi-threaded模式的实现, 由于一个` SplitFetcher ` 仅读取一个Split, 在需要对齐时可直接在` SplitFetcherManager.pauseOrResumeSplits() ` 通过` SplitFetcher.pause() ` 和` SplitFetcher.resume() ` 将指定的` SplitFetcher ` 暂停或唤醒.
235236
236- ## Checkpoint和Failover流程
237+ ### Checkpoint和Failover流程
237238
238239除了Split分配和数据读取流程, 在生产环境中还需要关注的是Checkpoint流程和Failover流程.
239240
0 commit comments