【问题标题】:Pyspark read csv with schema, header check, and store corrupt recordsPyspark 读取带有模式的 csv、标头检查并存储损坏的记录
【发布时间】:2019-07-29 19:51:18
【问题描述】:

我正在尝试使用具有以下条件的 pyspark csv 阅读器:

  • 根据架构中的数据类型读取 csv
  • 检查标题和架构中的列名是否匹配
  • 将损坏的记录存储在新字段中

这是我尝试过的。

file: ab.csv
------
a,b
1,2
3,four
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
DDL = "a INTEGER, b INTEGER"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=False,
                    columnNameOfCorruptRecord='broken')
print(df.show())

输出:

+----+----+
|   a|   b|
+----+----+
|   1|   2|
|null|null|
+----+----+

此命令不存储损坏的记录。如果我将broken 添加到 架构和删除标头验证命令 使用警告。

DDL = "a INTEGER, b INTEGER, broken STRING"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=True, 
                    columnNameOfCorruptRecord='broken')
print(df.show())

输出:

WARN  CSVDataSource:66 - Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 2, schema size: 3
CSV file: file:/// ... /ab.csv
+----+----+------+
|   a|   b|broken|
+----+----+------+
|   1|   2|  null|
|null|null|3,four|
+----+----+------+

这是预期的行为还是存在破坏第一个示例的错误? 有没有更好的方法来做到这一点?

还有一件事。我想处理损坏记录中格式正确的字段 得到这样的数据框。

+--+----+------+
| a|   b|broken|
+--+----+------+
| 1|   2|  null|
| 3|null|3,four|
+--+----+------+

我应该做一个额外的步骤阅读来得到那个,还是有一些 我错过了更宽容的选择。

【问题讨论】:

    标签: python csv apache-spark pyspark


    【解决方案1】:

    这是正确的默认行为。 如果您正在推断架构,它会在输出架构中隐式添加 columnNameOfCorruptRecord 字段,否则您必须在用户定义的架构中提供名为 columnNameOfCorruptRecord 的字符串类型字段,或者更改列名(如损坏)并将相同的名称添加到架构中.

    没有选项可以像您提到的那样部分处理数据,为此您需要编写自己的自定义解析器,在 spark 中扩展 CSVFileFormat。 有关所有 csvoptions 的列表,请检查 org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala

    【讨论】:

    • 如果您要推断架构,它会在输出架构中隐式添加 columnNameOfCorruptRecord 字段。 我刚刚对此进行了测试。它没有添加列。它只是删除了记录并将 None 放在字段中(就像我在第一个示例中所做的那样)。我通过制作一个主要包含整数的更长的 ab.csv 文件并降低用于推断模式的采样率来对其进行测试。 spark.read.csv('ab.csv', header=True, inferSchema=True, enforceSchema=False, columnNameOfCorruptRecord='broken', samplingRatio=0.1)
    【解决方案2】:

    就像@deo 所说,当使用columnNameOfCorruptRecord 时,Spark 会在解析过程中删除它之前隐式创建列。为了保留该列,您需要将其显式添加到您的架构中。请注意,此行为还取决于您在阅读时指定的 mode

    请参阅此 sn-p 以获取 Spark documentation 中的 mode 参数:

    PERMISSIVE :当遇到损坏的记录时,将格式错误的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将其他字段设置为 null。为了保留损坏的记录,用户可以在用户定义的模式中设置一个名为 columnNameOfCorruptRecord 的字符串类型字段。如果模式没有该字段,它会在解析过程中删除损坏的记录。令牌少于/多于模式的记录不是 CSV 的损坏记录。当它遇到标记少于模式长度的记录时,将 null 设置为额外字段。当记录的标记多于模式长度时,它会丢弃额外的标记。

    【讨论】:

      【解决方案3】:

      我的团队正在努力解决同样的问题,因此我们检查了文件,格式错误是因为数据中有一些带有 \n 的文本。但是,它应该在 \ " 中。所以,当我们使用Atom 检查文件时,我们注意到了问题,然后解决方案就是强制阅读器处理多行。

      df = spark.read.options(header=True,  sep='|').csv("my_csv_file", multiLine=True)
      

      【讨论】:

        猜你喜欢
        • 2021-05-09
        • 2020-02-26
        • 2019-10-11
        • 2014-07-06
        • 1970-01-01
        • 2020-11-17
        • 2019-09-26
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多