【发布时间】:2015-11-26 10:39:49
【问题描述】:
Kafka 0.8 官方文档对 Kafka Consumer 的描述如下:
“消费者用消费者组名称标记自己,发布到主题的每条消息都被传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在不同的进程中或在不同的机器上。 如果所有消费者实例都有相同的消费者组,那么这就像传统的队列平衡消费者负载一样。”
我使用 Kafka 0.8.1.1 设置了一个 Kafka 集群,并使用 Spark Streaming 作业 (spark 1.3) 从其主题中提取数据。 Spark Streaming 代码如下:
... ...
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokerList);
kafkaParams.put("group.id", groupId);
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
@Override
public Void call(JavaPairRDD<String, String> rdd) throws Exception {
long msgNum = strJavaRDD.count();
System.out.println("There are " + msgNum + " messages read from Kafka.");
... ...
return null;}});
然后我提交了两个 Spark Streaming 作业以访问具有相同组 ID 的相同主题。我假设当我向主题发送 100 条消息时,这两个作业总共收到 100 条消息(例如,job1 得到 50,job2 得到 50;或者 job1 得到 100,job2 得到 0)。但是,他们分别得到 100。这样的结果似乎与 Kafka 文档所说的不同。
我的代码有什么问题吗?我是否正确设置了组 ID 配置?这是一个错误还是 createDirectStream() 的设计?
测试环境:Kafka 0.8.1.1 + Spark 1.3.1
【问题讨论】:
-
知道了。我需要使用“createStream”而不是“createDirectStream”在共享单个组 ID 的线程之间共享主题中的消息。
标签: apache-spark apache-kafka spark-streaming