【问题标题】:Scala Futures - built in timeout?Scala Futures - 内置超时?
【发布时间】:2013-04-24 15:26:50
【问题描述】:

从官方教程参考文献中我无法完全理解期货的一个方面。 http://docs.scala-lang.org/overviews/core/futures.html

scala 中的期货是否具有某种内置的超时机制?假设下面的示例是一个 5 GB 的文本文件……“Implicits.global”的隐含范围最终会导致 onFailure 以非阻塞方式触发还是可以定义?如果没有某种默认超时,这是否意味着成功或失败都不会触发?

import scala.concurrent._
import ExecutionContext.Implicits.global

val firstOccurence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
  case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
  case t => println("Could not process file: " + t.getMessage)
}

【问题讨论】:

  • 请记住,这些解决方案都不会真正阻止Future 运行。唯一可以阻止Future 的地方就是它的内部。
  • @NikitaVolkov 您的链接不再有效。试图找到正确的链接但失败了。

标签: scala concurrency


【解决方案1】:
You can simply run the future to completion without giving any timeout interval by setting the timeout to infinite as below:

**import scala.concurrent.duration._  
Await.result(run(executionContext), Duration.Inf)**

run function can be as below :

def run(implicit ec: ExecutionContext) = {  
      val list = Seq(  
          Future { println("start 1"); Thread.sleep(1000); println("stop 1")},  
          Future { println("start 2"); Thread.sleep(2000); println("stop 2")},  
          Future { println("start 3"); Thread.sleep(3000); println("stop 3")},  
          Future { println("start 4"); Thread.sleep(4000); println("stop 4")},  
          Future { println("start 5"); Thread.sleep(5000); println("stop 5")}  
      )  
      Future.sequence(list)  
    }  

【讨论】:

    【解决方案2】:

    此版本无需使用任何外部计时器即可工作(只需 Await.result)

    import scala.concurrent._
    import scala.concurrent.duration.FiniteDuration
    
    object TimeoutFuture {
        def apply[A](
            timeout: FiniteDuration
        )(block: => A)(implicit executor: ExecutionContext): Future[A] =
            try {
                Future { Await.result(Future { block }, timeout) }
            } catch {
                case _: TimeoutException => Future.failed(new TimeoutException(s"Timed out after ${timeout.toString}"))
            }
    }
    

    【讨论】:

    • 我印象深刻。虽然其他解决方案很复杂,但这个解决方案确实切中要害,而且非常简单。
    • 你是什么意思它可以在不使用超时的情况下工作?你显然在使用超时
    • 对不起,我不是很清楚。我的意思是不使用任何外部库来处理超时,如 akka、netty 或任何类型的计时器。
    【解决方案3】:

    您可以使用Await 等待未来完成。

    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future}
    
    val meaningOfLife: Int = Await.result(Future(42), 1.nano)
    println (meaningOfLife)
    

    上面打印42

    在这种情况下,您可能需要一个隐式的ExecutionContext,只需添加:

    import scala.concurrent.ExecutionContext.Implicits.global
    

    另一种方法是使用来自monixCoeval。此方法并非在所有情况下都有效,您可以阅读有关它的所有内容here。 基本思想是,有时未来实际上并不需要任何时间,而是返回同步函数调用或值的结果,因此可以在当前线程上评估这个未来。这对于测试和模拟期货也很有用。此外,您不必指定预期的超时,但不必担心这一点仍然很好。

    您首先将未来转换为Task,然后将该任务包装在Coeval 中,然后交叉手指等待看看您会得到什么。这是一个非常简单的例子来展示它是如何工作的:

    您需要一个隐含的Scheduler 才能使用它:

    import monix.execution.Scheduler.Implicits.global
    


    Coeval(Task.fromFuture(Future (42)).runSyncStep).value() match {
       case Right(v) => println(v)
       case Left(task) => println("Task did not finish")
    }
    

    以上完成并将42打印到控制台。

    Coeval(Task.fromFuture(Future {
       scala.concurrent.blocking {
          42
       }
    }).runSyncStep).value() match {
       case Right(v) => println(v)
       case Left(task) => println("Task did not finish")
    }
    

    这个例子打印Task did not finish:

    【讨论】:

      【解决方案4】:

      在Future IMO 上指定超时的最简单方法是使用scala.concurrent.Await.ready 的scala 内置机制,如果Future 花费的时间超过指定的超时时间,这将抛出TimeoutException。否则,它将返回 Future 本身。 这是一个简单的人为示例

      import scala.concurrent.ExecutionContext.Implicits._
      import scala.concurrent.duration._
      val f1: Future[Int] = Future {
        Thread.sleep(1100)
        5
      }
      
      val fDoesntTimeout: Future[Int] = Await.ready(f1, 2000 milliseconds)
      
      val f: Future[Int] = Future {
        Thread.sleep(1100)
        5
      }
      val fTimesOut: Future[Int] = Await.ready(f, 100 milliseconds)
      

      【讨论】:

        【解决方案5】:

        Monix Task 有超时 support

        import monix.execution.Scheduler.Implicits.global
        import monix.eval._
        import scala.concurrent.duration._
        import scala.concurrent.TimeoutException
        
        val source = Task("Hello!").delayExecution(10.seconds)
        
        // Triggers error if the source does not complete in 3 seconds after runOnComplete
        val timedOut = source.timeout(3.seconds)
        
        timedOut.runOnComplete(r => println(r))
        //=> Failure(TimeoutException)
        

        【讨论】:

          【解决方案6】:

          所有这些答案都需要额外的依赖。我决定使用 java.util.Timer 编写一个版本,这是将来运行函数的有效方法,在这种情况下触发超时。

          Blog post with more details here

          将它与 Scala 的 Promise 一起使用,我们可以创建一个带有超时的 Future,如下所示:

          package justinhj.concurrency
          
          import java.util.concurrent.TimeoutException
          import java.util.{Timer, TimerTask}
          
          import scala.concurrent.duration.FiniteDuration
          import scala.concurrent.{ExecutionContext, Future, Promise}
          import scala.language.postfixOps
          
          object FutureUtil {
          
            // All Future's that use futureWithTimeout will use the same Timer object
            // it is thread safe and scales to thousands of active timers
            // The true parameter ensures that timeout timers are daemon threads and do not stop
            // the program from shutting down
          
            val timer: Timer = new Timer(true)
          
            /**
              * Returns the result of the provided future within the given time or a timeout exception, whichever is first
              * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
              * Thread.sleep would
              * @param future Caller passes a future to execute
              * @param timeout Time before we return a Timeout exception instead of future's outcome
              * @return Future[T]
              */
            def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {
          
              // Promise will be fulfilled with either the callers Future or the timer task if it times out
              val p = Promise[T]
          
              // and a Timer task to handle timing out
          
              val timerTask = new TimerTask() {
                def run() : Unit = {
                      p.tryFailure(new TimeoutException())
                  }
                }
          
              // Set the timeout to check in the future
              timer.schedule(timerTask, timeout.toMillis)
          
              future.map {
                a =>
                  if(p.trySuccess(a)) {
                    timerTask.cancel()
                  }
              }
              .recover {
                case e: Exception =>
                  if(p.tryFailure(e)) {
                    timerTask.cancel()
                  }
              }
          
              p.future
            }
          
          }
          

          【讨论】:

          • 我赞成这个答案。一件小事是p 不必是可变的。 val 就足够了。
          • 谢谢,我在这里做了更改。
          • 这个解决方案是否真的会在超时后阻止 Future 在后台运行? .
          • 不,它不会取消未来。这里有一些代码支持Future取消viktorklang.com/blog/Futures-in-Scala-protips-6.html你也可以看看像Zio或者Cats Effect这样的效果库
          【解决方案7】:

          我正在使用这个版本(基于上面的 Play 示例),它使用 Akka 系统调度程序:

          object TimeoutFuture {
            def apply[A](system: ActorSystem, timeout: FiniteDuration)(block: => A): Future[A] = {
              implicit val executionContext = system.dispatcher
          
              val prom = Promise[A]
          
              // timeout logic
              system.scheduler.scheduleOnce(timeout) {
                prom tryFailure new java.util.concurrent.TimeoutException
              }
          
              // business logic
              Future {
                try {
                  prom success block
                } catch {
                  case t: Throwable => prom tryFailure t
                }
              }
          
              prom.future
            }
          }
          

          【讨论】:

            【解决方案8】:

            如果您希望作者(promise holder)成为控制超时逻辑的人,请使用akka.pattern.after,方式如下:

            val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during...")))
            Future.firstCompletedOf(Seq(promiseRef.future, timeout))
            

            这样,如果你的 Promise 完成逻辑永远不会发生,你的调用者的未来仍然会在某个失败的时候完成。

            【讨论】:

              【解决方案9】:

              还没有人提到akka-streams。流有一个简单的 completionTimeout 方法,将其应用于单源流就像 Future 一样。

              但是,akka-streams 也会取消,因此它实际上可以终止源运行,即它向源发出超时信号。

              【讨论】:

                【解决方案10】:

                只有在使用阻塞来获取Future 的结果时才会出现超时行为。如果你想使用非阻塞回调onCompleteonSuccessonFailure,那么你将不得不滚动你自己的超时处理。 Akka 为参与者之间的请求/响应 (?) 消息传递内置了超时处理,但不确定您是否要开始使用 Akka。 FWIW,在 Akka 中,对于超时处理,它们通过 Future.firstCompletedOf 组合了两个 Futures,一个代表实际的异步任务,一个代表超时。如果超时计时器(通过HashedWheelTimer)首先弹出,则异步回调失败。

                滚动您自己的一个非常简化的示例可能是这样的。一、一个用于调度超时的对象:

                import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
                import java.util.concurrent.TimeUnit
                import scala.concurrent.duration.Duration
                import scala.concurrent.Promise
                import java.util.concurrent.TimeoutException
                
                object TimeoutScheduler{
                  val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
                  def scheduleTimeout(promise:Promise[_], after:Duration) = {
                    timer.newTimeout(new TimerTask{
                      def run(timeout:Timeout){              
                        promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))        
                      }
                    }, after.toNanos, TimeUnit.NANOSECONDS)
                  }
                }
                

                然后是一个函数来获取 Future 并为其添加超时行为:

                import scala.concurrent.{Future, ExecutionContext, Promise}
                import scala.concurrent.duration.Duration
                
                def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
                  val prom = Promise[T]()
                  val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
                  val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
                  fut onComplete{case result => timeout.cancel()}
                  combinedFut
                }
                

                请注意,我在这里使用的 HashedWheelTimer 来自 Netty。

                【讨论】:

                • 非常感谢!您能否就如何处理期货提供一般性建议(事后)。我正在阅读 Akka,以及使用期货的 Scala 的各种 HTTP 包。似乎在某些时候为了使用 Future,一个阻塞事件必须在那个时刻发生或放弃该过程.. 但许多教程似乎专注于非阻塞调用,而不是事后对其进行任何实际操作?
                • 您绝对可以围绕 Futures 的非阻塞使用构建逻辑,我建议朝这个方向倾斜,因为它的性能要好得多。例如,我们对 HTTP/REST 层使用 Unfiltered。电话进来并转到 Akka Actors 进行服务。我们在调用actor返回的Future上使用非阻塞onComplete,然后完成Unfiltered Netty异步HTTP请求。这只是一个示例(虽然不是很详细),说明如何将非阻塞回调用于实际操作。
                • @cmbaxter - 谢谢。我试图实现这个(顺便说一句 - 蚂蚁许可问题),但它似乎不起作用。尝试在第一个未来使用地图与链接期货一起使用。 1. 我可以有 2 次超时吗?如何? 2.我尝试调用这样的东西,但它似乎什么也没做: TimeoutScheduler.withTimeout(createVMFuture)(global,2 seconds).recover{ case ex:TimeoutException => { logError("create vm timed out") }
                • @cmbaxter 感谢您的TimeoutScheduler。我在stackoverflow.com/questions/21983288/… 中使用过,但遇到了不终止的问题。也许你有第二个......
                • 如果使用 Akka,它有一个方便的 akka.pattern.after 可以使用,例如超时:doc.akka.io/docs/akka/2.4/scala/futures.html#After
                【解决方案11】:

                我很惊讶这在 Scala 中不是标准的。我的版本很短,没有依赖关系

                import scala.concurrent.Future
                
                sealed class TimeoutException extends RuntimeException
                
                object FutureTimeout {
                
                  import scala.concurrent.ExecutionContext.Implicits.global
                
                  implicit class FutureTimeoutLike[T](f: Future[T]) {
                    def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
                      Thread.sleep(ms)
                      throw new TimeoutException
                    }))
                
                    lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout
                  }
                
                }
                

                使用示例

                import FutureTimeout._
                Future { /* do smth */ } withTimeout
                

                【讨论】:

                • 请记住,他不会阻止潜在的未来执行!这只能在未来实现。
                • @ScalaWilliam 有什么建议吗?在 future 中阻塞被认为是安全的。
                • Pablo Fernandez 的回答是正确的做法。
                • 您提到的解决方案使用 Akka 库,该库又依赖于 Java 的 Thread.sleep,这是在 JVM 上暂停线程执行的最低级别的方法。参考:github.com/akka/akka/blob/…
                • @Raul 不同之处在于 Akka 使用一个调度线程来处理所有计划的超时,而此解决方案为每次调用创建一个休眠线程。
                【解决方案12】:

                Play 框架包含 Promise.timeout,因此您可以编写如下代码

                private def get(): Future[Option[Boolean]] = {
                  val timeoutFuture = Promise.timeout(None, Duration("1s"))
                  val mayBeHaveData = Future{
                    // do something
                    Some(true)
                  }
                
                  // if timeout occurred then None will be result of method
                  Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
                }
                

                【讨论】:

                  【解决方案13】:

                  我刚刚为同事创建了一个TimeoutFuture 类:

                  超时未来

                  package model
                  
                  import scala.concurrent._
                  import scala.concurrent.duration._
                  import play.libs.Akka
                  import play.api.libs.concurrent.Execution.Implicits._
                  
                  object TimeoutFuture {
                    def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
                  
                      val prom = promise[A]
                  
                      // timeout logic
                      Akka.system.scheduler.scheduleOnce(timeout) {
                        prom tryFailure new java.util.concurrent.TimeoutException
                      }
                  
                      // business logic
                      Future { 
                        prom success block
                      }
                  
                      prom.future
                    } 
                  }
                  

                  用法

                  val future = TimeoutFuture(10 seconds) { 
                    // do stuff here
                  }
                  
                  future onComplete {
                    case Success(stuff) => // use "stuff"
                    case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
                  }
                  

                  注意事项:

                  • 假设播放!框架(但很容易适应)
                  • 每段代码都在同一个ExecutionContext 中运行,这可能并不理想。

                  【讨论】:

                  • 我在这个实现中遇到的一个问题是,如果块抛出异常,并且我执行了 Await.result(future, 5 seconds),则会抛出 TimeoutException 而不是底层异常。我在 scala 2.11 上运行它,所以我不确定如果 block 抛出异常,为什么 prom success 块不会完成 Future。我解决它的方法是在舞会成功块上做一个 try-catch,并在 catch 处理程序中做舞会失败。
                  • 再看这个,我认为问题是如果块抛出异常,它不会被写入舞会。
                  • @pablo 正如你提到的,它需要播放框架,如果我将它直接包含在 scala 应用程序中,它会抛出“没有启动的应用程序”,我将为此启动一个播放应用程序等(其中我不想这样做)。是否有任何其他调度程序可用于运行它,因为我需要在 scala 代码中具有超时功能的未来功能。我对 scala 很陌生。
                  • @AbhishekAnand 它可以调整,但我想你最好尝试其他一些已经工作的答案:)
                  • @PabloFernandez,感谢 Pablo 的回复。我能够使它与java调度程序一起工作。但是,我发现即使确实发生超时,它也只是保证,你会得到未来的回应。 Future 本身不会被杀死或停止。所以我决定让我的应用程序等待 x min/sec 并让未来运行它的过程
                  【解决方案14】:

                  你可以指定等待未来时的超时时间:

                  对于scala.concurrent.Futureresult 方法允许您指定超时。

                  对于scala.actors.FutureFutures.awaitAll 允许您指定超时。

                  我认为 Future 的执行没有内置超时。

                  【讨论】:

                  • 这会阻止调用者。不鼓励阻塞,请参阅我的答案以获得完全非阻塞的解决方案。
                  猜你喜欢
                  • 2012-12-24
                  • 2013-02-09
                  • 2017-01-19
                  • 1970-01-01
                  • 1970-01-01
                  • 1970-01-01
                  • 2011-08-10
                  • 2019-10-13
                  • 2017-03-27
                  相关资源
                  最近更新 更多