【问题标题】:Spark.ml regressions do not calculate same models as scikit-learnSpark.ml 回归不计算与 scikit-learn 相同的模型
【发布时间】:2017-08-01 10:48:26
【问题描述】:

我在 scikit-learn 和 spark.ml 中设置了一个非常简单的逻辑回归问题,结果出现分歧:他们学习的模型不同,但我不知道为什么(数据相同,模型类型相同,正则化相同...)。

毫无疑问,我在一侧或另一侧缺少一些设置。哪个设置?我应该如何设置 scikit 或 spark.ml 以找到与其对应的相同模型?

下面我给出sklearn代码和spark.ml代码。两者都应该准备好剪切和粘贴并运行。

scikit-learn 代码:

import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge

X = np.array([
    [-0.7306653538519616, 0.0],
    [0.6750417712898752, -0.4232874171873786],
    [0.1863463229359709, -0.8163423997075965],
    [-0.6719842051493347, 0.0],
    [0.9699938346531928, 0.0],
    [0.22759406190283604, 0.0],
    [0.9688721028330911, 0.0],
    [0.5993795346650845, 0.0],
    [0.9219423508390701, -0.8972778242305388],
    [0.7006904841584055, -0.5607635619919824]
])

y = np.array([
    0.0,
    1.0,
    1.0,
    0.0,
    1.0,
    1.0,
    1.0,
    0.0,
    0.0,
    0.0
])

m, n = X.shape

# Add intercept term to simulate inputs to GameEstimator
X_with_intercept = np.hstack((X, np.ones(m)[:,np.newaxis]))

l = 0.3
e = LogisticRegression(
    fit_intercept=False,
    penalty='l2',
    C=1/l,
    max_iter=100,
    tol=1e-11)

e.fit(X_with_intercept, y)

print e.coef_
# => [[ 0.98662189  0.45571052 -0.23467255]]

# Linear regression is called Ridge in sklearn
e = Ridge(
    fit_intercept=False,
    alpha=l,
    max_iter=100,
    tol=1e-11)

e.fit(X_with_intercept, y)

print e.coef_
# =>[ 0.32155545  0.17904355  0.41222418]

spark.ml 代码:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.SQLContext

object TestSparkRegression {
  def main(args: Array[String]): Unit = {
    import org.apache.log4j.{Level, Logger}

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)

    val sparkTrainingData = new SQLContext(sc)
      .createDataFrame(Seq(
        LabeledPoint(0.0, Vectors.dense(-0.7306653538519616, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.6750417712898752, -0.4232874171873786)),
        LabeledPoint(1.0, Vectors.dense(0.1863463229359709, -0.8163423997075965)),
        LabeledPoint(0.0, Vectors.dense(-0.6719842051493347, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.9699938346531928, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.22759406190283604, 0.0)),
        LabeledPoint(1.0, Vectors.dense(0.9688721028330911, 0.0)),
        LabeledPoint(0.0, Vectors.dense(0.5993795346650845, 0.0)),
        LabeledPoint(0.0, Vectors.dense(0.9219423508390701, -0.8972778242305388)),
        LabeledPoint(0.0, Vectors.dense(0.7006904841584055, -0.5607635619919824))))
      .toDF("label", "features")

    val logisticModel = new LogisticRegression()
      .setRegParam(0.3)
      .setLabelCol("label")
      .setFeaturesCol("features")
      .fit(sparkTrainingData)

    println(s"Spark logistic model coefficients: ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
    // Spark logistic model coefficients: [0.5451588538376263,0.26740606573584713] Intercept: -0.13897955358689987

    val linearModel = new LinearRegression()
      .setRegParam(0.3)
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setSolver("l-bfgs")
      .fit(sparkTrainingData)

    println(s"Spark linear model coefficients: ${linearModel.coefficients} Intercept: ${linearModel.intercept}")
    // Spark linear model coefficients: [0.19852664861346023,0.11501200541407802] Intercept: 0.45464906876832323

    sc.stop()
  }
}

【问题讨论】:

  • 在初始化估计器或将数据拆分为训练和测试时存在某种随机性。你考虑过 scikit 中的random_state 吗?我确信 Spark 中一定有类似的东西。
  • 我不太买它。给定足够的迭代,两者都应该收敛到同一个模型。这个问题是凸的,所以只有一个最小值。随机性可能会在存在多个最优值的情况下发挥作用,但恕我直言,这里没有。
  • 可能是这样。请显示spark与scikit的结果。 scikit 中还有一个LinearRigression()
  • 结果与两者的代码一起显示。对于 scikit,我使用了 Ridge,应该与带有正则化参数的 spark.ml LinearRegression 相同。如果我理解正确,Scikit LinearRegression 是没有正则化的 OLS(我可能是错的,我是 scikit 的新手)。

标签: apache-spark scikit-learn apache-spark-mllib


【解决方案1】:

您需要执行以下操作:

  1. 首先标准化 python 和 spark 数据帧。 Spark 在内部默认使用标准化。注意考虑两个包中标准缩放器实现中标准差公式的差异。

  2. 对于逻辑回归,Spark 使用对数损失的平均值(分母是权重之和,即所有权重为 1 时的训练实例数),而 sklearn 使用对数损失之和。在线性回归中,与 sklearn 不同,spark 在误差平方和项中使用 1/2n 因子。 Spark 正则化需要相应地缩小 - 在本例中,逻辑回归为 1/10 倍,线性回归为 1/20 倍。

Scikit 学习代码

import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge

X = np.array([
    [-0.7306653538519616, 0.0],
    [0.6750417712898752, -0.4232874171873786],
    [0.1863463229359709, -0.8163423997075965],
    [-0.6719842051493347, 0.0],
    [0.9699938346531928, 0.0],
    [0.22759406190283604, 0.0],
    [0.9688721028330911, 0.0],
    [0.5993795346650845, 0.0],
    [0.9219423508390701, -0.8972778242305388],
    [0.7006904841584055, -0.5607635619919824]
])

y = np.array([
    0.0,
    1.0,
    1.0,
    0.0,
    1.0,
    1.0,
    1.0,
    0.0,
    0.0,
    0.0
])

m, n = X.shape


from sklearn.preprocessing import StandardScaler

## sqrt(n-1)/sqrt(n) factor for getting the same standardization as spark
Xsc=StandardScaler().fit_transform(X)*3.0/np.sqrt(10.0)

l = 0.3
e = LogisticRegression(
    fit_intercept=True,
    penalty='l2',
    C=1/l,
    max_iter=100,
    tol=1e-11,
    solver='lbfgs',verbose=1)

e.fit(Xsc, y)

print e.coef_, e.intercept_
# => [[ 0.82122437 0.32615256]] [-0.01181534]

#e.get_params(deep=True)

# Linear regression is called Ridge in sklearn
e = Ridge(
    fit_intercept=True,
    alpha=l,
    max_iter=100,
    tol=1e-11)

e.fit(Xsc, y)

print e.coef_,e.intercept_
# =>[ 0.21310109 0.09203616] 0.5

Spark 代码(重构为使用 ML API)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.StandardScaler

val sparkTrainingData_orig = new SQLContext(sc).
  createDataFrame(Seq(
    (0.0, Vectors.dense(Array(-0.7306653538519616, 0.0))),
    (1.0, Vectors.dense(Array(0.6750417712898752, -0.4232874171873786))),
    (1.0, Vectors.dense(Array(0.1863463229359709, -0.8163423997075965))),
    (0.0, Vectors.dense(Array(-0.6719842051493347, 0.0))),
    (1.0, Vectors.dense(Array(0.9699938346531928, 0.0))),
    (1.0, Vectors.dense(Array(0.22759406190283604, 0.0))),
    (1.0, Vectors.dense(Array(0.9688721028330911, 0.0))),
    (0.0, Vectors.dense(Array(0.5993795346650845, 0.0))),
    (0.0, Vectors.dense(Array(0.9219423508390701, -0.8972778242305388))),
    (0.0, Vectors.dense(Array(0.7006904841584055, -0.5607635619919824))))).
  toDF("label", "features_orig")

val sparkTrainingData=new StandardScaler().
  setWithMean(true).
  setInputCol("features_orig").
  setOutputCol("features").
  fit(sparkTrainingData_orig).
  transform(sparkTrainingData_orig)

//Make regularization 0.3/10=0.03
val logisticModel = new LogisticRegression().
  setRegParam(0.03).
  setLabelCol("label").
  setFeaturesCol("features").
  setTol(1e-12).
  setMaxIter(100).
  fit(sparkTrainingData)

println(s"Spark logistic model coefficients: ${logisticModel.coefficients} Intercept: ${logisticModel.intercept}")
// Spark logistic model coefficients: [0.8212244419577079,0.32615245441495727] Intercept: -0.011815325216668142

//Make regularization 0.3/20=0.015    
val linearModel = new LinearRegression().
  setRegParam(0.015).
  setLabelCol("label").
  setFeaturesCol("features").
  setTol(1e-12).
  setMaxIter(100).
  fit(sparkTrainingData)

println(s"Spark linear model coefficients: ${linearModel.coefficients} Intercept: ${linearModel.intercept}")
// Spark linear model coefficients: [0.21394341729353747,0.09257340293212045] Intercept: 0.5

【讨论】:

  • v 不错的分析
猜你喜欢
  • 2018-12-01
  • 2016-11-28
  • 2021-11-03
  • 2020-03-30
  • 1970-01-01
  • 2014-06-13
  • 2014-01-07
  • 2013-09-30
相关资源
最近更新 更多