【发布时间】:2020-04-21 09:30:54
【问题描述】:
尝试按照网上的说明实现kafka异步生产。这是我的制作人的样子:
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public void asynSend(String topic, Integer partition, String message) {
ProducerRecord<Object, Object> data = new ProducerRecord<>(topic, partition,null, message);
producer.send(data, new DefaultProducerCallback());
}
private static class DefaultProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Asynchronous produce failed");
}
}
}
我在这样的 for 循环中生成:
for (int i = 0; i < 5000; i++) {
int partition = i % 2;
FsProducerFactory.getInstance().asynSend(topic, partition,i + "th message to partition " + partition);
}
但是,某些消息可能会丢失。如下图,4508到4999的消息没有投递。
我发现原因可能是生产者进程关闭,缓存中所有当时未发送的消息都会丢失。 在 for 循环之后添加此行将解决此问题:
producer.flush();
但是,我不确定这是否是一个魅力解决方案,因为我注意到有人提到刷新会使异步发送以某种方式同步,任何人都可以解释或帮助我改进它吗?
【问题讨论】:
标签: apache-kafka kafka-producer-api