【问题标题】:Is it possible to transfer state between windows in Apache Beam是否可以在 Apache Beam 中的窗口之间传输状态
【发布时间】:2021-10-29 11:51:04
【问题描述】:

根据 Apache 梁文档: https://beam.apache.org/documentation/programming-guide/#state-and-timers

All state for a key is scoped to the current window. This means that the first time a key is seen for a given window any state reads will return empty, and that a runner can garbage collect state when a window is completed.

我有一个用例,当前窗口的输出需要被下一个窗口访问。

窗口是固定的 5 秒窗口,并执行计算以输出此窗口内覆盖的总距离。我需要将此距离添加到下一个窗口总数中。目前我通过将总数写入数据库并在下一个窗口中读取来实现这一点,但这会大大减慢处理速度。

所以我的问题是,状态是否可以在窗口之间传输。 或者我是否必须在总体全局窗口内设置一个 5 秒窗口的全局窗口? 这可能吗?

【问题讨论】:

  • 在我写出完整答案之前,您在下一个窗口中说需要距离是什么意思?就像每个窗口的距离只包括它自己的距离和前一个窗口的距离? (即Sliding Windows)或者每个窗口的距离是该点的全部总和?第二个听起来你可以改回全局窗口,然后使用组合来添加所有距离(可能使用触发器来触发早期结果)。
  • 数据是车辆里程表读数的流,因此我们使用一个短窗口找到最小值和最大值来获取差异,然后对每辆车的这些差异求和。这个总和“窗口距离”被添加到从前一个窗口出来的总数中,以获得当前发送的总距离。
  • 似乎@DanielOliveira 提出的组合对你来说是更好的选择。另一个想法,如果您正在接收里程表数据,为什么不从第一个窗口的较小值中减去最后收到的较大值?它总是会给你总距离,你不必总是写最后收到的距离(如果一些窗口丢失,它也可以防止你在总距离中出错)。
  • 我不确定我是否理解,我遇到的问题是我无法在窗口之间共享信息。

标签: apache-beam


【解决方案1】:

据我所知,没有办法在窗口之间传输状态,如果有某种形式的解决方法,我不会推荐它,因为它违反了 Beam 模型并且可能会出现意外行为。

但是,在坚持 Beam 模型的同时获得您想要的结果应该不会太难。具体细节可能取决于您打算如何使用总距离,但第一步保持不变:将窗口切换回全局窗口,以便我们可以聚合所有元素。

Java 示例:

PCollection<Integer> distances = ... // The windowed distances.
PCollection<Integer> globalDistances =
        distances.apply(Window.<Integer>into(new GlobalWindows()));

现在距离在一个全局窗口中,下面的变换取决于你想要的结果。

选项 1:与触发器组合。 一种简单的方法是使用像 Sum 这样的组合来累加所有距离,同时向窗口添加一个重复触发器以尽早重复触发,从而得到结果定期更新。这主要是 Streaming 管道的解决方案,因为早期触发触发器在 Batch 管道中无效。详情请参阅编程指南的Triggers section

Java 示例:

// Window fires a pane on every 10 elements received.
PCollection<Integer> globalDistances =
        distances.apply(Window.<Integer>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10))));
PCollection<Int> summed = globalDistances.apply(Sum.integersGlobally())

选项 2:通过州求和。 另一个选项更接近您的初衷。使用状态编写自定义 DoFn,例如 CombiningState,对接收到的每个元素求和并输出当前总和。这会将求和的每个步骤作为一个单独的元素输出,而不必处理早期触发窗格或类似的事情。如果您明确需要为求和的每个步骤使用单独的元素,或者如果您有批处理管道并且不能使用触发器,则这是最佳选择。

【讨论】:

  • 非常感谢。我正在处理的示例是流式传输,只是一些额外的说明。我假设如果我使用全局窗口,我只需要跟踪每辆车的最低传入里程表读数即可执行距离计算。
猜你喜欢
  • 2021-03-08
  • 1970-01-01
  • 1970-01-01
  • 2018-05-31
  • 1970-01-01
  • 1970-01-01
  • 2021-12-22
  • 2021-04-13
  • 2021-08-14
相关资源
最近更新 更多