【发布时间】:2020-01-29 14:09:44
【问题描述】:
我收到了一个巨大的(140MM 记录)数据集,Dask 派上了用场,但我不确定我是否可以做得更好。想象一下记录大多是数字(两列是日期),所以从 CSV 转换为 parquet 的过程是轻而易举的 (dask.dataframe.read_csv('in.csv').to_parquet('out.pq')),但是
(i) 我想使用 Amazon Athena 上的数据,所以单个 parquet 文件会很好。如何做到这一点?就目前而言,Dask 将其保存为数百个文件。 (ii) 对于我正在尝试使用此数据集的探索性数据分析,在某些操作中,我需要的变量多于几个变量,这些变量不适合内存,因此我不断将二/三变量视图转储到SQL,这段代码对dask的使用效率高吗?
mmm = ['min','mean','max']
MY_COLUMNS = ['emisor','receptor','actividad', 'monto','grupo']
gdict = {'grupo': mmm, 'monto': mmm, 'actividad': ['mean','count']}
df = dd.read_parquet('out.pq', columns=MY_COLUMNS).groupby(['emisor','receptor']).agg(gdict)
df = df.compute()
df.columns = ['_'.join(c) for c in df.columns] # ('grupo','max') -> grupo_max
df.to_sql('er_stats',conn,index=False,if_exists='replace')
读取文件大约需要 80 秒,写入 SQL 大约需要 60 秒。
【问题讨论】:
-
您应该考虑对数据进行分区。使用 dask,您可以直接保存到 S3(添加
storage_options作为参数),并且可以使用partition_on对数据进行分区。 -
如果你只需要一个文件,你可以在保存到 parquet 之前使用
df = df.repartition(npartitions=1)。 -
所以,我学到了很多关于 dask 的使用方式。让我们重新表述一下我的需求:我有一个巨大的镶木地板文件,我想通过 S3 上传到 Amazon Athena。毕竟它不需要是一个文件。但是在清理我的数据的过程中,我意识到对数据进行分区绝对是一件好事,它加快了所有努力。一开始,数据集是 140MM 行,在一个丑陋的管道分隔文件中。必要的匿名化(这是敏感数据)在每个匿名行中生成 18 个字符长的字符串,因此第一个任务是对这些列进行编号重新标记。
-
最终,为了能够使用我们需要的查询类型来访问数据,需要一种可行的数据格式。想到了 Parquet 和 SQL/Postgres 格式,但格式转换成了一个问题。时间和记忆。始终取决于您需要对数据做什么......我今天学到的关键事实:parquet 文件可以作为 parquet 文件夹上传到 Athena。除了 dask 可以有效地 WRITE 到 s3 parquet 文件这一事实之外,我的整个问题变成了在具有足够内存(约 350 Gb RAM)的机器上运行数据流,亚马逊提供了大约 3 美元一小时。
-
想象一下这个流程:df = pd.read_csv('s3://mybucketname/bigfilename.csv') df.to_parquet('s3://mybucketname/bigfilename.parquet')