【发布时间】:2021-12-20 23:27:54
【问题描述】:
Flink 中的会话窗口在 prod env 上无法正常工作(同样的逻辑适用于本地 env)。这个想法是为特定 user Id 和 record id 发出 'sample_event_two' 的计数,以防相同 用户 ID 和 记录 ID。此处使用会话间隔为 30 分钟的 ProcessingTimeSessionWindows 并且 ProcessWindowFunction 具有以下逻辑(我正在通过 user Id 和 record设置窗口大小之前的id字段),
public void process(
String s,
Context context,
Iterable<SampleEvent> sampleEvents,
Collector<EnrichedSampleEvent> collector)
throws Exception {
EnrichedSampleEvent event = null;
boolean isSampleEventOnePresent = false;
int count = 0;
for (SampleEvent sampleEvent : sampleEvents) {
if (sampleEvent.getEventName().equals("sample_event_one_name")) {
Logger.info("Received sample_event_one for userId: {}");
isSampleEventOnePresent = true;
} else {
// Calculate the count for sample_event_two
count++;
if (Objects.isNull(event)) {
event = new EnrichedSampleEvent();
event.setUserId(sampleEvent.getUserId());
}
}
}
if (isSampleEventOnePresent && Objects.nonNull(event)) {
Logger.info(
"Created EnrichedSampleEvent for userId: {} with count: {}",
event.getUserId(),
event.getCount());
collector.collect(event);
} else if (Objects.nonNull(event)) {
Logger.info(
"No sampleOneEvent event found sampleTwoEvent with userId: {}, count: {}",
event.getUserId(),
count);
}
}
虽然集合中存在 sample_event_one(通过验证日志消息“Received sample_event_one”是否存在来确认)并且计数计算正确,但我没有看到任何输出事件被创建。我看到日志消息“No sampleOneEvent event found sampleTwoEvent with userID: “123, count: 5”,而不是发出 EnrichedSampleEvent。有人可以帮我解决这个问题吗?
【问题讨论】:
标签: flink-streaming