1 Preface

From the analysis in the previous post, 《RecordAccumulator(3)》, we know that the main thread places messages into the RecordAccumulator cache via KafkaProducer.send(); no actual network I/O happens there. All network operations are carried out by the Sender.

The rough flow of how the Sender sends messages:

  • Call RecordAccumulator.ready() to determine, based on the RecordAccumulator's cache state, which Node(s) messages can be sent to.
  • Filter the Node set based on the producer's connection state to each node (managed by NetworkClient).
  • Create requests, generating only one request per Node.
  • Call NetworkClient to send the requests out.
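The four steps above can be sketched as a toy loop. This is a hypothetical simplification for illustration only: the types below (plain maps and sets keyed by node name) stand in for the real RecordAccumulator, Cluster, and NetworkClient classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// A toy model of the Sender flow: ready() -> filter by connection -> drain -> send.
public class SenderFlowSketch {

    // Step 1: which nodes have at least one batch queued and ready to go.
    static Set<String> ready(Map<String, List<String>> batchesByNode) {
        Set<String> result = new HashSet<>();
        for (Map.Entry<String, List<String>> e : batchesByNode.entrySet())
            if (!e.getValue().isEmpty())
                result.add(e.getKey());
        return result;
    }

    // Steps 2-3: drop nodes the client can't talk to yet, then drain all of a
    // node's batches into a single per-node "request".
    static Map<String, List<String>> drain(Map<String, List<String>> batchesByNode,
                                           Set<String> readyNodes,
                                           Set<String> connectedNodes) {
        Map<String, List<String>> requests = new HashMap<>();
        for (String node : readyNodes) {
            if (!connectedNodes.contains(node))  // connection not ready: skip this node
                continue;
            requests.put(node, new ArrayList<>(batchesByNode.get(node)));
            batchesByNode.get(node).clear();     // drained batches leave the accumulator
        }
        return requests;
    }

    public static void main(String[] args) {
        Map<String, List<String>> acc = new HashMap<>();
        acc.put("node1", new ArrayList<>(List.of("batchA", "batchB")));
        acc.put("node2", new ArrayList<>(List.of("batchC")));
        acc.put("node3", new ArrayList<>());

        Set<String> readyNodes = ready(acc);          // node1 and node2 have data
        Set<String> connected = Set.of("node1");      // but only node1 is connected
        // Step 4 would hand each entry here to the network client.
        System.out.println(drain(acc, readyNodes, connected));
    }
}
```

Only node1's batches are drained: node3 has no data, and node2 is filtered out because its connection is not ready, mirroring how the real Sender defers to NetworkClient.ready().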

2 Creating the Sender and Its Member Fields

  First, recall the KafkaProducer constructor. As it shows, the Sender is just a Thread created inside KafkaProducer.

    KafkaProducer(ProducerConfig config,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  Metadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors interceptors,
                  Time time) {
        try {     
            ...

            // the core: create the Sender
            this.sender = newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            // start the thread that runs the sender
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
            close(0, TimeUnit.MILLISECONDS, true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }
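KafkaThread, used above for the ioThread, is essentially a named daemon thread with an uncaught-exception logger. A minimal sketch of the same pattern (this is an illustrative stand-in, not the real KafkaThread source):

```java
// Simplified sketch of the KafkaThread pattern: a named daemon thread running a task.
public class DaemonThreadSketch {

    static Thread newDaemonThread(String name, Runnable task) {
        Thread t = new Thread(task, name);
        t.setDaemon(true);  // daemon: the thread does not keep the JVM alive on its own
        t.setUncaughtExceptionHandler((thread, e) ->
                System.err.println("Uncaught exception in " + thread.getName() + ": " + e));
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread ioThread = newDaemonThread("kafka-producer-network-thread | demo",
                () -> System.out.println("sender running"));
        ioThread.start();
        ioThread.join();
    }
}
```

Making the I/O thread a daemon means a JVM that exits without calling close() will not hang on the producer, which is why KafkaProducer passes `true` as the daemon flag.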

Member fields

public class Sender implements Runnable {

    private final Logger log;

    /* the state of each nodes connection */
    // the KafkaClient instance tracking the connection state of each node
    private final KafkaClient client;

    /* the record accumulator that batches records */
    // the RecordAccumulator instance that batches records
    private final RecordAccumulator accumulator;

    /* the metadata for the client */
    // the client's Metadata instance
    private final Metadata metadata;

    /* the flag indicating whether the producer should guarantee the message order on the broker or not. */
    // whether message order must be preserved
    private final boolean guaranteeMessageOrder;

    /* the maximum request size to attempt to send to the server */
    // maximum size of a request sent to the server
    private final int maxRequestSize;

    /* the number of acknowledgements to request from the server */
    // number of acknowledgements requested from the server (acks)
    private final short acks;

    /* the number of times to retry a failed request before giving up */
    // number of retries for a failed request before giving up
    private final int retries;

    /* the clock instance used for getting the time */
    // the Time instance used as a clock
    private final Time time;

    /* true while the sender thread is still running */
    // flag marking that the Sender thread is running; note the volatile modifier
    private volatile boolean running;

    /* true when the caller wants to ignore all unsent/inflight messages and force close.  */
    // flag marking a forced shutdown
    private volatile boolean forceClose;

    /* metrics */
    private final SenderMetrics sensors;

    /* the max time to wait for the server to respond to the request*/
    // timeout for waiting for the server to respond to a request
    private final int requestTimeoutMs;

    /* The max time to wait before retrying a request which has failed */
    private final long retryBackoffMs;

    /* current request API versions supported by the known brokers */
    private final ApiVersions apiVersions;

    /* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
    private final TransactionManager transactionManager;

    // A per-partition queue of batches ordered by creation time for tracking the in-flight batches
    // maps each tp to its batches that have been sent but not yet acknowledged
    private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;

3 Core Methods

   Sender implements the Runnable interface and runs on its own ioThread. Sender's run() method delegates to the overloaded run(long), which is the real core of the Sender thread. The flow of sending messages is shown in the figure below.

[Figure: Sender message-sending flow chart (《kafka producer学习笔记7》-sender1)]

The main flow lives in sendProducerData(); at the end, NetworkClient.poll() is called.

 /**
     * The main run loop for the sender thread
     */
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        // keep looping until close is called; note that running is volatile
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed. If this is not a forced close, wait for the buffered records to be processed.
        while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures. On a forced close, unfinished batches are abandoned.
            log.debug("Aborting incomplete batches due to forced shutdown");
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

    /**
     * Run a single iteration of sending
     *
     * @param now The current POSIX time in milliseconds
     */
    void run(long now) {
        if (transactionManager != null) { // transaction handling
            try {
                if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                    // Check if the previous run expired batches which requires a reset of the producer state.
                    transactionManager.resetProducerId();
                if (!transactionManager.isTransactional()) {
                    // this is an idempotent producer, so make sure we have a producer id
                    maybeWaitForProducerId();
                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                    transactionManager.transitionToFatalError(
                        new KafkaException("The client hasn't received acknowledgment for " +
                            "some previously sent messages and can no longer retry them. It isn't safe to continue."));
                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                    // as long as there are outstanding transactional requests, we simply wait for them to return
                    client.poll(retryBackoffMs, now);
                    return;
                }

                // do not continue sending if the transaction manager is in a failed state or if there
                // is no producer id (for the idempotent case).
                if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                    RuntimeException lastError = transactionManager.lastError();
                    if (lastError != null)
                        maybeAbortBatches(lastError);
                    client.poll(retryBackoffMs, now);
                    return;
                } else if (transactionManager.hasAbortableError()) {
                    accumulator.abortUndrainedBatches(transactionManager.lastError());
                }
            } catch (AuthenticationException e) {
                // This is already logged as error, but propagated here to perform any clean ups.
                log.trace("Authentication exception while processing transactional request: {}", e);
                transactionManager.authenticationFailed(e);
            }
        }

        long pollTimeout = sendProducerData(now);
        client.poll(pollTimeout, now);
    }

Its main logic:

1. Enter the main while loop, which keeps iterating as long as the running flag is true, until close is called. Each iteration calls the parameterized run(long now) to handle sending.

2. When close is called, running is set to false and the main loop exits:

2.1 If this is not a forced close, and the accumulator still holds unsent messages or the client still has in-flight requests, enter a second while loop that keeps calling run(long now) until the remaining messages have been sent.

2.2 If this is a forced close, call the accumulator's abortIncompleteBatches() to abandon the unfinished requests.

2.3 Close the client.

Inside run(long now), if we set transactions aside, there are really just two steps: sendProducerData() and poll(). Compared with 1.0, Kafka 2.0's transaction handling is clearly more complex; the code is involved enough that I'll cover it in a separate post once I have fully worked through it.
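The shutdown protocol described above (drain whatever is left unless the close is forced) boils down to the two volatile flags. A simplified model of that behavior, where a plain queue named `pending` stands in for the accumulator plus in-flight requests (all names here are illustrative, not Kafka's):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the Sender's shutdown protocol: the main loop exits when running
// becomes false, then remaining work is drained unless forceClose was requested.
public class ShutdownSketch implements Runnable {
    private volatile boolean running = true;
    private volatile boolean forceClose = false;
    final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();
    int sent = 0;

    @Override
    public void run() {
        while (running)
            runOnce();
        // stopped accepting new work; finish what's queued unless forced to close
        while (!forceClose && !pending.isEmpty())
            runOnce();
        if (forceClose)
            pending.clear();  // abort incomplete batches
    }

    // one iteration of the send loop, standing in for run(long now)
    void runOnce() {
        String batch = pending.poll();
        if (batch != null)
            sent++;
    }

    void initiateClose()      { running = false; }
    void initiateForceClose() { forceClose = true; running = false; }
}
```

Because both flags are volatile, the caller's thread can flip them while the I/O thread is mid-loop and the change is seen on the next iteration, which is exactly why the real Sender marks `running` and `forceClose` volatile.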

//send data: the core method
    private long sendProducerData(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        // find the partitions with data ready to send: queue depth > 1, first batch full,
        // buffer memory exhausted, the producer is closing, or flush() was called
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        // if any topic has partitions with an unknown leader, request a metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);

            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }

        // remove any nodes we aren't ready to send to
        // remove nodes that are not ready
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

        // create produce requests
        // collect the messages to send; drain regroups tp->batch into node->batch for sending to the server
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        addToInflightBatches(batches);
        // ensure at most one RecordBatch per tp is in flight, to preserve ordering
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            // when ordering is guaranteed, mute the tp of every drained batch
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        accumulator.resetNextBatchExpiryTime();
        // collect in-flight batches that have timed out
        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
        // collect expired batches still sitting in the accumulator
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);

        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
        // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
        // we need to reset the producer id here. Reset the expired batches.
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
            }
        }
        sensors.updateProduceRequestMetrics(batches);

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
        // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
        // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
        // that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            pollTimeout = 0;
        }
        sendProduceRequests(batches, now);
        return pollTimeout;
    }

1. Fetch the Kafka cluster data from Metadata.

2. Call RecordAccumulator.ready() to select, based on the accumulator's cache state, the Node(s) that messages can be sent to, returning a ReadyCheckResult object.

3. If the ReadyCheckResult flags any unknownLeaderTopics, call Metadata's requestUpdate() method to mark the Kafka cluster information for refresh.

4. For the ready-node set in ReadyCheckResult, call NetworkClient.ready() on each node to check whether the network-level conditions for sending are met; nodes that fail the check are removed from readyNodes. The NetworkClient class will be covered separately.

5. With the ready-node set produced by the steps above, call RecordAccumulator.drain() to collect the messages to send. drain() essentially regroups the accumulator's records from tp->batch into node->batch, ready to be sent to the server.
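The tp->batch to node->batch regrouping that drain() performs can be sketched with plain collections. This is a simplified illustration: the partition-to-leader map stands in for the Cluster metadata, and the string "batches" stand in for ProducerBatch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of RecordAccumulator.drain(): regroup per-partition batches by the
// leader node of each partition, so one request can be built per node.
public class DrainSketch {

    static Map<Integer, List<String>> drain(Map<String, List<String>> batchesByPartition,
                                            Map<String, Integer> leaderByPartition) {
        Map<Integer, List<String>> batchesByNode = new HashMap<>();
        for (Map.Entry<String, List<String>> e : batchesByPartition.entrySet()) {
            Integer leader = leaderByPartition.get(e.getKey());
            if (leader == null)
                continue;  // unknown leader: skip; a metadata update will have been requested
            batchesByNode.computeIfAbsent(leader, k -> new ArrayList<>())
                         .addAll(e.getValue());
        }
        return batchesByNode;
    }

    public static void main(String[] args) {
        Map<String, List<String>> byTp = Map.of(
                "topicA-0", List.of("batch1"),
                "topicA-1", List.of("batch2"),
                "topicB-0", List.of("batch3"));
        Map<String, Integer> leaders = Map.of("topicA-0", 1, "topicA-1", 2, "topicB-0", 1);
        // node 1 collects the batches of topicA-0 and topicB-0; node 2 gets topicA-1's
        System.out.println(drain(byTp, leaders));
    }
}
```

Grouping by leader node is what makes "one request per Node" possible in sendProduceRequests(): batches for different partitions that share a leader travel in the same request.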

6. getExpiredInflightBatches() handles messages that were sent but not yet acknowledged: it walks the in-flight batches and, using RecordAccumulator.getDeliveryTimeoutMs() together with the current time, picks out the ones that have timed out. expiredBatches() then walks the ProducerBatches in the accumulator and collects its timed-out messages as well. All expired batches are gathered into expiredBatches, and failBatch() is called on each, which invokes ProducerBatch.done() to release its space.

7. Call Sender.sendProduceRequest() to wrap the outgoing messages into ClientRequests.

8. Call NetworkClient.send() to store each ClientRequest in the send field of its KafkaChannel.

9. Call NetworkClient.poll() to send the ClientRequest saved in KafkaChannel.send, handle the responses coming back from the server, handle timed-out requests, and invoke user-defined callbacks.

Each of these steps will be expanded on in later posts.
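One detail worth isolating is how sendProducerData() computes its return value, the poll timeout: zero if any node already has sendable data (so the loop can immediately try to send more), otherwise the smallest of the next ready-check delay, the not-ready backoff, and the time until the next batch expiry, floored at zero. That arithmetic as a standalone sketch:

```java
// Sketch of the pollTimeout computation at the end of sendProducerData().
public class PollTimeoutSketch {

    static long pollTimeout(boolean hasReadyNodes,
                            long nextReadyCheckDelayMs,
                            long notReadyTimeoutMs,
                            long nextExpiryDeltaMs) {
        if (hasReadyNodes)
            return 0;  // data is sendable now: poll without blocking and loop again
        long timeout = Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
        timeout = Math.min(timeout, nextExpiryDeltaMs);
        return Math.max(timeout, 0);  // clamp: an already-due expiry must not go negative
    }

    public static void main(String[] args) {
        System.out.println(pollTimeout(true, 100, 50, 30));   // 0
        System.out.println(pollTimeout(false, 100, 50, 30));  // 30
        System.out.println(pollTimeout(false, 100, 50, -5));  // 0
    }
}
```

Here `nextExpiryDeltaMs` stands for `accumulator.nextExpiryTimeMs() - now` in the real code. The clamp to zero matters because an expiry time in the past would otherwise produce a negative poll timeout.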

3.1 Creating Requests

The job of Sender.sendProduceRequests() is to wrap the outgoing messages into ClientRequests. No matter how many ProducerBatches a Node has, and no matter how many partitions those batches target, only one ClientRequest object is generated per Node.

 /**
     * Transfer the record batches into a list of produce requests on a per-node basis
     * send the batches belonging to each node separately
     */
    private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
        for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
            // call sendProduceRequest() to wrap the ProducerBatches bound for the same Node into one ClientRequest
            sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
    }

    /**
     * Create a produce request from the given record batches
     */
    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        if (batches.isEmpty())
            return;
        
        // produceRecordsByPartition and recordsByPartition map to different value types
        Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
        final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

        // find the minimum magic version used when creating the record sets
        byte minUsedMagic = apiVersions.maxUsableProduceMagic();
        for (ProducerBatch batch : batches) {
            if (batch.magic() < minUsedMagic)
                minUsedMagic = batch.magic();
        }
        // 1. group the ProducerBatch list by partition into the two maps above
        for (ProducerBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            MemoryRecords records = batch.records();

            // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
            // that the producer starts building the batch and the time that we send the request, and we may have
            // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
            // the new message format, but found that the broker didn't support it, so we need to down-convert on the
            // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
            // not all support the same message format version. For example, if a partition migrates from a broker
            // which is supporting the new magic version to one which doesn't, then we will need to convert.
            if (!records.hasMatchingMagic(minUsedMagic))
                records = batch.records().downConvert(minUsedMagic, 0, time).records();
            produceRecordsByPartition.put(tp, records);
            recordsByPartition.put(tp, batch);
        }

        String transactionalId = null;
        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionalId = transactionManager.transactionalId();
        }
        // build the produce request and register the callback handler
        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                produceRecordsByPartition, transactionalId);
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                // completion callback: handleProduceResponse processes the server's response
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };

        String nodeId = Integer.toString(destination);
        // create the ClientRequest object
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
                requestTimeoutMs, callback);
        // add to inFlightRequests and send via the selector
        client.send(clientRequest, now);
        log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
    }

Main logic:

  • Reorganize the ProducerBatch collection for a Node into two maps: Map<TopicPartition, MemoryRecords> produceRecordsByPartition and Map<TopicPartition, ProducerBatch> recordsByPartition.
  • Create a ProduceRequest.Builder whose payload is the data in produceRecordsByPartition.
  • Create a RequestCompletionHandler as the callback object.
  • Wrap the request builder and the RequestCompletionHandler into a ClientRequest object, and send it via NetworkClient.send().
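The RequestCompletionHandler attached above is a plain single-method callback interface. The same pattern in miniature (simplified illustrative types: `CompletionHandler`, `Request`, and the string response are stand-ins, not the real Kafka classes):

```java
// Sketch of the completion-callback pattern used for produce requests:
// the response handler is attached when the request is created and is invoked
// later, when NetworkClient.poll() receives the broker's response.
public class CallbackSketch {

    interface CompletionHandler { void onComplete(String response); }

    static class Request {
        final String payload;
        final CompletionHandler callback;
        Request(String payload, CompletionHandler callback) {
            this.payload = payload;
            this.callback = callback;
        }
    }

    static String lastResponse;

    public static void main(String[] args) {
        Request request = new Request("produce-batch", response -> lastResponse = response);
        // ... the request is queued and sent; eventually a response arrives:
        request.callback.onComplete("ack for " + request.payload);
        System.out.println(lastResponse);  // ack for produce-batch
    }
}
```

Decoupling the response handling from the send this way is what lets poll() drive everything on one I/O thread: the callback carries the recordsByPartition context needed to complete the right batches when the ack comes back.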

The NIO part will be covered in a later post.

 

References:

《Apache Kafka 源码剖析》, Section 2.4
