【发布时间】:2015-07-05 14:08:40
【问题描述】:
异常信息如下
用户类抛出异常:作业因阶段失败而中止:阶段中的任务 0 1.0 失败 4 次,最近一次失败:在 1.0 阶段丢失任务 0.3 (TID 11, 10.215.155.82): java.lang.NullPointerException at org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143) 在 org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103) 在 org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:676) 在 org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:521) 在 org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:625) 在 org.joda.time.base.AbstractDateTime.toString(AbstractDateTime.java:328) 在 com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41) 在 com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41) 在 scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:328) 在 scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:327) 在 scala.collection.Iterator$class.foreach(Iterator.scala:727) 在 org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:113) 在 scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 在 org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:28) 在 scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:327) 在 org.apache.spark.util.collection.CompactBuffer.groupBy(CompactBuffer.scala:28) 在 com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:41) 在 com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:40) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 在 scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 在 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 在 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 在 scala.collection.AbstractIterator.to(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 在 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 在 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 在 scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 在 org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081) 在 org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081) 在 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) 在 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 在 org.apache.spark.scheduler.Task.run(Task.scala:56) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 在 java.lang.Thread.run(Thread.java:744)
我的代码如下:
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.{ SparkConf, SparkContext }
import org.joda.time.DateTime
import org.joda.time.format.{ DateTimeFormat, DateTimeFormatter }
object DateTimeNullReferenceReappear extends App {
case class Record(uin: String = "", date: DateTime = null, value: Double = 0.0)
val cfg = new Configuration
val sparkConf = new SparkConf()
sparkConf.setAppName("bourne_exception_reappear")
val sc = new SparkContext(sparkConf)
val data = TDWSparkContext.tdwTable( // this function just read data from an data warehouse
sc,
tdwuser = FaceConf.TDW_USER,
tdwpasswd = FaceConf.TDW_PASSWORD,
dbName = "my_db",
tblName = "my_table",
parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
.map(row => {
Record(uin = row(2),
date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
value = row(4).toDouble)
}).map(x => (x.uin, (x.date, x.value)))
.groupByKey
.map(x => {
x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum) // throw exception here
})
// val data = TDWSparkContext.tdwTable( // It works, as I don't user datetime toString in the groupBy
// sc,
// tdwuser = FaceConf.TDW_USER,
// tdwpasswd = FaceConf.TDW_PASSWORD,
// dbName = "hy",
// tblName = "t_dw_cf_oss_tblogin",
// parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
// .map(row => {
// Record(uin = row(2),
// date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
// value = row(4).toDouble)
// }).map(x => (x.uin, (x.date.toString("yyyyMMdd"), x.value)))
// .groupByKey
// .map(x => {
// x._2.groupBy(_._1).mapValues(_.map(_._2).sum)
// })
data.take(10).map(println)
}
所以,似乎 groupBy 中的调用 toString 导致了异常,那么任何人都可以解释一下吗?
谢谢
【问题讨论】:
-
嗯...
NullPointerException发生在您尝试对包含任何类型的null值的变量调用任何函数时。所以...这意味着在您的x._2中有一些元组,其中第一个成员(_._1)是null。 -
你能添加这个结果吗?
TDWSparkContext.tdwTable( // this function just read data from an data warehouse sc, tdwuser = FaceConf.TDW_USER, tdwpasswd = FaceConf.TDW_PASSWORD, dbName = "my_db", tblName = "my_table", parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329")) .map(row => { Record(uin = row(2), date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)), value = row(4).toDouble) }).map(x => (x.uin, (x.date, x.value))) -
只需将此
groupBy(_._1.toString("yyyyMMdd"))替换为groupBy( d => { if ( d._1 != null ) { d._1.toString("yyyyMMdd") } else { "I am a placeholder" } })。您可以选择对占位符做任何您想做的事情。 -
@SarveshKumarSingh
map(x => (x.uin, (x.date.toString("yyyyMMdd"), x.value)))这个不会抛出异常,是不是RDD.map捕获了空指针异常,而Array.groupBy没有? -
嗯...是的。 Spark 在实际序列化和映射 RDD 之前,对映射函数执行大量清理以将其包装在“安全”闭包中。清理函数是 -
def clean(func: AnyRef, checkSerializable: Boolean = true)在ClosureCleaner.scala( github.com/apache/spark/blob/master/core/src/main/scala/org/… ) 中定义的。我不太确定......但可能这也可以防止NullPointerExceptions并为这种情况生成一个null值。
标签: scala apache-spark