【发布时间】:2020-05-01 11:56:53
【问题描述】:
我正在尝试使用有状态 DoFn(使用 @ProcessElement 和 @StateId ValueState 元素)在 Apache Beam(通过 Scio)中聚合(每个键)流数据源。我认为这最适合我要解决的问题。要求是:
- 对于给定的键,所有时间的记录都被聚合(基本上求和) - 我不关心以前计算的聚合,只关心最近的
- 根据我控制的某些条件,密钥可能会从状态 (
state.clear()) 中逐出 - 每 5 分钟,无论是否看到任何新密钥,都应输出所有 尚未从状态中驱逐的密钥
鉴于这是一个流式管道并且将无限期地运行,在具有累积触发窗格的全局窗口上使用 combinePerKey 似乎会继续增加其内存占用以及随着时间的推移需要运行的数据量,所以我想避免它。此外,在对此进行测试时,(可能与预期的一样)它只是将新计算的聚合与历史输入一起附加到输出中,而不是使用每个键的最新值。
我的想法是,使用 StatefulDoFn 只会让我输出所有全局状态直到 now(),但这似乎不是一个简单的解决方案。我已经看到使用计时器为此人为执行回调的提示,以及可能使用缓慢增长的侧面输入映射 (How to solve Duplicate values exception when I create PCollectionView<Map<String,String>>) 并以某种方式刷新它,但这基本上需要迭代映射中的所有值而不是加入它。
我觉得我可能忽略了一些简单的东西来让它工作。我对 Beam 中的许多窗口和计时器概念相对较新,正在寻找有关如何解决此问题的任何建议。谢谢!
【问题讨论】:
-
明天我会尽力帮助解答这个问题!
-
我怀疑窗口/触发选项可能会影响您的解决方案。如果您设置计时器来控制输出速率,则不需要上游触发器。您可以尝试删除上游触发器吗?
-
谢谢@Pablo,这似乎有帮助。我现在可以看到密钥每 5 分钟输出一次,并且当新记录中不存在时,它们会通过计时器触发!知道为什么我会在给定窗口的输出中看到两次相同的键吗?我认为 TimestampCombiner.LATEST 会解决这个问题?
-
所以窗口/触发只会在“洗牌”发生时影响您的管道。在 GBK 上或在有状态 ParDo 之前完成洗牌。您可以尝试添加 GroupByKey 吗? - 我相信在你的情况下,时间戳组合器将确定来自你的 GBK 的 KV
> 的时间戳(并且它不会丢弃旧元素) - 但在 GBK 之后你可以丢弃旧元素自己元素 -
为什么将计时器设置为 150 秒,但在窗口中使用 5 分钟?确保有输出?
标签: google-cloud-dataflow apache-beam spotify-scio