您实际上可以添加一个带有地图的新列。
df.map { row =>
val col1 = row.getAs[String]("col1")
val col2 = row.getAs[String]("col2")
// etc, extract all your columns
....
val newColumns = col1 + col2
// do what you need to do to obtain value for a new column
(col1, col2, ..., newColumn)
}.toDF("col1", "col2", ..., "new")
在 Java API 方面,这将与一些调整相同:
data.map((MapFunction<Row, Tuple3<String, String, String>>) row -> {
String col1 = row.getAs("col1");
String col2 = row.getAs("col2");
// whatever you need
String newColumns = col1 + col2;
return new Tuple3<>(col1, col2, newColumns);
}, Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()))
.toDF("col1", "col2", ..., "new")
或者,您可以将所有列收集到数组中,然后在 UDF 中处理此数组。
val transformer = udf { arr: Seq[Any] =>
// do your stuff but bevare of types
}
data.withColumn("array", array($"col1", $"col2", ..., $"colN"))
.select($"col1", $"col2",..., transformer($"array") as "newCol")