【问题标题】:Meaning of sendOffsetsToTransaction in Kafka 0.11Kafka 0.11 中 sendOffsetsToTransaction 的含义
【发布时间】:2017-12-24 23:42:39
【问题描述】:

新的 Kafka 版本 (0.11) 支持仅一次语义。

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

我有一个像这样在 java 中使用 kafka 事务代码的生产者设置。

producer.initTransactions();
    try {
        producer.beginTransaction();
        for (ProducerRecord<String, String> record : payload) {
            producer.send(record);
        }

        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
            {
                put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
            }
        };
        producer.sendOffsetsToTransaction(groupCommit, "groupId");
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        producer.close();
    } catch (KafkaException e) {
        producer.abortTransaction();
    }

我不太确定如何使用 sendOffsetsToTransaction 以及它的预期用例。 AFAIK,消费者组是消费者端的多线程读取功能。

javadoc 说

" 向消费者组协调器发送一个消耗的偏移量列表,并将这些偏移量标记为当前事务的一部分。这些偏移量只有在事务提交成功时才会被视为已消耗。需要时应使用此方法将消费和生产的消息一起批量处理,通常采用消费-转换-生产模式。”

produce 将如何维护消耗的偏移量列表?它有什么意义?

【问题讨论】:

    标签: java multithreading apache-kafka kafka-producer-api


    【解决方案1】:

    这仅与您正在消费然后根据您消费的内容生成消息的工作流程相关。此功能允许您仅在下游生产成功时提交您消耗的偏移量。如果您使用数据,以某种方式对其进行处理,然后生成结果,这将启用跨消费/生产的事务保证。

    在没有事务的情况下,您通常使用Consumer#commitSync()Consumer#commitAsync() 来提交消费者偏移量。但是,如果您在与生产者一起生产之前使用这些方法,那么您将在知道生产者是否成功发送之前已经提交了偏移量。

    因此,您可以在生产者上使用Producer#sendOffsetsToTransaction() 来提交偏移量,而不是向消费者提交偏移量。这会将偏移量发送到处理事务的事务管理器。只有当整个事务(消费和生产)成功时,它才会提交偏移量。

    (注意:当你将偏移量发送到commit时,你应该在最后一次读取的偏移量上加1,这样以后的读取就会从你没有读取的偏移量中恢复。这是真的,不管无论您是与消费者还是生产者一起提交。请参阅:KafkaProducer sendOffsetsToTransaction need offset+1 to successfully commit current offset)。

    【讨论】:

    • 建议对注释进行改进,将最后读取偏移量加 1。 long newOffset = consumer.position(topicPartition);这给出了将给出的下一条记录的偏移量。它将比消费者在该分区中看到的最高偏移量大一
    • @PranaviChandramohan 我相信答案的末尾会注明这一点
    猜你喜欢
    • 2020-07-04
    • 1970-01-01
    • 1970-01-01
    • 2018-01-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-15
    相关资源
    最近更新 更多