【问题标题】:Using Spark StreamingContext to Consume from Kafka topic使用 Spark StreamingContext 从 Kafka 主题中消费
【发布时间】:2017-05-09 14:00:29
【问题描述】:

我是 Spark 和 Kafka 的新手,我正在尝试获取一些 Scala 代码(作为 Spark 作业运行)作为长期运行的进程(不仅仅是短期/计划任务)并不断轮询用于消息的 Kafka 代理。当它收到消息时,我只想将它们打印到控制台/STDOUT。同样,这需要是一个长期运行的过程,并且基本上(尝试)永远存在。

在进行了一些挖掘之后,似乎StreamingContext 是我想要使用的。这是我最好的尝试:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.storage._
import org.apache.spark.streaming.{StreamingContext, Seconds, Minutes, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

def createKafkaStream(ssc: StreamingContext, kafkaTopics: String, brokers: String): DStream[(String, String)] = {
    val topicsSet = kafkaTopics.split(",").toSet
    val props = Map(
        "bootstrap.servers" -> "my-kafka.example.com:9092",
        "metadata.broker.list" -> "my-kafka.example.com:9092",
        "serializer.class" -> "kafka.serializer.StringEncoder",
        "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, props, topicsSet)
}

def processEngine(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(1))

    val topicStream = createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print()

    ssc
}

StreamingContext.getActive.foreach {
    _.stop(stopSparkContext = false)
}

val ssc1 = StreamingContext.getActiveOrCreate(processEngine)
ssc1.start()
ssc1.awaitTermination()

当我运行它时,我没有收到任何异常/错误,但似乎什么也没发生。我可以确认有关于该主题的消息。关于我要去哪里出错的任何想法?

【问题讨论】:

    标签: scala apache-spark apache-kafka spark-streaming


    【解决方案1】:

    当您foreachRDD 时,输出会打印在 Worker 节点,而不是 Master。我假设您正在查看 Master 的控制台输出。您可以改用DStream.print

    val ssc = new StreamingContext(sc, Seconds(1))
    val topicStream = createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print()
    

    另外,不要忘记在ssc.start()之后调用ssc.awaitTermination()

    ssc.start()
    ssc.awaitTermination()
    

    作为旁注,我假设您复制粘贴了此示例,但如果您实际上不打算对 OffsetRange 执行任何操作,则无需在 DStream 上使用 transform

    【讨论】:

    • 谢谢@Yuval (+1) - 你确定这就是它的全部吗?当我运行它时,我得到的输出表明一切都已正常启动,但它似乎在几秒钟后关闭,并且我发布到该主题的所有消息似乎都没有被消耗+打印。想法?
    • @smeeb 我忘了补充说,在调用ssc.start()之后,你需要调用ssc.awaitTermination()。更新了答案。
    • 好的,现在我想我们已经到了某个地方,谢谢@Yuval(再次!)。这解决了停止问题,但我仍然没有看到打印出的主题消息。你确定val topicStream = createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print() 会在消息被消费时将消息打印到 STDOUT 吗?我见过的所有示例都使用map 方法,该方法可以访问event 变量。
    • 我想在我的脑海里我担心我什至根本没有连接到我的 Kafka 代理。如果我运行我的“生产者”组件(上面的代码中没有提供),我可以运行 Kafka kafka-console-consumer.sh(就在 Kafka 服务器上)并查看消息。所以我知道我的制作人正在给这个主题写消息。但到目前为止,我没有看到任何具体证据表明上面的这个“消费者”组件甚至根本就在和 Kafka 对话!
    • Arrrrgggg,我正在查看驱动程序日志。消息 正在 被打印到 STDOUT,而不是我正在寻找的地方。呸呸呸
    【解决方案2】:

    这是您的完整代码吗?你在哪里创建sc?您必须在流式传输上下文之前创建火花上下文。你可以像这样创建sc:

    SparkConf sc = new SparkConf().setAppName("SparkConsumer");
    

    另外,没有awaitTermination,很难捕捉和打印后台数据处理过程中发生的异常。你能在最后加上ssc1.awaitTermination();,看看你有没有错误。

    【讨论】:

    • 感谢@Honda (+1) - 此代码在 Scala“笔记本”内部的 Databricks 上运行,就像 Scala REPL 通过 sc 变量为您提供 SparkContext .为了回答您的问题,即使添加了awaitTermination(请参阅我的编辑),我也没有遇到任何异常。
    • 如果你在shell中启动spark,那么spark-shell脚本会为你创建sc
    猜你喜欢
    • 2018-07-20
    • 2020-03-19
    • 2019-11-15
    • 1970-01-01
    • 1970-01-01
    • 2017-11-24
    • 1970-01-01
    • 2018-07-01
    • 1970-01-01
    相关资源
    最近更新 更多