【问题标题】:Guaranteed delivery of multiple messages to Kafka cluster保证将多条消息传递到 Kafka 集群
【发布时间】:2016-07-16 19:04:56
【问题描述】:

如果我连续向 Kafka 集群发布多条消息(使用 new Producer API),我会从生产者那里为每条消息获得一个 Future

现在,假设我已将生产者配置为具有max.in.flight.requests.per.connection = 1retries > 0,我是否可以等待最后一个未来并确定所有先前的也已交付(并按顺序)?还是我需要等待所有期货? 在代码中,我可以这样做

Producer<String, String> producer = new KafkaProducer<>(myConfig);
Future<?> f = null;
for(MessageType message : messages){
  f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue());
}
try {
  f.get();
} catch(ExecutionException e) {
  //handle exception
}

而不是这个:

Producer<String, String> producer = new KafkaProducer<>(myConfig);
List<Future<?>> futureList = new ArrayList<>();
for(MessageType message : messages){
  futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()));
}
try {
  for(Future<?> f : futureList) {
    f.get();
  }
} catch(ExecutionException e) {
  //handle exception
}

并确保如果这里没有发现任何东西(从第一个 sn-p 开始):

try {
  f.get();
} catch(ExecutionException e) {

然后我的所有消息都已按顺序存储在集群中(无论生产者是否在后台执行了任何重试),如果出现问题,我将在那里得到一个异常,即使它不是第一次遇到问题的最后一个未来(我正在等待)吗?

还有什么奇怪的极端情况需要注意吗?

【问题讨论】:

  • 如果您只是检查最后一个未来,它不会告诉您任何有关先前请求的信息。此外,启用重试可能会导致在失败时重新排序消息。
  • 如果我有 max.in.flight.requests.per.connection = 1 那么不应该阻止重新排序消息吗?如果上一批失败了,应该在继续下一批之前重试,至少根据我的解释。
  • 它将重试,但如果仍然失败n 次 (n=numberOfRetries),它会继续处理下一条消息并可能成功插入它。要处理这种情况,您需要检查每个未来的结果。

标签: java apache-kafka kafka-producer-api


【解决方案1】:

您可以这样做,但如果您 a) 将重试设置为无限(或实际上无限)并且 b) 如果遇到不可重试的异常,则可以丢弃数据。

为了进一步解释,Kafka 有两类异常。可重试异常是指再次运行可能会成功的失败。例如,NotEnoughReplicasException 表示副本数量少于您的要求,因此请求被拒绝。但是如果一个失败的代理重新上线,那么你可能有足够的副本,恢复良好的状态,如果你再次发送请求就会成功。相反,SerializationException 是不可重试的,因为我们没有理由相信如果您再次尝试序列化,结果会有所不同。

生产者重试仅适用于您遇到不可重试异常的点。因此,如果您从未遇到任何这些,请使用无限重试,并使用您提到的其他设置,一旦解决了最终的未来,就可以保证订购和成功交付。但是,由于您可能会遇到不可重试的异常,因此处理每个未来(或回调)肯定要好得多,并确保您至少在请求失败时记录一些内容。

【讨论】:

    【解决方案2】:

    除了 Ewen 所说的之外,您还可以在循环中发送完所有消息后致电flush()。此调用将阻塞,直到所有期货都完成,因此在此之后您可以检查期货是否有任何异常。你需要坚持所有的未来才能做到这一点。

    另一种方法是在您的发送中使用回调并存储任何返回的异常,如下所示。在检查异常之前,再次使用刷新可确保所有发送都已完成。

    Producer<String, String> producer = new KafkaProducer<>(myConfig);
    final ArrayList<Exception> exceptionList = new ArrayList<>();
    
    for(MessageType message : messages){
      producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
          if (exception != null) {
            exceptionList.add(exception);
          }
        }
      });
    }
    
    producer.flush();
    
    if (!exceptionList.isEmpty()) {
      // do stuff
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-04-24
      • 1970-01-01
      相关资源
      最近更新 更多