【问题标题】:How can I set a maximum allowed execution time per task on Spark-YARN?如何在 Spark-YARN 上设置每个任务的最大允许执行时间?
【发布时间】:2021-10-16 02:23:26
【问题描述】:

我有一个长期运行的 PySpark Structured Streaming 作业,它读取一个 Kafka 主题,进行一些处理并将结果写回另一个 Kafka 主题。我们的 Kafka 服务器在另一个集群上运行。

它运行良好,但每隔几个小时它就会冻结,即使在 Web UI 中 YARN 应用程序的状态仍然是“正在运行”。检查日志后,似乎是由于 Kafka 源的一些暂时连接问题。事实上,有问题的微批次的所有任务都已正确完成,除了显示:

21/08/11 19:19:59 INFO AbstractCoordinator: Discovered coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Marking the coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) dead for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Discovered coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Marking the coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) dead for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Discovered coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Marking the coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) dead for group spark-kafka-source-yyy

Spark 或 YARN 未检测到该故障,并且该任务将永远运行(最多几天),并且每秒持续打印 10-20 条此类错误消息。重新启动进程即可解决问题。

在这种情况下是否有可能强制 Spark 任务(在 YARN 上)失败?然后它将自动重新启动并解决问题。当然,任何其他方式来恢复 Kafka 连接也可以……

我知道可以根据最大可接受的内存消耗杀死 YARN 容器,但在执行时间方面没有看到类似的情况。

【问题讨论】:

  • This question 与根本原因有关,但没有解决,就我而言,我从未收到“自动偏移提交失败”消息
  • Another post 与我的根本错误有关,但也没有提到无限死亡发现交换
  • 尽管有赏金,但没有一个答案或评论?生活很艰难...

标签: apache-spark apache-kafka hadoop-yarn spark-structured-streaming


【解决方案1】:

我还没有找到使用 YARN 的解决方案,但是在 Pyspark 驱动程序中使用监视循环的解决方法。循环将定期检查状态,如果状态 10 分钟未更新,则流式应用程序会失败

MAX_DURATION = 10*60 # in seconds

df:DataFrame = define_my_data_stream(params)
writer:DataStreamWriter = write_to_my_kafka(df)

qy = writer.start()

prevBatch = -1
while not spark.streams.awaitAnyTermination(defaultMaxDuration):
    lastBatch = qy.lastProgress['batchId']
    if lastBatch == prevBatch:
        qy.stop()
        print("Query stopped")
        raise RuntimeError("Query '"+(qy.name or "")+"' ("+qy.id+") has stalled")
    else:
        prevBatch = lastBatch

引发异常将使 Spark 应用程序失败。然后可以由 YARN 管理此故障,并使用以下选项重新启动应用程序来 spark-submit:

--conf spark.yarn.maxAppAttempts=2 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.max.executor.failures=4 \
--conf spark.yarn.executor.failuresValidityInterval=1h \

确实有效:检测到冻结并从检查点重新启动应用程序。但是只能重启一次,好像我没有指定failuresValidityInterval参数一样。那是另一个问题,Spark 的known issue...

【讨论】:

    猜你喜欢
    • 2020-05-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-29
    相关资源
    最近更新 更多