【问题标题】:Adding the results of pyspark kmeans algorithm to dataframe?将pyspark kmeans算法的结果添加到数据框?
【发布时间】:2018-01-23 04:20:56
【问题描述】:

我有一个包含地理信息的 spark 数据框。

my_df.show(2)

## +----+----+-----------+----------+
## | x0 | x1 | longitude | latitude |
## +----+----+-----------+----------+
## | ...| ...| 51.043    | 13.6847  | 
## | ...| ...| 42.6753   | 23.3218  |

我从数据框中取出经度和纬度,并使用 pyspark 中的 kmeans 库计算了一些中心点。

#Trains a k-means model
k = 120
model = KMeans.train(dataset, k)
print ("Final centers: " + str(model.clusterCenters))

输出

Final centers: [array([ 51.04307692,  13.68474126]), array([-33.434     , -70.58366667]), array([ 42.67533333,  23.32185981]), array([ 45.876, -61.492]), array([ 53.07465714,   8.4655    ]), array([   4.594,  114.262]), array([ 48.15665306,  11.54269728]), array([ 51.51729851,   7.49838806]), array([ 48.76316125,   9.15357859]), ....

有人知道如何将匹配中心添加到我的数据框吗?

## +----+----+-----------+----------+-----------+----------+
## | x0 | x1 | longitude | latitude | mean_long | mean_lat |
## +----+----+-----------+----------+-----------+----------+
## | ...| ...| 51.043    | 13.6847  | 50.000    | 15.000   |
## | ...| ...| 42.6753   | 23.3218  | 50.000    | 15.000   |

【问题讨论】:

    标签: apache-spark pyspark k-means


    【解决方案1】:

    如果您决定使用DataFrames,您应该使用新的pyspark.ml API,而不是旧的pyspark.mllib。它提供了多种聚类方法,包括K-Means,它的predict方法会将预测列附加到DataFrame

    有关详细信息(API 和所需的输入类型),请查看 ML 文档:

    【讨论】:

    • 好建议,但我目前正在使用 Spark 1.6.3 处理集群,该集群在 psyspark.ml 中没有 k-means 的 python 实现
    【解决方案2】:

    希望这会有所帮助!
    注意 - 我从 Spark 文档页面获取了示例数据)

    from pyspark.ml.linalg import Vectors
    from pyspark.ml.clustering import KMeans
    import pandas as pd
    
    #generate data
    data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
            (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
    df = sqlContext.createDataFrame(data, ["features"])
    df.show()
    
    #run kmeans clustering model
    kmeans = KMeans(k=2, seed=1)
    model = kmeans.fit(df)
    predictions=model.transform(df).withColumnRenamed("prediction","cluster_id")
    
    centers = model.clusterCenters()
    #preprocessing centers so that it can be joined with predictions dataframe
    centers_p_df = pd.DataFrame(centers)
    centers_p_df.insert(0, 'new_col', range(0, len(centers_p_df)))
    centers_df = sqlContext.createDataFrame(centers_p_df, schema=['cluster_id','centers_col1','centers_col2'])
    
    final_df = predictions.join(centers_df, on="cluster_id").drop("cluster_id")
    final_df.show()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-12-22
      • 2020-03-22
      • 2011-11-19
      • 2022-01-25
      • 2018-07-24
      • 2016-01-25
      • 1970-01-01
      相关资源
      最近更新 更多