我想在@Mehdi Lamrani 的回答中添加一些信息:
如果你想在 SparkML 中进行多项式线性回归,你可以使用 PolynomialExpansion 类。
有关信息,请查看SparkML Doc 中的课程
或在Spark API Doc
这是一个实现示例:
假设我们有一个训练和测试数据集,存储在两个 csv 文件中,标题包含列的名称(特征、标签)。
每个数据集包含三个名为 f1,f2,f3 的特征,每个特征都是 Double 类型(这是 X 矩阵),以及一个名为 mylabel 的标签特征(Y 向量) 。
对于这段代码,我使用了 Spark+Scala:
斯卡拉版本:2.12.8
Spark 版本 2.4.0。
我们假设 SparkML 库已经下载到 build.sbt 中。
首先,导入库:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.{SparkConf, SparkContext}
创建 Spark 会话和 Spark 上下文:
val ss = org.apache.spark.sql
.SparkSession.builder()
.master("local")
.appName("Read CSV")
.enableHiveSupport()
.getOrCreate()
val conf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(conf)
实例化你要使用的变量:
val f_train:String = "path/to/your/train_file.csv"
val f_test:String = "path/to/your/test_file.csv"
val degree:Int = 3 // Set the degree of your choice
val maxIter:Int = 10 // Set the max number of iterations
val lambda:Double = 0.0 // Set your lambda
val alpha:Double = 0.3 // Set the learning rate
首先,我们先创建几个udf-s,用于数据读取和预处理。
udf toFeatures 的参数类型将是Vector,后跟特征参数的类型:(Double,Double,Double)
val toFeatures = udf[Vector, Double, Double, Double] {
(a,b,c) => Vectors.dense(a,b,c)
}
val encodeIntToDouble = udf[Double, Int](_.toDouble)
现在让我们创建一个从 CSV 中提取数据并使用 PolynomialExpansion 从现有特征中创建新特征的函数:
def getDataPolynomial(
currentfile:String,
sc:SparkSession,
sco:SparkContext,
degree:Int
):DataFrame =
{
val df_rough:DataFrame = sc.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.option("inferSchema", value=true)
.load(currentfile)
.toDF("f1", "f2", "f3", "myLabel")
// you may add or not the last line
val df:DataFrame = df_rough
.withColumn("featNormTemp", toFeatures(df_rough("f1"), df_rough("f2"), df_rough("f3")))
.withColumn("label", Tools.encodeIntToDouble(df_rough("myLabel")))
val polyExpansion = new PolynomialExpansion()
.setInputCol("featNormTemp")
.setOutputCol("polyFeatures")
.setDegree(degree)
val polyDF:DataFrame=polyExpansion.transform(df.select("featNormTemp"))
val datafixedWithFeatures:DataFrame = polyDF.withColumn("features", polyDF("polyFeatures"))
val datafixedWithFeaturesLabel = datafixedWithFeatures
.join(df,df("featNormTemp") === datafixedWithFeatures("featNormTemp"))
.select("label", "polyFeatures")
datafixedWithFeaturesLabel
}
现在,为训练数据集和测试数据集运行该函数,使用为多项式展开选择的次数。
val X:DataFrame = getDataPolynomial(f_train,ss,sc,degree)
val X_test:DataFrame = getDataPolynomial(f_test,ss,sc,degree)
使用管道运行算法以获得线性回归模型:
val assembler = new VectorAssembler()
.setInputCols(Array("polyFeatures"))
.setOutputCol("features2")
val lr = new LinearRegression()
.setMaxIter(maxIter)
.setRegParam(lambda)
.setElasticNetParam(alpha)
.setFeaturesCol("features2")
.setLabelCol("label")
// Fit the model:
val pipeline:Pipeline = new Pipeline().setStages(Array(assembler,lr))
val lrModel:PipelineModel = pipeline.fit(X)
// Get prediction on the test set :
val result:DataFrame = lrModel.transform(X_test)
最后,使用均方误差度量评估结果:
def leastSquaresError(result:DataFrame):Double = {
val rm:RegressionMetrics = new RegressionMetrics(
result
.select("label","prediction")
.rdd
.map(x => (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
Math.sqrt(rm.meanSquaredError)
}
val error:Double = leastSquaresError(result)
println("Error : "+error)
我希望这可能有用!