【问题标题】:Joins between multiple streams on the same keys同一键上的多个流之间的连接
【发布时间】:2020-01-16 18:39:21
【问题描述】:

Flink 社区!

我有一个关于在 Flink 中的相同键上加入多个流(等连接)的问题。我仍然是一个新手,正在为我的团队评估 Flink,将我们的 Spark 批处理应用程序迁移到流处理。

注意:我查看了 Fabian Hüske 关于连接处理的这篇文章:Peeking into Apache Flink's Engine Room

为了简化问题,假设您有 3 个流,每个流都有唯一的记录,可以通过 id 字段键入。对于流中的每条记录,您都会在其他流中找到相应的记录。您想在 id 字段中加入这些流。

问题:

  • 当您加入流#1 和流#2 时,我知道这两个流的内容将根据加入密钥进行混洗。当我将结果流与流#3 连接时,我假设流#3 将被打乱,但之前的结果记录会再次打乱(即从#1 和#2 之间的连接开始)?

(在Spark中,我认为之前的join结果不会被打乱,前提是key不改变,并且使用相同的hash partitioner)。

  • 与流#3 连接时,流#1 和#2 的连接结果会被序列化和反序列化吗?

  • 在此示例中,我们有 2 个连接运算符(在流 #1 和 #2 之间,以及在结果连接结果和流 #3 之间)。据我所知,我的每个运营商都有一个状态。假设我使用的是 GlobalWindows 并假设我的联接操作只保留每个字段,两个运算符状态之间是否存在重复数据? (我天真地认为不应该......)

我所说的有点令人困惑,但我认为第一个运算符会记住流#1 和流#2 中的数据,然后,第二个运算符会记住流#1 和流# 中的数据2和流#3。我观察到,我的第一个操作员的状态大小很大(我的实验有 1 年的数据),但第二个操作员的状态大小要大得多......最终的检查点状态大小似乎是 # 1/#2 join,加上#1/#2/#3 join的状态大小(不应该只是#3的大小,如果#1/#2 join数据相同?)

谢谢, 尼古拉斯

【问题讨论】:

    标签: apache-flink


    【解决方案1】:

    目前,在 Flink 中,流上的每个连接都需要一个完整的 shuffle,包括您提到的序列化和反序列化。主要原因是 Flink 无法将具有 2 个输入的运算符链接到前一个运算符。目前有 work in progress 允许 N 个输入运算符,这将完全避免在您的用例中进行额外的洗牌。

    您的每个连接运算符都单独维护其状态。这意味着您的第二个联接包含所有联接记录和流 #3 中的所有记录。如果您的第一个连接的基数为 1,则第二个连接的状态大小比第一个大。看似冗余的复制的原因是,当使用时间窗口时(通常是加入流的唯一合理方式),两个操作员可能处于不同的时间,因此当第二个操作员处理条目时,第一个操作员已经从他的状态中删除了条目.

    【讨论】:

    • 感谢您的解释! :D
    • 顺便说一句,如果您正在处理全局窗口和有界数据集,您可能需要查看 flink 的批处理端。执行经过高度优化,可能会优化您的查询。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-05-18
    • 2015-07-24
    • 2012-04-30
    相关资源
    最近更新 更多