【Question Title】: MinMax Normalization in Scala
【Posted】: 2016-02-28 17:38:22
【Question】:

I have an org.apache.spark.sql.DataFrame with multiple columns. I want to scale one column (lat_long_dist) to the range between -1 and 1, using MinMax normalization or any other technique, while keeping the result as an org.apache.spark.sql.DataFrame.

scala> val df = sqlContext.csvFile("tenop.csv")
df: org.apache.spark.sql.DataFrame = [gst_id_matched: string,
  ip_crowding: string, lat_long_dist: double, stream_name_1: string]

I found the StandardScaler option, but it requires transforming the dataset before I can apply the scaling. Is there a simple, clean way to do this?

【Comments】:

    Tags: scala apache-spark normalization apache-spark-sql


    【Solution 1】:

    I guess what you want is something like this:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.{min, max, lit}
    
    val df = sc.parallelize(Seq(
      (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
    )).toDF("k", "v")
    
    val (vMin, vMax) = df.agg(min($"v"), max($"v")).first match {
      case Row(x: Double, y: Double) => (x, y)
    }
    
    val scaledRange = lit(2) // Range of the scaled variable
    val scaledMin = lit(-1)  // Min value of the scaled variable
    val vNormalized = ($"v" - vMin) / (vMax - vMin) // v normalized to the [0, 1] range
    
    val vScaled = scaledRange * vNormalized + scaledMin
    
    df.withColumn("vScaled", vScaled).show
    
    // +---+-----+--------------------+
    // |  k|    v|             vScaled|
    // +---+-----+--------------------+
    // |  1|  0.5| -0.3093093093093092|
    // |  2| 10.2| 0.27327327327327344|
    // |  3|  5.7|0.003003003003003...|
    // |  4|-11.0|                -1.0|
    // |  5| 22.3|                 1.0|
    // +---+-----+--------------------+
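The column expression above is the usual min-max rescaling formula. As a worked check, here it is written out for the first row (v = 0.5), using the minimum and maximum from the sample data. The symbols a and b for the target bounds are notation introduced here, not part of the original answer.

```latex
v' = (b - a)\,\frac{v - v_{\min}}{v_{\max} - v_{\min}} + a,
\qquad a = -1,\; b = 1,\; v_{\min} = -11.0,\; v_{\max} = 22.3

v = 0.5:\quad
\frac{0.5 - (-11.0)}{22.3 - (-11.0)} = \frac{11.5}{33.3} \approx 0.3453,
\qquad
v' \approx 2 \cdot 0.3453 - 1 \approx -0.3093
```

This agrees with the vScaled value shown for k = 1, and the extremes v = -11.0 and v = 22.3 map to -1 and 1 respectively.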
    

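    【Comments】:

      【Solution 2】:

      Since you are already using Spark, here is another suggestion.

      Why not use the MinMaxScaler from the ml package?

      Let's try it with the same example from zero323's answer.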

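      // Spark 1.x import. On Spark 2.0+, use org.apache.spark.ml.linalg.Vectors
      // instead: the ml.feature transformers do not accept mllib vectors there.
      import org.apache.spark.mllib.linalg.Vectors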
      import org.apache.spark.ml.feature.MinMaxScaler
      import org.apache.spark.sql.functions.udf
      
      val df = sc.parallelize(Seq(
        (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
      )).toDF("k", "v")
      
      // MinMaxScaler works on vector columns, so wrap each Double in a one-element vector
      val vectorizeCol = udf( (v: Double) => Vectors.dense(Array(v)) )
      val df2 = df.withColumn("vVec", vectorizeCol(df("v")))
      
      val scaler = new MinMaxScaler()
          .setInputCol("vVec")
          .setOutputCol("vScaled")
          .setMax(1)
          .setMin(-1)
      
      scaler.fit(df2).transform(df2).show
      +---+-----+-------+--------------------+
      |  k|    v|   vVec|             vScaled|
      +---+-----+-------+--------------------+
      |  1|  0.5|  [0.5]|[-0.3093093093093...|
      |  2| 10.2| [10.2]|[0.27327327327327...|
      |  3|  5.7|  [5.7]|[0.00300300300300...|
      |  4|-11.0|[-11.0]|              [-1.0]|
      |  5| 22.3| [22.3]|               [1.0]|
      +---+-----+-------+--------------------+
      

      You can also take advantage of this approach to scale several columns at once.

      val df = sc.parallelize(Seq(
          (1.0, -1.0, 2.0),
          (2.0, 0.0, 0.0),
          (0.0, 1.0, -1.0)
      )).toDF("a", "b", "c")
      
      import org.apache.spark.ml.feature.VectorAssembler
      
      val assembler = new VectorAssembler()
          .setInputCols(Array("a", "b", "c"))
          .setOutputCol("features")
      
      val df2 = assembler.transform(df)
      
      // Reusing the scaler instance above with the same min(-1) and max(1) 
      scaler.setInputCol("features").setOutputCol("scaledFeatures").fit(df2).transform(df2).show
      +---+----+----+--------------+--------------------+
      |  a|   b|   c|      features|      scaledFeatures|
      +---+----+----+--------------+--------------------+
      |1.0|-1.0| 2.0|[1.0,-1.0,2.0]|      [0.0,-1.0,1.0]|
      |2.0| 0.0| 0.0| [2.0,0.0,0.0]|[1.0,0.0,-0.33333...|
      |0.0| 1.0|-1.0|[0.0,1.0,-1.0]|     [-1.0,1.0,-1.0]|
      +---+----+----+--------------+--------------------+
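To make the table above easier to verify, here is a minimal plain-Scala sketch (no Spark required) of the per-column rescaling that the Spark documentation describes for MinMaxScaler: each column is scaled independently using its own minimum and maximum, and a constant column maps to the midpoint of the target range. The names `MinMaxSketch`, `rescaleColumns`, `lo`, and `hi` are hypothetical and only for illustration; this is not Spark's implementation.

```scala
object MinMaxSketch {
  // Rescale every column of `rows` independently to the range [lo, hi].
  def rescaleColumns(rows: Seq[Array[Double]], lo: Double, hi: Double): Seq[Array[Double]] = {
    require(rows.nonEmpty, "need at least one row")
    val nCols = rows.head.length
    // Per-column minimum and maximum over all rows
    val mins = (0 until nCols).map(j => rows.map(_(j)).min)
    val maxs = (0 until nCols).map(j => rows.map(_(j)).max)
    rows.map { row =>
      Array.tabulate(nCols) { j =>
        val range = maxs(j) - mins(j)
        if (range == 0.0) 0.5 * (lo + hi) // constant column: use the midpoint
        else (row(j) - mins(j)) / range * (hi - lo) + lo
      }
    }
  }
}
```

Calling `MinMaxSketch.rescaleColumns` on the three rows (1.0, -1.0, 2.0), (2.0, 0.0, 0.0), (0.0, 1.0, -1.0) with `lo = -1.0` and `hi = 1.0` gives the same values as the scaledFeatures column, for example -0.3333... for column c in the second row.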
      

      【Comments】:

        【Solution 3】:

        Here is yet another solution. It borrows code from Matt, Lyle, and zero323. Thanks!

        import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler}
        
        val df = sc.parallelize(Seq(
          (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
        )).toDF("k", "v")
        
        val assembler = new VectorAssembler().setInputCols(Array("v")).setOutputCol("vVec")
        val df2 = assembler.transform(df)
        
        val scaler = new MinMaxScaler().setInputCol("vVec").setOutputCol("vScaled").setMax(1).setMin(-1)
        
        scaler.fit(df2).transform(df2).show
        

        Result:

        +---+-----+-------+--------------------+
        |  k|    v|   vVec|             vScaled|
        +---+-----+-------+--------------------+
        |  1|  0.5|  [0.5]|[-0.3093093093093...|
        |  2| 10.2| [10.2]|[0.27327327327327...|
        |  3|  5.7|  [5.7]|[0.00300300300300...|
        |  4|-11.0|[-11.0]|              [-1.0]|
        |  5| 22.3| [22.3]|               [1.0]|
        +---+-----+-------+--------------------+
        

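        By the way, the other solutions fail on my side with the error below. The expected and actual types print identically, which usually means the column holds org.apache.spark.mllib.linalg vectors while the Spark 2.x ml transformers expect org.apache.spark.ml.linalg vectors.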

        java.lang.IllegalArgumentException: requirement failed: Column vVec must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
          at scala.Predef$.require(Predef.scala:224)
          at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:43)
          at org.apache.spark.ml.feature.MinMaxScalerParams$class.validateAndTransformSchema(MinMaxScaler.scala:67)
          at org.apache.spark.ml.feature.MinMaxScaler.validateAndTransformSchema(MinMaxScaler.scala:93)
          at org.apache.spark.ml.feature.MinMaxScaler.transformSchema(MinMaxScaler.scala:129)
          at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
          at org.apache.spark.ml.feature.MinMaxScaler.fit(MinMaxScaler.scala:119)
          ... 50 elided
        

        Many thanks!

        【Comments】:
