【问题标题】:Spark job gives NullPointerException on Amazon EMRSpark 作业在 Amazon EMR 上给出 NullPointerException
【发布时间】:2017-02-26 16:29:37
【问题描述】:

目标:我正在尝试对 parquet 文件 [从 S3 读取] 运行查询,然后将其作为单个制表符分隔的文本文件写入另一个 S3 存储桶中。所有这一切都在 Amazon EMR 集群上运行的 Spark 应用程序中完成

我已阅读 StackOverflow 上的其他类似问题,但无济于事。

//Read parquet file
final Dataset<Row> df = spark.read().parquet(fileName)
//Register temp table for convinience 
df.registerTempTable(tableName)

//Valid SQL query string - Verified using data bricks on the same parquet file
final String queryString = //SQL String

//Result of running sql query on parquet file
final Dataset<Row> result = spark.sql(queryString)

//Check if result is null - result is NOT NULL

final JavaRDD<Row> rowJavaRDD = result.toJavaRDD()

//Check if rowJavaRDD is null - rowJavaRDD is NOT NULL

//Coalesce and write to a text file
rowJavaRDD.map(r -> r.mkString("\t")).coalesce(1).saveAsTextFile(savePath)

但是我在rowJavaRDD.map(r -&gt; r.mkString("\t")).coalesce(1).saveAsTextFile(savePath)线上得到了一个N​​PE

我也尝试过使用broadcast,但这并没有做任何事情。在没有合并的情况下尝试过,但仍然出现相同的错误。

我已检查 savePath 是否有效并且没有 S3 权限问题

我尝试在 ./spark-shell 在同一个镶木地板文件上使用 scala 在本地做同样的事情,它工作正常:/

在 EMR 集群上运行 spark 2.0.2。 [2.1 版在其他方面提供了 classCastException - 所以更新是一个更大的麻烦]

任何帮助将不胜感激。

StackTrace如下:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 38.0 failed 4 times, most recent failure: Lost task
7.3 in stage 38.0 (TID 13586, ip-10-30-1-150.ec2.internal): java.lang.NullPointerException
        at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:231)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1203)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1348)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1906)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1219)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1161)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1064)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1030)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:956)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:955)
        at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1459)
        at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438)
        at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1438)
        at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:549)
        at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)

相关链接

1)NPE saveAsTextFile

2)RDD as textfile

【问题讨论】:

    标签: java apache-spark nullpointerexception emr amazon-emr


    【解决方案1】:

    正如您自己提供的第一个相关链接 (NPE saveAsTextFile) 中所述,当您调用 saveAsTextFile() 时发生这种情况并不意味着问题与该行有关,因为所有转换到那时为止你所做的都是懒惰的。

    Google 快速搜索“OnHeapColumnVector NullPointerException”为我找到了SPARK-16518。看起来这是一个自 2.0.0 以来一直存在的问题,并且仍未解决。

    【讨论】:

    • 我在其他镶木地板文件上也使用了相同的命令,它工作正常。正是在这个文件上,我得到了 NPE。我在本地下载了麻烦的镶木地板文件并尝试了相同的命令 - 效果很好 ://
    【解决方案2】:

    看起来 org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt 命中了一个空列(由于错误)。检查您是否在 parquet 文件的架构中找到 int32。在这里查看另一个版本:https://issues.apache.org/jira/browse/HIVE-14294

    另外,共享 parquet 文件的架构。

    【讨论】:

      猜你喜欢
      • 2019-04-05
      • 1970-01-01
      • 2018-12-13
      • 2019-07-03
      • 1970-01-01
      • 2017-08-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多