【问题标题】:Writing RDD Data to CSV with Dynamic Columns in Spark - Scala使用 Spark 中的动态列将 RDD 数据写入 CSV - Scala
【发布时间】:2017-11-28 14:33:05
【问题描述】:

我正在从 HDFS 目录中读取多个文件,并且对于每个文件,生成的数据都使用以下命令打印:

frequencies.foreach(x => println(x._1 + ": "+x._2))

而打印出来的数据是(对于File1.txt):

'text': 45
'data': 100
'push': 150

其他文件的密钥可以不同,例如 (File2.txt):

'data': 45
'lea': 100
'jmp': 150

密钥不一定在所有文件中都相同。我希望将所有文件数据写入 .csv 文件,格式如下:

Filename   text  data  push  lea  jmp
File1.txt  45    100   150   0    0
File2.txt  0     45    0     100  150  ....

有人可以帮我找到解决这个问题的方法吗?

【问题讨论】:

    标签: scala csv hadoop apache-spark


    【解决方案1】:

    如果您的文件不够大,您可以在没有火花的情况下完成。 这是我的示例代码,csv 格式是旧样式,不喜欢您的预期输出,但您可以轻松调整它。

      import scala.io.Source
      import org.apache.hadoop.fs._
      val sparkSession =   ...  // I created it to retrieve hadoop configuration, you can create your own Configuration.
      val inputPath =   ...
      val outputPath =   ...
    
      val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
      // read all files content to Array of Map[String,String]
      val filesContent = fs.listStatus(new Path(inputPath)).filter(_.isFile).map(_.getPath).filter(_.getName.endsWith(".txt"))
        .map(s => (s.getName, Source.fromInputStream(fs.open(s)).getLines()
                        .map(_.split(":").map(_.trim))
                        .filter(_.length == 2)
                        .map(p => (p.head, p.last)).toMap))
      // create default Map with all possible keys
      val listKeys = filesContent.flatMap(_._2.keys).distinct.map(s => (s, "0")).toMap
      val csvContent = filesContent.map(s => (s._1, listKeys ++ s._2))
        .map(s => (s._1, s._2.values.mkString(",")))
        .map(s => s"${s._1},${s._2}")
        .mkString("\n")
      val csvHeader = ("Filename" +: listKeys.keys.toList).mkString(",")
      val csv = csvHeader + "\n" + csvContent
    
      new PrintWriter(fs.create(new Path(outputPath))){
        write(csv)
        close()
      }
    

    【讨论】:

      【解决方案2】:

      我建议为您目录中的所有文件创建一个数据框,然后使用pivot 相应地重新调整数据:

      val df1 = sc.parallelize(Array(
      ("text",45  ),
      ("data",100 ),
      ("push",150 ))).toDF("key", "value").withColumn("Filename", lit("File1") )
      
      val df2 = sc.parallelize(Array(
      ("data",45  ),
      ("lea",100 ),
      ("jump",150 ))).toDF("key", "value").withColumn("Filename", lit("File2") )
      
      val df = df1.unionAll(df2)
      
      df.show
      +----+-----+--------+
      | key|value|Filename|
      +----+-----+--------+
      |text|   45|   File1|
      |data|  100|   File1|
      |push|  150|   File1|
      |data|   45|   File2|
      | lea|  100|   File2|
      |jump|  150|   File2|
      +----+-----+--------+
      
      
      val finalDf = df.groupBy($"Filename").pivot("key").agg(first($"value") ).na.fill(0)
      
      finalDf.show
      +--------+----+----+---+----+----+
      |Filename|data|jump|lea|push|text|
      +--------+----+----+---+----+----+
      |   File1| 100|   0|  0| 150|  45|
      |   File2|  45| 150|100|   0|   0|
      +--------+----+----+---+----+----+
      

      您可以使用 DataFrameWriter 将其写为 CSV

      df.write.csv(..)
      

      困难的部分是为每个文件创建一个不同的数据框,并为创建数据框的Filename 增加一列

      【讨论】:

      • 我无法使用 df.write.csv 将 finalDf 写入 csv 并收到此错误:value csv is not a member of org.apache.spark.sql.DataFrameWriter。谢谢@philantrovert
      • 如果您使用的是 Spark 1.6,则需要将 databricks csv jar 添加到您的应用程序中。并且代码将更改为df.write.format("com.databricks.spark.csv")。更多信息在这里:github.com/databricks/spark-csv
      • 对不起,但我已经搜索了很多关于它并没有找到从这个 github repo 制作 jar 文件以及如何将它添加到应用程序中的方法。如果您在此处提及这些步骤,那将有很大帮助。谢谢@philatrovert
      • 我已经弄明白了.. 谢谢@philantrovert
      • 当我在 11,00 个文件(数据集文本文件)上运行此代码时,我收到错误 java.lang.StackOverFlowError。我正在为每个文件调用一个函数,并使用以前的数据帧执行联合操作,这将对每个文件继续进行。您能否建议在不减少时间的情况下应该采取什么措施来避免此错误?
      猜你喜欢
      • 1970-01-01
      • 2017-04-22
      • 2017-08-21
      • 2019-04-30
      • 1970-01-01
      • 2018-03-18
      • 1970-01-01
      • 1970-01-01
      • 2017-06-27
      相关资源
      最近更新 更多