【发布时间】:2016-03-24 18:56:47
【问题描述】:
我想使用pyspark accumulator 使用从rdd 推断出的值来附加填充矩阵;我发现文档有点不清楚。添加一些背景,以防万一。
我的rddData 包含必须将一个计数添加到矩阵的索引列表。例如,此列表映射到索引:[1,3,4] -> (11), (13), (14), (33), (34), (44)
现在,这是我的累加器:
from pyspark.accumulators import AccumulatorParam
class MatrixAccumulatorParam(AccumulatorParam):
def zero(self, mInitial):
import numpy as np
aaZeros = np.zeros(mInitial.shape)
return aaZeros
def addInPlace(self, mAdd, lIndex):
mAdd[lIndex[0], lIndex[1]] += 1
return mAdd
这是我的映射器函数:
def populate_sparse(lIndices):
for i1 in lIndices:
for i2 in lIndices:
oAccumilatorMatrix.add([i1, i2])
然后运行数据:
oAccumilatorMatrix = oSc.accumulator(aaZeros, MatrixAccumulatorParam())
rddData.map(populate_sparse).collect()
现在,当我查看我的数据时:
sum(sum(oAccumilatorMatrix.value))
#= 0.0
这是不应该的。我错过了什么?
编辑 起初用稀疏矩阵尝试了这个,得到了不支持稀疏矩阵的回溯。更改密集 numpy 矩阵的问题:
...
raise IndexError("Indexing with sparse matrices is not supported"
IndexError: Indexing with sparse matrices is not supported except boolean indexing where matrix and index are equal shapes.
【问题讨论】:
标签: python sparse-matrix pyspark