rose0705

四、分布式矩阵(Distributed Matrix)

分布式矩阵由长整型的行列索引值和双精度浮点型的元素值组成。
它可以分布式地存储在一个或多个RDD上,
MLlib提供了三种分布式矩阵的存储方案:
行矩阵RowMatrix、
索引行矩阵IndexedRowMatrix、
坐标矩阵CoordinateMatrix、
分块矩阵Block Matrix。
它们都属于org.apache.spark.mllib.linalg.distributed包。

 

行矩阵

package org.onepiece.bigdata.windows.spark_ML

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD

/*
* 分布式矩阵
* 1.行矩阵   (Row Matrix)
* 2.索引矩阵 (IndexedRow Matrix)
* 3.坐标矩阵 (Coordinate Matrix)
* 4.分块矩阵 (Block Matrix)
* */
object ml_3_DistributedMatrix {

  /*
  * 行矩阵
  * 是最基础的分布式矩阵类型。
  * 每行是一个本地向量,行索引无实际意义(无法直接使用)
  * 数据存储在一个由行组成的RDD中,其中每一行都使用一个本地向量来进行存储。
  * */
  def test1_Row_Matrix(): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("spark-ml").getOrCreate()
    import spark.implicits._

    //创建两个本地向量
    val dv1 = Vectors.dense(1.0, 2.0, 3.0)
    val dv2 = Vectors.dense(5.0, 7.0, 9.0)

    //使用两个本地向量创建一个RDD[Vector]
    val rows = spark.sparkContext.parallelize(Array(dv1, dv2)) //: RDD[Vector]
    println(rows.foreach(x => println(x)))

    //通过RDD[Vector]创建一个行矩阵
    val rm: RowMatrix = new RowMatrix(rows)

    //得到行数和列数
    println(rm.numRows()) //2
    println(rm.numCols()) //3

    println(rm.rows.collect().toList) //List([1.0,2.0,3.0], [5.0,7.0,9.0])

    rm.rows.foreach(x => println(x))
    /*
[1.0,2.0,3.0]
[5.0,7.0,9.0]
    * */
  }

  def test1_Row_Matrix_2(): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("spark-ml").getOrCreate()
    //import spark.implicits._

    //创建几个本地向量
    val dv1 = Vectors.dense(1.0, 3.0, 5.0)
    val dv2 = Vectors.dense(2.0, 4.0, 6.0)
    val dv3 = Vectors.dense(7.0, 8.0, 9.0)
    val dv4 = Vectors.dense(8.0, 9.0, 0.0)
    val dv5 = Vectors.dense(0.0, 0.0, 1.0)

    //使用两个本地向量创建一个RDD[Vector]
    val rows = spark.sparkContext.parallelize(Array(dv1, dv2, dv3, dv4, dv5))

    //通过RDD[Vector]创建一个行矩阵
    val rm: RowMatrix = new RowMatrix(rows)

    //得到行数和列数
    println(rm.numRows()) //5
    println(rm.numCols()) //3

    println(rm.rows.collect().toList) //List([1.0,3.0,5.0], [2.0,4.0,6.0], [7.0,8.0,9.0], [8.0,9.0,0.0], [0.0,0.0,1.0])
    rm.rows.foreach(x => println(x))
    /*
[2.0,4.0,6.0]
[7.0,8.0,9.0]
[1.0,3.0,5.0]
[8.0,9.0,0.0]
[0.0,0.0,1.0]
    * */

    val summary = rm.computeColumnSummaryStatistics()

    println("-----------")
    println("行数: " + summary.count)
    println("最大向量: " + summary.max)
    println("最小向量: " + summary.min)
    println("平均向量: " + summary.mean)
    println("方差向量: " + summary.variance)

    println("L1范数向量: " + summary.normL1)
    println("L2范数向量: " + summary.normL2)
    /*
行数: 5
最大向量: [8.0,9.0,9.0]
最小向量: [0.0,0.0,0.0]
平均向量: [3.6,4.800000000000001,4.2]
方差向量: [13.3,13.7,13.7]
L1范数向量: [18.0,24.0,21.0]
L2范数向量: [10.862780491200215,13.038404810405298,11.958260743101398]
    * */
  }
}

 

索引矩阵

package org.onepiece.bigdata.windows.spark_ML

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD

/*
* 分布式矩阵
* 1.行矩阵   (Row Matrix)
* 2.索引矩阵 (IndexedRow Matrix)
* 3.坐标矩阵 (Coordinate Matrix)
* 4.分块矩阵 (Block Matrix)
* */
object ml_3_DistributedMatrix {

  /*
  * 索引行矩阵
  * 索引行矩阵 与 行矩阵 相似
  * 但它(索引行矩阵)的每一行都带着一个有意义的行索引值
  * 这个索引值可以被用来识别不同行,或进行诸如:join之类的操作
  * 它数据存储在一个由IndexedRow组成的RDD里(即:每一行都是一个带长整型索引的本地向量)
  * */
  def test2_Indexed_Row_Matrix(): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("spark-ml").getOrCreate()
    //import spark.implicits._

    val dv1 = Vectors.dense(3.0, 5.0, 7.0)
    val dv2 = Vectors.dense(6.0, 5.0, 4.0)

    println(dv1)
    println(dv2)
    /*
[3.0,5.0,7.0]
[6.0,5.0,4.0]
    * */


    //通过本地向量(dv1,dv2)来创建对应的IndexedRow
    val idxr1 = IndexedRow(1, dv1)
    val idxr2 = IndexedRow(2, dv2)

    println(idxr1)
    println(idxr2)
    /*
IndexedRow(1,[3.0,5.0,7.0])
IndexedRow(2,[6.0,5.0,4.0])
    * */

    //通过IndexedRow创建RDD[IndexedRow]
    val indexRows = spark.sparkContext.parallelize(Array(idxr1, idxr2))
    //通过RDD[IndexedRow]创建一个索引行矩阵
    val irMatrix = new IndexedRowMatrix(indexRows)

    println(indexRows.foreach(println))
    /*
IndexedRow(1,[3.0,5.0,7.0])
IndexedRow(2,[6.0,5.0,4.0])
()
    * */

    println(irMatrix.numRows()) //3
    println(irMatrix.numCols()) //3
    println(irMatrix.rows.foreach(x => println(x)))
    /*
IndexedRow(1,[3.0,5.0,7.0])
IndexedRow(2,[6.0,5.0,4.0])
()
    * */
  }

  def test2_Indexed_Row_Matrix_2(): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("spark-ml").getOrCreate()
    //import spark.implicits._

    val dv1 = Vectors.dense(3.0, 5.0, 7.0)
    val dv2 = Vectors.dense(6.0, 5.0, 4.0)
    val dv3 = Vectors.dense(1.0, 0.0, 0.0)

    println(dv1)
    println(dv2)
    println(dv3)
    /*
[3.0,5.0,7.0]
[6.0,5.0,4.0]
[1.0,0.0,0.0]
    * */

    val idxr1 = IndexedRow(1, dv1)
    val idxr2 = IndexedRow(2, dv2)
    val idxr3 = IndexedRow(3, dv3)

    println(idxr1)
    println(idxr2)
    println(idxr3)
    /*
IndexedRow(1,[3.0,5.0,7.0])
IndexedRow(2,[6.0,5.0,4.0])
IndexedRow(3,[1.0,0.0,0.0])
    * */

    val indexRows = spark.sparkContext.parallelize(Array(idxr1, idxr2, idxr3))
    val irMatrix = new IndexedRowMatrix(indexRows)

    println(indexRows.foreach(println))
    /*
IndexedRow(1,[3.0,5.0,7.0])
IndexedRow(3,[1.0,0.0,0.0])
IndexedRow(2,[6.0,5.0,4.0])
()
    * */

    println(irMatrix.numRows()) //4
    println(irMatrix.numCols()) //3
    println(irMatrix.rows.foreach(x => println(x)))
    /*
IndexedRow(1,[3.0,5.0,7.0])
IndexedRow(2,[6.0,5.0,4.0])
IndexedRow(3,[1.0,0.0,0.0])
()
    * */
  }
}

 

坐标矩阵

package org.onepiece.bigdata.windows.spark_ML

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD

/*
* 分布式矩阵
* 1.行矩阵   (Row Matrix)
* 2.索引矩阵 (IndexedRow Matrix)
* 3.坐标矩阵 (Coordinate Matrix)
* 4.分块矩阵 (Block Matrix)
* */
object ml_3_DistributedMatrix {

  /*
  * 坐标矩阵
  * 它是一个基于矩阵项(MatrixEntry)构成的RDD的分布式矩阵。
  * 每一个矩阵项都是一个三元组:(i:Long, j:Long, value:Double)
  * i是行索引
  * j是列索引
  * value是该位置的值
  * 坐标矩阵一般在矩阵的两个维度都很大,且矩阵非常稀疏的时候用
  * */
  def test3_Coordinate_Matrix(): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("spark-ml").getOrCreate()
    //import spark.implicits._

    //创建两个矩阵项(ent1,ent2),每个矩阵项都是由索引和值构成的三元组
    //case class MatrixEntry(val i : Long, val j : Long, val value : Double)
    val ent1 = new MatrixEntry(0, 1, 0.5)
    val ent2 = new MatrixEntry(2, 2, 1.8)

    println(ent1)
    println(ent2)
    /*
MatrixEntry(0,1,0.5)
MatrixEntry(2,2,1.8)
    * */

    //创建RDD[MatrixEntry]
    val entries = spark.sparkContext.parallelize(Array(ent1, ent2))
    //通过RDD[MatrixEntry]创建一个坐标矩阵
    val coordMatrix = new CoordinateMatrix(entries)

    println(entries.foreach(println))
    /*
MatrixEntry(0,1,0.5)
MatrixEntry(2,2,1.8)
()
    * */

    //打印
    coordMatrix.entries.foreach(x => println(x))
    /*
MatrixEntry(0,1,0.5)
MatrixEntry(2,2,1.8)
    * */


    //坐标矩阵可以通过transpose()方法对矩阵进行转置操作,
    //并可以通过自带的toIndexedRowMatrix()方法转换成索引行矩阵(IndexedRowMatrix)
    //coordMatrix.transpose()
    //coordMatrix.toIndexedRowMatrix()
    //coordMatrix.toRowMatrix()

    val transMat = coordMatrix.transpose()
    transMat.entries.foreach(x => println(x))
    /*
MatrixEntry(1,0,0.5)
MatrixEntry(2,2,1.8)
    * */
    val indexedRowMatrix = transMat.toIndexedRowMatrix()
    indexedRowMatrix.rows.foreach(x => println(x))
    /*
IndexedRow(1,(3,[0],[0.5]))
IndexedRow(2,(3,[2],[1.8]))
    * */
  }
}

 

分块矩阵

package org.onepiece.bigdata.windows.spark_ML

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD

/*
* 分布式矩阵
* 1.行矩阵   (Row Matrix)
* 2.索引矩阵 (IndexedRow Matrix)
* 3.坐标矩阵 (Coordinate Matrix)
* 4.分块矩阵 (Block Matrix)
* */
object ml_3_DistributedMatrix {
  /*
  * 分块矩阵
  *
  * */
  def test4_Block_Matrix(): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("spark-ml").getOrCreate()
    //import spark.implicits._

    //创建8个矩阵项,每个矩阵项都由索引和值构成三元组
    val ent1 = new MatrixEntry(0, 0, 1)
    val ent2 = new MatrixEntry(1, 1, 1)
    val ent3 = new MatrixEntry(2, 0, -1)
    val ent4 = new MatrixEntry(2, 1, 2)
    val ent5 = new MatrixEntry(2, 2, 1)
    val ent6 = new MatrixEntry(3, 0, 1)
    val ent7 = new MatrixEntry(3, 1, 1)
    val ent8 = new MatrixEntry(3, 3, 1)

    /* 坐标和值
     [1, 0, 0, 0]
     [0, 1, 0, 0]
     [-1,2, 1, 0]
     [1, 1, 0, 1]
    * */


    //创建RDD[MatrixEntry]
    val entries = spark.sparkContext.parallelize(Array(ent1, ent2, ent3, ent4, ent5, ent6, ent7, ent8))
    //通过RDD[MatrixEntry]创建一个坐标矩阵
    val coordMatrix = new CoordinateMatrix(entries)

    //将坐标矩阵转换成2x2的分块矩阵并存储,尺寸通过参数传入
    val blockMatrix = coordMatrix.toBlockMatrix(2, 2)
    //val blockMatrix = coordMatrix.toBlockMatrix(4, 4)

    //通过toLocalMatrix()转换成本地矩阵
    println(blockMatrix.toLocalMatrix())
    /*
1.0   0.0  0.0  0.0
0.0   1.0  0.0  0.0
-1.0  2.0  1.0  0.0
1.0   1.0  0.0  1.0
    * */

    //查看其分块情况
    println(blockMatrix.numRowBlocks) //2
    println(blockMatrix.numColBlocks) //2

    blockMatrix.toIndexedRowMatrix().rows.foreach(x => println(x))
    /*
IndexedRow(0,[1.0,0.0,0.0,0.0])
IndexedRow(1,[0.0,1.0,0.0,0.0])
IndexedRow(3,[1.0,1.0,0.0,1.0])
IndexedRow(2,[-1.0,2.0,1.0,0.0])
    * */
    blockMatrix.toCoordinateMatrix().entries.foreach(x => println(x))
    /*
MatrixEntry(2,0,-1.0)
MatrixEntry(3,0,1.0)
MatrixEntry(2,1,2.0)
MatrixEntry(3,1,1.0)
MatrixEntry(0,0,1.0)
MatrixEntry(1,1,1.0)
MatrixEntry(2,2,1.0)
MatrixEntry(3,3,1.0)
    * */
  }
}

 

分类:

技术点:

相关文章: