【问题标题】:Spark Scala: How to work with each 3 elements of rdd?Spark Scala:如何处理 rdd 的每 3 个元素?
【发布时间】:2016-05-30 04:15:57
【问题描述】:

每个人。

我有这样的问题:

我有非常大的 rdd:数十亿个元素,例如:

Array[((Int, Int), Double)] = Array(((0,0),729.0), ((0,1),169.0), ((0,2),1.0), ((0,3),5.0), ...... ((34,45),34.0), .....)

我需要做这样的操作:

通过键(i,j)获取每个元素的值并添加到它

min(rdd_value[(i-1, j)],rdd_value[(i, j-1)], rdd_value[(i-1, j-1)])

如果不使用 collect() 作为 collect() 之后,我怎么能做到这一点,因为我的 rdd 非常大,所以我得到了 Java memory errror

非常感谢!

我尝试从 python 实现这个算法。当时间序列是 rdds 时。

def DTWDistance(s1, s2):
    DTW={}

    for i in range(len(s1)):
        DTW[(i, -1)] = float('inf')
    for i in range(len(s2)):
        DTW[(-1, i)] = float('inf')
    DTW[(-1, -1)] = 0

    for i in range(len(s1)):
        for j in range(len(s2)):
            dist= (s1[i]-s2[j])**2
            DTW[(i, j)] = dist + min(DTW[(i-1, j)],DTW[(i, j-1)], DTW[(i-1, j-1)])

    return sqrt(DTW[len(s1)-1, len(s2)-1])

现在我应该使用 for 循环执行最后一个操作。 dist 已经计算好了。

示例:

输入(如矩阵):

4 5 1
7 2 3
9 0 1

Rdd 看起来像

rdd.take(10)

Array(((1,1), 4), ((1,2), 5), ((1,3), 1), ((2,1), 7), ((2,2), 2), ((2,3), 3), ((3,1), 9), ((3,2), 0), ((3,3), 1))

我想做这个操作

rdd_value[(i, j)] = rdd_value[(i, j)] + min(rdd_value[(i-1, j)],rdd_value[(i, j-1)], rdd_value[(i-1, j-1)])

例如:

((1, 1), 4) = 4 + min(infinity, infinity, 0) = 4 + 0 = 4


4 5 1
7 2 3
9 0 1

然后

((1, 2), 5) = 5 + min(infinity, 4, infinity) = 5 + 4 = 9


4 9 1
7 2 3
9 0 1

然后

....

然后

((2, 2), 2) = 2 + min(7, 9, 4) = 2 + 4 = 6


4 9 1
7 6 3
9 0 1

然后 .....

((3, 3), 1) = 1 + min(3, 0, 2) = 1 + 0 = 1

【问题讨论】:

  • “行”长度是否固定?
  • 行长是什么意思?如果您只使用 3 个元素来获得最低限度 - 是的。
  • 您的数据看起来像矩阵拉出到向量,一行接一行。我对吗?如果是这样,我的问题是矩阵行的大小是多少?
  • 是的,正确!现在我有行长约为 10 000 的矩阵,但将来长度会更大并且每次都不同。此外,行数!= 矩阵中的列数。
  • 看起来您正在尝试使用动态编程的某种变体。它不是可以使用 RDD API 高效且简洁地实现的东西。特别是您的 Python 程序按特定顺序执行更新,我们到达 (i - 1, j)、(i, j - 1) 和 (i - 1, j -1) 的 (i, j) 状态已经更新在给定的迭代中。您可以轻松地应用从矩阵 MI 到 MJ 的转换,其中 MJ(i, j)

标签: scala apache-spark


【解决方案1】:

简短的回答是,您尝试解决的问题无法使用 Spark 高效简洁地表达。如果您选择普通 RDD 是分布式矩阵,这并不重要。

要了解为什么您必须考虑 Spark 编程模型。一个基本的 Spark 概念是一个依赖关系图,其中每个 RDD 依赖于一个或多个父 RDD。如果您的问题定义如下:

  • 给定一个初始矩阵M0
  • 对于 i
    • 找到矩阵Mi其中Mi(m,n) = Mi - 1(m,n) + min(Mi - 1(m-1,n ), Mi - 1(m-1,n-1), Mi - 1(m ,n-1))

那么使用 Spark API(伪代码)表达将是微不足道的:

rdd
    .flatMap(lambda ((i, j), v): 
        [((i + 1, j), v), ((i, j + 1), v), ((i + 1, j + 1), v)])
    .reduceByKey(min)
    .union(rdd)
    .reduceByKey(add)

不幸的是,您正试图表达同一数据结构中各个值之间的依赖关系。 Spark 抛开它是一个更难并行化的问题,更不用说分发了。

这种类型的动态编程很难并行化,因为在不同的点是完全或几乎完全顺序的。例如,当您尝试计算 Mi(0,0)Mi(m,n) 没有什么可以并行化的。它很难分发,因为它可以在块之间产生复杂的依赖关系。

在 Spark 中,通过计算单个块并表达这些块之间的依赖关系或使用迭代算法并在显式图 (GraphX) 上传播消息,有一些重要的方法来处理此问题,但这远非易事。

说到底,对于这种类型的计算而言,相比 Spark,这些工具可能是更好的选择。

【讨论】:

  • 那么,我们不能在 Spark 中实现动态时间包装吗?
  • @kinkajou Cannot 可能很强大,但即使您推导出一些旨在利用分布式处理的近似值,它也不太可能有用且具有成本效益。
猜你喜欢
  • 2017-07-28
  • 2018-08-19
  • 2019-03-25
  • 1970-01-01
  • 2020-05-02
  • 1970-01-01
  • 2016-06-24
  • 2015-10-31
  • 1970-01-01
相关资源
最近更新 更多