【问题标题】:Spark streaming on dataproc throws FileNotFoundExceptiondataproc 上的 Spark 流引发 FileNotFoundException
【发布时间】:2016-12-13 01:01:42
【问题描述】:

当我尝试向 google dataproc 集群提交 Spark 流式传输作业时,出现以下异常:

16/12/13 00:44:20 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
...
16/12/13 00:44:20 INFO org.spark_project.jetty.server.ServerConnector: Stopped ServerConnector@d7bffbc{HTTP/1.1}{0.0.0.0:4040}
16/12/13 00:44:20 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
16/12/13 00:44:20 ERROR org.apache.spark.util.Utils: Uncaught exception in thread main
java.lang.NullPointerException
        at org.apache.spark.network.shuffle.ExternalShuffleClient.close(ExternalShuffleClient.java:152)
        at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1360)
...
Exception in thread "main" java.io.FileNotFoundException: File file:/tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)

完整输出here

似乎在 spark-env.sh 中未正确定义 hadoop 配置时会发生此错误 - link1, link2

它可以在某个地方配置吗?关于如何解决它的任何指示?

在本地模式下运行相同的代码可以正常工作:

sparkConf.setMaster("local[4]")

对于其他上下文:作业是这样调用的:

gcloud dataproc jobs submit spark \
--cluster my-test-cluster \
--class com.company.skyfall.Skyfall \
--jars gs://my-bucket/resources/skyfall-assembly-0.0.1.jar \
--properties spark.ui.showConsoleProgress=false

这是样板设置代码:

  lazy val conf = {
    val c = new SparkConf().setAppName(this.getClass.getName)
    c.set("spark.ui.port", (4040 + scala.util.Random.nextInt(1000)).toString)

    if (isLocal) c.setMaster("local[4]")
    c.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    c.set("spark.streaming.blockInterval", "1s")
  }

  lazy val ssc = if (checkPointingEnabled) {
    StreamingContext.getOrCreate(getCheckPointDirectory, createStreamingContext)
  } else {
    createStreamingContext()
  }

  private def getCheckPointDirectory: String = {
    if (isLocal) localCheckPointPath else checkPointPath
  }

  private def createStreamingContext(): StreamingContext = {
    val s = new StreamingContext(conf, Seconds(batchDurationSeconds))
    s.checkpoint(getCheckPointDirectory)
    s
  }

提前致谢

【问题讨论】:

    标签: apache-spark google-cloud-dataproc


    【解决方案1】:

    这可能不是您第一次使用给定的检查点目录运行作业,因为检查点目录中已经包含一个检查点?

    发生这种情况是因为检查点硬编码了用于提交 YARN 应用程序的确切 jarfile 参数,并且当在 Dataproc 上运行并带有指向 GCS 的 --jars 标志时,这实际上是 Dataproc 自动从 GCS 暂存您的 jarfile 的语法糖到本地文件路径 /tmp/0afbad25-cb65-49f1-87b8-9cf6523512dd/skyfall-assembly-0.0.1.jar 中,该路径仅在单个作业运行期间临时使用,因为 Spark 无法直接从 GCS 调用 jarfile 而无需在本地暂存它。

    但是,在后续作业中,之前的 tmp jarfile 将已被删除,但新作业会尝试引用硬编码到检查点数据中的旧位置。

    检查点数据中的硬编码也会导致其他问题;例如,Dataproc 还使用 YARN“标签”来跟踪作业,如果在新的 YARN 应用程序中重用旧 Dataproc 作业的“标签”,则会与 YARN 发生冲突。要运行您的流式应用程序,您需要首先清除检查点目录(如果可能)以从头开始,然后:

    1. 在开始作业之前,您必须将作业 jarfile 放在主节点上的某个位置,然后您的“--jar”标志必须指定“file:///path/on/master/node/to/jarfile.jar” .

    当您指定“file:///”路径时,dataproc 知道它已经在主节点上,因此它不会重新暂存到 /tmp 目录中,因此在这种情况下,检查点指向某个目录是安全的修复了 master 上的本地目录。

    您可以使用 init 操作来执行此操作,也可以提交一个快速的 pig 作业(或者只是 ssh 到 master 并下载该 jarfile):

    # Use a quick pig job to download the jarfile to a local directory (for example /usr/lib/spark in this case)
    gcloud dataproc jobs submit pig --cluster my-test-cluster \
        --execute "fs -cp gs://my-bucket/resources/skyfall-assembly-0.0.1.jar file:///usr/lib/spark/skyfall-assembly-0.0.1.jar"
    
    # Submit the first attempt of the job
    gcloud dataproc jobs submit spark --cluster my-test-cluster \
        --class com.company.skyfall.Skyfall \
        --jars file:///usr/lib/spark/skyfall-assembly-0.0.1.jar \
        --properties spark.ui.showConsoleProgress=false
    
    1. Dataproc 在后台依赖 spark.yarn.tags 来跟踪与作业关联的 YARN 应用程序。但是,检查点包含陈旧的 spark.yarn.tags,这会导致 Dataproc 与似乎与旧作业相关联的新应用程序混淆。

    目前,只要最近杀死的 jobid 保存在内存中,它只会“清理”可疑的 YARN 应用程序,因此重新启动 dataproc 代理将解决此问题。

    # Kill the job through the UI or something before the next step.
    # Now use "pig sh" to restart the dataproc agent
    gcloud dataproc jobs submit pig --cluster my-test-cluster \
        --execute "sh systemctl restart google-dataproc-agent.service"
    
    # Re-run your job without needing to change anything else,
    # it'll be fine now if you ever need to resubmit it and it
    # needs to recover from the checkpoint again.
    

    请记住,虽然检查点的性质意味着您将无法更改在后续运行中传递的参数,因为检查点恢复用于破坏您的命令行设置。

    【讨论】:

    • 谢谢丹尼斯。清理检查点工作。总而言之,要从检查点恢复: 1. 将 jar cp 到本地文件系统,并在 spark 作业提交中引用它; 2. 在从检查点恢复之前,重新启动 dataproc 代理,否则会引发各种异常(gist.github.com/rvenkatesh25/a36e6f339febbf0a6d3b96ce5ad08fdc )。听起来对吗?
    • 为了更简洁地说明,您只需在第一次从给定检查点恢复时重新启动 dataproc 代理,所有后续时间都可以在没有重新启动的情况下进行。在 Dataproc 中更干净地处理这个问题正在进行中,因此不可否认,当前需要重新启动代理并不理想。基本上,当您第一次在没有检查点的情况下运行作业时,Dataproc 的“jobid”会被烘焙到检查点文件中;所有后续检查点恢复运行将重用该旧作业标识。因此,单次重启会使代理忘记来自第一次作业运行的作业 ID。
    • 即使使用重新启动 dataproc 代理技巧,我也会收到异常:“17/05/16 17:39:04 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.org.apache.spark .SparkException:Yarn 应用程序已经结束!它可能已被杀死或无法启动应用程序主控。在 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:85)" 在 dataproc 端有什么变化?
    • 您是否在全新的集群上运行?现有集群上不应发生任何变化。
    • 是的,集群已重新创建。在stackoverflow.com/questions/44008418/… 继续讨论
    【解决方案2】:

    您还可以在纱线集群模式下运行作业,以避免将 jar 添加到您的主机。潜在的权衡是火花驱动程序将在工作节点而不是主节点中运行。

    【讨论】:

      猜你喜欢
      • 2016-09-21
      • 2016-10-31
      • 2019-01-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-30
      • 2020-07-19
      相关资源
      最近更新 更多