【问题标题】:Kafka: What is new API for ConsumerConnector in version 0.11Kafka:0.11 版中 ConsumerConnector 的新 API 是什么
【发布时间】:2017-10-27 22:27:54
【问题描述】:

我正在将 Kafka 客户端从 0.8.2.0 更新为 0.11.0.0

在我的旧代码中,我使用ConsumerConnector 通过createMessageStreams 方法获取消息流,然后遍历每个主题的流。但是,似乎 ConsumerConnector 在新 API 中已被贬低。

package kafka.consumer

import ...
/**
 *  Main interface for consumer
 */
@deprecated("This trait has been deprecated and will be removed in a future release.", "0.11.0.0")
trait ConsumerConnector {
   ...
   def createMessageStreams[K,V](topicCountMap: Map[String,Int],
                                keyDecoder: Decoder[K],
                                valueDecoder: Decoder[V]): Map[String,List[KafkaStream[K,V]]]
   ...
}

我查找了新的 API 并找到了两个候选:

  • Client API 在 org.apache.kafka.clients.consumer
  • Stream API 在 org.apache.kafka.streams 中

我应该使用哪一个?而且,如何在新的 Kafka API 中实现同样的功能?

【问题讨论】:

    标签: java scala apache-kafka kafka-consumer-api


    【解决方案1】:

    新消费者示例如下:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    

    更多详情请参阅https://kafka.apache.org/0110/javadoc/index.html

    【讨论】:

    • 所以,在最新的API中我需要定期poll,而不是创建stream?如果我有stream,Kafka 会一直给我发送记录,这样我就不需要不断地轮询。有没有办法在最新的 API 中做到这一点,或者有理由改用你的方式?
    猜你喜欢
    • 2015-09-08
    • 1970-01-01
    • 1970-01-01
    • 2020-07-04
    • 2017-12-24
    • 2018-05-15
    • 1970-01-01
    • 2018-09-04
    • 2022-01-22
    相关资源
    最近更新 更多