【发布时间】:2021-03-11 14:03:48
【问题描述】:
private MapState<String, EventsHistory> eventsMap = null;
public void processElement2(Event event,
Context context,
Collector<JoinedEvent> collector) throws Exception {
String name = event.getExperimentName();
if (eventsMap.get(name) == null) {
eventsMap.put(name, new EventsHistory());
}
eventsMap.get(name).put(event.getEventTime(), event);
}
class EventsHistory {
private final Map<Long, Event> events = new HashMap<>();
public Map<Long, Event> getEvents() {
return events;
}
public void put(final Long eventTime, final Event event) {
events.put(eventTime, event);
}
}
我有上面的代码,想用Flink的MapState来维护一个map的map。
当我在本地测试时,我可以看到状态更新正常。但是当我在集群中运行它时,eventsMap 总是空的。
在MapState 中使用地图地图是否有效?有没有更好的方法来实现这一点?
作为替代方案,我尝试了以下版本,我自己进行分组。奇怪的是,这行得通。
private MapState<EventKey, Event> assignmentEventsMap = null;
public final class EventKey {
private String name;
private long eventTime;
}
public void processElement2(Event event,
Context context,
Collector<JoinedEvent> collector) throws Exception {
String name = event.getExperimentName();
eventsMap
.put(new EventKey(event.getName(), event.getEventTime()),
event);
}
【问题讨论】:
-
代码看起来是正确的,我认为它不会导致更新出现问题,所以很可能还有其他问题。
-
我已经用另一种可行的方法(虽然效率较低)更新了描述。
-
你是如何对连接的两个流进行键控的?
-
是的,它们是键控的。正如您在下面提到的,这可能是由于不同的并行设置导致的问题吗?
标签: join streaming apache-flink flink-streaming