文章目录
前言
首先看下官方注释对于Fetcher的解释说明。
作用:用于负责从Kafka服务器获取数据。
Fetcher的请求、响应可能会被多个不同的线程处理。其他操作是使用的单线程(负责拉取消费者的线程)。
源码
接下来分析下它的相关源码。
sendFetches
发起一个获取数据的请求。
Part One
选择合适的节点。保存节点与发送的相关数据。
确保当前已分配分区的TopicPartition信息在leader副本中保存的是最新的。(通过校验上一次消费记录的LeaderAndEpoch与元数据中保存的LeaderAndEpoch是否相同的方式)
- 对分区的获取状态(FetchState)进行判断。
- 校验上一次消费记录的LeaderAndEpoch与元数据中保存的LeaderAndEpoch是否相同。
- 如果不相同,构造一个新的FetchPosition实例,进行之后的校验处理。
- position:FetchPosition类型。表示上一次消费的位置。
- currentLeader:LeaderAndEpoch类型。
对于不同状态的FetchStates(await_validation、fetching),分别对position、nextRetryTimeMs参数进行不同的更新。
获取可以获取数据的所有分区。进行遍历。
- nextInLineRecords:PartitionRecords类型。封装了CompletedFetch等信息。
- completedFetches:CompletedFetch队列。
判断分区可以获取数据的条件是该分区不在exclude集合中,并且分区的状态是可以获取的。
- 获取分区保存的上一次消费的位置。
- 选择preferredReadReplica属性对应的节点来读取数据。
- subscriptions:SubscriptionState类型。
如果选择的节点为空,则发起元数据更新请求。
如果选择的节点不为空,但是客户端无法成功连接该节点,则抛出异常。
在nodesWithPendingFetchRequests集合中的节点,进行跳过。
获取或者创建FetchSessionHandler。
Part Two
- 构造FetchRequest.Builder实例。
- 使用ConsumerNetworkClient向指定节点发送获取数据的请求。
Part Three
添加监听器。
onSuccess
使用FetchSessionHandler对响应进行处理。(对nextMetadata参数进行更新)
如果PartitionData为空,抛出异常。
如果PartitionData不为空,构造一个CompletedFetch实例,存到completedFetches队列中。
onFailure
重置元数据的epoch为0。
resetOffsetsIfNeeded
对需要重置位移的分区,进行重置位移。
获取需要重置位移的所有分区。
获取位移重置的时间戳。
发送重置位移请求。添加监听器。
发送位移重置请求,并处理响应。