【问题标题】:AWS connection timeout when running Spark job on EMR在 EMR 上运行 Spark 作业时 AWS 连接超时
【发布时间】:2018-02-08 19:50:25
【问题描述】:

我正在尝试在 Amazon EMR 集群中提交一个简单的 spark 作业。我的集群有 5 个 M4.2xlarge 实例(1 个主实例,4 个从属实例),每个实例有 16 个 vCPU 和 32 GB 内存。

这是我的代码:

def main(args : Array[String]): Unit = {
 val sparkConfig = new SparkConf()
  .set("hive.exec.dynamic.partition", "true")
  .set("hive.exec.dynamic.partition.mode", "nonstrict")
  .set("hive.s3.max-client-retries", "50")
  .set("hive.s3.max-error-retries", "50")
  .set("hive.s3.max-connections", "100")
  .set("hive.s3.connect-timeout", "5m")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .set("spark.kryo.classesToRegister", "org.apache.spark.graphx.impl.VertexAttributeBlock")
  .set("spark.broadcast.compress", "true")

 val spark = SparkSession.builder()
    .appName("Spark Hive Example")
    .enableHiveSupport()
    .config(sparkConfig)
    .getOrCreate()

// Set Kryo for serializing
GraphXUtils.registerKryoClasses(sparkConfig)
val res = spark.sql("SELECT col1, col2, col3 FROM table1 limit 10000")
val edgesRDD = res.rdd.map(row => Edge(row.getString(0).hashCode, row.getString(1).hashCode, row(2).asInstanceOf[String]))

val res_two = spark.sql("SELECT col1 FROM table2 where col1 is not NULL and col1 != '' limit 100000")
val vertexRDD: RDD[(VertexId, String)] = res_two.rdd.map(row => (row.getString(0).hashCode, row(0).asInstanceOf[String]))

val graph = Graph(vertexRDD, edgesRDD)

val connectedComponents = graph.connectedComponents().vertices

table1 和 table2 都是 Hive 上 S3 支持的外部表。当我运行这个程序时,我的工作失败并出现以下错误:

Job aborted due to stage failure: Task 827 in stage 0.0 failed 4 times, most recent failure: Lost task 827.3 in stage 0.0 (TID 921, xxx.internal, executor 3): com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1069)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4169)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4116)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1237)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:24)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:10)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:82)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
    at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy35.retrieveMetadata(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:768)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1194)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:773)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
    at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.extractMetaInfoFromFooter(ReaderImpl.java:355)
    at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:316)
    at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:237)
    at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getReader(OrcInputFormat.java:1204)
    at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getRecordReader(OrcInputFormat.java:1113)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:246)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:245)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:203)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
    at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy37.get(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1190)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
    ... 59 more

不确定它是来自 hadoop 还是从 hive 读取时,但我看到了类似的问题 here,因此我在 spark-submit 命令中添加了以下参数:

--conf "spark.driver.extraJavaOptions=-Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60" --conf "spark.executor.extraJavaOptions=-Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60"

还是不行。有谁知道怎么回事?

【问题讨论】:

  • 你发现了吗?我也遇到了同样的问题,不知道是不是bug。
  • 不,仍在尝试解决。
  • 我遇到了同样的问题,您找到解决方案或解决方法了吗?谢谢。

标签: hadoop apache-spark amazon-s3 apache-spark-sql emr


【解决方案1】:

我不使用 EMRFS,但我知道其他 spark/hadoop S3 客户端都使用一个 http 连接池来处理他们对 S3 的请求,并且“等待池超时”消息总是意味着“池不大足够的”。看看您是否可以找出增加该池大小的 emrfs 选项。对于在您的进程中运行的每个工作线程,您至少需要一个,我会加倍希望 emrfs 可以像 s3a 客户端那样并行化块上传。

【讨论】:

  • 有趣的是,我第一次遇到这个问题以及 emrfs。想知道这是否是一个错误,因为我没有看到 emrfs max.connection 池的文档。
  • 您必须与 AWS EMR 团队合作。我只处理与 S3A“文件@JIRA 带有补丁”相关的无偿支持问题并为 Hortonworks HDP 付费“这很有趣,让我们看看我是否可以修复它”
【解决方案2】:

TLDR:需要设置的属性是emrfs-site.xml配置文件中的fs.s3.maxConnections。它默认为 50。我们得到的错误/堆栈跟踪与您完全相同,因此我将其设置为 5000,这解决了问题并且没有不良影响。

据我所知,根本原因是 InputFormat 实现没有正确使用 try...finally 以确保在引发异常时关闭连接。值得注意的是,旧版本的 Hive,包括针对 Spark 编译的 v1.2.1,会出现此错误。 Hive 2.x 对 OrcInputFormat 进行了大规模重构,尽管我尚未验证该错误是否已修复,也不知道是否/何时/如何针对 Hive 2.x 编译 Spark。

解决方法增加了连接池的大小,如另一个答案中所建议的那样,但属性及其位置都与“经典”S3 文件系统 (s3/s3a/s3n) 中的完全不同。当然,这在任何地方都没有记录,需要对 emrfs jar 进行反编译才能梳理出来......

【讨论】:

  • 有趣。该错误的 Hadoop JIRA ID 是什么?
  • HIVE-13216,它描述了缺少的 try...finally 问题,显然已在 2.1.0 中修复。还有this issue in spark-avro 会导致类似的问题,同样是因为S3 文件系统连接没有正确关闭。我应该更具体地说,这是我的主要候选根本原因,但还没有 100% 追踪到它。可能是 Hive 出于某种原因尝试一次打开 50 多个文件。在我们的案例中,该解决方法不会产生不利影响。
  • 补充以上内容,我体验过 EMR 5.9.0 和 5.10.0。这个问题似乎是特定于连接器的。我正在使用 Parquet 格式,但仍然面临这个问题。但是,EMR4.8.0 上的旧作业对此没有任何问题。
  • 对此有任何替代解决方案吗?我最终设置了spark.hadoop.fs.s3a.connection.maximum=1000000,但仍然出现错误。
  • 有没有办法设置这个而不必修改每个节点中的 emrfs-site.xml 文件?自动化是一个可怕的混乱。
猜你喜欢
  • 2017-08-31
  • 1970-01-01
  • 1970-01-01
  • 2021-09-28
  • 2018-10-19
  • 2013-04-26
  • 2016-06-03
  • 2014-08-01
  • 1970-01-01
相关资源
最近更新 更多