【问题标题】:Graphframes PageRank performance: PySpark vs sparklyrGraphframes PageRank 性能:PySpark 与 sparklyr
【发布时间】:2019-03-10 19:57:50
【问题描述】:

我使用来自 Python 和 R 的 Spark/GraphFrames。当我从 Python 对一个小图调用 PageRank 时,它比使用 R 慢很多。为什么使用 Python 会慢得多,考虑到两者Python 和 R 调用相同的库?

我将尝试在下面演示这个问题。

Spark/GraphFrames 包括图形示例,例如 friends,如 this link 中所述。这是一个非常小的有向图,有 6 个节点和 8 条边(请注意,该示例与其他版本的 GraphFrames 相比并不相同)。

当我用 R 运行以下代码时,几乎不需要时间来计算 PageRank:

library(graphframes)
library(sparklyr)
library(dplyr)

nodes <- read.csv('nodes.csv')
edges <- read.csv('edges.csv')

sc <- spark_connect(master = "local", version = "2.1.1")

nodes_tbl <- copy_to(sc, nodes)
edges_tbl <- copy_to(sc, edges)

graph <- gf_graphframe(nodes_tbl, edges_tbl)
ranks <- gf_pagerank(graph, reset_probability = 0.15, tol = 0.01)
print(ranks$vertices)

results <- as.data.frame(ranks$vertices)
results <- arrange(results, id)
results$pagerank <- results$pagerank / sum(results$pagerank)

print(results)

当我使用 PySpark 运行等效程序时,需要 10 到 30 分钟:

from pyspark.sql import SparkSession
from graphframes.examples import Graphs

if __name__ == '__main__':

    sc = SparkSession.builder.master("local").getOrCreate()
    g = Graphs(sc).friends()
    results = g.pageRank(resetProbability=0.15, tol=0.01)
    results.vertices.select("id", "pagerank").show()
    results.edges.select("src", "dst", "weight").show()

我尝试了不同版本的 Spark 和 GraphFrames for Python 以与 R 的设置保持一致。

【问题讨论】:

  • 可能是因为代码不完全等效。特别是这些将导致不同数量的分区,并进一步升级到下游。参见例如Spark iteration time increasing exponentially when using join。如果您想让它在某种程度上等同于 Python 代码开头的 sparklyr 代码集 sc.conf.set("spark.sql.shuffle.partitions", 1) - 它不会扩展,但它会在如此小的图形上快速运行(如 higher parallelism is not always better
  • 谢谢你,我想你回答了我的问题。我确实应该补充一点,Python 代码能够在非常大的实例上成功运行(我没有尝试使用 R 的大型实例)。我的印象是它与并行性有关,但我不知道spark.sql.shuffle.partitions 参数。非常感谢!
  • 乐于助人。我现在没有时间追踪它并找到确切的罪魁祸首,但如果您想进一步调查它,必须在实际调用 PageRank 之前引入问题,可能在 indexedEdges 中。 PageRank 使用较旧的 API 实现,因此不受此设置的影响。

标签: r apache-spark pyspark sparklyr graphframes


【解决方案1】:

一般来说,当您看到不同后端中明显等效的代码片段之间存在如此显着的运行时差异时,您必须考虑两种可能性:

  • 没有真正的等价物。尽管在底层使用了相同的 Java 库,但不同语言用于与 JVM 交互的路径并不相同,并且当代码到达 JVM 时,它可能不会使用相同的调用链。
  • 方法相同,但配置和/或数据分布不一样。

在这种特殊情况下,第一个也是最明显的原因是您如何加载数据。

但是,据我所知,在这种特殊情况下,这些选项不应该影响运行时。此外,在这两种情况下,代码到达 JVM 后端之前的路径似乎并不足以解释差异。

这表明问题出在配置的某个地方。一般来说,至少有两个选项会显着影响数据分布,从而影响执行时间:

  • spark.default.parallelism - 与 RDD API 一起用于确定不同情况下的分区数量,包括默认的后随机分配。有关可能的影响,请参阅例如 Spark iteration time increasing exponentially when using join

    它看起来不会影响您的代码。

  • spark.sql.shuffle.partitions - 与Dataset API 一起使用,以确定洗牌后的分区数(groupByjoin 等)。

    虽然 PageRank 代码使用旧的 GraphX API,并且此参数在那里不直接适用,但在将数据传递给旧 API 之前,involves indexing edges and verticesDataset API。

    如果您检查源代码,您会发现 indexedEdgesindexVertices 都使用连接,因此依赖于 spark.sql.shuffle.partitions

    此外,上述方法设置的分区数量将被GraphXGraph对象继承,显着影响执行时间。

    如果您将spark.sql.shuffle.partitions 设置为最小值:

    spark: SparkSession
    spark.conf.set("spark.sql.shuffle.partitions", 1)
    

    这样小数据的执行时间应该可以忽略不计。

结论

您的环境可能使用不同的 spark.sql.shuffle.partitions 值。

一般路线

如果您看到这样的行为,并想大致缩小问题范围,您应该查看 Spark UI,看看哪里有分歧。在这种情况下,您可能会看到明显不同的任务数量。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-01-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-03
    • 1970-01-01
    • 2010-12-24
    相关资源
    最近更新 更多