【问题标题】:Convert a text to matrix in Scala and Spark在 Scala 和 Spark 中将文本转换为矩阵
【发布时间】:2015-07-04 07:34:57
【问题描述】:

这些是我的数据:

0,2 # Spark is more intelligent about how it operates on data. 
1,5 # it always looks to limit how much work it has to do. 
2,3 # Sometimes a data analyst just record for the Chicago store.
...

我想从这些数据中提取一个如下所示的矩阵:

0 2
1 5
2 3
...

我试过了:

 def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("prep").setMaster("local")
    val sc = new SparkContext(conf)
    val sample1 = sc.textFile("data.txt")
    val cnt = sample1.count()
    val tt = DenseMatrix.zeros[Double](cnt.toInt,1)
    var doc_val = sample1.flatMap({ (line) =>
      val tuple = line.split("#")
      val ss = tuple(0).split(",")
      val docid = ss(0).toInt
      val docscore = ss(2)
      tt(docid, 0) = docscore
    })
    println(tt)
}

但它无法编译, 有什么问题?

【问题讨论】:

  • “无法编译” - 需要更多数据。

标签: scala matrix apache-spark


【解决方案1】:

它不能编译,因为 DenseMatrix 不接受类型参数。与org.apache.spark.mllib.linalg 中的其他数据结构相同,它只能存储Double 值。

使用 PySpark 时同样适用。来自pyspark.mllib.linalg 的数据结构存储numpy.float64 值。

另一个问题是您尝试设置 tt 的值。 org.apache.spark.mllib.linalg(在 PyPspark 中 pyspark.mllib.linalg)中的所有数据结构都是不可变的。

最后你不应该使用flatMap 来获得一些副作用。 flatMap 具有清晰的语义和相当具体的签名:

def flatMap[U](f: T => TraversableOnce[U]) RDD[U]

如果可以修改DenseMatrix,您应该使用foreach 方法。由于不是一种解决问题的方法是使用简单的地图:

import org.apache.spark.mllib.linalg.Matrices

val cnt = sample1.count()
val vals = sample1.map {
    case rowIdAndScore(rowId, rowScore) => rowScore.toDouble
}

val tt = Matrices.dense(cnt, 1, vals.collect())

另外一个可能的问题是org.apache.spark.mllib.linalg.DenseMatrix 的使用。我假设您出于某种原因将数据加载为RDD。如果是这样,那么使用来自org.apache.spark.mllib.linalg.distributed 的分布式数据结构之一是有意义的。例如:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val sample1 = sc.textFile("data.txt")
val rowIdAndScore = """^(\d)+\s*,\s*(\d+)\s*#.*$""".r

val rows = sample1.map {case rowIdAndScore(rowId, rowScore) => {
    IndexedRow(rowId.toLong, Vectors.dense(rowScore.toDouble))
}}

val tt = new IndexedRowMatrix(rows)

假设这个问题与the previous one 有关,您可以通过直接映射来避免至少一部分字符串处理,而无需创建中间 RDD。

【讨论】:

    猜你喜欢
    • 2015-07-22
    • 2018-12-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-01-01
    • 1970-01-01
    • 2013-02-27
    • 1970-01-01
    相关资源
    最近更新 更多