【问题标题】:Rdd verify Date format and remove row if date format is incorrect Scala SparkRdd 验证日期格式并在日期格式不正确时删除行 Scala Spark
【发布时间】:2018-01-29 06:48:57
【问题描述】:

示例 rddDate: [2016-08-01,"pm",5,"ri"] 此 RDD 中有一些日期格式不正确的行,因此我无法计算 RDD 中的行数。这会引发 IndexOutOfBound 异常。 使用的日期格式为 java.sql.Date

RDD 中每一行的预期日期格式为:“yyyy-mm-dd”

2016-08-01

为了验证RDD中的日期格式,实现了以下代码,

val rddVerified: RDD[(Date, String, Long, String)] = rddDate.map{
                a => {
                    val fmt = DateTimeFormat forPattern "yyyy-mm-dd"
                    val input = a._1.toString
                    try {
                        val output = fmt parseDateTime input
                    } catch {
                        case e: Exception => {
                            val v1 = new java.util.Date("2016-08-01")
                            val v2 = new Date(a1.getTime)
                            val ed:(Date,String, Int, String) = (v2, "p1",2,"r1")
                            Some(ed) // This gives compile time error
                        }
                    } finally {
                        Some(a._1, a._2,a._3,a._4)
                    }

                }
            }

我无法处理 catch 部分中的异常。我想从 RDD 中删除该行或更正该行中的日期格式。 我想以这种格式返回 RDD:

RDD[(Date, String, Long, String)]

谢谢。

更新

统计数据框时的异常:

COUNT : : 
[error] o.a.s.e.Executor - Exception in task 0.0 in stage 7.0 (TID 7)
java.lang.IndexOutOfBoundsException: 1
    at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:65)
    at scala.collection.immutable.List.apply(List.scala:84)
    at controllers.Spark$$anonfun$5.apply(Spark.scala:78)
    at controllers.Spark$$anonfun$5.apply(Spark.scala:78)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:464)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
[warn] o.a.s.s.TaskSetManager - Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): java.lang.IndexOutOfBoundsException: 1
    at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:65)
    at scala.collection.immutable.List.apply(List.scala:84)
    at controllers.Spark$$anonfun$5.apply(Spark.scala:78)
    at controllers.Spark$$anonfun$5.apply(Spark.scala:78)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:464)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

[error] o.a.s.s.TaskSetManager - Task 0 in stage 7.0 failed 1 times; aborting job
[warn] o.a.s.s.BlockManager - Putting block rdd_1_1 failed due to an exception
[warn] o.a.s.s.BlockManager - Block rdd_1_1 could not be removed as it was not found on disk or in memory
[warn] o.a.s.s.BlockManager - Putting block rdd_1_2 failed due to an exception
[warn] o.a.s.s.BlockManager - Block rdd_1_2 could not be removed as it was not found on disk or in memory
[warn] o.a.s.s.TaskSetManager - Lost task 1.0 in stage 7.0 (TID 8, localhost, executor driver): TaskKilled (unknown reason)
[warn] o.a.s.s.TaskSetManager - Lost task 2.0 in stage 7.0 (TID 9, localhost, executor driver): TaskKilled (unknown reason)
[error] application - 

 stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): java.lang.IndexOutOfBoundsException: 1
    at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:65)
    at scala.collection.immutable.List.apply(List.scala:84)
    at controllers.Spark$$anonfun$5.apply(Spark.scala:78)
    at controllers.Spark$$anonfun$5.apply(Spark.scala:78)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:464)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:]]
    at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:293)
    at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:220)
    at play.api.GlobalSettings$class.onError(GlobalSettings.scala:160)
    at play.api.DefaultGlobal$.onError(GlobalSettings.scala:188)
    at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:100)
    at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:100)
    at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:99)
    at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:346)
    at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:345)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): java.lang.IndexOutOfBoundsException: 1
    at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:65)
    at scala.collection.immutable.List.apply(List.scala:84)
    at controllers.Spark$$anonfun$5.apply(Spark.scala:78)
    at controllers.Spark$$anonfun$5.apply(Spark.scala:78)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:464)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
Caused by: java.lang.IndexOutOfBoundsException: 1
    at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:65)
    at scala.collection.immutable.List.apply(List.scala:84)
    at controllers.Spark$$anonfun$5.apply(Spark.scala:78)
    at controllers.Spark$$anonfun$5.apply(Spark.scala:78)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:464)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)

【问题讨论】:

    标签: scala date apache-spark rdd


    【解决方案1】:

    我建议使用Dataset。它不仅更快更简单,而且对未来友好

    import org.apache.spark.sql.functions.to_date
    val spark: SparkSession = ???
    import spark.implicits._
    
    rddDate.toDF.withColumn("_1", to_date($"_1"))
      .na.drop(Seq("_1))
      .as[(java.sql.Date, String, Long, String)]
    

    编辑

    但问题出在您的代码中。

    原因:java.lang.IndexOutOfBoundsException: 1

    建议你犯一些错误,可能是在解析逻辑上。你必须后退到你调用apply的地方添加异常处理。

    【讨论】:

    • 谢谢!!这是一种更容易从 RDD 转换为 Dataframe 的技术。但它不检查日期的格式。因此,当我申请 df.count() 时,它会引发异常。为了克服这个问题,我想验证日期格式。
    • 确实如此。 sql.functions 通常是安全的。如果格式不正确,您将获得 SQL NULL,它在 na.drop 部分中被删除。如果你得到一个例外,它不在这里。你能发布回溯吗?
    • 更新了一个问题。添加了错误堆栈跟踪。 df.printSchema() df.show(10) 有效,但 df.count() 抛出异常
    • 哦,java.lang.IndexOutOfBoundsException 不会在这里发生 :)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-04
    • 2020-05-31
    • 1970-01-01
    相关资源
    最近更新 更多