【问题标题】:Spark MLLib Kmeans from dataframe, and back again从数据帧中激发 MLLib Kmeans,然后再返回
【发布时间】:2015-10-05 11:33:47
【问题描述】:

我的目标是使用 Spark (1.3.1) MLLib 将 kmeans 聚类算法应用于非常大的数据集。我已经使用 Spark 中的 hiveContext 调用了 HDFS 中的数据,并且最终希望以这种方式将其放回原处 - 以这种格式

    |I.D     |cluster |
    ===================
    |546     |2       |
    |6534    |4       |
    |236     |5       |
    |875     |2       |

我运行了以下代码,其中“数据”是双精度数据帧,第一列的 ID。

    val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
    val clusters = KMeans.train(parsedData, 3, 20)

这运行成功,我现在被困在如上所述的数据框中将集群映射回它们各自的 ID。我可以将其转换为数据框:

    sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()

但这就是我所知道的。 This post 是在正确的轨道上,this post 我想我也在问一个类似的问题。

我怀疑需要labeledPoint 库。任何cmets,答案将不胜感激,干杯。

编辑:刚刚在 Spark 用户列表中找到 this,看起来很有希望

【问题讨论】:

    标签: apache-spark k-means


    【解决方案1】:

    我了解您希望最后获得 DataFrame。我看到了两种可能的解决方案。我会说在它们之间进行选择是品味问题。

    从 RDD 创建列

    很容易得到RDD形式的id和cluster对:

    val idPointRDD = data.rdd.map(s => (s.getInt(0), Vectors.dense(s.getDouble(1),s.getDouble(2)))).cache()
    val clusters = KMeans.train(idPointRDD.map(_._2), 3, 20)
    val clustersRDD = clusters.predict(idPointRDD.map(_._2))
    val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)
    

    然后你从那个创建DataFrame

    val idCluster = idClusterRDD.toDF("id", "cluster")
    

    之所以有效,是因为 map 不会改变 RDD 中数据的顺序,这就是为什么您可以使用预测结果压缩 id。

    使用 UDF(用户定义函数)

    第二种方法涉及使用clusters.predict方法作为UDF:

    val bcClusters = sc.broadcast(clusters)
    def predict(x: Double, y: Double): Int = {
        bcClusters.value.predict(Vectors.dense(x, y))
    }
    sqlContext.udf.register("predict", predict _)
    

    现在我们可以使用它来为数据添加预测:

    val idCluster = data.selectExpr("id", "predict(x, y) as cluster")
    

    请记住,Spark API 不允许取消注册 UDF。这意味着闭包数据将保存在内存中。

    错误/不理想的解决方案

    • 使用 clusters.predict 而不广播

    它在分布式设置中不起作用。编辑:实际上它会起作用,我被 implementation of predict for RDD 弄糊涂了,它使用广播。

    • sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()

    toArray 收集驱动程序中的所有数据。这意味着在分布式模式下,您会将集群 ID 复制到一个节点中。

    【讨论】:

      【解决方案2】:

      根据您的代码,我假设:

      • data 是一个包含三列的 DataFrame(label: Doublex1: Doublex2: Double
      • 您希望KMeans.predict 使用x1x2 来进行集群分配closestCluster: Int
      • 结果数据框的格式应为 (label: Double, closestCluster: Int)

      这是一个简单的示例应用程序,其中包含一些遵循假定架构的玩具数据:

      import org.apache.spark.mllib.linalg.Vectors
      import org.apache.spark.mllib.clustering.KMeans
      import org.apache.spark.mllib.regression.LabeledPoint
      import org.apache.spark.sql.functions.{col, udf}
      
      case class DataRow(label: Double, x1: Double, x2: Double)
      val data = sqlContext.createDataFrame(sc.parallelize(Seq(
          DataRow(3, 1, 2),
          DataRow(5, 3, 4),
          DataRow(7, 5, 6),
          DataRow(6, 0, 0)
      )))
      
      val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
      val clusters = KMeans.train(parsedData, 3, 20)
      val t = udf { (x1: Double, x2: Double) => clusters.predict(Vectors.dense(x1, x2)) }
      val result = data.select(col("label"), t(col("x1"), col("x2")))
      

      重要的部分是最后两行。

      1. 创建一个 UDF(用户定义函数),可以直接应用于 Dataframe 列(在本例中为 x1x2 两列)。

      2. 选择label 列以及应用于x1x2 列的UDF。由于 UDF 将预测 closestCluster,因此在此之后 result 将是一个由 (label, closestCluster) 组成的 Dataframe

      【讨论】:

        【解决方案3】:

        让我知道此代码是否适合您:

        import org.apache.spark.mllib.linalg.Vectors
        import org.apache.spark.mllib.clustering._
        
        val rows = data.rdd.map(r => (r.getDouble(1),r.getDouble(2))).cache()
        val vectors = rows.map(r => Vectors.dense(r._1, r._2))
        val kMeansModel = KMeans.train(vectors, 3, 20)
        val predictions = rows.map{r => (r._1, kMeansModel.predict(Vectors.dense(r._1, r._2)))}
        val df = predictions.toDF("id", "cluster")
        df.show
        

        【讨论】:

          【解决方案4】:

          我正在使用 pySpark 做类似的事情。我猜您可以直接将其翻译为 Scala,因为没有特定于 python 的内容。 myPointsWithID 是我的 RDD,每个点都有一个 ID,点表示为值数组。

          # Get an RDD of only the vectors representing the points to be clustered
          points = myPointsWithID.map(lambda (id, point): point)
          clusters = KMeans.train(points, 
                                  100, 
                                  maxIterations=100, 
                                  runs=50,
                                  initializationMode='random')
          
          # For each point in the original RDD, replace the point with the
          # ID of the cluster the point belongs to. 
          clustersBC = sc.broadcast(clusters)
          pointClusters = myPointsWithID.map(lambda (id, point): (id, clustersBC.value.predict(point)))
          

          【讨论】:

            猜你喜欢
            • 2011-01-15
            • 2016-07-08
            • 1970-01-01
            • 2023-01-25
            • 2020-01-08
            • 1970-01-01
            • 1970-01-01
            • 2018-05-11
            • 2011-04-24
            相关资源
            最近更新 更多