【发布时间】:2016-07-16 19:04:56
【问题描述】:
如果我连续向 Kafka 集群发布多条消息(使用 new Producer API),我会从生产者那里为每条消息获得一个 Future。
现在,假设我已将生产者配置为具有max.in.flight.requests.per.connection = 1 和retries > 0,我是否可以等待最后一个未来并确定所有先前的也已交付(并按顺序)?还是我需要等待所有期货?
在代码中,我可以这样做:
Producer<String, String> producer = new KafkaProducer<>(myConfig);
Future<?> f = null;
for(MessageType message : messages){
f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue());
}
try {
f.get();
} catch(ExecutionException e) {
//handle exception
}
而不是这个:
Producer<String, String> producer = new KafkaProducer<>(myConfig);
List<Future<?>> futureList = new ArrayList<>();
for(MessageType message : messages){
futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()));
}
try {
for(Future<?> f : futureList) {
f.get();
}
} catch(ExecutionException e) {
//handle exception
}
并确保如果这里没有发现任何东西(从第一个 sn-p 开始):
try {
f.get();
} catch(ExecutionException e) {
然后我的所有消息都已按顺序存储在集群中(无论生产者是否在后台执行了任何重试),如果出现问题,我将在那里得到一个异常,即使它不是第一次遇到问题的最后一个未来(我正在等待)吗?
还有什么奇怪的极端情况需要注意吗?
【问题讨论】:
-
如果您只是检查最后一个未来,它不会告诉您任何有关先前请求的信息。此外,启用重试可能会导致在失败时重新排序消息。
-
如果我有 max.in.flight.requests.per.connection = 1 那么不应该阻止重新排序消息吗?如果上一批失败了,应该在继续下一批之前重试,至少根据我的解释。
-
它将重试,但如果仍然失败
n次 (n=numberOfRetries),它会继续处理下一条消息并可能成功插入它。要处理这种情况,您需要检查每个未来的结果。
标签: java apache-kafka kafka-producer-api