【问题标题】:Producer-Consumer using only notify() in the case of a single Consumer and an arbitrary number of Producers在单个消费者和任意数量的生产者的情况下,生产者-消费者仅使用 notify()
【发布时间】:2015-11-26 13:07:11
【问题描述】:

以下是我实现生产者-消费者问题的代码。使用notifyAll() 一切正常,但是由于性能原因,我想用notify() 替换所有出现的notifyAll()

我发现通过将 notifyAll() 更改为 notify() 来替换这些调用会导致死锁发生。但是,替换这些调用的所有其他尝试都失败了。

是否有一些巧妙的方法可以用notify() 替换这些调用,从而使下面的代码适用于单个消费者和任意数量的生产者?

public class Buffer
{
    private volatile String content = "";
    private volatile boolean isEmpty = true;

    public synchronized void addItem(String s)
    {
        while(!isEmpty){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        content = s;
        isEmpty = false;
        notifyAll();

    }

    public synchronized String getItem()
    {
        while(isEmpty) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        String temp = content;
        isEmpty = true;
        notifyAll();
        return temp;
    }
}

public class Producer implements Runnable
{
    private String greeting;
    private int repetitions;
    private Buffer b;

    public Producer(String aGreeting, int aRepetitions, Buffer aBuffer){
        greeting = aGreeting;
        repetitions = aRepetitions;
        b = aBuffer;
    }

    public void run()
    {
        for(int i = 1; i <= repetitions; i++) {
            b.addItem(greeting + i);
        }
    }
}


public class Consumer implements Runnable {
    private String greeting;
    private Buffer b;
    public Consumer(String aGreeting, Buffer aBuffer){
        greeting = aGreeting;
        b = aBuffer;
    }
    public void run()
    {
        try
        {
            while(true){
                System.out.println(greeting + b.getItem());
                Thread.sleep(100);
            }
        }
        catch(InterruptedException exception){}
    }
}

【问题讨论】:

  • 你可以在这里查看:github.com/pvllnspk/concurrency_problems/tree/master/src/cp,也许会发现一些有用的东西
  • 你能用阻塞队列的概念代替你的实现吗? BlockingQueue 通过提供对 put() 和 take() 阻塞的开箱即用支持,惊人地简化了 Producer-Consumer 设计模式的实现。开发人员不需要编写令人困惑和关键的等待通知代码来实现通信。 BlockingQueue 是一个接口,Java 5 为它提供了不同的实现。
  • 请检查这个,stackoverflow.com/questions/13774802/… 这个线程已经解释了一个很好的例子,说明为什么我们不应该使用 notify() 而不是 notifyAll()
  • 您的代码会变得明显更加复杂(您需要为生产者和消费者提供单独的锁定对象),而好处很可能很小。我建议不要这样做。

标签: java concurrency java.util.concurrent


【解决方案1】:

为了能够使用.notify(),您需要保证,任何可能唤醒线程“消耗”通知的全部“原因”。

例如,在您的情况下 consumer(方法 .get_item为缓冲区中的单个元素释放空间。这是消费者通知的原因。因为您使用单一消费者模型,只有生产者(方法.add_item)会因为这个通知而被唤醒。 生产者 使用整个释放的元素将信息存储到其中。

所以,使用.notify() 是消费者可以的

另一方面,由于您使用多个生产者来自一个生产者的通知可能会唤醒另一个生产者。当然,一个生产者不会消耗另一个生产者的效果。

所以,使用 .notify() 是生产者是不好的

解决您的问题的最本地方法是使用不同的通知:一种用于消费者,一种用于生产者。因此,生产者中的通知只能唤醒消费者,消费者会消费生产者存储的信息。使用Condition可以实现同一临界区下的不同通知:

public class Buffer
{
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition(); 
    final Condition notEmpty = lock.newCondition(); 

    // `volatile` isn't needed for objects accessed under critical section
    private String content = "";
    private boolean isEmpty = true;

    // Use lock instead of `synchronized`.
    public void addItem(String s)
    {
        lock.lock();
        try {
            while(!isEmpty){
                try {
                    notFull.await(); // Analogue for wait()
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            content = s;
            isEmpty = false;
            notEmpty.signal(); // Analogue for notify()
        } finally {
            lock.unlock();
        }    
    }

    // Use lock instead of `synchronized`.
    public String getItem()
    {
        lock.lock();
        try {
            while(isEmpty) {
                try {
                    notEmpty.await(); // Analogue for wait()
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            String temp = content;
            isEmpty = true;
            notFull.signal(); // Analogue for notify()
            return temp;
        } finally {
            lock.unlock();
        }    
    }
}

【讨论】:

    【解决方案2】:

    简而言之:虽然 notifyAll() 通知所有等待线程并且 notify() 通知任何随机线程,但现在这个随机线程可能不是您下一个需要的线程,这可能会导致死锁。请参考这个例子:

    以下步骤导致我们陷入僵局。让我们将 limit 设置为 1 以保持示例简短。

    E1 enqueues an item.
    E2 attempts enqueue - checks wait loop - already full - waits
    
    E3 attempts enqueue - checks wait loop - already full - waits
    
    D1 attempts dequeue - and is executing synchronized block
    D2 attempts dequeue - blocks on entry to the (synchronized) block - due to D1
    
    D3 attempts dequeue - blocks on entry to the (synchronized) block - due to D1
    
    D1 is executing enqueue - gets the item, calls notify, exits method
    The notify happens to wake up E2 (i.e. "any waiting thread")
    BUT, D2 enters sync block before E2 can (E2 must reacquire the lock), so E2 blocks on entry to the enqueue sync block
    D2 checks wait loop, no more items in queue, so waits
    
    D3 enters block after D2, but before E2, checks wait loop, no more items in queue, so waits
    
    Now there is E3, D2, and D3 waiting!
    
    Finally E2 acquires the lock, enqueues an item, calls notify, exits method
    
    E2's notification wakes E3 (remember any thread can be woken)
    E3 checks the wait loop condition, there is already an item in the queue, so waits.
    NO MORE THREADS TO CALL NOTIFY and THREE THREADS PERMANENTLY SUSPENDED!
    

    解决方案:用 notifyAll 替换 notify

    适当参考,notify() instead of notifyAll() for blocking queue

    【讨论】:

    • 为什么需要排队?
    • 将入队读作produce,将出队读作consume
    • 如果我总是将队列的容量设置为 1 是否有效?
    • 这只是一个例子。我相信你足够聪明,可以想象当队列大小大于 1 时可能会发生死锁的场景。
    猜你喜欢
    • 2011-11-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-01-23
    • 1970-01-01
    • 2017-02-24
    • 2015-04-05
    相关资源
    最近更新 更多