【问题标题】:Spark: Load multiple files, perform same operation and merge into a single dataFrameSpark:加载多个文件,执行相同的操作并合并到一个数据帧中
【发布时间】:2020-08-01 19:44:24
【问题描述】:

我有很多小的、单独的 .txt 文件。对于这些文件中的每一个,我将多行用空格分隔为 2 列,start_time 和 end_time(一个浮点数)。

我想:

  • 加载所有 .txt 文件
  • 为每一行计算一个包含 (end_time - start_time) 的新列
  • 为每一行添加一个带有文件名的新列
  • 最后,我想获得一个具有此架构的数据帧:
+------------+--------------+------------+------------+
|  file_name |   start_time |   end_time |   duration |
+------------+--------------+------------+------------+

我知道我可以简单地为每个文件和每一行创建一个循环,并一次将一行添加到数据帧中,但我想知道是否有更快的方法来执行此操作。
我对事情完成的顺序不感兴趣,但最终结果的速度。 我看到 SparkContext 中提供了诸如 textFile()wholeTextFiles() 等现有函数,但我不知道如何使用它们来做我想做的事情。

非常感谢任何方向或建议!

(对不起我的英语不好)

更新:

感谢@Shu 的帮助,这是我用来解决问题的最终代码

from pyspark.sql.functions import split, reverse, input_file_name

original_schema = [StructField("Start", FloatType(), True),
                    StructField("End", FloatType(), True)]

data_structure = StructType(original_schema)

df = self.spark_session.read.\
    csv(path=PATH_FILES+'\\*.txt', header=False, schema=data_structure, sep='\t').\
    withColumn("Filename", reverse(split(input_file_name(), "/")).getItem(0) ).\
    withColumn("duration", col("End") - col("Start"))

df.show(20, False)

【问题讨论】:

    标签: python apache-spark pyspark hdfs rdd


    【解决方案1】:

    Scala 中的另一种类似方法 - 使用 spark.read.csv() 读取文件,分隔符为空格,文件名命名为 (假设 spark -> spark session 已经存在)

    val inputDF = spark.read
          .option("inferSchema", "true")
          .option("delimiter", " ")
          .csv("<path>")
        .toDF("start_time","end_time")
    
     val output = inputDF
         .withColumn("duration", col("end_time") - col("start_time"))
         .withColumn("input_file_name", input_file_name())
         .withColumn("file_name_splits", split(col("input_file_name"), "/"))
         // Getting the last element from the splits using size function
         .withColumn("file_name", col("file_name_splits").apply(size(col("file_name_splits")).minus(1)))
         .select("file_name", "start_time", "end_time", "duration")
    
    // To show the sample data
    output.show(false)
    

    【讨论】:

      【解决方案2】:

      使用spark.read.csv() 读取文件,如果您的列由space 分隔,请使用.option("delimiter"," ")

      • 使用input_file_name函数获取文件名。

      示例:

      from pyspark.sql.functions import *
      
      spark.read.option("header",true).\
      option("delimiter"," ").\
      csv("<path>").\
      withColumn("file_name",input_file_name).\
      withColumn("duration",col("end_time") - col("start_time")).show()
      

      如果行由space 分隔,则使用文件中不存在的分隔符读取数据。

      • 然后用\\s+拆分数据,然后分解,我们将数据分成几行数据框。

      • 使用 substring 函数提取 start_time,end_time 并减去它们以获得持续时间。


      spark.read.csv("<file_path>").\
      withColumn("input",explode(split(col("_c0"),"\\s+"))).\
      withColumn("filename",input_file_name()).\
      drop("_c0").\
      show()
      

      UPDATE

      Using array index:

      spark.read.csv("<file_path>").\
      withColumn("input",explode(split(col("_c0"),"\\s+"))).\
      withColumn("filename",reverse(split(input_file_name(),'/'))[0]).\
      drop("_c0").\
      show()
      #or
      spark.read.csv("<file_path>").\
      withColumn("input",explode(split(col("_c0"),"\\s+"))).\
      withColumn("filename",reverse(split(input_file_name(),'/')).getItem(0)).\
      drop("_c0").\
      show()
      

      From Spark-2.4+ Using element_at:

      spark.read.csv("<file_path>").\
      withColumn("input",explode(split(col("_c0"),"\\s+"))).\
      withColumn("filename",element_at(split(input_file_name(),'/'),-1)).\
      drop("_c0").\
      show()
      

      【讨论】:

      • 感谢您的帮助,但是只有文件名而不是整个路径,有没有有效的方法呢?
      • @Karots96,我已经更新了使用 spark 内置函数仅提取 filename 的答案..!
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-06-30
      • 2011-02-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-02-10
      相关资源
      最近更新 更多