【问题标题】:Wait and notify in Consumer and Producer Threads在消费者和生产者线程中等待和通知
【发布时间】:2016-06-07 15:47:53
【问题描述】:

刚开始学习多线程。我在多个线程中有 5 个生产者和 2 个消费者。基本上,这个程序将 100 个项目添加到队列中。当队列大小为100时,生产者将停止添加。我希望消费者在消费者从队列中删除所有项目时通知生产者,以便生产者可以重新开始添加。目前生产者会等待,但永远不会收到消费者的通知。

制作人:

public class Producer implements Runnable {

private BlockingQueue sharedQueue;
private final int queueSize;
private Object lock = new Object();

  public Producer(BlockingQueue sharedQueue, int queueSize){
    this.sharedQueue = sharedQueue;
    this.queueSize = queueSize;
  }

  public void run() {
    while(true) {
        if(sharedQueue.size()== queueSize){

                try {
                    synchronized (lock) {
                    sharedQueue.wait();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
         }

        try {
            sharedQueue.put("Producer: " + sharedQueue.size());
            Thread.sleep(500);
            System.out.println("Producer:  Queue Size " + sharedQueue.size() + " Current Thread " + Thread.currentThread());

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者:

public class Consumer implements Runnable{

private BlockingQueue sharedQueue;
private final int queueSize;
private final int queueEmpty=0;
private Object lock = new Object();

   public Consumer(BlockingQueue sharedQueue, int queueSize){
    this.sharedQueue = sharedQueue;
    this.queueSize = queueSize;
   }
//Notify awaiting thread if the sharedQueue is empty
   public void run() {
    while (true) {
        if(sharedQueue.size()==queueEmpty){
            synchronized (lock) {
            this.notifyAll();
            }
        }
            try {

                    sharedQueue.take();
                    Thread.sleep(800);
                    System.out.println("Consumer: Queue Size " + sharedQueue.size() + " Current Thread" + Thread.currentThread());

            }catch(InterruptedException e){
                e.printStackTrace();
            }
    }

  }
}

主类

  public class App{ 

//A simple program to illustrate how producer and consumer pattern works with blocking queue using executor service
public static void main( String[] args )
{
    final BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<String> (100);
    final int queueSize =100;
    final int producerNum = 5;
    final int consumerNum = 2;

    final ExecutorService executorProducer = Executors.newFixedThreadPool(producerNum);
    final ExecutorService executorConsumer = Executors.newFixedThreadPool(consumerNum);

    for(int i=0;i<producerNum;i++){
        Producer producer = new Producer(sharedQueue,queueSize);
        executorProducer.execute(producer);
    }

    for(int j=0;j<consumerNum;j++){
        Consumer consumer = new Consumer(sharedQueue,queueSize);
        executorConsumer.execute(consumer);
    }



   }
 }

【问题讨论】:

  • 您认为this.notifyAll(); 会做什么?你为什么这么认为?
  • 如果 notifyAll 被调用,这段代码应该抛出 IllegalMonitorStateException。
  • 我认为 notifyAll 会唤醒所有等待这个对象的线程。在当前线程放弃对该对象的锁定之前,被唤醒的线程将无法继续。所以我的错误是等待和通知需要在同一个对象上使用?
  • 这里有多个错误。阅读有关受保护块的 Oracle 教程应该可以教会您一些东西。
  • 我告诉过你有多个问题。阅读 oracle 教程:docs.oracle.com/javase/tutorial/essential/concurrency/…

标签: java multithreading wait producer-consumer notify


【解决方案1】:

来自 oracle 文档page

BlockingQueue 实现是线程安全的。所有排队方法都使用内部锁或其他形式的并发控制以原子方式实现其效果

由于您已经在使用BlockingQueues,您可以摆脱wait()notify() API。

使用BlockingQueue的多个生产者和消费者的示例代码:

import java.util.concurrent.*;

public class ProducerConsumerDemo {

    public static void main(String args[]){

     BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

      Thread prodThread1 = new Thread(new Producer(sharedQueue,1));
      Thread prodThread2 = new Thread(new Producer(sharedQueue,2));
      Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
      Thread consThread2 = new Thread(new Consumer(sharedQueue,2));

      prodThread1.start();
      prodThread2.start();
      consThread1.start();
      consThread2.start(); 
   }

}

class Producer implements Runnable {

    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;

    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }

}

class Consumer implements Runnable{

    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }

    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

它是如何工作的?

  1. 生产者线程 1 将 11 - 15 之间的整数放入 BlockingQueue
  2. 生产者线程 2 将 21 - 25 之间的整数放入 BlockingQueue
  3. 任何消费者线程 - 线程 1 或线程 2 从 BlockingQueue 读取值(本例中为整数)

样本输出:

Produced:21:by thread:2
Produced:11:by thread:1
Produced:12:by thread:1
Produced:13:by thread:1
Produced:14:by thread:1
Produced:22:by thread:2
Produced:23:by thread:2
Produced:24:by thread:2
Produced:25:by thread:2
Consumed: 21:by thread:1
Consumed: 12:by thread:1
Consumed: 13:by thread:1
Consumed: 14:by thread:1
Consumed: 22:by thread:1
Consumed: 23:by thread:1
Consumed: 24:by thread:1
Consumed: 25:by thread:1
Produced:15:by thread:1
Consumed: 11:by thread:2
Consumed: 15:by thread:1

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-02-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多