【问题标题】:What is the best method to run several compuations on a large dataset with one time reading?一次读取在大型数据集上运行多个计算的最佳方法是什么?
【发布时间】:2021-04-14 15:33:28
【问题描述】:

我正在处理一个非常大的数据集,其中包括 Spark 中的 200 个压缩 JSON 文件(每个 ~ 8G 未压缩)。我创建了一个主数据框largeDF,以及几个额外的数据框来计算嵌套属性(它们是结构数组)上的聚合。我想执行一般统计计算(填充率和组数)。

对整个数据集的每次处理大约需要 20 分钟(加载文件、解压缩和执行聚合)。对于 50 个字段,这需要很长时间,因为每次我都在更改我的条件并一次又一次地使用其他过滤器运行查询。

我想依靠 PySpark 的惰性求值,避免多次加载数据,所以我可以创建一个复杂的聚合并在整个数据集上应用一次,然后将所有结果转换为 Pandas。或者更好的是,如果我可以预先定义作业并要求 Spark 并行处理它们(加载一次,计算所有),然后分别返回每个作业的结果。

这些不是我的主要 ETL,但我正在尝试提取数据集的语义来编写实际的 ETL 管道。

计算 1:计算统计数据并找到所有字段的填充率:

stats = DF_large.describe().toPandas()

计算 2:使用分类数据处理简单字段:

def group_count(df, col, limit, sort, skip_null):
    """This function groups data-set on based on provided column[s], and counts each group."""
    if skip_null:
        df = df.where(df[col].isNotNull())
    if limit:
        df = df.limit(limit) 
    df = df.groupBy(col).count()
    if sort:
        df = df.sort(col, ascending=False)
    return df.toPandas()

aggregations = {}
for col in group_count_list_of_columns:
    aggregations[col] = group_count(largeDF, col, limit=0, skip_null=True, sort=False)

计算 3:计算和计算嵌套字段的填充率:

def get_nested_fields(spDf, col : str, limit, othercols : tuple, stats = True):
    """This function unwinds a nested array field out of data-set based on provided column, and either returns the whole or statistics of it."""
    spDf = spDf.where(spDf[col].isNotNull())
    df = spDf.select(F.explode(col), *othercols)
    if limit:
        df = df.limit(limit)
    if stats:
        res = df.describe().toPandas()
    else:
        res = df.toPandas()
    return res

nested_fields_aggregate = {}
for col in nested_fields_lists:
    nested_fields_aggregate[col] = get_nested_field(largeDF, col, limit=10**4, othercols =['name', 'id', 'timestamp'], stats = True)

这需要多次读取整个数据集。形状不一样,所以我不能加入。理论上应该有一种方法可以减少时间,因为没有一个计算是相互依赖的。

【问题讨论】:

  • 什么是 ETL?我看不出这与 apache-spark、pyspark 或嵌套文档有什么关系。你想提高速度,对吧? 1.给我们一个最小的工作示例,包括。产生样本数据和预期输出。您可以模拟数据。我是否正确地说主要任务是读取文件(硬盘驱动器上的 IO 任务)、解压缩文件(CPU 受限任务)和进行计算(CPU 任务)。重要提示:是否有 20 个单独的数据分析,还是我们需要将 20 个数据文件合并并对其进行一次分析?
  • 您是否考虑过全部阅读,执行初始 ETL 集,然后对文件进行腌制?
  • @buhtzsaysgetvaccinated 我的主要 ETL 任务是加载仓库中的所有大文件,这不是我目前的意图。现在,我想对数据进行一些统计分析。我有疑问,我想找到一种有效的方法来完全执行它们。只需考虑同时运行多个作业并让 Spark 优化运行它们。
  • 您需要改进您的问题。目前还不清楚,也无法重现。你甚至不回答我的简单问题“ETL 是什么?”。

标签: python apache-spark pyspark large-files nested-documents


【解决方案1】:

每次调用 pandas 时,您都会再次读取 DF_large 数据帧。为避免这种情况,您可以使用 DF_large = DF_large.cache() 缓存此数据帧。

【讨论】:

  • 数据量大,无法缓存在内存中。
  • 您可以使用persist方法将此缓存在磁盘中。 df.persist(StorageLevel.DISK_ONLY_2)
猜你喜欢
  • 2019-06-27
  • 2022-01-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-09-24
相关资源
最近更新 更多