【问题标题】:Scala Cats Effects - IO Async Shift - How Does it Work?Scala Cats 效果 - IO 异步转换 - 它是如何工作的?
【发布时间】:2019-02-25 12:09:10
【问题描述】:

这是一些使用 IO Monad 的 Scala 猫代码:

import java.util.concurrent.{ExecutorService, Executors}

import cats.effect.IO

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.control.NonFatal

object Program extends App {

  type CallbackType = (Either[Throwable, Unit]) => Unit

  // IO.async[Unit] is like a Future that returns Unit on completion.
  // Unlike a regular Future, it doesn't start to run until unsafeRunSync is called.
  def forkAsync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.async[Unit] { callback: CallbackType =>
    // "callback" is a function that either takes a throwable (Left) or whatever toRun returns (Right).
    println("LalalaAsync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        val nothing: Unit = toRun() // Note: This line executes the body and returns nothing, which is of type Unit.
        try {
          callback(Right(nothing)) // On success, the callback returns nothing
        } catch {
          case NonFatal(t) => callback(Left(t)) // On failure, it returns an exception
        }
      }
    })
  }

  def forkSync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.apply {
    println("LalalaSync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        toRun()
      }
    })
  }

  val treadPool: ExecutorService = Executors.newSingleThreadExecutor()
  val mainThread: Thread = Thread.currentThread()

  val Global: ExecutionContextExecutor = ExecutionContext.global

  /*
  Output:
    1 Hello World printed synchronously from Main.main
    LalalaSync: scala-execution-context-global-12
    Hello World printed synchronously from thread pool.pool-1-thread-1
    LalalaAsync: scala-execution-context-global-12
    Hello World printed asynchronously from thread pool.pool-1-thread-1
    2 Hello World printed synchronously from Global .scala-execution-context-global-12
   */
  val program = for {
    _ <- IO {
      println("1 Hello World printed synchronously from Main." + Thread.currentThread().getName) // "main" thread
    }
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- forkSync { () =>
      println("Hello World printed synchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- forkAsync { () =>
      println("Hello World printed asynchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- IO {
      println("2 Hello World printed synchronously from Global ." + Thread.currentThread().getName) // "scala-execution-context-global-13" thread
    }
  } yield ()

  program.unsafeRunSync()
}

要运行它,您需要添加:

libraryDependencies ++= Seq(
  "org.typelevel" %% "cats" % "0.9.0",
  "org.typelevel" %% "cats-effect" % "0.3"
),

到你的 build.sbt 文件。

注意输出:

  /*
  Output:
    1 Hello World printed synchronously from Main.main
    LalalaSync: scala-execution-context-global-12
    Hello World printed synchronously from thread pool.pool-1-thread-1
    LalalaAsync: scala-execution-context-global-12
    Hello World printed asynchronously from thread pool.pool-1-thread-1
    2 Hello World printed synchronously from Global .scala-execution-context-global-12
 */

基本上,我不明白 IO.shift(Global) 或 IO.async 是如何工作的。

比如为什么我调用了“forkAsync”之后,如果我不调用“IO.shift(Global)”,后面的同步IO对象都运行在“pool-1-thread-1”中。另外,这个例子中的 forkAsync 和 forkSync 有什么区别?它们都从 ExecutionContext.global 开始,然后在“pool.pool-1-thread-1”中执行一个 Runnable。

就像 forkAsync 和 forkSync 做完全相同的事情还是 forkAsync 做不同的事情?如果他们在做同样的事情,那么在 IO.async 中包装代码有什么意义?如果他们不做同样的事情,他们有什么不同?

【问题讨论】:

    标签: scala functional-programming monads scala-cats io-monad


    【解决方案1】:

    比如为什么我调用了“forkAsync”之后,如果我不调用“IO.shift(Global)”,后面的同步IO对象都运行在“pool-1-thread-1”中。

    更重要的问题是你为什么期望它在全局上评估“后续同步 IO 对象”?

    IO 内部没有线程池的概念,它不知道global,因此它无法切换回您的默认线程池,因此您确实需要触发手动切换。

    升级到最新版本1.0.0 并且您在ContextShift 中也有evalOn,它将在指定的线程池上执行IO 操作,然后切换回您的“全局”,我想这是你想要什么。

    另外,这个例子中的 forkAsync 和 forkSync 有什么区别?

    您的forkSync 触发Runnable 的执行,但不等待其完成。这是一场火灾,忘记了。这意味着后续的连锁动作不会产生反压力。

    一些建议:

    1. 升级到最新版本 (1.0.0)
    2. 阅读文档:https://typelevel.org/cats-effect/datatypes/io.html

    【讨论】:

    • 那么这是否意味着我的forkAsync 不是一劳永逸?就像我的forkAsync 是否等待run 方法完成?
    • 啊,我明白了。 forkAsync 等待运行完成,但 forkSync 没有。此外,如果您在 forkSync 之后执行 IO.shift ,它会更改 forkSync 最初运行的线程,但 forkSync 继续被解雇并忘记。
    • 而不是forkSync,我可以直接执行_ &lt;- contextShift.evalOn(ec)(IO{Thread.sleep(1000); println("Hello World")}),它会将这个长时间运行的IO 虹吸到treadPool 上并返回到默认的ExecutionContext。
    • 等一下。 contextShift.evalOn(ec)(IO{Thread.sleep(1000); println("Hello World")}) 等待我的同步 IO 完成,然后再执行序列中的下一个 IO。它不再是火和忘记。现在基本上和forkAsync一样。
    • 好的,所以如果我可以从contextShift.evalOn(IO.apply{...}) 获得相同的“等待完成”语义,并且我可以转移到我的 IO.async 块将使用的线程,那我为什么还要使用 IO.async?就像我不能用阻塞调用 IO.apply 来替换对 IO.async 的调用吗?
    猜你喜欢
    • 2018-10-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-02-01
    • 1970-01-01
    • 2017-07-09
    相关资源
    最近更新 更多