【问题标题】:PySpark - create multiple json files from dataframePySpark - 从数据框创建多个 json 文件
【发布时间】:2018-12-26 07:49:47
【问题描述】:

我有以下格式的数据,是从 Hive 中获取到一个数据框中的:

date, stock, price
1388534400, GOOG, 50
1388534400, FB, 60
1388534400, MSFT, 55
1388620800, GOOG, 52
1388620800, FB, 61
1388620800, MSFT, 55

其中 date 是当天午夜的纪元,我们的数据可以追溯到 10 年左右(超过 8 亿行)。 我的目标是最终得到一堆 JSON 文件,每只股票一个,看起来像:

GOOG.json:
{
'1388534400': 50,
'1388620800': 52
}

FB.json:
{
'1388534400': 60,
'1388620800': 61
}

一种天真的方法是获取唯一股票列表,然后通过仅过滤掉每只股票的那些行来获取数据框的子集,但这似乎过于幼稚且效率极低。 这可以在 Spark 中轻松完成吗?我目前使用 PyHive 在本机 Python 中工作,但由于数据量巨大,我宁愿在集群/Spark 上完成。

【问题讨论】:

    标签: python apache-spark pyspark


    【解决方案1】:

    是的。这很简单。您可以使用 DataFrameWriter 并使用 partitionBy - 指定要分区的列(在您的情况下它将是库存)

    来自 Pyspark 文档:

    df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data'))

    对你来说,这将是

    df.write.partitionBy('stock').json(os.path.join(tempfile.mkdtemp(), 'data'))
    

    注意几点:

    • 这可能需要大量改组,具体取决于 Hive 表的布局方式。
    • 即使在分区之后,每个分区最终可能会拥有多个文件,具体取决于该分区中有多少记录。例如30% 的活动可能是针对 GOOG 的,在这种情况下,针对 GOOG 的分区将比其他分区大得多。如果遇到,您只需要为每个分区运行一个文件连接脚本。但是,每个分区中的文件仍将用于单一库存。

    【讨论】:

    • 谢谢@RobS。我想我应该在问题本身中提到这一点,但我首先想要以 dict 格式返回的数据,这样我就可以运行验证和处理丢失的数据 - 我在这里问了另一个问题 - stackoverflow.com/questions/53932942/…跨度>
    • 您最好在 spark 本身中进行所有验证和数据清理。 sparl 函数模块非常适合您正在谈论的那种东西。将其从 Hive 移动到本地 Dict 然后返回到集群可能会比在集群上并行运行所有验证要慢得多。
    猜你喜欢
    • 2019-12-06
    • 1970-01-01
    • 1970-01-01
    • 2020-01-21
    • 1970-01-01
    • 2022-01-01
    • 2017-06-27
    • 1970-01-01
    相关资源
    最近更新 更多