【发布时间】:2021-03-25 14:29:25
【问题描述】:
我在运行以下代码时遇到 RejectedExecutionException:
override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] = {
val test = for {
start <- ZIO effect Instant.now.toEpochMilli
loop = for {
f <- effectBlocking{Thread sleep 1000}
} yield ()
allFib <- loop.fork replicateM 2000
_ <- Fiber joinAll allFib
end <- ZIO effect Instant.now.toEpochMilli
_ <- putStrLn(s"Total time: ${end-start}")
} yield ()
test
.catchAll(ZIO succeed _.getMessage)
.map(_ => ExitCode.success)
}
我做错了什么?当我使用纯 Scala 和 Futures 执行下面的代码时(我认为这或多或少与我在 ZIO 上尝试做的事情相同),一切正常,没有任何异常。
val tasks = new ListBuffer[Future[Unit]]
for(_ <- 0 to 2000) {
tasks += Future {
Thread.sleep(1000)
}
}
Await.result(Future sequence tasks, Duration.Inf)
使用 ZIO,我也尝试过传递一个专用的 ExecutorService,但也没有用。
【问题讨论】:
-
从
zio/blocking/package.scala中的代码来看,他们正在为effectBlocking使用最多1000 个线程的池。您的代码需要两倍。您可能想专门用Executors.newCachedThreadPool()覆盖执行程序 - 我不知道如何在 ZIO 中做到这一点。 -
@OlegPyzhcov 我尝试使用专用的 ExecutionService(固定和缓存),但到目前为止没有成功。无论如何,我相信每个ThreadPool都有一个“邮箱”(任务队列),这样当所有线程都忙的时候,新的任务就会存储在队列中。
-
但是...如果我用普通效果替换 effectBlocking,它就可以工作!但我不知道为什么。
标签: multithreading scala executorservice zio