【问题标题】:What's the simplest way to get a Spark DataFrame from arbitrary Array Data in Scala?从 Scala 中的任意数组数据中获取 Spark DataFrame 的最简单方法是什么?
【发布时间】:2019-05-07 20:24:25
【问题描述】:

这几天我一直在琢磨这个问题。感觉它应该直观地简单......真的希望有人能提供帮助!

我从一些半结构化数据中构建了一个单词出现的org.nd4j.linalg.api.ndarray.INDArray,如下所示:

import org.nd4j.linalg.factory.Nd4j
import org.nd4s.Implicits._

val docMap = collection.mutable.Map[Int,Map[Int,Int]] //of the form Map(phrase -> Map(phrasePosition -> word)
val words = ArrayBuffer("word_1","word_2","word_3",..."word_n")
val windows = ArrayBuffer("$phrase,$phrasePosition_1","$phrase,$phrasePosition_2",..."$phrase,$phrasePosition_n") 

var matrix = Nd4j.create(windows.length*words.length).reshape(windows.length,words.length)
for (row <- matrix.shape(0)){
    for(column <- matrix.shape(1){
        //+1 to (row,column) if word occurs at phrase, phrasePosition indicated by window_n.
    }
}
val finalmatrix = matrix.T.dot(matrix) // to get co-occurrence matrix

到目前为止一切都很好......

在这一点的下游,我需要将数据集成到 Spark 中的现有管道中,并使用 pca 等的实现,因此我需要创建一个 DataFrame,或者至少是一个 RDD。如果我提前知道单词和/或窗口的数量,我可以执行以下操作:

case class Row(window : String, word_1 : Double, word_2 : Double, ...etc)

val dfSeq = ArrayBuffer[Row]()
for (row <- matrix.shape(0)){
    dfSeq += Row(windows(row),matrix.get(NDArrayIndex.point(row), NDArrayIndex.all()))
}
sc.parallelize(dfSeq).toDF("window","word_1","word_2",...etc)

但窗口和单词的数量是在运行时确定的。我正在寻找一个 WindowsxWords org.apache.spark.sql.DataFrame 作为输出,输入是一个 WindowsxWords org.nd4j.linalg.api.ndarray.INDArray

提前感谢您提供的任何帮助。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql nlp nd4j


    【解决方案1】:

    好的,经过几天的工作,看起来简单的答案是:没有。事实上,在这种情况下尝试使用Nd4j 似乎根本是一个坏主意,原因如下:

    1. 一旦您将数据从原生 INDArray 格式中取出,就(真的)很难。
    2. 即使使用guava 之类的方法,.data() 方法brings everything on heap 也会很快变得昂贵。
    3. 您不得不编译一个程序集 jar 或使用 hdfs 等来处理库本身。

    我也考虑过使用Breeze,它实际上可能提供了一个可行的解决方案,但也存在一些相同的问题,而can't be used 则用于分布式数据结构。

    不幸的是,使用原生 Spark / Scala 数据类型,虽然一旦你知道如何更容易,但对于像我这样来自 Python + numpy + pandas 天堂的人来说 - 令人痛苦地令人费解和丑陋。

    不过,我确实成功实施了这个解决方案:

    import org.apache.spark.mllib.linalg.{Vectors,Vector,Matrix,DenseMatrix,DenseVector}
    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    
    //first make a pseudo-matrix from Scala Array[Double]:
    var rowSeq = Seq.fill(windows.length)(Array.fill(words.length)(0d))
    
    //iterate through 'rows' and 'columns' to fill it:
    for (row 0 until windows.length){
        for (column 0 until words.length){
            // rowSeq(row)(column) += 1 if word occurs at phrase, phrasePosition indicated by window_n.
        }
    }
    
    //create Spark DenseMatrix
    val rows : Array[Double] = rowSeq.transpose.flatten.toArray
    val matrix = new DenseMatrix(windows.length,words.length,rows)
    

    我需要 Nd4J 的主要操作之一是 matrix.T.dot(matrix),但事实证明你不能将 2 个类型为 org.apache.spark.mllib.linalg.DenseMatrix 的矩阵相乘,其中一个 (A) 必须是 org.apache.spark.mllib.linalg.distributed.RowMatrix 和- 你猜对了 - 你不能在 RowMatrix 上调用 matrix.transpose(),只能在 DenseMatrix 上调用!由于它与问题并不真正相关,因此我将省略该部分,除了解释该步骤的结果是RowMatrix。对于解决方案的最后部分,herehere 也应归功于:

    val rowMatrix : [RowMatrix] = transposeAndDotDenseMatrix(matrix)
    
    // get DataFrame from RowMatrix via DenseMatrix
    val newdense = new DenseMatrix(rowMatrix.numRows().toInt,rowMatrix.numCols().toInt,rowMatrix.rows.collect.flatMap(x => x.toArray)) // the call to collect() here is undesirable...
    val matrixRows = newdense.rowIter.toSeq.map(_.toArray)
    val df = spark.sparkContext.parallelize(matrixRows).toDF("Rows")
    
    // then separate columns:
    val df2 = (0 until words.length).foldLeft(df)((df, num) => 
    df.withColumn(words(num), $"Rows".getItem(num)))
    .drop("Rows")
    

    很想听听对此的改进和建议,谢谢。

    【讨论】:

      猜你喜欢
      • 2010-09-25
      • 1970-01-01
      • 2010-11-02
      • 2022-08-20
      • 1970-01-01
      • 2015-02-13
      • 1970-01-01
      • 1970-01-01
      • 2022-10-30
      相关资源
      最近更新 更多