【问题标题】:Issue with Producer Consumer in Java - Missing synchronization?Java 中的生产者消费者问题 - 缺少同步?
【发布时间】:2012-06-23 15:34:05
【问题描述】:

我想我没有正确使用同步。我得到以下输出。

我有意识地选择不使用 BlockingQueue 或 java 5 并发特性。 我写这个是为了学习同步和一些基础知识。

生产者线程:PRODUCER-1 将项目 0-Name-0 添加到队列
消费者线程 CONSUMER-2 处理项:0-Name-0
生产者线程:PRODUCER-2 将项目 1-Name-1 添加到队列中

你能帮我理解我哪里出错了吗?

public class ProducerConsumerManager {

public static void main(String args[]){

    ItemQueue itemQueue = new ItemQueue();

    Producer producer1 = new Producer(itemQueue,15, 500);
    Producer producer2 = new Producer(itemQueue,15, 1000);
    Consumer consumer1 = new Consumer(itemQueue,500);
    Consumer consumer2 = new Consumer(itemQueue,1500);

    Thread producerThread1 = new Thread(producer1,"PRODUCER-1");
    Thread producerThread2 = new Thread(producer2,"PRODUCER-2");
    Thread consumerThread1 = new Thread(consumer1,"CONSUMER-1");
    Thread consumerThread2 = new Thread(consumer2,"CONSUMER-2");

    producerThread1.start();
    producerThread2.start();

    consumerThread1.start();
    consumerThread2.start();


    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        System.out.println("The MAIN THREAD has been INTERRUPTED");
    }


}
}


 public class Consumer implements Runnable{

private ItemQueue itemQueue;
private int waitTimeInMillis;
public Consumer(ItemQueue queue, int waitTimeInMillis){
    itemQueue = queue;
    this.waitTimeInMillis = waitTimeInMillis;
}

private boolean processItem(Item item){     
    if(item == null){
        System.out.println("Consumer Thread cannot process as Item is null");
        return false;
    }               
    return true;
}

public void run() {
    synchronized(itemQueue){
        try {
        if(itemQueue.hasMoreItems()){
            Item item = itemQueue.getNextItem();
            System.out.println("Consumer Thread "+ Thread.currentThread().getName() + " processing item: " +  item.getItemNo() + "-" + item.getItemName());

            processItem(item);              
                Thread.sleep(waitTimeInMillis);

        }else{

                itemQueue.wait();
            }} catch (InterruptedException e) {
                System.out.println("Consumer Thread INTERRUPTED");                  
            }

    }               
}

}


  public class Producer implements Runnable{

private ItemQueue itemQueue;
private int maxCount;
private int waitTimeInMillis;
public Producer(ItemQueue queue, int maxItems, int waitTimeInMillis){
    itemQueue = queue;  
    this.maxCount = maxItems;
    this.waitTimeInMillis = waitTimeInMillis;
}

public void run() { 
    synchronized(itemQueue){
        try {
        if(itemQueue.queueCount()>=maxCount){

                itemQueue.wait();               
        }
        else{
            produceNewItem();
            Thread.sleep(waitTimeInMillis);
        }
        } catch (InterruptedException e) {
            System.out.println("Producer Thread INTERRUPTED");
        }
    }       
}

private boolean produceNewItem(){
    Item  item = null;
    synchronized(ItemService.class){
        item = ItemService.getNextItem();       
    System.out.println("Producer Thread: " + Thread.currentThread().getName() + " adding item " + item.getItemNo() +"-"+item.getItemName()+"  to queue");
    itemQueue.addItem(item);
    return true;
}
}
}


  import java.util.LinkedList;

  public class ItemQueue {

private LinkedList<Item> itemList = new LinkedList<Item>();

public void addItem(Item item){
    itemList.add(item);
}

public Item getNextItem(){
    return itemList.poll();
}

public boolean hasMoreItems(){
    return  !itemList.isEmpty();
}

public int queueCount(){
    return itemList.size();
}
}


   public class Item {

private String itemName;
private int itemNo;
private String itemDescription;

public String getItemName() {
    return itemName;
}
public void setItemName(String itemName) {
    this.itemName = itemName;
}
public int getItemNo() {
    return itemNo;
}
public void setItemNo(int itemNo) {
    this.itemNo = itemNo;
}
public String getItemDescription() {
    return itemDescription;
}
public void setItemDescription(String itemDescription) {
    this.itemDescription = itemDescription;
}

public Item (int no, String name, String desc){
    itemName = name;
    itemNo = no;
    itemDescription = desc;
}
}


   import java.util.LinkedList;

  public class ItemService {

static LinkedList<Item> itemList = new LinkedList<Item>();
static int counter =0;

static{
    Item item = null;
    for(int i=0;i<10000;i++){
        item = new Item(i, "Name-"+i, "Description for item " + i);
        itemList.add(item);
    }

}

public static Item getNextItem(){
    if(counter < 9999){
        Item item= itemList.get(counter);
        counter++;
        return item;
    }
    else
    {
        System.out.println("Cannot PRODUCE any further items. all exhausted");
        return null;
    }

}

}

【问题讨论】:

  • 使用BlockingQueue 变体之一,而不是LinkedList
  • 谢谢杰弗里。我有意识地选择不使用任何 Java 5 并发或集合。我想了解锁定和同步。我知道我可能会让我的生活变得有点困难,但我想更好地学习基础知识。一旦我设法完成这项工作,我将进入使用 Java 5 功能的下一步。
  • 我认为你是在为学校的操作系统实验室做这件事;)
  • 我希望生产者线程启动,然后启动消费者线程。第二个消费者线程比第一个慢一点。一旦我完成了这项工作,我只想改变生产者和消费者线程的速度,以查看它以不同的时间间隔工作。好吧,这与学校或大学工作无关。我是一名学习java的专业人士,正在寻找一些理解java线程的帮助。我无法理解我哪里出错了。

标签: java multithreading producer-consumer


【解决方案1】:

你还没有说你有哪个问题,即你期望什么而不是你得到的输出,但是你的代码有两个重要的问题:

  1. 两种线程都在 itemQueue 上等待(因为它已满,或者因为它是空的),但代码中没有任何地方是 notify(),或者最好是 notifyAll(),被调用以唤醒等待的线程。这将不可避免地导致饥饿。当生产者将一个项目放入队列时,它应该调用notifyAll() 来唤醒等待中的消费者。当消费者从队列中移除一个项目时,它应该调用notifyAll() 来唤醒等待中的生产者。
  2. 必须始终在循环中创建一个 wait() 方法,以检查线程在唤醒时是否真的可以继续。阅读 Object.wait() 的 javadoc。

另外一个不太重要的问题是,与其强制每个线程实现同步,而wait()/notifyAll(),这些都应该封装在队列里面。该线程只会获取项目并将项目放入队列中,并被队列阻塞,直到可能为止。总之,你应该重新实现 BlockingQueue。

【讨论】:

    【解决方案2】:

    您错误的一点(可能不是您的问题的原因)是在生产者/消费者模型中,您应该在生产者之前启动/运行您的消费者。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-12-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多