【发布时间】:2020-05-05 16:12:06
【问题描述】:
我有以下 Flink 作业,我尝试使用后端类型 RockDB 的键控流状态函数 (MapState),
environment
.addSource(consumer).name("MyKafkaSource").uid("kafka-id")
.flatMap(pojoMapper).name("MyMapFunction").uid("map-id")
.keyBy(new MyKeyExtractor())
.map(new MyRichMapFunction()).name("MyRichMapFunction").uid("rich-map-id")
.addSink(sink).name("MyFileSink").uid("sink-id")
MyRichMapFunction 是一个有状态的函数,它扩展了 RichMapFunction 具有以下代码,
public static class MyRichMapFunction extends RichMapFunction<MyEvent, MyEvent> {
private transient MapState<String, Boolean> cache;
@Override
public void open(Configuration config) {
MapStateDescriptor<String, Boolean> descriptor =
new MapStateDescriptor("seen-values", TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<Boolean>() {}));
cache = getRuntimeContext().getMapState(descriptor);
}
@Override
public MyEvent map(MyEvent value) throws Exception {
if (cache.contains(value.getEventId())) {
value.setIsSeenAlready(Boolean.TRUE);
return value;
}
value.setIsSeenAlready(Boolean.FALSE);
cache.put(value.getEventId(), Boolean.TRUE)
return value;
}
}
将来,我想重新调整并行度(从 2 到 4),所以我的问题是,如何实现可重新缩放的键控状态,以便在更改并行度后,我可以将相应的缓存键控数据获取到它的对应的任务槽。我试图探索这个,在那里我找到了一个文档here。据此,可以通过使用 ListCheckPointed 接口来实现可重新扩展的操作员状态,该接口为此提供了 snapshotState/restoreState 方法。但不确定如何实现可重新扩展的键控状态 (MyRichMapFunction)?我是否需要为 MyRichMapFunction 类实现 ListCheckPointed 接口?如果是,我如何根据 restoreState 方法上的新并行键哈希重新分配缓存(我的 MapState 将在启用 TTL 的情况下保存大量键,假设它在任何时间点最多可以保存 10 亿个键)?有人可以帮我解决这个问题吗,或者如果你给我指出任何很好的例子。
【问题讨论】:
标签: apache-flink flink-streaming