【发布时间】:2019-01-09 09:23:06
【问题描述】:
背景
我有一个项目,我们在 Java 中使用 akka-streams。
在这个项目中,我有一个字符串流和一个对它们执行一些操作的图形。
目标
在我的图表中,我想将该流广播给 2 个工作人员。将所有字符 'a' 替换为 'A' 并在实时接收数据时发送数据。
另一个将接收数据,每 3 个字符串,它将连接这 3 个字符串并将它们映射到数字。
如下所示:
显然Sink 2 接收信息的速度不如Sink 1。但这是预期的行为。这里有趣的部分是worker 2。
问题
做工人 1 很容易,并不难。这里的问题是工人 2。我知道 akka 有可以保存最多 X 条消息的缓冲区,但是看起来我不得不选择现有的 Overflow strategies 之一,这通常会导致选择我想要删除的消息或如果我想保持直播或不直播。
我想要的只是,当我在 worke2 中的缓冲区达到缓冲区的最大大小时,对其拥有的所有消息执行 concat 和 map 操作,然后将它们一起发送(之后重置缓冲区)。
但即使在阅读了 akka 的 stream-rate 文档后,我也找不到这样做的方法,至少使用 Java。
研究
我还检查了一个类似的 SO 问题,Selective request-throttling using akka-http stream 但是已经一年多了,没有人回复。
问题
使用图形 DSL,我将如何创建路径:
Source -> bcast -> worker2 -> Sink 2
??
【问题讨论】:
标签: java akka buffer akka-stream