K-Means这个词第一次使用是在1967,但是它的思想可以追溯到1957年,它是一种非常简单地基于距离的聚类算法,认为每个Cluster由相似的点组成而这种相似性由距离来衡量,不同Cluster间的点应该尽量不相似,每个Cluster都会有一个“重心”;另外它也是一种排他的算法,即任意点必然属于某一Cluster且只属于该Cluster。当然它的缺点也比较明显,例如:对于孤立点敏感、产生最终聚类之间规模的差距不大。
一、基本思想
1、数学描述
给定d维实数向量(),后面就将这个实数向量称作点吧,短!K-Means算法会根据事先制定的参数k,将这些点划分出k个Cluster(k ≤ n),而划分的标准是最小化点与Cluster重心(均值)的距离平方和,假设这些Cluster为:
,则数学描述如下:
,其中
为第i个Cluster的“重心”(Cluster中所有点的平均值)。
聚类的效果类似下图:
具体可见:http://en.wikipedia.org/wiki/K-means_clustering
2、算法步骤
典型的算法如下,它是一种迭代的算法:
(1)、根据事先给定的k值建立初始划分,得到k个Cluster,比如,可以随机选择k个点作为k个Cluster的重心,又或者用Canopy Clustering得到的Cluster作为初始重心(当然这个时候k的值由Canopy Clustering得结果决定);
(2)、计算每个点到各个Cluster重心的距离,将它加入到最近的那个Cluster;
(3)、重新计算每个Cluster的重心;
(4)、重复过程2~3,直到各个Cluster重心在某个精度范围内不变化或者达到最大迭代次数。
别看算法简单,很多复杂算法的实际效果或许都不如它,而且它的局部性较好,容易并行化,对大规模数据集很有意义;算法时间复杂度是:O(nkt),其中:n 是聚类点个数,k 是Cluster个数,t 是迭代次数。
三、并行化策略
K-Means较好地局部性使它能很好的被并行化。第一阶段,生成Cluster的过程可以并行化,各个Slaves读取存在本地的数据集,用上述算法生成Cluster集合,最后用若干Cluster集合生成第一次迭代的全局Cluster集合,然后重复这个过程直到满足结束条件,第二阶段,用之前得到的Cluster进行聚类操作。
用map-reduce描述是:datanode在map阶段读出位于本地的数据集,输出每个点及其对应的Cluster;combiner操作对位于本地包含在相同Cluster中的点进行reduce操作并输出,reduce操作得到全局Cluster集合并写入HDFS。
四、Mahout K-Means Clustering说明
mahout实现了标准K-Means Clustering,思想与前面相同,一共使用了2个map操作、1个combine操作和1个reduce操作,每次迭代都用1个map、1个combine和一个reduce操作得到并保存全局Cluster集合,迭代结束后,用一个map进行聚类操作。可以在mahout-core下的src/main/java中的package:org.apache.mahout.clustering.kmeans中找到相关代码:
1、数据模型
可以参见上一篇相同标题。
2、目录结构
从目录结构上说,需要两个输入目录:一个用于保存数据点集——input,一个用来保存点的初始划分——clusters;在形成Cluster的阶段,每次迭代会生成一个目录,上一次迭代的输出目录会作为下一次迭代的输入目录,这种目录的命名为:Clusters-“迭代次数”;最终聚类点的结果会放在clusteredPoints文件夹中而Cluster信息放在Clusters-“最后一次迭代次数”文件夹中。
3、如何抽象Cluster
可以从Cluster.java及其父类,对于Cluster,mahout实现了一个抽象类AbstractCluster封装Cluster,具体说明可以参考上一篇文章,这里做个简单说明:
(1)、private int id; #每个K-Means算法产生的Cluster的id
(2)、private long numPoints; #Cluster中包含点的个数,这里的点都是Vector
(3)、private Vector center; #Cluster的重心,这里就是平均值,由s0和s1计算而来。
(4)、private Vector Radius; #Cluster的半径,这个半径是各个点的标准差,反映组内个体间的离散程度,由s0、s1和s2计算而来。
(5)、private double s0; #表示Cluster包含点的权重之和,
(6)、private Vector s1; #表示Cluster包含点的加权和,
(7)、private Vector s2; #表示Cluster包含点平方的加权和,
(8)、public void computeParameters(); #根据s0、s1、s2计算numPoints、center和Radius:
这几个操作很重要,最后三步很必要,在后面会做说明。
(9)、public void observe(VectorWritable x, double weight); #每当有一个新的点加入当前Cluster时都需要更新s0、s1、s2的值(10)、public ClusterObservation getObservations(); #这个操作在combine操作时会经常被用到,它会返回由s0、s1、s2初始化的ClusterObservation对象,表示当前Cluster中包含的所有被观察过的点
五、K-Means Clustering的Map-Reduce实现
K-Means Clustering的实现同样包含单机版和MR两个版本,单机版就不说了,MR版用了两个map操作、一个combine操作和一个reduce操作,是通过两个不同的job触发,用Dirver来组织的,map和reduce阶段执行顺序是:
图1
1、初始划分的形成
K-Means算法需要一个对数据点的初始划分,mahout里用了两种方法(以Iris dataset前3个feature为例):
(1)、使用RandomSeedGenerator类
在指定clusters目录生成k个初始划分并以Sequence File形式存储,其选择方法希望能尽可能不让孤立点作为Cluster重心,大概流程如下:
图2
(2)、使用Canopy Clustering
Canopy Clustering常常用来对初始数据做一个粗略的划分,它的结果可以为之后代价较高聚类提供帮助,个人觉得Canopy Clustering用在数据预处理上要比单纯拿来聚类更有用,比如对K-Means来说提供k值,另外还能很好的处理孤立点,当然,需要人工指定的参数由k变成了T1、T2,T1和T2所起的作用是缺一不可的,T1决定了每个Cluster包含点的数目,这直接影响了Cluster的“重心”和“半径”,而T2则决定了Cluster的数目,T2太大会导致只有一个Cluster,而太小则会出现过多的Cluster。通过实验,T1和T2取值会严重影响到算法的效果,如何确定T1和T2,似乎可以用AIC、BIC或者交叉验证去做,但是我没有找到具体做法,希望各位高人给指条明路:)。
以Iris为例,canopy方法选择T1=3、T2=1.5、cd=0.5、x=10,两种方法结果的前三个维度比较如下:
从图中不同Cluster的交界情况可以看出这两种方法的不同效果。
2、配置Cluster信息
K-Means算法的MR实现,第一次迭代需要将随机方法或者Canopy Clustering方法结果目录作为kmeans第一次迭代的输入目录,接下来的每次迭代都需要将上次迭代的输出目录作为本次迭代的输入目录,这就需要能在每次kmeans map和kmeans reduce操作前从该目录得到Cluster的信息,这个功能由KMeansUtil.configureWithClusterInfo实现,它依据指定的HDFS目录将Canopy Cluster或者上次迭代Cluster的信息存储到一个Collection中,这个方法在之后的每个map和reduce操作中都需要。
3、KMeansMapper
对照上一篇第三节关于MR的那幅图,map操作之前的InputFormat、Split、RR与上一篇完全相同。
2:
private KMeansClusterer clusterer;
4:
new ArrayList<Cluster>();
6:
7: @Override
void map(WritableComparable<?> key, VectorWritable point, Context context)
throws IOException, InterruptedException {
this.clusters, context);
11: }
12:
13: @Override
throws IOException, InterruptedException {
super.setup(context);
16: Configuration conf = context.getConfiguration();
try {
18: ClassLoader ccl = Thread.currentThread().getContextClassLoader();
19: DistanceMeasure measure = ccl.loadClass(conf.get(KMeansConfigKeys.DISTANCE_MEASURE_KEY))
class).newInstance();
21: measure.configure(conf);
22:
new KMeansClusterer(measure);
24:
25: String clusterPath = conf.get(KMeansConfigKeys.CLUSTER_PATH_KEY);
if (clusterPath != null && clusterPath.length() > 0) {
new Path(clusterPath), clusters);
if (clusters.isEmpty()) {
);
30: }
31: }
catch (ClassNotFoundException e) {
new IllegalStateException(e);
catch (IllegalAccessException e) {
new IllegalStateException(e);
catch (InstantiationException e) {
new IllegalStateException(e);
38: }
39: }
40: }