【问题标题】:How to batch multiple elements emitted from source using Akka stream Flow.batch如何使用 Akka 流 Flow.batch 批处理从源发出的多个元素
【发布时间】:2016-08-21 08:18:46
【问题描述】:

我正在使用以下代码来测试 akka 流 Flow.batch 的行为,但我无法以某种方式弄清楚为什么结果不是我所期望的:

  Source(1 to 20)
  .map(x => {
    println(s"received: ${x}")
    x
  })
  .batch(max=3, first => first.toString) {(batch, elem) => {
    batch + "," + elem
  }}
  .runWith(Sink.foreach(x=>{
    Thread.sleep(4000)
    println("Out:" + x)
  }))

这是输出: received: 1 received: 2 received: 3 received: 4 Out:1,2,3 received: 5 Out:4 received: 6 Out:5 received: 7 Out:6 received: 8 Out:7 received: 9 Out:8 received: 10 Out:9 received: 11 Out:10 received: 12 Out:11 .... so on .... received: 19 Out:18 received: 20 Out:19 Out:20

这里有几点我看不懂:

  • 首先,我的 Sink 慢得多。我希望该项目将在向下游发出之前一起批处理,例如:Out: 1,2,3;出局:4,5,6;输出:7、8;出:9,10,11 等等。相反,它只被批处理一次 (1,2,3),但随后元素被一个一个发出而不是被批处理。
  • 为什么我一开始就收到了 4 个项目(received: 1, ..., received: 4),而实际上我只设置了 max=3 (batch(max=3))。
  • 因为源比接收器快得多。我希望该元素应该发射得更快,例如:收到:7,收到:8,收到:9;然后输出:7,8,9;但实际上,它只是在 Sink 的 println 函数执行后,才零星地一一发出。

我尝试将 map 更改为 mapAsync,但行为仍然不是我想要的:

  .mapAsync(1)(x => {
    println(s"received: ${x}")
    Future.successful(x)
  })

谢谢。

【问题讨论】:

    标签: akka reactive-programming akka-stream


    【解决方案1】:

    您的代码中没有任何异步边界,它将在单个线程上运行。基本上,当您的 Thread.sleep() 执行时,在此设置中不会发生其他进展,即无法进行批处理(因为线程在 Thread.sleep 上被阻塞)。如果您有这样的设置,那么您可以只使用 grouped() 而不是批处理,或者可能是 groupedWithin()。如果您仍想尝试 batch(),请尝试节流阶段而不是添加睡眠。 Throttle 不会阻塞线程,因此上游进度(批处理)不受影响。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-11-19
      • 1970-01-01
      • 2016-07-06
      • 1970-01-01
      • 2018-12-11
      • 1970-01-01
      • 2017-02-28
      • 1970-01-01
      相关资源
      最近更新 更多