【问题标题】:Throttling Scala Future blocks when onComplete is used使用 onComplete 时限制 Scala Future 块
【发布时间】:2017-04-20 06:12:12
【问题描述】:

我正在尝试使用 Scala Futures 生成许多 CPU 密集型作业。因为有这么多,我需要限制这些作业(线程)的创建。为此,我使用:

import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent._

val numThread = sys.runtime.availableProcessors

import java.util.concurrent.ExecutorService
import java.util.concurrent.ArrayBlockingQueue

implicit val context = ExecutionContext.fromExecutorService(
    new ThreadPoolExecutor(
      numThread, numThread,
      0L, TimeUnit.SECONDS,
      new ArrayBlockingQueue[ Runnable ]( numThread ) {
        override def offer( e: Runnable ) = {
          put( e ); // Waiting for empty room
          true
        }
      })
     )

为了测试这一点,我创建了 2 个非常简单的函数:

import scala.util.{ Try, Success, Failure }
import scala.util.Random

def longComputation() = {
  val id = Thread.currentThread().getId
  //blocking {
    println( s"Started thread: $id" )
    Thread.sleep( 500 )
    println( s"Finished thread: $id" )
  //}
  id
}

def processResult[T](r : Try[T]) = {
  blocking {
      r match {
        case Success( id ) => println( s"Thread result: $id" )
        case Failure( t )  => println( "An error has occured: " + t.getMessage )
       }
  }

}

然后我执行测试以通过多线程执行任务:

def main( args: Array[ String ] ) {


   val s = Stream.from( 0 )
   //s.foreach { x => println(x) ;  val f = Future( longComputation ) ; f.onComplete{ processResult } }

   s.foreach { x => 
     println(x) 
     val f = Future( longComputation )  
     val p = Promise[Long]()
     p completeWith f
     p.future.onComplete{ processResult } 
   }

   println("Finished")
   context.shutdown
 } 

当我执行此操作时,启动了 16 个线程(CPU 计数为 8)。该程序打印了“完成”消息。然后系统锁定,不执行任何其他操作。但是,如果我删除回调,则线程将按预期执行 ad infinitum

上面我已经尝试了blocking,也使用了Promise。行为没有变化。所以我的问题是:如何在不阻塞回调的情况下限制任务执行?如果这不可能,在线程中进行 I/O 是否可行(未来)?

感谢任何指针。

【问题讨论】:

    标签: scala callback deadlock throttling concurrent.futures


    【解决方案1】:

    程序在死锁中运行。提供的threadPool 具有固定大小,因此会发生以下情况: Future(longComputation) 从线程池中分配一个线程并开始工作。完成后,onComplete 从池中分配一个Thread 来执行提供的函数。

    鉴于完成工作比完成工作需要更长的时间,在某些时候,所有线程都在忙于工作。它们中的任何一个都完成了,onComplete 也需要一个线程,因此它向执行程序请求一个线程。工作无法完成,因为所有线程都忙,机器陷入死锁。

    我们可以通过给消费者预留资源来解决这个生产者-消费者的死锁。这样,固定大小的线程池会限制工作,但我们确保可以进一步处理任何已完成的工作。

    我将context 重命名为fixedContext 的这个sn-p 显示了使用单独的上下文来处理结果,从而解决了死锁。我也去掉了Promise,除了代理未来之外,它并没有起到真正的作用。

    val fixedContext = // same as in question
    val singleThreadContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
    ...
    ...
    def main( args: Array[ String ] ) {
    
       val s = Stream.from( 0 )
    
       s.foreach { x => 
         println(x)
         val f = Future( longComputation )(fixedContext)  
         f.onComplete{ processResult }(singleThreadContext)
       }
    
       println("Finished")
       fixedContext.shutdown
     } 
    }
    

    【讨论】:

    • @Dima OP 执行上下文基于一个固定大小的线程池,资源耗尽。 global 是无限的,将继续接受工作,最终以java.lang.OutOfMemoryError 结束。提议的选项将运行稳定的 b/c 工作与消费分开。
    • @Dima 我更改了执行上下文以处理结果以避免混淆,并使我关于分离两个池的观点更加清晰。
    • 初步测试表明它有效。但是有一个问题:为什么选择单线程上下文?我可能会坚持这样做,因为我将在单个资源上执行 I/O,但我对这个选项背后的推理感兴趣。
    • @user2051561 我使用单线程上下文来清楚地说明我们需要的是单独的上下文这一点。在此示例中,一个线程就足够了,但在现实生活中,使用针对用例确定的资源。
    • 好的,明白了。
    【解决方案2】:

    当一个线程完成longComputation 时,它会尝试将作业放入队列以执行回调,并被阻塞,因为队列已满。因此,最终,第一批“作业”完成了,但所有线程仍然很忙,在队列中等待调度回调,并且没有任何东西可以退出队列。

    解决方案?从队列中删除限制。这样,尝试提交回调的线程不会被阻塞,并且可以用于接收下一个任务。

    您可能希望在您的生产者循环中插入一些东西以减慢它的速度,这样您的无限队列就不会耗尽所有内存。可能是Semaphore

    val sem = new Semaphore(numThread*2)
    def processResult[T](r : Try[T]) = blocking {
      r match {
        case Success( id ) => println( s"Thread result: $id" )
        case Failure( t )  => println( "An error has occured: " + t.getMessage )
      }
      sem.release
    }
    
    Stream.from(0).foreach { _ => 
      sem.acquire
      new Future(longComputation).onComplete(processResult)
    }
    

    您不需要自定义执行上下文 - scala 的默认设置实际上更适合您想要做的事情

    【讨论】:

    • 好的,现在我明白为什么会出现死锁了。请注意,正如 maasg 指出的那样,如果我使用默认上下文(由 ForkJoinPool 支持),队列将无限增长并导致 OOM 错误。至于信号量,我想避免我的所有同步,以避免..所有事情的死锁;-)
    • 是的,我知道它会无限增长——因此是信号量。至于“避免同步”,如果您认为,在不同的池上显式提交回调或使用BlockingQueue 不会同步......再想一想:) 它只会增加解决方案的复杂性,并且违反了问题(你最终拥有比你想要的更多的线程),并序列化回调,这可能会成为一个瓶颈
    • @user2051561 只有在两个进程具有相同的资源约束和优先级的情况下,像这里所做的那样在单个线程中绑定生产者和消费者才是有益的。在这种情况下,繁重的计算与轻量级 I/O 操作相关,我希望我的 CPU 线程尽快空闲,以便他们承担新的工作,并且不会给他们 I/O 任务,这可能会阻塞等待外部资源(磁盘,网络,...)分配给生产和消费者的线程数将取决于这些约束。
    猜你喜欢
    • 1970-01-01
    • 2017-08-01
    • 2021-05-30
    • 2019-09-16
    • 1970-01-01
    • 2014-05-22
    • 1970-01-01
    • 2015-10-14
    • 1970-01-01
    相关资源
    最近更新 更多