【发布时间】:2017-04-20 06:12:12
【问题描述】:
我正在尝试使用 Scala Futures 生成许多 CPU 密集型作业。因为有这么多,我需要限制这些作业(线程)的创建。为此,我使用:
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent._
val numThread = sys.runtime.availableProcessors
import java.util.concurrent.ExecutorService
import java.util.concurrent.ArrayBlockingQueue
implicit val context = ExecutionContext.fromExecutorService(
new ThreadPoolExecutor(
numThread, numThread,
0L, TimeUnit.SECONDS,
new ArrayBlockingQueue[ Runnable ]( numThread ) {
override def offer( e: Runnable ) = {
put( e ); // Waiting for empty room
true
}
})
)
为了测试这一点,我创建了 2 个非常简单的函数:
import scala.util.{ Try, Success, Failure }
import scala.util.Random
def longComputation() = {
val id = Thread.currentThread().getId
//blocking {
println( s"Started thread: $id" )
Thread.sleep( 500 )
println( s"Finished thread: $id" )
//}
id
}
def processResult[T](r : Try[T]) = {
blocking {
r match {
case Success( id ) => println( s"Thread result: $id" )
case Failure( t ) => println( "An error has occured: " + t.getMessage )
}
}
}
然后我执行测试以通过多线程执行任务:
def main( args: Array[ String ] ) {
val s = Stream.from( 0 )
//s.foreach { x => println(x) ; val f = Future( longComputation ) ; f.onComplete{ processResult } }
s.foreach { x =>
println(x)
val f = Future( longComputation )
val p = Promise[Long]()
p completeWith f
p.future.onComplete{ processResult }
}
println("Finished")
context.shutdown
}
当我执行此操作时,启动了 16 个线程(CPU 计数为 8)。该程序打印了“完成”消息。然后系统锁定,不执行任何其他操作。但是,如果我删除回调,则线程将按预期执行 ad infinitum。
上面我已经尝试了blocking,也使用了Promise。行为没有变化。所以我的问题是:如何在不阻塞回调的情况下限制任务执行?如果这不可能,在线程中进行 I/O 是否可行(未来)?
感谢任何指针。
【问题讨论】:
标签: scala callback deadlock throttling concurrent.futures