【问题标题】:Convert Spark Data Frame to org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]将 Spark 数据帧转换为 org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
【发布时间】:2017-07-10 01:19:00
【问题描述】:

我对 scala 和 spark 2.1 非常陌生。 我正在尝试计算数据框中许多元素之间的相关性,如下所示:

item_1 | item_2 | item_3 | item_4
     1 |      1 |      4 |      3
     2 |      0 |      2 |      0
     0 |      2 |      0 |      1

这是我尝试过的:

val df = sqlContext.createDataFrame(
  Seq((1, 1, 4, 3),
      (2, 0, 2, 0),
      (0, 2, 0, 1)
).toDF("item_1", "item_2", "item_3", "item_4")


val items = df.select(array(df.columns.map(col(_)): _*)).rdd.map(_.getSeq[Double](0))

并计算元素之间的相关性:

val correlMatrix: Matrix = Statistics.corr(items, "pearson")

带有以下错误消息:

<console>:89: error: type mismatch;
found   : org.apache.spark.rdd.RDD[Seq[Double]]
 required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
       val correlMatrix: Matrix = Statistics.corr(items, "pearson")

我不知道如何从数据框创建org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]

这可能是一项非常容易的任务,但我有点挣扎,我很高兴得到任何建议。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql rdd apache-spark-mllib


    【解决方案1】:

    例如,您可以使用VectorAssembler。组装向量并转换为RDD

    import org.apache.spark.ml.feature.VectorAssembler
    
    val rows = new VectorAssembler().setInputCols(df.columns).setOutputCol("vs")
      .transform(df)
      .select("vs")
      .rdd
    

    Row中提取Vectors

    • Spark 1.x:

      rows.map(_.getAs[org.apache.spark.mllib.linalg.Vector](0))
      
    • Spark 2.x:

      rows
        .map(_.getAs[org.apache.spark.ml.linalg.Vector](0))
        .map(org.apache.spark.mllib.linalg.Vectors.fromML)
      

    关于您的代码:

    • 您有 Integer 列而不是 Double
    • 数据不是array,因此您不能使用_.getSeq[Double](0)

    【讨论】:

    • 非常感谢 - 这就是我正在寻找的解决方案
    【解决方案2】:

    如果您的目标是执行 pearson 相关,您实际上不必使用 RDD 和向量。这是一个直接在 DataFrame 列上执行 pearson 相关的示例(有问题的列是 Doubles 类型)。

    代码:

    import org.apache.spark.sql.{SQLContext, Row, DataFrame}
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType}
    import org.apache.spark.sql.functions._
    
    
    val rb = spark.read.option("delimiter","|").option("header","false").option("inferSchema","true").format("csv").load("rb.csv").toDF("name","beerId","brewerId","abv","style","appearance","aroma","palate","taste","overall","time","reviewer").cache()
    
    rb.agg(
        corr("overall","taste"),
        corr("overall","aroma"),
        corr("overall","palate"),
        corr("overall","appearance"),
        corr("overall","abv")
        ).show()
    

    在此示例中,我正在导入一个数据帧(带有自定义分隔符、无标题和推断的数据类型),然后简单地针对其中具有多个相关性的数据帧执行 agg 函数。



    输出:

    +--------------------+--------------------+---------------------+-------------------------+------------------+
    |corr(overall, taste)|corr(overall, aroma)|corr(overall, palate)|corr(overall, appearance)|corr(overall, abv)|
    +--------------------+--------------------+---------------------+-------------------------+------------------+
    |  0.8762432795943761|   0.789023067942876|   0.7008942639550395|       0.5663593891357243|0.3539158620897098|
    +--------------------+--------------------+---------------------+-------------------------+------------------+
    

    从结果中可以看出,(整体,口味)列高度相关,而(整体,abv)则没有那么多。

    这是Scala Docs DataFrame page which has the Aggregation Correlation Function的链接。

    【讨论】:

    • 谢谢你的这种方式。它可以工作,但我有 300 多列要计算
    • 有没有办法为许多列计算这个而不具体定义每个组合?
    猜你喜欢
    • 1970-01-01
    • 2016-04-13
    • 2016-09-27
    • 2019-01-16
    • 2020-07-24
    • 1970-01-01
    • 1970-01-01
    • 2017-12-30
    • 1970-01-01
    相关资源
    最近更新 更多