【问题标题】:How to use SynchronousQueue appropriately in producer-consumer model? [closed]如何在生产者-消费者模型中正确使用 SynchronousQueue? [关闭]
【发布时间】:2012-08-14 19:01:47
【问题描述】:

我写了一个在生产者-消费者模型中使用 SynchronousQueue 的测试示例。但效果不好。以下是我的代码:

public class QueueTest {

    String input;
    int pos;
    BlockingQueue<String> queue;
    volatile boolean exitFlag;

    QueueTest()
    {
        for(int i=0; i<10000; i++)
            input += "abcde";
        input += "X";
        pos = 0;
        queue = new SynchronousQueue<String>();
        exitFlag = false;
    }

    public static void main(String[] args) {
        QueueTest qtest = new QueueTest();
        qtest.runTest();
    }

    void runTest()
    {
        Thread producer = new Thread( new Producer());
        Thread consumer = new Thread( new Consumer());
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    class Producer implements Runnable
    {
        public void run()
        {
            while(true)
            {
                String s = read();
                if(s.equals("X"))
                    break;
                try {
                    queue.put(s);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            exitFlag = true;
        }
    }

    class Consumer implements Runnable
    {
        public void run()
        {
            while(exitFlag == false)
            {
                String s = null;
                try {
                    s = queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                process(s);  
            }
        }
    }

    String read()
    {
        String str = input.substring(pos, pos+1);
        pos++;
        return str;
    }

    void process(String s)
    {
        long sum = 0;
        for(long i=0; i<1000; i++)
            sum = sum * i + i;
    }
}

问题是运行像死锁一样卡住了。这些简单的代码有什么错误吗?

【问题讨论】:

    标签: java multithreading producer-consumer java.util.concurrent


    【解决方案1】:

    您更有可能看到比赛条件。想象一下场景

    Thread 1 put into queue
    Thread 2 takes out of queue quickly processes and awaits another put from thread 1
    Thread 1 finishes and sets exitFlag to true
    

    在这种情况下,线程 2 将永久保留,因为在线程 2 读取之前 exitFlag 未设置为 false。

    您可能需要考虑毒丸。这是给我们已完成的另一个线程的消息。例如:

       final String POISON_PILL = " POISON";
    
    class Producer implements Runnable {
        public void run() {
            while (true) {
                String s = read();
                if (s.equals("X"))
                    break;
                try {
                    queue.put(s);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
    class Consumer implements Runnable {
        public void run() {
            String s = null;
            try {
                while ((s = queue.take()) != POISON_PILL) {
                    process(s);
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    

    所以当另一个线程被通知另一个线程已经完成时,两个线程都应该优雅地结束。

    【讨论】:

    • 试图帮助 OP 找到解决方案...
    • 这真是个好主意。我现在就试试。
    • 它有效。非常感谢!很好的一课。
    【解决方案2】:

    由于您的 exitFlag 在多个线程之间共享,因此您必须做一些事情来使 Producer 的更新对 Consumer 可见(就 java 内存模型而言)。对于此示例,使值 volatile 就足够了。

    更新:

    您应该生成挂起代码的堆栈转储。这将为您提供有关正在发生的事情的线索。这段代码是一个很好的例子,说明了为什么不应该将标志与 BlockingQueue 一起用于控制。

    更新 2:

    真糟糕,@JohnVint 把猫从袋子里拿出来了。是的,毒丸是解决这种种族状况的方法。

    【讨论】:

    • 好建议,我加了,但问题是一样的。
    • 具体是什么问题?
    • @JackWM - 线程卡在哪里了?
    【解决方案3】:

    你的程序会卡在以下场景:

    Producer 在 Consumer 检查 existFlag 是否为 true 之后设置 exitFlag(不添加新元素) >。如果队列中没有更多元素(Consumer 之前设法处理了所有元素),则消费者将在 queue.take() 上被阻塞。

    你可以使用queue.poll(),这是没有阻塞的方法。这需要稍微改变你的程序。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2012-01-08
      • 1970-01-01
      • 1970-01-01
      • 2015-10-12
      • 1970-01-01
      • 2022-10-23
      • 1970-01-01
      相关资源
      最近更新 更多