【问题标题】:Spark, Scala, DataFrame: create feature vectorsSpark、Scala、DataFrame:创建特征向量
【发布时间】:2016-02-25 07:21:19
【问题描述】:

我有一个DataFrame,如下所示:

userID, category, frequency
1,cat1,1
1,cat2,3
1,cat9,5
2,cat4,6
2,cat9,2
2,cat10,1
3,cat1,5
3,cat7,16
3,cat8,2

不同类别的数量是 10,我想为每个 userID 创建一个特征向量,并用零填充缺失的类别。

所以输出会是这样的:

userID,feature
1,[1,3,0,0,0,0,0,0,5,0]
2,[0,0,0,6,0,0,0,0,2,1]
3,[5,0,0,0,0,0,16,2,0,0]

这只是一个说明性示例,实际上我有大约 200,000 个唯一用户 ID 和 300 个唯一类别。

创建特征DataFrame 的最有效方法是什么?

【问题讨论】:

标签: scala apache-spark apache-spark-sql apache-spark-ml


【解决方案1】:

更多DataFrame 为中心的解决方案:

import org.apache.spark.ml.feature.VectorAssembler

val df = sc.parallelize(Seq(
  (1, "cat1", 1), (1, "cat2", 3), (1, "cat9", 5), (2, "cat4", 6),
  (2, "cat9", 2), (2, "cat10", 1), (3, "cat1", 5), (3, "cat7", 16),
  (3, "cat8", 2))).toDF("userID", "category", "frequency")

// Create a sorted array of categories
val categories = df
  .select($"category")
  .distinct.map(_.getString(0))
  .collect
  .sorted

// Prepare vector assemble
val assembler =  new VectorAssembler()
  .setInputCols(categories)
  .setOutputCol("features")

// Aggregation expressions
val exprs = categories.map(
   c => sum(when($"category" === c, $"frequency").otherwise(lit(0))).alias(c))

val transformed = assembler.transform(
    df.groupBy($"userID").agg(exprs.head, exprs.tail: _*))
  .select($"userID", $"features")

还有一个 UDAF 替代方案:

import org.apache.spark.sql.expressions.{
  MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.types.{
  StructType, ArrayType, DoubleType, IntegerType}
import scala.collection.mutable.WrappedArray

class VectorAggregate (n: Int) extends UserDefinedAggregateFunction {
    def inputSchema = new StructType()
      .add("i", IntegerType)
      .add("v", DoubleType)
    def bufferSchema = new StructType().add("buff", ArrayType(DoubleType))
    def dataType = new VectorUDT()
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, Array.fill(n)(0.0))
    }

    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) {
        val i = input.getInt(0)
        val v = input.getDouble(1)
        val buff = buffer.getAs[WrappedArray[Double]](0) 
        buff(i) += v
        buffer.update(0, buff)
      }
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      val buff1 = buffer1.getAs[WrappedArray[Double]](0) 
      val buff2 = buffer2.getAs[WrappedArray[Double]](0) 
      for ((x, i) <- buff2.zipWithIndex) {
        buff1(i) += x
      }
      buffer1.update(0, buff1)
    }

    def evaluate(buffer: Row) =  Vectors.dense(
      buffer.getAs[Seq[Double]](0).toArray)
}

示例用法:

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("category_idx")
  .fit(df)

val indexed = indexer.transform(df)
  .withColumn("category_idx", $"category_idx".cast("integer"))
  .withColumn("frequency", $"frequency".cast("double"))

val n = indexer.labels.size + 1

val transformed = indexed
  .groupBy($"userID")
  .agg(new VectorAggregate(n)($"category_idx", $"frequency").as("vec"))

transformed.show

// +------+--------------------+
// |userID|                 vec|
// +------+--------------------+
// |     1|[1.0,5.0,0.0,3.0,...|
// |     2|[0.0,2.0,0.0,0.0,...|
// |     3|[5.0,0.0,16.0,0.0...|
// +------+--------------------+

在这种情况下,值的顺序由indexer.labels 定义:

indexer.labels
// Array[String] = Array(cat1, cat9, cat7, cat2, cat8, cat4, cat10)

在实践中,我更喜欢Odomontois 的解决方案,所以这些主要是为了参考。

【讨论】:

  • 您好 Zero323,我现在正在尝试您的解决方案,它似乎比 Odomontois 的解决方案更慢且更消耗资源。最后给了我一个java.lang.OutOfMemoryError: GC overhead limit exceeded 异常。
  • 可以的。昂贵的。它在聚合之前将数据帧转换为宽格式,因此对于大量类别,这意味着大量开销。
  • 酷。让我想起了 dplyr 和 tidyr。但是这个 API 设计对我来说看起来很奇怪。 .agg(exprs.head, exprs.tail: _*).agg(exprs:_*) 丑 5 倍,而 c =&gt; sum(when($"category" === c, $"frequency").otherwise(lit(0)) 吓坏了我所有的 LISP
  • @Odomontois 是的,它既丑陋又令人困惑(例如select 有纯可变参数版本)。一直在考虑开个PR加agg(exprs: Column*)版本。
  • @Rami 这是一个棘手的问题。您可以忽略警告,但不能忽略增长计划。有 20 列我不会担心。使用 100 或 1000 时,事情会变得丑陋。
【解决方案2】:

假设:

val cs: SparkContext
val sc: SQLContext
val cats: DataFrame

其中userIdfrequencybigint 列,对应于scala.Long

我们正在创建中间映射RDD

val catMaps = cats.rdd
  .groupBy(_.getAs[Long]("userId"))
  .map { case (id, rows) => id -> rows
    .map { row => row.getAs[String]("category") -> row.getAs[Long]("frequency") }
    .toMap
  }

然后按字典顺序收集所有呈现的类别

val catNames = cs.broadcast(catMaps.map(_._2.keySet).reduce(_ union _).toArray.sorted)

或者手动创建

val catNames = cs.broadcast(1 to 10 map {n => s"cat$n"} toArray)

最后,我们将映射转换为具有 0 值的数组以表示不存在的值

import sc.implicits._
val catArrays = catMaps
      .map { case (id, catMap) => id -> catNames.value.map(catMap.getOrElse(_, 0L)) }
      .toDF("userId", "feature")

现在 catArrays.show() 打印类似

+------+--------------------+
|userId|             feature|
+------+--------------------+
|     2|[0, 1, 0, 6, 0, 0...|
|     1|[1, 0, 3, 0, 0, 0...|
|     3|[5, 0, 0, 0, 16, ...|
+------+--------------------+

这可能不是数据帧最优雅的解决方案,因为我对这个火花领域几乎不熟悉。

请注意,您可以手动创建您的catNames,为缺少的cat3cat5、...添加零。

还要注意,否则catMaps RDD 会被操作两次,你可能要.persist()

【讨论】:

    【解决方案3】:

    鉴于您的意见:

    val df = Seq((1, "cat1", 1), (1, "cat2", 3), (1, "cat9", 5), 
                 (2, "cat4", 6), (2, "cat9", 2), (2, "cat10", 1), 
                 (3, "cat1", 5), (3, "cat7", 16), (3, "cat8", 2))
               .toDF("userID", "category", "frequency")
    df.show
    +------+--------+---------+
    |userID|category|frequency|
    +------+--------+---------+
    |     1|    cat1|        1|
    |     1|    cat2|        3|
    |     1|    cat9|        5|
    |     2|    cat4|        6|
    |     2|    cat9|        2|
    |     2|   cat10|        1|
    |     3|    cat1|        5|
    |     3|    cat7|       16|
    |     3|    cat8|        2|
    +------+--------+---------+
    

    只要运行:

    val pivoted = df.groupBy("userID").pivot("category").avg("frequency")
    val dfZeros = pivoted.na.fill(0)
    dzZeros.show    
    +------+----+-----+----+----+----+----+----+                                    
    |userID|cat1|cat10|cat2|cat4|cat7|cat8|cat9|
    +------+----+-----+----+----+----+----+----+
    |     1| 1.0|  0.0| 3.0| 0.0| 0.0| 0.0| 5.0|
    |     3| 5.0|  0.0| 0.0| 0.0|16.0| 2.0| 0.0|
    |     2| 0.0|  1.0| 0.0| 6.0| 0.0| 0.0| 2.0|
    +------+----+-----+----+----+----+----+----+
    

    最后,使用VectorAssembler创建org.apache.spark.ml.linalg.Vector

    注意:我还没有检查过这方面的表现......

    编辑:可能更复杂,但可能更高效!

    def toSparseVectorUdf(size: Int) = udf[Vector, Seq[Row]] {
      (data: Seq[Row]) => {
        val indices = data.map(_.getDouble(0).toInt).toArray
        val values = data.map(_.getInt(1).toDouble).toArray
        Vectors.sparse(size, indices, values)
      }
    }
    
    val indexer = new StringIndexer().setInputCol("category").setOutputCol("idx")
    val indexerModel = indexer.fit(df)
    val totalCategories = indexerModel.labels.size
    val dataWithIndices = indexerModel.transform(df)
    val data = dataWithIndices.groupBy("userId").agg(sort_array(collect_list(struct($"idx", $"frequency".as("val")))).as("data"))
    val dataWithFeatures = data.withColumn("features", toSparseVectorUdf(totalCategories)($"data")).drop("data")
    dataWithFeatures.show(false)
    +------+--------------------------+
    |userId|features                  |
    +------+--------------------------+
    |1     |(7,[0,1,3],[1.0,5.0,3.0]) |
    |3     |(7,[0,2,4],[5.0,16.0,2.0])|
    |2     |(7,[1,5,6],[2.0,6.0,1.0]) |
    +------+--------------------------+
    

    注意:StringIndexer 将按频率对类别进行排序 => 最频繁的类别将位于 indexerModel.labels 中的 index=0 处。如果您愿意,请随意使用您自己的映射并将其直接传递给toSparseVectorUdf

    【讨论】:

      猜你喜欢
      • 2018-02-11
      • 2022-01-01
      • 2020-08-09
      • 2015-12-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-03-30
      • 2022-01-05
      相关资源
      最近更新 更多