【问题标题】:Polynomial regression in spark/ or external packages for sparkspark/或外部包中的多项式回归
【发布时间】:2016-12-16 23:08:02
【问题描述】:

在网上为此主题进行了大量搜索之后,如果我能得到一些指导,我会在这里结束。请继续阅读

在分析 Spark 2.0 之后,我得出结论,多项式回归不可能使用 spark(仅 spark),那么 spark 是否有一些可用于多项式回归的扩展? - Rspark 可以做到(但正在寻找更好的选择) - spark 中的 RFormula 进行预测,但系数不可用(这是我的主要要求,因为我主要对系数值感兴趣)

【问题讨论】:

    标签: machine-learning regression apache-spark-mllib


    【解决方案1】:

    我想在@Mehdi Lamrani 的回答中添加一些信息:

    如果你想在 SparkML 中进行多项式线性回归,你可以使用 PolynomialExpansion 类。 有关信息,请查看SparkML Doc 中的课程 或在Spark API Doc

    这是一个实现示例:

    假设我们有一个训练和测试数据集,存储在两个 csv 文件中,标题包含列的名称(特征、标签)。 每个数据集包含三个名为 f1,f2,f3 的特征,每个特征都是 Double 类型(这是 X 矩阵),以及一个名为 mylabel 的标签特征(Y 向量)

    对于这段代码,我使用了 Spark+Scala: 斯卡拉版本:2.12.8 Spark 版本 2.4.0。

    我们假设 SparkML 库已经下载到 build.sbt 中。

    首先,导入库:

    import org.apache.spark.ml.{Pipeline, PipelineModel}
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.ml.regression.LinearRegression
    import org.apache.spark.mllib.evaluation.RegressionMetrics
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.{SparkConf, SparkContext}
    

    创建 Spark 会话和 Spark 上下文:

    val ss = org.apache.spark.sql
      .SparkSession.builder()
      .master("local")
      .appName("Read CSV")
      .enableHiveSupport()
      .getOrCreate()
    
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    

    实例化你要使用的变量:

    val f_train:String = "path/to/your/train_file.csv"
    val f_test:String = "path/to/your/test_file.csv"
    val degree:Int = 3 // Set the degree of your choice
    val maxIter:Int = 10 // Set the max number of iterations
    val lambda:Double = 0.0 // Set your lambda
    val alpha:Double = 0.3 // Set the learning rate 
    

    首先,我们先创建几个udf-s,用于数据读取和预处理。 udf toFeatures 的参数类型将是Vector,后跟特征参数的类型:(Double,Double,Double)

    val toFeatures = udf[Vector, Double, Double, Double] {
            (a,b,c) => Vectors.dense(a,b,c)
        }
    
    val encodeIntToDouble    = udf[Double, Int](_.toDouble)
    

    现在让我们创建一个从 CSV 中提取数据并使用 PolynomialExpansion 从现有特征中创建新特征的函数:

    def getDataPolynomial(
        currentfile:String, 
        sc:SparkSession, 
        sco:SparkContext, 
        degree:Int
        ):DataFrame = 
    {
        val df_rough:DataFrame = sc.read
          .format("csv")
          .option("header", "true") //first line in file has headers
          .option("mode", "DROPMALFORMED")
          .option("inferSchema", value=true)
          .load(currentfile)
          .toDF("f1", "f2", "f3", "myLabel")
           // you may add or not the last line 
    
        val df:DataFrame = df_rough
            .withColumn("featNormTemp", toFeatures(df_rough("f1"), df_rough("f2"), df_rough("f3")))
            .withColumn("label", Tools.encodeIntToDouble(df_rough("myLabel")))
    
        val polyExpansion = new PolynomialExpansion()
          .setInputCol("featNormTemp")
          .setOutputCol("polyFeatures")
          .setDegree(degree)
    
        val polyDF:DataFrame=polyExpansion.transform(df.select("featNormTemp"))
    
        val datafixedWithFeatures:DataFrame = polyDF.withColumn("features", polyDF("polyFeatures"))
    
        val datafixedWithFeaturesLabel = datafixedWithFeatures
              .join(df,df("featNormTemp") === datafixedWithFeatures("featNormTemp"))
              .select("label", "polyFeatures")
    
        datafixedWithFeaturesLabel
    }
    

    现在,为训练数据集和测试数据集运行该函数,使用为多项式展开选择的次数。

    val X:DataFrame = getDataPolynomial(f_train,ss,sc,degree)
    val X_test:DataFrame = getDataPolynomial(f_test,ss,sc,degree)
    

    使用管道运行算法以获得线性回归模型:

    val assembler = new VectorAssembler()
       .setInputCols(Array("polyFeatures"))
       .setOutputCol("features2")
    
    val lr = new LinearRegression()
        .setMaxIter(maxIter)
        .setRegParam(lambda)
        .setElasticNetParam(alpha)
        .setFeaturesCol("features2")
        .setLabelCol("label")
    
    // Fit the model:
    val pipeline:Pipeline = new Pipeline().setStages(Array(assembler,lr))
    val lrModel:PipelineModel = pipeline.fit(X)
    
    // Get prediction on the test set :
    val result:DataFrame = lrModel.transform(X_test)
    

    最后,使用均方误差度量评估结果:

    def leastSquaresError(result:DataFrame):Double = {
        val rm:RegressionMetrics = new RegressionMetrics(
            result
                .select("label","prediction")
                .rdd
                .map(x => (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
        Math.sqrt(rm.meanSquaredError)
    }
    
    val error:Double = leastSquaresError(result)
    println("Error : "+error)
    

    我希望这可能有用!

    【讨论】:

      【解决方案2】:

      多项式回归只是线性回归的另一种情况(如Polynomial regression is linear regressionPolynomial regression)。由于 Spark 有一种线性回归方法,您可以调用该方法来更改输入,使新输入适合多项式回归。例如,如果你只有一个自变量 x,并且你想做二次回归,你必须将你的独立输入矩阵更改为 [x x^2]。

      【讨论】:

        猜你喜欢
        • 2017-12-08
        • 1970-01-01
        • 2021-06-03
        • 1970-01-01
        • 2016-09-26
        • 1970-01-01
        • 1970-01-01
        • 2011-04-23
        • 2019-01-24
        相关资源
        最近更新 更多