【问题标题】:Blocking in Producer Consumer Java Implementation using Condition Variables使用条件变量阻塞生产者消费者 Java 实现
【发布时间】:2017-12-05 17:15:45
【问题描述】:

我在使用条件变量实现生产者消费者时遇到了消费者阻塞的问题,基本上消费者线程不会获取生产者生产的最后一批产品,因此永远不会结束。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerImpl {

    protected List<Integer> buffer = new ArrayList<Integer>();

    //the lock on which condition variables are taken
    protected volatile Lock lock = new ReentrantLock(true);
    // the consumer will signal using this condition variable to the producer to start it's production
    protected volatile Condition producerStartProducing = lock.newCondition();
    // the producer will use this condition variable to start it's production
    protected volatile Condition consumerStartConsuming = lock.newCondition();

    class Consumer implements Callable<String> {

        int num;

        public Consumer(int i) {
            this.num = i;
        }

        /**
         * In a loop, take a lock each time, check if there is an item to consume from shared buffer
         * If yes, then consume it and loop back
         * If no, then wait for the producer to signal you.
         * Signals the producer each time it consumes an item.
         */
        @Override
        public String call() throws Exception {
            try {
                int i = 0;
                while (i < 10) {
                    while (buffer.isEmpty()) {
                        consumerStartConsuming.await();
                    }
                    lock.lock();
                    System.out.println("Consumer - " + i);
                    buffer.remove(buffer.size() - 1);
                    producerStartProducing.signalAll();
                    i++;
                }
            } finally {
                lock.unlock();
            }
            System.out.println("Consumed All");
            return "Consumed All";

        }

    }

    class Producer implements Callable<String> {

        int num;

        public Producer(int i) {
            this.num = i;
        }

        /**
         * In a loop, take a lock, produce items in a batch of 3, and then wait till the
         * consumer signals you to produce more.
         * Signals the consumer each time it produces an item
         */
        @Override
        public String call() throws Exception {
            try {
                int i = 0;
                while (i < 10) {
                    lock.lock();
                    while (buffer.size() > 2) {
                        producerStartProducing.await();
                    }
                    System.out.println("Producer - " + i);
                    buffer.add(1);
                    consumerStartConsuming.signalAll();
                    i++;
                }
            } finally {
                lock.unlock();
            }
            System.out.println("Produced All");
            return "Produced All";
        }

    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4);
        try {
            ProducerConsumerImpl producerConsumerImpl = new ProducerConsumerImpl();
            Future<String> p1 = newFixedThreadPool.submit(producerConsumerImpl.new Producer(1));
            Future<String> c1 = newFixedThreadPool.submit(producerConsumerImpl.new Consumer(1));
            final String string1 = p1.get();
            final String string2 = c1.get();
            System.out.println(string1 + " --- " + string2);
        } finally {
            newFixedThreadPool.shutdown();
        }
    }

}

输出:

Producer - 0
Producer - 1
Producer - 2
Consumer - 0
Consumer - 1
Consumer - 2
Producer - 3
Producer - 4
Producer - 5
Consumer - 3
Consumer - 4
Consumer - 5
Producer - 6
Producer - 7
Producer - 8
Consumer - 6
Consumer - 7
Consumer - 8
Producer - 9
Produced All

生产者代码批量生产三个产品,然后使用条件变量信号等待消费者提取它们。

类似地,消费者线程每次从缓冲区中取出一个项目时,都会拾取项目并通知生产者生产更多项目。

这只是任意实现,但此代码中此类操作的任何其他组合都存在问题,如果有人能指出这段代码出了什么问题并指出这里出了什么问题,将不胜感激。

【问题讨论】:

    标签: java multithreading blocking producer-consumer condition-variable


    【解决方案1】:

    锁定范围不正确,所以我在下面的代码中修复了它。

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ProducerConsumerImpl {
    
        protected List<Integer> buffer = new ArrayList<Integer>();
    
        // the lock on which condition variables are taken
        protected volatile Lock lock = new ReentrantLock(true);
        // the consumer will signal using this condition variable to the producer to
        // start it's production
        protected volatile Condition producerStartProducing = lock.newCondition();
        // the producer will use this condition variable to start it's production
        protected volatile Condition consumerStartConsuming = lock.newCondition();
    
        class Consumer implements Callable<String> {
    
            int num;
    
            public Consumer(int i) {
                this.num = i;
            }
    
            /**
             * In a loop, take a lock each time, check if there is an item to
             * consume from shared buffer If yes, then consume it and loop back If
             * no, then wait for the producer to signal you. Signals the producer
             * each time it consumes an item.
             */
            @Override
            public String call() throws Exception {
    
                int i = 0;
                while (i < 10) {
                    try {
                        lock.lock();
                        while (buffer.isEmpty()) {
                            consumerStartConsuming.await();
                        }
    
                        System.out.println("Consumer - " + i);
                        buffer.remove(buffer.size() - 1);
                        producerStartProducing.signalAll();
                        i++;
                    } finally {
                        lock.unlock();
                    }
                }
    
                System.out.println("Consumed All");
                return "Consumed All";
    
            }
    
        }
    
        class Producer implements Callable<String> {
    
            int num;
    
            public Producer(int i) {
                this.num = i;
            }
    
            /**
             * In a loop, take a lock, produce items in a batch of 3, and then wait
             * till the consumer signals you to produce more. Signals the consumer
             * each time it produces an item
             */
            @Override
            public String call() throws Exception {
    
                int i = 0;
                while (i < 10) {
                    try {
                        lock.lock();
                        while (buffer.size() > 2) {
                            producerStartProducing.await();
                        }
                        System.out.println("Producer - " + i);
                        buffer.add(1);
                        consumerStartConsuming.signalAll();
                        i++;
                    } finally {
                        lock.unlock();
                    }
                }
    
                System.out.println("Produced All");
                return "Produced All";
            }
    
        }
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
    
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4);
            try {
                ProducerConsumerImpl producerConsumerImpl = new ProducerConsumerImpl();
                Future<String> p1 = newFixedThreadPool.submit(producerConsumerImpl.new Producer(1));
                Future<String> c1 = newFixedThreadPool.submit(producerConsumerImpl.new Consumer(1));
                final String string1 = p1.get();
                final String string2 = c1.get();
                System.out.println(string1 + " --- " + string2);
            } finally {
                newFixedThreadPool.shutdown();
            }
        }
    
    }
    

    【讨论】:

    • 谢谢,确实锁被错误地放置了,但这并不能解决问题,如果你看到了,消费者线程永远不会结束。它从不点击并打印“Consumed All”这一行。
    • 我已经运行了它。它正在消耗所有行。执行我的代码并查看
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-02-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多