四、分布式矩阵(Distributed Matrix)
分布式矩阵由长整型的行列索引值和双精度浮点型的元素值组成。
它可以分布式地存储在一个或多个RDD上,
MLlib提供了三种分布式矩阵的存储方案:
行矩阵RowMatrix、
索引行矩阵IndexedRowMatrix、
坐标矩阵CoordinateMatrix、
分块矩阵Block Matrix。
它们都属于org.apache.spark.mllib.linalg.distributed包。
行矩阵
package org.onepiece.bigdata.windows.spark_ML import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.SparkContext import org.apache.spark.mllib.linalg.distributed._ import org.apache.spark.sql.SparkSession import org.apache.spark.rdd.RDD /* * 分布式矩阵 * 1.行矩阵 (Row Matrix) * 2.索引矩阵 (IndexedRow Matrix) * 3.坐标矩阵 (Coordinate Matrix) * 4.分块矩阵 (Block Matrix) * */ object ml_3_DistributedMatrix { /* * 行矩阵 * 是最基础的分布式矩阵类型。 * 每行是一个本地向量,行索引无实际意义(无法直接使用) * 数据存储在一个由行组成的RDD中,其中每一行都使用一个本地向量来进行存储。 * */ def test1_Row_Matrix(): Unit = { val spark = SparkSession.builder().master("local[*]").appName("spark-ml").getOrCreate() import spark.implicits._ //创建两个本地向量 val dv1 = Vectors.dense(1.0, 2.0, 3.0) val dv2 = Vectors.dense(5.0, 7.0, 9.0) //使用两个本地向量创建一个RDD[Vector] val rows = spark.sparkContext.parallelize(Array(dv1, dv2)) //: RDD[Vector] println(rows.foreach(x => println(x))) //通过RDD[Vector]创建一个行矩阵 val rm: RowMatrix = new RowMatrix(rows) //得到行数和列数 println(rm.numRows()) //2 println(rm.numCols()) //3 println(rm.rows.collect().toList) //List([1.0,2.0,3.0], [5.0,7.0,9.0]) rm.rows.foreach(x => println(x)) /* [1.0,2.0,3.0] [5.0,7.0,9.0] * */ } def test1_Row_Matrix_2(): Unit = { val spark = SparkSession.builder().master("local[*]").appName("spark-ml").getOrCreate() //import spark.implicits._ //创建几个本地向量 val dv1 = Vectors.dense(1.0, 3.0, 5.0) val dv2 = Vectors.dense(2.0, 4.0, 6.0) val dv3 = Vectors.dense(7.0, 8.0, 9.0) val dv4 = Vectors.dense(8.0, 9.0, 0.0) val dv5 = Vectors.dense(0.0, 0.0, 1.0) //使用两个本地向量创建一个RDD[Vector] val rows = spark.sparkContext.parallelize(Array(dv1, dv2, dv3, dv4, dv5)) //通过RDD[Vector]创建一个行矩阵 val rm: RowMatrix = new RowMatrix(rows) //得到行数和列数 println(rm.numRows()) //5 println(rm.numCols()) //3 println(rm.rows.collect().toList) //List([1.0,3.0,5.0], [2.0,4.0,6.0], [7.0,8.0,9.0], [8.0,9.0,0.0], [0.0,0.0,1.0]) rm.rows.foreach(x => println(x)) /* [2.0,4.0,6.0] [7.0,8.0,9.0] [1.0,3.0,5.0] [8.0,9.0,0.0] [0.0,0.0,1.0] * */ val summary = rm.computeColumnSummaryStatistics() println("-----------") println("行数: " + summary.count) println("最大向量: " + summary.max) println("最小向量: " + summary.min) println("平均向量: " + summary.mean) println("方差向量: " + summary.variance) println("L1范数向量: " + summary.normL1) println("L2范数向量: " + summary.normL2) /* 行数: 5 最大向量: [8.0,9.0,9.0] 最小向量: [0.0,0.0,0.0] 平均向量: [3.6,4.800000000000001,4.2] 方差向量: [13.3,13.7,13.7] L1范数向量: [18.0,24.0,21.0] L2范数向量: [10.862780491200215,13.038404810405298,11.958260743101398] * */ } }
索引矩阵
package org.onepiece.bigdata.windows.spark_ML import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.SparkContext import org.apache.spark.mllib.linalg.distributed._ import org.apache.spark.sql.SparkSession import org.apache.spark.rdd.RDD /* * 分布式矩阵 * 1.行矩阵 (Row Matrix) * 2.索引矩阵 (IndexedRow Matrix) * 3.坐标矩阵 (Coordinate Matrix) * 4.分块矩阵 (Block Matrix) * */ object ml_3_DistributedMatrix { /* * 索引行矩阵 * 索引行矩阵 与 行矩阵 相似 * 但它(索引行矩阵)的每一行都带着一个有意义的行索引值 * 这个索引值可以被用来识别不同行,或进行诸如:join之类的操作 * 它数据存储在一个由IndexedRow组成的RDD里(即:每一行都是一个带长整型索引的本地向量) * */ def test2_Indexed_Row_Matrix(): Unit = { val spark = SparkSession.builder().master("local[*]").appName("spark-ml").getOrCreate() //import spark.implicits._ val dv1 = Vectors.dense(3.0, 5.0, 7.0) val dv2 = Vectors.dense(6.0, 5.0, 4.0) println(dv1) println(dv2) /* [3.0,5.0,7.0] [6.0,5.0,4.0] * */ //通过本地向量(dv1,dv2)来创建对应的IndexedRow val idxr1 = IndexedRow(1, dv1) val idxr2 = IndexedRow(2, dv2) println(idxr1) println(idxr2) /* IndexedRow(1,[3.0,5.0,7.0]) IndexedRow(2,[6.0,5.0,4.0]) * */ //通过IndexedRow创建RDD[IndexedRow] val indexRows = spark.sparkContext.parallelize(Array(idxr1, idxr2)) //通过RDD[IndexedRow]创建一个索引行矩阵 val irMatrix = new IndexedRowMatrix(indexRows) println(indexRows.foreach(println)) /* IndexedRow(1,[3.0,5.0,7.0]) IndexedRow(2,[6.0,5.0,4.0]) () * */ println(irMatrix.numRows()) //3 println(irMatrix.numCols()) //3 println(irMatrix.rows.foreach(x => println(x))) /* IndexedRow(1,[3.0,5.0,7.0]) IndexedRow(2,[6.0,5.0,4.0]) () * */ } def test2_Indexed_Row_Matrix_2(): Unit = { val spark = SparkSession.builder().master("local[*]").appName("spark-ml").getOrCreate() //import spark.implicits._ val dv1 = Vectors.dense(3.0, 5.0, 7.0) val dv2 = Vectors.dense(6.0, 5.0, 4.0) val dv3 = Vectors.dense(1.0, 0.0, 0.0) println(dv1) println(dv2) println(dv3) /* [3.0,5.0,7.0] [6.0,5.0,4.0] [1.0,0.0,0.0] * */ val idxr1 = IndexedRow(1, dv1) val idxr2 = IndexedRow(2, dv2) val idxr3 = IndexedRow(3, dv3) println(idxr1) println(idxr2) println(idxr3) /* IndexedRow(1,[3.0,5.0,7.0]) IndexedRow(2,[6.0,5.0,4.0]) IndexedRow(3,[1.0,0.0,0.0]) * */ val indexRows = spark.sparkContext.parallelize(Array(idxr1, idxr2, idxr3)) val irMatrix = new IndexedRowMatrix(indexRows) println(indexRows.foreach(println)) /* IndexedRow(1,[3.0,5.0,7.0]) IndexedRow(3,[1.0,0.0,0.0]) IndexedRow(2,[6.0,5.0,4.0]) () * */ println(irMatrix.numRows()) //4 println(irMatrix.numCols()) //3 println(irMatrix.rows.foreach(x => println(x))) /* IndexedRow(1,[3.0,5.0,7.0]) IndexedRow(2,[6.0,5.0,4.0]) IndexedRow(3,[1.0,0.0,0.0]) () * */ } }
坐标矩阵
package org.onepiece.bigdata.windows.spark_ML import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.SparkContext import org.apache.spark.mllib.linalg.distributed._ import org.apache.spark.sql.SparkSession import org.apache.spark.rdd.RDD /* * 分布式矩阵 * 1.行矩阵 (Row Matrix) * 2.索引矩阵 (IndexedRow Matrix) * 3.坐标矩阵 (Coordinate Matrix) * 4.分块矩阵 (Block Matrix) * */ object ml_3_DistributedMatrix { /* * 坐标矩阵 * 它是一个基于矩阵项(MatrixEntry)构成的RDD的分布式矩阵。 * 每一个矩阵项都是一个三元组:(i:Long, j:Long, value:Double) * i是行索引 * j是列索引 * value是该位置的值 * 坐标矩阵一般在矩阵的两个维度都很大,且矩阵非常稀疏的时候用 * */ def test3_Coordinate_Matrix(): Unit = { val spark = SparkSession.builder().master("local[*]").appName("spark-ml").getOrCreate() //import spark.implicits._ //创建两个矩阵项(ent1,ent2),每个矩阵项都是由索引和值构成的三元组 //case class MatrixEntry(val i : Long, val j : Long, val value : Double) val ent1 = new MatrixEntry(0, 1, 0.5) val ent2 = new MatrixEntry(2, 2, 1.8) println(ent1) println(ent2) /* MatrixEntry(0,1,0.5) MatrixEntry(2,2,1.8) * */ //创建RDD[MatrixEntry] val entries = spark.sparkContext.parallelize(Array(ent1, ent2)) //通过RDD[MatrixEntry]创建一个坐标矩阵 val coordMatrix = new CoordinateMatrix(entries) println(entries.foreach(println)) /* MatrixEntry(0,1,0.5) MatrixEntry(2,2,1.8) () * */ //打印 coordMatrix.entries.foreach(x => println(x)) /* MatrixEntry(0,1,0.5) MatrixEntry(2,2,1.8) * */ //坐标矩阵可以通过transpose()方法对矩阵进行转置操作, //并可以通过自带的toIndexedRowMatrix()方法转换成索引行矩阵(IndexedRowMatrix) //coordMatrix.transpose() //coordMatrix.toIndexedRowMatrix() //coordMatrix.toRowMatrix() val transMat = coordMatrix.transpose() transMat.entries.foreach(x => println(x)) /* MatrixEntry(1,0,0.5) MatrixEntry(2,2,1.8) * */ val indexedRowMatrix = transMat.toIndexedRowMatrix() indexedRowMatrix.rows.foreach(x => println(x)) /* IndexedRow(1,(3,[0],[0.5])) IndexedRow(2,(3,[2],[1.8])) * */ } }
分块矩阵
package org.onepiece.bigdata.windows.spark_ML import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.SparkContext import org.apache.spark.mllib.linalg.distributed._ import org.apache.spark.sql.SparkSession import org.apache.spark.rdd.RDD /* * 分布式矩阵 * 1.行矩阵 (Row Matrix) * 2.索引矩阵 (IndexedRow Matrix) * 3.坐标矩阵 (Coordinate Matrix) * 4.分块矩阵 (Block Matrix) * */ object ml_3_DistributedMatrix { /* * 分块矩阵 * * */ def test4_Block_Matrix(): Unit = { val spark = SparkSession.builder().master("local[*]").appName("spark-ml").getOrCreate() //import spark.implicits._ //创建8个矩阵项,每个矩阵项都由索引和值构成三元组 val ent1 = new MatrixEntry(0, 0, 1) val ent2 = new MatrixEntry(1, 1, 1) val ent3 = new MatrixEntry(2, 0, -1) val ent4 = new MatrixEntry(2, 1, 2) val ent5 = new MatrixEntry(2, 2, 1) val ent6 = new MatrixEntry(3, 0, 1) val ent7 = new MatrixEntry(3, 1, 1) val ent8 = new MatrixEntry(3, 3, 1) /* 坐标和值 [1, 0, 0, 0] [0, 1, 0, 0] [-1,2, 1, 0] [1, 1, 0, 1] * */ //创建RDD[MatrixEntry] val entries = spark.sparkContext.parallelize(Array(ent1, ent2, ent3, ent4, ent5, ent6, ent7, ent8)) //通过RDD[MatrixEntry]创建一个坐标矩阵 val coordMatrix = new CoordinateMatrix(entries) //将坐标矩阵转换成2x2的分块矩阵并存储,尺寸通过参数传入 val blockMatrix = coordMatrix.toBlockMatrix(2, 2) //val blockMatrix = coordMatrix.toBlockMatrix(4, 4) //通过toLocalMatrix()转换成本地矩阵 println(blockMatrix.toLocalMatrix()) /* 1.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 -1.0 2.0 1.0 0.0 1.0 1.0 0.0 1.0 * */ //查看其分块情况 println(blockMatrix.numRowBlocks) //2 println(blockMatrix.numColBlocks) //2 blockMatrix.toIndexedRowMatrix().rows.foreach(x => println(x)) /* IndexedRow(0,[1.0,0.0,0.0,0.0]) IndexedRow(1,[0.0,1.0,0.0,0.0]) IndexedRow(3,[1.0,1.0,0.0,1.0]) IndexedRow(2,[-1.0,2.0,1.0,0.0]) * */ blockMatrix.toCoordinateMatrix().entries.foreach(x => println(x)) /* MatrixEntry(2,0,-1.0) MatrixEntry(3,0,1.0) MatrixEntry(2,1,2.0) MatrixEntry(3,1,1.0) MatrixEntry(0,0,1.0) MatrixEntry(1,1,1.0) MatrixEntry(2,2,1.0) MatrixEntry(3,3,1.0) * */ } }