【问题标题】:Split part files based on date column using pyspark使用 pyspark 根据日期列拆分零件文件
【发布时间】:2018-03-28 04:54:53
【问题描述】:

我有 200 个csv 部分文件,它们按年份分隔,从 2012 年到 2018 年。我还想使用 pyspark 根据其中存在的日期列拆分 csv 文件。想知道一种有效的方法来做到这一点,因为csv 将包含数百万行。

我目前的方法是 - 将 2012 年的所有 csv 文件读入数据框 - 然后在所有 365 天里,我循环遍历上述数据框,然后按日期将内容写入 csv。

有没有其他有效的方法来实现这个pyspark。

我在下面放了示例数据:

> 1234|2012-01-01|abc|def|455 
> 
> 1278|2012-04-05|duuj|dea|457
> 
> 9998|2012-05-09|dimd|ase|759
> 
> 8892|2012-01-01|eedbnd|ss|378
> 
> 178|2012-04-05|dswuj|ada|47
> 
> 278|2012-04-05|d32j|d12a|421

我需要将此数据写入 3 个单独的 csv 文件,其中包含 2012-01-01、2012-04-05 和 2012-05-09 的数据

【问题讨论】:

    标签: python pyspark


    【解决方案1】:

    样本数据中有 3 个日期 - 01-01、04-05、05-09

    def fn(dt):
      return hash(dt)
    

    创建一个键值对,键为日期

    rdd = sc.textFile('path/your_file.txt',3).map(lambda r: r.split('|')).map(lambda r: (r[1],r))
    

    为键生成一个哈希并将其传递给partitionBy

    rdd.partitionBy(3, fn).saveAsTextFile('partitioned_parts')
    

    您现在应该看到 3 个部分文件,每个文件都有特定的日期。

    【讨论】:

    • 感谢巴拉的回复。我从源数据创建了一个数据框,因为我必须维护创建的 csv 中列的顺序,并且我做了一个 partitionBy 'date' 列。我最终写了这样的数据 - df.write.partitionBy("dt").format('csv').mode("overwrite").options(delimiter="|").save("gs://bucket ”)。这个过程完成了我的工作,但是当文件为 100GB 或更大时,它需要很长时间才能完成。有没有办法加快这个过程?源有 200 个部分文件。将部分文件组合成更少的文件会有帮助吗?
    猜你喜欢
    • 2020-02-12
    • 1970-01-01
    • 1970-01-01
    • 2016-08-06
    • 2022-10-18
    • 2022-11-14
    • 1970-01-01
    • 2021-07-02
    • 2022-01-13
    相关资源
    最近更新 更多