【问题标题】:Flink re-scalable keyed stream stateful functionFlink re-scalable keyed stream 有状态函数
【发布时间】:2020-05-05 16:12:06
【问题描述】:

我有以下 Flink 作业,我尝试使用后端类型 RockDB 的键控流状态函数 (MapState),

environment
.addSource(consumer).name("MyKafkaSource").uid("kafka-id")
.flatMap(pojoMapper).name("MyMapFunction").uid("map-id")
.keyBy(new MyKeyExtractor())
.map(new MyRichMapFunction()).name("MyRichMapFunction").uid("rich-map-id")
.addSink(sink).name("MyFileSink").uid("sink-id")

MyRichMapFunction 是一个有状态的函数,它扩展了 RichMapFunction 具有以下代码,

public static class MyRichMapFunction extends RichMapFunction<MyEvent, MyEvent> {
    private transient MapState<String, Boolean> cache;
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Boolean> descriptor =
                new MapStateDescriptor("seen-values", TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<Boolean>() {}));
        cache = getRuntimeContext().getMapState(descriptor);
    }
    @Override
    public MyEvent map(MyEvent value) throws Exception {
        if (cache.contains(value.getEventId())) {
            value.setIsSeenAlready(Boolean.TRUE);
            return value;
        }
        value.setIsSeenAlready(Boolean.FALSE);
        cache.put(value.getEventId(), Boolean.TRUE)
        return value;
    }
}

将来,我想重新调整并行度(从 2 到 4),所以我的问题是,如何实现可重新缩放的键控状态,以便在更改并行度后,我可以将相应的缓存键控数据获取到它的对应的任务槽。我试图探索这个,在那里我找到了一个文档here。据此,可以通过使用 ListCheckPointed 接口来实现可重新扩展的操作员状态,该接口为此提供了 snapshotState/restoreState 方法。但不确定如何实现可重新扩展的键控状态 (MyRichMapFunction)?我是否需要为 MyRichMapFunction 类实现 ListCheckPointed 接口?如果是,我如何根据 restoreState 方法上的新并行键哈希重新分配缓存(我的 MapState 将在启用 TTL 的情况下保存大量键,假设它在任何时间点最多可以保存 10 亿个键)?有人可以帮我解决这个问题吗,或者如果你给我指出任何很好的例子。

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    您编写的代码已经可以重新扩展; Flink 的托管键状态在设计上是可重新缩放的。通过重新平衡对实例的键分配来重新调整键状态。 (您可以将键状态视为分片键/值存储。从技术上讲,发生的情况是一致的哈希用于将键映射到 键组,并且每个并行实例负责一些键组. 重新缩放只涉及在实例之间重新分配密钥组。)

    ListCheckpointed 接口用于非键上下文中使用的状态,因此它不适合您正在做的事情。另请注意,ListCheckpointed 将在 Flink 1.11 中弃用,取而代之的是更通用的 CheckpointedFunction

    还有一件事:如果MyKeyExtractor 是由value.getEventId() 键入的,那么您可以将ValueState&lt;Boolean&gt; 用于缓存,而不是MapState&lt;String, Boolean&gt;。这是有效的,因为对于键控状态,每个键都有一个单独的 ValueState 值。只有当您需要为流中的每个键存储多个属性/值对时,才需要使用 MapState。

    大部分内容在 Hands-on Training 下的 Flink 文档中进行了讨论,其中包括与您正在做的非常接近的 an example

    【讨论】:

    • 哦,如果我使用托管状态,那么我不需要考虑重新缩放,默认情况下,当我更改并行度时,Flink 会提供这样的功能,对吗?我对this 示例感到很困惑,他们使用了 CheckpointedAsynchronously 功能,所以如果我理解正确的话,GitHub 示例是非托管状态,所以他们明确处理了 CheckpointedAsynchronously 方法,对吧? (顺便感谢 ValueState 的提示)
    • 是的,没错。仅供参考,您找到的那个例子已经很老了,从那时起正确的做事方式已经改变。事实上,CheckpointedAsynchronously 已经不存在了。
    • 像往常一样,我接受了答案,但由于我的声誉低而没有投票:)
    猜你喜欢
    • 1970-01-01
    • 2020-07-27
    • 2020-08-17
    • 2022-11-30
    • 2020-08-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多