【问题标题】:transforming a Seq[Future[X]] into an Enumerator[X]将 Seq[Future[X]] 转换为 Enumerator[X]
【发布时间】:2013-03-21 09:09:00
【问题描述】:

有没有办法将 Seq[Future[X]] 变成 Enumerator[X] ?用例是我想通过爬网来获取资源。这将返回一个 Futures 序列,我想返回一个 Enumerator,它将按照它们首先完成的顺序将 Futures 推送到 Iteratee。

看起来 Victor Klang 的 Future select gist 可以用来执行此操作 - 尽管它看起来效率很低。

注意:有问题的迭代器和枚举器是播放框架版本 2.x 给出的,即具有以下导入:import play.api.libs.iteratee._

【问题讨论】:

    标签: scala playframework-2.0


    【解决方案1】:

    使用Victor Klang's select method

      /**
       * "Select" off the first future to be satisfied.  Return this as a
       * result, with the remainder of the Futures as a sequence.
       *
       * @param fs a scala.collection.Seq
       */
      def select[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): 
          Future[(Try[A], Seq[Future[A]])] = {
        @scala.annotation.tailrec
        def stripe(p: Promise[(Try[A], Seq[Future[A]])],
                   heads: Seq[Future[A]],
                   elem: Future[A],
                   tail: Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])] = {
          elem onComplete { res => if (!p.isCompleted) p.trySuccess((res, heads ++ tail)) }
          if (tail.isEmpty) p.future
          else stripe(p, heads :+ elem, tail.head, tail.tail)
        }
        if (fs.isEmpty) Future.failed(new IllegalArgumentException("empty future list!"))
        else stripe(Promise(), fs.genericBuilder[Future[A]].result, fs.head, fs.tail)
       }
    }
    

    然后我可以得到我需要的东西

        Enumerator.unfoldM(initialSeqOfFutureAs){ seqOfFutureAs =>
            if (seqOfFutureAs.isEmpty) {
              Future(None)
            } else {
              FutureUtil.select(seqOfFutureAs).map {
                case (t, seqFuture) => t.toOption.map {
                  a => (seqFuture, a)
                }
              }
            }
        }
    

    【讨论】:

    • 我有点担心使用 Victor Klang 的 select 实现效率不够。在这个算法中,我们需要遍历整个序列,这要求每个 Future 在每次传递时都注册一个新的 Promise。应该可以创建一种算法,只需要这样做一次......也许这只是将枚举器子类化,并将序列中的每个未来注册到枚举器的问题。
    【解决方案2】:

    更好、更短且我认为更有效的答案是:

       def toEnumerator(seqFutureX: Seq[Future[X]]) = new Enumerator[X] { 
          def apply[A](i: Iteratee[X, A]): Future[Iteratee[X, A]] = {
            Future.sequence(seqFutureX).flatMap { seqX: Seq[X] => 
                seqX.foldLeft(Future.successful(i)) {
                  case (i, x) => i.flatMap(_.feed(Input.El(x)))
                }
            }
          }
        }
    

    【讨论】:

    • 小心,如果 seqFutureX 中的任何一个期货失败,Future.sequence 将返回一个失败的未来。
    • 这个解决方案在喂给 Iteratee 之前等待所有期货完成,而来自@bblfish 的另一个解决方案尽可能早地提出(没有保留顺序!)。
    【解决方案3】:

    我确实意识到这个问题已经有点老了,但是根据 Santhosh 的回答和内置的 Enumterator.enumerate() 实现,我想出了以下内容:

    def enumerateM[E](traversable: TraversableOnce[Future[E]])(implicit ec: ExecutionContext): Enumerator[E] = {
      val it = traversable.toIterator
      Enumerator.generateM {
        if (it.hasNext) {
          val next: Future[E] = it.next()
          next map {
            e => Some(e)
          }
        } else {
          Future.successful[Option[E]] {
            None
          }
        }
      }
    }
    

    请注意,与第一个基于 Viktor 选择的解决方案不同,此解决方案保留了顺序,但您仍然可以之前异步启动所有计算。因此,例如,您可以执行以下操作:

    // For lack of a better name
    def mapEachM[E, NE](eventuallyList: Future[List[E]])(f: E => Future[NE])(implicit ec: ExecutionContext): Enumerator[NE] =
      Enumerator.flatten(
        eventuallyList map { list =>
          enumerateM(list map f)
        }
      )
    

    实际上,当我偶然发现这个线程时,我正在寻找后一种方法。希望它可以帮助某人! :)

    【讨论】:

      【解决方案4】:

      您可以使用 Java 执行器完成服务 (JavaDoc) 构建一个。这个想法是使用创建一系列新期货,每个期货使用ExecutorCompletionService.take() 等待下一个结果。当前一个未来有结果时,每个未来都会开始。

      但请注意,这可能效率不高,因为很多同步发生在幕后。使用一些并行 map reduce 进行计算(例如使用 Scala 的 ParSeq)并让 Enumerator 等待完整的结果可能会更有效。

      【讨论】:

      • “每个未来都将在前一个未来有结果时开始”:这似乎是阻塞的。在我的回答中提供的代码中,seqOfFuturesA 中的所有期货都是并行执行的。
      【解决方案5】:

      警告:在回答之前未编译

      这样的事情怎么样:

      def toEnumerator(seqFutureX: Seq[Future[X]]) = new Enumerator[X] { 
        def apply[A](i: Iteratee[X, A]): Future[Iteratee[X, A]] = 
          Future.fold(seqFutureX)(i){ case (i, x) => i.flatMap(_.feed(Input.El(x)))) }
      }
      

      【讨论】:

      • 折叠的签名是def fold[T, R](futures: scala.TraversableOnce[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R],但是你的代码有签名fold[T, R](futures: Seq[Future[T]])(zero: Iteratee[T,R])(foldFun: (R, T) => Future[R])(implicit executor: ExecutionContext): Future[R] foldfun 有问题,因为i.flatMap(_.feed(Input.El(x)) 返回一个Future[R] 而不是R
      • 但是 i 的类型是“Iteratee[X, A]”,而 flatMap 应该返回一个“Iteratee[X, A]”,不是吗? (假设提要返回 Iteratee[X, A])
      • Play 的Iteratee[E,A]flatMap 定义为:def flatMap[B](f: A => Iteratee[E, B]): Iteratee[E, B],因此您的案例应该写成case (i,x) => i.flatMap(a=> ...)。然后a 不再是Iteratee,因此它没有feed 方法。另一方面,如果有人尝试做case (i,x) => i feed(Input.El(x)),那么最终会得到Future[Iteratee[...]],这不是 fold 想要的。最棒的是,如果没有 Scala 的类型系统,我想我永远找不到答案... :-)
      【解决方案6】:

      这是我觉得很方便的东西,

      def unfold[A,B](xs:Seq[A])(proc:A => Future[B])(implicit errorHandler:Throwable => B):Enumerator[B] = {
          Enumerator.unfoldM (xs) { xs =>
              if (xs.isEmpty) Future(None)
              else proc(xs.head) map (b => Some(xs.tail,b)) recover {
                  case e => Some((xs.tail,errorHandler(e)))
              }
          }
      }
      
      def unfold[A,B](fxs:Future[Seq[A]])(proc:A => Future[B]) (implicit errorHandler1:Throwable => Seq[A], errorHandler:Throwable => B) :Enumerator[B] = {
      
          (unfold(Seq(fxs))(fxs => fxs)(errorHandler1)).flatMap(unfold(_)(proc)(errorHandler))
      }
      
      def unfoldFutures[A,B](xsfxs:Seq[Future[Seq[A]]])(proc:A => Future[B]) (implicit errorHandler1:Throwable => Seq[A], errorHandler:Throwable => B) :Enumerator[B] = {
      
          xsfxs.map(unfold(_)(proc)).reduceLeft((a,b) => a.andThen(b))
      }
      

      【讨论】:

        【解决方案7】:

        我想建议使用广播

        def seqToEnumerator[A](futuresA: Seq[Future[A]])(defaultValue: A, errorHandler: Throwable => A): Enumerator[A] ={
            val (enumerator, channel) = Concurrent.broadcast[A]
            futuresA.foreach(f => f.onComplete({
              case Success(Some(a: A)) => channel.push(a)
              case Success(None) => channel.push(defaultValue)
              case Failure(exception) => channel.push(errorHandler(exception))
            }))
            enumerator
          }
        

        我添加了 errorHandling 和 defaultValues,但您可以使用 onSuccess 或 onFailure 跳过它们,而不是 onComplete

        【讨论】:

          猜你喜欢
          • 2019-12-13
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2020-07-16
          • 1970-01-01
          • 2017-01-12
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多