【问题标题】:Means for performing background operations in Scala在 Scala 中执行后台操作的方法
【发布时间】:2015-07-05 15:18:58
【问题描述】:

我想在后台做一些耗时的任务。所以我需要在不同的线程中开始计算,能够检查它是否完成(可能失败)并且能够在不需要时中止计算。计算结束后应该调用同步回调函数来存储计算值。

它可以被编程为 Thread 类的一些包装器。但我想这个基本功能已经在一些 scala 库中实现了。我试图搜索但只找到对我的简单任务来说太多的 Akka。 scala.concurrent.ExecutionContext 有有用的 execute 方法,但它不返回任何对象来检查计算状态并按需中止它。

什么库包含已经描述的功能?


我检查了scala.concurrent.Future。它缺乏中止计算的能力,这是至关重要的。我使用以下策略:在后台计算一些消耗函数并提供合理的默认值。如果函数的参数发生变化,我会放弃原来的计算并开始新的计算。我无法想象如何用 Future.flatMap 重写这个策略。

【问题讨论】:

  • 你应该看看期货——它们旨在提供你所描述的那种设施,尽管在惯用的使用中你不会“检查状态”——你使用Future 的单子组合器来组合计算。 Twitter's implementation 提供了现成的中断,我相信用标准库期货做类似的事情并不难。
  • 我已经考虑过Futures。 scala.concurent.Future 是一个特征——它不能被实例化,我也找不到它的任何具体实现。在 akka 和 scalaz 中也有 Futures。我想这是许多框架和库中使用的通用名称,这使得谷歌搜索变得更加困难。而且我不需要编写计算。我正在开发很少使用并发的应用程序,因此不鼓励使用像 akka 这样的繁琐框架
  • 您不会手动实例化Future——您在伴随对象上使用Future.apply(或在某些特殊情况下使用Promise.apply)。
  • 请注意,在 Java 或 Scala 中使用基于线程的 API 中止计算没有通用可靠的方法 - 您的计算要么必须支持中止本身(例如通过定期检查标志),要么您需要使用基于进程的 API(例如scala.sys.Process),因为进程可以在需要时终止。
  • docs.oracle.com/javase/7/docs/api/java/util/concurrent/… 文档说Future.cancel(true) 可能会中断执行任务的线程

标签: multithreading scala concurrency


【解决方案1】:

既然您要求取消,我将演示如何通过 Twitter's implementation 使用期货:

import com.twitter.util.{ Await, Future, FuturePool }

def computeFast(i: Int) = { Thread.sleep(1000); i + 1 }
def computeSlow(i: Int) = { Thread.sleep(1000000); i + 1 }

val fastComputation = FuturePool.unboundedPool(computeFast(1))
val slowComputation = FuturePool.unboundedPool(computeSlow(1))

现在您可以轮询结果:

scala> fastComputation.poll
res0: Option[com.twitter.util.Try[Int]] = Some(Return(2))

scala> slowComputation.poll
res1: Option[com.twitter.util.Try[Int]] = None

或者设置回调:

fastComputation.onSuccess(println)
slowComputation.onFailure(println)

不过,大多数时候最好使用mapflatMap 来描述如何组合计算。

取消有点复杂(这只是一个演示——您需要提供自己的取消逻辑):

import com.twitter.util.Promise

def cancellableComputation(i: Int): Future[Int] = {
  val p = Promise[Int]

  p.setInterruptHandler {
    case t =>
      println("Cancelling the computation")
      p.setException(t)
  }

  FuturePool.unboundedPool(computeSlow(i)).onSuccess(p.setValue)
  p
}

然后:

scala> val myFuture = cancellableComputation(10)
myFuture: com.twitter.util.Future[Int] = Promise@129588027(state=Interruptible(List(),<function1>))

scala> myFuture.poll
res4: Option[com.twitter.util.Try[Int]] = None

scala> myFuture.raise(new Exception("Stop this thing"))
Cancelling the computation

scala> myFuture.poll
res6: Option[com.twitter.util.Try[Int]] = Some(Throw(java.lang.Exception: Stop this thing))

您可能可以对标准库的未来做类似的事情。

【讨论】:

  • 我已经在谷歌上搜索过,停止计算的唯一方法是调用已弃用的 Thread.stop()。所有其他方式都需要以特殊形式编写计算。检查线程的中断值。
  • @ayvango Twitter 的futures 的可中断性允许你做的是建立一个计算图,中断将通过该计算图传播。例如,如果你写了myFuture.map(_ + 1),你会得到一个新的未来,如果你打断了这个新的未来,myFuture 也会被打断。
  • 我写了一个小实用程序,提供固定的工作池并利用 Thread.stop() 在计算过程中安全地终止线程
猜你喜欢
  • 2013-06-20
  • 2011-09-07
  • 1970-01-01
  • 1970-01-01
  • 2010-11-05
  • 2022-06-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多