【问题标题】:How to handle FLINK window on stream data's timestamp base?如何处理流数据时间戳基础上的 FLINK 窗口?
【发布时间】:2021-05-04 19:47:40
【问题描述】:

我有一些问题。

根据类中的时间戳,我想做一个逻辑,排除1分钟内输入N次以上的数据。

UserData 类有一个时间戳变量。

class UserData{ 
      public Timestamp timestamp; 
      public String userId; 
    }

一开始我尝试使用翻滚窗口。

SingleOutputStreamOperator<UserData> validStream =
                stream.keyBy((KeySelector<UserData, String>) value -> value.userId)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
                        .process(new ValidProcessWindow());

public class ValidProcessWindow extends ProcessWindowFunction<UserData, UserData, String, TimeWindow> {

    private int validCount = 10;

    @Override
    public void process(String key, Context context, Iterable<UserData> elements, Collector<UserData> out) throws Exception {
        int count = -1;
        for (UserData element : elements) {
            count++; // start is 0

            if (count >= validCount) // valid click count
            {
                continue;
            }
            
            out.collect(element);
        }
    }
}

但是翻滚窗口的时间计算是基于固定时间的,所以不适合UserData类的时间戳。

如何在流 UserData 类的时间戳基础上处理窗口?

谢谢。

附加信息

我使用这样的代码。

stream.assignTimestampsAndWatermarks(WatermarkStrategy.<UserData>forBoundedOutOfOrderness(Duration.ofSeconds(1))                
.withTimestampAssigner((event, timestamp) -> Timestamps.toMillis(event.timestamp))
.keyBy((KeySelector<UserData, String>) value -> value.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.process(new ValidProcessWindow());

我尝试了一些测试。 150 个样本数据。每条数据的时间戳增加 1 秒。 结果是 |1,2,3....59| |60,61....119| . 我等待最后 30 个数据。但未处理。 我期待 |1,2,3....59| |60,61....119| |120...149|。

如何获取最后的其他数据?

自我回答

我找到了原因。 因为我只使用了 150 个样本数据。

如果在没有元素需要处理的情况下,在 Flink 中使用事件时间无法进行。

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html#idling-sources

所以,我测试了 150 个样本数据和虚拟数据。 (每条数据的虚拟数据时间戳增加1秒)。

我收到了正确的数据 |1,2,3....59| |60,61....119| |120...149|。

感谢您的帮助。

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    据我了解您的问题,您应该只使用不同的时间特征。处理时间是使用系统时间来计算窗口,您应该为您的应用程序使用事件时间。您可以找到更多关于正确使用事件时间的信息here

    编辑: 这就是 flink 的工作原理,没有数据将 watermark 推到 150 以上,所以窗口没有关闭,因此没有输出。您可以使用自定义触发器,即使未生成水印也会关闭窗口或注入一些数据来移动水印。

    【讨论】:

    • 感谢您的回答。我读了你给我的链接。如果在我的情况下,我可以使用 WatermarkStrategy forBoundedOutOfOrderness 吗?像这样。 stream.assignTimestampsAndWatermarks(WatermarkStrategy .&lt;UserData&gt;forBoundedOutOfOrderness(Duration.ofSeconds(60)) .withTimestampAssigner((event, timestamp) -&gt; event.timestamp.getSeconds());
    • 是的,应该可以。请注意,水印和时间戳通常以毫秒为单位,因此您可能需要转换它。
    • 我参加了 som 测试。我编辑了我的测试问题。如何获取最后的数据?
    • 请尝试为一般的新问题创建新问题。否则会变得一团糟。
    • 好的。我明白。谢谢你的帮助。
    猜你喜欢
    • 2017-10-18
    • 2018-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-09-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多