【问题标题】:How to avoid queuing up of Batches in spark streaming如何避免在火花流中排队
【发布时间】:2021-06-13 21:39:31
【问题描述】:
我使用 Direct Streaming 进行 spark 流式传输,并且正在使用以下配置
批处理间隔60s
spark.streaming.kafka.maxRatePerPartition 42
auto.offset.reset 最早
当我开始使用最早选项的流式批处理时,为了更快地使用来自 Kafka 的消息并减少延迟,我将 spark.streaming.kafka.maxRatePerPartition 保持为 42。所以它应该消耗 42 x 60s x 60 分区 = 151200每批记录。
这里有两个问题
- 我看到这两个初始批次正确地消耗了 151200 条记录,尽管有很多记录要从 kafka 消耗,但在后面的批次中逐渐减少。请看下面的截图。可能是什么原因
- 我看到很多批次都在排队。我们怎样才能避免这种情况。
是否有可能实现以下场景
我们将批处理间隔设置为 60s,如果每个批处理都在 60s 内运行,则下一个批处理可以按时开始。如果一个批次的时间超过 60 秒,我们不希望下一批来排队。现有运行完成后,下一次运行可以通过选取该时间之前的记录来开始。这样我们就不会出现延迟,也不会排队。
Spark UI - Screenshot for question 1
【问题讨论】:
标签:
apache-spark
apache-kafka
spark-streaming
spark-kafka-integration
【解决方案1】:
您观察到的是 Spark 的背压机制的预期行为。
您已将配置 spark.streaming.kafka.maxRatePerPartition 设置为 42,并且根据您的计算,作业将开始获取
42 * 60 partitions * 60 seconds batch interval = 151200 records per batch
查看您所附屏幕截图中的时间(处理时间),该作业以该数量的记录开始。
但是,由于处理所有这 151200 条记录需要超过 60 秒的时间,因此背压机制将减少后续批次中的输入记录。这仅在几批之后才会发生,因为背压机制(又称“PID 控制器”)需要等到第一批完成,以便它可以使用该经验来估计下一个间隔的输入记录数。如前所述,处理第一个 151200 所用的时间比一个时间间隔要长,这意味着后续两个时间间隔已经使用 maxRatePerPartition 进行了安排,而没有完成批处理时间间隔的经验。
这就是您看到输入记录仅在第四批中减少的原因。然后,输入记录的数量仍然太多,无法在 60 秒内处理,因此作业建立了越来越多的延迟,并且 PID 控制器(背压)最终意识到它落后于许多记录并大幅减少输入记录的数量为spark.streaming.backpressure.pid.minRate 设置的最小值。在您的情况下,此值似乎设置为 2,这导致每个批次间隔 2 * 60 * 60 = 7200 条记录。
总而言之,您观察到的是预期和预期的行为。 Streaming 作业需要一些批次来了解和了解它应该从 Kafka 获取多少数据以适应给定的(非灵活的)60 秒的批次间隔。 无论一个批次中的处理时间需要多长时间,您的流式传输作业都会以每 60 秒为单位提前计划下一个批次。
你能做什么:
- 建议将
maxRatePerPartition 设置为实际容量的150-200% 左右。只要让作业运行一段时间,您就会看到估计的 100% 会是什么。
- 当您在 Kafka 中使用 60 个分区时,您需要确保数据在分区中均匀分布。只有这样 maxRatePerPartition 才会做你打算做的事情
- 拥有 60 个分区,您可以在 Spark 集群中使用 60 个内核以获得最大消耗速度。