【问题标题】:Merging RDD records to obtain a single Row with multiple conditional counters合并 RDD 记录以获得具有多个条件计数器的单个 Row
【发布时间】:2019-05-06 23:52:29
【问题描述】:

作为一点上下文,我在这里试图实现的是给定由一组键分组的多行,在第一次减少之后,我想将它们分组在一个一般行中,例如,日期,每个分组的计数器都是先前计算的。仅仅阅读它可能看起来并不清楚,所以这里是一个应该发生的示例输出(非常简单,并不复杂)。

(("Volvo", "T4", "2019-05-01"), 5)
(("Volvo", "T5", "2019-05-01"), 7)
(("Audi", "RS6", "2019-05-01"), 4)

一旦合并了那些 Row 对象...

date         , volvo_counter     , audi_counter
"2019-05-01" , 12                , 4

我认为这是一个非常极端的情况,可能有不同的方法,但我想知道在同一个 RDD 中是否有任何解决方案,因此不需要多个 RDD 除以计数器。

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    你想做的是一个支点。您谈论 RDD,所以我假设您的问题是:“如何使用 RDD API 进行数据透视?”。据我所知,RDD API 中没有内置函数可以做到这一点。你可以这样自己做:

    // let's create sample data
    val rdd = sc.parallelize(Seq(
      (("Volvo", "T4", "2019-05-01"), 5),
      (("Volvo", "T5", "2019-05-01"), 7),
      (("Audi", "RS6", "2019-05-01"), 4)
    ))
    
    // If the keys are not known in advance, we compute their distinct values
    val values = rdd.map(_._1._1).distinct.collect.toSeq
    // values: Seq[String] = WrappedArray(Volvo, Audi)
    
    // Finally we make the pivot and use reduceByKey on the sequence
    val res = rdd
        .map{ case ((make, model, date), counter) =>
            date -> values.map(v => if(make == v) counter else 0)
        }
        .reduceByKey((a, b) => a.indices.map(i => a(i) + b(i)))
    
    // which gives you this
    res.collect.head
    // (String, Seq[Int]) = (2019-05-01,Vector(12, 4))
    

    请注意,您可以使用 SparkSQL API 编写更简单的代码:

    // let's first transform the previously created RDD to a dataframe:
    val df = rdd.map{ case ((a, b, c), d) => (a, b, c, d) }
        .toDF("make", "model", "date", "counter")
    
    // And then it's as simple as that:
    df.groupBy("date")
      .pivot("make")
      .agg(sum("counter"))
      .show
    
    +----------+----+-----+
    |      date|Audi|Volvo|
    +----------+----+-----+
    |2019-05-01|   4|   12|
    +----------+----+-----+
    

    【讨论】:

    • 感谢您的回复,但是当一个案例为 0 时会发生什么?我在旋转后看到一些“空”值,不太确定如何避免它们。使用后映射就足够了吗?
    • 如果在给定日期您没有任何关于给定类别的行,您将得到一个空值。如果你更喜欢0,你可以使用.na.fill(0)
    • 这么快的反应,非常感谢您的帮助! :)
    • 很高兴我能提供帮助,当我看到您的评论时,我只是在寻找答案;-)
    【解决方案2】:

    我认为使用 DataFrame 更容易:

       val data = Seq(
          Record(Key("Volvo", "2019-05-01"), 5),
          Record(Key("Volvo", "2019-05-01"), 7),
          Record(Key("Audi", "2019-05-01"), 4)
        )
    
        val rdd = spark.sparkContext.parallelize(data)
    
        val df = rdd.toDF()
    
        val modelsExpr = df
          .select("key.model").as("model")
          .distinct()
          .collect()
          .map(r => r.getAs[String]("model"))
          .map(m => sum(when($"key.model" === m, $"value").otherwise(0)).as(s"${m}_counter"))
    
        df
          .groupBy("key.date")
          .agg(modelsExpr.head, modelsExpr.tail: _*)
          .show(false)
    

    【讨论】:

      猜你喜欢
      • 2016-06-23
      • 1970-01-01
      • 2017-05-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多