【问题标题】:What is best way to wrap blocking Try[T] in Future[T] in Scala?在 Scala 的 Future[T] 中包装阻塞 Try[T] 的最佳方法是什么?
【发布时间】:2014-04-18 00:11:54
【问题描述】:

问题来了,我有一个库,它有一个阻塞方法返回 Try[T]。但由于它是阻塞的,我想使用 Future[T] 使它成为非阻塞的。在未来的块中,我还想计算一些取决于源阻塞方法的返回值的东西。

但是如果我使用类似下面的东西,那么我的 nonBlocking 将返回 Future[Try[T]] 不太令人信服,因为 Future[T] 已经可以代表 Failure[U],我宁愿将异常传播到Future[T] 是自我。

def blockMethod(x: Int): Try[Int] = Try { 
  // Some long operation to get an Int from network or IO
  throw new Exception("Network Exception") }
}

def nonBlocking(x: Int): Future[Try[Int]] = future {
  blockMethod(x).map(_ * 2)
}

这是我尝试过的,我只是在future {} 块中使用.get 方法,但我不确定这是否是最好的方法。

def blockMethod(x: Int): Try[Int] = Try { 
  // Some long operation to get an Int from network or IO
  throw new Exception("Network Exception") }
}

def nonBlocking(x: Int): Future[Int] = future {
  blockMethod(x).get * 2
}

这是正确的方法吗?还是有一种更 scala 惯用的方式将 t Try[T] 转换为 Future[T]?

【问题讨论】:

    标签: scala


    【解决方案1】:

    这是一个不阻塞的例子,注意你可能想使用自己的执行上下文而不是 scala 的全局上下文:

    import scala.util._
    import scala.concurrent._
    import scala.concurrent.duration._
    import ExecutionContext.Implicits.global
    
    object Main extends App {
    
      def blockMethod(x: Int): Try[Int] = Try {
        // Some long operation to get an Int from network or IO
        Thread.sleep(10000)
        100
      }
    
      def tryToFuture[A](t: => Try[A]): Future[A] = {
        future {
          t
        }.flatMap {
          case Success(s) => Future.successful(s)
          case Failure(fail) => Future.failed(fail)
        }
      }
    
      // Initiate long operation
      val f = tryToFuture(blockMethod(1))
    
      println("Waiting... 10 seconds to complete")
    
      // Should return before 20 seconds...
      val res = Await.result(f, 20 seconds)
    
      println(res) // prints 100
    }
    

    【讨论】:

    • 如果你知道你在阻塞,你不应该把代码包装在blocking吗?
    • fromTry 方法对此有用吗? scala-lang.org/api/current/#scala.concurrent.Future$
    • 自我纠正我之前的评论:fromTry 实际上是块,所以没有运气。
    • 我进行了更多实验,实际上Future(t.get) 似乎也没有阻塞线程。它看起来像一个更简单的解决方案,而不是 flatMapping。想法?
    【解决方案2】:

    在我看来:Try & Future 是一种单子结构和惯用方式,是单子组合(用于理解):

    您需要为 Future[Try[_]] 定义 monad 转换器(您的库的代码):

    case class FutureT[R](run : Future[Try[R]])(implicit e: ExecutionContext) {
      def map[B](f : R => B): FutureT[B] = FutureT(run map { _ map f })
      def flatMap[B](f : R => FutureT[B]): FutureT[B] = {
        val p = Promise[Try[B]]()
        run onComplete {
          case Failure(e)           => p failure e
          case Success(Failure(e))  => p failure e
          case Success(Success(v))  => f(v).run onComplete {
            case Failure(e)         => p failure e
            case Success(s)         => p success s
          }
        }
        FutureT(p.future)
      }
    }
    
    object FutureT {
      def futureTry[R](run : => Try[R])(implicit e: ExecutionContext) = 
        new FutureT(future { run })
    
      implicit def toFutureT[R](run : Future[Try[R]]) = FutureT(run)
      implicit def fromFutureT[R](futureT : FutureT[R]) = futureT.run
    }  
    

    及用法示例:

    def blockMethod(x: Int): Try[Int] = Try {
      Thread.sleep(5000)
      if(x < 10) throw new IllegalArgumentException
      else x + 1
    } 
    
    import FutureT._  
    
    // idiomatic way :)
    val async = for {
      x <- futureTry { blockMethod(15) }
      y <- futureTry { blockMethod(25) }            
    } yield (x + y) * 2  // possible due to using modan transformer  
    
    println("Waiting... 10 seconds to complete")
    
    val res = Await.result(async, 20 seconds)
    println(res)
    
    // example with Exception 
    val asyncWithError = for {
      x <- futureTry { blockMethod(5) }
      y <- futureTry { blockMethod(25) }            
    } yield (x + y) * 2  // possible due to using modan transformer  
    
    // Can't use Await because will get exception 
    // when extract value from FutureT(Failure(java.lang.IllegalArgumentException))
    // no difference between Failure produced by Future or Try
    asyncWithError onComplete {
      case Failure(e) => println(s"Got exception: $e.msg")
      case Success(res) => println(res)
    }
    // Output:
    // Got exception: java.lang.IllegalArgumentException.msg
    

    【讨论】:

    • 某处存在错误。将blockMethod(15) 更改为blockMethod(5) 会导致异常终止程序,并且最后一行println(res) 不会被执行。
    • 谢谢大卫,我错过了异常行为的描述,用异常示例更新我的答案。
    猜你喜欢
    • 2017-08-20
    • 2017-06-15
    • 1970-01-01
    • 1970-01-01
    • 2015-10-09
    • 2016-11-08
    • 2019-02-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多