前言
本文解析生产者流程图中的NetworkClient部分的处理逻辑。
概述:本文主要解析NetworkClient对于请求的进一步处理以及消息发送到Kafka Cluster之后的后置处理,比如对请求完成的处理、响应完成的处理、断开连接的以及新加入连接的处理、超时请求的处理,最后记录相关信息,执行对应的回调,完成或者重试批次的处理。
源码分析
开始源码分析之前,首先看下官方对于NetworkClient的注释说明。
A network client for asynchronous request/response network i/o. This is an internal class used to implement the user-facing producer and consumer clients.This class is not thread-safe!
接下来正式展开对NetworkClient的源码分析。本文主要分析它的send和poll方法。
send方法
首先看下它的send方法。
我觉得需要关注的点是发送了什么、发送到哪里。
前面在Sender的相关方法中,有封装好一个ClientRequest实例。可以看的出来是发送ClientRequest。
先来看下 doSend方法的前半部分的逻辑。
ensureActive():要求state为ACTIVE状态。
canSendRequest(…):是否满足发送条件。
判断ClusterConnectionStates、Selector、InFlightRequests是否同时满足发送条件。
ClusterConnectionStates,代表每个节点的连接状态。
Selector,Kafka定义的用于处理I/O的模型。
InFlightRequests,代表已经发送的请求,但是还没有得到响应。
接下来看下doSend方法的后续部分。
获取版本号,然后调用doSend(…)方法。
- makeHeader(…):将ApiKeys、version、clientId、correlationId封装到RequestHeader里。
- toSend(…):将destination以及序列化后的RequestHeader封装到NetworkSend中。
InFlightRequest,请求已经发送,但是还没有得到响应。这样的请求称之为InFlight。
可以看出NetworkClient#send方法的目的地是Selector。
poll方法
处理之前中断发送的请求。
将abortedSends加入到Response列表中。然后清空abortedSends。
触发ClientResponse#onComplete()逻辑。
也就是触发具体的RequestCompletionHandler属性的onComplete(…)逻辑。
调用Selector#poll(…)进行更底层的I/O逻辑。这里暂时不展开分析。在下一篇文章中,会详细分析。
接下来就是对消息的后置处理。
- 完成不需要响应的请求。
- completedSends():在上一次poll调用完成的Send列表。
获取最近一次发送给指定节点的请求。
获取给定节点的InFlightRequest队列。
获取并删除队首元素InFlightRequest。对inFlightRequestCount减一。
构建一个ClientResponse实例。
- 处理响应完成的请求。
- completedReceives():从上一次poll调用完成的NetworkReceive列表。
一般情况,根据ApiKeys创建具体的AbstractResponse实例,加入到AbstractReponse列表中。
从InFlightRequest队列中获取并删除队尾元素。inFlightRequestCount减一。
获取给定节点的InFlightRequest队列。
构建一个ClientResponse实例。
- 处理断开连接的节点。
获取Selector#disconnected属性。
标识元数据需要更新。
更新NodeConnectionState的一些属性。
更新ApiVersions的一些属性。
更新NodeConnectionState的一些属性。
更新Metadata的一些属性。
清空该节点的InFlightRequest集合。如果请求不是内部请求,会构建断开连接的响应,加入到响应列表中。
- 处理新建立的连接。
获取Selector#connected属性。
更新NodeConnectionState的一些属性。
- 发送一个ApiVersionsRequest。
- 处理超时的请求。具体的和处理断开连接的方式相同。
获取含有处理超时的请求的节点。
判定超时的标准。
- 完成正常的请求完成的响应处理。 触发ClientResponse的onComplete()处理。
实际上是触发具体的RequestCompletionHandler#callback属性的onComplete(…)方法。