【问题标题】:Spark 2.2.0 S3 PerformanceSpark 2.2.0 S3 性能
【发布时间】:2018-02-18 05:11:53
【问题描述】:

我正在将我的代码从测试实验室集群转移到 EC2 集群。我使用 flintrock 设置它,我正在运行“香草”Spark 2.2.0 。 目前集群有4个c3.2xlarge节点(1个master,3个Worker)

我想处理大量文件,每个文件本身都比较大(大约 1 GB)。在我的代码中,我将文件数量分成块。在“实验室”中,我发现 8vCPU 13G 系统的性能峰值约为每个块 32 个文件,并将结果保存到镶木地板中。在有 3 个工作人员的 EC2 上,我将其翻译为 96 个块,从而产生 192 个任务。现在我面临着糟糕的 S3 性能。我收到以下错误:

17/09/09 03:45:33 INFO AmazonHttpClient: Unable to execute HTTP request: Read timed out
java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160)
    at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
    at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
    at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
    at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:259)
    at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:209)
    at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
    at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66)
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
    at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:686)
    at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:488)
    at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:384)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.copyObject(AmazonS3Client.java:1507)
    at com.amazonaws.services.s3.transfer.internal.CopyCallable.copyInOneChunk(CopyCallable.java:143)
    at com.amazonaws.services.s3.transfer.internal.CopyCallable.call(CopyCallable.java:131)
    at com.amazonaws.services.s3.transfer.internal.CopyMonitor.copy(CopyMonitor.java:189)
    at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:134)
    at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:46)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

我遵循了这个指南:https://hortonworks.github.io/hdp-aws/s3-performance/

我将 sparkconf 设置更改为:

conf = SparkConf().setAppName(appname)\
.setMaster(master)\
.set('spark.executor.memory','13g')\
.set('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version','2')\
.set('fs.s3a.fast.upload','true')\
.set('fs.s3a.fast.upload.buffer','disk')\
.set('fs.s3a.buffer.dir','/tmp/s3a')

并像这样使用 s3a 保存镶木地板:

df.write.parquet('s3a://mybucket/result_parquet')

我还将块大小减少到 48(每个实例 16 个)。错误变少了,但仍有一些错误出现。但是现在由于块大小减小,性能下降了。

现在我想知道:

a) 我是否正确配置了SparkConf()?只有在我减小块大小后才会显着减少的错误。

b) 如果 S3 性能受限于每个 EC2 实例的“请求”。因此,如果我不是 3 个中型实例,而是 6 个较小的实例,那么 S3 是否能够仅仅因为它们来自更多实例而更好地处理 192 个拼花写入任务?

【问题讨论】:

    标签: amazon-web-services apache-spark amazon-s3 amazon-ec2 pyspark


    【解决方案1】:
    1. 您到 S3 的带宽取决于您租用的 VM 类型:网络越好,带宽就越多。
    2. 您因写入特定存储桶或存储桶的分片而受到限制,S3 将在其中发回 503 响应以供客户端处理。
    3. 但是您在这里看到的是一个 Socket 异常;看起来 AWS xfer 管理器没有处理它。

    如果这发生在任务提交时,那是因为 s3a 使用复制 + 删除来模仿 rename(),并且输出提交者期望 rename 是一个 O(1) 原子操作,而不是一个缓慢的操作。更关键的是,重命名的模仿依赖于 S3 中的列表文件,并且由于 s3 最终是一致的,因此可能会丢失列表中的文件。 你会丢失数据

    除非您在 S3 上具有一致性层(s3mper、s3guard),否则您应该将任何查询序列提交到本地 HDFS,并在完成最终工作时复制到 S3。或者,如果您有时间,请帮助测试 HADOOP-13786,它添加了 0-rename-commit

    【讨论】:

    • 感谢您的意见。我可能会创建一个 HDFS 缓存。当我将内容从 HFDS 复制到 S3 时,一个棘手的问题是仍然存在丢失数据的风险吗?
    • 否:应用获得一致的 HDFS 数据列表,用于复制
    猜你喜欢
    • 2018-02-25
    • 2014-11-20
    • 1970-01-01
    • 2016-06-01
    • 1970-01-01
    • 2021-09-13
    • 2019-03-02
    • 2018-03-11
    • 2019-11-10
    相关资源
    最近更新 更多