【问题标题】:joda DateTime format cause null pointer error in spark RDD functionsjoda DateTime 格式导致 Spark RDD 函数中的空指针错误
【发布时间】:2015-07-05 14:08:40
【问题描述】:

异常信息如下

用户类抛出异常:作业因阶段失败而中止:阶段中的任务 0 1.0 失败 4 次,最近一次失败:在 1.0 阶段丢失任务 0.3 (TID 11, 10.215.155.82): java.lang.NullPointerException at org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143) 在 org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103) 在 org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:676) 在 org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:521) 在 org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:625) 在 org.joda.time.base.AbstractDateTime.toString(AbstractDateTime.java:328) 在 com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41) 在 com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41) 在 scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:328) 在 scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:327) 在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) 在 org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:113) 在 scala.collection.IterableLike$class.foreach(Ite​​rableLike.scala:72) 在 org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:28) 在 scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:327) 在 org.apache.spark.util.collection.CompactBuffer.groupBy(CompactBuffer.scala:28) 在 com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:41) 在 com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:40) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) 在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:27​​3) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081) 在 org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081) 在 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) 在 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 在 org.apache.spark.scheduler.Task.run(Task.scala:56) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 在 java.lang.Thread.run(Thread.java:744)

我的代码如下:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.{ SparkConf, SparkContext }
import org.joda.time.DateTime
import org.joda.time.format.{ DateTimeFormat, DateTimeFormatter }




object DateTimeNullReferenceReappear extends App {

  case class Record(uin: String = "", date: DateTime = null, value: Double = 0.0) 

  val cfg = new Configuration
  val sparkConf = new SparkConf()
  sparkConf.setAppName("bourne_exception_reappear")
  val sc = new SparkContext(sparkConf)

val data = TDWSparkContext.tdwTable(   // this function just read data from an data warehouse
  sc,
  tdwuser = FaceConf.TDW_USER,
  tdwpasswd = FaceConf.TDW_PASSWORD,
  dbName = "my_db",
  tblName = "my_table",
  parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
  .map(row => {
    Record(uin = row(2),
      date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
      value = row(4).toDouble)
  }).map(x => (x.uin, (x.date, x.value)))
  .groupByKey
  .map(x => {
    x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum)   // throw exception here
  })

//      val data = TDWSparkContext.tdwTable(  // It works, as I don't user datetime toString in the groupBy 
//      sc,
//      tdwuser = FaceConf.TDW_USER,
//      tdwpasswd = FaceConf.TDW_PASSWORD,
//      dbName = "hy",
//      tblName = "t_dw_cf_oss_tblogin",
//      parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
//      .map(row => {
//        Record(uin = row(2),
//          date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
//          value = row(4).toDouble)
//      }).map(x => (x.uin, (x.date.toString("yyyyMMdd"), x.value)))
//      .groupByKey
//      .map(x => {
//        x._2.groupBy(_._1).mapValues(_.map(_._2).sum)
//      })

  data.take(10).map(println)

}

所以,似乎 groupBy 中的调用 toString 导致了异常,那么任何人都可以解释一下吗?

谢谢

【问题讨论】:

  • 嗯...NullPointerException 发生在您尝试对包含任何类型的null 值的变量调用任何函数时。所以...这意味着在您的x._2 中有一些元组,其中第一个成员(_._1)是null
  • 你能添加这个结果吗? TDWSparkContext.tdwTable( // this function just read data from an data warehouse sc, tdwuser = FaceConf.TDW_USER, tdwpasswd = FaceConf.TDW_PASSWORD, dbName = "my_db", tblName = "my_table", parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329")) .map(row => { Record(uin = row(2), date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)), value = row(4).toDouble) }).map(x => (x.uin, (x.date, x.value)))
  • 只需将此groupBy(_._1.toString("yyyyMMdd")) 替换为groupBy( d => { if ( d._1 != null ) { d._1.toString("yyyyMMdd") } else { "I am a placeholder" } })。您可以选择对占位符做任何您想做的事情。
  • @SarveshKumarSingh map(x => (x.uin, (x.date.toString("yyyyMMdd"), x.value))) 这个不会抛出异常,是不是RDD.map 捕获了空指针异常,而Array.groupBy 没有?
  • 嗯...是的。 Spark 在实际序列化和映射 RDD 之前,对映射函数执行大量清理以将其包装在“安全”闭包中。清理函数是 - def clean(func: AnyRef, checkSerializable: Boolean = true)ClosureCleaner.scala ( github.com/apache/spark/blob/master/core/src/main/scala/org/… ) 中定义的。我不太确定......但可能这也可以防止NullPointerExceptions 并为这种情况生成一个null 值。

标签: scala apache-spark


【解决方案1】:

您需要禁用 Kryo,使用 Kryo JodaTime Serializers,或者避免序列化 DateTime 对象,即传递 Longs。

【讨论】:

    【解决方案2】:

    我们对“问题”知之甚少。所以我们可以试试下面的 experimat,这会让我们更多地了解这个问题。

    替换下面的部分,

    map(x => {
      x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum)   // throw exception here
    })
    

    有了这个,

    map( x => {
      x._2.groupBy( t => {
        val dateStringTry = Try( t._2.toString( "yyyyMMdd" ) )
        dateStringTry match {
          case Success( dateString ) => Right( dateString )
          case Failure( e ) => {
            println( "=========== Null Tuple Description ==========" )
            println( "Problem Tuple :: [" + t + "]" )
            println( "Error Info :: [" + e.getMessage + "]" )
            // finally the stack trace, if needed
            // e.printStackTrace()
            prinln( "=============================================" )
            Left( e )
          }
        }
      } )
    } )
    

    让我们检查一下这个实验的运行结果。

    【讨论】:

      【解决方案3】:

      问题似乎是DateTime 在 Spark 中序列化时丢失了一些东西(我猜这种情况经常发生)。在我的情况下,Chronology 被搞砸了,导致了同样的异常。

      对我有用的一个非常实用的解决方法是在使用之前重新创建DateTime,例如:

      date.toMutableDateTime.toDateTime

      这似乎恢复了所有丢失的位,之后一切正常。

      Marius Soutier 发布的禁用 Kryo 的解决方案也对我有用。这是一种不那么老套的方法。

      【讨论】:

        【解决方案4】:

        这里的问题是 Joda 的 CachedDateTimeZone 序列化错误 - 它包含一个未序列化的瞬态字段,在反序列化对象中保留 null

        您可以创建并注册您自己的Serializer 以正确处理此对象:

        import com.esotericsoftware.kryo.Kryo;
        import com.esotericsoftware.kryo.Serializer;
        import com.esotericsoftware.kryo.io.Input;
        import com.esotericsoftware.kryo.io.Output;
        import org.joda.time.DateTimeZone;
        import org.joda.time.tz.CachedDateTimeZone;
        
        public class JodaCachedDateTimeZoneSerializer extends Serializer<CachedDateTimeZone> {
        
            public JodaCachedDateTimeZoneSerializer() {
                setImmutable(true);
            }
        
            @Override
            public CachedDateTimeZone read(final Kryo kryo, final Input input, final Class<CachedDateTimeZone> type) {
                // reconstruct from serialized ID:
                final String id = input.readString();
                return CachedDateTimeZone.forZone(DateTimeZone.forID(id));
            }
        
            @Override
            public void write(final Kryo kryo, final Output output, final CachedDateTimeZone cached) {
                // serialize ID only:
                output.writeString(cached.getID());
            }
        }
        

        然后,在你扩展KryoRegistrator的类中,添加:

        kryo.register(classOf[CachedDateTimeZone], new JodaCachedDateTimeZoneSerializer())
        

        这样您就不必禁用 Kryo 或避免使用 Joda。

        【讨论】:

          【解决方案5】:
          sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
          

          【讨论】:

            【解决方案6】:

            请参考这个——https://issues.apache.org/jira/browse/SPARK-4170

            基本上,你不应该为你的主类扩展scala.App。在某些情况下它可能无法正常工作。请改用显式 main() 方法。

            这是 Spark 1.6.1 代码中记录的警告(在 SparkSubmit 类中)

            // SPARK-4170
            if (classOf[scala.App].isAssignableFrom(mainClass)) {
              printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
            }
            

            【讨论】:

              猜你喜欢
              • 1970-01-01
              • 2015-07-13
              • 2021-05-25
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2016-06-13
              • 1970-01-01
              • 1970-01-01
              相关资源
              最近更新 更多