【问题标题】:Apache Spark | specific time frame aggregation阿帕奇火花 |特定时间框架聚合
【发布时间】:2016-10-28 10:12:30
【问题描述】:

我需要一种每周聚合数据集的方法。这是我的数据集

|      date|organization_id|media_package_id|event_uuid |
+----------+---------------+----------------+-----------+
|2016-10-25|              1|              11|     76304d|
|2016-10-25|              1|              11|     e6285b|
|2016-10-22|              2|              21|     16c04d|
|2016-10-22|              2|              21|     17804d|
|2016-10-22|              2|              21|     18904x|
|2016-10-21|              2|              21|     51564q|
|2016-10-07|              4|              98|     12874t|
|2016-10-05|              4|              98|     11234d|
+----------+---------------+----------------+-----------+

让我们假设 Spark 作业每天都在运行以获得所需的聚合结果。我希望结果以一周为基础,例如聚合后的上述数据集。

|      date|organization_id|media_package_id|      count|
+----------+---------------+----------------+-----------+
|2016-10-24|              1|              11|          2|
|2016-10-17|              2|              21|          4|
|2016-10-03|              4|              98|          2|
+----------+---------------+----------------+-----------+

在这里,如果您看到日期列,它正在一周的第一天(我认为这是最好的方式)

我以某种方式设法每天进行汇总。这是我的做法

val data = MongoSupport.load(spark, "sampleCollection")
val dataForDates = data.filter(dataForDates("date").isin(dates : _*))

val countByDate = proofEventsForDates.groupBy("DATE", "ORGANIZATION_ID", "MEDIA_PACKAGE_ID")
  .agg(count("EVENT_UUID").as("COUNT"))

val finalResult = impressionsByDate
  .select(
    col("DATE").as("date"),
    col("ORGANIZATION_ID").as("organization_id"),
    col("MEDIA_PACKAGE_ID").as("media_package_id"),
    col("COUNT").as("count")
  )

在这里,在开始过滤数据集时,我传递了一个特殊的dates 列表,其中包含至少大约一个月的日期。而我得到的结果是(这不是我想要的)

|      date|organization_id|media_package_id|      count|
+----------+---------------+----------------+-----------+
|2016-10-25|              1|              11|          2|
|2016-10-22|              2|              21|          3|
|2016-10-21|              2|              21|          1|
|2016-10-07|              2|              21|          1|
|2016-10-05|              2|              21|          1|
+----------+---------------+----------------+-----------+

从这里开始,我不知道如何每周汇总这个数据集。

【问题讨论】:

  • organization_id = 5的行怎么了?
  • @mtoto 问题已编辑。我有点错字
  • 根据您在同一行中的预期输出media_package_id 应该是21,不是吗?
  • @mtoto,是的,你是对的

标签: scala apache-spark aggregate-functions aggregation spark-dataframe


【解决方案1】:

假设您的 date 列已经属于 date 类,您可以使用函数 year()weekofyear() 来提取缺少的分组列以进行聚合。

import org.apache.spark.sql.functions.weekofyear
import org.apache.spark.sql.functions.year

(df
  .withColumn("week_nr", weekofyear($"date"))
  .withColumn("year", year($"date"))
  .groupBy("year",
           "week_nr",
           "organization_id",
           "media_package_id")
  .count().orderBy(desc("week_nr"))).show
+----+-------+---------------+----------------+-----+
|year|week_nr|organization_id|media_package_id|count|
+----+-------+---------------+----------------+-----+
|2016|     43|              1|              11|    2|
|2016|     42|              2|              21|    4|
|2016|     40|              4|              98|    2|
+----+-------+---------------+----------------+-----+

【讨论】:

  • 您可能应该 groupBy year($"date") 以及 by weekofyear - 否则如果数据跨度超过一年,您会将不同年份的第 N 周组合在一起。
猜你喜欢
  • 2014-10-09
  • 2014-12-17
  • 1970-01-01
  • 1970-01-01
  • 2010-10-22
  • 1970-01-01
  • 1970-01-01
  • 2019-10-21
  • 1970-01-01
相关资源
最近更新 更多