【发布时间】:2021-06-05 17:13:38
【问题描述】:
问题
我有一个有趣的 Scala 生产者-消费者问题实例,它以两种不同的方式编写。两种方式都使用java.util.concurrent.ArrayBlockingQueue作为缓冲区,但它们有以下区别:
-
Version 1将生产者和消费者代码作为常规 Java 启动 线程 -
Version 2使用以下方式启动生产者和消费者代码ExecutionContext.global
在执行完这两个版本后,可以清楚地看到Version1 永远不会执行完(因为消费者的数量超过了生产者的数量,所以至少有一个消费者无限期地等待新的物品消费)。
但是,Version2 的情况也是如此,即消费者的数量也超过了生产者的数量,但这次程序确实完成了执行 - 我在这个问题中要求的东西解释为什么会这样。
我的解释尝试
我想说的是,即使Version2 中的消费者在生产者的供应“枯竭”后也一直在等待新商品,但代码在ExecutionContext 中启动的事实意味着代码已运行在守护线程内部,所以在主线程完成后(这里是人为添加Thread.sleep(1000)以延长执行时间),守护线程也会停止,无论它们是否完成了它们的主要工作或继续等待。
但是,我不确定我的解释是否解决了问题的根源,是否与问题无关。换句话说,我不确定我是否不是在陈述一些琐碎的事情,而是错过了对问题的一些明显解释。您能否验证我的一般理解是否正确,如有必要,请帮助我找到两个版本代码的这种行为的正确解释?谢谢!
代码
object Version1 extends App {
class Producer(name: String, buffer: ArrayBlockingQueue[Integer]) extends Thread(name) {
override def run(): Unit =
for (i <- 1 to 10) {println(s"$getName produced $i"); buffer.put(i)}
}
class Consumer(name: String, buffer: ArrayBlockingQueue[Integer]) extends Thread(name) {
override def run(): Unit =
for (_ <- 1 to 10) println(s"$getName consumed ${buffer.take}")
}
val buffer: ArrayBlockingQueue[Integer] = new ArrayBlockingQueue(5)
for (i <- 1 to 2) new Producer(s"Producer$i", buffer).start()
for (i <- 1 to 3) new Consumer(s"Consumer$i", buffer).start()
}
object Version2 extends App {
val buffer: ArrayBlockingQueue[Integer] = new ArrayBlockingQueue(5)
val ec = ExecutionContext.global
for (p <- 1 to 2)
ec.execute(() => for (i <- 1 to 10) {println(s"Producer$p produced $i"); buffer.put(i)})
for (c <- 1 to 3)
ec.execute(() => for (_ <- 1 to 10) {println(s"Consumer$c consumed ${buffer.take}")})
Thread.sleep(1000)
}
【问题讨论】:
标签: java multithreading scala concurrency thread-safety