【发布时间】:2019-04-15 21:36:01
【问题描述】:
.
嗨,
使用 Apache Flink 1.8。我有来自 Kafka 的 JSON 记录流并对其进行过滤,一切正常。
现在,我想通过从数据库表中查找值来丰富来自 Kafka 的数据。
这只是创建 2 个流,在第二个流中加载表,然后加入数据的情况吗?
数据库表确实会更新但不频繁,我想避免在通过流的每条记录上查找数据库。
【问题讨论】:
标签: apache-flink
.
嗨,
使用 Apache Flink 1.8。我有来自 Kafka 的 JSON 记录流并对其进行过滤,一切正常。
现在,我想通过从数据库表中查找值来丰富来自 Kafka 的数据。
这只是创建 2 个流,在第二个流中加载表,然后加入数据的情况吗?
数据库表确实会更新但不频繁,我想避免在通过流的每条记录上查找数据库。
【问题讨论】:
标签: apache-flink
Flink 有状态,你可以在这里利用它。我做过类似的事情,我从我的查找表中获取每日查询(在我的情况下,它是一个批量 Web 服务调用),并将结果输入到 kafka 主题中。这个 kafka 主题被同一个服务 flink 作业消耗,因为它需要数据进行查找。两个主题都以相同的值作为键,但我使用查找主题将数据存储到键状态,并且在处理另一个主题时,我会将数据拉回状态。
我有一些额外的逻辑来检查给定键是否还没有状态。如果是这种情况,我会向网络服务发出异步请求。但是,您可能不需要这样做。
这里需要注意的是,我有用于状态管理的内存,而我的查找表只有大约 3000 万条记录,大约 100 个演出分布在 15 个节点上的 45 个插槽中。
[回答 cmets 中的问题] 抱歉,我的回答太长了,所以不得不编辑我的帖子:
我有一个 python 作业,它通过批量 REST 调用加载数据(你可以只进行数据查找)。然后它将数据转换为正确的格式并将其转储到 Kafka。然后我的 flink flow 有两个来源,一个是“真实数据”主题,另一个是“查找数据”主题。来自查找数据主题的数据存储在状态中(我使用了 ValueState,因为每个键都映射到一个可能的值,但是有 other state types。我也有每个条目的 24 小时到期时间,但那是我的用例.
诀窍在于,从查找主题将值存储在状态中的相同操作必须是将值从“真实”主题中拉回状态的操作。这是因为 flink 状态(甚至是键控状态)与创建它们的操作员相关联。
【讨论】: