【问题标题】:Writing dataframe partitions to custom directory in scala将数据框分区写入scala中的自定义目录
【发布时间】:2022-06-11 01:23:55
【问题描述】:

我有一个包含数百万条记录的数据框,需要使用胶水作业将数据分区到小于 200MB 或 200,000 行的 s3 存储桶文件夹中。使用 partitionBy 将不起作用,因为没有列值以将分区保持在某些下游进程所需的大小以下的方式拆分数据。我尝试添加单调递增的 id 并根据预定义的 id 范围进行写入,但这不起作用,因为 monotonically_increasing_id 不是连续的。如何获得胶水作业将分区数据写入小于 200mb 的 s3 文件夹,或者有没有办法通过重新分区的数据帧进行分区

    val newdf = diffDF.withColumn("id", monotonically_increasing_id())                    
    var batchSize = 100000
    var totalRecordCount = diffDF.count()
    var currentRow = 0        
         while(currentRow < totalRecordCount){
             var segmentDF = newdf.where(col("id") >= currentRow and col("id") < (currentRow + batchSize ))
                                   .drop("id")
             segmentDF.write.option("header","true").mode(SaveMode.Overwrite).csv(tpath + "/" + currentRow)
             currentRow = currentRow + batchSize
             }  

【问题讨论】:

    标签: scala amazon-s3 aws-glue


    【解决方案1】:

    这是一个 Scala 风格的解决方案,使用折叠,我尝试将相同的逻辑调整到 spark 中,spark rdd 现在最相似的东西是rdd.aggregate,它的参数列表中的 combineOp 只是破坏一切!因此,如果您对使用 RDDs 感到满意,那么这种方法或 spark 中的类似方法将适合您:

    val rdd = df.rdd
    rdd.collect().foldLeft(List.empty[List[Row]]) {
        case (l@(headAggregator :: tail), newRow) =>
          // this if represents rdd size, so instead of list.length you can capture rdd size
          if ((newRow :: headAggregator).length < 3) (newRow :: headAggregator)  :: tail
          else (newRow :: Nil) :: l
        case (Nil, newRow) =>
          (newRow :: Nil) :: Nil
      }
    

    我知道,这个rdd.collect()实际上很昂贵,但我只是实现了逻辑,所以如果你发现RDDs的类似foldLeft的东西,只需复制并粘贴函数体:)

    【讨论】:

      【解决方案2】:

      我最终做的是添加一个作为 id 值除法余数的列。

      val diffDF = .withColumn("partitionnum", col("Employee_ID") % 9) .write.option("header","true").partitionBy("partitionnum").mode(SaveMode.Overwrite).csv(tpath)
      

      这将提供 9 个分区并且是高度可定制的。你可以除以 5 来划分 5 个分区等

      【讨论】:

        猜你喜欢
        • 2019-07-22
        • 2022-01-19
        • 2015-08-01
        • 1970-01-01
        • 2021-04-13
        • 1970-01-01
        • 1970-01-01
        • 2021-11-20
        • 2011-01-30
        相关资源
        最近更新 更多