【问题标题】:How to use Consumer API of Kafka 0.8.2?如何使用 Kafka 0.8.2 的 Consumer API?
【发布时间】:2015-08-31 16:27:03
【问题描述】:

我开始使用最新的 Kafka 文档http://kafka.apache.org/documentation.html。但是当我尝试使用新的消费者 API 时遇到了一些问题。我已经通过以下步骤完成了这项工作:

1.添加新的依赖项

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>

2。添加配置

    Map<String, Object> config = new HashMap<String, Object>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "host:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");

3.使用 KafkaConsumer API

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");

但是,当我尝试从代理轮询消息时,我得到的只是 null:

Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
    process(records);
else
    System.err.println("null");

然后我检查了源代码后知道消费者出了什么问题:

@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
    // TODO Auto-generated method stub
    return null;
}

更糟糕的是,我找不到有关 0.8.2 API 的任何其他有用信息,因为有关 Kafka 的所有用法都与最新版本不兼容。有人可以帮助我吗?非常感谢。

【问题讨论】:

  • 新的 KafkaConsumer API 将仅在 0.8.3 cwiki.apache.org/confluence/display/KAFKA/Future+release+plan 中可用。显然,主干中有一些实现,尽管我对状态一无所知。目前我正在使用旧的消费者实现。
  • 谢谢@habsq。那么在所有老的api中,使用Kafka 0.8.x时哪个版本是最好的选择呢?
  • 我使用的是 0.8.1.1,不知道“最佳”选择。
  • 知道了。还是谢谢你。

标签: apache-kafka kafka-consumer-api


【解决方案1】:

我也在尝试在 Kafka 0.8.2.1 之上编写一个 Consumer 来读取新 Producer 产生的消息。

到目前为止,我得到的是 Producer API 已准备好并且可用,而在消费者方面,我们必须等待 0.8.3,正如@habsq 所指出的那样,您已经发现其中包含一些用于消费者的代码,但它仍然不起作用。

所以要使用的客户端(当前的客户端 API)是在您当前 Kafka 版本的“核心”项目中找到的客户端,即 0.8.2.1(最好不要将客户端降级到任何其他版本)。

所以现在我们需要导入两个 jar:一个用于“新”java 客户端,一个用于核心项目,这也取决于您使用的 scala 版本(我使用 2.11)。

在我的例子中,我使用 graddle 来管理依赖项,所以我只需要导入

dependencies {
  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1'
  compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1'
}

当您更新依赖项时,它将获得所有需要的库。

如果您使用不同的 Scala 版本,只需更改版本即可;无论如何,您可以在 maven Central 上找到所有不同的版本或完整的 pom: http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22

如果您使用这些 Consumer 实现,所有当前示例都应该照常工作。

PS ref: Kafka-users ml 线程http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2

【讨论】:

  • 我明确地放置了两个依赖项,因为将来我们只需要将第一个放在“clients”jar 上,然后将一个放在“core”kafka 上。其实现在client也是core的一个依赖,想要简洁的可以用core kafka的依赖。
【解决方案2】:

是的,我可以确认 0.8.2.1 版在使用消息方面存在问题。现在使用 Java / Groovy 制作一个简单的消费者并发布 0.10.1.0,一切都运行良好。

不再需要设置PARTITION_ASSIGNMENT_STRATEGY

【讨论】:

    猜你喜欢
    • 2017-02-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-02
    • 2016-07-19
    • 2015-04-18
    • 2023-03-23
    • 2017-10-10
    相关资源
    最近更新 更多