【问题标题】:How do I consume Kafka topic inside spark streaming app?如何在 Spark Streaming 应用程序中使用 Kafka 主题?
【发布时间】:2020-04-01 19:32:03
【问题描述】:

当我从 Kafka 主题创建流并打印其内容时

    import os
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingKafkaWords")
    ssc = StreamingContext(sc, 10)

    lines = KafkaUtils.createDirectStream(ssc, ['sample_topic'], {"bootstrap.servers": 'localhost:9092'})

    lines.pprint()

    ssc.start()
    ssc.awaitTermination()

我得到一个空结果

    -------------------------------------------
    Time: 2019-12-07 13:11:50
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:00
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:10
    -------------------------------------------

同时,它在控制台中工作:

    kafka-console-consumer --topic sample_topic --from-beginning --bootstrap-server localhost:9092

正确地给出了我在 Kafka 主题中的所有文本行:

    ham Ok lor... Sony ericsson salesman... I ask shuhui then she say quite gd 2 use so i considering...
    ham Ard 6 like dat lor.
    ham Why don't you wait 'til at least wednesday to see if you get your .
    ham Huh y lei...
    spam    REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode
    spam    This is the 2nd time we have tried 2 contact u. U have won the £750 Pound prize. 2 claim is easy, call 087187272008 NOW1! Only 10p per minute. BT-national-rate.
    ham Will ü b going to esplanade fr home?
    . . . 

将数据从 Kafka 主题流式传输到 Spark 流式应用程序的正确方法是什么?

【问题讨论】:

    标签: apache-spark pyspark apache-kafka spark-streaming


    【解决方案1】:

    根据您的代码,我们无法直接打印流式 RDD,应该基于 foreachRDD 进行打印。DStream.foreachRDD 是 Spark Streaming 中的“输出运算符”。它允许您访问 DStream 的底层 RDD 以执行对数据执行实际操作的操作。

    What's the meaning of DStream.foreachRDD function?

    注意::您也可以通过结构化流式传输来实现。参考:Pyspark Structured streaming processing

    示例工作代码:此代码尝试从 kafka 主题读取消息并打印它。您可以根据需要更改此代码。

    from pyspark import SparkConf, SparkContext
    from operator import add
    import sys
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    import json
    
    def handler(message):
        records = message.collect()
        for record in records:
            print(record[1])
    
    def main():
        sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
        ssc = StreamingContext(sc, 10)
    
        kvs = KafkaUtils.createDirectStream(ssc, ['topic_name'], {"metadata.broker.list": 'localhost:9192'},valueDecoder=serializer.decode_message)
        kvs.foreachRDD(handler)
    
        ssc.start()
        ssc.awaitTermination()
    if __name__ == "__main__":
    
       main()
    

    【讨论】:

    • 您能否从答案中删除 Cassandra 代码?这有点让人分心。这不是你说的结构化流媒体
    • 感谢您分享您的 cmets。我已经从这个答案中删除了 Cassandra 代码。正如您所提到的,这不是结构化流方法,并试图根据相关代码在 DSStreaming 方法本身中给出解决方案。除了 DSSreaming 答案,我还给出了结构化流堆栈溢出参考链接。如果您觉得还需要改进,请让我知道会纠正它。谢谢!!
    【解决方案2】:

    您在流输出中看不到任何数据的原因是,默认情况下,spark 流开始从latest 读取数据。因此,如果您先启动 Spark 流应用程序,然后将数据写入 Kafka,您将在流式作业中看到输出。参考文档here

    默认从每个Kafka分区的最新offset开始消费

    但您也可以从主题的任何特定偏移量读取数据。看看createDirectStream 方法here。它需要一个dict参数fromOffsets,您可以在其中指定字典中每个分区的偏移量。

    我已经使用 kafka 2.2.0 和 spark 2.4.3 和 Python 3.7.3 测试了以下代码:

    使用 kafka 依赖项启动 pyspark shell:

    pyspark --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.0
    

    运行以下代码:

    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
    from pyspark.streaming import StreamingContext
    ssc = StreamingContext(sc, 1)
    topicPartion = TopicAndPartition('test',0)
    fromOffset = {topicPartion: 0}
    
    lines = KafkaUtils.createDirectStream(ssc, ['test'],{"bootstrap.servers": 'localhost:9092'}, fromOffsets=fromOffset)
    
    lines.pprint()
    
    ssc.start()
    ssc.awaitTermination()
    
    

    如果你有 kafka 代理版本 10 或更高版本,你应该考虑使用结构化流而不是 Spark 流。请参阅结构化流式处理文档 here 和结构化流式处理与 Kafka 集成 here

    以下是在结构化流中运行的示例代码。 请根据您的 Kafka 版本和 spark 版本使用 jar 版本。 我使用 spark 2.4.3Scala 11kafka 0.10 所以使用 jar spark-sql-kafka-0-10_2.11:2.4.3

    启动pyspark外壳:

    pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3
    
    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "test") \
      .option("startingOffsets", "earliest") \
      .load()
    
    
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
      .writeStream \
      .format("console") \
      .start()
    

    【讨论】:

      【解决方案3】:

      我建议使用 Spark 结构化流。它是 Spark 2 发布的新一代流引擎。您可以在此link 中查看。

      对于 Kafka 集成,您可以查看link 上的文档。

      【讨论】:

      • 好的,你应该将你的df写入控制台或kafka主题以查看数据。你能在文档中查看this section吗?
      • 我试过这个 # 订阅 1 个主题 df = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ . option("subscribe", "sample_topic") \ .load() df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 但我得到 AnalysisException: 'Failed to find data source: kafka .请按照《Structured Streaming + Kafka Integration Guide》的部署部分部署应用。;'
      • 看来 kafka 依赖项在 spark libs 目录中不可用。请确保您已添加 spark+kafka 依赖项。此外,您可以检查部署部分以在此 link 中使用依赖项运行
      • 我同时使用了:1)在 Jupiter notebook 中:import os os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12 :2.4.4 pyspark-shell' 和 2) 命令行:spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.4 test.py - 不走运。跨度>
      猜你喜欢
      • 2017-08-16
      • 2019-07-12
      • 1970-01-01
      • 2018-10-10
      • 2016-11-29
      • 2018-07-02
      • 2019-06-24
      • 2017-09-30
      • 2017-08-30
      相关资源
      最近更新 更多