【问题标题】:How to get an iterator from an akka streams Source?如何从 akka 流源中获取迭代器?
【发布时间】:2017-09-06 03:28:38
【问题描述】:

我正在尝试创建一个可以通过Iterator 之类的方式使用的流。 我正在实现一个库,它公开了一个类似迭代器的接口,所以这对我来说是最简单的东西。

到目前为止,我设计的图表本质上是Source<Iterator<DataRow>>。到目前为止我看到的一件事是将其展平为Source<DataRow>,然后使用http://doc.akka.io/japi/akka/current/akka/stream/javadsl/StreamConverters.html#asJavaStream--,然后使用https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#iterator--

但是考虑到可能会有很多行,我想知道避免展平步骤是否有意义(至少在 akka 流上下文中,我假设通过时每个元素的开销会很小通过阶段),或者如果有更直接的方式。

另外,我很好奇反压在创建的流中是如何工作的,尤其是子迭代器;它只缓冲一个元素吗?

【问题讨论】:

  • 您能否详细说明为什么需要把它变成一个迭代器?
  • 我正在实现一个 api,它在数据行上公开一个只进游标(本质上是迭代器接口)。要么我需要使用一个迭代器,要么基本上在其他东西之上实现一个。

标签: java akka akka-stream akka-http


【解决方案1】:

展平步骤

Source<Iterator<DataRow>> 扁平化为Source<DataRow> 确实会增加一些开销,因为您将不得不使用flatMapConcat,而这最终会导致create a new GraphStage

但是,如果您有“许多”行,那么这个单独的阶段可能会派上用场,因为它将为展平步骤提供并发性。

背压

如果你查看StreamConverters.asJavaStream 中的at the code,你会看到有一个QueueSink 正在生成一个Future 以从akka 流中提取下一个元素,然后执行Await.result(nextElementFuture, Inf) 以等待Future完成,以便可以将下一个元素转发到 java Stream。

回答你的问题:是的,子迭代器只缓冲一个元素,但 QueueSink 有一个 Future,它可能还有下一个 DataRow。因此,javaStream 和 Iterator 可能有 2 个缓冲的元素,除了原来的 akka Source 中正在进行的缓冲。

【讨论】:

    【解决方案2】:

    或者,您可以在后台使用prefixAndTail(1) 实现一个迭代器,以实现hasNextnext

    【讨论】:

      猜你喜欢
      • 2020-03-22
      • 2014-05-20
      • 1970-01-01
      • 2020-01-15
      • 2012-01-21
      • 1970-01-01
      • 2022-11-20
      • 2011-07-03
      • 1970-01-01
      相关资源
      最近更新 更多