【发布时间】:2019-05-30 15:48:05
【问题描述】:
我有一个流式传输管道,它使用流式传输源 (Kafka),并将写入 CloudSQL 数据库。目标是使用过去一小时内收到的记录的键/值总和聚合来实时更新 CloudSQL 数据库。
例如。在过去的一个小时内,收到了 3 条 KV 为 <001,3>,<001,4>,<001,2> 的记录,数据库应该有一条记录 001, 9。不包括超过一小时的记录。
我当前的解决方案是在KafkaIO.read 之后将SlidingWindow 变成GroupByKey:
.apply(Window.into(SlidingWindows
.of(Duration.standardSeconds(3600))
.every(Duration.standardSeconds(20)))
).apply(GroupByKey.create())
后跟一个 ParDo 对每个键求和,然后更新 SQL 数据库。
结果是每20秒我的CloudSQL db用上一小时的每个key的聚合更新一次,满足功能需求。问题在于这导致 CloudSQL 的 upsert 数量:大多数 KV 输出与前一个窗口相同,因此每个窗口每 20 秒触发一个小时的事务(约 500k)。
仅当使用该键的记录被消耗时触发每个 KV 输出是有意义的,或者避免输出自上一个窗口以来未更改的 KV。或者,在 CloudSQL 插入之前的某种过滤器,它接收所有内容并且只输出更改的 KV。这是可能的还是有其他解决方案?
【问题讨论】:
-
我是否理解正确,平均每小时有 0.5M 键的数据?如果
在过去一小时内有元素,则 KV 应该已经在输出数据。是否有可能所有 0.5M 键实际上每小时都有一个值?就仅在值不同时进行更改而言,您需要开始查看 State API,但最好先确认上述内容以确保没有意外发生。 -
是的,没错。不会输出早于窗口时间的数据,只有在最后一小时内收到数据的键。并非所有可能的键都可以在给定的小时窗口中看到。
标签: google-cloud-platform google-cloud-dataflow apache-beam