【问题标题】:Scala spark: How to train a distributed sparse regression model?Scala spark:如何训练分布式稀疏回归模型?
【发布时间】:2021-01-19 09:17:11
【问题描述】:

我正在尝试构建一个回归模型,其中基础特征矩阵非常大(418K 行,73K 列)并且非常稀疏(58M 非零值,约占整个矩阵的 0.2%)。

我将矩阵坐标表示为 DataFrame,其中第一列是行坐标i,第二列是列坐标j,第三列是{i,j}th 位置的值。

例如以下矩阵:

+-+-+-+
|0|1|0|
|2|0|0|
|0|0|3|
+-+-+-+

表示
+-+-+-----+
|i|j|value|
+-+-+-----+
|0|1| 1   |
|1|0| 2   |
|2|2| 3   |
+-+-+-----+

我有一个单独的 DataFrame,其中包含每一行 i 的标签。

如果可能的话,我更喜欢使用较新的 ml 库而不是较旧的 mllib 的解决方案

【问题讨论】:

    标签: scala apache-spark linear-regression sparse-matrix apache-spark-ml


    【解决方案1】:

    下面我给出一个小代码示例,说明如何在spark ml 中实现分布式稀疏线性回归。我已经在一个大型集群(Databricks Runtime 版本 6.5 ML - 包括 Apache Spark 2.4.5、Scala 2.11)上将它与相关矩阵一起使用,因此它可以很好地扩展并且只需几分钟即可执行。

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.Dataset
    import org.apache.spark.ml.linalg.SparseVector
    import org.apache.spark.ml.feature.LabeledPoint
    import spark.implicits._
    import org.apache.spark.ml.regression.LinearRegression
    
    // Construct Matrix coordinate representation DataFrame
    val df = Seq(
      (0, 1, 14.0), 
      (0, 0, 13.0), 
      (1, 1, 11.0)
    ).toDF("i", "j", "value")
    
    df.show()
    
    +---+---+-----+
    |  i|  j|value|
    +---+---+-----+
    |  0|  1| 14.0|
    |  0|  0| 13.0|
    |  1|  1| 11.0|
    +---+---+-----+
    
    // Construct label DataFrame
    val df_label = Seq(
      (0, 41.1), 
      (1, 21.9) // beta_1 = 1, beta_2 = 2
    ).toDF("i", "label")
    
    df_label.show()
    
    +---+-----+
    |  i|label|
    +---+-----+
    |  0| 41.1|
    |  1| 21.9|
    +---+-----+
    
    // Use a UDF to sort arrays below
    val sortUdf: UserDefinedFunction = udf((rows: Seq[Row]) => {
      rows.map { case Row(j: Int, value: Double) => (j, value) }
        .sortBy { case (j, value) => j }
    })
    
    // collect j and value columns to lists, make sure they are sorted by j
    // then join with labels
    val df_collected_with_labels = df
    .groupBy("i")
    .agg(collect_list(struct("j", "value")) as "j_value")
    .select($"i", sortUdf(col("j_value")).alias("j_value_list"))
    .withColumn("j_list", $"j_value_list".getField("_1"))
    .withColumn("value_list", $"j_value_list".getField("_2"))
    .drop("j_value_list")
    .join(df_label, "i")
    
    df_collected_with_labels.show()
    +---+------+------------+-----+
    |  i|j_list|  value_list|label|
    +---+------+------------+-----+
    |  1|   [1]|      [11.0]| 21.9|
    |  0|[0, 1]|[13.0, 14.0]| 41.1|
    +---+------+------------+-----+
    
    val unique_j = df.dropDuplicates("j").count().toInt
    
    val sparse_df = df_collected_with_labels
    .map(r => LabeledPoint(r.getDouble(3), 
                           new SparseVector(size = unique_j, 
                                            indices = r.getAs[Seq[Int]]("j_list").toArray, 
                                            values = r.getAs[Seq[Double]]("value_list").toArray)))
    
    sparse_df.show()
    
    +-----+--------------------+
    |label|            features|
    +-----+--------------------+
    | 21.9|      (2,[1],[11.0])|
    | 41.1|(2,[0,1],[13.0,14...|
    +-----+--------------------+
    
    // Fit sparse regression!
    val lr = new LinearRegression()
    .setFitIntercept(false)
    
    val lrModel = lr.fit(sparse_df)
    
    lrModel.coefficients
    org.apache.spark.ml.linalg.Vector = [1.0174825174825193,1.9909090909090894]
    
    lrModel.predict(new SparseVector(size = unique_j, indices = Array(0), values = Array(4.0)))
    Double = 4.069930069930077
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-10-25
      • 1970-01-01
      • 2021-03-03
      • 2015-06-10
      • 2014-07-31
      • 2020-05-02
      • 1970-01-01
      • 2015-11-18
      相关资源
      最近更新 更多