【问题标题】:How to convert a DataFrame where all Columns are Strings into a DataFrame with a specific Schema如何将所有列都是字符串的 DataFrame 转换为具有特定模式的 DataFrame
【发布时间】:2018-09-07 02:18:18
【问题描述】:

想象以下输入:

val data = Seq (("1::Alice"), ("2::Bob"))
val dfInput = data.toDF("input")
val dfTwoColTypeString = dfInput.map(row => row.getString(0).split("::")).map{ case Array(id, name) => (id, name) }.toDF("id", "name")

现在我有一个包含所需列的 DataFrame:

scala> dfTwoColTypeString.show
+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+

我当然想要int类型的列id,但它是String类型的:

scala> dfTwoColTypeString.printSchema
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

因此我定义了这个架构:

val mySchema = StructType(Array(
    StructField("id", IntegerType, true),
    StructField("name", StringType, true)
    ))

将 DataFrame dfTwoColTypeString 转换或转换为给定目标架构的最佳方法是什么。

奖励:如果给定的输入无法转换或转换为目标模式,我希望得到一个空行,其中包含一个包含错误输入数据的额外列“bad_record”。也就是说,我想完成与 PERMISSIVE 模式下的 CSV 解析器相同的操作。

非常感谢任何帮助。

【问题讨论】:

  • 我正在寻找的是与 CSV 阅读器一样聪明的东西。 IE。我没有解析 csv 文件或 Dataset[String],而是有一个 Dataset[List[String]],并且像 CSV 解析器一样,我希望有一个函数,它将 List[String] 转换为对应于类型的 List由目标架构给出,无需手动转换每一列。

标签: csv apache-spark apache-spark-sql


【解决方案1】:

如果读取数据时需要转换,可以使用这样的代码:

val resultDF = mySchema.fields.foldLeft(dfTwoColTypeString)((df, c) => df.withColumn(c.name, col(c.name).cast(c.dataType)))
resultDF.printSchema()

输出:

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)

为了检查值匹配类型,可以使用这样的代码:

  val dfTwoColTypeString = dfInput.map(
  row =>
    row.getString(0).split("::"))
  .map {
        case Array(id, name) =>
          if (ConvertUtils.canBeCasted((id, name), mySchema))
            (id, name, null)
          else (null, null, id + "::" + name)}
  .toDF("id", "name", "malformed")

可以在自定义类(此处为 ConvertUtils)中创建两个新的静态函数:

def canBeCasted(values: Product, mySchema: StructType): Boolean = {
    mySchema.fields.zipWithIndex.forall(v => canBeCasted(values.productElement(v._2).asInstanceOf[String], v._1.dataType))
  }

import scala.util.control.Exception.allCatch

def canBeCasted(value: String, dtype: DataType): Boolean = dtype match {
    case StringType => true
    case IntegerType => (allCatch opt value.toInt).isDefined
    // TODO add other types here
    case _ => false
  }

输出错误的“cc::Bob”值:

+----+-----+---------+
|id  |name |malformed|
+----+-----+---------+
|1   |Alice|null     |
|null|null |cc::Bob  |
+----+-----+---------+

【讨论】:

  • 这个解决方案非常好,接近我的需要。当字符串无法转换为目标模式类型时,我该如何处理这种情况。例如。如果 id 列包含无法转换为整数的“badid”。扩展架构以包含“corrupt_record”列是没有问题的,如果无法转换值,则应将整行放入损坏的列中。 IE。就像 PERMISSIVE 模式下的 CSV 解析器一样,其中有一列用于损坏记录。
  • "dfTwoColTypeString" 转换前可以过滤,猜测,需要额外的过滤脚本。
  • 或在“dfInput.map”期间可以根据schema检查值,如果值不正确,可以将所有输入字符串放在附加列中以获取错误记录。此类行的所有常规列都可以设置为空。
  • 听起来不错......这将是完美的解决方案......因为我是火花初学者......你能提供一个代码示例如何做到这一点吗?
【解决方案2】:

如果需要读取 CSV,并且架构已知,则可以在读取期间分配:

spark.read.schema(mySchema).csv("filename.csv")

【讨论】:

  • 这正是问题所在:它不是逗号分隔而是双冒号分隔......所以我需要自己拆分输入并且不能再使用csv阅读器。所以这个答案没有帮助。
  • 可以更改阅读的分隔符,更多内容在这里:github.com/databricks/spark-csv
  • 当您有多个不同的分隔符和正则表达式来将一行解析为分隔值时?这是 csv 阅读器无法做到的。我需要一个可以接收 Dataset[List[String]] 的 CSV 阅读器 ...即,这些值已经以列表的形式分隔,现在我只想像 csv 阅读器那样将值转换为目标模式下一步。这就是我正在寻找的功能。
  • 据我所知,csv阅读器只接受一个字符作为分隔符,而不是像'::'这样的字符串......
【解决方案3】:
val cols = Array(col("id").cast(IntegerType),col("name"))
dfTwoColTypeString.select(cols:_*).printSchema

根 |-- id: 整数(可为空=真) |-- 名称:字符串(可为空=真)

//另一种方法

import org.apache.spark.sql.types.{StringType,IntegerType,StructType,StructField}
val mySchema = StructType(Array(StructField("id", IntegerType, true),StructField("name", StringType, true)))
val df = spark.createDataFrame(dfTwoColTypeString.rdd,mySchema)
df.printSchema

根 |-- id: 整数(可为空=真) |-- 名称:字符串(可为空=真)

【讨论】:

  • 这不是我的意思。您必须手动将 id 从字符串转换为整数。但我想要的是,这个演员表是由给定的目标模式生成的。例如。当您阅读 CSV 时,所有列当然首先作为字符串读取,然后由为 csv 文件提供的模式自动转换。即我不能编写任何代码来转换列。
  • 第二种方法是错误的。当您从 RDD 创建 DataFrame 时,spark 假定给定模式适合给定 RDD,但不会强制转换或检查是否所有行都对模式有效。当您执行 df.show(false) 时,您可以看到您的解决方案是错误的。只有现在所有行都得到处理,您将看到一条错误消息,指出列 id 不是架构建议的整数类型
  • @Hiro.Protagonist 我将再次交叉检查第二种方法。第一个呢。是否符合您的要求
  • @Hiro.Protagonist 这是第一种方法,我们将字符串显式转换为 int
  • @Chandan.Ray 第一种方法是手动输入。它不使用目标模式。我不想手动编码转换,因为目标模式清楚地给出了所需的转换。例如。 csv 转换器只需要目标模式来进行转换。我不需要手动转换它。
【解决方案4】:

考虑到dfTwoColTypeString 是一个数据框,您还可以将其架构类型转换如下。

dfTwoColTypeString.withColumn("id", col("id").cast("Int"))

【讨论】:

  • 我要求的任务不同。我不想手动显式投射。我想要一种方法,该方法能够使用给定的目标模式转换列。这就是 csv 阅读器在给定模式时所做的事情。
猜你喜欢
  • 2020-02-16
  • 2017-06-04
  • 2016-06-29
  • 2014-06-23
  • 1970-01-01
  • 2019-07-22
  • 1970-01-01
  • 2023-03-13
  • 2023-03-27
相关资源
最近更新 更多