【发布时间】:2019-06-18 11:16:33
【问题描述】:
简要问题:
对于更直接的查询,我想依次遍历所有行,并根据特定行的某些条件为某些变量(a、b、c)分配一些值,然后我将分配将这些变量中的 1 个放入该特定行的列中。
详细:
我想在 Spark 中更新数据框中的列值。更新将是有条件的,其中我将在行上运行一个循环并根据该行的其他列的值更新一列。
我尝试使用 withColumn 方法但出现错误。请建议任何其他方法。 withColumn 方法的解析也会有很大的帮助。
表格:
var table1 = Seq((11, 25, 2, 0), (42, 20, 10, 0)).toDF("col_1", "col_2", "col_3", "col_4")
table1.show()
架构:
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 11| 25| 2| 0|
| 42| 20| 10| 0|
+-----+-----+-----+-----+
我在这里尝试了两种方法:
- 带列
- i("col_4") = adj_c
在下面的代码中,在不同位置初始化的变量只需要根据条件以这种方式放置
代码:
for(i <- table1.rdd.collect()) {
if(i.getAs[Int]("col_1") > 0) {
var adj_a = 0
var adj_c = 0
if(i.getAs[Int]("col_1") > (i.getAs[Int]("col_2") + i.getAs[Int]("col_3"))) {
if(i.getAs[Int]("col_1") < i.getAs[Int]("col_2")) {
adj_a = 10
adj_c = 2
}
else {
adj_a = 5
}
}
else {
adj_c = 1
}
adj_c = adj_c + i.getAs[Int]("col_2")
table1.withColumn("col_4", adj_c)
//i("col_4") = adj_c
}
}
第一种情况出错:
table1.withColumn("col_4", adj_c)
<console>:80: error: type mismatch;
found : Int
required: org.apache.spark.sql.Column
table1.withColumn("col_4", adj_c)
^
我也尝试在这里使用 col(adj_c),但它开始失败了
<console>:80: error: type mismatch;
found : Int
required: String
table1.withColumn("col_4", col(adj_c))
^
第二种情况的错误:
(i("col_4") = adj_c)
<console>:81: error: value update is not a member of org.apache.spark.sql.Row
i("col_4") = adj_c
^
我希望输出表是:
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 11| 25| 2| 1|
| 42| 20| 10| 5|
+-----+-----+-----+-----+
请提出可能的解决方案,如果对问题有任何疑问,请回复。
请帮我解决这个问题,因为我遇到了问题。任何形式的建议都会非常有帮助。
【问题讨论】:
-
您能否添加一个输入示例及其所需输出?
-
输入和输出以表格的形式给出,列 (col_1, col_2, col_3, col_4),其中 col_4 的值正在修改
-
你能解释一下你在“更新将是有条件的”中的条件吗? @雅诗
-
@C.S.ReddyGadipally 条件主要基于该行其他列的值。示例:(col_1 + col_2) > col_3
-
在数据框上使用 foreach 方法并调用您的方法,并在该方法用例类中将数据框行转换为对象并执行您想要的任何操作。如果你愿意,我可以分享这个例子。
标签: scala apache-spark