【发布时间】:2021-05-04 19:47:40
【问题描述】:
我有一些问题。
根据类中的时间戳,我想做一个逻辑,排除1分钟内输入N次以上的数据。
UserData 类有一个时间戳变量。
class UserData{
public Timestamp timestamp;
public String userId;
}
一开始我尝试使用翻滚窗口。
SingleOutputStreamOperator<UserData> validStream =
stream.keyBy((KeySelector<UserData, String>) value -> value.userId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.process(new ValidProcessWindow());
public class ValidProcessWindow extends ProcessWindowFunction<UserData, UserData, String, TimeWindow> {
private int validCount = 10;
@Override
public void process(String key, Context context, Iterable<UserData> elements, Collector<UserData> out) throws Exception {
int count = -1;
for (UserData element : elements) {
count++; // start is 0
if (count >= validCount) // valid click count
{
continue;
}
out.collect(element);
}
}
}
但是翻滚窗口的时间计算是基于固定时间的,所以不适合UserData类的时间戳。
如何在流 UserData 类的时间戳基础上处理窗口?
谢谢。
附加信息
我使用这样的代码。
stream.assignTimestampsAndWatermarks(WatermarkStrategy.<UserData>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> Timestamps.toMillis(event.timestamp))
.keyBy((KeySelector<UserData, String>) value -> value.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.process(new ValidProcessWindow());
我尝试了一些测试。 150 个样本数据。每条数据的时间戳增加 1 秒。 结果是 |1,2,3....59| |60,61....119| . 我等待最后 30 个数据。但未处理。 我期待 |1,2,3....59| |60,61....119| |120...149|。
如何获取最后的其他数据?
自我回答
我找到了原因。 因为我只使用了 150 个样本数据。
如果在没有元素需要处理的情况下,在 Flink 中使用事件时间无法进行。
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html#idling-sources
所以,我测试了 150 个样本数据和虚拟数据。 (每条数据的虚拟数据时间戳增加1秒)。
我收到了正确的数据 |1,2,3....59| |60,61....119| |120...149|。
感谢您的帮助。
【问题讨论】:
标签: apache-flink flink-streaming