【问题标题】:Scala future sequence and timeout handlingScala 未来序列和超时处理
【发布时间】:2013-07-16 09:27:37
【问题描述】:

有一些很好的提示如何组合期货with timeouts。 但是我很好奇如何使用 Future sequence sequenceOfFutures

我的第一个方法是这样的

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits._

object FutureSequenceScala extends App {
  println("Creating futureList")

  val timeout = 2 seconds
  val futures = List(1000, 1500, 1200, 800, 2000) map { ms =>
    val f = future {
      Thread sleep ms
      ms toString
    }
    Future firstCompletedOf Seq(f, fallback(timeout))
  }

  println("Creating waitinglist")
  val waitingList = Future sequence futures
  println("Created")

  val results = Await result (waitingList, timeout * futures.size)
  println(results)

  def fallback(timeout: Duration) = future {
    Thread sleep (timeout toMillis)
    "-1"
  }
}

有没有更好的方法来处理未来序列中的超时,或者这是一个有效的解决方案?

【问题讨论】:

    标签: scala timeout future


    【解决方案1】:

    您的代码中有一些内容可能需要重新考虑。对于初学者,我不太喜欢将任务提交到ExecutionContext,其唯一目的是模拟超时,并且在其中使用Thread.sleepsleep 调用是阻塞的,您可能希望避免在执行上下文中有一个纯粹阻塞的任务,以便等待固定的时间。我将从我的答案here 中窃取信息,并建议对于纯粹的超时处理,您应该使用我在该答案中概述的内容。 HashedWheelTimer 是一种高效的计时器实现,它比只是休眠的任务更适合超时处理。

    现在,如果您走这条路,我建议的下一个更改涉及处理每个未来的各个超时相关的故障。如果您希望单个故障完全使从 sequence 调用返回的聚合 Future 失败,那么什么都不做。如果您不希望这种情况发生,而是希望超时返回一些默认值,那么您可以像这样在Future 上使用recover

    withTimeout(someFuture).recover{
      case ex:TimeoutException => someDefaultValue
    }
    

    完成此操作后,您可以利用非阻塞回调并执行以下操作:

    waitingList onComplete{
      case Success(results) => //handle success
      case Failure(ex) => //handle fail
    }
    

    每个未来都有一个超时,因此不会无限运行。无需 IMO 阻止并通过atMost 参数到Await.result 提供额外的超时处理层。但我想这假设你对非阻塞方法没问题。如果你真的需要在那里阻塞,那么你不应该等待timeout * futures.size 的时间。这些期货并行运行;那里的超时时间应该只需要与期货本身的各个超时时间一样长(或者稍微长一点,以解决 cpu/计时的任何延迟)。它当然不应该是超时 * 期货总数。

    【讨论】:

    • 作为好奇心,HashedWheelTimerTimerTasknewScheduledThreadPoolExecutor 更有效吗?两者都做同样的工作。
    • @Jatin,我想您可以查看此链接以获取更多信息:stackoverflow.com/questions/15347600/…。但本质上,添加更多任务不应该消耗更多资源。它应该是一个更恒定的时间(就消耗的系统资源而言)基于计时器,然后是TimerTimerTask。对于一个高吞吐量系统,您将调度大量基于超时的短期任务,这是一个更好的解决方案,因为资源使用要求不断。
    • 但是与HashedWheelTimer 相比,STPEcoresize 1 如何消耗更多资源?我很抱歉,但我不明白。由于内部堆O(log(n))STPE 的插入时间更长,但滴答时间更短。你能解释一下吗
    • @Jatin,当您可能以非常高的速率插入任务时,插入时间非常重要。根据您为滴答间隔选择的内容,散列轮计时器可能并不完全准确,但如果这对您来说不如让大量任务进出计时器那么重要,那么它被认为是更好的选择。如果这意味着什么,Akka 使用相同的方法(包括 HWT)来处理他们的 Actor ask (?) 超时处理。这就是我为 withTimeout 函数编写代码的基础。
    • 您也可以查看这篇文章了解更多信息:stackoverflow.com/questions/17276393/…
    【解决方案2】:

    这里有一个版本,显示您屏蔽 fallback 的情况有多糟糕。

    请注意,执行器是单线程的,您正在创建许多后备。

    @cmbaxter 是对的,你的主超时不应该是timeout * futures.size,它应该更大!

    @cmbaxter 也是正确的,你想考虑非阻塞。一旦你这样做了,并且你想施加超时,那么你将为此选择一个计时器组件,查看他的链接答案(也链接到你的链接答案)。

    也就是说,我仍然喜欢my answer from your link,因为坐在循环中等待下一个应该超时的事情真的很简单。

    它只需要一个期货列表及其超时和一个备用值。

    也许有一个用例,例如一个简单的应用程序,它只是阻止某些结果(如您的测试)并且在结果出现之前不得退出。

    import scala.concurrent._
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext
    
    import java.util.concurrent.Executors
    import java.lang.System.{ nanoTime => now }
    
    object Test extends App { 
      //implicit val xc = ExecutionContext.global
      implicit val xc = ExecutionContext fromExecutorService (Executors.newSingleThreadExecutor)
    
      def timed[A](body: =>A): A = {
        val start = now 
        val res = body
        val end = now
        Console println (Duration fromNanos end-start).toMillis + " " + res
        res
      }
      println("Creating futureList")
    
      val timeout = 1500 millis
      val futures = List(1000, 1500, 1200, 800, 2000) map { ms =>
        val f = future {
          timed {
            blocking(Thread sleep ms)
            ms toString
          }
        } 
        Future firstCompletedOf Seq(f, fallback(timeout))
      }   
    
      println("Creating waitinglist")
      val waitingList = Future sequence futures
      println("Created")
    
      timed {
      val results = Await result (waitingList, 2 * timeout * futures.size)
      println(results)
      }     
      xc.shutdown
    
      def fallback(timeout: Duration) = future {
        timed {
          blocking(Thread sleep (timeout toMillis))
          "-1"
        }
      }   
    }   
    

    发生了什么:

    Creating futureList
    Creating waitinglist
    Created
    1001 1000
    1500 -1
    1500 1500
    1500 -1
    1200 1200
    1500 -1
    800 800
    1500 -1
    2000 2000
    1500 -1
    List(1000, 1500, 1200, 800, 2000)
    14007 ()
    

    【讨论】:

      【解决方案3】:

      Monix 任务有timeout 支持:

        import monix.execution.Scheduler.Implicits.global
        import monix.eval._
        import scala.concurrent.duration._
      
        println("Creating futureList")
        val tasks = List(1000, 1500, 1200, 800, 2000).map{ ms =>
          Task {
            Thread.sleep(ms)
            ms.toString
          }.timeoutTo(2.seconds, Task.now("-1"))
        }
      
        println("Creating waitinglist")
        val waitingList = Task.gather(tasks) // Task.sequence is true/literally "sequencing" operation
      
        println("Created")
        val results = Await.result(waitingList, timeout * futures.size)
        println(results)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2013-05-13
        • 2015-11-24
        • 2019-08-16
        • 1970-01-01
        • 2021-02-01
        • 2018-09-09
        • 2013-01-05
        相关资源
        最近更新 更多