【问题标题】:How to transform a categorical variable in Spark into a set of columns coded as {0,1}?如何将 Spark 中的分类变量转换为一组编码为 {0,1} 的列?
【发布时间】:2015-07-18 05:48:30
【问题描述】:

我正在尝试使用 Spark MLlib(使用 Scala)对包含分类变量的数据集执行逻辑回归 (LogisticRegressionWithLBFGS)。我发现 Spark 无法使用这种变量。

在 R 中有一种简单的方法来处理这类问题:我将变量转换为因子(类别),因此 R 创建了一组编码为 {0,1} 指示变量的列。

如何使用 Spark 执行此操作?

【问题讨论】:

  • “不能使用那种变量”是什么意思?我不是 R 专家,但分类变量不只是枚举吗?
  • 我的意思是,如果我不告诉 R 我的变量是分类变量,R 会将它视为一个连续变量(例如,一个等于“'1'”的变量以表示存在特定的特征, "'2'" 如果没有,"'3'" 如果信息缺失)。为了将此变量与 continue 变量区分开来,我告诉 R 使用命令“as.factor”将变量转换为因数。在 Spark 中,该变量自动被视为 continue 并且自动命令“as.factor”不存在,所以我必须自己创建它。

标签: scala apache-spark bigdata apache-spark-mllib categorical-data


【解决方案1】:

使用VectorIndexer,您可以告诉索引器一个字段可能具有的不同值(基数)的数量,以便通过 setMaxCategories() 方法被视为分类。

val indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10)

来自Scaladocs

Vector 数据集中分类特征列的索引类。

这有两种使用模式:

自动识别分类特征(默认行为)

这有助于将未知向量的数据集处理成具有一些 连续特征和一些分类特征。 连续和分类之间的选择基于 maxCategories 参数。

将 maxCategories 设置为任何分类特征应具有的最大分类数。

例如:特征 0 具有唯一值 {-1.0, 0.0},特征 1 具有唯一值 {1.0, 3.0, 5.0}。如果 maxCategories = 2,则特征 0 将被声明为分类并使用索引 {0, 1},而特征 1 将被声明为连续。

我发现这是提取分类值的一种方便(虽然粗粒度)的方法,但请注意,在任何情况下,如果您有一个想要连续的较低arity 的字段(例如,大学生的年龄与原籍国或美国州)。

【讨论】:

    【解决方案2】:

    Spark 1.4 中即将推出 VectorIndexer,它可能会帮助您进行这种特征转换:http://people.apache.org/~pwendell/spark-1.4.0-rc1-docs/api/scala/index.html#org.apache.spark.ml.feature.VectorIndexer

    但是看起来这只会在 spark.ml 而不是 mllib 中可用

    https://issues.apache.org/jira/browse/SPARK-4081

    【讨论】:

      【解决方案3】:

      如果我理解正确,您不想在几个虚拟列中转换 1 个分类列。您希望 spark 了解该列是分类的而不是数字的。

      我认为这取决于您现在要使用的算法。例如随机森林和GBT都有categoricalFeaturesInfo作为参数在这里检查它:

      https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.tree.RandomForest$

      例如:

      categoricalFeaturesInfo = Map[Int, Int]((1,2),(2,5))

      实际上是说你的特征的第二列(索引从 0 开始,所以 1 是第二列)是一个有 2 个级别的分类特征,第三个也是一个有 5 个级别的分类特征。您可以在训练 randomForest 或 GBT 时指定这些参数。

      您需要确保您的级别映射到 0,1,2...所以如果您有类似 ("good","medium","bad") 之类的内容,请将其映射到 (0,1,2) .

      现在,在您的情况下,您想使用 LogisticRegressionWithLBFGS。在这种情况下,我的建议是将分类列实际转换为虚拟列。例如,具有 3 个级别(“好”、“中”、“坏”)的单列分为 3 列,其中 1/0 取决于哪一个命中。我没有可以使用的示例,所以这里有一个 scala 中应该可以使用的示例代码:

      val dummygen = (data : DataFrame, col:Array[String]) => {
          var temp = data
          for(i <- 0 until col.length) {
            val N = data.select(col(i)).distinct.count.toInt
            for (j<- 0 until N)
            temp = temp.withColumn(col(i) + "_" + j.toString, callUDF(index(j), DoubleType, data(col(i))))
          }
        temp
        }
        val index = (value:Double) => {(a:Double) => {
          if (value==a) {
            1
          } else{
            0
          }
        }}
      

      你可以这样称呼它:

      val results = dummygen(data, Array("CategoricalColumn1","CategoricalColumn2"))
      

      在这里,我为分类列列表执行此操作(以防万一您的功能列表中有超过 1 个)。第一个“for 循环”遍历每个分类列,第二个“for 循环”遍历列中的每个级别并创建与每列的级别数相等的列数。

      重要!!!它假定您首先将它们映射到 0,1,2...

      然后,您可以使用这个新功能集运行 LogisticRegressionWithLBFGS。这种方法也有助于 SVM。

      【讨论】:

        【解决方案4】:

        如果类别可以放入驱动程序内存中,这是我的建议:

        import org.apache.spark.ml.feature.StringIndexer
        import org.apache.spark.sql.functions._
        import org.apache.spark.sql._
        
        
        val df = Seq((0, "a"),(1, "b"),(2, "c"),(3, "a"),(4, "a"),(5, "c"),(6,"c"),(7,"d"),(8,"b"))
                    .toDF("id", "category")
        val indexer = new StringIndexer()
                           .setInputCol("category")
                           .setOutputCol("categoryIndex")
                           .fit(df)
        
        val indexed = indexer.transform(df)
        
        val categoriesIndecies = indexed.select("category","categoryIndex").distinct
        val categoriesMap: scala.collection.Map[String,Double] = categoriesIndecies.map(x=>(x(0).toString,x(1).toString.toDouble)).collectAsMap()
        
        def getCategoryIndex(catMap: scala.collection.Map[String,Double], expectedValue: Double) = udf((columnValue: String) =>
        if (catMap(columnValue) == expectedValue) 1 else 0)
        
        
        val newDf:DataFrame =categoriesMap.keySet.toSeq.foldLeft[DataFrame](indexed)(
             (acc,c) => 
                  acc.withColumn(c,getCategoryIndex(categoriesMap,categoriesMap(c))($"category"))
             )
        
        newDf.show
        
        
        +---+--------+-------------+---+---+---+---+
        | id|category|categoryIndex|  b|  d|  a|  c|
        +---+--------+-------------+---+---+---+---+
        |  0|       a|          0.0|  0|  0|  1|  0|
        |  1|       b|          2.0|  1|  0|  0|  0|
        |  2|       c|          1.0|  0|  0|  0|  1|
        |  3|       a|          0.0|  0|  0|  1|  0|
        |  4|       a|          0.0|  0|  0|  1|  0|
        |  5|       c|          1.0|  0|  0|  0|  1|
        |  6|       c|          1.0|  0|  0|  0|  1|
        |  7|       d|          3.0|  0|  1|  0|  0|
        |  8|       b|          2.0|  1|  0|  0|  0|
        +---+--------+-------------+---+---+---+---+
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2018-05-18
          • 1970-01-01
          • 2022-01-22
          • 2019-11-15
          • 1970-01-01
          • 2019-11-24
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多