【Question title】: How to fix connection issues while sending heavy files to a Hive table?
【Posted】: 2019-10-17 04:12:03
【Question】:

I have HDFS files that I want to put in a Hive table. The operation is performed by a Spark batch in a Java application. The code that performs the task is as follows:

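import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.functions;

[...]

// Casting the lambda to MapFunction avoids an ambiguous-overload error on Dataset.map in Java
final Dataset<File> fileDs = rawDs.map((MapFunction<Row, File>) record -> {
        return FileService.map(record.getList(2));
      }, Encoders.bean(File.class));

final Dataset<Row> fileDsWithId = fileDs.withColumn("id", functions.lit(id));
// repartition() returns a new Dataset, so its result must be kept
final Dataset<Row> repartitionedDs = fileDsWithId.repartition(fileDsWithId.col("id"));

repartitionedDs.write().mode(SaveMode.Append)
  .format("orc")
  .partitionBy("id")
  .option("path", "hdfs://..../mydatabase.db/mytable")
  .saveAsTable("mydatabase.mytable");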

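When I use a small file (1 or 2 rows of data), the application works fine and the job finishes successfully in 30 seconds. The table is created in Hive and I can display the data with a SELECT * query. It also works when the table already exists: the data is simply appended to the existing data. The structure of the generated table in Hive looks fine and matches my data.

But when I try to process a bigger file (3.7 MB, about 1000 rows of data), the job fails after 15 minutes. The corresponding ORC file is created in HDFS, but it is empty and Hive doesn't know about it.

The log file shows several errors like these: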

2019-05-31 14:20:07,500 - [ERROR] [                           dispatcher-event-loop-3] pache.spark.scheduler.cluster.YarnClusterScheduler - [{}] - Lost executor 31 on XXXXXX: Container marked as failed: container_e71_1559121287708_0019_02_000032 on host: XXXXXXXXX. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143.
Killed by external signal
[...]
java.lang.RuntimeException: java.io.IOException: Connection reset by peer
        at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
        at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:273)
        at org.apache.spark.network.crypto.AuthClientBootstrap.doSparkAuth(AuthClientBootstrap.java:105)
        at org.apache.spark.network.crypto.AuthClientBootstrap.doBootstrap(AuthClientBootstrap.java:79)
...
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
...
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        ... 1 more
[...]       
2019-05-31 14:20:17,898 - [ERROR] [                                shuffle-client-4-1]    org.apache.spark.network.client.TransportClient - [{}] - Failed to send RPC 9035939448873337359 to XXXXXXXX: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source)
2019-05-31 14:20:17,899 - [ERROR] [          Executor task launch worker for task 244] apache.spark.network.client.TransportClientFactory - [{}] - Exception while bootstrapping client after 5999 ms
java.lang.RuntimeException: java.io.IOException: Failed to send RPC 9035939448873337359 to XXXXXXXXX: java.nio.channels.ClosedChannelException
        at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
        at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:273)
        at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:70)
        at org.apache.spark.network.crypto.AuthClientBootstrap.doSaslAuth(AuthClientBootstrap.java:115)
     ...
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to send RPC 9035939448873337359 to XXXXXXXXXXXX: java.nio.channels.ClosedChannelException
        at org.apache.spark.network.client.TransportClient.lambda$sendRpc$2(TransportClient.java:237)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
        at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:122)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:852)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:738)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:733)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:725)
        at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:35)
...
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        ... 1 more
[...]       
2019-05-31 14:20:22,907 - [INFO ] [                             Block Fetch Retry-6-1] .apache.spark.network.shuffle.RetryingBlockFetcher - [{}] - Retrying fetch (2/3) for 1 outstanding blocks after 5000 ms
2019-05-31 14:20:27,909 - [ERROR] [                             Block Fetch Retry-6-2] .apache.spark.network.shuffle.RetryingBlockFetcher - [{}] - Exception while beginning fetch of 1 outstanding blocks (after 2 retries)
java.io.IOException: Failed to connect to XXXXXXXXX
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
...
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connexion refused: XXXXXXXX
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
...
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        ... 2 more
[...]       
2019-05-31 14:20:32,915 - [WARN ] [          Executor task launch worker for task 244]              org.apache.spark.storage.BlockManager - [{}] - Failed to fetch remote block broadcast_2_piece0 from BlockManagerId(1, XXXXXXX, 44787, None) (failed attempt 1)
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:105)
        at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:642)
...
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to connect to XXXXXXXXX
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
...
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        ... 1 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connexion refused: XXXXXXXX
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
...
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        ... 2 more
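The "Retrying fetch (2/3) for 1 outstanding blocks after 5000 ms" entries above reflect Spark's shuffle-client retry behaviour: a failed block fetch is retried a fixed number of times with a fixed pause in between. The documented defaults of spark.shuffle.io.maxRetries (3) and spark.shuffle.io.retryWait (5s) match the "2/3" and "5000 ms" in the log. The loop below is a minimal sketch of that fixed-wait pattern under those assumptions; FixedWaitRetry is a hypothetical name and this is not Spark's RetryingBlockFetcher.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a fixed-wait retry loop (not Spark's implementation).
final class FixedWaitRetry {
    private FixedWaitRetry() {}

    // Runs the action; on failure, waits waitMs and tries again, up to maxRetries extra attempts.
    static <T> T call(Supplier<T> action, int maxRetries, long waitMs) {
        int retryCount = 0;
        while (true) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                if (retryCount >= maxRetries) {
                    throw e; // out of retries: rethrow the last failure
                }
                retryCount++;
                System.out.printf("Retrying (%d/%d) after %d ms%n", retryCount, maxRetries, waitMs);
                try {
                    Thread.sleep(waitMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
    }
}
```

Because each retry goes back to the same remote endpoint, this pattern only helps with transient failures; if the remote side has gone away (for example because its container was killed), every attempt is refused in the same way.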

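I don't understand what is happening there. I checked for memory issues, but that seems fine. These machines are used to process much bigger files (usually tens of GB). Why is the connection lost/refused/reset? Could the fact that Spark creates the table schema beforehand explain this?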
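The first log entry reports exit status 143. By the usual Unix convention, an exit status above 128 means "terminated by signal (status - 128)", so 143 - 128 = 15, which is SIGTERM. That agrees with the "Container killed on request" and "Killed by external signal" text: the executor was stopped from outside rather than exiting on its own, although the log does not say what requested the kill. A tiny decoding helper, hypothetical and for illustration only:

```java
// Hypothetical helper: decodes exit statuses that follow the 128 + N = "killed by signal N" convention.
final class ExitStatus {
    private ExitStatus() {}

    // Returns the signal number encoded in the status, or -1 for an ordinary exit code.
    static int signalOf(int exitStatus) {
        return (exitStatus > 128 && exitStatus < 256) ? exitStatus - 128 : -1;
    }

    static String describe(int exitStatus) {
        int signal = signalOf(exitStatus);
        switch (signal) {
            case -1:
                return "exited with status " + exitStatus;
            case 9:
                return "killed by SIGKILL (signal 9)";
            case 15:
                return "killed by SIGTERM (signal 15)";
            default:
                return "killed by signal " + signal;
        }
    }
}
```

For example, ExitStatus.describe(143) returns "killed by SIGTERM (signal 15)".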


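UPDATED after Ram Ghadiyaram's answer:
I tried setting spark.network.timeout to 6000s. No other timeout setting is configured in the environment. The result seems to be the same: the job fails after 10 minutes, with the same errors in the log file ("Connection reset by peer", "Failed to send RPC", etc.).

Setting spark.core.connection.ack.wait.timeout, spark.storage.blockManagerSlaveTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout and spark.rpc.lookupTimeout to the same value (6000s) doesn't seem to work either.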
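A quick unit conversion helps read these results: 6000s is 100 minutes, while the job fails after about 10 minutes, so a timeout set to 6000s cannot have expired at that point. The default of 120s, by contrast, is only 2 minutes. The converter below is a hypothetical helper for Spark-style time strings; it handles only the "ms", "min", "h" and "s" suffixes and treats a bare number as seconds, which is an assumption of this sketch rather than Spark's own parser.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: converts a few Spark-style time strings to seconds.
// Handles only "ms", "min", "h" and "s"; a bare number is assumed to be seconds.
final class SparkTimeStrings {
    private SparkTimeStrings() {}

    static long toSeconds(String value) {
        String v = value.trim().toLowerCase();
        // Check "ms" before "s" so milliseconds are not read as seconds.
        if (v.endsWith("ms")) {
            return TimeUnit.MILLISECONDS.toSeconds(Long.parseLong(v.substring(0, v.length() - 2)));
        }
        if (v.endsWith("min")) {
            return TimeUnit.MINUTES.toSeconds(Long.parseLong(v.substring(0, v.length() - 3)));
        }
        if (v.endsWith("h")) {
            return TimeUnit.HOURS.toSeconds(Long.parseLong(v.substring(0, v.length() - 1)));
        }
        if (v.endsWith("s")) {
            return Long.parseLong(v.substring(0, v.length() - 1));
        }
        return Long.parseLong(v);
    }
}
```

With this helper, toSeconds("6000s") gives 6000 and toSeconds("10min") gives 600, so the configured timeout is ten times longer than the observed time to failure.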

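I think my dataset is too messy to be processed properly. I'll try changing the data model and then run the application again with these timeout settings.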


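Updated 01/07/2019:
I have simplified the data model. The model was complex and produced some empty structs in the dataset, because the system couldn't link some fields through inheritance. I flattened the structure so that every possible type exists as an actual property of a generic class, and I removed the inheritance.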

To sum up, something like this:
File.class
|-field1
|-field2
|-field3
|-GenericClass
   |-Class1
   |-Class2
   |-Class3

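Instead of creating an abstract class with some child classes, I created a generic class with the other classes as attributes. It's dirty (I don't recommend it), but this way the dataset is cleaner.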
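The flattened model can be sketched as plain JavaBeans of the kind Encoders.bean expects (public no-arg constructor, getter/setter pairs). Spark's bean encoder derives the schema from the declared property types, which is presumably why fields that existed only on subclasses of an abstract property ended up as empty structs. All field names here are hypothetical, FileRecord stands in for the question's File class, and in a real job each bean would be a public class in its own file.

```java
import java.io.Serializable;

// Hypothetical payload types standing in for Class1..Class3 in the tree above.
class Class1 implements Serializable {
    private String name;
    public Class1() {}
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

class Class2 implements Serializable {
    private Long size;
    public Class2() {}
    public Long getSize() { return size; }
    public void setSize(Long size) { this.size = size; }
}

class Class3 implements Serializable {
    private Boolean valid;
    public Class3() {}
    public Boolean getValid() { return valid; }
    public void setValid(Boolean valid) { this.valid = valid; }
}

// Flattened replacement for an abstract parent: every possible type is a concrete
// property, so the schema is fixed and the unused properties are simply null.
class GenericClass implements Serializable {
    private Class1 class1;
    private Class2 class2;
    private Class3 class3;
    public GenericClass() {}
    public Class1 getClass1() { return class1; }
    public void setClass1(Class1 class1) { this.class1 = class1; }
    public Class2 getClass2() { return class2; }
    public void setClass2(Class2 class2) { this.class2 = class2; }
    public Class3 getClass3() { return class3; }
    public void setClass3(Class3 class3) { this.class3 = class3; }

    // Not a bean property (no "get" prefix): counts how many slots are filled.
    int populatedCount() {
        int n = 0;
        if (class1 != null) n++;
        if (class2 != null) n++;
        if (class3 != null) n++;
        return n;
    }
}

// Stands in for the question's File bean (renamed to avoid clashing with java.io.File).
class FileRecord implements Serializable {
    private String field1;
    private String field2;
    private String field3;
    private GenericClass generic;
    public FileRecord() {}
    public String getField1() { return field1; }
    public void setField1(String field1) { this.field1 = field1; }
    public String getField2() { return field2; }
    public void setField2(String field2) { this.field2 = field2; }
    public String getField3() { return field3; }
    public void setField3(String field3) { this.field3 = field3; }
    public GenericClass getGeneric() { return generic; }
    public void setGeneric(GenericClass generic) { this.generic = generic; }
}
```

With Encoders.bean(FileRecord.class), each record would carry a generic struct with three nested struct columns, of which typically only one is non-null.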

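After this change, I no longer get the timeout issues. I guess the previous model was too messy for Spark to write efficiently.

I tried writing in both ORC and Avro formats, and both work. In Avro, I managed to write about 300,000 rows in under a minute, so the default timeout settings are no longer an issue.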

【Question discussion】:

    Tags: apache-spark hive hdfs bigdata hiveql


    【Solution 1】:

    Q: Why is the connection lost/refused/reset?

    org.apache.spark.SparkException: Exception thrown in awaitResult:
            at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    

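    This looks like a typical timeout issue for large or heavy workloads.

    I don't know which version of Spark you are using, but it fails here. Basically, it waits for a while (the default timeout) and then fails. See ThreadUtils: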

    /**
       * Preferred alternative to `Await.result()`.
       *
       * This method wraps and re-throws any exceptions thrown by the underlying `Await` call, ensuring
       * that this thread's stack trace appears in logs.
       *
       * In addition, it calls `Awaitable.result` directly to avoid using `ForkJoinPool`'s
       * `BlockingContext`. Codes running in the user's thread may be in a thread of Scala ForkJoinPool.
       * As concurrent executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this
       * method basically prevents ForkJoinPool from running other tasks in the current waiting thread.
       * In general, we should use this method because many places in Spark use [[ThreadLocal]] and it's
       * hard to debug when [[ThreadLocal]]s leak to other tasks.
       */
      @throws(classOf[SparkException])
      def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
        try {
          // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
          // See SPARK-13747.
          val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
          awaitable.result(atMost)(awaitPermission)
        } catch {
          case e: SparkFatalException =>
            throw e.throwable
          // TimeoutException is thrown in the current thread, so not need to warp the exception.
          case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
            throw new SparkException("Exception thrown in awaitResult: ", t)
        }
      }
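For readers following along in Java, the contract of awaitResult can be mirrored with java.util.concurrent futures: wait at most the given time, let a TimeoutException propagate unchanged (as does an InterruptedException, which Scala's NonFatal does not match), and wrap any other failure with the "Exception thrown in awaitResult" message. AwaitSketch and AwaitException are hypothetical names; this is an analogue for illustration, not Spark code.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for SparkException.
class AwaitException extends RuntimeException {
    AwaitException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class AwaitSketch {
    private AwaitSketch() {}

    // Waits at most `timeout` for the future. TimeoutException and InterruptedException
    // propagate unchanged; any other failure is wrapped, as in the Scala method above.
    static <T> T awaitResult(Future<T> future, long timeout, TimeUnit unit)
            throws TimeoutException, InterruptedException {
        try {
            return future.get(timeout, unit);
        } catch (ExecutionException e) {
            // Future.get wraps the task's own exception; unwrap it before rewrapping.
            throw new AwaitException("Exception thrown in awaitResult: ", e.getCause());
        } catch (CancellationException e) {
            throw new AwaitException("Exception thrown in awaitResult: ", e);
        }
    }
}
```

Read against this contract, the awaitResult entry in the question's log appears to be a wrapped connection failure (its cause is "Failed to connect ... Connexion refused") rather than a raw TimeoutException.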
    

    You have to increase the timeout; see the networking docs:

    spark.network.timeout (default: 120s): Default timeout for all network interactions. This config will be used in place of spark.core.connection.ack.wait.timeout, spark.storage.blockManagerSlaveTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout or spark.rpc.lookupTimeout if they are not configured.
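The fallback rule quoted from the docs can be sketched as a lookup over a plain key/value map: a specific timeout key wins if it is set, otherwise spark.network.timeout applies, otherwise its documented default of 120s. TimeoutResolution is a hypothetical helper; Spark's actual configuration code is not modelled here.

```java
import java.util.Map;

// Hypothetical sketch of the documented fallback rule for network timeouts.
final class TimeoutResolution {
    static final String NETWORK_TIMEOUT = "spark.network.timeout";
    static final String DEFAULT_NETWORK_TIMEOUT = "120s";

    private TimeoutResolution() {}

    // Returns the value a specific timeout key would effectively use under the rule above.
    static String effective(Map<String, String> conf, String specificKey) {
        String specific = conf.get(specificKey);
        if (specific != null) {
            return specific; // explicitly configured key wins
        }
        return conf.getOrDefault(NETWORK_TIMEOUT, DEFAULT_NETWORK_TIMEOUT);
    }
}
```

Under this rule, setting only spark.network.timeout=6000s already makes every listed timeout resolve to 6000s, so setting them individually to the same value would not be expected to change the outcome. In a real job these keys are passed with spark-submit --conf key=value or SparkSession.builder().config(key, value).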


    In summary: for small workloads the default timeout is enough; for large workloads the timeout needs to be increased.

    【Discussion】:

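    • Have you tried increasing the timeout? Please post the results.
    • Sorry for the delay, I was out of the office for a while. No timeout settings were configured in the environment before, so I set spark.network.timeout = 6000s and reran the application. It doesn't seem to change the result: after about 10 minutes, the job fails with the same kind of errors in the log file ("Connection reset by peer", "Failed to send RPC", etc.).
    • OK, please try the other timeout options I mentioned, such as spark.rpc.askTimeout.
    • If this works for you, please accept the answer as the owner.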