【Question title】: CSV load into Dataframe with filename as additional column in pyspark
【Posted】: 2021-06-09 01:59:42
【Question】:

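I am trying to create a dataframe from a directory full of csv files, and I want to keep each file's name as an additional column in the dataframe. Is this possible in pyspark without using pandas? I would also like to strip the path from the filename.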

from pyspark.sql.functions import input_file_name

df = spark.read.option("delimiter", "\t").csv(mount_point_input)
df = df.withColumn("filename", input_file_name())

I tried using input_file_name(), but every row in the dataframe has the same filename.

Input:

False    2021-06-05T14:45:09     Server       True
True     2021-06-02T21:32:42     Server       True

Output:

+-----+-----------------------+-------+-------+--------------------------------+
False  2021-06-05T14:45:09     Server   True   /2021-06-02-general/c32d3f47.csv
+-----+-----------------------+-------+-------+--------------------------------+
False  2021-06-02T21:32:42     Server   True   /2021-06-02-general/c32d3f47.csv
+-----+-----------------------+-------+-------+--------------------------------+

Expected output:

+-----+-----------------------+-------+-------+--------------------------------+
False  2021-06-05T14:45:09     Server   True   c32d3f47.csv
+-----+-----------------------+-------+-------+--------------------------------+
False  2021-06-02T21:32:42     Server   True   c32d3f48.csv
+-----+-----------------------+-------+-------+--------------------------------+

【Comments】:

    Tags: python apache-spark pyspark apache-spark-sql


    【Solution 1】:

    You can use os.path.basename inside a UDF:

    >>> from pyspark.sql.functions import input_file_name,udf
    >>> from pyspark.sql.types import StringType
    >>> from os.path import basename
    >>> 
    >>> data = [("/home/user/test/File1.txt",10), 
    ...         ("/home/user/test/File2.txt",20), 
    ...         ("/home/user/test/File3.txt",30), 
    ...         ("/home/user/test/File4.txt",40),
    ...         ("/2021-06-02-general/c32d3f47.csv",50),
    ...         ("/2021-06-02-general/c32d3f47.csv",50)
    ...         ]
    >>> 
    >>> 
    >>> cols = ["file_path","dummy_value"]
    >>> testDF = spark.createDataFrame(data=data, schema = cols)
    >>> 
    >>> testDF.show(truncate=False)
    +--------------------------------+-----------+
    |file_path                       |dummy_value|
    +--------------------------------+-----------+
    |/home/user/test/File1.txt       |10         |
    |/home/user/test/File2.txt       |20         |
    |/home/user/test/File3.txt       |30         |
    |/home/user/test/File4.txt       |40         |
    |/2021-06-02-general/c32d3f47.csv|50         |
    |/2021-06-02-general/c32d3f47.csv|50         |
    +--------------------------------+-----------+
    
    >>> 
    >>> 
    >>> @udf(StringType())
    ... def return_filename(inp):
    ...     if inp:
    ...       return basename(inp)
    ...     else:
    ...       return None
    ... 
    >>> testDF = testDF.withColumn("file_name", return_filename('file_path'))
    >>> testDF.show(truncate=False)
    +--------------------------------+-----------+------------+
    |file_path                       |dummy_value|file_name   |
    +--------------------------------+-----------+------------+
    |/home/user/test/File1.txt       |10         |File1.txt   |
    |/home/user/test/File2.txt       |20         |File2.txt   |
    |/home/user/test/File3.txt       |30         |File3.txt   |
    |/home/user/test/File4.txt       |40         |File4.txt   |
    |/2021-06-02-general/c32d3f47.csv|50         |c32d3f47.csv|
    |/2021-06-02-general/c32d3f47.csv|50         |c32d3f47.csv|
    +--------------------------------+-----------+------------+
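One thing to check before pointing this UDF at input_file_name(): the values it returns are usually URI-shaped (for example with a `file:` or `dbfs:` scheme) and may be percent-encoded, so a space in a file name can show up as `%20`. The helper below is a minimal sketch of a more defensive function body, assuming that shape. The name `file_name_from_uri` and the sample paths are hypothetical, not part of the original answer.

```python
import posixpath
from urllib.parse import unquote, urlparse


def file_name_from_uri(uri):
    """Return the last path segment of a plain path or URI, or None for empty input."""
    if not uri:
        return None
    # urlparse drops a scheme such as "file:" or "dbfs:" and keeps only the path part.
    path = urlparse(uri).path
    # basename keeps the text after the last "/"; unquote turns "%20" back into a space.
    return unquote(posixpath.basename(path))


# Hypothetical sample values in the shapes a path column might contain.
samples = [
    "/2021-06-02-general/c32d3f47.csv",
    "dbfs:/mnt/input/2021-06-02-general/c32d3f47.csv",
    "file:///tmp/my%20file.csv",
]
for s in samples:
    print(s, "->", file_name_from_uri(s))
```

The same body could be wrapped with `@udf(StringType())` in place of `return_filename`. Plain, unencoded paths that contain a literal `?` or `#` would be cut at that character by urlparse, so this only suits URI-style input.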
    

    【Discussion】:

    • Is there a way to get the file_path into the dataframe based on the name of the csv file? input_file_name() only returns the file's name.