【问题标题】:Spark dense_rank window function - without a partitionBy clauseSpark dense_rank 窗口函数 - 没有 partitionBy 子句
【发布时间】:2017-05-16 18:39:23
【问题描述】:

我正在使用 Spark 1.6.2、Scala 2.10.5 和 Java 1.7。

我们的用例需要我们对超过 2 亿行的数据集执行 dense_rank() 而不使用 partitionBy 子句,只使用 orderBy 子句。这目前在 MSSQL 中运行,大约需要 30 分钟才能完成。

我已经在Spark中实现了等价的逻辑,如下图:

val df1 = hqlContext.read.format("jdbc").options(
  Map("url" -> url, "driver" -> driver,
  "dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load()

df1.cache()

val df1_drnk = df1.withColumn("standardizationId",denseRank().over(Window.orderBy("ownerObjectId","securityId","periodId")))

我正在以 Yarn-cluster 模式提交作业,如下所示。我有一个 2 节点 Hadoop 2.6 集群,每个集群有 4 个 vCore 和 32 GB 内存。

spark-submit --class com.spgmi.csd.OshpStdCarryOver --master  yarn --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=3072 --num-executors 2 --executor-cores 3 --driver-memory 7g --executor-memory 16g --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar --files $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/lib/spark-poc2-14.0.0.jar

在日志中,我可以看到来自 MSSQL 的大约 2 亿行的表在 15 分钟内被导入并缓存在 Spark 中。我看到在此阶段之前使用了大约 5 GB 的内存,其中一个执行程序上仍有大约 6.2 GB 的内存可用,而另一个执行程序上有 11 GB 的内存可用。

但是,dense_rank() 的步骤总是在几分钟后失败,并出现“超出 GC 开销限制”错误。正如您在上面的 spark-submit 命令中所看到的,我什至将驱动程序内存设置为 7g。但是,无济于事! 当然,我理解缺少 partitionBy 子句实际上是在 Spark 中造成了麻烦。但是,不幸的是,这是我们需要处理的用例。

你能在这里解释一下吗?我错过了什么吗?在 Spark 中是否有替代使用 dense_rank 窗口函数的方法?例如,使用本论坛其他地方的一位专家建议的“zipWithIndex”功能?它会产生与 dense_rank 相同的结果,因为我理解“zipWithIndex”方法复制 row_number() 函数而不是 dense_rank ?

感谢任何有用的建议! 非常感谢!

【问题讨论】:

    标签: sql-server scala apache-spark hadoop-yarn hadoop2


    【解决方案1】:

    这里有两个不同的问题:

    • 您通过 JDBC 连接加载数据,而无需提供分区列或分区谓词。这使用单个执行程序线程加载所有数据。

      这个问题通常很容易解决,要么使用现有的列之一,要么提供人工密钥。

    • 您使用没有partitionBy 的窗口函数。因此,所有数据都被重新洗牌到单个分区,在本地排序,并使用单个线程进行处理。

      一般来说,没有通用的解决方案可以仅使用Dataset API 来解决这个问题,但您可以使用一些技巧:

      • 创建反映所需记录顺序的人工分区。我在对Avoid performance impact of a single partition mode in Spark window functions的回答中描述了这种方法

        在您的情况下可以使用类似的方法,但它需要多步骤过程,相当于下面描述的方法。

      • 使用关联方法,您可以对排序的RDD 使用两个单独的扫描(应该可以在不从Dataset 转换的情况下做类似的事情)和其他操作:

        • 计算每个分区的部分结果(在您的情况下为给定分区排名)。
        • 收集所需的摘要(在您的情况下,分区边界和每个分区的累积排名值)。
        • 执行第二次扫描以纠正来自先前分区的部分聚合。

      可以在How to compute cumulative sum using Spark 中找到这种方法的一个示例,可以轻松调整以适合您的情况。

    【讨论】:

    • 非常感谢您的建议!我能够使用 JDBC 数据源中的“partitionColumn”选项减少从 MSSQL 导入数据的时间。但是,没有 partitionBy 的密集排名的建议需要更多时间让我消化,因为我对 Scala 还很陌生。但是,非常感谢您指导我!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-12-09
    • 2022-01-04
    • 2014-03-26
    • 2017-01-15
    • 2015-12-30
    • 2020-11-29
    相关资源
    最近更新 更多