【问题标题】:Share state between operators (KeyedProcessFunction and RichMapFunction) when RocksDBRocksDB 时在算子(KeyedProcessFunction 和 RichMapFunction)之间共享状态
【发布时间】:2020-10-30 15:07:29
【问题描述】:

基于上图,我现在需要在两个操作员之间共享状态,来自一个 KeyedProcessFunction,它将设法处理事件并将它们从 X 类转换为 Y 类,并保持传入的状态记录以始终将 Y 类的最新信息发送到 Python 推理函数。

推理函数的结果需要映射回类 Y 并更新已在 ProcessFunction 上创建的对象的状态,然后再执行 Sink

据我所知,RocksDB 无法进行广播状态。 “没有 RocksDB 状态后端:广播状态在运行时保存在内存中,并且应该相应地进行内存配置。这适用于所有操作员状态。”

问题:

  1. 当我使用 RocksDB 作为状态后端时,最好的方法是什么?
  2. 是否可以在KeyedProcessFunctionRichMapFunction 之间共享状态?

【问题讨论】:

    标签: java apache-flink flink-streaming flink-cep


    【解决方案1】:

    在使用 RocksDB 作为状态后端时,您可以使用广播状态。广播状态不会存储在 RocksDB 中——它会在堆上——但它会被检查点。所以广播状态需要足够小以适应内存。 (此外,每个任务都会独立检查广播状态的副本。)

    但是,我认为广播状态不会对这个用例有所帮助。它只将状态广播到单个操作符的所有实例。

    您不能在运算符之间共享状态。州是严格地方的。您可以将流程函数的输出流式传输到 RichMapFunction 中,以便它具有必要的信息。该映射不能直接影响存储在 process 函数中的状态,但它可以拥有该状态的自己的副本。

    但是,听起来您希望推理函数的输出能够修改过程函数中的状态。 DataStream API 不允许在数据流中出现这样的循环。但您有两种选择:

    (1) 将推理函数的结果流式传输到 kafka/kinesis 之类的东西,然后将该流作为另一个输入添加到 process 函数。 (换句话说,如果您使用外部消息队列来解耦事物,则可能出现循环。当然,这会增加延迟。)

    (2) 使用Stateful Functions API。它提供了有状态组件之间的任意通信模式(您不仅限于 DAG),还具有出色的 Python 支持等等。所有这些都在 Flink 运行时之上,因此您在一致性、恰好一次、可扩展性等方面获得相同的好处。

    【讨论】:

    • 我不确定我是否解释了自己,但想法是在ProccessFunction中创建的状态需要在Python之后从RichFlatMap(或任何运算符)更新推理函数发送结果。所以我需要以某种方式捕获状态并从RichFlatMap(或任何运算符)更新它,以始终在ProccessFunction 中拥有用户的最新状态。
    • 我现在看到了关于 Stateful Functions 的 Stephan Ewen 演示文稿,正如您所提到的,它看起来很像我需要的。有没有关于如何使用Stateful Functions在运营商之间共享信息的示例?
    • DataStream API 和 Stateful Function API 有没有集成的例子?
    • 我相信有使用这种机制的测试,但我不确定本身是否有任何示例。
    猜你喜欢
    • 1970-01-01
    • 2017-02-22
    • 2016-01-25
    • 2010-11-12
    • 1970-01-01
    • 2020-08-07
    • 1970-01-01
    • 2020-08-04
    • 2019-01-31
    相关资源
    最近更新 更多