【问题标题】:Optimized way to apply transformation on several columns of a Spark DataFrame在 Spark DataFrame 的多个列上应用转换的优化方法
【发布时间】:2021-11-09 19:33:38
【问题描述】:

在我的 spark 工作中,我必须针对 2 个用例对多列进行转换:

  • 铸造柱

在我的个人用例中,我在 150 列的 Df 上使用它

  def castColumns(inputDf: DataFrame, columnsDefs: Array[(String, DataType)]): DataFrame = {
    columnsDefs.foldLeft(inputDf) {
      (acc, col) => acc.withColumn(col._1, inputDf(col._1).cast(col._2))
    }
  }
  • 转换

在我的个人用例中,我使用它来执行计算 n 多列以创建 n 新列 (1 个输入列对应 1 个输出列,n 次)

    ListOfCol.foldLeft(dataFrame) {
      (tmpDf, m) => 
          tmpDf.withColumn(addSuffixToCol(m), UDF(m))
    }

如您所见,我使用 FoldLeft 方法和 withColumn。 但我最近在文档中发现,多次使用 withColumn 并不是那么好:

此方法在内部引入投影。因此,调用它 多次,例如,通过循环以添加多个 列可以生成可能导致性能问题和 甚至 StackOverflowException。为避免这种情况,请将 select 与 一次多列。

我还发现 foldleft 会减慢 Spark 应用程序的速度,因为每次迭代都会执行完整的计划分析。我认为这是真的,因为我在代码中添加了 foldleft,我的 spark 开始工作需要比以前更多的时间。

在多列上应用转换时有好的做法吗?

Spark 版本:2.2 语言:斯卡拉

【问题讨论】:

    标签: scala apache-spark apache-spark-2.2


    【解决方案1】:

    在铸造的情况下,您可以通过以下方式实现您正在寻找的东西:

    val df: DataFrame = ???
    val cols = Array(("a", StringType), ("b", BooleanType), ("c", LongType)).map(c => col(c._1).cast(c._2))
    val renamed = df.select(cols:_*)
    

    它使用select(cols: Column*): DataFrame (Spark 2.2 docs) 方法,它采用Columns 的集合。变量cols 上的映射创建列表达式。

    在转换的情况下,我并不完全清楚你在做什么,但可以应用类似的逻辑。我从您的示例中对类型签名做出了一些最佳猜测:

    def addSuffixToCol(c: Column): String = ???
    def UDF(c: Column): Column = ???
    val ListOfCol: List[Column] = ???
    val dataFrame: DataFrame = ???
    dataFrame.select(ListOfCol.map(c => UDF(c).as(addSuffixToCol(c))):_*)
    

    如上所述,我们将转换应用于ListOfCol 中的列,然后用于从dataFrame 中进行选择。

    如果要包含其他列,请将它们添加到 select 语句中,例如:

    dataFrame.select(col("foo"), col("bar"), ListOfCol.map(c => UDF(c).as(addSuffixToCol(c))):_*)
    

    【讨论】:

    • 如果我不想转换我的数据框的所有列怎么办?在您的示例中,未转换的 i 列在转换后不在 DF 中
    • @Marwan02:我已经更新了答案以包含该场景。简而言之,您只是在处理一个 select 语句,因此您可以像往常一样选择列。
    • 如果不指定所有列,就没有办法做到这一点: col("foo"), col("bar") ?我有 400 列要添加
    • 我试过dataFrame.select(dataFrame.columns.diff(ListOfCol).map(c => col(c)) +: ListOfCol.map(c => UDF(c).as(addSuffixToCol(c))):_*) 但我得到了重载的方法值选择与替代
    • 你快到了:你只需要给编译器更多的帮助。确保将这两个列表加入到 select 中,然后再将它们解压缩为 varargs。 dataFrame.select( (dataFrame.columns.diff(ListOfCol).map(col) :+ ListOfCol.map(c => UDF(c).as(addSuffixToCol(c)))):_*)
    猜你喜欢
    • 2020-11-29
    • 1970-01-01
    • 1970-01-01
    • 2017-02-20
    • 1970-01-01
    • 2022-01-23
    • 2022-01-22
    • 2016-05-26
    • 1970-01-01
    相关资源
    最近更新 更多