【问题标题】:ZIO: Why RejectedExecutionException are being thrown?ZIO:为什么会抛出 RejectedExecutionException?
【发布时间】:2021-03-25 14:29:25
【问题描述】:

我在运行以下代码时遇到 RejectedExecutionException:

override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] = {
    val test = for {
      start  <- ZIO effect Instant.now.toEpochMilli
      loop   = for {
        f <- effectBlocking{Thread sleep 1000}
      } yield ()
      allFib <- loop.fork replicateM 2000
      _      <- Fiber joinAll allFib
      end    <- ZIO effect Instant.now.toEpochMilli
      _      <- putStrLn(s"Total time: ${end-start}")
    } yield ()

    test
      .catchAll(ZIO succeed _.getMessage)
      .map(_ => ExitCode.success)
  }

我做错了什么?当我使用纯 Scala 和 Futures 执行下面的代码时(我认为这或多或少与我在 ZIO 上尝试做的事情相同),一切正常,没有任何异常。

  val tasks = new ListBuffer[Future[Unit]]
  
  for(_ <- 0 to 2000) {
    tasks += Future {
      Thread.sleep(1000)
    }
  }

  Await.result(Future sequence tasks, Duration.Inf)

使用 ZIO,我也尝试过传递一个专用的 ExecutorService,但也没有用。

【问题讨论】:

  • zio/blocking/package.scala 中的代码来看,他们正在为effectBlocking 使用最多1000 个线程的池。您的代码需要两倍。您可能想专门用Executors.newCachedThreadPool() 覆盖执行程序 - 我不知道如何在 ZIO 中做到这一点。
  • @OlegPyzhcov 我尝试使用专用的 ExecutionService(固定和缓存),但到目前为止没有成功。无论如何,我相信每个ThreadPool都有一个“邮箱”(任务队列),这样当所有线程都忙的时候,新的任务就会存储在队列中。
  • 但是...如果我用普通效果替换 effectBlocking,它就可以工作!但我不知道为什么。

标签: multithreading scala executorservice zio


【解决方案1】:

现代效果类型提倡在应用程序中使用两个线程池,一个用于线程阻塞的东西(推荐大小是无限的,但正如我们所知,ZIO 将大小限制为 1000),另一个用于计算(ZIO 是 2*CPU hyperthreads ; 在猫效应中它是 min(2, CPU hyperthreads)) 和可能会阻塞但不值得关心的东西,例如 println

在 ZIO 中,effectBlocking 是您告诉执行切换到另一个池的方式。这对于非无关紧要的阻塞很重要,因为不这样做会使计算池饿死,而您的应用程序的其余部分正在其上运行。

这就是 ZIO.effect 起作用的原因 - 它使用计算池。对我而言,您的代码将在大约 250 秒内完成(4 核意味着 8 个线程,我们有 2000 个睡眠时间)。


现在,我假设您用缓存的线程池替换了错误的线程池,因为这样可以:

object ZioBlockingTest extends zio.App {

  val cachedBlocker = ZLayer.succeed(new Blocking.Service {
    override def blockingExecutor: Executor =
      Executor.fromExecutionContext(Int.MaxValue)(
        ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
      )
  })


  override def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] = {
    val test = for {
      start  <- ZIO effect Instant.now.toEpochMilli
      loop   = for {
        f <- effectBlocking{Thread sleep 1000}
      } yield ()
      allFib <- loop.fork replicateM 2000
      _      <- Fiber joinAll allFib
      end    <- ZIO effect Instant.now.toEpochMilli
      _      <- putStrLn(s"Total time: ${end-start}")
    } yield ()

    test
      .catchAll(ZIO succeed _.getMessage)
      .map(_ => ExitCode.success)
  }.provideSomeLayer[ZEnv](cachedBlocker) // <-- override the pool used for effectBlocking
}

由于启动和加入 2000 个实际操作系统线程的开销,这对我来说在 2 秒内完成。


P.S.关于线程池具有“邮箱”的说法是正确的,但邮箱本身可能不同。计算池通常使用最大容量的java.util.concurrent.LinkedBlockedQueue (Int.MaxValue),因此当有空闲线程时,总是可以推迟处理。阻塞池使用j.u.c.SynchronousQueue,它的容量基本上为0,所以它是“立即启动或死亡”。它可能被认为是 ZIO 中的一个错误,它使用有限的线程池和SynchronousQueue 来阻塞任务。


P.P.S. Futures 也在使用一些线程池,但您可能已经导入了 ExecutionContext.Implicits.global。它有一点额外的魔力,你可以用它来告诉池“我在这里阻塞,也许添加一些线程”:

  for(_ <- 0 to 2000) {
    tasks += Future {
      blocking { Thread.sleep(1000) }
    }
  }

全局 EC 的确切限制可通过 VM 属性进行配置;阻塞情况下额外线程的默认值为 256。这使得示例代码在几秒钟内完成(仍然比使用具有适当缓存池的 ZIO 慢)。

惰性效果类型不依赖于这种东西,因为如果你正确使用两个池,它实际上会更高效;并且使用不同的池不那么难,因为不是在任何地方都以implicit 的形式传递。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-12-14
    • 2021-12-20
    • 2021-03-05
    • 2021-03-19
    • 2023-03-14
    • 2020-07-17
    • 2021-04-09
    • 2014-12-18
    相关资源
    最近更新 更多