【问题标题】:Why does collecting dataset fail with org.apache.spark.shuffle.FetchFailedException?为什么收集数据集失败并出现 org.apache.spark.shuffle.FetchFailedException?
【发布时间】:2017-11-01 05:46:55
【问题描述】:

我将 Spark 与 YARN 集群管理器一起使用。

我从 Cassandra 表创建了一个数据集,该表有大约 700 行,其中 5 列,其中一列包含 JSON 格式的数据。数据量仅以 MB 为单位。

我使用spark-shell 运行:

  • spark.executor.memory=4g
  • spark.driver.memory=2g

我收到此错误:

org.apache.spark.shuffle.FetchFailedException:无法连接到 org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException 的 bosirahtaicas02.bscuat.local/172.17.0.1:53093(ShuffleBlockFetcherIterator.scala:323)

当我试图从我的 dataFrame 收集数据时

我直接在 spark-shell 中运行以下代码(逐行):

import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext

import com.datastax.spark.connector.cql._
import com.datastax.spark.connector._

import org.json._
import java.sql.Timestamp

val sqc = new SQLContext(sc)
val csc = new CassandraSQLContext(sc)

val allRecordsDF = sqc.read.format("org.apache.spark.sql.cassandra").
    option("table", "xyz").
    option("keyspace", 'pqr").load().cache()

val allRowsDF = allRecordsDF.select(allRecordsDF.col("record_type"), allRecordsDF.col("process_time"))

val allPatientsTS = allRowsDF.where(allRowsDF("record_type") === "patient").select("process_time").sort("process_time")

在这里,当我尝试收集 allPatientsTS Dataframe 时,它显示错误。

【问题讨论】:

  • 您能附上来自spark-shellyarn logs -applicationId <application ID> 的整个堆栈跟踪吗?你也可以yarn version吗?

标签: scala apache-spark apache-spark-sql spark-cassandra-connector


【解决方案1】:

引用我关于FetchFailedException的笔记:

FetchFailedException 可能在任务运行时抛出异常(并且ShuffleBlockFetcherIterator 没有设法获取 shuffle 块)。

FetchFailedException 的根本原因通常是,因为执行器(带有用于 shuffle 块的 BlockManager)由于以下原因丢失(即不再可用):

  • 可能会引发 OutOfMemoryError(又名 OOMed)或其他一些未处理的异常。

  • 集群管理器与 Spark 应用程序的执行器一起管理工作器,例如YARN(这里就是这种情况),强制执行容器内存限制并最终决定由于内存使用过多而终止执行程序。

您应该使用 Web UI、Spark History Server 或特定于集群的工具(例如用于 Hadoop YARN 的 yarn logs -applicationId)查看 Spark 应用程序的日志。

解决方案通常是调整 Spark 应用程序的内存。

如您所见,如果不对您的数据集(最重要的是它的大小和 Spark 上的分区方案)和 YARN 中的日志进行广泛审查,就很难准确地说出导致 FetchFailedException 的原因。

【讨论】:

  • 如果这对您有用,请接受未来读者的答案。谢谢。
  • DivasNikhra:看来这个答案是正确的。您为什么不接受@JacekLaskowski 作为所有者的这个答案。它也将是指向其他用户的指针。不接受对其他用户有很多含义。所以请接受它是否适合你。或评论为什么它不起作用或这里需要什么样的澄清。谢谢!
  • 嗨@Ram,虽然 Jacek 的建议非常重要,但我在该答案中看不到任何具体的解决方案
  • 在 Jaceks 中回答明确提到的 “解决方案通常是调整 Spark 应用程序的内存。”,你也做了同样的事情.....这个类型的问题很难准确地告诉您要查找的内容。请。尝试理解它不是程序执行和获取输出的逻辑。
【解决方案2】:

我也遇到过类似的问题。为了找到解决方案,我花了几周时间来实施许多来源的建议,这些建议包括:配置、分区、内存部分增量、网络超时等。然而,都未能为我提供解决方案。

最后,我发现了我的小集群中的问题。 worker 节点的 RAM 大小为 8 GB,我在 .conf 文件中配置了 6 GB 的 worker 内存(用于更大的开销)。不幸的是,其中一名工作人员的后台服务正在消耗 3 GB 内存,这就是问题发生的原因(因为所有工作人员都在寻求为每个工作人员映射 6GB 内存)。我在 spark-defaults.conf 文件中降级到 4GB 工作内存,现在我没有遇到这个问题。 注意:检查所有工作人员是否有任何其他服务正在消耗大量资源。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-02-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-22
    • 1970-01-01
    相关资源
    最近更新 更多