【问题标题】:Pyspark: expanding dataset to include neighboursPyspark:扩展数据集以包括邻居
【发布时间】:2020-02-07 09:05:07
【问题描述】:

我是 Spark 的新手,正在尝试将现有的 python 应用程序迁移到 pyspark。

第一个函数(在本例中为 f(x))应针对数据集中的每个元素运行,但也应考虑数据集中的其他元素。

我能得到的最好的简化是以下伪代码:

    def idx_gen_a(x):
        return x-5

    def idx_gen_b(x):
        return x*3

    def f(i, x, dataset):
        elem1 = dataset.get(idx_gen_a(i))
        elem2 = dataset.get(idx_gen_b(i))
        ...
        return some_calculation(x, elem1, elem2, ...)

    def main(dataset):
        result = []
        for i, x in enumerate(dataset):
            result.append(f(i, x,dataset))

有没有类似 Spark 的方法? foreachPartitionaggregate 似乎不太合适..

【问题讨论】:

标签: python apache-spark pyspark


【解决方案1】:

我认为您所说的 dataset.get 大致映射到 spark 中的 join。我已经使用 pyspark 和 RDD 为您在代码上方编写了一个粗略的翻译。 f1f2 是你的两个函数。你可以使用数据框做一些非常相似的事情。

data = spark.range(10).rdd.map(lambda row: (row[0], row[0] * 10))

def unNest(nested):
  key, ((v1, v2), v3) = nested
  return key, (v1, v2, v3)

def f1(a): return a + 1
def f2(a): return a - 1

one = data.map(lambda pair: (f1(pair[0]), pair[1]))
two = data.map(lambda pair: (f2(pair[0]), pair[1]))
data.join(one).join(two).map(unNest).take(10)

# [(1, (10, 0, 20)),
#  (2, (20, 10, 30)),
#  (3, (30, 20, 40)),
#  (4, (40, 30, 50)),
#  (5, (50, 40, 60)),
#  (6, (60, 50, 70)),
#  (7, (70, 60, 80)),
#  (8, (80, 70, 90))]

有不同类型的连接,例如内连接和外连接,但我希望这足以为您指明正确的方向。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-06-13
    • 2019-10-01
    • 1970-01-01
    • 2016-08-14
    • 2010-12-15
    • 1970-01-01
    • 2011-02-01
    • 1970-01-01
    相关资源
    最近更新 更多