【Question title】: How to aggregate by day?
【Posted】: 2017-07-04 02:10:43
【Question】:

I have the following POJO:

public class MyPojo {
   Date startDate;
   Double usageAmount;
   // ... remaining fields and getters omitted
}

So I have a list of MyPojo objects that is passed as a parameter to this function:

public Map<Date, Double> getWeeklyCost(@NotNull List<MyPojo> reports) {
        JavaRDD<MyPojo> rdd = context.parallelize(reports);
        JavaPairRDD<Date, Double> result = rdd.mapToPair(
                (PairFunction<MyPojo, Date, Double>) x ->
                        new Tuple2<>(x.getStartDate(), x.getUsageAmount()))
                .reduceByKey((Function2<Double, Double, Double>) (x, y) -> x + y);

        return result.collectAsMap();
}

However, what I get back looks like this:

"2017-06-28T22:00:00.000+0000": 0.02916666,
"2017-06-29T16:00:00.000+0000": 0.02916666,
"2017-06-27T13:00:00.000+0000": 0.03888888,
"2017-06-26T05:00:00.000+0000": 0.05833332000000001,
"2017-06-28T21:00:00.000+0000": 0.03888888,
"2017-06-27T02:00:00.000+0000": 0.03888888,
"2017-06-28T03:00:00.000+0000": 0.07777776000000002,
"2017-06-28T20:00:00.000+0000": 0.01944444,
"2017-06-30T04:00:00.000+0000": 0.00972222,
"2017-06-28T02:00:00.000+0000": 0.05833332000000001,
"2017-06-29T21:00:00.000+0000": 0.03888888,
"2017-06-29T23:00:00.000+0000": 0.06805554000000001,
"2017-06-27T00:00:00.000+0000": 0.05833332000000001,
"2017-06-26T06:00:00.000+0000": 0.03888888,
"2017-06-28T01:00:00.000+0000": 0.09722220000000002,
"2017-06-29T22:00:00.000+0000": 0.01944444,
"2017-06-28T00:00:00.000+0000": 0.11666664000000003,
"2017-06-27T12:00:00.000+0000": 0.01944444,
"2017-06-26T11:00:00.000+0000": 0.01944444,
"2017-06-29T03:00:00.000+0000": 0.01944444,
"2017-06-26T04:00:00.000+0000": 0.07777776000000002,
"2017-06-27T19:00:00.000+0000": 0.01944444,
"2017-06-29T20:00:00.000+0000": 0.048611100000000004,
"2017-06-29T02:00:00.000+0000": 0.02916666,
"2017-06-29T15:00:00.000+0000": 0.01944444,
"2017-06-27T17:00:00.000+0000": 0.01944444,
"2017-06-29T14:00:00.000+0000": 0.02916666,
"2017-06-30T01:00:00.000+0000": 0.02916666,
"2017-06-29T00:00:00.000+0000": 0.01944444,
"2017-06-27T18:00:00.000+0000": 0.03888888,
"2017-06-26T03:00:00.000+0000": 0.07777776000000002,
"2017-06-28T05:00:00.000+0000": 0.05833332000000001,
"2017-06-29T13:00:00.000+0000": 0.01944444,
"2017-06-30T03:00:00.000+0000": 0.00972222,
"2017-06-27T11:00:00.000+0000": 0.01944444,
"2017-06-28T04:00:00.000+0000": 0.05833332000000001,
"2017-06-29T12:00:00.000+0000": 0.00972222,
"2017-06-30T02:00:00.000+0000": 0.06805554000000001,
"2017-06-27T23:00:00.000+0000": 0.09722220000000002,
"2017-06-27T16:00:00.000+0000": 0.01944444,
"2017-06-26T15:00:00.000+0000": 0.01944444,
"2017-06-29T06:00:00.000+0000": 0.00972222,
"2017-06-30T07:00:00.000+0000": 0.00138889,
"2017-06-30T00:00:00.000+0000": 0.01944444,
"2017-06-27T21:00:00.000+0000": 0.01944444,
"2017-06-26T02:00:00.000+0000": 0.07777776000000002,
"2017-06-29T19:00:00.000+0000": 0.00972222,
"2017-06-27T03:00:00.000+0000": 0.03888888,
"2017-06-27T20:00:00.000+0000": 0.01944444,
"2017-06-30T05:00:00.000+0000": 74.1458333,
"2017-06-29T18:00:00.000+0000": 0.00972222,
"2017-06-29T17:00:00.000+0000": 0.01944444,
"2017-06-28T23:00:00.000+0000": 0.00972222,
"2017-06-27T01:00:00.000+0000": 0.01944444,
"2017-06-27T22:00:00.000+0000": 0.05833332000000001

I want the result aggregated by day and sorted by date in descending order. For example, these two entries:

"2017-06-28T03:00:00.000+0000": 0.07777776000000002,
"2017-06-28T20:00:00.000+0000": 0.01944444,

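fall on the same day, so their values (usageAmount) should be added together. I only care about the day, not the hour. How can I reduce or aggregate my RDD to get the desired result?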

**Update** The answer must be a Spark RDD solution...

【Comments】:

  • Can you use Spark SQL DataFrames? That would make the code much easier to write and understand later on.
  • @JacekLaskowski The data comes from MongoDB....
  • No accepted answer?

Tags: java apache-spark apache-spark-sql


【Solution 1】:

This is relatively easy (although it takes a fair amount of code).

Let's start with an implementation of the POJO:

static class Record
{
    private Date date;
    private double amount;
    public Record(Date d, double a)
    {
        this.date = d;
        this.amount = a;
    }
    @Override
    public String toString() {
        return date.toString() + "\t" + amount;
    }
}

Next, a utility method that checks whether two records fall on the same day:

private static boolean sameDay(Record r0, Record r1)
{
    Date d0 = r0.date;
    Date d1 = r1.date;

    Calendar cal = new GregorianCalendar();
    cal.setTime(d0);

    int[] dateParts0 = {cal.get(Calendar.DAY_OF_MONTH), cal.get(Calendar.MONTH), cal.get(Calendar.YEAR)};

    cal.setTime(d1);

    return cal.get(Calendar.DAY_OF_MONTH) == dateParts0[0] &&
            cal.get(Calendar.MONTH) == dateParts0[1] &&
            cal.get(Calendar.YEAR) == dateParts0[2];
}
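
As a point of comparison, the same-day check can also be written with `java.time` by converting each `Date` to a `LocalDate`. The sketch below is not part of the original answer; the class name `DayKeys` and the helper `toDay` are hypothetical, and the time zone is passed explicitly because the day an instant belongs to depends on the zone (the `GregorianCalendar` above implicitly uses the JVM default zone).

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Date;

class DayKeys {
    // Hypothetical helper: the calendar day a Date falls on in the given zone.
    static LocalDate toDay(Date date, ZoneId zone) {
        return date.toInstant().atZone(zone).toLocalDate();
    }

    // Two dates are on the same day if they map to the same LocalDate.
    static boolean sameDay(Date a, Date b, ZoneId zone) {
        return toDay(a, zone).equals(toDay(b, zone));
    }

    public static void main(String[] args) {
        Date a = Date.from(Instant.parse("2017-06-28T03:00:00Z"));
        Date b = Date.from(Instant.parse("2017-06-28T20:00:00Z"));
        Date c = Date.from(Instant.parse("2017-06-29T00:00:00Z"));

        System.out.println(sameDay(a, b, ZoneOffset.UTC));        // true
        System.out.println(sameDay(b, c, ZoneOffset.UTC));        // false
        // In UTC+05:00 both b and c fall on 2017-06-29.
        System.out.println(sameDay(b, c, ZoneOffset.ofHours(5))); // true
    }
}
```

A `LocalDate` also works as a map key, which is convenient when the goal is one total per day rather than a pairwise comparison.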

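Now that we have these, we can work on the main part of the algorithm. The idea is to sort the input list by date and then loop over it. For each entry, we check whether it falls on the same day as the last entry in the aggregated output. If it does, we add the record's amount to that entry; if not, we append a new entry.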

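public static List<Record> aggregate(Collection<Record> rs)
{
    List<Record> out = new ArrayList<>();
    if (rs.isEmpty())
    {
        // Nothing to aggregate; also avoids tmp.get(0) on an empty list.
        return out;
    }

    // Sort a copy by date so that records of the same day end up adjacent.
    List<Record> tmp = new ArrayList<>(rs);
    java.util.Collections.sort(tmp, new Comparator<Record>() {
        @Override
        public int compare(Record o1, Record o2) {
            return o1.date.compareTo(o2.date);
        }
    });

    // Seed the output with a zero-amount entry for the first day.
    out.add(new Record(tmp.get(0).date, 0));
    for(int i=0;i<tmp.size();i++)
    {
        Record last = out.get(out.size() - 1);
        Record recordBeingProcessed = tmp.get(i);
        if(sameDay(last, recordBeingProcessed))
        {
            last.amount += recordBeingProcessed.amount;
        }
        else
        {
            // Start a new day with a copy so the caller's records are not mutated later.
            out.add(new Record(recordBeingProcessed.date, recordBeingProcessed.amount));
        }
    }

    return out;
}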

Finally, a small main method to test everything:

public static void main(String[] args) throws ParseException {
    DateFormat format = new SimpleDateFormat("MMMM d, yyyy", Locale.ENGLISH);
    String[] dateStrings = {"January 2, 2010", "January 2, 2010", "January 3, 2010"};
    List<Record> rs = new ArrayList<>();
    for(int i=0;i<dateStrings.length;i++)
    {
        rs.add(new Record(format.parse(dateStrings[i]), 1));
    }
    for(Record r : aggregate(rs))
    {
        System.out.println(r);
    }
}

This prints:

Sat Jan 02 00:00:00 CET 2010    2.0
Sun Jan 03 00:00:00 CET 2010    1.0
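
The question also asks for the days in descending order. A minimal sketch of that, assuming the hourly totals have already been collected into a `Map<Date, Double>` (the shape the question's function returns): each key is converted to a `LocalDate` and the values are summed into a `TreeMap` with a reversed comparator. The class name `DailyTotals`, the method `sumByDay`, and the choice of UTC are assumptions for illustration. The same Date-to-day conversion could presumably be applied to the key inside `mapToPair` so that `reduceByKey` sums per day; that variant is not shown here because it needs a Spark context.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class DailyTotals {
    // Hypothetical helper: sums hourly amounts per calendar day, newest day first.
    static Map<LocalDate, Double> sumByDay(Map<Date, Double> hourly, ZoneId zone) {
        Map<LocalDate, Double> totals =
                new TreeMap<>(Comparator.<LocalDate>reverseOrder());
        for (Map.Entry<Date, Double> e : hourly.entrySet()) {
            LocalDate day = e.getKey().toInstant().atZone(zone).toLocalDate();
            // merge() inserts the value or adds it to the existing total.
            totals.merge(day, e.getValue(), Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<Date, Double> hourly = new HashMap<>();
        hourly.put(Date.from(Instant.parse("2017-06-28T03:00:00Z")), 0.5);
        hourly.put(Date.from(Instant.parse("2017-06-28T20:00:00Z")), 0.25);
        hourly.put(Date.from(Instant.parse("2017-06-29T02:00:00Z")), 1.0);

        // prints {2017-06-29=1.0, 2017-06-28=0.75}
        System.out.println(sumByDay(hourly, ZoneOffset.UTC));
    }
}
```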

【Comments】:

    【Solution 2】:
    public class MyPojo {
    
           Date startDate;
           Double usageAMount;
           // MM = month and HH = hour of day; lowercase mm would mean minutes and hh a 12-hour clock.
           static DateFormat format = new SimpleDateFormat("yyyy-MM-dd:HH");
    
        MyPojo(Date startDate, Double usageAMount) {
    
            this.startDate = startDate;
            this.usageAMount = usageAMount;
        }
    
        Date getStartDate() { return startDate;}
        Double getUsage() { return usageAMount;}
    
        public static void main(String[] args) throws ParseException {
    
            List<MyPojo> reports  = getReports();
    
            //sort by date
            reports = reports.stream().sorted(getComperator()).collect(Collectors.toList());
            output(reports);
    
            //you can collect to map but map keys are not sorted
            //and keys (dates) must be unique
            Map<Date, Double> result = reports.stream().sorted(getComperator()).collect(Collectors
                    .toMap( e-> e.startDate , e-> e.usageAMount));
        }
    
        private static List<MyPojo> getReports() throws ParseException {
    
            List<MyPojo> reports = new ArrayList<>();
    
            reports.add(new MyPojo(format.parse("2017-06-28:01"), 0.02916666));
            reports.add(new MyPojo(format.parse("2017-06-29:01"), 0.02916666));
            reports.add(new MyPojo(format.parse("2017-06-27:01"), 0.03888888));
            reports.add(new MyPojo(format.parse("2017-06-26:01"), 0.05833332000000001));
            reports.add(new MyPojo(format.parse("2017-06-28:02"), 0.03888888));
            reports.add(new MyPojo(format.parse("2017-06-27:02"), 0.03888888));
            reports.add(new MyPojo(format.parse("2017-06-28:03"), 0.07777776000000002));
            reports.add(new MyPojo(format.parse("2017-06-28:04"), 0.01944444));
            reports.add(new MyPojo(format.parse("2017-06-30:01"), 0.00972222));
    
            return reports;
        }
    
        private static Comparator<? super MyPojo> getComperator() {
    
            Comparator<? super MyPojo> comperator = new Comparator<MyPojo>() {
    
                @Override
                public int compare(MyPojo o1, MyPojo o2) {
    
                    if((o1 == o2) || ((o1 == null) && (o2 == null))) {
                        return 0;
                    }
                    if( o1 == null) {
                        return -1;
                    }
                    if( o2 == null) {
                        return  1;
                    }
    
                    return (o1).startDate.compareTo((o2).startDate);
                }
    
            };
            return comperator;
        }
    
        static void output(List<MyPojo> reports) {
    
            for(MyPojo p : reports) {
                System.out.println(format.format(p.startDate) +" - "+ p.usageAMount);
            }
        }
    }
    

    Output:


    2017-06-27:01 - 0.03888888
    2017-06-27:02 - 0.03888888
    2017-06-28:01 - 0.02916666
    2017-06-28:02 - 0.03888888
    2017-06-28:03 - 0.07777776000000002
    2017-06-28:04 - 0.01944444
    2017-06-29:01 - 0.02916666
    2017-06-30:01 - 0.00972222
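
The listing above sorts the entries but still has one line per hour. A possible way to get one total per day with the same stream style is `Collectors.groupingBy` with `Collectors.summingDouble`. This is a sketch rather than part of the original answer: the `Usage` class is a hypothetical stand-in for `MyPojo`, and UTC is an assumed time zone.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

class StreamDailyTotals {
    // Hypothetical stand-in for MyPojo with only the two relevant fields.
    static class Usage {
        final Date startDate;
        final double usageAmount;

        Usage(Date startDate, double usageAmount) {
            this.startDate = startDate;
            this.usageAmount = usageAmount;
        }
    }

    // Groups by calendar day, sums the amounts, and returns newest day first.
    static NavigableMap<LocalDate, Double> sumByDay(List<Usage> reports, ZoneId zone) {
        TreeMap<LocalDate, Double> totals = reports.stream().collect(
                Collectors.groupingBy(
                        u -> u.startDate.toInstant().atZone(zone).toLocalDate(),
                        TreeMap::new,
                        Collectors.summingDouble(u -> u.usageAmount)));
        // descendingMap() is a reversed view of the sorted map.
        return totals.descendingMap();
    }

    public static void main(String[] args) {
        List<Usage> reports = Arrays.asList(
                new Usage(Date.from(Instant.parse("2017-06-28T01:00:00Z")), 0.5),
                new Usage(Date.from(Instant.parse("2017-06-27T01:00:00Z")), 1.0),
                new Usage(Date.from(Instant.parse("2017-06-28T02:00:00Z")), 0.25));

        // prints {2017-06-28=0.75, 2017-06-27=1.0}
        System.out.println(sumByDay(reports, ZoneOffset.UTC));
    }
}
```

Unlike `Collectors.toMap` without a merge function, `groupingBy` does not require unique keys, so several hourly entries on the same day are fine.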

    【Comments】:
