【问题标题】:Last tuple of time window时间窗口的最后一个元组
【发布时间】:2017-05-16 14:48:00
【问题描述】:

我有以下情况

stream<Tuple2<String, Integer>
   .keyBy(0)
   .timeWindow(Time.of(10, TimeUnit.SECONDS))
   .sum(1)
   .flatMap(..)
   .sink()

我要做的是计算我的时间窗口的前 N ​​个。 每个窗口的前 N ​​个由 sink 存储。

我可以计算flatmap中的top N,但是不知道什么时候发送到sink进行存储。据我所知,无法从 flatmap 函数中知道窗口何时结束。

我知道有其他替代方法,例如 apply 函数可以同时执行这两种操作,或者在流中创建标记以指示结束,但我想知道是否有更优雅的解决方案。

【问题讨论】:

  • 澄清一下,您希望每 10 秒为所有键计算前 N 条记录,对吗?

标签: ranking apache-flink flink-streaming top-n windowing


【解决方案1】:

如果您想计算所有键的每个窗口的顶部N,那么您应该应用一个长度相同的时间窗口,在其应用方法中您计算顶部N。你可以这样做:

final int n = 10;
stream
    .keyBy(0)
    .timeWindow(Time.of(10L, TimeUnit.SECONDS))
    .sum(1)
    .timeWindowAll(Time.of(10L, TimeUnit.SECONDS))
    .apply(new AllWindowFunction<Tuple2<String,Integer>, Tuple2<String, Integer>, TimeWindow>() {
        @Override
        public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
            PriorityQueue<Tuple2<String, Integer>> priorityQueue = new PriorityQueue<>(n, new Comparator<Tuple2<String, Integer>>() {
                @Override
                public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                    return o1.f1 - o2.f1;
                }
            });

            for (Tuple2<String, Integer> value : values) {
                priorityQueue.offer(value);

                while (priorityQueue.size() > n) {
                    priorityQueue.poll();
                }
            }

            for (Tuple2<String, Integer> stringIntegerTuple2 : priorityQueue) {
                out.collect(stringIntegerTuple2);
            }
        }
    })
    .print();

【讨论】:

  • 酷,这看起来不错。我不知道在普通窗口之后可以使用allwindow,很高兴看到它们仍然可以合并。我的前 N ​​是相似的,唯一的区别是我使用谷歌的 MinMaxPriorityQueue 进行自动驱逐。
猜你喜欢
  • 1970-01-01
  • 2023-04-08
  • 2017-12-10
  • 2021-11-17
  • 1970-01-01
  • 1970-01-01
  • 2018-06-30
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多