【问题标题】:How to run udf on every column in a dataframe?如何在数据框中的每一列上运行 udf?
【发布时间】:2018-09-06 20:08:44
【问题描述】:

我有一个 UDF:

val TrimText = (s: AnyRef) => {
    //does logic returns string
}

还有一个数据框:

var df = spark.read.option("sep", ",").option("header", "true").csv(root_path + "/" + file)

我想对数据框中每一列的每个值执行TrimText

但是,问题是,我的列数是动态的。我知道我可以通过df.columns 获取列列表。但我不确定这将如何帮助我解决我的问题。我该如何解决这个问题?

TLDR 问题 - 当数据帧的列数未知时,对数据帧中的每一列执行 UDF


尝试使用:

df.columns.foldLeft( df )( (accDF, c) =>
  accDF.withColumn(c, TrimText(col(c)))
)

抛出此错误:

error: type mismatch;
 found   : String
 required: org.apache.spark.sql.Column
accDF.withColumn(c, TrimText(col(c)))

TrimText 假设返回一个字符串,并期望输入是列中的一个值。因此,它将标准化整个数据帧每一行中的每个值。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    您可以使用foldLeft 遍历列列表以使用您的UDF 将withColumn 迭代应用到DataFrame:

    df.columns.foldLeft( df )( (accDF, c) =>
      accDF.withColumn(c, TrimText(col(c)))
    )
    

    【讨论】:

    • @test acc,您需要将TrimText 包裹在udf() 中以使其成为UDF。我还假设该函数具有处理各种数据类型的必要逻辑。
    • 抛出类似的错误,withColumn 在其第二个参数中期待一个字符串值,而udf(TrimText(col(c))) 返回一个org.apache.spark.sql.expressions.UserDefinedFunction 对象
    • 你申请udf()的方式不对。您要么直接将trimText 定义为udf( (...) => ... );或定义val trimTextUdf = udf(trimText) 并像trimTextUdf(col(c)) 一样使用它
    • @testacc:这是最好的方法。例如,请参阅:stackoverflow.com/questions/44819019/…,了解如何创建和应用 UDF。
    【解决方案2】:
    >> I would like to perform TrimText on every value in every column in the dataframe.
    >> I have a dynamic number of columns.
    

    当 sql 函数可用于修剪 UDF 时,可以看到下面的代码适合您吗?

    import org.apache.spark.sql.functions._
    
    spark.udf.register("TrimText", (x:String) =>  ..... )
    
    val df2 = sc.parallelize(List(
      (26, true, 60000.00),
      (32, false, 35000.00)
    )).toDF("age", "education", "income")
    
    val cols2 = df2.columns.toSet
    df2.createOrReplaceTempView("table1")
    
    val query = "select " + buildcolumnlst(cols2) + " from table1 "
    println(query)
    val dfresult = spark.sql(query)
    dfresult.show()
    
    def buildcolumnlst(myCols: Set[String]) = {
      myCols.map(x => "TrimText(" + x + ")" + " as " + x).mkString(",") 
    }
    

    结果,

    select trim(age) as age,trim(education) as education,trim(income) as income from table1 
    +---+---------+-------+
    |age|education| income|
    +---+---------+-------+
    | 26|     true|60000.0|
    | 32|    false|35000.0|
    +---+---------+-------+
    

    【讨论】:

    • 因为TrimText 的作用远超过trim 文本。它就是它的名字。不受我控制。
    • 好吧,那么你可以在那边的方法“buildcolumnlst”中修补UDF函数,而不是sql函数“trim”。 (对上述代码进行了编辑)
    【解决方案3】:
    val a = sc.parallelize(Seq(("1 "," 2"),(" 3","4"))).toDF()
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions._
    def TrimText(s: Column): Column = {
    //does logic returns string
      trim(s)
    }
    a.select(a.columns.map(c => TrimText(col(c))):_*).show
    

    【讨论】:

    • trimspark 中的函数与我创建的TrimText 函数不同,该函数可能命名不当,不是一回事。
    • 但是如果你想在列上操作任何东西都没有关系,像这样传递你的参数并且返回类型将是列
    • 好的。我尝试了您的解决方案并引发了此错误 - error: no `: _*' annotation allowed here (such annotations are only allowed in arguments to *-parameters) df = df.select(df.columns.map(c => TrimText(col(c))) : _*)
    • 请分享你的trimtext函数逻辑,它的返回类型应该是一列
    • 它是专有的。它返回一个字符串。我期待一个列值(字符串)。因此,如果我有一百万条记录和 5 列。该函数将被调用 500 万次,每列的每条记录的每个值都调用一次。输出将始终是一个字符串,
    猜你喜欢
    • 1970-01-01
    • 2020-01-05
    • 2021-10-01
    • 1970-01-01
    • 1970-01-01
    • 2016-02-04
    • 2019-07-01
    • 2017-04-01
    • 1970-01-01
    相关资源
    最近更新 更多