【发布时间】:2016-04-09 13:58:22
【问题描述】:
我正在尝试编写基于 Spark 的情感分析程序。为此,我使用 word2vec 和 KMeans 聚类。从 word2Vec 我在 100 维空间中收集了 20k 个单词/向量,现在我正在尝试对这个向量空间进行聚类。当我使用默认并行实现运行 KMeans 时,算法工作了 3 个小时!但是使用随机初始化策略大约需要 8 分钟。 我究竟做错了什么?我有 4 个内核处理器和 16 GB 内存的 mac book pro 机器。
K ~= 4000 maxInteration 为 20
var vectors: Iterable[org.apache.spark.mllib.linalg.Vector] =
model.getVectors.map(entry => new VectorWithLabel(entry._1, entry._2.map(_.toDouble)))
val data = sc.parallelize(vectors.toIndexedSeq).persist(StorageLevel.MEMORY_ONLY_2)
log.info("Clustering data size {}",data.count())
log.info("==================Train process started==================");
val clusterSize = modelSize/5
val kmeans = new KMeans()
kmeans.setInitializationMode(KMeans.K_MEANS_PARALLEL)
kmeans.setK(clusterSize)
kmeans.setRuns(1)
kmeans.setMaxIterations(50)
kmeans.setEpsilon(1e-4)
time = System.currentTimeMillis()
val clusterModel: KMeansModel = kmeans.run(data)
火花上下文初始化在这里:
val conf = new SparkConf()
.setAppName("SparkPreProcessor")
.setMaster("local[4]")
.set("spark.default.parallelism", "8")
.set("spark.executor.memory", "1g")
val sc = SparkContext.getOrCreate(conf)
还有一些关于运行这个程序的更新。我在 Intelij IDEA 中运行它。我没有真正的 Spark 集群。但是我以为你的个人机器可以是Spark集群
我从 Spark 代码 LocalKMeans.scala 中看到程序挂在这个循环中:
// Initialize centers by sampling using the k-means++ procedure.
centers(0) = pickWeighted(rand, points, weights).toDense
for (i <- 1 until k) {
// Pick the next center with a probability proportional to cost under current centers
val curCenters = centers.view.take(i)
val sum = points.view.zip(weights).map { case (p, w) =>
w * KMeans.pointCost(curCenters, p)
}.sum
val r = rand.nextDouble() * sum
var cumulativeScore = 0.0
var j = 0
while (j < points.length && cumulativeScore < r) {
cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j))
j += 1
}
if (j == 0) {
logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." +
s" Using duplicate point for center k = $i.")
centers(i) = points(0).toDense
} else {
centers(i) = points(j - 1).toDense
}
}
【问题讨论】:
-
你的问题是什么?
-
为什么它在并行模式下运行这么慢?
-
您需要检查数据的大小、Spark UI 中的 DAG 调度程序架构、监控您的集群(如下面的答案中所述)等等。这可能取决于几个事实。您的问题无法挽救。
-
Spark UI 说所有工作都完成了......它在转到 kMeansPlusPlus 算法之前进行了一些转换。我认为问题出在我发布的代码中,因为它占用了 80% 的程序
-
这就是问题所在,循环没有并行化。所以从复杂性角度考虑,如果 k 和 d 是 4000 次迭代的固定倍数,则 kmeans 在 O(nnz k + d k) 中,我认为它可以为您提供一些线索...
标签: scala apache-spark machine-learning k-means apache-spark-mllib