【问题标题】:Spark Kafka streaming doesn't distribute consumer load on worker nodesSpark Kafka 流不会在工作节点上分配消费者负载
【发布时间】:2019-03-30 10:12:54
【问题描述】:

我创建了以下应用程序,可在 20 秒窗口内打印特定消息:

public class SparkMain {

public static void main(String[] args) {
    Map<String, Object> kafkaParams = new HashMap<>();

    kafkaParams.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, localhost:9093");
    kafkaParams.put(GROUP_ID_CONFIG, "spark-consumer-id");
    kafkaParams.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    kafkaParams.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // events topic has 2 partitions
    Collection<String> topics = Arrays.asList("events");

    // local[*] Run Spark locally with as many worker threads as logical cores on your machine.
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SsvpSparkStreaming");

    // Create context with a 1 seconds batch interval
    JavaStreamingContext streamingContext =
            new JavaStreamingContext(conf, Durations.seconds(1));

    JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    streamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

    // extract event name from record value
    stream.map(new Function<ConsumerRecord<String, String>, String>() {
        @Override
        public String call(ConsumerRecord<String, String> rec) throws Exception {
            return rec.value().substring(0, 5);
        }})
    // filter events
    .filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String eventName) throws Exception {
            return eventName.contains("msg");
        }})
    // count with 20sec window and 5 sec slide duration
    .countByValueAndWindow(Durations.seconds(20), Durations.seconds(5))
    .print();

    streamingContext.checkpoint("c:\\projects\\spark\\");
    streamingContext.start();
    try {
        streamingContext.awaitTermination();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

在日志中运行 main 方法后,我只看到单个消费者初始化获取两个分区:

2018-10-25 18:25:56,007 信息 [org.apache.kafka.common.utils.LogContext$KafkaLogger.info] -

消费者的数量不应该等于spark工人的数量吗? 依据 https://spark.apache.org/docs/2.3.2/submitting-applications.html#master-urls

local[*] 表示 - 在本地运行 Spark,工作线程数与您机器上的逻辑核心数一样多。

我有 8 个核心 CPU,所以我预计应该创建 8 个消费者或至少 2 个消费者,每个消费者都获得“事件”主题的分区(2 个分区)。

在我看来,我需要运行一个具有 2 个节点的完整独立 spark master-worker 集群,每个节点都启动自己的消费者...

【问题讨论】:

    标签: java apache-spark apache-kafka spark-streaming


    【解决方案1】:

    您不一定需要单独的工作人员或运行集群管理器。

    听起来您正在寻找使用 2 个 Spark 执行器

    How to set amount of Spark executors?

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-05-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-02-23
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多