【发布时间】:2016-05-30 04:15:57
【问题描述】:
每个人。
我有这样的问题:
我有非常大的 rdd:数十亿个元素,例如:
Array[((Int, Int), Double)] = Array(((0,0),729.0), ((0,1),169.0), ((0,2),1.0), ((0,3),5.0), ...... ((34,45),34.0), .....)
我需要做这样的操作:
通过键(i,j)获取每个元素的值并添加到它
min(rdd_value[(i-1, j)],rdd_value[(i, j-1)], rdd_value[(i-1, j-1)])
如果不使用 collect() 作为 collect() 之后,我怎么能做到这一点,因为我的 rdd 非常大,所以我得到了 Java memory errror。
非常感谢!
我尝试从 python 实现这个算法。当时间序列是 rdds 时。
def DTWDistance(s1, s2):
DTW={}
for i in range(len(s1)):
DTW[(i, -1)] = float('inf')
for i in range(len(s2)):
DTW[(-1, i)] = float('inf')
DTW[(-1, -1)] = 0
for i in range(len(s1)):
for j in range(len(s2)):
dist= (s1[i]-s2[j])**2
DTW[(i, j)] = dist + min(DTW[(i-1, j)],DTW[(i, j-1)], DTW[(i-1, j-1)])
return sqrt(DTW[len(s1)-1, len(s2)-1])
现在我应该使用 for 循环执行最后一个操作。 dist 已经计算好了。
示例:
输入(如矩阵):
4 5 1
7 2 3
9 0 1
Rdd 看起来像
rdd.take(10)
Array(((1,1), 4), ((1,2), 5), ((1,3), 1), ((2,1), 7), ((2,2), 2), ((2,3), 3), ((3,1), 9), ((3,2), 0), ((3,3), 1))
我想做这个操作
rdd_value[(i, j)] = rdd_value[(i, j)] + min(rdd_value[(i-1, j)],rdd_value[(i, j-1)], rdd_value[(i-1, j-1)])
例如:
((1, 1), 4) = 4 + min(infinity, infinity, 0) = 4 + 0 = 4
4 5 1
7 2 3
9 0 1
然后
((1, 2), 5) = 5 + min(infinity, 4, infinity) = 5 + 4 = 9
4 9 1
7 2 3
9 0 1
然后
....
然后
((2, 2), 2) = 2 + min(7, 9, 4) = 2 + 4 = 6
4 9 1
7 6 3
9 0 1
然后 .....
((3, 3), 1) = 1 + min(3, 0, 2) = 1 + 0 = 1
【问题讨论】:
-
“行”长度是否固定?
-
行长是什么意思?如果您只使用 3 个元素来获得最低限度 - 是的。
-
您的数据看起来像矩阵拉出到向量,一行接一行。我对吗?如果是这样,我的问题是矩阵行的大小是多少?
-
是的,正确!现在我有行长约为 10 000 的矩阵,但将来长度会更大并且每次都不同。此外,行数!= 矩阵中的列数。
-
看起来您正在尝试使用动态编程的某种变体。它不是可以使用 RDD API 高效且简洁地实现的东西。特别是您的 Python 程序按特定顺序执行更新,我们到达 (i - 1, j)、(i, j - 1) 和 (i - 1, j -1) 的 (i, j) 状态已经更新在给定的迭代中。您可以轻松地应用从矩阵 MI 到 MJ 的转换,其中 MJ(i, j)
标签: scala apache-spark