【问题标题】:How to determine "preferred location" for partitions of PySpark dataframe?如何确定 PySpark 数据框分区的“首选位置”?
【发布时间】:2018-06-15 09:21:25
【问题描述】:

我试图了解coalesce 如何确定如何将初始分区加入最终问题,显然“首选位置”与它有关。

根据this question,Scala Spark 有一个函数preferredLocations(split: Partition) 可以识别这个。但我对 Spark 的 Scala 方面一点也不熟悉。有没有办法在 PySpark 级别确定给定行或分区 ID 的首选位置?

【问题讨论】:

    标签: apache-spark pyspark partitioning


    【解决方案1】:

    是的,理论上是可以的。强制某种形式的偏好的示例数据(可能有一个更简单的示例):

    rdd1 = sc.range(10).map(lambda x: (x % 4, None)).partitionBy(8)
    rdd2 = sc.range(10).map(lambda x: (x % 4, None)).partitionBy(8)
    
    # Force caching so downstream plan has preferences
    rdd1.cache().count()
    
    rdd3 = rdd1.union(rdd2)
    

    现在你可以定义一个助手了:

    from pyspark import SparkContext
    
    def prefered_locations(rdd):
        def to_py_generator(xs):
            """Convert Scala List to Python generator"""
            j_iter = xs.iterator()
            while j_iter.hasNext():
                yield j_iter.next()
    
        # Get JVM
        jvm =  SparkContext._active_spark_context._jvm
        # Get Scala RDD
        srdd = jvm.org.apache.spark.api.java.JavaRDD.toRDD(rdd._jrdd)
        # Get partitions
        partitions = srdd.partitions()
        return {
            p.index(): list(to_py_generator(srdd.preferredLocations(p)))
            for p in partitions
        }
    

    应用:

    prefered_locations(rdd3)
    
    # {0: ['...'],
    #  1: ['...'],
    #  2: ['...'],
    #  3: ['...'],
    #  4: [],
    #  5: [],
    #  6: [],
    #  7: []}
    

    【讨论】:

    • 此代码在我的 RDD 上运行没有错误,并返回预期的分区数,但都有一个空列表。我可以认为这意味着我的分区实际上没有任何首选位置信息吗?或者这可能是一个错误?
    • 许多 RDD 根本没有首选位置(这就是为什么这个非常复杂的示例的原因)。即使在上面的示例中,也只有一些分区(我相信这是分区感知联合的结果)具有首选位置。如果您使用支持数据局部性约束的来源,它应该更明显。
    猜你喜欢
    • 2018-05-27
    • 1970-01-01
    • 2020-03-16
    • 1970-01-01
    • 2021-12-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-17
    相关资源
    最近更新 更多