【问题标题】:How to pass more than one column as a parameter to Spark dataframe如何将多个列作为参数传递给 Spark 数据框
【发布时间】:2020-12-26 17:11:15
【问题描述】:

我想将多个列名作为参数传递给数据框。

val readData = spark.sqlContext
  .read.format("csv")
  .option("delimiter",",")
  .schema(Schema)
  .load("emp.csv")

val cols_list1 = "emp_id,emp_dt"
val cols_list2 = "emp_num"

val RemoveDupli_DF = readData
  .withColumn("rnk", row_number().over(Window.partitionBy(s"$cols_list1").orderBy(s"$cols_list2") ))

如果我有一个列名,上面的代码正在工作,而有两个或更多列,它给出以下错误。

线程“主”org.apache.spark.sql.AnalysisException 中的异常:无法解析“emp_id,emp_dt

使用 Scala 2.x 版本。

【问题讨论】:

    标签: apache-spark apache-spark-sql apache-spark-dataset


    【解决方案1】:

    partitionBy 方法作为多重签名:

    def partitionBy(colName: String, colNames: String*)
    // or
    def partitionBy(cols: Column*)
    

    您的代码将列列表作为单个字符串提供,这将失败,因为没有名为 emp_id,emp_dt 的列。因此,您会收到错误消息。

    您可以在集合中定义列名(作为字符串)

    val cols_seq1 = Seq("emp_id","emp_dt")
    

    然后像这样调用 partitionsBy:

    Window.partitionBy(cols_seq1: _*)
    

    符号: _* 告诉编译器将cols_seq1 的每个元素作为其自己的参数传递给partitionBy 调用,而不是将其全部作为单个参数。

    你也可以直接使用

    Window.partitionBy("emp_id", "emp_dt")
    

    【讨论】:

    • 感谢迈克的回复。根据上述输入,我遇到了类似的问题,我尝试如下。 val cols_list1 = Seq(col("emp_id"),col("emp_dt")) val checkDupli = readData.withColumn("rnk", row_number().over(Window.partitionBy(s"$cols_list1").orderBy(s "$cols_list2") )) 线程“主”org.apache.spark.sql.AnalysisException 中的异常:无法解析给定输入列的“List(emp_id, emp_dt)”:注意:emp_id 和 emp_dt 列是我的数据框 readData 的一部分不确定,这里出了什么问题。欣赏是否有其他可行的解决方案。
    • 我认为要让 spark 扩展您的列表,我需要使用 cols_seq1:_*
    猜你喜欢
    • 1970-01-01
    • 2019-07-23
    • 1970-01-01
    • 2023-03-28
    • 1970-01-01
    • 2011-02-12
    • 2020-05-03
    • 2019-08-12
    • 2020-07-08
    相关资源
    最近更新 更多