【问题标题】:Wide dataframe operation in Pyspark too slowPyspark 中的宽数据帧操作太慢
【发布时间】:2018-10-22 05:06:21
【问题描述】:

我是 Spark 新手,我正在尝试使用 pyspark (Spark 2.2) 对非常广泛的功能集(约 1300 万行,15000 列)执行过滤和聚合操作。功能集作为 parquet 文件存储在 S3 驱动器上。我正在运行一个测试脚本来加载数据框中的特征集,选择几千条记录,按特定区域代码分组,并对 15k 特征列中的每一个进行平均。问题是作业要么出错,要么耗时太长(对于 5% 的记录样本,大约需要 8 小时)。

是否有任何方法可以加快 Pyspark 中宽数据帧上的此类操作?我正在使用 Jupyter 笔记本,并希望在几分钟而不是几小时内完成这些查询。

这是我的代码

df_feature_store = spark.read.parquet(PATH_FEATURE_STORE).sample(False, 0.05, seed=0).cache()
    logger.info("Initial data set loaded and sampled")

    df_selected_rors = spark.read.csv(PATH_DATA_SOURCE+"ROR Sample.csv", header=True)
    agg_cols = [x for x in df_feature_store.columns if re.search("^G\d{2}",x)]
    agg_cols = agg_cols[:10]  # just testing with fewer columns
    expr = {x:"mean" for x in agg_cols}
    joineddf = df_feature_store.join(df_selected_rors, df_feature_store.ROLLOUTREGION_IDENTIFIER == df_selected_rors.ROR, "inner")
    aggdf = joineddf.groupby("ROLLOUT_REGION_IDENTIFIER").agg(expr)
    # replace groupby
    # loop for a 1000 column aggregations 
    # transpose columns into rows as arrays
    aggdf.write.mode("overwrite").csv(PATH_FEATURE_STORE + "aggregated", header=True)
    logger.info("Done")`

【问题讨论】:

    标签: python apache-spark pyspark


    【解决方案1】:

    我会尝试将其拆分以查看问题所在

    • 某些版本的 Spark 在 DF 中存在很多很多列的问题;我不记得具体的了。
    • 在任何查询之前从 CSV 读取并保存在本地 Parquet 中,如果可以的话,过滤列
    • 运行查询 Parquet local- 到 Parquet local

    作为工作目标的 S3 (a) 提交速度很慢,并且 (b) 由于 S3 的最终一致性,存在丢失数据的风险。除非您使用 S3mper/S3Guard/EMR 一致的 EMRFS,否则您不应该将其用作工作的直接目的地。

    【讨论】:

    • 一些版本 - 在不同程度上更像所有版本。
    • 谢谢,我会尝试选项 3,因为它看起来最有希望。我正在读取的 csv 文件只有 100 条记录,用于过滤较大的数据框,所以我认为将该文件转换为 parquet 不会有帮助。不幸的是,这是一个 POC,目的是想办法利用 15k 左右的特性,所以我不想过滤掉这些列。
    猜你喜欢
    • 2020-01-07
    • 1970-01-01
    • 1970-01-01
    • 2019-05-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-06-17
    • 1970-01-01
    相关资源
    最近更新 更多