【问题标题】:How to avoid queuing up of Batches in spark streaming如何避免在火花流中排队
【发布时间】:2021-06-13 21:39:31
【问题描述】:

我使用 Direct Streaming 进行 spark 流式传输,并且正在使用以下配置

批处理间隔60s

spark.streaming.kafka.maxRatePerPartition 42

auto.offset.reset 最早

当我开始使用最早选项的流式批处理时,为了更快地使用来自 Kafka 的消息并减少延迟,我将 spark.streaming.kafka.maxRatePerPartition 保持为 42。所以它应该消耗 42 x 60s x 60 分区 = 151200每批记录。

这里有两个问题

  1. 我看到这两个初始批次正确地消耗了 151200 条记录,尽管有很多记录要从 kafka 消耗,但在后面的批次中逐渐减少。请看下面的截图。可能是什么原因
  2. 我看到很多批次都在排队。我们怎样才能避免这种情况。

是否有可能实现以下场景 我们将批处理间隔设置为 60s,如果每个批处理都在 60s 内运行,则下一个批处理可以按时开始。如果一个批次的时间超过 60 秒,我们不希望下一批来排队。现有运行完成后,下一次运行可以通过选取该时间之前的记录来开始。这样我们就不会出现延迟,也不会排队。

Spark UI - Screenshot for question 1

【问题讨论】:

  • 你开启背压了吗?
  • 是的..我现在正在测试没有背压..谢谢

标签: apache-spark apache-kafka spark-streaming spark-kafka-integration


【解决方案1】:

您观察到的是 Spark 的背压机制的预期行为。

您已将配置 spark.streaming.kafka.maxRatePerPartition 设置为 42,并且根据您的计算,作业将开始获取

42 * 60 partitions * 60 seconds batch interval = 151200 records per batch

查看您所附屏幕截图中的时间(处理时间),该作业以该数量的记录开始。

但是,由于处理所有这 151200 条记录需要超过 60 秒的时间,因此背压机制将减少后续批次中的输入记录。这仅在几批之后才会发生,因为背压机制(又称“PID 控制器”)需要等到第一批完成,以便它可以使用该经验来估计下一个间隔的输入记录数。如前所述,处理第一个 151200 所用的时间比一个时间间隔要长,这意味着后续两个时间间隔已经使用 maxRatePerPartition 进行了安排,而没有完成批处理时间间隔的经验。

这就是您看到输入记录仅在第四批中减少的原因。然后,输入记录的数量仍然太多,无法在 60 秒内处理,因此作业建立了越来越多的延迟,并且 PID 控制器(背压)最终意识到它落后于许多记录并大幅减少输入记录的数量为spark.streaming.backpressure.pid.minRate 设置的最小值。在您的情况下,此值似乎设置为 2,这导致每个批次间隔 2 * 60 * 60 = 7200 条记录。

总而言之,您观察到的是预期和预期的行为。 Streaming 作业需要一些批次来了解和了解它应该从 Kafka 获取多少数据以适应给定的(非灵活的)60 秒的批次间隔。 无论一个批次中的处理时间需要多长时间,您的流式传输作业都会以每 60 秒为单位提前计划下一个批次。

你能做什么:

  • 建议将maxRatePerPartition 设置为实际容量的150-200% 左右。只要让作业运行一段时间,您就会看到估计的 100% 会是什么。
  • 当您在 Kafka 中使用 60 个分区时,您需要确保数据在分区中均匀分布。只有这样 maxRatePerPartition 才会做你打算做的事情
  • 拥有 60 个分区,您可以在 Spark 集群中使用 60 个内核以获得最大消耗速度。

【讨论】:

    猜你喜欢
    • 2020-01-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-08
    • 2020-11-02
    • 1970-01-01
    • 2020-02-20
    • 2018-08-09
    相关资源
    最近更新 更多