【发布时间】:2019-08-16 10:13:50
【问题描述】:
我正在使用ExecutorService 和BlockingQueue 解决使用生产者-消费者问题的事务轮询案例。我有一个交易清单,我想对其进行验证并采取行动。我得到一个新的交易来不断地验证。
考虑到BlockingQueue,我只有一个生产者,我想保持3-5个并行消费者来加速验证。
我可能需要等待几笔交易才能完成。 (说 30 秒)。 所以,我会验证,如果它是假的并且时间大于 30 秒,我会放弃它。基本上,我只想在数据项可消耗时才消耗。
首先,这种方法好吗?或者我应该尝试一些其他解决方案(目前我还不知道)
这是我改编自this question的代码:
import java.util.concurrent.*;
public class ProducerConsumerWithES {
public static void main(String args[]){
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue,1));
pes.submit(new Producer(sharedQueue,2));
ces.submit(new Consumer(sharedQueue,1));
ces.submit(new Consumer(sharedQueue,2));
// shutdown should happen somewhere along with awaitTermination
/* https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
pes.shutdown();
ces.shutdown();
}
}
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i=1; i<= 5; i++){
try {
int number = i+(10*threadNo);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
我知道如果需要,我可以先发送peek(),然后再发送remove()。
但是当我尝试这样做时,所有其他消费者都被困在同一笔交易中。并且从未参与过其他产生的交易。
这是因为存储正在排队 (FIFO)。
当我删除元素时,这种情况永远不会发生,而是进行验证而不是偷看,因为其他消费者可以访问剩余的元素。
我的问题是,在消费者端进行peek() 后跟remove() 或put() 可以吗?
【问题讨论】:
标签: java producer-consumer blockingqueue