【Question title】: Getting NullPointerException using spark-csv with DataFrames
【Posted】: 2016-03-27 02:46:43
【Question】:

The spark-csv README has sample Java code like this:

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;

SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(
    new StructField("year", IntegerType, true), 
    new StructField("make", StringType, true),
    new StructField("model", StringType, true),
    new StructField("comment", StringType, true),
    new StructField("blank", StringType, true));

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");

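It didn't compile out of the box, so after some wrangling I got it to compile by changing the incorrect FooType syntax to DataTypes.FooType and passing the StructFields as a new StructField[]. The compiler also requested a fourth argument, metadata, in the StructField constructor, but I had trouble finding documentation on what it means (the javadocs describe its use cases, but not how to decide what to pass in when constructing a StructField). With the following code, it now runs until it reaches any side-effecting method such as collect():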

JavaSparkContext sc = new JavaSparkContext(conf);

SQLContext sqlContext = new SQLContext(sc);

// Read features.
System.out.println("Reading features from " + args[0]);
StructType featuresSchema = new StructType(new StructField[] {
    new StructField("case_id", DataTypes.StringType, false, null), 
    new StructField("foo", DataTypes.DoubleType, false, null)
});
DataFrame features = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(featuresSchema)
    .load(args[0]);
for (Row r : features.collect()) {
  System.out.println("Row: " + r);
}

I get the following exception:

Exception in thread "main" java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:202)
  at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
  at scala.collection.immutable.HashSet.elemHashCode(HashSet.scala:65)
  at scala.collection.immutable.HashSet.computeHash(HashSet.scala:74)
  at scala.collection.immutable.HashSet.$plus(HashSet.scala:56)
  at scala.collection.immutable.HashSet.$plus(HashSet.scala:59)
  at scala.collection.immutable.Set$Set4.$plus(Set.scala:127)
  at scala.collection.immutable.Set$Set4.$plus(Set.scala:121)
  at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
  at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
  at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
  at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
  at scala.collection.SetLike$class.map(SetLike.scala:93)
  at scala.collection.AbstractSet.map(Set.scala:47)
  at org.apache.spark.sql.catalyst.expressions.AttributeSet.foreach(AttributeSet.scala:114)
  at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:105)
  at org.apache.spark.sql.catalyst.expressions.AttributeSet.size(AttributeSet.scala:56)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:307)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:56)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
  at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
  at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
...

Any idea what's wrong?

【Comments】:

    Tags: apache-spark spark-dataframe spark-csv


    【Solution 1】:

    I ran into the same exception. I fixed it by providing the metadata.

    So change the code to:

    StructType customSchema = new StructType(new StructField[] {
        new StructField("year", DataTypes.IntegerType, true, Metadata.empty()),
        new StructField("make", DataTypes.StringType, true, Metadata.empty()),
        new StructField("model", DataTypes.StringType, true, Metadata.empty()),
        new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
        new StructField("blank", DataTypes.StringType, true, Metadata.empty())
    });
    

    This will fix the problem.

    【Comments】:

      【Solution 2】:

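      The README appears to be badly out of date, and its Java example needs significant edits. I tracked down the actual JIRA which added the metadata field; it points to the use of a default Map.empty value in the Scala case. Whoever wrote the documentation must have translated the Scala directly into Java, even though the Java constructor has no such default value for that parameter.

      In the 1.5 branch of SparkSQL's code, we can see that it calls metadata.hashCode() without a null check, which is what causes the NullPointerException. The existence of the Metadata.empty() method, together with the discussion about using an empty map as the default in Scala, suggests that the correct approach is simply to pass Metadata.empty() if you don't care about metadata. The revised example should be: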

      SQLContext sqlContext = new SQLContext(sc);
      StructType customSchema = new StructType(new StructField[] {
          new StructField("year", DataTypes.IntegerType, true, Metadata.empty()), 
          new StructField("make", DataTypes.StringType, true, Metadata.empty()),
          new StructField("model", DataTypes.StringType, true, Metadata.empty()),
          new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
          new StructField("blank", DataTypes.StringType, true, Metadata.empty())
      });
      
      DataFrame df = sqlContext.read()
          .format("com.databricks.spark.csv")
          .schema(customSchema)
          .option("header", "true")
          .load("cars.csv");
      
      df.select("year", "model").write()
          .format("com.databricks.spark.csv")
          .option("header", "true")
          .save("newcars.csv");
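
To illustrate the failure mode without needing Spark on the classpath, here is a minimal plain-Java sketch. The Field class and the hashingThrows helper are hypothetical stand-ins invented for this illustration; they are not Spark's StructField or AttributeReference. They only mimic a hashCode() that dereferences a metadata field with no null check, which is the behaviour described above.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class MetadataNullDemo {

    // Hypothetical stand-in for a schema field; NOT Spark's StructField.
    static final class Field {
        final String name;
        final Map<String, Object> metadata;

        Field(String name, Map<String, Object> metadata) {
            this.name = name;
            this.metadata = metadata;
        }

        @Override
        public int hashCode() {
            // Dereferences metadata without a null check, so a null
            // metadata value only blows up once the field is hashed.
            return 31 * name.hashCode() + metadata.hashCode();
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof Field)) {
                return false;
            }
            Field that = (Field) other;
            return name.equals(that.name) && Objects.equals(metadata, that.metadata);
        }
    }

    // Returns true if adding the field to a hash-based set throws a
    // NullPointerException, false if the insert succeeds.
    static boolean hashingThrows(Field field) {
        Set<Field> fields = new HashSet<>();
        try {
            fields.add(field);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Field withNull = new Field("case_id", null);
        Field withEmpty = new Field("case_id", Collections.<String, Object>emptyMap());
        System.out.println("null metadata throws: " + hashingThrows(withNull));
        System.out.println("empty metadata throws: " + hashingThrows(withEmpty));
    }
}
```

Constructing the object with null succeeds; the exception only appears later, when something puts the object into a hash-based collection. That matches the question, where building the schema and calling load() worked and the failure surfaced only at collect(), inside a HashSet operation during query planning.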
      

      【Comments】:

      • Sent a pull request to spark-csv to fix the documentation.
      • I think you need to change your answer from .option("inferSchema", "true") to .schema(customSchema)
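      • Ah, you're right, good catch. When I tried to make minimal changes to the upstream example, I only focused on the lines that define customSchema. Sent another pull request to fix this in the documentation and edited the answer to match.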