【问题标题】:Handling schema mismatches in Spark在 Spark 中处理模式不匹配
【发布时间】:2020-03-18 16:09:33
【问题描述】:

我正在使用 Scala 中的 Spark 读取 csv 文件。 架构是预定义的,我正在使用它来阅读。 这是示例代码:

// create the schema
val schema= StructType(Array(
      StructField("col1", IntegerType,false),
      StructField("col2", StringType,false),
      StructField("col3", StringType,true)))

// Initialize Spark session
val spark: SparkSession = SparkSession.builder
    .appName("Parquet Converter")
    .getOrCreate

// Create a data frame from a csv file
val dataFrame: DataFrame =
spark.read.format("csv").schema(schema).option("header", false).load(inputCsvPath)

根据我在使用架构使用 Spark 阅读 cav 时所读到的内容,有 3 个选项:

  1. 将模式设置为DROPMALFORMED -->这将删除与架构不匹配的行
  2. 将模式设置为PERMISSIVE -->这会将整行设置为空值
  3. 将模式设置为FAILFAST -->这将在发现不匹配时引发异常

组合选项的最佳方式是什么?我想要的行为是获取架构中的不匹配,将它们打印为错误并忽略数据框中的行。 基本上,我想要 FAILFAST 和 DROPMALFORMED 的组合。

提前致谢

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    这就是我最终所做的:
    我在架构中添加了“_corrupt_record”列,例如:

    val schema= StructType(Array(
        StructField("col1", IntegerType,true),    
        StructField("col2", StringType,false),
        StructField("col3", StringType,true),
        StructField("_corrupt_record", StringType, true)))
    

    然后我使用 PERMISSIVE 模式读取 CSV(它是 Spark 默认):

    val dataFrame: DataFrame = spark.read.format("csv")
                                    .schema(schema)
                                    .option("header", false)
                                    .option("mode", "PERMISSIVE")
                                    .load(inputCsvPath)
    

    现在我的数据框包含一个额外的列,其中包含架构不匹配的行。 我过滤了数据不匹配的行并打印出来:

    val badRows = dataFrame.filter("_corrupt_record is not null")
    badRows.cache()
    badRows.show()
    

    【讨论】:

      【解决方案2】:

      只需使用DROPMALFORMED 并关注日志即可。如果存在格式错误的记录,则会将其转储到日志中,最高可达maxMalformedLogPerPartition 选项设置的限制。

      spark.read.format("csv")
        .schema(schema)
        .option("header", false)
        .option("mode", "DROPMALFORMED")
        .option("maxMalformedLogPerPartition", 128)
        .load(inputCsvPath)
      

      【讨论】:

      • 这个格式错误的记录在哪里转储并存储在哪个位置?我们还需要指定位置吗?
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-05-26
      • 2016-03-06
      • 1970-01-01
      • 2020-12-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多