正如 Jasper 所指出的,您的主要问题(对于此特定代码示例)是您在通过调用 IO(st.put(1, 1)) >> (IO.sleep(2.seconds) >> IO(st.clear())).start 在 use 中启动新光纤后释放 bracket。所以你使用的其实是mutable.Map[Int, Int] => IO[Fiber[IO, Unit]]。
您只需删除该start,您将获得预期的行为(除非您的use IO 完成,否则您将使用mutable.Map[Int, Int] => IO[Unit] 和bracket。)。这意味着地图对于两个打印操作都将是空的。
mvarF.flatMap(mvar =>
mvar.take.bracket(st => {
IO(st.put(1, 1)) >>
IO.sleep(2.seconds) >>
IO(st.clear())
})(mvar.put)
>>
mvar.take.bracket(st =>
IO(println(s"Size before sleep ${st.size}")) >>
IO.sleep(2.seconds) >>
IO(println(s"Size after sleep ${st.size}"))
)(mvar.put)
).unsafeRunSync()
Size before sleep 0
Size after sleep 0
但这实际上只是这个特定代码示例的巧合(IO 与 flatMap 链接,这意味着我们告诉运行时顺序执行这些 IO)。
MVar 让您可以控制变量的重新分配,但您根本没有进行任何重新分配。因此,这段代码甚至没有使用MVar 的任何功能,它只是作为旁观者坐在那里。
因此,以这种方式使用MVar 将对代码的线程安全产生零影响。
mvarF.flatMap(mvar =>
mvar.take.bracket(st =>
IO(println(s"Size before first sleep - ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after first sleep - ${st.size}"))
)(mvar.put)
).unsafeRunAsyncAndForget()
mvarF.flatMap(mvar =>
mvar.take.bracket(st => {
IO(st.put(1, 1)) >> IO.sleep(2.seconds) >> IO(st.clear())
})(mvar.put)
).unsafeRunAsyncAndForget()
mvarF.flatMap(mvar =>
mvar.take.bracket(st =>
IO(println(s"Size before second sleep - ${st.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after second sleep - ${st.size}"))
)(mvar.put)
).unsafeRunAsyncAndForget()
Size before first sleep - 0
Size before second sleep - 1
Size after first sleep - 0
Size after second sleep - 0
您可以使用 Semaphore 来获得无竞争作用域。
class IOWithSemaphore[A](
private val a: A,
private val semaphore: Semaphore[IO]
)(
implicit
F: Concurrent[IO],
T: Timer[IO]) {
def unitUse(use: A => IO[Unit]): IO[Unit] =
for {
_ <- semaphore.acquire
_ <- use(a)
_ <- semaphore.release
} yield ()
}
val map = mutable.Map.empty[Int, Int]
Semaphore[IO](1).map(semaphore => {
val mapIOWithSemaphore = new IOWithSemaphore[mutable.Map[Int, Int]](map, semaphore)
// using unsafeRunAsync to emulate the parallel usage
mapIOWithSemaphore.unitUse(map =>
IO(println(s"Size before first sleep - ${map.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after first sleep - ${map.size}"))
).unsafeRunAsyncAndForget()
mapIOWithSemaphore.unitUse(map =>
IO(println(s"MUTATION BEGIN")) >> IO(map.put(1, 1)) >> IO.sleep(2.seconds) >> IO(map.clear()) >> IO(println(s"MUTATION END"))
).unsafeRunAsyncAndForget()
mapIOWithSemaphore.unitUse(map =>
IO(println(s"Size before second sleep - ${map.size}")) >> IO.sleep(2.seconds) >> IO(println(s"Size after second sleep - ${map.size}"))
).unsafeRunAsyncAndForget()
}).unsafeRunAsyncAndForget()
Await.result(Promise[Unit].future, Duration.Inf)
Size before first sleep - 0
Size after first sleep - 0
MUTATION BEGIN
MUTATION END
Size before second sleep - 0
Size after second sleep - 0