【发布时间】:2020-11-21 21:16:49
【问题描述】:
我正在运行一个 spark 作业,其工作是扫描一个大文件并将其拆分为较小的文件。该文件采用 Json Lines 格式,我正在尝试按某个列 (id) 对其进行分区,并将每个分区作为单独的文件保存到 S3。文件大小约为 12 GB,但有大约 500000 个不同的 id 值。查询大约需要 15 个小时。我可以做些什么来提高性能?对于这样的任务,Spark 是不是一个糟糕的选择?请注意,我确实有权确保将源作为每个 id 的固定行数。
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *
from pyspark.sql.functions import udf, substring, instr, locate
from datetime import datetime, timedelta
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Get parameters that were passed to the job
args = getResolvedOptions(sys.argv, ['INPUT_FOLDER', 'OUTPUT_FOLDER', 'ID_TYPE', 'DATASET_DATE'])
id_type = args["ID_TYPE"]
output_folder = "{}/{}/{}".format(args["OUTPUT_FOLDER"], id_type, args["DATASET_DATE"])
input_folder = "{}/{}/{}".format(args["INPUT_FOLDER"], id_type, args["DATASET_DATE"])
INS_SCHEMA = StructType([
StructField("camera_capture_timestamp", StringType(), True),
StructField(id_type, StringType(), True),
StructField("image_uri", StringType(), True)
])
data = spark.read.format("json").load(input_folder, schema=INS_SCHEMA)
data = data.withColumn("fnsku_1", F.col("fnsku"))
data.coalesce(1).write.partitionBy(["fnsku_1"]).mode('append').json(output_folder)
我也尝试过重新分区而不是合并。
我正在使用 AWS Glue
【问题讨论】:
-
为什么需要
coalesce(1)?任何原因? - 啊...模式追加,我明白了。 -
Spark 使用分布式文件系统,所以单文件进程真的很糟糕,它让工作变慢。
-
那么你是说如果我有多个文件作为源文件会更快吗?我认为即使源是单个文件,Spark 也会使用分布式处理。
-
我的意思是当你写作,而不是阅读。
-
那么当您尝试
repartition("fnsku_1")而不是coalesce(1)时发生了什么?两者之间的区别在于repartition创建了一个舞台边界,而coalesce可以向前“优化”(spark.apache.org/docs/2.4.0/api/python/… ...但是,如果您要进行剧烈的合并... )
标签: apache-spark pyspark apache-spark-sql aws-glue aws-glue-spark