【问题标题】:Iterate with hasNext() and next() over an asynchronously generated stream of elements在异步生成的元素流上使用 hasNext() 和 next() 进行迭代
【发布时间】:2015-07-20 05:53:55
【问题描述】:

我必须使用 hasNext() 和 next() 方法实现一个 Iterator 接口(由 Java API 定义),它应该返回源自异步处理的 HTTP 响应(由 Akka 演员处理)的结果元素。

必须满足以下要求:

  • 不要阻塞并等待异步操作完成,因为大型结果集的生成可能需要一段时间(迭代器应在结果元素可用时立即返回)
  • Iterator.next() 应该阻塞,直到下一个元素可用(或者如果没有更多元素出现,则抛出异常)
  • 只要还有更多元素,Iterator.hasNext() 应该返回 true(即使下一个尚不可用)
  • 结果的总数事先是未知的。生成结果的 Actor 将在完成后发送特定的“结束消息”。
  • 尽量避免使用 InterruptedException,例如当迭代器在空队列上等待但不会生成更多元素时。

我还没有研究过 Java 8 流或 Akka 流。但由于我基本上必须遍历队列(有限流),我怀疑是否有任何合适的解决方案。

目前,我的 Scala 实现存根使用 java.util.concurrent.BlockingQueue,如下所示:

class ResultStreamIterator extends Iterator[Result] {
    val resultQueue = new ArrayBlockingQueue[Option[Result]](100)

    def hasNext(): Boolean = ???  // return true if not done yet
    def next(): Result = ???      // take() next element if not done yet

    case class Result(value: Any) // sent by result producing actor
    case object Done              // sent by result producing actor when finished

    class ResultCollector extends Actor {
        def receive = {
           case Result(value) => resultQueue.put(Some(value))
           case Done          => resultQueue.put(None)
        }
    }
}

我使用 Option[Result] 来指示结果流的结尾,并使用 None。我已经尝试查看下一个元素并使用“完成”标志,但我希望有一个更简单的解决方案。

额外问题:

  • 如何使用单元测试涵盖同步/异步实现,尤其是测试延迟的结果生成?
  • 如何使迭代器成为线程安全的?

【问题讨论】:

  • 如果您使用 Java 8,请实现 SpliteratorStreamSupport.stream(yourSpliterator, false/true)
  • 你打算使用Java还是Scala?
  • @Bubletan 我更喜欢 Scala。但算法在 Java 和 Scala 中应该无关紧要(语法除外)。
  • @fge 我认为 BlockingQueue.spliterator() 只是为当前队列中的元素创建一个迭代器。还是它也像底层的 BlockingQueue 一样阻塞?
  • @goerlitz 当队列为空时,如何知道是否还有实际结果?如果hasNext() 在队列为空时返回true,但在你添加None 之后,next() 应该返回什么?

标签: java scala asynchronous iterator java-stream


【解决方案1】:

我听从了jiro的建议,根据需要做了一些调整。一般来说,我喜欢将getNext()next() 实现为发送给actor 的ask 消息的方法。这样可以确保任何时候只有一个线程修改队列。

但是,我不确定此实现的性能,因为 askAwait.result 将为 hasNext()next() 的每次调用创建两个线程。

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps

import akka.actor.{ActorRef, ActorSystem, Props, Stash}
import akka.pattern.ask
import akka.util.Timeout

case object HasNext
case object GetNext

case class Result(value: Any)
case object Done

class ResultCollector extends Actor with Stash {

  val queue = scala.collection.mutable.Queue.empty[Result]

  def collecting: Actor.Receive = {
    case HasNext       => if (queue.isEmpty) stash else sender ! true
    case GetNext       => if (queue.isEmpty) stash else sender ! queue.dequeue
    case value: Result => unstashAll; queue += value
    case Done          => unstashAll; context become serving
  }

  def serving: Actor.Receive = {
    case HasNext => sender ! queue.nonEmpty
    case GetNext => sender ! { if (queue.nonEmpty) queue.dequeue else new NoSuchElementException }
  }

  def receive = collecting
}

class ResultStreamIteration(resultCollector: ActorRef) extends Iterator {

  implicit val timeout: Timeout = Timeout(30 seconds)

  override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match {
    case b: Boolean => b
  }

  override def next(): Any = Await.result(resultCollector ? GetNext, Duration.Inf) match {
    case Result(value: Any) => value
    case e: Throwable       => throw e
  }
}

object Test extends App {
  implicit val exec = scala.concurrent.ExecutionContext.global
  val system = ActorSystem.create("Test")
  val actorRef = system.actorOf(Props[ResultCollector])
  Future {
    for (i <- 1 to 10000) actorRef ! Result(s"Result $i"); actorRef ! Done
  }
  val iterator = new ResultStreamIteration(actorRef)
  while (iterator.hasNext()) println(iterator.next)
  system.shutdown()
}

【讨论】:

    【解决方案2】:

    您可以使用变量存储下一个元素,然后在两种方法的开头等待它:

    private var nextNext: Option[Result] = null
    
    def hasNext(): Boolean = {
      if (nextNext == null) nextNext = resultQueue.take()
      return !nextNext.isEmpty
    }
    
    def next(): Result = {
      if (nextNext == null) nextNext = resultQueue.take()
      if (nextNext.isEmpty) throw new NoSuchElementException()
      val result = nextNext.get
      nextNext = null
      return result
    }
    

    【讨论】:

    • 我尝试了类似的方法,乍一看,您的代码似乎可以工作。 Hoewer,在 Scala 中不应该使用null,而是用None 初始化nextNext。但是有必要区分初始的NoneNone,这表明不会有更多的结果(例如,使用布尔标志)。我认为,jiro 的解决方案在 Scala/Akka 意义上更优雅,并且在同步方面也更安全——即使它更长。但是感谢您查看此内容。
    • @goerlitz 没问题。我同意将 nextNext 初始化为 null 是不好的,尽管没有看到其他解决方案更安全的任何理由。
    【解决方案3】:

    以下代码将满足要求。 Actor 的字段可以在 Actor 的接收器中安全地修改。 所以resultQueue不应该在Iterator的字段中,而应该在Actor的字段中。

    // ResultCollector should be initialized.
    // Initilize code is like...
    // resultCollector ! Initialize(100)
    class ResultStreamIterator(resultCollector: ActorRef) extends Iterator[Result] {
    
      implicit val timeout: Timeout = ???
    
      override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match {
        case ResponseHasNext(hasNext) => hasNext
      }
    
      @scala.annotation.tailrec
      final override def next(): Result = Await.result(resultCollector ? RequestResult, Duration.Inf) match {
        case ResponseResult(result) => result
        case Finished => throw new NoSuchElementException("There is not result.")
        case WaitingResult => next()// should be wait for a moment.
      }
    
    }
    
    case object RequestResult
    case object HasNext
    
    case class ResponseResult(result: Result)
    case class ResponseHasNext(hasNext: Boolean)
    case object Finished
    case object WaitingResult
    
    case class Initialize(expects: Int)
    
    // This code may be more ellegant if using Actor FSM
    // Acotr's State is (beforeInitialized)->(collecting)->(allCollected)
    class ResultCollector extends Actor with Stash {
    
      val results = scala.collection.mutable.Queue.empty[Result]
    
      var expects = 0
    
      var counts = 0
    
      var isAllCollected = false
    
      def beforeInitialized: Actor.Receive = {
        case Initialize(n) =>
          expects = n
          if (expects != 0) context become collecting
          else context become allCollected
          unstashAll
        case _ => stash()
      }
    
      def collecting: Actor.Receive = {
        case RequestResult =>
          if (results.isEmpty) sender ! WaitingResult
          else sender ! ResponseResult(results.dequeue())
        case HasNext => ResponseHasNext(true)
        case result: Result =>
          results += result
          counts += 1
          isAllCollected = counts >= expects
          if (isAllCollected) context become allCollected
      }
    
      def allCollected: Actor.Receive = {
        case RequestResult =>
          if (results.isEmpty) sender ! Finished
          else sender ! ResponseResult(results.dequeue())
        case HasNext => ResponseHasNext(!results.isEmpty)
      }
    
      def receive = beforeInitialized
    }
    

    【讨论】:

    • 感谢您将队列放入 Actor 的提示。因此,正如您所说,它更加安全和清洁,因为对队列的所有访问都是通过参与者的事件处理程序同步的。那么使用ask 请求、不同状态(我将不得不研究 FSM)和Stash 也是有意义的。
    • 如果队列是actor的字段,则不必使用BlockingQueue。如果队列是迭代器的字段并且迭代器的方法是直接使用队列实现的,这些方法应该声明为同步,因为actor的线程和迭代器的线程不同。
    • 唯一的事情是,我不知道会产生多少结果(我更新了问题中的要求以澄清这一点)。因此,我必须依赖来自下游参与者的特定事件(例如case object Done),表明它已经完成了结果处理。
    • 然后在collecting 中添加Done 处理程序并将上下文更改为allCollected。(countsexpects 是不必要的)如果ResultDone 是可靠的。
    • 仍然,以下事件顺序存在问题:> 1. [collecting] Result(value) -> results += value > 2. [collecting] HasNext -> true > 3. [collecting ] RequestResult -> results.dequeue > 4. [collecting] HasNext -> true > 5. [collecting] Done -> become allCollected > 6. [AllCollected] RequestResult -> NoSuchElementException 不知何故,hasNext() 和 next() 都应该阻塞直到收到下一个 ResultDone 事件。而且我宁愿延迟 Actor 中的响应。我可以隐藏所有 HasNextResponseResult 并使用下一个 Result(value) 取消隐藏吗?
    猜你喜欢
    • 2020-12-15
    • 2020-12-30
    • 1970-01-01
    • 1970-01-01
    • 2020-09-13
    • 1970-01-01
    • 1970-01-01
    • 2013-03-29
    • 1970-01-01
    相关资源
    最近更新 更多