【问题标题】:Apache Spark page results or view results on large datasetsApache Spark 页面结果或查看大型数据集的结果
【发布时间】:2018-05-07 12:28:53
【问题描述】:

我正在使用带有 Spark 1.6.3 的 Hive

我有一个大型数据集(40000 行,20 列左右,每列可能包含 500 字节 - 3KB 数据)

查询是对 3 个数据集的连接

我希望能够对最终的连接数据集进行分页,并且我发现我可以使用row_number() OVER (ORDER BY 1) 为数据集中的每一行生成一个唯一的行号。

在这之后我可以做

SELECT * FROM dataset WHERE row between 1 AND 100

但是,有些资源建议不要使用 ORDER BY,因为它将所有数据放入 1 个分区(我可以在日志中看到这种情况,其中随机分配将数据移动到一个分区),当这个发生内存不足异常。

如何以更有效的方式对数据集进行分页?

我已启用持久化 - MEMORY_AND_DISK 以便如果分区太大,它会溢出到磁盘(对于某些转换,我可以看到至少有一些数据在我不使用时溢出到磁盘row_number())

【问题讨论】:

    标签: apache-spark hive


    【解决方案1】:

    一种策略可以是首先仅选择数据集的唯一键,然后仅在该数据集上应用 row_number 函数。由于您要从大型数据集中选择单个列,因此它适合单个分区的可能性更高。

    val dfKey = df.select("uniqueKey")
    dfKey.createOrUpdateTempTable("dfKey")
    val dfWithRowNum = spark.sql(select dfKey*, row_number() as row_number OVER (ORDER BY 1))
    // save dfWithRowNum
    

    完成对uniqueKey的row_number操作后;保存该数据框。现在在下一阶段将这个数据框与更大的数据框连接起来,并将 row_number 列附加到该数据框。

    dfOriginal.createOrUpdateTempTable("dfOriginal")
    dfWithRowNum.createOrUpdateTempTable("dfWithRowNum")
    val joined = spark.sql("select dfOriginal.* from dfOriginal join dfWithRowNum on dfOriginal.uniqueKey = dfWithRowNum.uniqueKey")
    // save joined
    

    现在可以查询了

    SELECT * FROM joineddataset WHERE row between 1 AND 100
    

    对于使用 MEMORY_DISK 的持久化,我发现偶尔会因内存不足而失败。我宁愿在性能受到惩罚的情况下使用 DISK_ONLY,尽管可以保证执行。

    【讨论】:

    • 感谢帮助,有唯一键的情况下是可以的,但是在没有唯一键的情况下呢?
    • 仅供参考,这可以通过向 dfOriginal 表添加唯一引用 ID 来应用 - 即使数据本身没有自然主 ID,也可以将其用作 primaryID。例如SELECT reflect('java.util.UUID', 'randomUUID') AS PrimaryID from dfOriginal
    【解决方案2】:

    好吧,您可以在最终的连接数据框上应用 this method

    您还应该将数据框保存为文件以保证排序,因为重新评估可能会创建 different order

    【讨论】:

      猜你喜欢
      • 2016-02-29
      • 2016-01-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-10-23
      • 2018-02-26
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多