First, extract the keys:
val input = Seq(Map("col1" -> "val1"), Map("col2" -> "val2", "col1" -> "val1"), Map("col3" -> "val3"))
val keys = input.flatMap(_.keys.toSeq).distinct
Next, you need a method that adds every key missing from a Map object, with null as its value, like this:
def completeNonExistingValuesWithNull(obj: Map[String, String])(columnNames: String*): Map[String, String] = {
  // Keys listed in columnNames that this map does not contain
  val nonExistingKeys = columnNames.filterNot(obj.contains)
  // `++` is available on both Scala 2.12 and 2.13 (`concat` on Map is 2.13+)
  obj ++ nonExistingKeys.map(key => key -> (null: String))
}
// I also prefer to define a function value that captures the keys, so it can be
// passed directly to map without supplying the keys every time
val completeNonExistingValues: Map[String, String] => Map[String, String] =
obj => completeNonExistingValuesWithNull(obj)(keys: _*)
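To make the effect of the fill step concrete, here is a minimal sketch in plain Scala (no Spark needed). The names `fillMissing`, `sample`, `allKeys` and `filled` are hypothetical and used only for illustration; the logic is assumed to mirror the method above.

```scala
// Hypothetical helper for illustration: adds every missing key with a null value.
def fillMissing(obj: Map[String, String], columnNames: Seq[String]): Map[String, String] =
  obj ++ columnNames.filterNot(obj.contains).map(key => key -> (null: String))

val sample = Seq(
  Map("col1" -> "val1"),
  Map("col2" -> "val2", "col1" -> "val1"),
  Map("col3" -> "val3")
)

// Distinct keys in order of first appearance
val allKeys = sample.flatMap(_.keys).distinct

// Every map now has the same key set; absent entries are null
val filled = sample.map(fillMissing(_, allKeys))

// The first map had no "col2", so it was filled with null
assert(filled(0)("col2") == null)
// Values that were already present are left unchanged
assert(filled(1)("col2") == "val2")
```

After this step each map has the same set of keys, which is what allows every map to be turned into a row with the same columns.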
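You also need a way to convert a sequence into a tuple, so that each one becomes a row of your DataFrame (a Seq on its own is treated as a single object with an ArrayType structure):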
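// Matches only sequences of exactly three elements; any other length throws a MatchError
def toProduct(seq: Seq[String]): (String, String, String) = seq match {
  case Seq(a, b, c) => (a, b, c)
}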
Putting it all together:
val completedKeyValues: Seq[Map[String, String]] =
input.map(completeNonExistingValues)
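// Look values up in `keys` order; a Map's own iteration order is not guaranteed to match it
val objects = completedKeyValues.map(m => keys.map(k => m(k))).map(toProduct)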
import spark.implicits._
objects.toDF(keys: _*)
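Because `toProduct` handles exactly three columns, the approach above needs a new case for every column count. One possible alternative, sketched here under the assumption that all values are strings, is to build `Row` objects and an explicit schema instead of tuples. The helper name `mapsToDF` is hypothetical; `Row.fromSeq`, `StructType`, `StructField` and `createDataFrame` are standard Spark SQL APIs.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical helper: one nullable string column per distinct key, one row per map.
def mapsToDF(spark: SparkSession, maps: Seq[Map[String, String]]): DataFrame = {
  // Distinct keys in order of first appearance; these become the column names
  val keys = maps.flatMap(_.keys).distinct
  val schema = StructType(keys.map(k => StructField(k, StringType, nullable = true)))
  // Look each key up in column order; a missing key becomes null
  val rows = maps.map(m => Row.fromSeq(keys.map(k => m.getOrElse(k, null))))
  spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
}
```

With a `SparkSession` named `spark` in scope, `mapsToDF(spark, input)` would be the call for the example data. This variant works for any number of keys and does not need the separate null-filling step, since `getOrElse` supplies the null while each row is built.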