【Question title】: What is the maxIter parameter in MultilayerPerceptronClassifier (Spark MLlib)?
【Posted】: 2020-04-29 15:33:13
【Question】:

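What does maxIter mean in MultilayerPerceptronClassifier (Spark MLlib)?

1. Does maxIter set the maximum number of steps (jumps) the optimization algorithm may take while searching for the minimum error?

2. Does maxIter set the maximum number of epochs (the maximum number of times the entire dataset is passed through the network)?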

class pyspark.ml.classification.MultilayerPerceptronClassifier(featuresCol='features', labelCol='label', predictionCol='prediction', maxIter=100, tol=1e-06, seed=None, layers=None, blockSize=128, stepSize=0.03, solver='l-bfgs', initialWeights=None, probabilityCol='probability', rawPredictionCol='rawPrediction')

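【Question comments】:

  • I looked at the source code of the MultilayerPerceptronClassifier class and found that maxIter is one of the stopping conditions of the gradient computation, and that blockSize is used in a Spark mapPartitions call. Thank you very much for your help @EmiCareOfCell44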
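A minimal sketch of how a maxIter-style cap and a tol-style convergence test can jointly stop an iterative optimizer. It is plain gradient descent on a one-dimensional toy loss; `train` is a hypothetical name, and the relative-step convergence test is an assumption loosely modeled on MLlib's gradient descent, not the exact criterion the l-bfgs solver uses.

```python
def train(grad, w0, max_iter=100, tol=1e-6, step_size=0.03):
    """Hypothetical optimizer loop: stops after max_iter iterations, or
    earlier once the update becomes small relative to the weights."""
    w = w0
    for it in range(1, max_iter + 1):
        w_new = w - step_size * grad(w)  # one gradient step
        # Assumed convergence test: relative step size below tol.
        converged = abs(w_new - w) < tol * max(abs(w_new), 1.0)
        w = w_new
        if converged:
            return w, it  # stopped by tol
    return w, max_iter  # stopped by the iteration cap


# Toy loss (w - 3)^2 with gradient 2 * (w - 3).
w, iters = train(lambda w: 2 * (w - 3.0), 0.0)
print(iters)  # prints 100
```

With maxIter=100 and tol=1e-6, this toy problem hits the iteration cap before the tol test fires; raising `max_iter` to 1000 lets the tol test stop the loop after fewer than 200 iterations.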

Tags: apache-spark apache-spark-mllib


【Solution 1】:

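Spark's gradient-based optimizers work through the RDD treeAggregate function. In each iteration they take a fraction of the RDD (1, i.e. the whole RDD, by default) and distribute the gradient computation to the workers, so by default every iteration consumes the entire RDD. In that case one iteration can be regarded as one epoch. This approach keeps optimization on Spark simple. More advanced deep-learning optimizer implementations, such as BigDL, let you set a batch size and use the BlockManager to compute a distributed gradient aggregation in each iteration; there, one iteration corresponds to one mini-batch.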
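A minimal single-process sketch of the iteration-versus-epoch distinction described above. It assumes a toy loss mean((w - x)^2) and uses Python lists as stand-ins for RDD partitions; `full_batch_gd` and `minibatch_gd` are hypothetical helpers, not Spark or BigDL APIs.

```python
def full_batch_gd(partitions, w, max_iter, step_size):
    """Each iteration combines gradient sums from every partition (loosely
    like treeAggregate with the whole RDD), so one iteration = one epoch."""
    n = sum(len(p) for p in partitions)
    seen = 0
    for _ in range(max_iter):
        # Per-partition gradient sums, then combined across partitions.
        partials = [sum(2 * (w - x) for x in p) for p in partitions]
        g = sum(partials) / n
        seen += n
        w -= step_size * g
    return w, seen / n  # (weights, epochs)


def minibatch_gd(data, w, max_iter, step_size, batch_size):
    """Each iteration uses one mini-batch, so one iteration < one epoch.
    Assumes batch_size divides len(data), so no batch is ever empty."""
    seen = 0
    for it in range(max_iter):
        start = (it * batch_size) % len(data)
        batch = data[start:start + batch_size]
        g = sum(2 * (w - x) for x in batch) / len(batch)
        seen += len(batch)
        w -= step_size * g
    return w, seen / len(data)  # (weights, epochs)
```

With 8 samples, `full_batch_gd(..., max_iter=100, ...)` makes 100 passes over the data, while `minibatch_gd(..., max_iter=100, batch_size=2)` covers only 25 epochs for the same iteration count.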

【Comments】:

    【Solution 2】:

    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala

      /**
       * Aggregates the elements of this RDD in a multi-level tree pattern.
       * This method is semantically identical to [[org.apache.spark.rdd.RDD#aggregate]].
       *
       * @param depth suggested depth of the tree (default: 2)
       */
      def treeAggregate[U: ClassTag](zeroValue: U)(
          seqOp: (U, T) => U,
          combOp: (U, U) => U,
          depth: Int = 2): U = withScope {
        require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
        if (partitions.length == 0) {
          Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
        } else {
          val cleanSeqOp = context.clean(seqOp)
          val cleanCombOp = context.clean(combOp)
          val aggregatePartition =
            (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
          var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
          var numPartitions = partiallyAggregated.partitions.length
          val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
          // If creating an extra level doesn't help reduce
          // the wall-clock time, we stop tree aggregation.
    
          // Don't trigger TreeAggregation when it doesn't save wall-clock time
          while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
            numPartitions /= scale
            val curNumPartitions = numPartitions
            partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
              (i, iter) => iter.map((i % curNumPartitions, _))
            }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
          }
          val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
          partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
        }
      }
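The loop above is easier to follow in a single-process sketch. The Python below mimics the same control flow: local per-partition aggregation, the `scale` computation, the wall-clock stopping rule, regrouping by `i % numPartitions`, and a final fold. Plain lists stand in for partitions. It is an illustration under those assumptions, not Spark's implementation; `tree_aggregate` and its extra `levels` return value are hypothetical.

```python
import math
from functools import reduce


def tree_aggregate(partitions, zero, seq_op, comb_op, depth=2):
    """Single-process imitation of RDD.treeAggregate's control flow.

    `partitions` is a list of lists standing in for RDD partitions.
    Returns (result, extra_levels); the level count is illustrative only.
    Assumes `zero` is immutable (e.g. a number); Spark clones it instead.
    """
    if depth < 1:
        raise ValueError(f"Depth must be >= 1 but got {depth}")
    if not partitions:
        return zero, 0
    # Aggregate each partition locally (the mapPartitions step).
    partial = [reduce(seq_op, p, zero) for p in partitions]
    num = len(partial)
    scale = max(math.ceil(num ** (1.0 / depth)), 2)
    levels = 0
    # Add a level only while it is expected to save wall-clock time.
    while num > scale + math.ceil(num / scale):
        num //= scale
        # Key each partial by i % num and fold per key (the foldByKey step).
        grouped = [zero] * num
        for i, value in enumerate(partial):
            grouped[i % num] = comb_op(grouped[i % num], value)
        partial = grouped
        levels += 1
    # Final combine of the remaining partials (the fold step).
    return reduce(comb_op, partial, zero), levels
```

For 100 partitions and the default depth of 2, `scale` is 10 and the loop adds one intermediate level that reduces 100 partial results to 10 before the final fold; with only 4 partitions, no extra level is created.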
    

    【Comments】:
