【问题标题】:Apache Spark, add an "CASE WHEN ... ELSE ..." calculated column to an existing DataFrameApache Spark,将“CASE WHEN ... ELSE ...”计算列添加到现有 DataFrame
【发布时间】:2015-08-27 07:25:45
【问题描述】:
我正在尝试使用 Scala API 将“CASE WHEN ... ELSE ...”计算列添加到现有 DataFrame。
起始数据框:
color
Red
Green
Blue
所需的数据帧(SQL 语法:CASE WHEN color == Green THEN 1 ELSE 0 END AS bool):
color bool
Red 0
Green 1
Blue 0
我应该如何实现这个逻辑?
【问题讨论】:
标签:
scala
apache-spark
dataframe
apache-spark-sql
【解决方案1】:
我一直在寻找很长一段时间,所以这里是 SPARK 2.1 JAVA 的示例,带有 group by- 供其他 java 用户使用。
import static org.apache.spark.sql.functions.*;
//...
Column uniqTrue = col("uniq").equalTo(true);
Column uniqFalse = col("uniq").equalTo(false);
Column testModeFalse = col("testMode").equalTo(false);
Column testModeTrue = col("testMode").equalTo(true);
Dataset<Row> x = basicEventDataset
.groupBy(col(group_field))
.agg(
sum(when((testModeTrue).and(uniqTrue), 1).otherwise(0)).as("tt"),
sum(when((testModeFalse).and(uniqTrue), 1).otherwise(0)).as("ft"),
sum(when((testModeTrue).and(uniqFalse), 1).otherwise(0)).as("tf"),
sum(when((testModeFalse).and(uniqFalse), 1).otherwise(0)).as("ff")
);
【解决方案2】:
我发现了这个:
https://issues.apache.org/jira/browse/SPARK-3813
在 spark 2.1.0 上为我工作:
import sqlContext._
val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
rdd.registerTempTable("records")
println("Result of SELECT *:")
sql("SELECT case key when '93' then 'ravi' else key end FROM records").collect()
【解决方案3】:
在 Spark 1.5.0 中:还可以使用 SQL 语法 expr 函数
val df3 = df.withColumn("Green_Ind", expr("case when color = 'green' then 1 else 0 end"))
或普通的 spark-sql
df.registerTempTable("data")
val df4 = sql(""" select *, case when color = 'green' then 1 else 0 end as Green_ind from data """)
【解决方案4】:
在即将发布的 SPARK 1.4.0 版本中(应该在接下来的几天内发布)。您可以使用 when/otherwise 语法:
// Create the dataframe
val df = Seq("Red", "Green", "Blue").map(Tuple1.apply).toDF("color")
// Use when/otherwise syntax
val df1 = df.withColumn("Green_Ind", when($"color" === "Green", 1).otherwise(0))
如果您使用的是 SPARK 1.3.0,您可以选择使用 UDF:
// Define the UDF
val isGreen = udf((color: String) => {
if (color == "Green") 1
else 0
})
val df2 = df.withColumn("Green_Ind", isGreen($"color"))