【问题标题】:kafka asynchronous produce lost messagekafka异步产生丢失消息
【发布时间】:2020-04-21 09:30:54
【问题描述】:

尝试按照网上的说明实现kafka异步生产。这是我的制作人的样子:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public void asynSend(String topic, Integer partition, String message) {
    ProducerRecord<Object, Object> data = new ProducerRecord<>(topic, partition,null, message);
        producer.send(data, new DefaultProducerCallback());
    }

private static class DefaultProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            logger.error("Asynchronous produce failed");
        }
    }
}

我在这样的 for 循环中生成:

for (int i = 0; i < 5000; i++) {
    int partition = i % 2;
    FsProducerFactory.getInstance().asynSend(topic, partition,i + "th message to partition " + partition);
}

但是,某些消息可能会丢失。如下图,4508到4999的消息没有投递。

我发现原因可能是生产者进程关闭,缓存中所有当时未发送的消息都会丢失。 在 for 循环之后添加此行将解决此问题:

producer.flush();

但是,我不确定这是否是一个魅力解决方案,因为我注意到有人提到刷新会使异步发送以某种方式同步,任何人都可以解释或帮助我改进它吗?

【问题讨论】:

    标签: apache-kafka kafka-producer-api


    【解决方案1】:

    Kafka - The definitive Guide 书中,有一个异步生产者的示例,与您编写的代码完全相同。它使用sendCallback

    discussion 中写着:

    在退出之前添加flush() 将使客户端等待任何未完成的消息传递给代理(这将在queue.buffering.max.ms 附近,加上延迟)。 如果您在每个 produce() 调用之后添加 flush(),那么您实际上是在实现一个同步生产者。

    但如果你在for 循环之后进行,它不再是同步的,而是异步的。

    您还可以将生产者配置中的acks 设置为all。这样,在主题的复制设置为大于 1 的情况下,您将有更多的保证来成功生成消息。

    【讨论】:

    • thx mike, flush() 现在似乎不是一个糟糕的选择,可以让生产者保持活力并在我的场景中传递所有消息。我也试过你建议的acks,但没有用。
    • 您是否检查了所有分区以查看是否生成了所有数据?在您的屏幕截图中,它仅显示分区 1,但没有显示分区 0。请记住,分区的顺序可能不同。
    • 我的消费者线程每 100 毫秒轮询一次,每次轮询消耗大约 800 条消息。因此,特定分区的输出中有 800 条消息,然后是另一个分区的 800 条消息。我确信这两个分区的数据都已生成,但都没有完全交付。
    猜你喜欢
    • 2017-02-28
    • 2019-05-14
    • 2019-08-10
    • 2018-09-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-19
    • 1970-01-01
    相关资源
    最近更新 更多