【问题标题】:Spark Job fails at saveAsHadoopDataset stage due to Lost Executor due to some unknown reason由于某些未知原因,Spark Job 在 saveAsHadoopDataset 阶段失败,原因是 Lost Executor
【发布时间】:2016-05-26 11:56:25
【问题描述】:

我有一个在 yarn 上运行的 spark 作业,它使用大约 150gb 的数据集并执行多个 shuffle 操作,最后将数据存储到 hbase 中。它一直在saveAsHadoopDataset 失败,在报告高 GC 活动后,基本上多个 Executor 在此阶段失败。但是,执行程序日志、驱动程序日志或节点管理器日志均未指示任何 OutOfMemory 错误或 GC Overhead Exceeded 错误或超出内存限制错误。我在 spark ui 中也没有看到 Executor 失败的任何其他原因。

val hConf = HBaseConfiguration.create
hConf.setInt("hbase.client.scanner.caching", 10000)
hConf.setBoolean("hbase.cluster.distributed", true) 
new PairRDDFunctions(hbaseRdd).saveAsHadoopDataset(jobConfig)

驱动程序日志:

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Job aborted due to stage failure: Task 388 in stage 22.0 failed 4 times, most recent failure: Lost task 388.3 in stage 22.0 (TID 32141, maprnode5): ExecutorLostFailure (executor 5 lost)
Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 388 in stage 22.0 failed 4 times, most recent failure: Lost task 388.3 in stage 22.0 (TID 32141, maprnode5): ExecutorLostFailure (executor 5 lost)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)

执行者日志:

16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage 8.0 (TID 15318). 2099 bytes result sent to driver
16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 15333
16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage 8.0 (TID 15333)
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125 non-empty blocks out of 3007 blocks
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14 remote fetches in 10 ms
16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to maprnode5 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from maprnode5 is closed
16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from maprnode5 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
        at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
        at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:744)
16/02/24 11:11:47 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 6 outstanding blocks after 5000 ms
16/02/24 11:11:52 INFO client.TransportClientFactory: Found inactive connection to maprnode5, creating a new one.
16/02/24 11:12:16 WARN server.TransportChannelHandler: Exception in connection from maprnode5
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:744)
16/02/24 11:12:16 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from maprnode5 is closed
16/02/24 11:12:16 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches

【问题讨论】:

  • 您只包含了一个执行程序日志,这似乎不是来自失败执行程序的日志。执行器日志显示“来自 maprnode5 的连接已关闭”,那么 maprnode5 上的执行器日志是什么样的?
  • "Executor Logs" 是通过运行yarn logs -applicationId appid 获得的,所以我相信这是执行程序日志。我可以在发生日志聚合的文件系统上看到相同的日志。让我知道我是否应该在其他地方寻找它。
  • 嗨 Nir,我正在使用相同的 api 在 Hbase 中插入数据

标签: java scala hadoop apache-spark


【解决方案1】:

事实证明,尽管 spark UI 说它在 saveAsHadoopDataSet 失败,但实际上在 saveAsHadoopDataSet 是最后一步的阶段的第一步失败了。更详细地说,Spark 是根据窄变换序列或宽变换和窄变换组合的序列来定义阶段边界的。在我的特殊情况下,序列是groupByKey(wide dep) -> mapValues(narrow dep) -> map(narrow dep),最后一个地图实际上是在做saveAsHadoopDataSet。实际上,执行程序在 shuffle 阶段 groupByKey 报告了高 GC 活动和内存使用情况。我将应用程序逻辑更改为使用reduceByKey 而不是groupByKey。现在它超级慢,但至少没有失败。

【讨论】:

    猜你喜欢
    • 2014-09-10
    • 1970-01-01
    • 1970-01-01
    • 2014-11-23
    • 1970-01-01
    • 1970-01-01
    • 2014-01-02
    • 1970-01-01
    • 2013-12-22
    相关资源
    最近更新 更多