【问题标题】:Producer-Consumer, Bounded Buffer Insert Error?生产者-消费者,有界缓冲区插入错误?
【发布时间】:2018-05-05 19:42:15
【问题描述】:

我正在尝试执行一项任务,该任务需要多个生产者和消费者在有界缓冲区中生产和消费随机整数。这是我的代码:

import java.util.Deque;
import java.util.LinkedList;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.Semaphore;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.nio.Buffer;
import java.util.concurrent.TimeUnit;

public class ProducerConsumer 
{   
    interface Buffer
    {
        public abstract void insert(int item);
        public abstract int remove();
    }

    static class BoundedBuffer implements Buffer
    {
        private static final int maxSize = 5;

        private Semaphore mutex;
        private Semaphore empty;
        private Semaphore full;

        private int count;
        private int in;
        private int out;
        private int[] buffer;
        //private ArrayList<Integer> buffer = new ArrayList<Integer>();

        public BoundedBuffer()
        {
            mutex = new Semaphore(1);
            empty = new Semaphore(5);
            full = new Semaphore(0);

            count = 0;
            in = 0;
            out = 0;
        }

        public synchronized int remove()
        {
            int  item = 0;

            while (count == 0)
            {

            }

            try{
                full.acquire();
                mutex.acquire();
            }catch (InterruptedException e) {
                System.out.println("REMOVAL ERROR: " + e);
            }

            count--;
            item = buffer[out];
            out = (out+1) % maxSize;

            System.out.println("      Consumer consumed: " + item);

            mutex.release();
            empty.release();

            return item;
        }

        public synchronized void insert(int item)
        {
            while (count == maxSize)
            {

            }

            try{
                empty.acquire();
                mutex.acquire();
            }catch (InterruptedException e) {
                System.out.println("INSERTION ERROR: " + e);
            }

            count++;
            buffer[in] = item;
            in = (in+1) % maxSize;

            System.out.println("Producer produced " + item);

            mutex.release();
            full.release();
        }
    }

    static class Producer implements Runnable
    {
        private Buffer buffer;

        public Producer(Buffer b)
        {
            buffer = b;
        }

        public void run()
        {
            Random proRand = new Random();
            Random sleepRand = new Random();

            for (int i = 0; i < 100; i++)
            {
                try{
                    Thread.sleep(sleepRand.nextInt((500 - 0) + 0));
                }catch (InterruptedException e) {
                    System.out.println("PRODUCER INTERRUPT: " + e);
                }

                buffer.insert(proRand.nextInt((99999 - 10000) + 10000));
            }

        }
    }

    static class Consumer implements Runnable
    {
        private Buffer buffer;

        public Consumer(Buffer b)
        {
            buffer = b;
        }

        public void run()
        {
            Random sleepRand = new Random();

            for (int i = 0; i < 100; i++)
            {
                try{
                    Thread.sleep(sleepRand.nextInt((500 - 0) + 0));
                }catch (InterruptedException e) {
                    System.out.println("CONSUMER INTERRUPT: " + e);
                }

                buffer.remove();
            }
        }
    }

    public static void main(String[] args)
    {
        Scanner scanner = new Scanner(System.in);
        int sleepTime = scanner.nextInt();
        int numPro = scanner.nextInt();
        int numCon = scanner.nextInt();
        scanner.close();

        System.out.println("Using arguments from command line");
        System.out.println("Sleep time = " + sleepTime);
        System.out.println("Producer threads = " + numPro);
        System.out.println("Consumer threads = " + numCon);
        System.out.println();

        Buffer shared = new BoundedBuffer();

        /* proThread = new Thread(new Producer(shared));
        Thread conThread = new Thread(new Consumer(shared));

        proThread.start();
        conThread.start();*/


        ExecutorService proPool = Executors.newFixedThreadPool(numPro);
        for (int i = 0; i < numPro; i++)
        {
            proPool.submit(new Producer(shared));

        }
        proPool.shutdown();

        ExecutorService conPool = Executors.newFixedThreadPool(numCon);
        for (int i = 0; i < numCon; i++)
        {
            conPool.submit(new Consumer(shared));
        }
        conPool.shutdown();

        try{
            if (!proPool.awaitTermination(20, TimeUnit.SECONDS))
            {
                proPool.shutdownNow();
            }
        }catch (InterruptedException e) {
            System.out.println("TERMINATION ERROR: " + e);
        }

        try{
            if (!conPool.awaitTermination(20, TimeUnit.SECONDS))
            {
                conPool.shutdownNow();
            }
        }catch (InterruptedException e) {
            System.out.println("TERMINATION ERROR: " + e);
        }

        /*for (int i = 0; i < numPro; i++)
        {
            Runnable produce = new Producer();
        }

        for (int i = 0; i < numCon; i++)
        {
            Runnable consume = new Consumer();
        }*/

        //Runnable produce = new Producer();
        //Runnable consume = new Consumer();

        //Thread pro = new Thread(produce, "pro");
        //Thread con = new Thread(consume, "con");
    }

}

现在,我在输入 '20 5 1' 后得到以下输出:

"20 5 1

使用命令行参数

睡眠时间 = 20

生产者线程 = 5

消费者线程 = 1

插入错误:java.lang.InterruptedException

插入错误:java.lang.InterruptedException

插入错误:java.lang.InterruptedException

删除错误:java.lang.InterruptedException

插入错误:java.lang.InterruptedException"

我对造成这种情况的原因有些困惑。我的有界缓冲区需要不同的数据结构吗?

【问题讨论】:

    标签: java multithreading concurrency mutex producer-consumer


    【解决方案1】:

    我能够发现以下错误:

    • int[] 缓冲区从未初始化
    • 对“count”、“in”和“out”使用 volatile 关键字
    • 当你已经在使用锁时不要使用“同步”
    • “full”和“empty”信号量不是必需的,删除它们
    • 由于您插入和删除 100 次,暂停时间长达 500 毫秒,因此在 awaitTermination() 中至少使用 50 秒

    以下版本的程序可以工作

    public static class ProducerConsumer {
    
            interface Buffer {
                void insert(int item);
    
                int remove();
            }
    
            static class BoundedBuffer implements Buffer {
    
                private static final int maxSize = 5;
    
                private final int[] buffer = new int[maxSize];
    
                private final Semaphore mutex;
                private volatile int count;
                private volatile int in;
                private volatile int out;
    
                //private ArrayList<Integer> buffer = new ArrayList<Integer>();
    
                public BoundedBuffer() {
    
                    mutex = new Semaphore(1);
                    count = 0;
                    in = 0;
                    out = 0;
                }
    
                public int remove() {
    
                    int item = 0;
    
                    while (count == 0) {
                    }
    
                    try {
                        mutex.acquire();
                    } catch (InterruptedException e) {
                        System.out.println("REMOVAL ERROR: " + e);
                    }
    
                    count--;
                    item = buffer[out];
                    out = (out + 1) % maxSize;
    
                    System.out.println("Consumer consumed: " + item);
    
                    mutex.release();
    
                    return item;
                }
    
                public void insert(int item) {
    
                    while (count == maxSize) {
                    }
    
                    try {
                        mutex.acquire();
                    } catch (InterruptedException e) {
                        System.out.println("INSERTION ERROR: " + e);
                    }
    
                    count++;
                    buffer[in] = item;
                    in = (in + 1) % maxSize;
    
                    System.out.println("Producer produced " + item);
    
                    mutex.release();
                }
            }
    
            static class Producer implements Runnable {
    
                private Buffer buffer;
    
                public Producer(Buffer b) {
                    buffer = b;
                }
    
                public void run() {
    
                    Random proRand = new Random();
                    Random sleepRand = new Random();
    
                    for (int i = 0; i < 100; i++) {
                        try {
                            Thread.sleep(sleepRand.nextInt(500));
                        } catch (InterruptedException e) {
                            System.out.println("PRODUCER INTERRUPT: " + e);
                        }
    
                        try {
                            buffer.insert(proRand.nextInt((99999 - 10000) + 10000));
                        } catch (Exception e) {
                            System.out.println("Error while inserting " + e);
                        }
                    }
    
                }
            }
    
            static class Consumer implements Runnable {
                private Buffer buffer;
    
                public Consumer(Buffer b) {
                    buffer = b;
                }
    
                public void run() {
    
                    Random sleepRand = new Random();
    
                    for (int i = 0; i < 100; i++) {
                        try {
                            Thread.sleep(sleepRand.nextInt(500));
                        } catch (InterruptedException e) {
                            System.out.println("CONSUMER INTERRUPT: " + e);
                        }
    
                        try {
                            buffer.remove();
                        } catch (Exception e) {
                            System.out.println("Error while removing " + e);
                        }
                    }
                }
            }
    
            public static void main(String[] args) {
    
                int sleepTime = 20;
                int numPro = 5;
                int numCon = 1;
    
                System.out.println("Using arguments from command line");
                System.out.println("Sleep time = " + sleepTime);
                System.out.println("Producer threads = " + numPro);
                System.out.println("Consumer threads = " + numCon);
                System.out.println();
    
                Buffer shared = new BoundedBuffer();
    
            /* proThread = new Thread(new Producer(shared));
            Thread conThread = new Thread(new Consumer(shared));
    
            proThread.start();
            conThread.start();*/
    
                ExecutorService proPool = Executors.newFixedThreadPool(numPro);
                for (int i = 0; i < numPro; i++) {
                    proPool.submit(new Producer(shared));
    
                }
                proPool.shutdown();
    
                ExecutorService conPool = Executors.newFixedThreadPool(numCon);
                for (int i = 0; i < numCon; i++) {
                    conPool.submit(new Consumer(shared));
                }
                conPool.shutdown();
    
                try {
                    if (!proPool.awaitTermination(50, TimeUnit.SECONDS)) {
                        proPool.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    System.out.println("TERMINATION ERROR: " + e);
                }
    
                try {
                    if (!conPool.awaitTermination(50, TimeUnit.SECONDS)) {
                        conPool.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    System.out.println("TERMINATION ERROR: " + e);
                }
            }
    
        }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-06-29
      • 1970-01-01
      • 2015-07-03
      • 2015-06-08
      • 2011-03-21
      • 2014-05-13
      • 2017-04-07
      • 1970-01-01
      相关资源
      最近更新 更多