【发布时间】:2019-07-29 19:51:18
【问题描述】:
我正在尝试使用具有以下条件的 pyspark csv 阅读器:
- 根据架构中的数据类型读取 csv
- 检查标题和架构中的列名是否匹配
- 将损坏的记录存储在新字段中
这是我尝试过的。
file: ab.csv
------
a,b
1,2
3,four
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
DDL = "a INTEGER, b INTEGER"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=False,
columnNameOfCorruptRecord='broken')
print(df.show())
输出:
+----+----+
| a| b|
+----+----+
| 1| 2|
|null|null|
+----+----+
此命令不存储损坏的记录。如果我将broken 添加到
架构和删除标头验证命令
使用警告。
DDL = "a INTEGER, b INTEGER, broken STRING"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=True,
columnNameOfCorruptRecord='broken')
print(df.show())
输出:
WARN CSVDataSource:66 - Number of column in CSV header is not equal to number of fields in the schema:
Header length: 2, schema size: 3
CSV file: file:/// ... /ab.csv
+----+----+------+
| a| b|broken|
+----+----+------+
| 1| 2| null|
|null|null|3,four|
+----+----+------+
这是预期的行为还是存在破坏第一个示例的错误? 有没有更好的方法来做到这一点?
还有一件事。我想处理损坏记录中格式正确的字段 得到这样的数据框。
+--+----+------+
| a| b|broken|
+--+----+------+
| 1| 2| null|
| 3|null|3,four|
+--+----+------+
我应该做一个额外的步骤阅读来得到那个,还是有一些 我错过了更宽容的选择。
【问题讨论】:
标签: python csv apache-spark pyspark