【问题标题】:Creating a Synchronized Buffer for Producer/Consumer pattern in Java在 Java 中为生产者/消费者模式创建同步缓冲区
【发布时间】:2013-06-21 21:52:53
【问题描述】:

我为生产者/消费者模式实现了一个缓冲区,但是,消费者似乎永远不会获得锁,因此会发生饥饿。我无法确定为什么会发生这种情况,因为 put() 和 take() 似乎都正确释放了锁......

我知道有 BlockingQueue 和其他不错的实现,但我想使用 wait() 和 notify() 作为练习来实现它。

public class ProducerConsumerRaw {

    public static void main(String[] args) {
        IntBuffer buffer = new IntBuffer(8);

        ConsumerRaw consumer = new ConsumerRaw(buffer);
        ProducerRaw producer = new ProducerRaw(buffer);

        Thread t1 = new Thread(consumer);
        Thread t2 = new Thread(producer);

        t1.start();
        t2.start();

    }
}

class ConsumerRaw implements Runnable{
    private final IntBuffer buffer;

    public ConsumerRaw(IntBuffer b){
        buffer = b;
    }

    public void run() {
        while(!buffer.isEmpty()) {
            int i = buffer.take();
            System.out.println("Consumer reads "+i); // this print may not be in the order
        }
    }
}


class ProducerRaw implements Runnable{
    private final IntBuffer buffer;

    ProducerRaw(IntBuffer b) {
        this.buffer = b;
    }

    public void run(){
        for (int i = 0; i < 20; i++) {
            int n = (int) (Math.random()*100);
            buffer.put(n);
            System.out.println("Producer puts "+n);
        }
    }
}


class IntBuffer{

    private final int[] storage;
    private volatile int end;
    private volatile int start;

    public IntBuffer(int size) {
        this.storage = new int[size];
        end = 0;
        start = 0;
    }


    public void put(int n) { // puts add the END
        synchronized(storage) {
            boolean full = (start == (end+storage.length+1)%storage.length);
            while(full){ // queue is full
                try {
                    storage.notifyAll(); 
                    storage.wait();

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            this.storage[end] = n;
            end = incrementMod(end);
            storage.notifyAll();
        }
    }


    public int take(){

        synchronized(storage) {
            while (end == start) { // empty queue
                try {
                    storage.notifyAll(); // notify waiting producers
                    storage.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            int index = start;
            start = incrementMod(start);
            storage.notifyAll(); // notify waiting producers
            return this.storage[index];

        }
    }

    private int incrementMod(int index) {
        synchronized (storage) {
            if (index == storage.length-1) return 0;
            else return index+1;
        }
    }

    public boolean isEmpty(){
        synchronized (storage) {
            return (start == end);
        }
    }

}

【问题讨论】:

    标签: java multithreading concurrency


    【解决方案1】:

    这至少是一个问题,在您的put 方法中:

    boolean full = (start == (end+storage.length+1)%storage.length);           
    while(full){ // queue is full
        // Code that doesn't change full
    }
    

    如果full 曾经被初始化为true,您希望循环如何结束?

    其他问题是这个循环,在消费者中:

    while(!buffer.isEmpty()) {
        int i = buffer.take();
        System.out.println("Consumer reads "+i);
    }
    

    您假设生产者永远不会让缓冲区变空 - 如果消费者在生产者之前启动,它将立即停止。

    相反,您需要某种方式告诉缓冲区您已停止生成。消费者应该继续接收,直到队列为空并且不会再收到任何数据。

    【讨论】:

    • 哇,感谢您指出这一点!它现在可以工作了,但我很感激任何关于如何改进代码的评论
    • @ksiomelo:显而易见的改进是使用 java.util.concurrent 中专门为此设计的类 :)
    猜你喜欢
    • 2011-03-21
    • 1970-01-01
    • 2015-11-15
    • 1970-01-01
    • 2018-12-13
    • 2017-04-07
    • 1970-01-01
    • 2016-06-29
    • 2011-10-27
    相关资源
    最近更新 更多