【问题标题】:Spark Structured Streaming StreamingQueryListener.onQueryProgress not called per microbatch?Spark Structured Streaming StreamingQueryListener.onQueryProgress 不按微批次调用?
【发布时间】:2021-07-13 17:17:57
【问题描述】:

我使用的是 Spark 3.0.2,我有一个流式作业,它使用来自 Kafka 的数据,触发持续时间为“1 分钟”。

我在 Spark UI 中看到定义的每 1 分钟有一个新作业,但我看到方法 onQueryProgress 每 5~6 分钟被调用一次。我认为应该在每个微批处理之后直接调用此方法。

有没有办法控制这个持续时间并使其等于触发持续时间?

【问题讨论】:

  • 结构化流式查询是否每微批量处理数据?或者可能是您在某些微批处理中没有任何数据,因此不会触发 inQueryProgress。
  • 我可以在 Spark UI 中看到它每分钟处理一次,因为有输入数据。

标签: apache-spark spark-structured-streaming spark-kafka-integration


【解决方案1】:

我发现onQueryProgress 方法需要 5 分钟才能完成的原因。

正如 Mike 提到的那样,onQueryProgress 被异步调用,但我认为它使用同一个线程来调用此方法。所以它正在等待方法调用完成再次调用它。

因此,在我的情况下,解决方案是找出为什么需要这么长时间,并使其比触发持续时间更快。

【讨论】:

  • 接受我的回答是否有意义或与您的回答有什么不同?
  • 我的答案和你的答案完全不同。我的问题的原因是 onQueryProgress 方法本身需要 5 分钟才能完成,但 Spark 正在 1 分钟内处理微批处理。
【解决方案2】:

StreamingQueryListener 的inQueryProgress 方法被异步调用在每个微批处理中数据已经被完全处理完。

您会看到此侦听器仅每 5~6 分钟触发一次,因为流式作业需要这段时间来处理在微批处理中获取的所有数据。将触发器持续时间设置为 1 分钟将使 Spark 相应地计划任务,但它确实意味着作业也能够在 1 分钟的时间范围内处理所有可用数据。

要减少查询从 Kafka 获取的数据量,您可以使用源选项 maxOffsetsPerTrigger

顺便说一下,如果你不处理任何数据,这个方法默认每 10 秒调用一次。如果您想避免这种情况发生,您可以发送if(event.progress.numInputRows > 0)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-06-25
    • 1970-01-01
    • 2020-03-19
    • 1970-01-01
    • 1970-01-01
    • 2019-03-28
    • 2021-11-24
    • 2020-09-12
    相关资源
    最近更新 更多