【问题标题】:Avoid queuing spark microbatches避免排队火花微批处理
【发布时间】:2020-01-28 07:45:18
【问题描述】:

我创建了从 Apache Flume 获取输入数据的 spark 应用程序。我将 spark 批处理间隔设置为 4 分钟,这样 spark 将每 4 分钟处理一次数据。但是我有一些昂贵的 spark 批处理需要相当长的时间(比如 30 分钟),因此在此期间大约 7 个 spark 批处理将在队列中挂起,一旦完成昂贵的批处理执行,它将开始一个接一个地处理。通过这种方式,我的火花在最大时间内保持忙碌。那么我怎样才能避免这种排队呢?如果我的 spark 批处理正在执行并且需要超过 4 分钟,我不想在队列中添加下一个 spark 批处理。

我正在启动如下火花

val ssc = new StreamingContext(sc, 240000)// 240000 equals to 4 minutes

【问题讨论】:

标签: apache-spark spark-streaming


【解决方案1】:

您可以使用锁定机制。您需要在调度程序端维护单行数据以进行锁定。因此,当您的 spark 作业在 4 分钟后触发时,它会首先检查该文件/数据库表,如果它返回锁 True,那么它将不做任何事情 terminate

我在 spark 作业中也使用了相同的方法,每 3 分钟触发一次。所以我维护了 ElasticSearch 索引,在该索引中我保留了带有 application id, start and End time, spark Job Name, Status, Lock 之类的数据的单个记录。因此,当 Spark 作业触发时,它首先检查该索引是否锁定为False,然后检查该索引是否为overwrite,该行/记录与True 以及其他详细信息并开始运行,否则如果锁定为True,则它通过记录一条消息停止执行带有 spark 作业名称的应用程序 ID 正在运行。 3 分钟后,新的 Spark 作业重新触发并再次检查所有内容。 当 spark job 获得finish 时,它会将锁更改为False,以便可以运行下一个作业。如果作业失败,我也会处理此问题,因此当作业因任何原因失败时,它会将锁定标记为False,以便下一个触发器可以毫无问题地运行。

您可能需要根据要求更改一些内容。

【讨论】:

  • 我的 spark 批次一个接一个地按顺序运行。所以我不确定如何添加锁定?
【解决方案2】:

两个可能对您有所帮助或至少给您一些见解的想法:

  1. 如果 Spark Streaming 的背压机制(或版本

  2. 如果您使用 YARN(无论推/拉模型),您可能能够配置/减少运行 + 待处理应用程序的最大数量:https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html

系统中可以同时处于活动状态的正在运行和挂起的应用程序的最大数量。每个队列的限制与其队列容量和用户限制成正比。这是一个硬性限制,达到此限制时提交的任何申请都将被拒绝。默认为10000。这可以使用yarn.scheduler.capacity.maximum-applications 为所有队列设置,也可以通过设置yarn.scheduler.capacity.<queue-path>.maximum-applications 在每个队列的基础上覆盖。应为整数值。

希望对你有帮助!

【讨论】:

  • 同意@Fabio 所说的。在这种情况下,重新考虑流式处理流程和处理流程等选项会有所帮助。
猜你喜欢
  • 2016-06-25
  • 2021-06-13
  • 1970-01-01
  • 2021-04-12
  • 1970-01-01
  • 1970-01-01
  • 2020-11-02
  • 1970-01-01
  • 2020-02-20
相关资源
最近更新 更多