【问题标题】:Akka Reactive Streams always one message behindAkka Reactive Streams 总是落后一条消息
【发布时间】:2016-07-10 16:26:28
【问题描述】:

出于某种原因,我的 Akka 流总是在“发出”(?)第一条消息之前等待第二条消息。

这是一些演示我的问题的示例代码。

val rx = Source((1 to 100).toStream.map { t =>
  Thread.sleep(1000)
  println(s"doing $t")
  t
})
rx.runForeach(println)

产生输出:

doing 1
doing 2
1
doing 3
2
doing 4
3
doing 5
4
doing 6
5
...

我想要什么:

doing 1
1
doing 2
2
doing 3
3
doing 4
4
doing 5
5
doing 6
6
...

【问题讨论】:

  • Akka 流在每个计算阶段之前总是有一个元素的缓冲区。可能你看到了。

标签: scala akka akka-stream reactive-streams


【解决方案1】:

您的代码现在的设置方式是,在Source 被允许开始向下游发射元素之前,您已经完全转换了它。通过删除代表源的数字范围上的toStream,您可以清楚地看到这种行为(如@slouc 所述)。如果您这样做,您将看到 Source 在开始响应下游需求之前首先完全转换。如果您真的想将Source 运行到Sink 并在中间有一个转换步骤,那么您可以尝试像这样构建:

val transform =
  Flow[Int].map{ t =>
    Thread.sleep(1000)
    println(s"doing $t")
    t
  }

Source((1 to 100).toStream).
  via(transform ).
  to(Sink.foreach(println)).
  run

如果您进行该更改,那么您将获得所需的效果,即在开始处理下一个元素之前,流向下游的元素一直被处理通过流。

【讨论】:

  • 我认为我可以简化我的问题并解决它,但我认为我过于简单化了。您的回答确实解决了我发布的问题,但我可能需要提出一个包含更多细节的新问题。
【解决方案2】:

您正在使用.toStream(),这意味着整个集合都是惰性的。没有它,您的输出将首先是一百个“做”,然后是从 1 到 100 的数字。但是,Stream 只计算第一个元素,它给出“做 1”的输出,这是它停止的地方。将在需要时评估下一个元素。

现在,我在文档中找不到任何详细信息,但我认为runForeach 有一个实现,它在调用当前元素上的函数之前获取下一个元素。因此,在对元素 n 调用 println 之前,它首先检查元素 n+1(例如检查它是否存在),这会导致“正在执行 n+1”消息。然后它在当前元素上执行您的 println 函数,从而产生消息 "n" 。

runForeach 之前你真的需要map() 吗?我的意思是,你需要两次遍历数据吗?我知道我可能是在说显而易见的,但如果你只是像这样一次性处理你的数据:

val rx = Source((1 to 100).toStream)
rx.runForeach({ t =>
  Thread.sleep(1000)
  println(s"doing $t")
  // do something with 't', which is now equal to what "doing" says
})

那么你就不会有什么时候评估什么的问题了。

【讨论】:

  • 我在发布问题时实际上已经简化了问题。流来自 Java BufferedReader(我从 Socket 获得)。 Source(Stream.continually(myBufferedReader.readLine).map(t => (System.nanoTime, t))) 我最初这样做是为了尝试在我的消息实际到达我的机器时“时间戳”,并在它们最终被处理时再次尝试。不过谢谢你的回答。这对我来说更清楚了!
猜你喜欢
  • 2019-09-27
  • 1970-01-01
  • 1970-01-01
  • 2018-03-08
  • 2018-01-14
  • 1970-01-01
  • 1970-01-01
  • 2012-07-23
  • 1970-01-01
相关资源
最近更新 更多