【问题标题】:Write HDFS outputfile with Scala使用 Scala 编写 HDFS 输出文件
【发布时间】:2016-09-10 01:10:58
【问题描述】:

我正在尝试使用 Scala 编写 HDFS 输出文件,但收到以下错误:

线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315) 在 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:1893) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:286) 在 org.apache.spark.rdd.RDD.foreach(RDD.scala:868) 引起:java.io.NotSerializableException:java.io.PrintWriter 序列化堆栈:

第 23 行我需要在输出文件中写一行。

代码来源:

package com.mycode.logs;

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import scala.io._
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.PrintWriter;

/**
 * @author RondenaR
 * 
 */
object NormalizeMSLogs{

  def main(args: Array[String]){
    processMsLogs("/user/temporary/*file*")
  }

  def processMsLogs(path: String){
    System.out.println("INFO: ****************** started ******************")

    // **** SetMaster is Local only to test *****
    // Set context
    val sparkConf = new SparkConf().setAppName("tmp-logs").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val hiveContext = new HiveContext(sc)

    // Set HDFS
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val hdfsconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
    hdfsconf.set("fs.defaultFS", "hdfs://192.168.248.130:8020")
    val hdfs = FileSystem.get(hdfsconf)

    val output = hdfs.create(new Path("hdfs://192.168.248.130:8020/tmp/mySample.txt"))
    val writer = new PrintWriter(output)

    val sourcePath = new Path(path)
    var count :Int = 0
    var lineF :String = ""

    hdfs.globStatus( sourcePath ).foreach{ fileStatus =>
      val filePathName = fileStatus.getPath().toString()
      val fileName = fileStatus.getPath().getName()

      val hdfsfileIn = sc.textFile(filePathName)
      val msNode = fileName.substring(1, fileName.indexOf("es"))

      System.out.println("filePathName: " + filePathName)
      System.out.println("fileName: " + fileName)
      System.out.println("hdfsfileIn: " + filePathName)
      System.out.println("msNode: " + msNode)

      for(line <- hdfsfileIn){
        //System.out.println("line = " + line)
        count += 1

        if(count != 23){
          lineF = lineF + line + ", "
        }

        if(count == 23){
          lineF = lineF + line + ", " + msNode
          System.out.println(lineF)
          writer.write(lineF) 
          writer.write("\n")
          count = 0
          lineF = ""
        }
      } // end for loop in file
    } // end foreach loop
    writer.close()
    System.out.println("INFO: ******************ended ******************")
    sc.stop()
  }
}

【问题讨论】:

  • 你试图在分布式块中使用writer,我觉得很可疑。我会尝试map 而不是foreach,然后你就有了RDD,你可以迭代和读/写。无论如何,您可能需要在这里洗牌,IMO 无法避免,HDFS 有自己的想法如何分发文件。
  • 在规范化文件后,我可以将其输出到列表中,完成列表后将其放入 HIVE 表中?

标签: scala apache-spark hdfs


【解决方案1】:

不仅PrintWriter 对象writer 不可序列化:您也不能将SparkContext (sc) 放在foreach 内:它是仅用于驱动程序的构造,对驱动程序没有意义通过电线发送给工人。

您应该花一些时间考虑通过网络发送哪些类型的对象是有意义的。任何指针/流/句柄都没有意义。结构体、字符串、原语:这些确实包含在闭包(或广播)中是有意义的。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-08-30
    • 2015-11-29
    • 2020-10-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-14
    相关资源
    最近更新 更多