【问题标题】:Why did two spark streaming jobs pull messages from the same Kafka topic with same group id not balancing load but getting same messages?为什么两个 Spark 流作业从具有相同组 id 的同一个 Kafka 主题中提取消息而不是平衡负载但获取相同的消息?
【发布时间】:2015-11-26 10:39:49
【问题描述】:

Kafka 0.8 官方文档对 Kafka Consumer 的描述如下:

“消费者用消费者组名称标记自己,发布到主题的每条消息都被传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在不同的进程中或在不同的机器上。 如果所有消费者实例都有相同的消费者组,那么这就像传统的队列平衡消费者负载一样。”

我使用 Kafka 0.8.1.1 设置了一个 Kafka 集群,并使用 Spark Streaming 作业 (spark 1.3) 从其主题中提取数据。 Spark Streaming 代码如下:

    ... ...

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokerList);
    kafkaParams.put("group.id", groupId);

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );

    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {

        @Override
        public Void call(JavaPairRDD<String, String> rdd) throws Exception {
            long msgNum = strJavaRDD.count();
            System.out.println("There are " + msgNum + " messages read from Kafka.");

        ... ...

        return null;}});

然后我提交了两个 Spark Streaming 作业以访问具有相同组 ID 的相同主题。我假设当我向主题发送 100 条消息时,这两个作业总共收到 100 条消息(例如,job1 得到 50,job2 得到 50;或者 job1 得到 100,job2 得到 0)。但是,他们分别得到 100。这样的结果似乎与 Kafka 文档所说的不同。

我的代码有什么问题吗?我是否正确设置了组 ID 配置?这是一个错误还是 createDirectStream() 的设计?

测试环境:Kafka 0.8.1.1 + Spark 1.3.1

【问题讨论】:

  • 知道了。我需要使用“createStream”而不是“createDirectStream”在共享单个组 ID 的线程之间共享主题中的消息。

标签: apache-spark apache-kafka spark-streaming


【解决方案1】:

Group 是 Kafka 0.9 之前的高级消费者 API 的一个功能,在简单的消费者 API 中不可用。 createDirectStream 使用简单的消费者 API。

一些提示:

  1. 使用 SimpleConsumer 实现的主要原因是您希望对分区消耗的控制比 Consumer Group 给您的更大。 (EG:多次阅读一条消息)

  2. createDirectStream:这种方法不使用receiver来接收数据,而是定期向Kafka查询每个topic+partition中的最新offset,并相应地定义每个batch中要处理的offset范围。

参考:

  1. Spark Streaming + Kafka Integration Guide
  2. 0.8.0 SimpleConsumer Example

Kafka 0.9.0 版本添加了一个新的 Java 消费者来替换现有的基于 ZooKeeper 的高级消费者和低级消费者 API。然后你可以同时使用 group 和 commit offset 手册。

【讨论】:

    【解决方案2】:

    创建两个不同的 spark 应用来用相同的消息做同样的事情是没有意义的。与更多执行者一起使用一个应用程序。

    【讨论】:

      猜你喜欢
      • 2021-10-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-11-05
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多