【问题标题】:Updating column value in loop in spark在火花的循环中更新列值
【发布时间】:2019-06-18 11:16:33
【问题描述】:

简要问题

对于更直接的查询,我想依次遍历所有行,并根据特定行的某些条件为某些变量(a、b、c)分配一些值,然后我将分配将这些变量中的 1 个放入该特定行的列中。

详细

我想在 Spark 中更新数据框中的列值。更新将是有条件的,其中我将在行上运行一个循环并根据该行的其他列的值更新一列。

我尝试使用 withColumn 方法但出现错误。请建议任何其他方法。 withColumn 方法的解析也会有很大的帮助。

表格

var table1 = Seq((11, 25, 2, 0), (42, 20, 10, 0)).toDF("col_1", "col_2", "col_3", "col_4")
table1.show()

架构

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|   11|   25|    2|    0|
|   42|   20|   10|    0|
+-----+-----+-----+-----+

我在这里尝试了两种方法:

  1. 带列
  2. i("col_4") = adj_c

在下面的代码中,在不同位置初始化的变量只需要根据条件以这种方式放置

代码

for(i <- table1.rdd.collect()) {
    if(i.getAs[Int]("col_1") > 0) {
       var adj_a = 0
       var adj_c = 0
        if(i.getAs[Int]("col_1") > (i.getAs[Int]("col_2") + i.getAs[Int]("col_3"))) {
            if(i.getAs[Int]("col_1") < i.getAs[Int]("col_2")) {
                adj_a = 10
                adj_c = 2
            }
            else {
                adj_a = 5
            }
        }
        else {
            adj_c = 1
        }
        adj_c = adj_c + i.getAs[Int]("col_2")
        table1.withColumn("col_4", adj_c)
         //i("col_4")  = adj_c
    }
}

第一种情况出错

table1.withColumn("col_4", adj_c)

<console>:80: error: type mismatch;
 found   : Int
 required: org.apache.spark.sql.Column
               table1.withColumn("col_4", adj_c)
                                          ^

我也尝试在这里使用 col(adj_c),但它开始失败了

<console>:80: error: type mismatch;
 found   : Int
 required: String
               table1.withColumn("col_4", col(adj_c))
                                              ^

第二种情况的错误

(i("col_4") = adj_c)

<console>:81: error: value update is not a member of org.apache.spark.sql.Row
                i("col_4")  = adj_c
                ^

我希望输出表是:

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|   11|   25|    2|    1|
|   42|   20|   10|    5|
+-----+-----+-----+-----+

请提出可能的解决方案,如果对问题有任何疑问,请回复。

请帮我解决这个问题,因为我遇到了问题。任何形式的建议都会非常有帮助。

【问题讨论】:

  • 您能否添加一个输入示例及其所需输出?
  • 输入和输出以表格的形式给出,列 (col_1, col_2, col_3, col_4),其中 col_4 的值正在修改
  • 你能解释一下你在“更新将是有条件的”中的条件吗? @雅诗
  • @C.S.ReddyGadipally 条件主要基于该行其他列的值。示例:(col_1 + col_2) > col_3
  • 在数据框上使用 foreach 方法并调用您的方法,并在该方法用例类中将数据框行转换为对象并执行您想要的任何操作。如果你愿意,我可以分享这个例子。

标签: scala apache-spark


【解决方案1】:

你应该使用when 函数而不是这样复杂的语法,也不需要显式循环,Spark 自己处理它。当您执行 withColumn 时,它将应用于每一行

table1.withColumn("col_4", when($"col_1" > $"col_2" + $"col_3", 5).otherwise(1)).show

快速测试:

输入

table1.show

-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|   11|   25|    2|    0|
|   42|   20|   10|    0|
+-----+-----+-----+-----+

输出

table1.withColumn("col_4", when($"col_1" > $"col_2" + $"col_3", lit(5)).otherwise(1)).show
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|   11|   25|    2|    1|
|   42|   20|   10|    5|
+-----+-----+-----+-----+

【讨论】:

  • 这里我用一个整数初始化adj_c,但实际上这将通过一个表达式来计算。此外,它将在其中包含嵌套的 if 条件。在这两种情况下,这种方法可能会导致问题。
  • 你可以嵌套你想要的when条件的数量,也可以根据其他列或其他变量使用表达式设置新列值
  • 不同条件下有多个变量赋值,正在进一步使用。可以在when语句中做变量赋值吗?
  • 是的,你可以。但我认为你应该用更准确的实际问题示例来更新你的问题
  • 我已尝试编辑问题以使问题更加清晰。编辑主要在样本数据中。
【解决方案2】:

UDF 可以与任何自定义逻辑一起用于计算列值,例如:

val calculateCol4 = (col_1:Int, col_2:Int, col_3:Int)  =>
  if (col_1 > 0) {

    var adj_a = 0
    var adj_c = 0
    if (col_1 > col_2 + col_3) {
      if (col_1 < col_2) {
        adj_a = 10
        adj_c = 2
      }
      else {
        adj_a = 5
      }
    }
    else {
      adj_c = 1
    }
    println("adj_c: "+adj_c)
    adj_c = adj_c + col_2
    // added for return correct result
    adj_c
  }
  // added for return correct result
  else 0

val col4UDF = udf(calculateCol4)
table1.withColumn("col_4",col4UDF($"col_1", $"col_2", $"col_3"))

【讨论】:

  • 这种方法对我很有效,就像 1 行编辑一样。我刚刚遇到另一种情况,我需要根据某些条件更改以前的行。更像是,如果 col_1 的值超过 20,我将在前一行中添加剩余金额。如果您也可以为此做点什么,那将非常有帮助。
  • 猜猜,以前的行变化是另一个问题,与当前无关。看起来这可以通过 Window 函数实现,但需要更多细节。此外,如果“前行更改”将出现在另一个问题中,更多的人可以帮助解决这个问题,而不仅仅是我。
【解决方案3】:

使用spark.sql,更易读懂-

scala> var table1 = Seq((11, 25, 2, 0), (42, 20, 10, 0)).toDF("col_1", "col_2", "col_3", "col_4")
table1: org.apache.spark.sql.DataFrame = [col_1: int, col_2: int ... 2 more fields]

scala> table1.show()
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|   11|   25|    2|    0|
|   42|   20|   10|    0|
+-----+-----+-----+-----+

scala> table1.createOrReplaceTempView("table1")


scala> val result = spark.sql(s""" select col_1,
     |                                    col_2,
     |                                    col_3,
     |                                    CASE WHEN col_1 > (col_2 + col_3)
     |                                           THEN 5
     |                                         ELSE   1
     |                                    END as col_4
     |                              from  table1 """)
result: org.apache.spark.sql.DataFrame = [col_1: int, col_2: int ... 2 more fields]


scala> result.show(false)
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|11   |25   |2    |1    |
|42   |20   |10   |5    |
+-----+-----+-----+-----+

希望这有帮助。

【讨论】:

  • 其实原来的条件太多了,所以不想用这个查询。不过谢谢你的回答
  • 您可以在case语句中轻松添加多个条件,与数据框/数据集操作相比,它更具可读性。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-12-07
  • 1970-01-01
  • 2018-03-07
  • 2023-03-04
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多