【问题标题】:Future with Timeout in Scala未来在 Scala 中超时
【发布时间】:2013-05-13 00:25:33
【问题描述】:

假设我有一个函数,它调用一个阻塞interruptible 操作。我想在超时的情况下异步运行它。也就是说,我想在超时到期时中断该功能。

所以我正在尝试做这样的事情:

导入 scala.util.Try 导入 scala.concurrent.Future def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = { val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() 导入 ExecutionContext.Implicits.global 未来 {Thread.sleep(timeout); aref.get().interrupt} // 1 未来 {aref.set(Thread.currentThread);尝试(f())} // 2 }

问题是 (1) 中的aref 可以为 null,因为 (2) 尚未将其设置为当前线程。在这种情况下,我想等到设置aref。最好的方法是什么?

【问题讨论】:

  • 我假设是这样,但是有没有理由不在一个线程中执行set,然后仅在设置后才拆分两个期货?
  • Future[Unit] 已经为您提供了 Try 值,无需嵌套额外的 Try
  • @0__:同意,但出于某种奇怪的原因,在 M. Odersky 和 ​​Erik Meijer 在 Coursera 上的功能响应式编程课程中,在其中一个讲座视频中,Meijer 使他的示例函数之一“更健壮” " 通过使其返回 Future[Try[T]] 而不是 Future[T]
  • @0__ 谢谢,我就是这么想的。
  • @ErikAllik 你知道我们会看到不同的任何例子吗? /你能详细说明一下吗?我试图看到一些不同的行为设置超时并除以零,但我做不到。我在Await 处添加了一个Try(我认为通过捕获超时异常确实会有所不同)。

标签: scala concurrency future


【解决方案1】:

您可以使用Await 采用更简单的方法。 Await.result 方法将超时持续时间作为第二个参数,并在超时时抛出 TimeoutException

try {
  import scala.concurrent.duration._
  Await.result(aref, 10 seconds);
} catch {
    case e: TimeoutException => // whatever you want to do.
}

【讨论】:

  • 我猜,它不会中断异步运行的函数。
  • 为了中断函数我需要一个类似于aref的机制来存储当前线程。因此我们回到原来的问题:)
  • 在我的代码中 arefAtomicReference 而不是 Future
  • 你不应该在你的生产代码中使用 Await.result。这是一个阻塞函数
  • @PeterPerháč import scala.concurrent.duration._
【解决方案2】:

我也需要相同的行为,所以这就是我解决它的方法。我基本上创建了一个对象,该对象创建一个计时器,如果未来未在指定的持续时间内完成,则使用 TimeoutException 使承诺失败。

package mypackage

import scala.concurrent.{Promise, Future}
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import scala.concurrent.ExecutionContext.Implicits.global

object TimeoutFuture {

  val actorSystem = ActorSystem("myActorSystem")
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
    val promise = Promise[A]()
    actorSystem.scheduler.scheduleOnce(timeout) {
      promise tryFailure new java.util.concurrent.TimeoutException
    }

    Future {
      try {
        promise success block
      }
      catch {
        case e:Throwable => promise failure e
      } 
    }

    promise.future
  }
}

【讨论】:

  • PackagerGlobal 从何而来?我得到一个编译错误...not found: value PackagerGlobal
  • 抱歉,PackagerGlobal 是我的代码中的一个对象类,我忘记删除了。我已经修复了上面的示例,因此很清楚您需要什么对象,即 ActorSystem 的实例
【解决方案3】:

如果您添加CountDownLatch,您可以实现您想要的行为。 (请注意,大量Futures 中的阻塞(即卡在await)可能会导致线程池不足。)

import scala.util.Try
import scala.concurrent.Future

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]()
  val cdl = new java.util.concurrent.CountDownLatch(1)

  import ExecutionContext.Implicits.global
  Future {Thread.sleep(timeout); cdl.await(); aref.get().interrupt}   // 1
  Future {aref.set(Thread.currentThread); cdl.countDown(); Try(f())}  // 2
}

【讨论】:

  • 那如果f完成得快很多,底层的Thread还是会被打断?
【解决方案4】:

虽然您已经获得了一些关于如何通过阻塞额外线程来处理超时来实现它的答案,但我建议您尝试不同的方式,原因是 Rex Kerr 已经给出。我不完全知道,你在f() 中做什么,但如果它是 I/O 绑定的,我建议你只使用异步 I/O 库。如果是某种循环,您可以将超时值直接传递给该函数,如果超过超时,则在该函数中抛出TimeoutException。示例:

import scala.concurrent.duration._
import java.util.concurrent.TimeoutException

def doSth(timeout: Deadline) = {
  for {
    i <- 0 to 10
  } yield {
    Thread.sleep(1000)
    if (timeout.isOverdue)
      throw new TimeoutException("Operation timed out.")

    i
  }
}

scala> future { doSth(12.seconds.fromNow) }
res3: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = 
  scala.concurrent.impl.Promise$DefaultPromise@3d104456

scala> Await.result(res3, Duration.Inf)
res6: scala.collection.immutable.IndexedSeq[Int] =
  Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> future { doSth(2.seconds.fromNow) }
res7: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = 
  scala.concurrent.impl.Promise$DefaultPromise@f7dd680

scala> Await.result(res7, Duration.Inf)
java.util.concurrent.TimeoutException: Operation timed out.
    at $anonfun$doSth$1.apply$mcII$sp(<console>:17)
    at $anonfun$doSth$1.apply(<console>:13)
    at $anonfun$doSth$1.apply(<console>:13)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    ...

scala> res7.value
res10: Option[scala.util.Try[scala.collection.immutable.IndexedSeq[Int]]] =
  Some(Failure(java.util.concurrent.TimeoutException: Operation timed out.))

这只会使用1个线程,将在超时+单个步骤的执行时间后终止。

【讨论】:

  • 我想在这里,我不能使用任何异步 I/O 我的函数 f 执行长阻塞但 可中断 调用。
【解决方案5】:

您也可以尝试像这样使用CountDownLatch

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() 

  import ExecutionContext.Implicits.global
  val latch = new CountDownLatch(1)
  Future {
    latch.await()
    aref.get().interrupt
  }

  Future {
    aref.set(Thread.currentThread) 
    latch.countDown()
    Try(f())
  }
}

现在我一直在等待我拨打latch.await(),但您当然可以将其更改为:

latch.await(1, TimeUnit.SECONDS)

然后用Try 包装它以处理何时/如果它超时。

【讨论】:

    猜你喜欢
    • 2013-01-05
    • 2021-02-01
    • 1970-01-01
    • 2020-08-04
    • 1970-01-01
    • 1970-01-01
    • 2017-06-24
    • 1970-01-01
    • 2012-08-14
    相关资源
    最近更新 更多