【Question title】:Spark Streaming Direct Kafka consumers are not evenly distributed across executors
【Posted】:2020-06-13 02:57:54
【Question】:

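I created a sample Direct Kafka stream in Spark. The Kafka topic has 30 partitions, but all of the consumers appear to run on the same executor machine.

Kafka Manager screenshot.

As I understand the direct Kafka stream, the driver hands offset ranges to the executors, and the executors poll Kafka for those ranges.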
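
To make that driver/executor split concrete, here is a minimal sketch in plain Java. It is a simplified model, not Spark's actual implementation: `DirectStreamSketch`, `PlannedRange`, `planBatch`, and the executor names are hypothetical, the round-robin placement only stands in for whatever the location strategy decides, and backpressure is ignored. The cap of 500 records assumes the question's settings (`maxRatePerPartition` of 100 and a 5-second batch).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of the direct approach; NOT Spark's real code.
final class DirectStreamSketch {

    // One planned unit of work: read [fromOffset, untilOffset) of one Kafka partition.
    static final class PlannedRange {
        final int kafkaPartition;
        final long fromOffset;
        final long untilOffset;
        final String executor;

        PlannedRange(int kafkaPartition, long fromOffset, long untilOffset, String executor) {
            this.kafkaPartition = kafkaPartition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
            this.executor = executor;
        }

        @Override
        public String toString() {
            return "partition " + kafkaPartition + " [" + fromOffset + ", " + untilOffset
                    + ") -> " + executor;
        }
    }

    // Driver side: cap each partition's range for this batch and pick an executor for it.
    // The driver only plans; the chosen executor would do the actual fetch.
    static List<PlannedRange> planBatch(long[] currentOffsets, long[] latestOffsets,
                                        long maxRecordsPerPartition, List<String> executors) {
        List<PlannedRange> plan = new ArrayList<>();
        for (int p = 0; p < currentOffsets.length; p++) {
            long until = Math.min(latestOffsets[p], currentOffsets[p] + maxRecordsPerPartition);
            // Hypothetical placement rule: spread partitions round-robin over executors.
            String executor = executors.get(p % executors.size());
            plan.add(new PlannedRange(p, currentOffsets[p], until, executor));
        }
        return plan;
    }

    public static void main(String[] args) {
        long[] current = new long[30];          // 30 partitions, all starting at offset 0
        long[] latest = new long[30];
        Arrays.fill(latest, 10_000L);           // assumed backlog per partition
        long cap = 100L * 5;                    // records/s per partition * batch seconds
        List<String> executors = Arrays.asList("exec-1", "exec-2", "exec-3");
        for (PlannedRange r : planBatch(current, latest, cap, executors)) {
            System.out.println(r);
        }
    }
}
```

In this model every executor ends up with 10 of the 30 partitions, which is the kind of spread one would expect to see if the reads really happen on the executors.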

Spark version: 2.4

Sample code:



import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.Arrays;
import java.util.HashMap;


public class Main {


    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("StreamingTest");

        conf.set("spark.shuffle.service.enabled", "true");
        conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
        conf.set("spark.streaming.backpressure.enabled", "true");
        conf.set("spark.streaming.concurrentJobs", "1");
        conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC");
        conf.set("spark.streaming.backpressure.pid.minRate", "1500");


        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));



        JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream1 = createKafkaStream(ssc, "test-topic-1");

        kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            System.out.println("Processing test-topic-1");
            try {
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

        kafkaStream1.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            final OffsetRange[] beginOffsets = Arrays.stream(offsetRanges).map(o -> OffsetRange.create(o.topicPartition(), 0, o.fromOffset())).toArray(OffsetRange[]::new);
            rdd.foreachPartition(partition -> {
                OffsetRange o = beginOffsets[TaskContext.get().partitionId()];

            });
            ((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(beginOffsets);
        });



        ssc.start();
        ssc.awaitTermination();
    }

    public static JavaInputDStream<ConsumerRecord<Object, Object>> createKafkaStream(JavaStreamingContext ssc, String topic) {
        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-ids>");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic+"hrishi-testing-nfr-7");
        kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 80000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));
    }
}


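【Comments】:

  • The DStream API is deprecated. Have you tried Structured Streaming?
  • Yes, I know DStream is deprecated, but my use case is limited to mapPartitions, so that is not an issue, and I cannot afford a migration right now. I doubt this is caused by DStream.
  • You can map partitions on a DataFrame as well. In any case, please show the full code; this property looks suspicious: spark.streaming.concurrentJobs
  • Added the full sample code @cricket_007
  • It looks like your code does not really do anything except commit offsets. Have you tried with only a very basic set of properties? What are you trying to optimize?

Tags: apache-spark apache-kafka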


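【Solution 1】:

I found the cause: this happens because I commit the offsets from the driver. This is the call that does it, followed by the full block it belongs to.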

((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges);

kafkaStream1.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreachPartition(partition -> {
        partition.forEachRemaining(e -> {
            try {
                System.out.println("hrishi mess" + e);
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
    });
    ((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges);
});

Next, I enabled debug logging on the executors and confirmed that KafkaRDD is polling Kafka from there; it is clearly visible in the logs.
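
As an alternative to reading debug logs, one could print the host name from inside each task to see where every Kafka partition is actually read. The following is only a sketch: `PartitionTrace` and its methods are hypothetical helpers written for illustration, and the commented call site assumes an `offsetRanges` array captured in `foreachRDD` as in the code above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper for tracing which host processes which Kafka partition.
final class PartitionTrace {

    // Host name of the JVM running this code (the executor, when called inside a task).
    static String localHost() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown-host";
        }
    }

    // Builds one trace line for a single offset range.
    static String describe(String host, int sparkPartitionId, String topic,
                           int kafkaPartition, long fromOffset, long untilOffset) {
        return "host=" + host
                + " sparkPartition=" + sparkPartitionId
                + " topic=" + topic
                + " kafkaPartition=" + kafkaPartition
                + " offsets=[" + fromOffset + "," + untilOffset + ")";
    }

    // Intended call site inside rdd.foreachPartition(...), shown as a comment because
    // it needs the Spark and Kafka integration classes on the classpath:
    //
    //   int id = TaskContext.get().partitionId();
    //   OffsetRange o = offsetRanges[id];
    //   System.out.println(PartitionTrace.describe(PartitionTrace.localHost(), id,
    //           o.topic(), o.partition(), o.fromOffset(), o.untilOffset()));

    public static void main(String[] args) {
        // Stand-alone demonstration with made-up values.
        System.out.println(describe(localHost(), 3, "test-topic-1", 3, 0L, 500L));
    }
}
```

If the lines printed in the executor stdout logs show different host names across the 30 partitions, the reads are spread over the executors even though the committing consumer lives on the driver.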

【Comments】:
