【问题标题】:Implementing K-medoids in Pyspark(在 Pyspark 中实现 K-medoids)
【发布时间】:2019-04-26 13:05:26
【问题描述】:

我在 Pyspark 中找不到使用 PAM(K-medoids)的库。

我在 Scala 中找到了这个: https://gist.github.com/erikerlandson/c3c35f0b1aae737fc884

Spark 中的这个问题已于 2016 年解决: https://issues.apache.org/jira/browse/SPARK-4510
https://github.com/apache/spark/pull/3382

但它似乎不起作用,并且这不包含在 mllib 文档中: http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#module-pyspark.mllib.clustering

有人知道 Pyspark 中有任何 PAM 库吗?

谢谢

【问题讨论】:

    标签: pyspark databricks


    【解决方案1】:

    前几天我确实动手试了一下。关于性能我说不了太多,因为我刚接触 Spark。不过下面是使用 K++ 播种的 KMedoids 实现:

    # (c) 2020 Jonathan Kelsey
    # This code is licensed under MIT license
    from pyspark.sql import functions as F
    import pyspark
    import numpy as np
    import sys
    
    def seedKernel(dataB, dataIdValue, centeroids, k, metric):
        """Return the distance from one point to its nearest already-chosen seed.

        Args:
            dataB: Object whose ``.value`` is indexable by row id and yields
                the feature vector of that row.
            dataIdValue: ``(id, vector)`` pair; only the vector (index 1) is
                read here.
            centeroids: Row ids of the seeds selected so far.
            k: Unused; kept so existing callers keep working.
            metric: Callable ``metric(point, seedPoint)`` returning a distance.

        Returns:
            The smallest metric value as a ``float``, or ``inf`` when
            ``centeroids`` is empty.
        """
        data = dataB.value
        point = np.asarray(dataIdValue[1])
        # Keep the full floating-point distance: truncating to int makes every
        # distance below 1 look like 0, which breaks the farthest-point choice
        # on data with a small numeric range.
        nearest = min(
            (metric(point, data[seedId]) for seedId in centeroids),
            default=float("inf"),
        )
        return float(nearest)
    
    def seedClusters(dataB, dataFrame, k, metric):
        """Pick ``k`` seed medoid ids with a farthest-point heuristic.

        The first seed is drawn uniformly at random. Every further seed is the
        row whose distance to its nearest existing seed is largest (a
        deterministic farthest-point rule, not probabilistic sampling).

        Args:
            dataB: Object whose ``.value`` is the data array; ``shape[0]`` is
                used as the number of rows.
            dataFrame: DataFrame whose rows are ``(id, vector)``; one Spark
                job is run over ``dataFrame.rdd`` per additional seed.
            k: Number of seeds to return.
            metric: Point-to-point distance callable handed to ``seedKernel``.

        Returns:
            List of ``k`` row ids as plain Python ints.
        """
        data = dataB.value
        centeroids = [int(c) for c in np.random.choice(data.shape[0], 1, replace=False)]
        for i in range(k - 1):
            print("clusterSeed", i)
            # Snapshot the seeds so the closure sent to the executors is fixed.
            chosen = list(centeroids)
            scored = dataFrame.rdd.map(
                lambda dataIdValue: (
                    dataIdValue[0],
                    seedKernel(dataB, dataIdValue, chosen, k, metric),
                )
            ).collect()
            # Select by the row's own id instead of its position in the
            # collected list, so the result does not depend on row ordering.
            farthestId, _ = max(scored, key=lambda idDistance: idDistance[1])
            centeroids.append(int(farthestId))
        print(centeroids)
        return centeroids
    
    def nearestCenteroidKernel(dataIdValue, centeroidIdValues, metric):
        """Return the position of the centroid closest to one data point.

        Args:
            dataIdValue: ``(id, vector)`` pair; the id is ignored.
            centeroidIdValues: Iterable of ``(centroidIndex, vector)`` pairs.
            metric: Callable ``metric(pointArray, centroidArray)``.

        Returns:
            Position (as ``int``) within ``centeroidIdValues`` of the entry
            with the smallest metric value; the first one wins on ties.
        """
        _, rawVector = dataIdValue
        vector = np.asarray(rawVector)
        gaps = np.asarray(
            [metric(vector, np.asarray(candidate)) for _, candidate in centeroidIdValues]
        )
        return int(np.argmin(gaps))
    
    def optimiseClusterMembershipSpark(data, dataFrame, n, metric, intitalClusterIndices=None):
        """Assign every non-medoid row to its nearest medoid.

        Args:
            data: Array of feature vectors indexable by row id; ``shape[0]``
                gives the number of rows.
            dataFrame: DataFrame with an ``id`` column first and the feature
                vector second.
            n: Number of medoids to draw at random when none are supplied.
            metric: Point-to-point distance callable.
            intitalClusterIndices: Optional sequence of medoid row ids; when
                ``None``, ``n`` distinct ids are sampled.

        Returns:
            ``(index, clusters)`` where ``index`` is the medoid id sequence in
            use and ``clusters`` is a DataFrame with columns ``bestC``
            (position of the medoid in ``index``) and ``cluster`` (list of
            member ids). Medoid rows themselves are filtered out.
        """
        rowCount = data.shape[0]
        rows = dataFrame.rdd
        index = intitalClusterIndices
        if index is None:
            index = np.random.choice(rowCount, n, replace=False)
        medoidIds = [int(medoidId) for medoidId in index]
        medoidSlots = [(slot, data[medoidId]) for slot, medoidId in enumerate(index)]

        def isNotMedoid(row):
            return int(row["id"]) not in medoidIds

        def assign(row):
            return (row[0], nearestCenteroidKernel(row, medoidSlots, metric))

        assignments = rows.filter(isNotMedoid).map(assign).toDF(["id", "bestC"])
        clusters = assignments.groupBy("bestC").agg(F.collect_list("id").alias("cluster"))
        return index, clusters
    
    def costKernel(dataB, testCenteroid, clusterData, metric):
        """Total distance between one candidate medoid and a cluster's members.

        Args:
            dataB: Object whose ``.value`` is a 2-D array of feature vectors.
            testCenteroid: Row id of the candidate medoid.
            clusterData: Sequence of member row ids.
            metric: Callable taking two equally shaped 2-D stacks and
                returning one distance per row.

        Returns:
            Sum of the per-row distances as a ``float`` (``0.0`` for an empty
            cluster).
        """
        points = dataB.value
        memberIds = np.asarray(clusterData)
        memberCount = memberIds.shape[0]
        stackShape = (memberCount, points.shape[1])
        memberStack = np.zeros(shape=stackShape, dtype=points.dtype)
        candidateStack = np.zeros(shape=stackShape, dtype=points.dtype)
        # The candidate row is only looked up when there is something to compare.
        if memberCount:
            candidateStack[:] = points[int(testCenteroid)]
        for slot, memberId in enumerate(memberIds):
            memberStack[slot] = points[memberId]
        return float(np.sum(metric(memberStack, candidateStack)))
    
    def optimiseCentroidSelectionSpark(dataB, dataFrame, centeroids, clustersFrames, metric):
        """Choose, for every cluster, the member with the lowest total distance.

        Args:
            dataB: Broadcast-style object forwarded to ``costKernel``.
            dataFrame: Unused; kept so existing callers keep working.
            centeroids: Current medoid ids; position ``i`` corresponds to
                ``bestC == i`` in ``clustersFrames``.
            clustersFrames: DataFrame with columns ``bestC`` and ``cluster``
                (list of member ids).
            metric: Stack-wise distance callable forwarded to ``costKernel``.

        Returns:
            ``(newCenteroidIds, totalCost)``. A cluster with no members keeps
            its old medoid. ``totalCost`` is the sum of the costs of every
            candidate in every cluster.
        """
        # NOTE(review): totalCost adds up the cost of all candidates, not only
        # the winning medoid's cost -- confirm this is the intended objective.
        newCenteroidIds = []
        totalCost = 0
        for clusterIdx, oldCenteroid in enumerate(centeroids):
            print("clusterOpIdx", clusterIdx)
            clusterFrame = clustersFrames.filter(
                clustersFrames.bestC == clusterIdx
            ).select(F.explode(clustersFrames.cluster))
            # Read the exploded ids through the ``col`` field of each row.
            clusterData = [row.col for row in clusterFrame.collect()]
            # Evaluate every candidate exactly once; the cost sum and the best
            # candidate are then derived locally from the same results instead
            # of re-running the Spark job for each of them.
            candidateCosts = clusterFrame.rdd.map(
                lambda pointId: (
                    pointId[0],
                    costKernel(dataB, pointId[0], clusterData, metric),
                )
            ).collect()
            if candidateCosts:
                totalCost = totalCost + sum(cost for _, cost in candidateCosts)
                bestPoint = min(candidateCosts, key=lambda idCost: idCost[1])[0]
            else:
                bestPoint = oldCenteroid
            newCenteroidIds.append(bestPoint)
        return (newCenteroidIds, totalCost)
    
    #vector metrics
    def hammingVector(stack1, stack2):
        """Count, for each row, the positions where two stacks differ."""
        mismatches = stack1 != stack2
        return mismatches.sum(axis=1)
    def euclideanVector(stack1, stack2):
        """Row-wise squared Euclidean distance between two equally shaped stacks.

        Each output element equals ``np.sum((row1 - row2) ** 2)`` for the
        matching pair of rows, i.e. the same quantity the point metric
        ``euclideanPoint`` computes for a single pair. (Summing absolute
        differences instead would give the Manhattan distance and make the
        cost step disagree with the assignment step.)
        """
        difference = np.asarray(stack2) - np.asarray(stack1)
        return np.square(difference).sum(axis=1)
    # point metrics
    def euclideanPoint(p1, p2):
        """Return the sum of squared coordinate differences between two points."""
        offset = p1 - p2
        return np.sum(np.square(offset))
    def hammingPoint(p1, p2):
        """Return the number of positions at which two points differ."""
        mismatches = p1 != p2
        return np.sum(mismatches)
    
    def fit(sc, data, nRegions = 2, metric = "euclidean", seeding = "heuristic"):
        """Cluster ``data`` into ``nRegions`` groups around medoids.

        Args:
            sc: Spark context used for ``broadcast`` and ``parallelize``.
            data: Sequence of equally long feature vectors (nested lists or a
                2-D array).
            nRegions: Number of medoids.
            metric: ``"euclidean"`` or ``"hamming"``; selects the matching
                point and stack metric helpers.
            seeding: ``"heuristic"`` seeds with ``seedClusters``; any other
                value lets the first assignment step draw random medoids.

        Returns:
            ``(medoidIds, clusters)`` where ``clusters[i]`` is the list of row
            ids assigned to ``medoidIds[i]`` (empty if nothing was assigned).
            Medoid rows are not listed inside the clusters.

        Raises:
            ValueError: If ``metric`` is not one of the supported names.
        """
        if metric == "euclidean":
            pointMetric, vectorMetric = euclideanPoint, euclideanVector
        elif metric == "hamming":
            pointMetric, vectorMetric = hammingPoint, hammingVector
        else:
            raise ValueError("unsupported metric: {!r}".format(metric))

        dataN = np.asarray(data)
        dataB = sc.broadcast(dataN)
        # Rows are shipped as plain Python lists so DataFrame schema inference
        # also works when ``data`` is handed in as a NumPy array.
        dataFrame = (
            sc.parallelize(dataN.tolist())
            .zipWithIndex()
            .map(lambda xy: (xy[1], xy[0]))
            .toDF(["id", "vector"])
            .cache()
        )
        seeds = None
        if seeding == "heuristic":
            seeds = list(seedClusters(dataB, dataFrame, nRegions, pointMetric))
        centeroids, clusters = optimiseClusterMembershipSpark(dataN, dataFrame, nRegions, pointMetric, seeds)

        bestCenteroids, bestClusters = centeroids, clusters
        bestCost = float('inf')
        iteration = 0
        while True:
            iteration = iteration + 1
            # The cost returned here describes the partition passed in
            # (``clusters``), not the candidate medoids that come back with it.
            candidateCenteroids, cost = optimiseCentroidSelectionSpark(dataB, dataFrame, centeroids, clusters, vectorMetric)
            print((cost<bestCost, cost, bestCost, cost - bestCost))
            if not cost < bestCost:
                print(("iteration",iteration,"cost got worse or did not improve", cost, bestCost))
                break
            print(("iteration",iteration,"cost improving...", cost, bestCost))
            # Remember the state the cost was measured on, so the partition
            # finally returned is the one with the lowest cost seen rather
            # than the one that has just been found not to be better.
            bestCost, bestCenteroids, bestClusters = cost, centeroids, clusters
            centeroids, clusters = optimiseClusterMembershipSpark(dataN, dataFrame, nRegions, pointMetric, candidateCenteroids)

        # groupBy output order is arbitrary and empty clusters have no row, so
        # rebuild the list by medoid position to keep it aligned with the ids.
        membersBySlot = {row.bestC: row.cluster for row in bestClusters.collect()}
        unpackedClusters = [membersBySlot.get(slot, []) for slot in range(len(bestCenteroids))]
        return (bestCenteroids, unpackedClusters)
    

    我使用 pyclustering 中的一些示例数据做了一次合理性检查(sanity check):

    from pyclustering.cluster import cluster_visualizer
    from pyclustering.utils import read_sample
    from pyclustering.samples.definitions import FCPS_SAMPLES
    from pyclustering.samples.definitions import SIMPLE_SAMPLES
    # Load the golf-ball sample set that ships with pyclustering.
    sample = read_sample(FCPS_SAMPLES.SAMPLE_GOLF_BALL)
    # Cluster into 9 groups with fit()'s default metric and seeding.
    # NOTE(review): `sc` is not defined in this snippet; presumably an active
    # SparkContext provided by the notebook/shell -- confirm before running.
    bestCentroids, bestClusters = fit(sc, sample, 9)
    # Plot the resulting clusters over the original sample points.
    visualizer = cluster_visualizer()
    visualizer.append_clusters(bestClusters, sample)
    visualizer.show()
    

    【讨论】:

      【解决方案2】:

      您最好的选择是将这个 Python 实现改写成 Scala,这样您就可以利用 RDD 分区和分布式计算。 https://github.com/letiantian/kmedoids/blob/master/kmedoids.py

      【讨论】:

        猜你喜欢
        • 2021-11-24
        • 2014-02-28
        • 1970-01-01
        • 2023-04-02
        • 2017-05-12
        • 2017-03-19
        • 2021-04-29
        • 2014-05-10
        • 2020-06-21
        相关资源
        最近更新 更多