【问题标题】:Does Kafka have a batch consumer?卡夫卡有批量消费者吗?
【发布时间】:2016-06-17 07:26:56
【问题描述】:

高级消费者 API 似乎一次读取一条消息。

如果消费者想要处理这些消息并将其提交给 Solr 或 Elastic-Search 等其他下游消费者,这对于消费者来说可能会很成问题,因为他们更喜欢批量接收消息而不是一次接收一条消息。

在内存中批处理这些消息也不是一件容易的事,因为只有当批处理已经提交时,Kafka 中的偏移量也需要同步,否则带有未提交下游消息(如在 Solr 或 ES 中)的崩溃 kafka-consumer 将它的偏移量已经更新,因此消息松散。

如果消费者在向下游提交消息之后但在更新消息偏移量之前崩溃,它可能会多次使用消息。

如果 Kafka 批量消费消息,那么一些指向代码/文档的指针将不胜感激。

谢谢!

【问题讨论】:

  • 您要的是什么版本的 Kafka?我假设如果您谈论的是高级消费者,它是 0.8.2 或之前的版本。

标签: apache-kafka


【解决方案1】:

我不知道批量消费者。但即使有一个你的主要问题仍然存在。您希望在成功转发数据后提交偏移量。实现此目的的一种方法是通过设置属性auto.commit.enable = false 来关闭消费者的自动提交。权衡当然是您必须注意何时提交您的偏移量。

在此处查找消费者属性的完整文档:https://kafka.apache.org/documentation.html#consumerconfigs

关于如何手动提交从 java-doc (https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) 窃取的偏移量的一个很好的例子:

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("group.id", "test");
 props.put("enable.auto.commit", "false");
 props.put("auto.commit.interval.ms", "1000");
 props.put("session.timeout.ms", "30000");
 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
 consumer.subscribe(Arrays.asList("foo", "bar"));
 final int minBatchSize = 200;
 List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
 while (true) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records) {
         buffer.add(record);
     }
     if (buffer.size() >= minBatchSize) {
         insertIntoDb(buffer);
         consumer.commitSync();
         buffer.clear();
     }
 }

【讨论】:

  • 我同意你对自动提交的解释。但就您的代码而言,ConsumerRecord 是 Kafka 0.9 类,而他的问题使他看起来像是在询问 0.9 之前的消费者。虽然他没有明确说明。
  • 如果消费者在提交偏移量之前崩溃,那么消息将被重播。我没有 beginTransaction() 和 endTransaction() 的 DB 等价物。
猜你喜欢
  • 2023-03-29
  • 1970-01-01
  • 2019-07-03
  • 2018-05-05
  • 2021-08-22
  • 1970-01-01
  • 1970-01-01
  • 2020-10-28
  • 2015-12-18
相关资源
最近更新 更多