【发布时间】:2017-05-16 18:39:23
【问题描述】:
我正在使用 Spark 1.6.2、Scala 2.10.5 和 Java 1.7。
我们的用例需要我们对超过 2 亿行的数据集执行 dense_rank() 而不使用 partitionBy 子句,只使用 orderBy 子句。这目前在 MSSQL 中运行,大约需要 30 分钟才能完成。
我已经在Spark中实现了等价的逻辑,如下图:
val df1 = hqlContext.read.format("jdbc").options(
Map("url" -> url, "driver" -> driver,
"dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load()
df1.cache()
val df1_drnk = df1.withColumn("standardizationId",denseRank().over(Window.orderBy("ownerObjectId","securityId","periodId")))
我正在以 Yarn-cluster 模式提交作业,如下所示。我有一个 2 节点 Hadoop 2.6 集群,每个集群有 4 个 vCore 和 32 GB 内存。
spark-submit --class com.spgmi.csd.OshpStdCarryOver --master yarn --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=3072 --num-executors 2 --executor-cores 3 --driver-memory 7g --executor-memory 16g --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar --files $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/lib/spark-poc2-14.0.0.jar
在日志中,我可以看到来自 MSSQL 的大约 2 亿行的表在 15 分钟内被导入并缓存在 Spark 中。我看到在此阶段之前使用了大约 5 GB 的内存,其中一个执行程序上仍有大约 6.2 GB 的内存可用,而另一个执行程序上有 11 GB 的内存可用。
但是,dense_rank() 的步骤总是在几分钟后失败,并出现“超出 GC 开销限制”错误。正如您在上面的 spark-submit 命令中所看到的,我什至将驱动程序内存设置为 7g。但是,无济于事! 当然,我理解缺少 partitionBy 子句实际上是在 Spark 中造成了麻烦。但是,不幸的是,这是我们需要处理的用例。
你能在这里解释一下吗?我错过了什么吗?在 Spark 中是否有替代使用 dense_rank 窗口函数的方法?例如,使用本论坛其他地方的一位专家建议的“zipWithIndex”功能?它会产生与 dense_rank 相同的结果,因为我理解“zipWithIndex”方法复制 row_number() 函数而不是 dense_rank ?
感谢任何有用的建议! 非常感谢!
【问题讨论】:
标签: sql-server scala apache-spark hadoop-yarn hadoop2