【Question title】: Spark writing to hdfs not working with the saveAsNewAPIHadoopFile method
【Posted】: 2015-01-20 06:44:14
【Question】:

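I'm using Spark 1.1.0 on CDH 5.2.0 and am trying to make sure I can read from and write to HDFS.

I quickly realized that .textFile and .saveAsTextFile call the old API and don't seem to be compatible with our HDFS version.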

  import org.apache.spark.SparkContext

  def testHDFSReadOld(sc: SparkContext, readFile: String){
    //THIS WILL FAIL WITH
    //(TID 0, dl1rhd416.internal.edmunds.com): java.lang.IllegalStateException: unread block data
    //java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2420)

    sc.textFile(readFile).take(2).foreach(println)
  }

  def testHDFSWriteOld(sc: SparkContext, writeFile: String){
    //THIS WILL FAIL WITH
    //(TID 0, dl1rhd416.internal.edmunds.com): java.lang.IllegalStateException: unread block data
    //java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2420)

    sc.parallelize(List("THIS","ISCOOL")).saveAsTextFile(writeFile)
  }

Moving to the newAPI methods fixed reading from HDFS!

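  import org.apache.hadoop.io.{LongWritable, Text}
  // New-API input format (org.apache.hadoop.mapreduce), not org.apache.hadoop.mapred.TextInputFormat
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

  def testHDFSReadNew(sc: SparkContext, readFile: String){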
    //THIS WORKS
    sc.newAPIHadoopFile(readFile, classOf[TextInputFormat], classOf[LongWritable],
      classOf[Text],sc.hadoopConfiguration).map{
      case (x:LongWritable, y: Text) => y.toString
    }.take(2).foreach(println)
  }

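So it looks like I'm making progress. The write no longer fails with a hard error like the one above; instead it appears to succeed. The only problem is that the output directory contains nothing except a lone `_SUCCESS` flag file. Even more puzzling, the logs show data being written to the `_temporary` directory. It seems the file committer never realizes it needs to move the files from the `_temporary` directory into the output directory.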
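To make the committer behavior concrete, here is a simplified, local-filesystem model of the two-phase commit that Hadoop's `FileOutputCommitter` (algorithm version 1) performs: a task attempt writes under `<out>/_temporary/0/_temporary/<attempt>`, the task commit renames that directory to `<out>/_temporary/0/<task>`, and the job commit moves the committed files into `<out>`, deletes `_temporary`, and writes `_SUCCESS`. This is a sketch, not Hadoop's code: the `CommitModel` object, its method names, the `attempt_0`/`task_0` identifiers, and the "other root" scenario are all hypothetical. What it illustrates is that the job commit writes `_SUCCESS` whether or not it finds any committed task output, which is consistent with a directory holding only the flag file while the logs show writes into `_temporary`.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Simplified local-filesystem model of a FileOutputCommitter-style (v1)
// two-phase commit. Hypothetical names; NOT Hadoop's implementation.
object CommitModel {
  // <out>/_temporary/0 holds all state for job attempt 0
  def jobAttemptDir(out: Path): Path = out.resolve("_temporary").resolve("0")

  // A running task attempt writes its part file here
  def taskAttemptDir(out: Path, attempt: String): Path =
    jobAttemptDir(out).resolve("_temporary").resolve(attempt)

  // A committed task's output waits here until the job commits
  def committedTaskDir(out: Path, task: String): Path =
    jobAttemptDir(out).resolve(task)

  // Job setup (driver side): create the job attempt directory
  def setupJob(out: Path): Unit = Files.createDirectories(jobAttemptDir(out))

  // Task side: write one part file inside the task attempt directory
  def writeTask(out: Path, attempt: String, part: String, data: String): Unit = {
    val dir = Files.createDirectories(taskAttemptDir(out, attempt))
    Files.write(dir.resolve(part), data.getBytes(StandardCharsets.UTF_8))
  }

  // Task commit: rename the attempt directory to the committed-task directory
  def commitTask(out: Path, attempt: String, task: String): Unit =
    Files.move(taskAttemptDir(out, attempt), committedTaskDir(out, task))

  // Job commit (driver side): move every committed task's files into <out>,
  // delete <out>/_temporary, then write _SUCCESS (even if nothing was moved)
  def commitJob(out: Path): Unit = {
    val entries = Option(jobAttemptDir(out).toFile.listFiles()).getOrElse(Array.empty[File])
    for {
      taskDir <- entries if taskDir.isDirectory && taskDir.getName != "_temporary"
      f <- taskDir.listFiles()
    } Files.move(f.toPath, out.resolve(f.getName))
    deleteRecursively(out.resolve("_temporary").toFile)
    Files.createFile(out.resolve("_SUCCESS"))
  }

  def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  // Sorted names of the entries directly inside <out>
  def listing(out: Path): List[String] =
    Option(out.toFile.list()).map(_.toList.sorted).getOrElse(Nil)
}

val tmp = Files.createTempDirectory("commit-model")

// Scenario A: the task commits under the same output root the job commits
val okOut = tmp.resolve("ok")
CommitModel.setupJob(okOut)
CommitModel.writeTask(okOut, "attempt_0", "part-r-00000", "THIS\nISCOOL\n")
CommitModel.commitTask(okOut, "attempt_0", "task_0")
CommitModel.commitJob(okOut)
println(CommitModel.listing(okOut)) // List(_SUCCESS, part-r-00000)

// Scenario B (hypothetical): the task's output lands under a root the job
// commit never looks at, so only the marker file appears in the output
val driverOut = tmp.resolve("driver-view")
val otherRoot = tmp.resolve("other-root")
CommitModel.setupJob(driverOut)
CommitModel.writeTask(otherRoot, "attempt_0", "part-r-00000", "THIS\nISCOOL\n")
CommitModel.commitTask(otherRoot, "attempt_0", "task_0")
CommitModel.commitJob(driverOut)
println(CommitModel.listing(driverOut)) // List(_SUCCESS)
```

The model only shows *how* an output directory can end up with nothing but `_SUCCESS`; it does not identify why that happens on this particular cluster.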

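  import org.apache.hadoop.io.{NullWritable, Text}
  // New-API output format (org.apache.hadoop.mapreduce), not org.apache.hadoop.mapred.TextOutputFormat
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
  // Pair-RDD implicits (saveAsNewAPIHadoopFile); required in compiled code before Spark 1.3
  import org.apache.spark.SparkContext._

  def testHDFSWriteNew(sc: SparkContext, writeFile: String){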
    /*This will have an error message of:
    INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(dl1rhd400.internal.edmunds.com,35927)
    14/11/21 02:02:27 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@2281f1b2
      14/11/21 02:02:27 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@2281f1b2
      java.nio.channels.CancelledKeyException
    at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
    at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)

    However, lately it hasn't even produced errors; the symptom is that there are no part files in the directory, but a _SUCCESS flag is there
    */
    val conf = sc.hadoopConfiguration
    conf.set("mapreduce.task.files.preserve.failedtasks", "true")
    conf.set("mapred.output.dir", writeFile)
    sc.parallelize(List("THIS","ISCOOL")).map(x => (NullWritable.get, new Text(x)))
      .saveAsNewAPIHadoopFile(writeFile, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], conf)

  }

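When I run locally and specify an HDFS path, the files show up in HDFS just fine. The problem only happens when I run on our Spark standalone cluster.

I submit the job as follows: spark-submit --deploy-mode client --master spark://sparkmaster --class driverclass driverjar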

【Question comments】:

    Tags: hadoop hdfs apache-spark cloudera


    【Solution 1】:

    Can you try the code below?

    import org.apache.hadoop.io._
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
    nums.saveAsNewAPIHadoopFile[TextOutputFormat[IntWritable, Text]]("/data/newAPIHadoopFile")
    

    The following code also works for me.

    val x = sc.parallelize(List("THIS","ISCOOL")).map(x => (NullWritable.get, new Text(x)))
    x.saveAsNewAPIHadoopFile("/data/nullwritable", classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], sc.hadoopConfiguration)
    

    [root@sparkmaster ~]# hadoop fs -cat /data/nullwritable/*

    15/08/20 02:09:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    

    【Comments】:
