【问题标题】:Spark Scala - Cluster / Group a list based on timeSpark Scala - 基于时间的集群/分组列表
【发布时间】:2016-09-10 16:59:05
【问题描述】:

我有这种格式的几年的大型时间序列数据:

eventtime,value
---------------
2013-04-17 11:18:39.0,11.4
2013-04-17 11:19:40.0,82.0
2013-04-17 11:20:41.0,53.8
2013-04-17 17:22:00.0,31.0
2013-04-17 17:23:00.0,22.6
2013-04-17 17:24:00.0,43.1
2013-04-17 21:48:00.0,11.0
2013-04-17 21:49:00.0,22.1
2013-04-17 21:50:00.0,3.2
2013-04-17 21:51:00.0,13.1

根据以上数据,我想按时间集群进行分组,然后对每个组执行聚合函数(最大值、平均值)。 请注意,上述样本数据中有三个这样的集群。 期望的输出:

Group, Sum
-------------
[2013-04-17 11:18:39.0,2013-04-17 11:19:40.0,2013-04-17 11:20:41.0],147.2
[2013-04-17 17:22:00.0,2013-04-17 17:23:00.0,2013-04-17 17:24:00.0],96.7
[2013-04-17 21:48:00.0,11.0,2013-04-17 21:49:00.0,22.1,2013-04-17 21:50:00.0,2013-04-17 21:51:00.0],49.4

这些集群可以在一天中的任何时间发生,集群中的事件数量也不是固定的。区分这些集群的一种方法是通过集群的时间差,比如相隔一小时的集群。

请建议我如何在 Spark Scala 中实现这一点。

谢谢

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-dataframe


    【解决方案1】:

    您可以使用以下代码实现上述功能:

    import java.text.DecimalFormat
    var doubleFormat = new DecimalFormat("#.00")
    val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd HH")
    
    var groupedRdd = rdd.map(value => value.split(",")).map(arr => (dateFormat.format(dateFormat.parse(arr(0))), (Array(arr(0)), arr(1).toDouble))).cache
    
    // To calculate the sum
    var sumRdd = groupedRdd.reduceByKey((obj1 ,obj2 ) => ((obj1._1 ++ obj2._1), (obj1._2 + obj2._2)))
    sumRdd.foreach(value => println(value._1 + ":[" + value._2._1.mkString(",") + "]:" + doubleFormat.format(value._2._2)))
    
    // Output of Sum func 
    2013-04-17 21:[2013-04-17 21:48:00.0,2013-04-17 21:49:00.0,2013-04-17 21:50:00.0,2013-04-17 21:51:00.0]:49.40
    2013-04-17 17:[2013-04-17 17:22:00.0,2013-04-17 17:23:00.0,2013-04-17 17:24:00.0]:96.70
    2013-04-17 11:[2013-04-17 11:18:39.0,2013-04-17 11:19:40.0,2013-04-17 11:20:41.0]:147.20
    
    
    // To calculate Max value
    var maxRdd = groupedRdd.reduceByKey((obj1 ,obj2 ) => ((obj1._1 ++ obj2._1), Math.max(obj1._2 , obj2._2)))
    maxRdd.foreach(value => println(value._1 + ":[" + value._2._1.mkString(",") + "]:" + doubleFormat.format(value._2._2)))
    
    // Output of Max func
    2013-04-17 21:[2013-04-17 21:48:00.0,2013-04-17 21:49:00.0,2013-04-17 21:50:00.0,2013-04-17 21:51:00.0]:22.10
    2013-04-17 17:[2013-04-17 17:22:00.0,2013-04-17 17:23:00.0,2013-04-17 17:24:00.0]:43.10
    2013-04-17 11:[2013-04-17 11:18:39.0,2013-04-17 11:19:40.0,2013-04-17 11:20:41.0]:82.00
    
    
    // To calculate the min value
    var minRdd = groupedRdd.reduceByKey((obj1 ,obj2 ) => ((obj1._1 ++ obj2._1), Math.min(obj1._2 , obj2._2)))
    minRdd.foreach(value => println(value._1 + ":[" + value._2._1.mkString(",") + "]:" + doubleFormat.format(value._2._2)))
    
    
    // Output of the min value
    2013-04-17 21:[2013-04-17 21:48:00.0,2013-04-17 21:49:00.0,2013-04-17 21:50:00.0,2013-04-17 21:51:00.0]:3.20
    2013-04-17 17:[2013-04-17 17:22:00.0,2013-04-17 17:23:00.0,2013-04-17 17:24:00.0]:22.60
    2013-04-17 11:[2013-04-17 11:18:39.0,2013-04-17 11:19:40.0,2013-04-17 11:20:41.0]:11.40
    

    输出格式为:cluster:[eventimes]:result

    希望上述解决方案能帮助您计算聚合结果。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-11-05
      • 2014-10-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多