【问题标题】:Scala Spark row-level error handlingScala Spark 行级错误处理
【发布时间】:2015-08-08 00:23:53
【问题描述】:

我在弄清楚如何使用 Scala Spark 程序进行一些行级错误处理时遇到了一些麻烦。在下面的代码中,我正在读取 CSV 文本文件,对其进行解析,然后使用 mapSchema 方法创建一个 Row(未显示;基本上,它采用 CSV 生成的字符串数组并使用模式来转换字符串转换为整数、双精度、日期等)。当数据全部格式化时效果很好。但是,如果我有一个错误的行——例如,一个字段比预期的少——我想执行一些错误处理。

val rddFull = sqlContext.sparkContext.textFile(csvPath).map {
  case(txt) =>
    try {
      val reader = new CSVReader(new StringReader(txt), delimiter, quote, escape, headerLines)
      val parsedRow = reader.readNext()
      Row(mapSchema(parsedRow, schema) : _*)
    } catch {
      case err: Throwable =>
        println("a record had an error: "+ txt)
        throw new RuntimeException("SomeError")
    }

问题是 try/catch 表达式似乎不起作用。当我给它坏行时,我永远不会得到“SomeError”RuntimeException。相反,我得到了与不使用 try/catch 时相同的错误。

关于这里可能出了什么问题有什么想法吗?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    您需要在正确的位置查找日志。首先:catch 确实有效。以下是 spark-shell 的示例:

    val d = sc.parallelize(0 until 10)
    val e = d.map{ n =>
      try {
       if (n % 3==0) throw new IllegalArgumentException("That was a bad call")
       println(n)
     } catch {
        case e:  IllegalArgumentException =>  throw new UnsupportedOperationException("converted from Arg to Op except")
     }
    }
    e.collect
    

    结果如下:注意异常被正确捕获和转换:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in
    stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in   
    stage 0.0 (TID 5, localhost): 
    java.lang.UnsupportedOperationException: converted from Arg to Op except
        at $anonfun$1.apply$mcVI$sp(<console>:29)
        at $anonfun$1.apply(<console>:24)
        at $anonfun$1.apply(<console>:24)
    

    尝试查看一名或多名工作人员的stderr 日志。

    【讨论】:

    • 啊,谢谢你的简单例子!它帮助我发现错误没有像我想象的那样发生在 map 步骤中,而是在后续步骤中将 RDD 注册为 dataFrame。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-09-28
    • 2017-12-20
    • 2015-09-13
    • 2013-02-28
    • 2017-06-12
    • 2017-01-24
    相关资源
    最近更新 更多