【问题标题】:Can my consumer become producer (apparently) for few cases我的消费者能否在少数情况下(显然)成为生产者
【发布时间】:2019-08-16 10:13:50
【问题描述】:

我正在使用ExecutorServiceBlockingQueue 解决使用生产者-消费者问题的事务轮询案例。我有一个交易清单,我想对其进行验证并采取行动。我得到一个新的交易来不断地验证。 考虑到BlockingQueue,我只有一个生产者,我想保持3-5个并行消费者来加速验证。

我可能需要等待几笔交易才能完成。 (说 30 秒)。 所以,我会验证,如果它是假的并且时间大于 30 秒,我会放弃它。基本上,我只想在数据项可消耗时才消耗。

首先,这种方法好吗?或者我应该尝试一些其他解决方案(目前我还不知道)

这是我改编自this question的代码:

import java.util.concurrent.*;

public class ProducerConsumerWithES {

    public static void main(String args[]){

        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

        ExecutorService pes = Executors.newFixedThreadPool(2);
        ExecutorService ces = Executors.newFixedThreadPool(2);

        pes.submit(new Producer(sharedQueue,1));
        pes.submit(new Producer(sharedQueue,2));

        ces.submit(new Consumer(sharedQueue,1));
        ces.submit(new Consumer(sharedQueue,2));
        // shutdown should happen somewhere along with awaitTermination
        /* https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
        pes.shutdown();
        ces.shutdown();
    }
}
class Producer implements Runnable {

    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;

    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{

    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;

    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {

        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}

我知道如果需要,我可以先发送peek(),然后再发送remove()。 但是当我尝试这样做时,所有其他消费者都被困在同一笔交易中。并且从未参与过其他产生的交易。 这是因为存储正在排队 (FIFO)。

当我删除元素时,这种情况永远不会发生,而是进行验证而不是偷看,因为其他消费者可以访问剩余的元素。

我的问题是,在消费者端进行peek() 后跟remove()put() 可以吗?

【问题讨论】:

    标签: java producer-consumer blockingqueue


    【解决方案1】:

    不,peek() + remove() 是不安全的。所有消费者都可以在相同的值上peek,但删除不同的元素。要么取回元素并将元素放回(在队列末尾),要么更好地 - 更早地过滤元素并将它们放入单独的队列中。

    【讨论】:

    • 酷!谢谢。我的另一个选择是take()put()。我没有看到任何其他问题,但“消费者在少数情况下(显然)成为生产者”的想法让我担心。没事吧?
    猜你喜欢
    • 1970-01-01
    • 2013-03-25
    • 2011-01-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-07-23
    • 1970-01-01
    相关资源
    最近更新 更多