【问题标题】:How to multiply an IndexedRowMatrix by another IndexedRowMatrix in spark mllib如何在 spark mllib 中将 IndexedRowMatrix 与另一个 IndexedRowMatrix 相乘
【发布时间】:2015-08-01 05:08:06
【问题描述】:

我正在学习如何使用 spark mllib 计算两个矩阵的乘积。现在我的代码是这样的:

val rdd1=sc.textFile("rdd1").map(line=>line.split("\t").map(_.toDouble)).zipWithIndex().map{case(v,i)=>(i,v)}.map(x=>IndexedRow(x._1,Vectors.dense(x._2)))
val rdd2=sc.textFile("rdd2").map(line=>line.split("\t").map(_.toDouble)).zipWithIndex().map{case(v,i)=>(i,v)}.map(x=>IndexedRow(x._1,Vectors.dense(x._2)))
val matrix1=new IndexedRowMatrix(test1)
val matrix2=new IndexedRowMatrix(test2)

我想要 matrix1 与 matrix2 相乘,我试过了:

matrix1.multiply(matrix2)

但是matrix2必须是局部矩阵,不能是IndexedRowMatrix(API文档中说)

def multiply(B: Matrix): IndexedRowMatrix
Multiply this matrix by a local matrix on the right.
B:a local matrix whose number of rows must match the number of columns of this matrix
returns:an IndexedRowMatrix representing the product, which preserves partitioning

还有其他方法可以做到这一点吗?

【问题讨论】:

  • 为什么要创建 IndexedRowMatrix?出于什么目的?为什么不直接创建一个矩阵?

标签: apache-spark rdd apache-spark-mllib


【解决方案1】:

您可以在创建第二个IndexedRowMatrix之前计算局部矩阵并相乘。

val dArray = sc.textFile("rdd2").map(line=>line.split("\t").map(_.toDouble)) 为您提供所需的 Double 数组。

您可以使用Matrices.dense(rows, columns, dArray) 并与第一个矩阵相乘。

然后您可以继续为第二个矩阵创建IndexedRowMatrix

【讨论】:

    【解决方案2】:

    有一种方法可以使用 RDD 将 2 IndexedRowMatrix 相乘,但您需要自己编写。请注意,在我向您展示的实现中,您会得到一个 DenseMatrix 作为结果。

    背景

    假设您有 2 个矩阵 AmxnBnxp,并且您想要计算 Amxn * Bnxp = Cmxp(通常为 n >> m和 n >> p,否则你不会使用 IndexRowMatrices)

    A(i)mx1Amxni 列向量,它存储在一行中IndexedRowMatrix 的。类似地,B(i)1xpi 行向量 存储在对应的 IndexedRowMatrix 的一行中。

    同样不难证明 这样

    上面描述的这两个操作可以很容易地在 map+reduce 操作中实现,或者在 nxp 很大时在 .treeAggregate 中更有效地实现。

    版本 1:使用 Breeze

    使用 Breeze Matrices 执行乘法的简单实现,假设您的矩阵是密集的(如果不是,您可以做一些进一步的优化)。

    import breeze.linalg.{DenseMatrix => BDM}
    
    def distributedMul(a: IndexedRowMatrix, b: IndexedRowMatrix, m: Int, p: Int): Matrix = {
      val aRows = a.rows.map((iV) => (iV.index, iV.vector))
      val bRows = b.rows.map((iV) => (iV.index, iV.vector))
      val joint = aRows.join(bRows)
      def vectorMul(e: (Long, (Vector, Vector))): BDM[Double] = {
        val v1 = BDM.create(rows, 1, e._2._1.toArray)
        val v2 = BDM.create[Double](1, cols, e._2._2.toArray)
        v1 * v2  // This is C(i) 
      }
      Matrices.dense(m, p, joint.map(vectorMul).reduce(_ + _).toArray)
    }
    

    备注

    • IndexedRowMatrix 上的 numRows()numCols() 可能代价高昂。如果您知道尺寸,您可以立即将它们作为参数提供
    • 您可以使用 cartesian 代替 join,但是您需要添加 if 并在索引不同时返回零矩阵

    版本 2:使用 BLAS

    这个版本比另一个更高效(还有另一个版本只使用 Scala 数组,但效率极低)。您需要将它放在一个对象中,因为 BLAS 不可序列化。

    import com.github.fommil.netlib.BLAS
    
    object SuperMul extends Serializable{
    
       val blas = BLAS.getInstance()
    
       def distributedMul(a: IndexedRowMatrix, b: IndexedRowMatrix, m: Int, p: Int): Matrix = {
         val aRows = a.rows.map((iV) => (iV.index, iV.vector))
         val bRows = b.rows.map((iV) => (iV.index, iV.vector))
         val joint = aRows.join(bRows)
         val dim = m * p
         def summul(u: Array[Double], e: (Long, (Vector, Vector))): Array[Double] = {
           // u = a'(i)*b(i) + u
           blas.dgemm("N", "T", m, p, 1, 1.0, e._2._1.toArray, m, e._2._2.toArray, p, 1.0, u, m)
           u
         }
         def sum(u: Array[Double], v: Array[Double]): Array[Double] = {
           blas.daxpy(dim, 1.0, u, 1, v, 1)
           v
         }
    
    Matrices.dense(m, p, joint.treeAggregate(Array.fill[Double](dim)(0))(summul, sum))
       }
    
    }
    

    【讨论】:

      猜你喜欢
      • 2017-02-10
      • 1970-01-01
      • 2016-01-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-12-18
      相关资源
      最近更新 更多