【问题标题】:Combinations And Mass Set Intersections With Apache Spark (Pyspark)与 Apache Spark (Pyspark) 的组合和质量集交叉点
【发布时间】:2015-12-31 08:14:10
【问题描述】:

挑战: 数据是 [(u, p), (u, p), ...] 形状的 RDD,其中 u 和 p 都是字符串。我具有挑战性的期望输出是 [((p,p), u), ..] 其中每个元素中的键是共享一个 u 的所有 p 的组合。

已经尝试过:

  1. 使用笛卡尔和过滤器模拟组合。
  2. 尝试在 Python 中对每个元素使用 itertools.combinations,然后使用 flatMap,如下所示:[(tuple(sorted(e)), x[0]) for e in combinations(x[1].split(','),2)])

我已经尝试过的问题是执行程序节点似乎下降了,可能是由于内存消耗。

有什么建议吗?

编辑

以下是该问题的更多信息和背景:

我正在寻找每两个“p”的交集的重叠计数/基数,其中“p”是集合 ID,“u”是集合的成员。

输入是“p”和“u”之间关系记录的巨大列表,例如: [(u1,p1), (u2, p2), (u1, p2), (u2, p1), (u1, p3)]

所需的输出是:[((p1,p2), 2), ((p1, p3), 1), ((p2, p3), 1)] (注意组合而不是排列)

关于输入数据的更多细节:

  1. 有 50k - 100k 个不同的“p”。
  2. 一个“p”可以有几亿个唯一的“u”。
  3. 可能有重复的输入数据行。换句话说,(u,p) 的多个实例,但在将“u”列表视为交集步骤中给定“p”的集合时,这并没有改变问题。

至于硬件,我在 EMR(Yarn 上的 Spark 1.3.1)上使用了大约 41 个 m3.xlarges,并以这种方式启动了外壳: ./pyspark --master yarn-client --driver-memory 4G --executor-memory 3G --num-executors 160

【问题讨论】:

  • 请编辑您的问题以包括您的确切目标(您在 cmets 中描述的目标)、示例输入和预期输出。有关数据量、统计数据(每个 u 的不同 p 的平均数量)和配置的一些详细信息也可能很有用。
  • 当然,我已经完成了请求的更新。如果您需要更多信息,请告诉我
  • 谢谢。澄清一下 - 一个 p 可以有几亿个独特的“u”?另外,您需要一个精确的解决方案还是可以接受一个近似值?
  • 一个 p 可以有几亿个唯一的 "u"s - 也就是说集合 "p" 在该集合(集合/集合的每个成员都用“u”表示)。我正在寻找一个确切的解决方案。我已经有一个可以在一台机器上运行的精确解决方案,但我开始超出它,并希望分发它以使其具有可扩展性。

标签: python apache-spark pyspark


【解决方案1】:
yourRdd.groupByKey().map(lambda (a,b): (b,a))

group by key会将“u”的所有值聚合成

[(u1,(p1,p2,p3..,px)),(ux,(p1,...,px))]

地图将交换键和值,这应该会给出所需的输出。当没有足够的内存可用时,groupByKey 会溢出到磁盘,因此这也应该可以缓解您潜在的内存消耗问题。

//根据新信息进行编辑。

我无法提供准确的答案,但我可以帮助您了解一种适用于大多数代码的方式:

执行 groupByKey 后,数据会被分区,这样单个键的每个实例都在同一台机器上。现在我们知道单个“u”的所有“p”都在同一台机器上,我们可以开始操作这些值。

让我们以我的初始代码为起点,但稍作修改。

rdd_1 = yourRdd.groupByKey()
rdd_2 = rdd1.mapValues(mapFunction).flatMapValues()

mapFunction 是魔法发生的地方。该函数将 p 的所有值作为元组输入,然后输出一个元组列表,其中每个元组是一对 p 值。

#Updated based on Zero's recommendation of generators.
def mapFunction(tple):
    l = list(tple)
    for i in range(len(tple))
        for j in l[i+1:]
             yield (l[i], j)

当您到达 i=len(tple) 时,您需要添加一些要处理的内容,这样我们就不会出现越界异常。您仍然可能会遇到内存问题,但下一部分应该可以帮助您解决这个问题。

我认为我们可以弄乱您的执行程序配置。鉴于您的机器设置,我认为我们可以创建更多更大的执行器。此外,您的应用程序没有缓存我知道的数据,因此我们可以设置所有执行程序内存以用于修改 memoryFraction 设置的对象。我发现一些更大的 executor 比许多更小的 executor 更好(尽管使用 YARN 运行,始终难以获得大量资源,但那是完全不同的对话)。尝试使用 16 到 32GB 的执行器,2-5 个内核。

我会做一个类似这样的 spark sumbit:

spark-submit --master yarn-client --driver-memory 4g --executor-memory 16g --num-executors 30 --executor-cores 4  --conf spark.storage.memoryFraction=0

如果您在任何地方缓存/持久化数据,请跳过 memoryFraction 设置

【讨论】:

  • 我想你可以按照@Paul 的建议做一个 reduceByKey 并为每个键构建一个值数组。最小化数据混洗的唯一方法是以某种方式减少数据。一般来说,reduceByKey 比按键组更有效,因为 reduce 首先在本地发生,然后来自初始 reduce 的数据被打乱。我不确定你的 p 数据集有多大,所以你可能会遇到内存问题。
  • reduceByKey 的计数对我来说很清楚,但真正的问题是获得每对共享“u”作为键的“p”的组合。因此结果中的每个元素都应该是 ((p,p), u)。这个想法是我可以依靠该 RDD 来获取任意两个“p”之间的关系/交叉点的数量
  • 那么一个有效的例子是:(p,u) 输入数据:(1,2), (3,2), (2,2);和所需的输出: ((1,3),2), ((1,2),2), ((3,2),2) ?
  • 是的,没错。我的想法是,然后我可以计算每个结果,从而大量设置交叉点。我已经更新了这个问题,更好地说明了挑战。我非常感谢您的帮助,如果有任何其他信息有帮助,请告诉我。
  • 我认为我们可以通过一个随机播放来完成它。如果我们大量使用 mapValues/flatMapValues,那么在最初的 shuffle 之后,一切都可以在本地完成。我将通过编辑我的答案来回顾我的策略。
【解决方案2】:

您可以尝试的一件事是将计算转移到DataFrame

from pyspark.sql.functions import col

rdd = ...
df = rdd.toDF(["u", "p"])

xs = df.alias("xs")
ys = df.alias("ys")

result = (xs
    .join(ys, (col("xs.u") == col("ys.u")) & (col("xs.p") < col("ys.p")))
    .groupBy(col("xs.p"), col("ys.p"))
    .count())

不过,我并不是特别乐观。如果您想要一个准确的答案,那么必须对一种或另一种数据进行洗牌。

【讨论】:

  • 我同意数据必须至少洗牌一次,除非数据存储在某处并在 u 的值上设置了分区
猜你喜欢
  • 2021-11-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-01-07
  • 2019-03-20
  • 1970-01-01
  • 2019-07-09
  • 2015-01-13
相关资源
最近更新 更多