【发布时间】:2018-12-17 00:23:31
【问题描述】:
我正在从 S3 读取大量 CSV(所有内容都在一个键前缀下)并创建一个强类型 Dataset。
val events: DataFrame = cdcFs.getStream()
events
.withColumn("event", lit("I"))
.withColumn("source", lit(sourceName))
.as[TradeRecord]
其中TradeRecord 是一个案例类,通常可以通过 SparkSession 隐式反序列化。但是,对于某个批次,记录无法反序列化。这是错误(省略了堆栈跟踪)
Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "deal")
- root class: "com.company.trades.TradeRecord"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
deal 是 TradeRecord 的一个字段,在源数据(S3 对象)中永远不应为空,因此它不是 Option。
不幸的是,错误消息没有给我任何关于 CSV 数据是什么样子的线索,甚至没有给我任何关于它来自哪个 CSV 文件的线索。该批次包含数百个文件,因此我需要一种方法将其缩小到最多几个文件来调查问题。
【问题讨论】:
-
不同的问题。我理解错误的含义。我需要查看传入的数据是什么样的(是否缺少一列?csv 中是否有额外的逗号?),这意味着我需要知道哪个文件包含损坏的记录。
-
如果您想缩小范围,我建议您跳过
as并使用input_file_name来查明确切的来源。 -
查找这个名为 columnNameOfCorruptRecord 的选项。它将使用此列标记记录损坏记录。您需要在案例类或模式中添加一个字段,并将其设置为选项的值。我还有另一个想法,使用名为 from_file 的 spark-sql 函数,稍后我将尝试解释。
标签: scala apache-spark apache-spark-sql apache-spark-dataset