【问题标题】:Change dataframe schema from narrow one to full将数据框模式从窄模式更改为完整模式
【发布时间】:2018-12-27 17:50:20
【问题描述】:

假设我有以下数据集:

+--------+--------+--------+--------+
| field1 | field2 | field3 | field4 |
+--------+--------+--------+--------+
|      0 |      1 | 0.15   |  2.132 |
|      1 |      2 | 0.72   |   0.15 |
|      2 |     12 | error  |        |
|      3 |     75 | error  |        |
+--------+--------+--------+--------+

如您所见,field3 可能包含doublestring 值。这里唯一可能的string 值是错误。如果errorfield4 根本不包含任何值(实际上,在 field3 之后有 15 个字段,为了便于阅读我省略了这些字段,并为他们应用同样的规则)

所以我正在尝试完成以下操作:

  1. 使用窄模式读取输入(仅包含前三个字段的描述)
  2. 过滤器错误
  3. 应用由所有字段组成的新架构

所以,阅读看起来像这样:

val er_schema = 
  StructType(
    Array(
      StructField("field1", IntegerType, true),
      StructField("field2", IntegerType, true),
      StructField("field3", StringType, true)))

val c_schema = 
  StructType(
    Array(
      StructField("field1", IntegerType, true),
      StructField("field2", IntegerType, true),
      // StringType only for now, DoubleType would be used instead
      StructField("field3", StringType, true),
      StructField("field4", StringType, true)))

val raw = sc.read.schema(er_schema).csv(PATH)
val correctOnly = filterErr(raw)
ss.createDataframe(
  correctOnly,
  c_schema))

这段代码出现异常:java.lang.ArrayIndexOutOfBoundsException: 3

据我了解,这是因为底层 RDD 仅包含 3 个第一个字段。

所以,问题来了:是否可以使用缩小(在减少字段数量的意思)模式,然后将数据帧转换为正常(包含所有字段)模式?

编辑 1: 源文件为 CSV 格式,如下所示:

0,1,0.15,2.132
1,2,0.72,0.15
2,12,error
3,75,error

我想到的可能解决方案是使用 RDD 并在过滤错误行后应用完整模式,但我想知道是否可以仅使用数据框来完成

编辑 2: 我想要的结果:

正确的一个:

+--------+--------+--------+--------+
| field1 | field2 | field3 | field4 |
+--------+--------+--------+--------+
|      0 |      1 | 0.15   |  2.132 |
|      1 |      2 | 0.72   |   0.15 |
+--------+--------+--------+--------+

使用正确的数据类型(字段 field3field4 作为 DoubleType)

编辑 3: 这里的主要问题是 field3 列 - 它不仅可以包含 double 值,还可以包含 strings。我想删除带有string 值的行,只保留双精度值。我尝试使用两种不同的模式,但它不起作用。

【问题讨论】:

  • 只是从我得到的快速思考,如果您有两个数据框,一个用于正确的字段,一个带有错误,对于错误 df,您可以添加与正确 df 匹配的列并传递一个静态值,最后合并两个数据帧。我将这个想法放在这里是因为您只想从数据帧中执行此操作。
  • @roh 感谢您的回复。不幸的是,我的目标是只获取“正确”字段(在 3d 列中没有“错误”值)
  • 为什么不过滤?含义Dataframe.where(col("field3") =!= lit("error")) ?
  • @ArtavazdBalayan 感谢您的回复。不幸的是,这里的问题不在于过滤,而在于保留正确的数据类型。 Field3 可能不仅包含双精度,还包含字符串,我想以某种方式过滤这些行,以便结果只有双类型行
  • 1) 过滤字段3,然后您只填充需要的行(您可以为该行应用 UDF 以执行一些复合逻辑来检查值 - 尝试将其解析为双精度)2)使用 withColumn 创建新列并将其转换为所需的类型 3)删除原始列。不清楚的可以写个例子

标签: scala csv apache-spark apache-spark-sql


【解决方案1】:

您可以通过将mode 设置为DROPMALFORMED 来删除不遵循指定架构的行。读取数据时,使用您想要的数据框的架构:

val schema = StructType(Array(
  StructField("field1", IntegerType, true),
  StructField("field2", IntegerType, true),
  StructField("field3", DoubleType, true),
  StructField("field4", DoubleType, true)
))

然后读取csv文件:

val df = spark.read.
  .option("mode", "DROPMALFORMED")
  .schema(schema)
  .csv("/path/to/file")

这样,所有数据类型不正确或行数错误的行都会被丢弃。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-09-22
    • 1970-01-01
    • 2012-01-14
    • 2011-06-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多