【问题标题】:Why this LR code run on spark too slowly?为什么这个 LR 代码在 spark 上运行太慢?
【发布时间】:2014-01-13 03:53:47
【问题描述】:

因为 MLlib 不支持稀疏输入。所以我在 Spark 集群上运行支持稀疏输入格式的流动代码。 设置是:

  1. 5个节点,每个节点8核(每个节点上所有cpu都是100%, 98% 用于用户模型,运行代码时)。
  2. 输入:10,000,000+ 个实例,以及 HDFS 上 600,000+ 个维度

代码是:

import java.util.Random
import scala.collection.mutable.HashMap
import scala.io.Source
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Vector
import java.lang.Math
import org.apache.spark.broadcast.Broadcast

object SparseLR {
  val lableNum = 1
  val dimNum = 632918 
  val iteration = 10
  val alpha = 0.1
  val lambda = 0.1
  val rand = new Random(42)
  var w = Vector(dimNum, _=> rand.nextDouble)

  class SparserVector {
    var elements = new HashMap[Int, Double]

    def insert(index: Int, value: Double){
      elements += index -> value;
    }


    def *(scale: Double): Vector = {
      var x = new Array[Double](dimNum)
      elements.keySet.foreach(k => x(k) = scale * elements.get(k).get)
      Vector(x)
    }
  }
  case class DataPoint(x: SparserVector, y: Int)

  def parsePoint(line: String): DataPoint = {
    var features = new SparserVector
    val fields = line.split("\t")
    //println("fields:" + fields(0))
    val y = fields(0).toInt
    fields.filter(_.contains(":")).foreach( f => {
      val feature = f.split(":")
      features.insert(feature(0).toInt, feature(1).toDouble)
    })
    return DataPoint(features, y)
  }

  def gradient(p: DataPoint, w: Broadcast[Vector]) : Vector = {
    def h(w: Broadcast[Vector], x: SparserVector): Double = {
      val wb = w.value
      val features = x.elements
      val s = features.keySet.map(k => features.get(k).get * wb(k)).reduce(_ + _)
      1 / (1 + Math.exp(-p.y * s))
    }
    p.x * (-(1 - p.y *h(w, p.x)))
  }

  def train(sc: SparkContext, dataPoints: RDD[DataPoint]) {
    //val sampleNum = dataPoints.count
    val sampleNum = 11680250

    for(i <- 0 until iteration) {
      val wb = sc.broadcast(w)
      val g = (dataPoints.map(p => gradient(p, wb)).reduce(_ + _) + lambda * wb.value) /sampleNum
      w -= alpha * g

      println("iteration " + i + ": g = " + g)
    }
  }

  def main(args : Array[String]): Unit = {
    System.setProperty("spark.executor.memory", "15g")
    System.setProperty("spark.default.parallelism", "32");
    val sc = new SparkContext("spark://xxx:12036", "LR", "/xxx/spark", List("xxx_2.9.3-1.0.jar"))
    val lines = sc.textFile("hdfs:xxx/xxx.txt", 32)

    val trainset = lines.map(parsePoint _).cache()

    train(sc, trainset)
  }
}

谁能帮帮我?谢谢!

【问题讨论】:

  • 很难说,你能把探查器挂到任务中吗?

标签: scala hadoop machine-learning apache-spark


【解决方案1】:

这真的很难给你一个答案。也许这更适合code review stackoverflow 子站点?

一些显而易见的事情:

您的梯度函数似乎效率低下。当你想为 map 的每个键/值对做一些事情时,这样做会更有效率

for((k,v)<-map) { 
  ...
}

比做

for(k<-map.keySet) { val value = map.get(k).get; 
  ... 
}

此外,对于这样的性能关键代码,最好将 reduce 更改为累积可变值。所以重写的梯度函数将是

def gradient(p: DataPoint, w: Broadcast[Vector]) : Vector = {
  def h(w: Broadcast[Vector], x: SparserVector): Double = {
    val wb = w.value
    val features = x.elements
    var s = 0.0
    for((k,v)<-features)
      s += v * wb(k)
    1 / (1 + Math.exp(-p.y * s))
  }
  p.x * (-(1 - p.y *h(w, p.x)))
}

现在,如果您想进一步提高性能,则必须更改 SparseVector 以使用索引数组和值数组,而不是 Map[Int, Double]。这样做的原因是,在 Map 中,键和值将被装箱为具有相当大开销的对象,而 Array[Int] 或 Array[Double] 只是一个紧凑的内存块

(为方便起见,建议定义一个使用 SortedMap[Int, Double] 并在完成构建时转换为两个数组的构建器)

class SparseVector(val indices: Array[Int], val values: Array[Double]) {
  require(indices.length == values.length)

  def *(scale: Double): Vector = {
    var x = new Array[Double](dimNum)
    var i = 0
    while(i < indices.length) {  
      x(indices(i)) = scale * values(i) 
      i += 1
    }
    Vector(x)
  }
}

请注意,上面的代码示例未经测试,但我想你会明白的。

【讨论】:

  • 感谢您的建议。我将问题移至代码审查:codereview.stackexchange.com/questions/38354/….
  • 感谢您的建议。我将问题移至 [code review:] (codereview.stackexchange.com/questions/38354/…)。我发现最耗时的步骤是:var x = new Array[Double](dimNum)。对于每个输入行,该行将生成一个高维数组。当我将其修改为稀疏向量(数组)时。一次迭代的时间从 3 小时到 20 分钟不等。但是对于应用程序来说,20 分钟是不可接受的。那么,您有什么建议吗?谢谢。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-08-31
  • 2017-01-10
  • 2011-08-31
  • 1970-01-01
  • 2017-11-14
  • 1970-01-01
相关资源
最近更新 更多