【发布时间】:2018-02-20 07:02:46
【问题描述】:
我有一个主 SQL 表,我正在读入 Spark 并修改它以写入 CassandraDB。目前我有一个工作实现,用于将性别从 0、1、2、3(整数)转换为“男性”、“女性”、“反式”等(字符串)。虽然下面的方法确实有效,但将这些映射到 DataFrame 的单独 Array 制作成一个单独的 Array,将其加入主表/DataFrame,然后删除、重命名等似乎非常低效。
我见过:
.withColumn("gender", when(col("gender) === 1, "male").otherwise("female")
这将允许我继续在主表上进行方法链接,但无法使用超过 2 个选项使其工作。有没有办法做到这一点?我在这个表上有大约 10 个不同的列,每个列都需要创建自己的自定义转换。由于此代码将处理 TB 的数据,因此是否有一种重复性更少且更有效的方法来完成此操作。提前感谢您的帮助!
case class Gender(tmpid: Int, tmpgender: String)
private def createGenderDf(spark:SparkSession): DataFrame = {
import spark.implicits._
Seq(
Gender(1, "Male"),
Gender(2, "Female"),
Gender(777, "Prefer not to answer")
).toDF
}
private def createPersonsDf(spark: SparkSession): DataFrame = {
val genderDf = createGenderDf(spark)
genderDf.show()
val personsDf: DataFrame = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", "\t")
.load(dataPath + "people.csv")
.withColumnRenamed("ID", "id")
.withColumnRenamed("name_first", "firstname")
val personsDf1: DataFrame = personsDf
.join(genderDf, personsDf("gender") === genderDf("tmpid"), "leftouter")
val personsDf2: DataFrame = personsDf1
.drop("gender")
.drop("tmpid")
.withColumnRenamed("tmpgender", "gender")
}
【问题讨论】:
标签: sql scala apache-spark cassandra