【问题标题】:Serial consumption of Kafka topics from Spark来自 Spark 的 Kafka 主题的串行消费
【发布时间】:2016-08-28 21:10:14
【问题描述】:

给定以下代码:

def createKafkaStream(ssc: StreamingContext, 
                      kafkaTopics: String, brokers: String): DStream[(String, String)] = {
    // some configs here
    KafkaUtils.createDirectStream[String, String, StringDecoder,
        StringDecoder](ssc, props, topicsSet)
}

def consumerHandler(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(10))

    createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
        rdd.foreach { msg =>
            // Now do some DataFrame-intensive work.
            // As I understand things, DataFrame ops must be run
            // on Workers as well as streaming consumers.
        }
    })

    ssc
}

StreamingContext.getActive.foreach {
    _.stop(stopSparkContext = false)
}

val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()

我的理解是,Spark 和 Kafka 将自动协同工作,以确定有多少消费者线程部署到可用的 Worker 节点,这可能会导致并行处理来自 Kafka 主题的消息。 p>

但是如果我想要多个并行消费者怎么办?如果希望 1-and-only-1 消费者读取来自主题的下一条消息,完全处理它,然后重新开始并轮询下一条消息。

另外,当我打电话时:

val ssc = new StreamingContext(sc, Seconds(10))

这是否意味着:

  • 单个消费者线程将接收过去 10 秒内发布到主题的所有消息;
  • 单个消费者线程将接收来自主题的下一条(单条)消息,并且每 10 秒轮询下一条消息?

【问题讨论】:

  • 感谢您的反馈,您说了几句话:“对于初学者,您展示的伪代码无法生成有效的 Spark 应用程序(您无法在操作中执行 DataFrame 密集型工作) )." (1) 你是什么意思无法执行?我一直在运行确实调用DataFrame操作的操作,所以你只是说我不应该这样做吗?或者你的意思是说我几周来一直在做的事情应该是不可能的?!?

标签: scala apache-spark apache-kafka spark-streaming


【解决方案1】:

但如果我不想要多个并行消费者怎么办?如果想要怎么办 1-and-only-1 消费者从主题中读取下一条消息, 完全处理它,然后重新开始 轮询下一条消息。

如果这是您的用例,我会说为什么要使用 Spark?它的全部优点是您可以并行阅读。我能想到的唯一 hacky 解决方法是创建一个带有单个分区的 Kafka 主题,这会使 Spark 将整个偏移范围分配给单个工作人员,但这很难看。

这是否意味着单个消费者线程将接收所有消息 在最后 10 秒内发布到主题或单个 消费者线程将接收来自主题的下一条(单条)消息, 并且它会每 10 秒轮询下一条消息?

两者都没有。由于您使用的是直接(无接收器)流方法,这意味着每 10 秒,您的驱动程序将要求 Kafka 向他提供自上一批以来已更改的偏移范围,用于所述主题的每个分区。然后,Spark 将获取每个这样的偏移范围,并将其发送给其中一个工作人员以直接从 Kafka 消费。这意味着使用直接流方法,Kafka 分区和 Spark 分区之间存在 1:1 的对应关系。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-07-09
    • 2018-07-21
    • 1970-01-01
    • 2021-12-12
    • 2022-10-23
    • 2020-12-12
    • 2020-03-05
    相关资源
    最近更新 更多