【问题标题】:Apache Flink: Stream Join Window is not triggeredApache Flink:未触发流加入窗口
【发布时间】:2018-10-04 17:39:21
【问题描述】:

我正在尝试在 apache flink 中加入两个流以获得一些结果。

我的项目的当前状态是,我正在获取 twitter 数据并将其映射到 2 元组,其中保存了用户的语言和定义的时间窗口中的推文总和。 我对每种语言的推文数量和每种语言的转推数都做了这些。推文/转推聚合在其他进程中运行良好。

我现在想获取时间窗口内转发次数占所有推文数量的百分比。

因此我使用以下代码:

Time windowSize = Time.seconds(15);

// Sum up tweets per language
DataStream<Tuple2<String, Integer>> tweetsLangSum = tweets
        .flatMap(new TweetLangFlatMap())
        .keyBy(0)
        .timeWindow(windowSize)
        .sum(1);

// ---

// Get retweets out of all tweets per language
DataStream<Tuple2<String, Integer>> retweetsLangMap = tweets
        .keyBy(new KeyByTweetPostId())
        .flatMap(new RetweetLangFlatMap());

// Sum up retweets per language
DataStream<Tuple2<String, Integer>> retweetsLangSum = retweetsLangMap
        .keyBy(0)
        .timeWindow(windowSize)
        .sum(1);

// ---

tweetsLangSum.join(retweetsLangSum)
            .where(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> tweet) throws Exception {
                    return tweet.f0;
                }
            })
            .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> tweet) throws Exception {
                    return tweet.f0;
                }
            })
            .window(TumblingEventTimeWindows.of(windowSize))
            .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple4<String, Integer, Integer, Double>>() {
                @Override
                public Tuple4<String, Integer, Integer, Double> join(Tuple2<String, Integer> in1, Tuple2<String, Integer> in2) throws Exception {
                    String lang = in1.f0;
                    Double percentage = (double) in1.f1 / in2.f1;
                    return new Tuple4<>(in1.f0, in1.f1, in2.f1, percentage);
                }
            })
            .print();

当我打印 tweetsLangSumretweetsLangSum 时,输出似乎没问题。我的问题是我从来没有从连接中得到输出。有谁知道为什么?还是我在聚合的第一步中使用的窗口函数在加入时出错了?

【问题讨论】:

    标签: twitter apache-flink flink-streaming


    【解决方案1】:

    这可能是由不同时间语义的混合引起的。 KeyedStream.timeWindow() 方法是一个快捷方式,它根据配置的时间特性创建一个窗口运算符,即,如果启用了事件时间,则创建一个事件时间窗口,否则创建一个处理时间窗口。对于连接,您明确定义了一个事件时间窗口。

    您是否启用了事件时间处理?

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    

    【讨论】:

    • 抱歉,回复迟了,但env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 尚未启用,并且可以完成加入。我仍然必须在加入之前在流中调用KeyedStream.timeWindow() 以获得相同的数据“范围”来查看。非常感谢!
    • 调用KeyedStream.timeWindow() 绝对没问题,但是您应该知道它的行为取决于配置的时间特性。您也可以调用 KeyedStream.window(TumblingEventTimeWindows.of(windowSize)) 以获得更明确的代码。
    • 好的,我想我已经理解了细微的差别,非常感谢!
    猜你喜欢
    • 1970-01-01
    • 2020-07-16
    • 1970-01-01
    • 2021-04-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-13
    相关资源
    最近更新 更多