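1 Preface
From the previous post, "RecordAccumulator(3)", we know that the main thread calls KafkaProducer.send() to place messages in the RecordAccumulator cache and performs no network I/O itself. All network operations are carried out by the Sender.
The Sender's sending flow is roughly as follows: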
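- Call RecordAccumulator.ready() to determine, from the state of the buffered data, which Nodes have messages ready to send.
- Filter those Nodes by the state of the producer's connection to each one (managed by NetworkClient).
- Create the requests; each Node gets exactly one request.
- Call NetworkClient to send the requests.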
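To make the flow above concrete, here is a minimal sketch on toy data. It is not Kafka code: the class SenderFlowSketch, the method planRequests and the plain String/Integer stand-ins for partitions and nodes are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class SenderFlowSketch {

    // leaderByPartition: partitions that have buffered data, mapped to their leader node id.
    // connectedNodes: node ids whose connection is usable right now.
    // Returns one entry per node, listing every partition that goes into that node's request.
    static Map<Integer, List<String>> planRequests(Map<String, Integer> leaderByPartition,
                                                   Set<Integer> connectedNodes) {
        Map<Integer, List<String>> requestPerNode = new TreeMap<>();
        for (Map.Entry<String, Integer> e : leaderByPartition.entrySet()) {
            int node = e.getValue();
            if (!connectedNodes.contains(node))
                continue; // skip nodes we cannot send to yet
            // all partitions led by the same node share a single request
            requestPerNode.computeIfAbsent(node, k -> new ArrayList<>()).add(e.getKey());
        }
        return requestPerNode;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaders = new LinkedHashMap<>();
        leaders.put("orders-0", 1);
        leaders.put("orders-1", 2);
        leaders.put("logs-0", 1);
        leaders.put("logs-1", 3);
        // node 3 is not connected, so nothing is sent to it in this round
        Map<Integer, List<String>> plan = planRequests(leaders, Set.of(1, 2));
        // printing stands in for handing each request to the network layer
        plan.forEach((node, partitions) -> System.out.println("node " + node + " <- " + partitions));
    }
}
```

The point to notice is that the grouping key is the node, not the partition: two partitions led by the same broker end up in one request.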
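2 Creating the Sender and its member variables
First, a look back at the KafkaProducer constructor: the Sender is a Runnable that KafkaProducer creates and runs on a dedicated thread (ioThread).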
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
Metadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors interceptors,
Time time) {
try {
...
// the core part: create the Sender
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// start the thread that runs the Sender
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(0, TimeUnit.MILLISECONDS, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
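The pattern in the constructor, a Runnable handed to a named daemon thread, can be sketched with the plain JDK Thread class. This is only an illustration: ToySender, startIoThread and the thread-name prefix are hypothetical, and KafkaThread itself is not used.

```java
public class IoThreadSketch {

    // Hypothetical stand-in for Sender: it only records that it ran.
    static class ToySender implements Runnable {
        volatile boolean ran = false;

        @Override
        public void run() {
            ran = true;
        }
    }

    // Start the given Runnable on a named daemon thread, similar in spirit to
    // new KafkaThread(ioThreadName, sender, true) followed by start().
    static Thread startIoThread(String clientId, Runnable sender) {
        Thread t = new Thread(sender, "producer-network-thread | " + clientId);
        t.setDaemon(true); // a daemon thread does not keep the JVM alive on its own
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        ToySender sender = new ToySender();
        Thread io = startIoThread("demo-client", sender);
        io.join();
        System.out.println(io.getName() + " daemon=" + io.isDaemon() + " ran=" + sender.ran);
    }
}
```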
Member variables
public class Sender implements Runnable {
private final Logger log;
/* the state of each nodes connection */
// client: the KafkaClient instance that tracks each node's connection state
private final KafkaClient client;
/* the record accumulator that batches records */
// accumulator: the RecordAccumulator instance that batches records
private final RecordAccumulator accumulator;
/* the metadata for the client */
// metadata: the client's Metadata instance
private final Metadata metadata;
/* the flag indicating whether the producer should guarantee the message order on the broker or not. */
// whether message order must be preserved
private final boolean guaranteeMessageOrder;
/* the maximum request size to attempt to send to the server */
// maxRequestSize: the largest request the producer will try to send to the server
private final int maxRequestSize;
/* the number of acknowledgements to request from the server */
// acks: the number of acknowledgements requested from the server
private final short acks;
/* the number of times to retry a failed request before giving up */
// retries: how many times a failed request is retried before it is abandoned
private final int retries;
/* the clock instance used for getting the time */
// time: the Time instance used as the clock
private final Time time;
/* true while the sender thread is still running */
// flag showing that the Sender thread is running; note the volatile modifier
private volatile boolean running;
/* true when the caller wants to ignore all unsent/inflight messages and force close. */
// flag for a forced shutdown
private volatile boolean forceClose;
/* monitoring metrics */
private final SenderMetrics sensors;
/* the max time to wait for the server to respond to the request*/
// timeout for waiting on the server's response to a request
private final int requestTimeoutMs;
/* The max time to wait before retrying a request which has failed */
private final long retryBackoffMs;
/* current request API versions supported by the known brokers */
private final ApiVersions apiVersions;
/* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
private final TransactionManager transactionManager;
// A per-partition queue of batches ordered by creation time for tracking the in-flight batches
// maps each TopicPartition to its batches that have been sent but not yet acknowledged
private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;
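3 Core methods
Sender implements the Runnable interface and runs on the separate ioThread. Its run() method calls the overloaded run(long), which is the real core of the Sender thread.
[Figure: flow chart of the message-sending process]
The main work happens in sendProducerData(), and each iteration ends with a call to NetworkClient.poll().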
/**
* The main run loop for the sender thread
*/
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
// keeps running until close is called; note that running is volatile
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed. Unless this is a forced close, wait for the buffered records to be processed.
while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on
// the futures. On a forced close, the incomplete batches are abandoned.
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
/**
* Run a single iteration of sending
*
* @param now The current POSIX time in milliseconds
*/
void run(long now) {
if (transactionManager != null) { // transaction handling
try {
if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
// Check if the previous run expired batches which requires a reset of the producer state.
transactionManager.resetProducerId();
if (!transactionManager.isTransactional()) {
// this is an idempotent producer, so make sure we have a producer id
maybeWaitForProducerId();
} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
transactionManager.transitionToFatalError(
new KafkaException("The client hasn't received acknowledgment for " +
"some previously sent messages and can no longer retry them. It isn't safe to continue."));
} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
// as long as there are outstanding transactional requests, we simply wait for them to return
client.poll(retryBackoffMs, now);
return;
}
// do not continue sending if the transaction manager is in a failed state or if there
// is no producer id (for the idempotent case).
if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, now);
return;
} else if (transactionManager.hasAbortableError()) {
accumulator.abortUndrainedBatches(transactionManager.lastError());
}
} catch (AuthenticationException e) {
// This is already logged as error, but propagated here to perform any clean ups.
log.trace("Authentication exception while processing transactional request: {}", e);
transactionManager.authenticationFailed(e);
}
}
long pollTimeout = sendProducerData(now);
client.poll(pollTimeout, now);
}
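The main logic of run() is:
1. Enter the main while loop, which keeps going as long as the running flag is true, that is, until close is called. Each iteration calls run(long now) to handle the sending of messages.
2. When close is called, running is set to false and the main loop exits:
2.1. If the close is not forced, and the accumulator still has unsent messages or the client still has in-flight requests, enter a second while loop that calls run(long now) to finish sending what remains.
2.2. If the close is forced, call the accumulator's abortIncompleteBatches() to abandon the unfinished batches.
2.3. Close the client.
Inside run(long now), if transactions are set aside, only two calls remain: sendProducerData() and poll(). The transaction handling in Kafka 2.0 is considerably more complex than in 1.0 and raises the complexity of the code; it deserves its own write-up, since I have not fully worked through it yet.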
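The two-phase shutdown described above can be sketched in a few lines. This is a simplified model, not the real Sender: the class ShutdownLoopSketch, its in-memory queue standing in for the accumulator, and the sent()/aborted() accessors are assumptions for illustration. To keep the example deterministic it runs on a single thread and requests the close before run() starts, so the main loop is skipped and only the drain or abort phase executes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ShutdownLoopSketch implements Runnable {
    private final Deque<String> unsent = new ArrayDeque<>(); // stands in for the accumulator
    private final List<String> sent = new ArrayList<>();
    private final List<String> aborted = new ArrayList<>();
    private volatile boolean running = true;
    private volatile boolean forceClose = false;

    ShutdownLoopSketch(List<String> records) {
        unsent.addAll(records);
    }

    // Graceful close: stop the main loop, but let buffered records drain.
    void initiateClose() {
        running = false;
    }

    // Forced close: stop the main loop and give up on whatever is still buffered.
    void forceClose() {
        forceClose = true;
        initiateClose();
    }

    // One iteration: "send" at most one record.
    private void runOnce() {
        String record = unsent.poll();
        if (record != null)
            sent.add(record);
    }

    @Override
    public void run() {
        while (running)                           // step 1: main loop
            runOnce();
        while (!forceClose && !unsent.isEmpty())  // step 2.1: drain what is left
            runOnce();
        if (forceClose) {                         // step 2.2: abandon what is left
            aborted.addAll(unsent);
            unsent.clear();
        }
        // step 2.3: the real Sender closes the network client here
    }

    List<String> sent() { return sent; }

    List<String> aborted() { return aborted; }

    public static void main(String[] args) {
        ShutdownLoopSketch graceful = new ShutdownLoopSketch(List.of("a", "b", "c"));
        graceful.initiateClose();
        graceful.run();
        System.out.println("graceful: sent=" + graceful.sent() + " aborted=" + graceful.aborted());

        ShutdownLoopSketch forced = new ShutdownLoopSketch(List.of("a", "b", "c"));
        forced.forceClose();
        forced.run();
        System.out.println("forced: sent=" + forced.sent() + " aborted=" + forced.aborted());
    }
}
```

Both flags are volatile for the same reason as in Sender: in real use they are written by the thread that calls close and read by the I/O thread.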
// sends the data: the core method
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
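// Find the nodes with data ready to send. A partition counts as ready when its queue holds more than one batch, its first batch is full, the linger time has elapsed, the buffer pool has no free space, the producer is closing, or flush() has been called.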
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
// if the leader of any partition is unknown, the metadata needs to be updated
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}
// remove any nodes we aren't ready to send to
// remove the nodes that are not ready
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// create produce requests
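// Get the batches to send. drain() mainly regroups them from per-partition (tp -> batches) to per-node (node -> batches) so they can be sent to the brokers.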
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
// ensure that only one ProducerBatch per tp is in flight at a time, which preserves ordering
if (guaranteeMessageOrder) {
// Mute all the partitions drained
// if ordering must be guaranteed, add the tp of every drained batch to the muted set
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
accumulator.resetNextBatchExpiryTime();
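// collect the in-flight batches that have reached the delivery timeout
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
// collect the expired batches that are still in the accumulator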
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);
// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
// we need to reset the producer id here. Fail the expired batches.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
}
}
sensors.updateProduceRequestMetrics(batches);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
// time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
// sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
// that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
pollTimeout = 0;
}
sendProduceRequests(batches, now);
return pollTimeout;
}
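The poll timeout computed at the end of sendProducerData() is easy to misread, so here is the same arithmetic pulled out into a standalone function. The class and method names (PollTimeoutSketch, pollTimeout) and the boolean hasReadyNodes parameter are hypothetical; the formula follows the code above.

```java
public class PollTimeoutSketch {

    // nextReadyCheckDelayMs: when the accumulator wants to be checked again
    // notReadyTimeoutMs:     smallest poll delay among nodes that were not ready (Long.MAX_VALUE if none)
    // nextExpiryTimeMs:      absolute time at which the next batch expires
    // hasReadyNodes:         true if at least one node is ready and has sendable data
    static long pollTimeout(long nextReadyCheckDelayMs, long notReadyTimeoutMs,
                            long nextExpiryTimeMs, long now, boolean hasReadyNodes) {
        long timeout = Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
        timeout = Math.min(timeout, nextExpiryTimeMs - now);
        timeout = Math.max(timeout, 0); // never negative
        // with data ready to go, do not block in poll at all so the loop can send more right away
        return hasReadyNodes ? 0 : timeout;
    }

    public static void main(String[] args) {
        long now = 1_000;
        System.out.println(pollTimeout(100, Long.MAX_VALUE, 1_050, now, false)); // expiry is nearest
        System.out.println(pollTimeout(100, 30, 5_000, now, false));             // a not-ready node is nearest
        System.out.println(pollTimeout(100, 30, 900, now, false));               // expiry already passed
        System.out.println(pollTimeout(100, 30, 5_000, now, true));              // ready nodes exist
    }
}
```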
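The steps of sendProducerData() are:
1. Fetch the Kafka cluster information from Metadata.
2. Call RecordAccumulator.ready() to select, based on the buffered data, the Nodes that messages can be sent to; it returns a ReadyCheckResult.
3. If the ReadyCheckResult contains unknownLeaderTopics, add those topics to Metadata again and call Metadata.requestUpdate() to mark the cluster information as needing a refresh.
4. For each node in the ReadyCheckResult's readyNodes set, call NetworkClient.ready() to check whether the network side allows sending; Nodes that fail the check are removed from readyNodes. NetworkClient will be covered in a separate post.
5. With the remaining readyNodes, call RecordAccumulator.drain() to collect the batches to send. drain() regroups the queued batches from tp -> batches into node -> batches so they can be sent to the brokers.
6. getExpiredInflightBatches() handles batches that were sent but have not received a response: it walks inFlightBatches and, using the timeout from RecordAccumulator.getDeliveryTimeoutMs(), picks out the batches whose delivery timeout has passed. RecordAccumulator.expiredBatches() then does the same for batches still queued in the accumulator. All of these are merged into expiredBatches, and failBatch() is called on each one, which completes the batch through ProducerBatch.done() and releases its buffer space.
7. Call Sender.sendProduceRequests() to wrap the batches to send in ClientRequest objects.
8. Call NetworkClient.send() to write each ClientRequest into the send field of the corresponding KafkaChannel.
9. Call NetworkClient.poll() to actually send the ClientRequest held in KafkaChannel.send; poll() also handles responses from the server, handles timed-out requests, and invokes user-defined callbacks.
These steps are expanded on below.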
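The in-flight expiry check among the steps above can be sketched as follows. This is a simplified model of my own, not the Kafka implementation: batches are reduced to their creation timestamps, and the names ExpirySketch and removeExpired are hypothetical. It assumes, as the comment on inFlightBatches says, that each partition's list is ordered by creation time.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpirySketch {

    // inFlight maps a partition name to the creation times (ms) of its in-flight batches, oldest first.
    // Expired batches are removed from the map and returned as "partition@createdMs" labels.
    static List<String> removeExpired(Map<String, List<Long>> inFlight, long deliveryTimeoutMs, long now) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, List<Long>>> partitions = inFlight.entrySet().iterator();
        while (partitions.hasNext()) {
            Map.Entry<String, List<Long>> entry = partitions.next();
            Iterator<Long> batches = entry.getValue().iterator();
            while (batches.hasNext()) {
                long createdMs = batches.next();
                if (now - createdMs >= deliveryTimeoutMs) {
                    batches.remove();
                    expired.add(entry.getKey() + "@" + createdMs);
                } else {
                    break; // ordered by creation time, so every later batch is younger
                }
            }
            if (entry.getValue().isEmpty())
                partitions.remove(); // nothing left in flight for this partition
        }
        return expired;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> inFlight = new LinkedHashMap<>();
        inFlight.put("orders-0", new ArrayList<>(List.of(1_000L, 1_500L)));
        inFlight.put("orders-1", new ArrayList<>(List.of(1_900L)));
        System.out.println("expired: " + removeExpired(inFlight, 600, 2_000));
        System.out.println("still in flight: " + inFlight);
    }
}
```

In the real Sender each expired batch is then passed to failBatch() with a TimeoutException, as the code above shows.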
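3.1 Creating requests
Sender.sendProduceRequests() wraps the batches to send in ClientRequest objects. No matter how many ProducerBatch objects belong to a Node, and no matter how many partitions they target, exactly one ClientRequest is created per Node.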
/**
* Transfer the record batches into a list of produce requests on a per-node basis
* Send the batches for each node separately
*/
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
// sendProduceRequest() wraps all ProducerBatch objects bound for the same Node into one ClientRequest
sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}
/**
* Create a produce request from the given record batches
*/
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
if (batches.isEmpty())
return;
// produceRecordsByPartition and recordsByPartition have the same keys but different value types
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
// find the minimum magic version used when creating the record sets
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
// 1. Group the ProducerBatch list by partition to fill the two maps above.
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();
// down convert if necessary to the minimum magic used. In general, there can be a delay between the time
// that the producer starts building the batch and the time that we send the request, and we may have
// chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
// the new message format, but found that the broker didn't support it, so we need to down-convert on the
// client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
// not all support the same message format version. For example, if a partition migrates from a broker
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
produceRecordsByPartition.put(tp, records);
recordsByPartition.put(tp, batch);
}
String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
// build the produce request and set the callback handler
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
// completion callback: handleProduceResponse processes the result returned by the server
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
String nodeId = Integer.toString(destination);
// create the ClientRequest object
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
requestTimeoutMs, callback);
// add it to inFlightRequests and send it through the selector
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
Main logic:
- Reorganize the ProducerBatch list for one Node into two maps: Map<TopicPartition, MemoryRecords> produceRecordsByPartition and Map<TopicPartition, ProducerBatch> recordsByPartition.
- Create a ProduceRequest.Builder whose payload is the data in produceRecordsByPartition.
- Create a RequestCompletionHandler as the callback object.
- Wrap the ProduceRequest.Builder and the RequestCompletionHandler in a ClientRequest and send it by calling NetworkClient.send().
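The reason for keeping two maps becomes clearer in a toy version: one map is the payload that goes over the wire, the other is captured by the callback so that the response can be matched back to the original batches. Everything here (ProduceRequestSketch, ToyBatch, ToyRequest, a String standing in for the response) is a hypothetical stand-in, not the Kafka API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ProduceRequestSketch {

    // Hypothetical stand-in for ProducerBatch.
    static class ToyBatch {
        final String topicPartition;
        final byte[] records;
        String result; // filled in when the response arrives

        ToyBatch(String topicPartition, byte[] records) {
            this.topicPartition = topicPartition;
            this.records = records;
        }
    }

    // Hypothetical stand-in for ClientRequest: destination, payload and completion callback.
    static class ToyRequest {
        final int nodeId;
        final Map<String, byte[]> payload;
        final Consumer<String> onComplete;

        ToyRequest(int nodeId, Map<String, byte[]> payload, Consumer<String> onComplete) {
            this.nodeId = nodeId;
            this.payload = payload;
            this.onComplete = onComplete;
        }
    }

    // Build exactly one request for one node out of all the batches bound for it.
    static ToyRequest buildRequest(int nodeId, List<ToyBatch> batches) {
        Map<String, byte[]> produceRecordsByPartition = new HashMap<>(batches.size());
        final Map<String, ToyBatch> recordsByPartition = new HashMap<>(batches.size());
        for (ToyBatch batch : batches) {
            produceRecordsByPartition.put(batch.topicPartition, batch.records); // what gets sent
            recordsByPartition.put(batch.topicPartition, batch);                // what the callback needs
        }
        // the callback closes over recordsByPartition, so it can complete every batch later
        Consumer<String> callback = response -> {
            for (ToyBatch batch : recordsByPartition.values())
                batch.result = response;
        };
        return new ToyRequest(nodeId, produceRecordsByPartition, callback);
    }

    public static void main(String[] args) {
        List<ToyBatch> batches = new ArrayList<>();
        batches.add(new ToyBatch("orders-0", new byte[] {1, 2}));
        batches.add(new ToyBatch("logs-3", new byte[] {3}));
        ToyRequest request = buildRequest(1, batches);
        request.onComplete.accept("OK"); // simulate the broker's response arriving
        for (ToyBatch batch : batches)
            System.out.println(batch.topicPartition + " -> " + batch.result);
    }
}
```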
The NIO part will be covered in a later post.
Reference:
《Apache Kafka 源码剖析》 (Apache Kafka Source Code Analysis), section 2.4