【问题标题】:Spark Performance issue - Writing partitions to S3 as individual filesSpark 性能问题 - 将分区作为单个文件写入 S3
【发布时间】:2020-11-21 21:16:49
【问题描述】:

我正在运行一个 spark 作业,其工作是扫描一个大文件并将其拆分为较小的文件。该文件采用 Json Lines 格式,我正在尝试按某个列 (id) 对其进行分区,并将每个分区作为单独的文件保存到 S3。文件大小约为 12 GB,但有大约 500000 个不同的 id 值。查询大约需要 15 个小时。我可以做些什么来提高性能?对于这样的任务,Spark 是不是一个糟糕的选择?请注意,我确实有权确保将源作为每个 id 的固定行数。

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *
from pyspark.sql.functions import udf, substring, instr, locate
from datetime import datetime, timedelta

    
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Get parameters that were passed to the job
args = getResolvedOptions(sys.argv, ['INPUT_FOLDER', 'OUTPUT_FOLDER', 'ID_TYPE', 'DATASET_DATE'])

id_type = args["ID_TYPE"]
output_folder = "{}/{}/{}".format(args["OUTPUT_FOLDER"], id_type, args["DATASET_DATE"])
input_folder = "{}/{}/{}".format(args["INPUT_FOLDER"], id_type, args["DATASET_DATE"])


INS_SCHEMA = StructType([
    StructField("camera_capture_timestamp", StringType(), True),
    StructField(id_type, StringType(), True),
    StructField("image_uri", StringType(), True)
])


data = spark.read.format("json").load(input_folder, schema=INS_SCHEMA)

data = data.withColumn("fnsku_1", F.col("fnsku"))

data.coalesce(1).write.partitionBy(["fnsku_1"]).mode('append').json(output_folder)   

我也尝试过重新分区而不是合并。

我正在使用 AWS Glue

【问题讨论】:

  • 为什么需要coalesce(1)?任何原因? - 啊...模式追加,我明白了。
  • Spark 使用分布式文件系统,所以单文件进程真的很糟糕,它让工作变慢。
  • 那么你是说如果我有多个文件作为源文件会更快吗?我认为即使源是单个文件,Spark 也会使用分布式处理。
  • 我的意思是当你写作,而不是阅读。
  • 那么当您尝试repartition("fnsku_1") 而不是coalesce(1) 时发生了什么?两者之间的区别在于repartition 创建了一个舞台边界,而coalesce 可以向前“优化”(spark.apache.org/docs/2.4.0/api/python/… ...但是,如果您要进行剧烈的合并... )

标签: apache-spark pyspark apache-spark-sql aws-glue aws-glue-spark


【解决方案1】:

如果您不打算将 Spark 用于将文件拆分为自身的较小版本,那么我会说 Spark 是一个糟糕的选择。您最好按照this Stack Overflow post 中给出的方法在 AWS 中执行此操作

假设您有一个可用的 EC2 实例,您将运行如下内容:

aws s3 cp s3://input_folder/12GB.json - | split -l 1000 - output.
aws s3 cp output.* s3://output_folder/

如果您希望在 Spark 中对数据进行进一步处理,则需要将数据重新分区为 128MB 和 1 GB 之间的块。使用默认(快速)压缩,您通常会得到原始文件大小的 20%。所以,在你的情况下:在 (12/5) ~3 和 (12/5/8) ~20 个分区之间,所以:

data = spark.read.format("json").load(input_folder, schema=INS_SCHEMA) 

dataPart = data.repartition(12)

这对于 Spark 来说实际上并不是一个特别大的数据集,处理起来应该不会那么麻烦。

另存为 parquet 可为您提供良好的恢复点,并且重新读取数据会非常快。总文件大小约为 2.5 GB。

【讨论】:

【解决方案2】:

请考虑以下选项之一。看看它是否有帮助会很棒:)

首先,如果你合并,如 cmets 中的@Lamanus 所说,这意味着你将减少分区的数量,因此也会减少 writer 任务,因此将所有数据洗牌到 1 个任务。这可能是需要改进的第一个因素。

要克服这个问题,即。每个分区写一个文件并保持并行化级别,您可以更改以下逻辑:

object TestSoAnswer extends App {

  private val testSparkSession = SparkSession.builder()
    .appName("Demo groupBy and partitionBy").master("local[*]")
    .getOrCreate()
  import testSparkSession.implicits._

  // Input dataset with 5 partitions
  val dataset = testSparkSession.sparkContext.parallelize(Seq(
    TestData("a", 0), TestData("a", 1), TestData("b", 0), TestData("b", 1),
    TestData("c", 1), TestData("c", 2)
  ), 5).toDF("letter", "number")

  dataset.as[TestData].groupByKey(row => row.letter)
    .flatMapGroups {
      case (_, values) => values
    }.write.partitionBy("letter").mode("append").json("/tmp/test-parallel-write")

}

case class TestData(letter: String, number: Int)

它是如何工作的?

首先,代码执行洗牌以将与特定键(与分区相同)相关的所有行收集到相同的 分区。这样,它将一次对属于该键的所有行执行写入。前段时间我写了a blog post about partitionBy method。粗略地说,在内部它将对给定分区上的记录进行排序,然后将它们写入 一个一个的放入文件中。

这样我们就得到了这样的计划,其中只有 1 次随机播放,因此存在消耗处理的操作:

== Physical Plan ==
*(2) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, TestData, true])).letter, true, false) AS letter#22, knownnotnull(assertnotnull(input[0, TestData, true])).number AS number#23]
+- MapGroups TestSoAnswer$$$Lambda$1236/295519299@55c50f52, value#18.toString, newInstance(class TestData), [value#18], [letter#3, number#4], obj#21: TestData
   +- *(1) Sort [value#18 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#18, 200), true, [id=#15]
         +- AppendColumnsWithObject TestSoAnswer$$$Lambda$1234/1747367695@6df11e91, [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, TestData, true])).letter, true, false) AS letter#3, knownnotnull(assertnotnull(input[0, TestData, true])).number AS number#4], [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#18]
            +- Scan[obj#2]

TestSoAnswer 执行两次的输出如下:

test-parallel-write % ls
_SUCCESS letter=a letter=b letter=c
test-parallel-write % ls letter=a
part-00170-68245d8b-b155-40ca-9b5c-d9fb746ac76c.c000.json part-00170-cd90d64f-43c6-4582-aae6-fe443b6617f4.c000.json

test-parallel-write % ls letter=b
part-00161-68245d8b-b155-40ca-9b5c-d9fb746ac76c.c000.json part-00161-cd90d64f-43c6-4582-aae6-fe443b6617f4.c000.json

test-parallel-write % ls letter=c
part-00122-68245d8b-b155-40ca-9b5c-d9fb746ac76c.c000.json part-00122-cd90d64f-43c6-4582-aae6-fe443b6617f4.c000.json

您还可以控制number of records written per file with this configuration

编辑:没有看到@mazaneicha 的评论,但确实,你可以试试repartition("partitioning column")!比分组表达式还要清晰。

最好的,

巴托兹。

【讨论】:

  • 谢谢。我的一位队友将尝试这个并在 cmets 部分回复
  • 因此,使用这种方法,运行时间从 50 小时缩短到 20 小时!相当大的进步,但我需要将其缩短到 5 小时以下。我正在处理一个 50 GB 的文件,分区列有 170 万个不同的值。你现在还有什么建议吗?
  • 文件的格式是什么?压缩与不压缩?如果您想同时尝试一下,也许您可​​以将大作业拆分为较小的作业,并且通过拆分我的意思是每个作业过滤不同的分区列范围。您并行运行这些作业,除非您对阅读有争议,否则应该加快时间。我没有看到日志,但假设对于 170 万个分区,用于写入的 I/O 部分需要时间,并且对于单个进程,看不到加速它的方法。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-10-29
  • 1970-01-01
  • 2017-01-26
  • 2018-03-30
  • 2018-02-09
  • 1970-01-01
  • 2020-02-20
相关资源
最近更新 更多