【问题标题】:Create many Spark MLlib models based on partitioned DataFrame using Pipeline使用 Pipeline 基于分区 DataFrame 创建许多 Spark MLlib 模型
【发布时间】:2018-09-23 01:05:52
【问题描述】:

scala> spark.version res8: String = 2.2.0

我正在使用包含 locationID 列的 spark Dataframe。我创建了一个 MLlib 管道来构建线性回归模型,当我为单个 locationID 提供数据时它就可以工作。我现在想为每个“locationID”创建许多模型(生产中可能有几千个 locationID)。我想保存每个模型的模型系数。

我不确定如何在 Scala 中做到这一点。

我的管道是这样定义的:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql


// Load the regression input data
val mydata = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("./inputdata.csv")

// Crate month one hot encoding
val monthIndexer = new StringIndexer()
  .setInputCol("month")
  .setOutputCol("monthIndex").fit(mydata)
val monthEncoder = new OneHotEncoder()
  .setInputCol(monthIndexer.getOutputCol)
  .setOutputCol("monthVec")
val assembler =  new VectorAssembler()
  .setInputCols(Array("monthVec","tran_adr"))
  .setOutputCol("features")
val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
val pipeline = new Pipeline()
  .setStages(Array(monthIndexer, monthEncoder, assembler, lr))


// Fit using the model pipeline
val myPipelineModel = pipeline.fit(mydata)

然后我可以像这样提取模型细节:

val modelExtract = myPipelineModel.stages(3).asInstanceOf[LinearRegressionModel]

println(s"Coefficients: ${modelExtract.coefficients} Intercept: ${modelExtract.intercept}")
// Summarize the model over the training set and print out some metrics
val trainingSummary = modelExtract.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")

现在我想对mydata 中的列locationID 进行分组,并在数据的每个分区上运行管道。

我尝试过使用 groupby,但我只能聚合。

val grouped = mydata.groupBy("locationID")

我还尝试将唯一的 locationID 拉为一个列表并循环遍历它:

val locationList = mydata.select(mydata("prop_code")).distinct

locationList.foreach { printLn }

我知道 spark 不适合创建许多较小的模型,它最适合在大量数据上创建一个模型,但我的任务是这样做作为概念证明。

在 spark 中做这样的事情的正确方法是什么?

【问题讨论】:

    标签: scala apache-spark spark-dataframe apache-spark-mllib


    【解决方案1】:

    在 spark 中做这样的事情的正确方法是什么?

    我会冒险声称根本没有好的方法。有许多可以处理核心数据处理的高级工具和许多可用于编排独立学习任务的任务调度库。 Spark 在这里根本不提供任何东西。

    它的调度能力很一般,ML / MLlib 工具也很平庸,当每个任务都是独立的时候,缩放和容错是没有用的。

    您可以使用 Spark 进行通用调度(如果您不介意使用 Python,这个想法通过 sklearn keyed models 实现)但就是这样。

    【讨论】:

    • 谢谢。我实际上已经在 python 中对此进行了编码,现在我正在尝试在 scala/spark 中做同样的事情以进行基准测试。在这一点上,我只是想获得与我的 python/sklearn 代码产生相同结果的东西。
    • 重新表述我的问题。而不是“什么是正确的方法”,我也在寻找“什么方法可行?”
    【解决方案2】:

    我遇到了同样的问题。我的数据在 $"description_pretty" 上进行了分区,这就是我处理它的方式。我在其分区上拆分数据框,将其输入管道,选择相关列,然后将其合并在一起。

        val pipe = new Pipeline().setStages(Array(encoder, assembler, 
             multivariate_linear_model))
    
        val descriptions_pretty = training_df.select("description_pretty").
             distinct.
             as[String].
             rdd.
             collect
    
        val model_predictions_df = descriptions_pretty.par.
             map(x => pipe.fit(training_df.filter($"description_pretty" === x)).
                  transform(prediction_df.filter($"description_pretty" === x)).
                  select($"description", $"description_pretty", 
                  $"standard_event_date".cast("String"), 
                  $"prediction".as("daily_peak_bps"))).
             reduce( _ union _)
    

    您可以在 .transform 之前停止,而是抓取系数

    【讨论】:

      猜你喜欢
      • 2016-09-19
      • 2023-04-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-05-26
      • 1970-01-01
      • 2016-03-06
      • 1970-01-01
      相关资源
      最近更新 更多