【问题标题】:Is it possible to have a map of map in MapState?是否可以在 MapState 中拥有地图地图?
【发布时间】:2021-03-11 14:03:48
【问题描述】:
private MapState<String, EventsHistory> eventsMap = null;
 
public void processElement2(Event event,
                            Context context,
                            Collector<JoinedEvent> collector) throws Exception {
    String name = event.getExperimentName();
    if (eventsMap.get(name) == null) {
         eventsMap.put(name, new EventsHistory());
    }
    eventsMap.get(name).put(event.getEventTime(), event);
}

class EventsHistory {

    private final Map<Long, Event> events = new HashMap<>();

    public Map<Long, Event> getEvents() {
        return events;
    }

    public void put(final Long eventTime, final Event event) {
        events.put(eventTime, event);
    }
}

我有上面的代码,想用Flink的MapState来维护一个map的map。 当我在本地测试时,我可以看到状态更新正常。但是当我在集群中运行它时,eventsMap 总是空的。

MapState 中使用地图地图是否有效?有没有更好的方法来实现这一点?

作为替代方案,我尝试了以下版本,我自己进行分组。奇怪的是,这行得通。

private MapState<EventKey, Event> assignmentEventsMap = null;

public final class EventKey {

    private String name;
    private long eventTime;
}

    public void processElement2(Event event,
                                Context context,
                                Collector<JoinedEvent> collector) throws Exception {
        String name = event.getExperimentName();
        eventsMap
                .put(new EventKey(event.getName(), event.getEventTime()),
                        event);
    }

【问题讨论】:

  • 代码看起来是正确的,我认为它不会导致更新出现问题,所以很可能还有其他问题。
  • 我已经用另一种可行的方法(虽然效率较低)更新了描述。
  • 你是如何对连接的两个流进行键控的?
  • 是的,它们是键控的。正如您在下面提到的,这可能是由于不同的并行设置导致的问题吗?

标签: join streaming apache-flink flink-streaming


【解决方案1】:

您分享的代码很难理解,但您可能误解了 MapState 是什么。 ValueState 提供了一个分片的键/值存储,分布在整个集群中。 MapState 为您提供了一个分片的键/值存储,其中值本身就是嵌套的 Map。

换句话说,MapState 始终是地图的地图。你最终试图创建一个地图的地图 - 这是一个太远的级别。

我假设您正在尝试构建这个结构,在其中您有效地拥有从实验名称到时间戳到事件的嵌套映射的映射:

name -&gt; (time -&gt; event)

假设您的事件流已经通过实验名称键入,那么您真正想要的是MapState&lt;String, EventsHistory&gt; eventsMap,而不是使用MapState&lt;Long, Event&gt; eventsMap,而不是

eventsMap.get(name).put(event.getEventTime(), event);

你应该这样做

eventsMap.put(event.getEventTime(), event);

有关如何使用这些机制的更多背景信息,请参阅 Flink 文档中的 the tutorial about ValueStatean example using MapState

【讨论】:

  • 关于“从实验名称映射到时间戳到事件的嵌套映射”,您是对的。但是我的事件以不同的属性为键。所以我需要为多个名称维护这个结构。此外,我无法将此属性移动到键中,因为我正在处理两个流的连接,而另一个流没有此属性。
  • 您是否考虑过进行广播加入?在某些情况下效果会更好。
  • 我没有。让我看一看。至于 MapState>,我只是想了解这是否行不通,或者我可能会遗漏一些东西。
  • 同样感到困惑,因为相同的代码适用于本地 Flink 应用程序,但不适用于集群
  • 您是否更改了本地运行和集群中的并行度?
猜你喜欢
  • 1970-01-01
  • 2012-04-23
  • 2021-02-27
  • 2021-11-05
  • 1970-01-01
  • 2021-10-11
  • 1970-01-01
  • 1970-01-01
  • 2018-03-20
相关资源
最近更新 更多