【问题标题】:Race condition caused by mvar由 mvar 引起的竞态条件
【发布时间】:2021-11-01 20:52:01
【问题描述】:

考虑以下简单示例:

implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val mvarF = MVar.of[IO, mutable.Map[Int, Int]](mutable.Map.empty)

mvarF.flatMap(mvar =>

  mvar.take.bracket(st => {
    IO(st.put(1, 1)) >> (IO.sleep(2.seconds) >> IO(st.clear())).start
  })(mvar.put)
    >>
  mvar.take.bracket(st =>
    IO(println(s"Size before sleep ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after sleep ${st.size}"))
  )(mvar.put)

).unsafeRunSync()

打印出来:

Size before sleep 1
Size after sleep 0

在本例中,在 Fiber 下调度的作业修改了 mvar 下的对象,该对象被另一个作业获取。

这非常不安全。有没有办法禁止这样的使用?

【问题讨论】:

  • 是的,不要使用可变变量。如果你共享一个可变变量,你就是不安全的。
  • @LuisMiguelMejíaSuárez 这并不总是实用的,尤其是在大量收藏的情况下。
  • 此外,bracketrelease 操作几乎会立即运行,因为您的 use 函数会启动纤程。如果你想控制对可变事物的访问,你可能应该使用 MVar 以外的东西来控制自身的变化,而不是它指向的对象。
  • 我认为这里的主要混淆是您说“mvar 是由另一份工作获得的”,但事实并非如此。您从 mvar 中取出值,然后改变该值(而不是 mvar 本身)。那时 mvar 不再被任何人获取。
  • 这个问题很奇怪。您正在改变变量引用的值。 MVar 让您可以控制变量的重新分配。 MVar 不能神奇地将非线程安全的东西升级为线程安全的。

标签: scala concurrency scala-cats cats-effect


【解决方案1】:

正如 Jasper 所指出的,您的主要问题(对于此特定代码示例)是您在通过调用 IO(st.put(1, 1)) >> (IO.sleep(2.seconds) >> IO(st.clear())).startuse 中启动新光纤后释放 bracket。所以你使用的其实是mutable.Map[Int, Int] => IO[Fiber[IO, Unit]]

您只需删除该start,您将获得预期的行为(除非您的use IO 完成,否则您将使用mutable.Map[Int, Int] => IO[Unit]bracket。)。这意味着地图对于两个打印操作都将是空的。

mvarF.flatMap(mvar =>
  mvar.take.bracket(st => {
    IO(st.put(1, 1)) >>
      IO.sleep(2.seconds) >>
        IO(st.clear())
  })(mvar.put)
    >>
    mvar.take.bracket(st =>
      IO(println(s"Size before sleep ${st.size}")) >>
        IO.sleep(2.seconds) >>
          IO(println(s"Size after sleep ${st.size}"))
    )(mvar.put)

).unsafeRunSync()
Size before sleep 0
Size after sleep 0

但这实际上只是这个特定代码示例的巧合(IO 与 flatMap 链接,这意味着我们告诉运行时顺序执行这些 IO)。

MVar 让您可以控制变量的重新分配,但您根本没有进行任何重新分配。因此,这段代码甚至没有使用MVar 的任何功能,它只是作为旁观者坐在那里。

因此,以这种方式使用MVar 将对代码的线程安全产生零影响。

mvarF.flatMap(mvar =>
  mvar.take.bracket(st =>
    IO(println(s"Size before first sleep - ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after first sleep - ${st.size}"))
  )(mvar.put)
).unsafeRunAsyncAndForget()

mvarF.flatMap(mvar =>
  mvar.take.bracket(st => {
    IO(st.put(1, 1)) >> IO.sleep(2.seconds) >> IO(st.clear())
  })(mvar.put)
).unsafeRunAsyncAndForget()

mvarF.flatMap(mvar =>
    mvar.take.bracket(st =>
      IO(println(s"Size before second sleep - ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after second sleep - ${st.size}"))
    )(mvar.put)
).unsafeRunAsyncAndForget()
Size before first sleep - 0
Size before second sleep - 1
Size after first sleep - 0
Size after second sleep - 0

您可以使用 Semaphore 来获得无竞争作用域。

class IOWithSemaphore[A](
    private val a: A, 
    private val semaphore: Semaphore[IO]
  )(
    implicit
    F: Concurrent[IO],
    T: Timer[IO]) {

  def unitUse(use: A => IO[Unit]): IO[Unit] =
    for {
      _ <- semaphore.acquire
      _ <- use(a)
      _ <- semaphore.release
    } yield ()

}
val map = mutable.Map.empty[Int, Int]

Semaphore[IO](1).map(semaphore => {
  val mapIOWithSemaphore = new IOWithSemaphore[mutable.Map[Int, Int]](map, semaphore)

  // using unsafeRunAsync to emulate the parallel usage

  mapIOWithSemaphore.unitUse(map =>
    IO(println(s"Size before first sleep - ${map.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after first sleep - ${map.size}"))
  ).unsafeRunAsyncAndForget()

  mapIOWithSemaphore.unitUse(map =>
    IO(println(s"MUTATION BEGIN")) >> IO(map.put(1, 1)) >> IO.sleep(2.seconds) >> IO(map.clear()) >> IO(println(s"MUTATION END"))
  ).unsafeRunAsyncAndForget()

  mapIOWithSemaphore.unitUse(map =>
    IO(println(s"Size before second sleep - ${map.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after second sleep - ${map.size}"))
  ).unsafeRunAsyncAndForget()

}).unsafeRunAsyncAndForget()

Await.result(Promise[Unit].future, Duration.Inf)
Size before first sleep - 0
Size after first sleep - 0
MUTATION BEGIN
MUTATION END
Size before second sleep - 0
Size after second sleep - 0

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-30
    • 2010-10-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多