【问题标题】:Producer consumer variant java BlockingQueues生产者消费者变体 java BlockingQueues
【发布时间】:2012-12-12 05:41:36
【问题描述】:

我正在研究 Java 中生产者消费者问题的变体。我有一个生产者线程创建对象,这些对象被放入优先阻塞队列,然后传递到主容器控制器,它是一个有界缓冲区。

队列的原因是当主容器有一定百分比的对象 A 时,它只会接受类型 B 的对象,以及我们被要求查看的其他一些场景。 我无法弄清楚代码出了什么问题,调试器只是从 InQueue 中的 in.offer 和 Producer 中的 in.push 跳转。任何方向或建议将不胜感激。

    import java.util.concurrent.PriorityBlockingQueue;

        public class InQueue implements Runnable {

        Controller c;
        private PriorityBlockingQueue in;

        public InQueue(Controller c) {
            this.c = c;
            in = new PriorityBlockingQueue();
        }

        public void push(C c) {

            in.offer(c);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        public void run() {
            while (true) {
                try {
                    C temp = (C) in.take(); //will block if empty
                    c.arrive(temp);
                } catch (InterruptedException e) {} // TODO
            }
        }
    }

public class Controller {

    private BoundedBuffer buffer;
    private int used;


    Controller(int capacity) {
        this.buffer = new BoundedBuffer(capacity);
        used = 0;
    }


    public void arrive(C c) {
        try {
            buffer.put(c);
            used++;
        } catch (InterruptedException e) { } //TODO
    }

    public C depart() {
        C temp = null; //BAD IDEA?
        try {
            temp = (C)buffer.take();
            used--;
        } catch (InterruptedException e) { } //TODO
        return temp; //could be null
    }
}

【问题讨论】:

    标签: java multithreading concurrency queue


    【解决方案1】:

    由于使用泛型的方式错误,您的代码无法编译。另一件事是BoundedBuffer没有默认实现。下面我为生产者-消费者问题做了一个工作实现,下面是阻塞队列。看看并纠正你的错误。

    package concurrency;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class Producer<T> {
        private final BlockingQueue<T> queue;
        private final Consumer consumer;
        private static volatile boolean isShutdown;
        private final static Object lock = new Object();
    
        public Producer() {
            this.queue = new LinkedBlockingQueue<T>();
            this.consumer = new Consumer();
        }
    
        public void start() {
            consumer.start();
        }
    
        public void stop() {
            synchronized (lock) {
                isShutdown = true;
            }
            consumer.interrupt();
        }
    
        public void put(T obj) throws InterruptedException {
            synchronized (lock) {
                if (isShutdown)
                    throw new IllegalStateException("Consumer Thread is not active");
            }
            queue.put(obj);
        }
    
        private class Consumer extends Thread {
    
            public void run() {
                while (true) {
                    synchronized (lock) {
                        if (isShutdown)
                            break;
                    }
    
                    T t = takeItem();
                    // do something with 't'
                    if(t!=null)
                    printItem(t);
                }
            }
    
            private void printItem(T t) {
                System.out.println(t);
            }
    
            private T takeItem() {
                try {
                    return queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return null;
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            Producer<Integer> producer = new Producer<Integer>();
            producer.start();
            for (int i = 0; i <20; i++) {
                producer.put(i);
                if (i >= 7)
                    Thread.sleep(500);
            }
            producer.stop();
        }
    }
    

    【讨论】:

    • 不,伙计,我能做到,你列出的。我承认,泛型确实需要整理,但我认为这不是问题所在。另外,我自己编码了有界缓冲区。设置是生产者 -> BlockingQueue -> BoundedByffer -> OutBlockingQueue -> 消费者。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-02-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-24
    • 2012-04-30
    相关资源
    最近更新 更多