【问题标题】:How to persist data with Spark/ Scala?如何使用 Spark/Scala 持久化数据?
【发布时间】:2018-04-03 01:32:14
【问题描述】:

我正在使用 Spark 和 Scala 执行批处理。 每天,我都需要将销售文件导入 Spark 数据框并执行一些转换。 (具有相同架构的文件,只有日期和销售值可能会更改) 在一周结束时,我需要使用所有每日转换来执行每周聚合。因此,我需要坚持每天的转换,以免在周末让 Spark 做所有事情。 (我想避免在周末导入所有数据并执行所有转换)。 我还希望有一个支持增量更新(upserts)的解决方案。 我经历了一些选项,例如 Dataframe.persist(StorageLevel.DISK_ONLY)。我想知道是否有更好的选择,比如使用 Hive 表? 您对此有何建议? 与 Dataframe.persist 相比,使用 Hive 表有什么优势? 提前谢谢了。

【问题讨论】:

    标签: scala apache-spark hive spark-dataframe


    【解决方案1】:

    您可以将日常转换的结果保存为 parquet(或 orc)格式,按天分区。然后,您可以使用仅过滤上周数据的查询在此 parquet 文件上运行您的每周流程。谓词下推和分区在 Spark 中非常有效地工作,仅加载过滤器选择的数据以进行进一步处理。

      dataframe 
        .write
        .mode(SaveMode.Append)
        .partitionBy("day") // assuming you have a day column in your DF
        .parquet(parquetFilePath)
    

    SaveMode.Append 选项允许您以增量方式将数据添加到 parquet 文件(而不是使用 SaveMode.Overwrite 覆盖它)

    【讨论】:

    • 回到你身边,丹尼斯。说到 upsert,我得到了预期的结果,所以谢谢!
    • 但要小心,将重复的行插入 parquet 文件非常容易!确保您考虑过您的错误恢复策略。与数据库不同,它没有内置检查。特别是没有主键的概念
    • 好吧,这就是问题所在,当我不仅需要追加新记录而且用相同的键覆盖记录时,我需要处理 upserts。所以我想我需要一个数据库层
    • 如果更新不是太频繁,每日数据量不是太大,重新读取原始每日数据可能更容易(与引入数据库相比),在替换时加入更新旧值与所选行的更新值,然后将结果保存为新的镶木地板文件,以代替原始文件。这样做的好处是保留了旧版本,并且在需要时很容易进行回滚。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-06-24
    • 2021-10-29
    • 2021-03-07
    • 1970-01-01
    • 2011-12-27
    • 2018-05-04
    • 2014-12-23
    相关资源
    最近更新 更多