【发布时间】:2018-12-27 17:50:20
【问题描述】:
假设我有以下数据集:
+--------+--------+--------+--------+
| field1 | field2 | field3 | field4 |
+--------+--------+--------+--------+
| 0 | 1 | 0.15 | 2.132 |
| 1 | 2 | 0.72 | 0.15 |
| 2 | 12 | error | |
| 3 | 75 | error | |
+--------+--------+--------+--------+
如您所见,field3 可能包含double 或string 值。这里唯一可能的string 值是错误。如果error 值 field4 根本不包含任何值(实际上,在 field3 之后有 15 个字段,为了便于阅读我省略了这些字段,并为他们应用同样的规则)
所以我正在尝试完成以下操作:
- 使用窄模式读取输入(仅包含前三个字段的描述)
- 过滤器错误
- 应用由所有字段组成的新架构
所以,阅读看起来像这样:
val er_schema =
StructType(
Array(
StructField("field1", IntegerType, true),
StructField("field2", IntegerType, true),
StructField("field3", StringType, true)))
val c_schema =
StructType(
Array(
StructField("field1", IntegerType, true),
StructField("field2", IntegerType, true),
// StringType only for now, DoubleType would be used instead
StructField("field3", StringType, true),
StructField("field4", StringType, true)))
val raw = sc.read.schema(er_schema).csv(PATH)
val correctOnly = filterErr(raw)
ss.createDataframe(
correctOnly,
c_schema))
这段代码出现异常:java.lang.ArrayIndexOutOfBoundsException: 3
据我了解,这是因为底层 RDD 仅包含 3 个第一个字段。
所以,问题来了:是否可以使用缩小(在减少字段数量的意思)模式,然后将数据帧转换为正常(包含所有字段)模式?
编辑 1: 源文件为 CSV 格式,如下所示:
0,1,0.15,2.132
1,2,0.72,0.15
2,12,error
3,75,error
我想到的可能解决方案是使用 RDD 并在过滤错误行后应用完整模式,但我想知道是否可以仅使用数据框来完成
编辑 2: 我想要的结果:
正确的一个:
+--------+--------+--------+--------+
| field1 | field2 | field3 | field4 |
+--------+--------+--------+--------+
| 0 | 1 | 0.15 | 2.132 |
| 1 | 2 | 0.72 | 0.15 |
+--------+--------+--------+--------+
使用正确的数据类型(字段 field3 和 field4 作为 DoubleType)
编辑 3: 这里的主要问题是 field3 列 - 它不仅可以包含 double 值,还可以包含 strings。我想删除带有string 值的行,只保留双精度值。我尝试使用两种不同的模式,但它不起作用。
【问题讨论】:
-
只是从我得到的快速思考,如果您有两个数据框,一个用于正确的字段,一个带有错误,对于错误 df,您可以添加与正确 df 匹配的列并传递一个静态值,最后合并两个数据帧。我将这个想法放在这里是因为您只想从数据帧中执行此操作。
-
@roh 感谢您的回复。不幸的是,我的目标是只获取“正确”字段(在 3d 列中没有“错误”值)
-
为什么不过滤?含义
Dataframe.where(col("field3") =!= lit("error"))? -
@ArtavazdBalayan 感谢您的回复。不幸的是,这里的问题不在于过滤,而在于保留正确的数据类型。 Field3 可能不仅包含双精度,还包含字符串,我想以某种方式过滤这些行,以便结果只有双类型行
-
1) 过滤字段3,然后您只填充需要的行(您可以为该行应用 UDF 以执行一些复合逻辑来检查值 - 尝试将其解析为双精度)2)使用 withColumn 创建新列并将其转换为所需的类型 3)删除原始列。不清楚的可以写个例子
标签: scala csv apache-spark apache-spark-sql