【发布时间】:2020-12-29 20:49:49
【问题描述】:
我遇到了固定窗口问题。我使用固定窗口每分钟使用批处理数据写入文件。
问题是无论出于何种原因使用这个:
| "Window into Fixed Intervals"
>> beam.WindowInto(window.FixedWindows(60),trigger=beam.trigger.AfterProcessingTime(1 * 60),
accumulation_mode=beam.trigger.AccumulationMode.DISCARDING)
似乎没有做任何事情来实际窗口数据。由于某种原因,我需要使用 groupbykey 将数据实际分组到时间窗口中。
我是这样做的
"Add Dummy Key"
>> beam.Map(lambda elem: (None, elem))
| "Groupby"
>> beam.GroupByKey()
| "Abandon Dummy Key"
>> beam.MapTuple(lambda _, val: val)
| "Write to GCS"
>> beam.ParDo(WriteBatchesToGCS(output_path))
这使我的管道增加了 50 倍的处理时间。有没有更好的方法来做到这一点,还是我错过了什么?
数据流甚至是正确的工具吗?
【问题讨论】:
-
你的意思是如果你不把group by by的话你什么都不写GCS?
-
不,它只是将消息逐个写入 GCS,这会引发大量错误,因为它试图在一分钟内修改同一个文件。
-
如果您不计算、聚合、分组或组合您的消息,为什么要执行窗口?一个带有周期性触发的全局窗口就足够了,不是吗?
-
哈哈哈谢谢你提醒我我是个白痴。你说得对。我只是为批处理设置了一个定期触发器,这实际上似乎解决了这个问题。
-
愚蠢的问题可以解决大问题!乐于助人!
标签: google-cloud-storage google-cloud-dataflow apache-beam