【问题标题】:Scala: List[Future] to Future[List] disregarding failed futuresScala:List[Future] 到 Future[List] 忽略失败的期货
【发布时间】:2014-01-19 09:12:36
【问题描述】:

我正在寻找一种将任意长度的 Future 列表转换为列表的 Future 的方法。我正在使用 Playframework,所以最终,我真正想要的是Future[Result],但为了让事情更简单,我们只说Future[List[Int]] 这样做的正常方法是使用Future.sequence(...),但有一个转折.. . 我给出的列表中通常包含大约 10-20 个期货,其中一个期货失败的情况并不少见(它们正在发出外部 Web 服务请求)。如果其中一个失败,我不必重试所有这些,我希望能够获得成功的那些并返回那些。

例如,执行以下操作不起作用

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success
import scala.util.Failure

val listOfFutures = Future.successful(1) :: Future.failed(new Exception("Failure")) :: 
                    Future.successful(3) :: Nil

val futureOfList = Future.sequence(listOfFutures)

futureOfList onComplete {
  case Success(x) => println("Success!!! " + x)
  case Failure(ex) => println("Failed !!! " + ex)
}

scala> Failed !!! java.lang.Exception: Failure

我希望能够将 1 和 3 从那里拉出来,而不是获得唯一的例外。我尝试使用Future.fold,但这显然只是在幕后调用Future.sequence

提前感谢您的帮助!

【问题讨论】:

    标签: scala future


    【解决方案1】:

    诀窍是首先确保没有一个期货失败。 .recover 是你这里的朋友,你可以将它与map 结合起来,将所有Future[T] 结果转换为Future[Try[T]]] 实例,所有这些都肯定是成功的未来。

    注意:你也可以在这里使用OptionEither,但如果你特别想捕获异常,Try 是最干净的方法

    def futureToFutureTry[T](f: Future[T]): Future[Try[T]] =
      f.map(Success(_)).recover { case x => Failure(x)}
    
    val listOfFutures = ...
    val listOfFutureTrys = listOfFutures.map(futureToFutureTry(_))
    

    然后像以前一样使用Future.sequence,给你一个Future[List[Try[T]]]

    val futureListOfTrys = Future.sequence(listOfFutureTrys)
    

    然后过滤:

    val futureListOfSuccesses = futureListOfTrys.map(_.filter(_.isSuccess))
    

    如果需要,您甚至可以提取特定的故障:

    val futureListOfFailures = futureListOfTrys.map(_.filter(_.isFailure))
    

    【讨论】:

    • 谢谢! .recover 对我来说确实是缺失的部分。
    • 您可以使用_.collect{ case Success(x) => x} 而不是_.filter(_.isSuccess) 来摆脱Try 类型的futureListOfSuccesses
    • 在 scala 2010 中 .recover(x => Failure(x)) 无效,请改用 .recover({case e => Failure(e)})
    • 我认为你错过了未来的包装: def futureToFutureOfTry[A](f: Future[A]): Future[Try[A]] = { val p = Promise[Try[A] ]() f.map{ a=> p.success(scala.util.Success(a)) }.recover{ case x: Throwable => p.success(Failure(x)) } p.future }
    • 不是这样。我正在将 Future 映射到另一个 Future,不需要干预 Promise 并且会浪费
    【解决方案2】:

    我尝试了 Kevin 的回答,但在我的 Scala (2.11.5) 版本上遇到了一个小故障......我纠正了这个问题,如果有人感兴趣的话,还写了一些额外的测试......这是我的版本 >

    implicit class FutureCompanionOps(val f: Future.type) extends AnyVal {
    
        /** Given a list of futures `fs`, returns the future holding the list of Try's of the futures from `fs`.
          * The returned future is completed only once all of the futures in `fs` have been completed.
          */
        def allAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
          val listOfFutureTrys: List[Future[Try[T]]] = fItems.map(futureToFutureTry)
          Future.sequence(listOfFutureTrys)
        }
    
        def futureToFutureTry[T](f: Future[T]): Future[Try[T]] = {
          f.map(Success(_)) .recover({case x => Failure(x)})
        }
    
        def allFailedAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
          allAsTrys(fItems).map(_.filter(_.isFailure))
        }
    
        def allSucceededAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
          allAsTrys(fItems).map(_.filter(_.isSuccess))
        }
    }
    
    
    // Tests... 
    
    
    
      // allAsTrys tests
      //
      test("futureToFutureTry returns Success if no exception") {
        val future =  Future.futureToFutureTry(Future{"mouse"})
        Thread.sleep(0, 100)
        val futureValue = future.value
        assert(futureValue == Some(Success(Success("mouse"))))
      }
      test("futureToFutureTry returns Failure if exception thrown") {
        val future =  Future.futureToFutureTry(Future{throw new IllegalStateException("bad news")})
        Thread.sleep(5)            // need to sleep a LOT longer to get Exception from failure case... interesting.....
        val futureValue = future.value
    
        assertResult(true) {
          futureValue match {
            case Some(Success(Failure(error: IllegalStateException)))  => true
          }
        }
      }
      test("Future.allAsTrys returns Nil given Nil list as input") {
        val future =  Future.allAsTrys(Nil)
        assert ( Await.result(future, 100 nanosecond).isEmpty )
      }
      test("Future.allAsTrys returns successful item even if preceded by failing item") {
        val future1 =  Future{throw new IllegalStateException("bad news")}
        var future2 = Future{"dog"}
    
        val futureListOfTrys =  Future.allAsTrys(List(future1,future2))
        val listOfTrys =  Await.result(futureListOfTrys, 10 milli)
        System.out.println("successItem:" + listOfTrys);
    
        assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
        assert(listOfTrys(1) == Success("dog"))
      }
      test("Future.allAsTrys returns successful item even if followed by failing item") {
        var future1 = Future{"dog"}
        val future2 =  Future{throw new IllegalStateException("bad news")}
    
        val futureListOfTrys =  Future.allAsTrys(List(future1,future2))
        val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
        System.out.println("successItem:" + listOfTrys);
    
        assert(listOfTrys(1).failed.get.getMessage.contains("bad news"))
        assert(listOfTrys(0) == Success("dog"))
      }
      test("Future.allFailedAsTrys returns the failed item and only that item") {
        var future1 = Future{"dog"}
        val future2 =  Future{throw new IllegalStateException("bad news")}
    
        val futureListOfTrys =  Future.allFailedAsTrys(List(future1,future2))
        val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
        assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
        assert(listOfTrys.size == 1)
      }
      test("Future.allSucceededAsTrys returns the succeeded item and only that item") {
        var future1 = Future{"dog"}
        val future2 =  Future{throw new IllegalStateException("bad news")}
    
        val futureListOfTrys =  Future.allSucceededAsTrys(List(future1,future2))
        val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
        assert(listOfTrys(0) == Success("dog"))
        assert(listOfTrys.size == 1)
      }
    

    【讨论】:

      【解决方案3】:

      我刚刚遇到了这个问题,并提供了另一个解决方案:

      def allSuccessful[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])
                                                      (implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], 
                                                       executor: ExecutionContext): Future[M[A]] = {
          in.foldLeft(Future.successful(cbf(in))) {
            (fr, fa) ⇒ (for (r ← fr; a ← fa) yield r += a) fallbackTo fr
          } map (_.result())
      }
      

      这里的想法是,在折叠中,您正在等待列表中的下一个元素完成(使用 for-comprehension 语法),如果下一个失败,您只需退回到已有的元素。

      【讨论】:

      • 我不喜欢这个名字,但我喜欢它的完成方式,直接来自序列 impl
      【解决方案4】:

      Scala 2.12 对 Future.transform 进行了改进,使其可以使用更少的代码。

      val futures = Seq(Future{1},Future{throw new Exception})
      
      // instead of `map` and `recover`, use `transform`
      val seq = Future.sequence(futures.map(_.transform(Success(_)))) 
      
      val successes = seq.map(_.collect{case Success(x)=>x})
      successes
      //res1: Future[Seq[Int]] = Future(Success(List(1)))
      
      val failures = seq.map(_.collect{case Failure(x)=>x})
      failures
      //res2: Future[Seq[Throwable]] = Future(Success(List(java.lang.Exception)))
      

      【讨论】:

        【解决方案5】:

        您可以使用选项轻松包装未来的结果,然后展平列表:

        def futureToFutureOption[T](f: Future[T]): Future[Option[T]] =
            f.map(Some(_)).recover {
              case e => None
            }
        val listOfFutureOptions = listOfFutures.map(futureToFutureOption(_))
        
        val futureListOfOptions = Future.sequence(listOfFutureOptions)
        
        val futureListOfSuccesses = futureListOfOptions.flatten
        

        【讨论】:

        • 万一其他人在第一个函数中遇到 Some 错误,可以像这样重写第一个函数以防止编译器错误: def futureToFutureOption[T](f: Future[T]): Future[Option[T]] = f.map(Option(_)).recover { case e => None }
        【解决方案6】:

        您还可以在不同的列表中收集成功和不成功的结果:

        def safeSequence[A](futures: List[Future[A]]): Future[(List[Throwable], List[A])] = {
          futures.foldLeft(Future.successful((List.empty[Throwable], List.empty[A]))) { (flist, future) =>
            flist.flatMap { case (elist, alist) =>
              future
                .map { success => (elist, alist :+ success) }
                .recover { case error: Throwable => (elist :+ error, alist) }
            }
          }
        }
        

        【讨论】:

          【解决方案7】:

          如果您出于某种原因需要保留失败的期货,例如日志记录或条件处理,这适用于 Scala 2.12+。您可以找到工作代码here

          val f1 = Future(1)
          val f2 = Future(2)
          val ff = Future.failed(new Exception())
          
          val futures: Seq[Future[Either[Throwable, Int]]] =
            Seq(f1, f2, ff).map(_.transform(f => Success(f.toEither)))
          
          val sum = Future
            .sequence(futures)
            .map { eithers =>
              val (failures, successes) = eithers.partitionMap(identity)
          
              val fsum = failures.map(_ => 100).sum
              val ssum = successes.sum
          
              fsum + ssum
            }
          
          assert(Await.result(sum, 1.second) == 103)
          

          【讨论】:

            猜你喜欢
            • 2018-03-21
            • 1970-01-01
            • 1970-01-01
            • 2016-05-09
            • 2018-06-29
            • 1970-01-01
            • 2021-01-10
            • 2022-01-03
            相关资源
            最近更新 更多