【问题标题】:Read a file in pyspark with custom column and record delmiter使用自定义列和记录分隔符读取 pyspark 中的文件
【发布时间】:2021-08-14 10:48:20
【问题描述】:

在 pyspark 中读取 csv 文件时,有什么方法可以使用自定义记录分隔符。在我的文件中,记录由 ** 而不是换行符分隔。将 csv 读入 PySpark 数据帧时,有什么方法可以使用此自定义行/记录分隔符?我的列分隔符也是';'下面的代码正确获取了列,但它只计为一行

from pyspark import SparkContext 
sc =  SparkSession.builder.appName('temp').getOrCreate()
df = sc.read.format('csv').option("header", "false").option("delimiter", ';').option("inferSchema", "true").load("some-file-on-s3")

【问题讨论】:

    标签: python python-3.x pyspark apache-spark-sql


    【解决方案1】:

    我会将它作为纯文本文件读取到 rdd 中,然后在作为换行符的字符上拆分。然后将其转换为数据框 像这样

    rdd1= (sc
           .textFile("/jupyter/nfs/test.txt")
           .flatMap(lambda line: line.split("**"))
           .map(lambda x: x.split(";"))
          )
    df1=rdd1.toDF(["a","b","c"])
    df1.show()
    
    +---+---+---+
    |  a|  b|  c|
    +---+---+---+
    | a1| b1| c1|
    | a2| b2| c2|
    | a3| b2| c3|
    +---+---+---+
    

    或者如果这样

    
    rdd2= (sc
           .textFile("/jupyter/nfs/test.txt")
           .flatMap(lambda line: line.split("**"))
           .map(lambda x: [x])
          )
    df2=(rdd2
         .toDF(["abc"])
         .withColumn("a",f.split(f.col("abc"),";")[0])
         .withColumn("b",f.split(f.col("abc"),";")[1])
         .withColumn("c",f.split(f.col("abc"),";")[2])
         .drop("abc")
        )
    df2.show()
    
    +---+---+---+
    |  a|  b|  c|
    +---+---+---+
    | a1| b1| c1|
    | a2| b2| c2|
    | a3| b2| c3|
    +---+---+---+
    
    

    test.txt 的样子

    a1;b1;c1**a2;b2;c2**a3;b2;c3
    

    【讨论】:

    • 这是一个大于 1TB 的文件大小,这会一次将所有数据加载到内存中。我如何使用 rdd 并在我的集群中并行化这个负载
    • Spark 遵循延迟加载的范例。只要您应用转换,就不会加载任何数据。只要您调用 show()、count()、collect() 或 write() 等操作,数据就会加载到内存中。在集群上运行的 Spark 会自行负责在集群中分发和计算所有内容。这就是 Spark 的全部意义所在。内存火花需求取决于您应用的转换。由于所有数据都被拆分为垃圾,Spark 通常不需要与您要处理的文件相同的内存(除非您缓存它)
    • Spark 也会在内存不足之前将数据溢出到磁盘上。如果您缓存不智能的 rdd/df 或应用循环或大型连接(例如交叉连接)等昂贵的操作,则会出现内存问题
    • 谢谢,很有帮助。是否可以在上面使用正则表达式,因为列可能会被 onr 或更多 ';' 分割和额外的';'只需要被忽略
    • 是的,您可以将每个 python 函数用作 lambda 函数,只要我使用它。为了更准确地查看它,请注释掉每个步骤并使用 rdd.take(4) 或 rdd.collect() 显示给生成的 rdd。 rdd 只知道行。所以第一步是获取行,而不是格式化行,以便以后可以从中创建数据框。如果你需要一个数据框。您当然也可以在 rdd 行中做所有事情,最后再逐行写出
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-12-15
    • 2017-12-16
    • 1970-01-01
    • 2013-11-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多