【发布时间】:2015-07-20 05:53:55
【问题描述】:
我必须使用 hasNext() 和 next() 方法实现一个 Iterator 接口(由 Java API 定义),它应该返回源自异步处理的 HTTP 响应(由 Akka 演员处理)的结果元素。
必须满足以下要求:
- 不要阻塞并等待异步操作完成,因为大型结果集的生成可能需要一段时间(迭代器应在结果元素可用时立即返回)
- Iterator.next() 应该阻塞,直到下一个元素可用(或者如果没有更多元素出现,则抛出异常)
- 只要还有更多元素,Iterator.hasNext() 应该返回 true(即使下一个尚不可用)
- 结果的总数事先是未知的。生成结果的 Actor 将在完成后发送特定的“结束消息”。
- 尽量避免使用 InterruptedException,例如当迭代器在空队列上等待但不会生成更多元素时。
我还没有研究过 Java 8 流或 Akka 流。但由于我基本上必须遍历队列(有限流),我怀疑是否有任何合适的解决方案。
目前,我的 Scala 实现存根使用 java.util.concurrent.BlockingQueue,如下所示:
class ResultStreamIterator extends Iterator[Result] {
val resultQueue = new ArrayBlockingQueue[Option[Result]](100)
def hasNext(): Boolean = ??? // return true if not done yet
def next(): Result = ??? // take() next element if not done yet
case class Result(value: Any) // sent by result producing actor
case object Done // sent by result producing actor when finished
class ResultCollector extends Actor {
def receive = {
case Result(value) => resultQueue.put(Some(value))
case Done => resultQueue.put(None)
}
}
}
我使用 Option[Result] 来指示结果流的结尾,并使用 None。我已经尝试查看下一个元素并使用“完成”标志,但我希望有一个更简单的解决方案。
额外问题:
- 如何使用单元测试涵盖同步/异步实现,尤其是测试延迟的结果生成?
- 如何使迭代器成为线程安全的?
【问题讨论】:
-
如果您使用 Java 8,请实现
Spliterator和StreamSupport.stream(yourSpliterator, false/true) -
你打算使用Java还是Scala?
-
@Bubletan 我更喜欢 Scala。但算法在 Java 和 Scala 中应该无关紧要(语法除外)。
-
@fge 我认为 BlockingQueue.spliterator() 只是为当前队列中的元素创建一个迭代器。还是它也像底层的 BlockingQueue 一样阻塞?
-
@goerlitz 当队列为空时,如何知道是否还有实际结果?如果
hasNext()在队列为空时返回true,但在你添加None之后,next()应该返回什么?
标签: java scala asynchronous iterator java-stream