【Posted】: 2017-04-27 19:02:17
【Question】:
An exception is thrown when running a seemingly simple Spark SQL filter job:
someOtherDF
  .filter(/*somecondition*/)
  .select($"eventId")
  .createOrReplaceTempView("myTempTable")

records
  .filter(s"eventId NOT IN (SELECT eventId FROM myTempTable)")
Any idea how to solve this?
Notes:
- someOtherDF contains roughly 1M to 5M rows after filtering, and eventId is a GUID.
- records contains 40M to 50M rows.
Error:
Stacktrace:
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:215)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:123)
at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at ...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:212)
... 84 more
【Comments】:
-
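We hit a similar "Futures timed out" problem (SPARK-20784), but with a very different query pattern. What kind of deployment are you using? yarn-client? Something else? Are the datasets involved cached?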
-
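In this particular scenario the datasets were not cached, because it is a single-pass transformation. The tests were run on YARN; if I remember correctly, client versus cluster mode made no difference.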
-
I finally found the cause in my case: an OOM on the driver (silent most of the time; I only spotted the exception by chance in the log stream).
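
The trace in the question shows BroadcastNestedLoopJoinExec waiting on a broadcast that timed out after 300 seconds, which matches the default of spark.sql.broadcastTimeout. A NOT IN subquery over a column that may contain nulls is typically planned this way, so one thing worth trying is to express the exclusion as a left anti join instead. The following is a minimal sketch, not a confirmed fix for this thread: it assumes eventId is never null on either side (with nulls, NOT IN and left_anti return different rows), and the object name AntiJoinSketch, the helper withoutEvents, and the tiny sample data are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object AntiJoinSketch {
  // Hypothetical helper: keeps the rows of `records` whose eventId does not
  // appear in `excluded`. Assumes eventId is non-null in both DataFrames;
  // if nulls are possible, this is NOT equivalent to NOT IN.
  def withoutEvents(records: DataFrame, excluded: DataFrame): DataFrame =
    records.join(excluded.select("eventId"), Seq("eventId"), "left_anti")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("anti-join-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Tiny stand-ins for the real `records` and filtered `someOtherDF`.
    val records = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("eventId", "value")
    val someOtherDF = Seq("b").toDF("eventId")

    val kept = withoutEvents(records, someOtherDF)

    // Print the physical plan to check which join strategy was chosen.
    kept.explain()
    kept.show()

    spark.stop()
  }
}
```

Because the anti join has a plain equality condition, Spark is free to pick a hash or sort-merge strategy rather than a nested loop; checking the output of explain() on the real data is the way to confirm that. If the broadcast itself is still wanted, raising spark.sql.broadcastTimeout or giving the driver more memory are the other knobs suggested by the trace and by the driver OOM reported above.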
Tags: apache-spark-sql