【问题标题】:pyspark matrix accumulatorpyspark 矩阵累加器
【发布时间】:2016-03-24 18:56:47
【问题描述】:

我想使用pyspark accumulator 使用从rdd 推断出的值来附加填充矩阵;我发现文档有点不清楚。添加一些背景,以防万一。
我的rddData 包含必须将一个计数添加到矩阵的索引列表。例如,此列表映射到索引:
[1,3,4] -> (11), (13), (14), (33), (34), (44)

现在,这是我的累加器:

from pyspark.accumulators import AccumulatorParam
class MatrixAccumulatorParam(AccumulatorParam):
    def zero(self, mInitial):
        import numpy as np
        aaZeros = np.zeros(mInitial.shape)
        return aaZeros

    def addInPlace(self, mAdd, lIndex):
        mAdd[lIndex[0], lIndex[1]] += 1
        return mAdd

这是我的映射器函数:

def populate_sparse(lIndices):
    for i1 in lIndices:
        for i2 in lIndices:
            oAccumilatorMatrix.add([i1, i2])

然后运行数据:

oAccumilatorMatrix = oSc.accumulator(aaZeros, MatrixAccumulatorParam())

rddData.map(populate_sparse).collect()

现在,当我查看我的数据时:

sum(sum(oAccumilatorMatrix.value))
#= 0.0

这是不应该的。我错过了什么?

编辑 起初用稀疏矩阵尝试了这个,得到了不支持稀疏矩阵的回溯。更改密集 numpy 矩阵的问题:

...

    raise IndexError("Indexing with sparse matrices is not supported"
IndexError: Indexing with sparse matrices is not supported except boolean indexing where matrix and index are equal shapes.

【问题讨论】:

    标签: python sparse-matrix pyspark


    【解决方案1】:

    啊哈!我想我明白了。累加器在一天结束时仍然需要添加自己的部分。因此,将addInPlace 更改为:

    def addInPlace(self, mAdd, lIndex):
        if type(lIndex) == list:
            mAdd[lIndex[0], lIndex[1]] += 1
        else:
            mAdd += lIndex
        return mAdd
    

    所以现在它在给定一个列表时添加索引,并在populate_sparse 函数循环之后添加自身以创建我的最终矩阵。

    【讨论】:

    • 你是个天才。几个小时以来一直在努力解决这个问题!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-11-11
    • 1970-01-01
    • 2023-03-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-02
    相关资源
    最近更新 更多