【问题标题】:Assigning columns to another columns in a Spark Dataframe using Scala使用 Scala 将列分配给 Spark Dataframe 中的另一列
【发布时间】:2019-03-09 08:07:46
【问题描述】:

我正在研究这个极好的问题,以提高我的 Scala 技能和答案:Extract a column value and assign it to another column as an array in spark dataframe

我创建了修改后的代码,如下所示,但还有几个问题:

import spark.implicits._   
import org.apache.spark.sql.functions._

val df = sc.parallelize(Seq(
    ("r1", 1, 1),
    ("r2", 6, 4),
    ("r3", 4, 1),
    ("r4", 1, 2)
  )).toDF("ID", "a", "b")

val uniqueVal = df.select("b").distinct().map(x => x.getAs[Int](0)).collect.toList    
def myfun: Int => List[Int] = _ => uniqueVal 
def myfun_udf = udf(myfun)

df.withColumn("X", myfun_udf( col("b") )).show

+---+---+---+---------+
| ID|  a|  b|        X|
+---+---+---+---------+
| r1|  1|  1|[1, 4, 2]|
| r2|  6|  4|[1, 4, 2]|
| r3|  4|  1|[1, 4, 2]|
| r4|  1|  2|[1, 4, 2]|
+---+---+---+---------+

有效,但是:

  • 我注意到 b 列被放入了两次。
  • 我也可以在第二条语句的 a 列中输入,得到相同的结果。例如。那是什么意思呢?

df.withColumn("X", myfun_udf( col("a") )).show

  • 如果我输入 col ID,那么它会变为 null。
  • 那么,我想知道为什么要输入第二个列?
  • 如何使这对所有列都通用?

所以,这是我在别处看过的代码,但我遗漏了一些东西。

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    您显示的代码没有多大意义:

    • 它不可扩展 - 在最坏的情况下,每行的大小与大小成正比
    • 您已经发现它根本不需要争论。
    • 在编写它时不需要(重要的是它不需要)udf(在 2016 年 12 月 23 日 Spark 1.6 和 2.0 已经发布)
    • 如果您仍想使用 udf 零变量就足够了

    总体而言,这只是当时为 OP 服务的另一个令人费解且具有误导性的答案。我会忽略(或vote accordingly)并继续前进。

    那么如何做到这一点:

    • 如果你有一个本地列表并且你真的想使用udf。对于单个序列,使用 udfnullary 函数:

      val uniqueBVal: Seq[Int] = ???
      val addUniqueBValCol = udf(() => uniqueBVal)
      
      df.withColumn("X", addUniqueBValCol())
      

      概括为:

      import scala.reflect.runtime.universe.TypeTag
      
      def addLiteral[T : TypeTag](xs: Seq[T]) = udf(() => xs)
      
      val x = addLiteral[Int](uniqueBVal)
      df.withColumn("X", x())
      
    • 最好不要使用udf

      import org.apache.spark.sql.functions._
      
      df.withColumn("x", array(uniquBVal map lit: _*))
      
    • 截至

      以及如何使其对所有列通用?

      如开头所述,整个概念很难辩护。任一窗口函数(完全不可扩展)

      import org.apache.spark.sql.expressions.Window
      
      val w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
      df.select($"*" +: df.columns.map(c => collect_set(c).over(w).alias(s"${c}_unique")): _*)
      

      或与聚合交叉连接(大部分时间不可扩展)

      val uniqueValues = df.select(
        df.columns map (c => collect_set(col(c)).alias(s"${c}_unique")):_*
      )
      df.crossJoin(uniqueValues)
      

      但总的来说 - 你必须重新考虑你的方法,如果这出现在任何实际应用程序中,除非你确定,列的基数很小并且有严格的上限。

    带走的信息是 - 不要相信随机人在互联网上发布的随机代码。包括这个。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-09-29
      • 2021-09-23
      • 2016-07-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-08-11
      相关资源
      最近更新 更多