【问题标题】:How to apply a customized function with multiple parameters to each group of a dataframe and union the resulting dataframes in Scala Spark?如何将具有多个参数的自定义函数应用于每组数据帧并在 Scala Spark 中合并生成的数据帧?
【发布时间】:2021-07-07 07:26:31
【问题描述】:

我有一个看起来像这样的自定义函数,它返回一个不同的数据帧作为输出

def customizedfun(data : DataFrame, param1 : Boolean, param2 : string) : DataFrame = {...}

我想将此功能应用于每一组

df.groupBy("type")

然后将每个type 的输出数据帧附加到一个数据帧中。

这与其他关于将自定义函数应用于分组数据帧的问题略有不同,因为除了有问题的数据帧df.groupBy("type") 之外,此函数还接受其他输入。

最好的方法是什么?

【问题讨论】:

  • 只需将 param1 和 param2 添加为 lit 列并将函数也应用于这些列
  • @mck 有更优雅的方法吗?不幸的是,我无法修改customizedfun
  • @mck 我没有使用 python/pyspark
  • 抱歉,customizedfun 是 UDAF 吗?
  • @mck 不,这是一个常规函数,它接受一个数据帧、一个布尔值和一个字符串作为输入并输出一个数据帧

标签: scala dataframe apache-spark group-by user-defined-functions


【解决方案1】:

您可以将原始df 过滤到不同的组,为每个组调用customizedfun,然后合并结果。

我假设customizedfun 是一个简单地将两个参数添加为新列的函数,但它可以是任何函数:

def customizedfun(data : DataFrame, param1 : Boolean, param2 : String) : DataFrame =
  data.withColumn("newCol", lit(s"$param2 $param1"))

我需要两个帮助函数来计算 param1param2 的值,这取决于 type 的值。在现实世界的应用程序中,这些功能可以是例如查找字典。

def calcParam1(typ: Integer): Boolean = typ % 2 == 0
def calcParam2(typ: Integer): String = s"type is $typ"

现在将原来的df过滤到不同的组中,调用customizedfun并将结果合并:

//create some test data
val df = Seq((1, "A", "a"), (1, "B", "b"), (1, "C", "c"), (2, "D", "d"), (2, "E", "e"), (3, "F", "f"))
  .toDF("type", "val1", "val2")
//+----+----+----+
//|type|val1|val2|
//+----+----+----+
//|   1|   A|   a|
//|   1|   B|   b|
//|   1|   C|   c|
//|   2|   D|   d|
//|   2|   E|   e|
//|   3|   F|   f|
//+----+----+----+

//get the distinct values of column type
val distinctTypes = df.select("type").distinct().as[Integer].collect()

//call customizedfun for each group
val resultPerGroup= for( typ <- distinctTypes)
  yield customizedfun( df.filter(s"type = $typ"), calcParam1(typ), calcParam2(typ))

//the final union
val result = resultPerGroup.tail.foldLeft(resultPerGroup.head)(_ union _)

//+----+----+----+---------------+
//|type|val1|val2|         newCol|
//+----+----+----+---------------+
//|   1|   A|   a|type is 1 false|
//|   1|   B|   b|type is 1 false|
//|   1|   C|   c|type is 1 false|
//|   3|   F|   f|type is 3 false|
//|   2|   D|   d| type is 2 true|
//|   2|   E|   e| type is 2 true|
//+----+----+----+---------------+

【讨论】:

  • 有没有办法并行化 for 循环?
  • @Amazonian 理论上是的,但你不需要这样做。除非调用了某个操作(例如 result.show),否则 Spark 懒惰不会执行任何繁重的工作。因此,并行运行循环不会加快速度。如果你在(_ union _)之后的行中下一个断点,然后查看Spark UI,你会看到到目前为止只执行了收集distinctTypes的作业。当为result 调用操作时,Spark 将自动并行执行对 customizedfun 内部发生的任何逻辑的调用。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-11-29
  • 2020-08-27
  • 1970-01-01
  • 2018-03-04
  • 1970-01-01
  • 2020-06-12
相关资源
最近更新 更多