前言

本文解析生产者流程图中的NetworkClient部分的处理逻辑。
Kafka生产者架构 - NetworkClient

概述:本文主要解析NetworkClient对于请求的进一步处理以及消息发送到Kafka Cluster之后的后置处理,比如对请求完成的处理、响应完成的处理、断开连接的以及新加入连接的处理、超时请求的处理,最后记录相关信息,执行对应的回调,完成或者重试批次的处理。


源码分析

开始源码分析之前,首先看下官方对于NetworkClient的注释说明。

A network client for asynchronous request/response network i/o. This is an internal class used to implement the user-facing producer and consumer clients.This class is not thread-safe!

接下来正式展开对NetworkClient的源码分析。本文主要分析它的send和poll方法。


send方法

首先看下它的send方法。

我觉得需要关注的点是发送了什么、发送到哪里。

Kafka生产者架构 - NetworkClient

前面在Sender的相关方法中,有封装好一个ClientRequest实例。可以看的出来是发送ClientRequest。

先来看下 doSend方法的前半部分的逻辑。
Kafka生产者架构 - NetworkClient

ensureActive():要求state为ACTIVE状态。
canSendRequest(…):是否满足发送条件。

Kafka生产者架构 - NetworkClient

判断ClusterConnectionStates、Selector、InFlightRequests是否同时满足发送条件。


ClusterConnectionStates,代表每个节点的连接状态。

Kafka生产者架构 - NetworkClient

Selector,Kafka定义的用于处理I/O的模型。
Kafka生产者架构 - NetworkClient
Kafka生产者架构 - NetworkClient

InFlightRequests,代表已经发送的请求,但是还没有得到响应。
Kafka生产者架构 - NetworkClient


接下来看下doSend方法的后续部分。

Kafka生产者架构 - NetworkClient

获取版本号,然后调用doSend(…)方法。

Kafka生产者架构 - NetworkClient

  • makeHeader(…):将ApiKeys、version、clientId、correlationId封装到RequestHeader里。
  • toSend(…):将destination以及序列化后的RequestHeader封装到NetworkSend中。

InFlightRequest,请求已经发送,但是还没有得到响应。这样的请求称之为InFlight。

可以看出NetworkClient#send方法的目的地是Selector。


poll方法

Kafka生产者架构 - NetworkClient

处理之前中断发送的请求。


Kafka生产者架构 - NetworkClient

将abortedSends加入到Response列表中。然后清空abortedSends。

Kafka生产者架构 - NetworkClient

触发ClientResponse#onComplete()逻辑。

Kafka生产者架构 - NetworkClient

也就是触发具体的RequestCompletionHandler属性的onComplete(…)逻辑。


Kafka生产者架构 - NetworkClient

调用Selector#poll(…)进行更底层的I/O逻辑。这里暂时不展开分析。在下一篇文章中,会详细分析。


接下来就是对消息的后置处理。

Kafka生产者架构 - NetworkClient

  1. 完成不需要响应的请求

Kafka生产者架构 - NetworkClient

  • completedSends():在上一次poll调用完成的Send列表。

Kafka生产者架构 - NetworkClient

获取最近一次发送给指定节点的请求。

Kafka生产者架构 - NetworkClient

获取给定节点的InFlightRequest队列。

Kafka生产者架构 - NetworkClient

获取并删除队首元素InFlightRequest。对inFlightRequestCount减一。

Kafka生产者架构 - NetworkClient

构建一个ClientResponse实例。


Kafka生产者架构 - NetworkClient

  1. 处理响应完成的请求

Kafka生产者架构 - NetworkClient

  • completedReceives():从上一次poll调用完成的NetworkReceive列表。

一般情况,根据ApiKeys创建具体的AbstractResponse实例,加入到AbstractReponse列表中。

Kafka生产者架构 - NetworkClient

从InFlightRequest队列中获取并删除队尾元素。inFlightRequestCount减一。

Kafka生产者架构 - NetworkClient

获取给定节点的InFlightRequest队列。

Kafka生产者架构 - NetworkClient

构建一个ClientResponse实例。


Kafka生产者架构 - NetworkClient

  1. 处理断开连接的节点

Kafka生产者架构 - NetworkClient

获取Selector#disconnected属性。

Kafka生产者架构 - NetworkClient
Kafka生产者架构 - NetworkClient

标识元数据需要更新。

Kafka生产者架构 - NetworkClient

Kafka生产者架构 - NetworkClient

更新NodeConnectionState的一些属性。

Kafka生产者架构 - NetworkClient
Kafka生产者架构 - NetworkClient

更新ApiVersions的一些属性。

Kafka生产者架构 - NetworkClient

更新NodeConnectionState的一些属性。

Kafka生产者架构 - NetworkClient
Kafka生产者架构 - NetworkClient

更新Metadata的一些属性。

Kafka生产者架构 - NetworkClient

清空该节点的InFlightRequest集合。如果请求不是内部请求,会构建断开连接的响应,加入到响应列表中。

Kafka生产者架构 - NetworkClient


Kafka生产者架构 - NetworkClient

  1. 处理新建立的连接

Kafka生产者架构 - NetworkClient

获取Selector#connected属性。

Kafka生产者架构 - NetworkClient

Kafka生产者架构 - NetworkClient

更新NodeConnectionState的一些属性。


Kafka生产者架构 - NetworkClient

  1. 发送一个ApiVersionsRequest

Kafka生产者架构 - NetworkClient

  1. 处理超时的请求。具体的和处理断开连接的方式相同。

Kafka生产者架构 - NetworkClient

获取含有处理超时的请求的节点。

Kafka生产者架构 - NetworkClient

判定超时的标准。


Kafka生产者架构 - NetworkClient

  1. 完成正常的请求完成的响应处理。 触发ClientResponse的onComplete()处理。

Kafka生产者架构 - NetworkClient

实际上是触发具体的RequestCompletionHandler#callback属性的onComplete(…)方法。

相关文章: