【问题标题】:How to specify batch interval in Spark Structured Streaming?如何在 Spark Structured Streaming 中指定批处理间隔?
【发布时间】:2020-01-05 16:54:39
【问题描述】:

我正在通过 Spark Structured Streaming 并遇到问题。

在StreamingContext,DStreams中,我们可以定义一个batch interval如下:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 5) # 5 second batch interval

如何在结构化流中做到这一点?

我的流媒体是这样的:

sparkStreaming = SparkSession \
.builder \
.appName("StreamExample1") \
.getOrCreate()

stream_df = sparkStreaming.readStream.schema("col0 STRING, col1 INTEGER").option("maxFilesPerTrigger", 1).\
csv("C:/sparkStream")

sql1 = stream_df.groupBy("col0").sum("col1")
query = sql1.writeStream.queryName("stream1").outputMode("complete").format("memory").start() 

此代码按预期工作,但是如何/在哪里定义批处理间隔?

我是结构化流媒体的新手,请指导我。

【问题讨论】:

    标签: apache-spark pyspark spark-structured-streaming


    【解决方案1】:

    tl;dr 使用trigger(...)(在DataStreamWriter 上,即在writeStream 之后)


    这是一个很好的来源https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

    有多种选择,如果不设置批处理间隔,Spark 会在处理完最后一个批处理后立即查找数据。触发器就是这里。

    来自手册:

    流式查询的触发设置定义了 流数据处理,查询是否将被执行为 具有固定批次间隔或连续的微批次查询 处理查询。

    一些例子:

    默认触发器(尽快运行微批处理)

    df.writeStream \
      .format("console") \
      .start()
    

    ProcessingTime 触发器,微批处理间隔为 2 秒

    df.writeStream \
      .format("console") \
      .trigger(processingTime='2 seconds') \
      .start()
    

    一次性触发

    df.writeStream \
      .format("console") \
      .trigger(once=True) \
      .start()
    

    以一秒的检查点间隔连续触发

    df.writeStream
      .format("console")
      .trigger(continuous='1 second')
      .start()
    

    【讨论】:

    • @JacekLaskowski 没有使用 gitbook?
    • @thebluephantom 你的意思是我不再用 gitbook 来做内部网书了?是的......他们放弃了 git 支持,所以我不再喜欢他们了。寻找替代品。
    猜你喜欢
    • 2017-06-24
    • 2023-03-31
    • 2019-06-25
    • 2018-08-08
    • 2021-05-22
    • 2020-01-18
    • 2018-08-09
    • 1970-01-01
    • 2016-09-18
    相关资源
    最近更新 更多