【问题标题】:Spark SQL 异常处理（Spark SQL exception handling）
【发布时间】:2020-03-08 05:46:53
【问题描述】:

为了在 RDD 操作中处理 Spark 异常，我可以使用以下方法，把异常信息写入额外的 exceptions 列：

// Row-level error capture on the RDD API: every input row is extended with two
// extra values -- the parsed member id, or the text of the exception thrown while
// parsing it -- so a bad value is recorded in its row instead of failing the job.
// NOTE(review): assumes Row, Try/Success/Failure, the Spark SQL types and
// `sparkSession` are imported / in scope -- none of that is shown in this snippet.
val df: DataFrame = ...

// Append (member_id_int, exceptions) to every row of df.
val rddWithExcep = df.rdd.map { row: Row =>
  // member_id is read as a String and parsed below.
  val memberIdStr = row.getAs[String]("member_id")
  // Despite its name this holds a 2-element List, not an Int:
  // List(parsedValue, null) on success, List(null, exceptionText) on failure.
  val memberIdInt = Try(memberIdStr.toInt) match {
    case Success(integer) => List(integer, null)
    case Failure(ex) => List(null, ex.toString)
  }
  // Original column values followed by the two new ones; this order must match
  // the two fields appended to castWithExceptionSchema below.
  Row.fromSeq(row.toSeq.toList ++ memberIdInt)
}

// Original schema plus two nullable columns:
// member_id_int (IntegerType) and exceptions (StringType).
val castWithExceptionSchema = StructType(df.schema.fields ++ Array(StructField("member_id_int", IntegerType, true)
  , StructField("exceptions", StringType, true)))

// Rebuild a DataFrame from the extended RDD using the extended schema.
val castExcepDf = sparkSession.sqlContext.createDataFrame(rddWithExcep, castWithExceptionSchema)

castExcepDf.printSchema()
castExcepDf.show()

是否可以在 Spark SQL 中以类似方式处理此类异常？目前，如果出现任何错误，Spark SQL 只会简单地返回 null 值，从而把错误隐藏起来。

例如，除以 0 会得到 null 值，而不是报错。在我看来，这是 Spark SQL 中一个非常严重的问题，因为它可能悄无声息地产生意外或错误的数据，而您甚至不会注意到。

是否可以改变这一默认行为，让 Spark 直接失败，并抛出带有详细信息的异常？

【问题讨论】:

    标签: scala dataframe apache-spark error-handling apache-spark-sql


    【解决方案1】:

    从 Spark 3.0 开始，您可以在 Spark 会话中将配置项 spark.sql.ansi.enabled 设置为 true。这样，出错时 Spark 会抛出异常并停止执行，而不是在列中写入 null 值。但是，这种失败是全局的，而不是逐行的：只要有一行出错，整个查询就会失败，无法像问题中的 RDD 方案那样逐行记录异常。有关详细信息，请参阅 Spark 官方文档中的 ANSI Compliance（ANSI 兼容性）页面。

    因此，下面的代码片段：

    // Enable ANSI mode for this session (Spark 3.0+) so that invalid arithmetic
    // raises an error instead of silently producing null.
    // NOTE(review): toDF and col assume `import sparkSession.implicits._` and
    // `org.apache.spark.sql.functions.col` are in scope -- not shown here.
    sparkSession.conf.set("spark.sql.ansi.enabled", "true")
    
    Seq(1, 2, 3).toDF("MyCol")
      // Division by a literal zero: with ANSI mode on, this fails with
      // SparkArithmeticException ("divide by zero") instead of yielding null.
      .withColumn("divideByZero", col("MyCol") / 0)
      // The error surfaces when this action runs. Per the stack trace, it is thrown
      // during query optimization (ConvertToLocalRelation), apparently because
      // the input is a local Seq.
      .show(false)
    

    抛出以下异常:

    Exception in thread "main" org.apache.spark.SparkArithmeticException: divide by zero
        at org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:140)
        at org.apache.spark.sql.catalyst.expressions.DivModLike.eval(arithmetic.scala:437)
        at org.apache.spark.sql.catalyst.expressions.DivModLike.eval$(arithmetic.scala:425)
        at org.apache.spark.sql.catalyst.expressions.Divide.eval(arithmetic.scala:534)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:168)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
        at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.$anonfun$applyOrElse$80(Optimizer.scala:1840)
        at scala.collection.immutable.List.map(List.scala:293)
        at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1840)
        at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1835)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
        at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
        at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
        at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:447)
        at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1835)
        at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1833)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
        at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
        at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:138)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:134)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:130)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:148)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
        at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:214)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:259)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
        ...
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-05-26
      • 2017-12-20
      • 1970-01-01
      • 2016-02-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多