【发布时间】:2016-08-21 08:18:46
【问题描述】:
我正在使用以下代码来测试 akka 流 Flow.batch 的行为,但我无法以某种方式弄清楚为什么结果不是我所期望的:
Source(1 to 20)
.map(x => {
println(s"received: ${x}")
x
})
.batch(max=3, first => first.toString) {(batch, elem) => {
batch + "," + elem
}}
.runWith(Sink.foreach(x=>{
Thread.sleep(4000)
println("Out:" + x)
}))
这是输出:
received: 1
received: 2
received: 3
received: 4
Out:1,2,3
received: 5
Out:4
received: 6
Out:5
received: 7
Out:6
received: 8
Out:7
received: 9
Out:8
received: 10
Out:9
received: 11
Out:10
received: 12
Out:11
.... so on ....
received: 19
Out:18
received: 20
Out:19
Out:20
这里有几点我看不懂:
- 首先,我的 Sink 慢得多。我希望该项目将在向下游发出之前一起批处理,例如:Out: 1,2,3;出局:4,5,6;输出:7、8;出:9,10,11 等等。相反,它只被批处理一次 (1,2,3),但随后元素被一个一个发出而不是被批处理。
- 为什么我一开始就收到了 4 个项目(received: 1, ..., received: 4),而实际上我只设置了 max=3 (batch(max=3))。
- 因为源比接收器快得多。我希望该元素应该发射得更快,例如:收到:7,收到:8,收到:9;然后输出:7,8,9;但实际上,它只是在 Sink 的 println 函数执行后,才零星地一一发出。
我尝试将 map 更改为 mapAsync,但行为仍然不是我想要的:
.mapAsync(1)(x => {
println(s"received: ${x}")
Future.successful(x)
})
谢谢。
【问题讨论】:
标签: akka reactive-programming akka-stream