【问题标题】:Spark Streaming awaitTermination in Jupyter NotebookJupyter Notebook 中的 Spark Streaming awaitTermination
【发布时间】:2019-09-14 08:42:50
【问题描述】:

我正在关注 Apache Spark Definitive Guide 中的代码。当我有注释的代码行“awaitTermination()”时,我遇到了以下代码无法在 Jupyter Notebook 中打印结果的问题。 由于代码中包含“awaitTermination()”,Jupyter 内核处于忙碌状态,并且可能会无限期地保持忙碌状态。

没有“awaitTermination”,代码可以正常工作。

谁能解释一下这种行为。我该如何克服呢?

static = spark.read.json(r"/resources/activity-data/")
dataSchema = static.schema
streaming = (spark
             .readStream
             .schema(dataSchema)
             .option("maxFilesPerTrigger", 1)
             .json(r"/resources/activity-data/")
            )
activityCounts = streaming.groupBy("gt").count()
spark.conf.set("spark.sql.shuffle.partitions", 5)
activityQuery = (activityCounts
                 .writeStream
                 .queryName("activity_counts")
                 .format("memory")
                 .outputMode("complete")
                 .start()
                )
#activityQuery.awaitTermination()
#activityQuery.stop()
from time import sleep
for x in range(5):
    spark.table("activity_counts").show()
    sleep(1)

【问题讨论】:

    标签: apache-spark pyspark spark-streaming


    【解决方案1】:

    是的;请参阅此文档作为参考 (https://docs.databricks.com/spark/latest/structured-streaming/production.html),Spark TDG 中的第 352 页也对此进行了说明。

    Spark Streaming 作业是连续的应用程序,在生产环境中 activityQuery.awaitTermination() 是必需的,因为它可以防止驱动程序进程在流处于活动状态时终止(在后台)。

    如果驱动程序被杀死,那么应用程序也因此被杀死,因此 activityQuery.awaitTermination() 有点像故障安全。如果您想在 Jupyter 中关闭流,您可以运行 activityQuery.stop() 来重置查询以进行测试......我希望这会有所帮助。

    activityDataSample = 'path/to/data'
    spark.conf.set("spark.sql.shuffle.partitions", 8)
    static = spark.read.json(activityDataSample)
    dataSchema = static.schema
    static.printSchema()
    
    streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
    .json(activityDataSample)
    
    activityCounts = streaming.groupBy("gt").count()
    
    activityQuery = activityCounts.writeStream.queryName("activity_counts")\
    .format("memory").outputMode("complete")\
    .start()
    
    # simulates a continuous stream for testing (cntrl-C to kill app)
    '''
    activityQuery = activityCounts.writeStream.queryName("activity_counts")\
    .format("console").outputMode("complete")\
    .start()
    activityQuery.awaitTermination()
    '''
    
    spark.streams.active # query stream is active
    [<pyspark.sql.streaming.StreamingQuery at 0x28a4308d320>]
    
    from time import sleep
    for x in range(3):
        spark.sql("select * from activity_counts").show(3)
        sleep(2)
    +---+-----+
    | gt|count|
    +---+-----+
    +---+-----+
    
    +--------+-----+
    |      gt|count|
    +--------+-----+
    |    bike|10796|
    |    null|10449|
    |stairsup|10452|
    +--------+-----+
    only showing top 3 rows
    
    +--------+-----+
    |      gt|count|
    +--------+-----+
    |    bike|10796|
    |    null|10449|
    |stairsup|10452|
    +--------+-----+
    only showing top 3 rows
    
    activityQuery.stop() # stop query stream
    spark.streams.active # no active streams anymore
    []
    

    【讨论】:

    • 在activityQuery.awaitTermination() 之后添加了activityQuery.stop(),但内核长时间保持忙碌状态。我可以探索数据的唯一方法是在中断内核之后。
    • 所以您使用的是“内存”模式,它仅用于调试/测试,就像“控制台”模式一样。您永远不会在生产环境中使用它。如果您切换到“控制台”模式,您可以在终端中查看小批量结果以进行测试。您的代码可用于学习目的,我在回复中添加了一些代码以进行确认。这能回答你的问题吗?
    • 那么总结是什么?使用 awaitTermination() dataframe.show() 将不会在 Jupyter notebook 中打印数据帧。
    • 您是在笔记本电脑上运行它吗?如果是这样,那么您就不会扩展任何东西……它只是在您机器上的内存中运行。总结是火花流作业被设计成连续的应用程序,awaitTermination() 对此做出了贡献。我不是专家,但请记住,我认为这个话题也令人困惑——这个文档对spark.apache.org/docs/latest/… 有帮助——我所知道的唯一能与 Spark Streaming 应用程序可靠集成的笔记本是 Databricks。 Jupyter 并非旨在扩展/流式传输连续分布式应用程序。
    • 这篇文章可能会有所帮助 => stackoverflow.com/questions/47357418/…
    猜你喜欢
    • 2019-11-24
    • 1970-01-01
    • 2018-09-19
    • 1970-01-01
    • 1970-01-01
    • 2023-03-18
    • 2018-05-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多