先上书上伪代码:
用java代码实现如下:
生产者(Producer)
import java.util.Random;
/**
 * Producer task for the producer/consumer demo.
 *
 * <p>Each loop iteration creates a new {@code Item}, waits on the "empty" and "mutex"
 * counters of the shared {@code Semaphore}, pushes the item into the shared
 * {@code Buffer}, signals "mutex" and "full", and then sleeps for a random
 * 5000-9999 ms.
 *
 * <p>The loop ends when the thread is interrupted.
 */
public class Producer implements Runnable {
    private final String producerName;
    private final Buffer buffer;
    private final Semaphore semaphore;

    /**
     * @param producerName label used in log output and in the names of produced items
     * @param buffer       buffer that receives the produced items
     * @param semaphore    semaphore object providing the "empty", "mutex" and "full" counters
     */
    public Producer(String producerName, Buffer buffer, Semaphore semaphore) {
        this.producerName = producerName;
        this.buffer = buffer;
        this.semaphore = semaphore;
    }

    /**
     * Produces items until the thread is interrupted.
     */
    @Override
    public void run() {
        int time;
        Random random = new Random();
        while (true) {
            System.out.println(producerName + "生产了一个产品");
            Item item = new Item(producerName + "的产品");
            time = random.nextInt(5000) + 5000;
            try {
                // Wait for a free slot first, then for exclusive access to the buffer.
                semaphore.wait("empty");
                semaphore.wait("mutex");
                System.out.println(producerName + "放进一个产品");
                buffer.pushItem(item);
                semaphore.signal("mutex");
                semaphore.signal("full");
                System.out.println("让" + producerName + "睡" + time + "ms");
                Thread.sleep(time);
            } catch (InterruptedException e) {
                // Interruption is a request to stop: restore the flag for the
                // owner of this thread and leave the loop instead of looping forever.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
消费者(Consumer)
import java.util.Random;
/**
 * Consumer task for the producer/consumer demo.
 *
 * <p>Each loop iteration waits on the "full" and "mutex" counters of the shared
 * {@code Semaphore}, pops one item from the shared {@code Buffer}, signals
 * "mutex" and "empty", and then sleeps for a random 5000-9999 ms.
 *
 * <p>The loop ends when the thread is interrupted.
 */
public class Consumer implements Runnable {
    private final String consumerName;
    private final Buffer buffer;
    private final Semaphore semaphore;

    /**
     * @param consumerName label used in log output
     * @param buffer       buffer the items are taken from
     * @param semaphore    semaphore object providing the "empty", "mutex" and "full" counters
     */
    public Consumer(String consumerName, Buffer buffer, Semaphore semaphore) {
        this.consumerName = consumerName;
        this.buffer = buffer;
        this.semaphore = semaphore;
    }

    /**
     * Consumes items until the thread is interrupted.
     */
    @Override
    public void run() {
        int time;
        Random random = new Random();
        while (true) {
            time = random.nextInt(5000) + 5000;
            try {
                // Wait for an available item first, then for exclusive access to the buffer.
                semaphore.wait("full");
                semaphore.wait("mutex");
                System.out.println(consumerName + "拿走一个产品");
                buffer.popItem();
                semaphore.signal("mutex");
                semaphore.signal("empty");
                System.out.println("让" + consumerName + "睡" + time + "ms");
                Thread.sleep(time);
            } catch (InterruptedException e) {
                // Interruption is a request to stop: restore the flag for the
                // owner of this thread and leave the loop instead of looping forever.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
缓冲区(Buffer)
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Bounded holder of {@code Item}s shared by producers and consumers.
 *
 * <p>Items are appended at the end and removed from the end. All public
 * methods are {@code synchronized} on the buffer instance.
 */
public class Buffer {
    /** Number of stored items; also the position where the next item is inserted. */
    private int index;
    /** Item count at which {@link #pushItem(Item)} refuses further items. */
    private int maxSize;
    private List<Item> items;

    /**
     * Creates an empty buffer.
     *
     * @param maxSize item count at which the buffer is considered full
     */
    public Buffer(int maxSize) {
        this.items = new ArrayList<Item>();
        this.maxSize = maxSize;
        this.index = 0;
    }

    /**
     * Appends an item, or prints an error message and stores nothing when the
     * buffer is already full.
     *
     * @param item the item to store
     */
    public synchronized void pushItem(Item item) {
        if (index != maxSize) {
            // index always equals items.size(), so this appends at the end.
            items.add(item);
            index += 1;
            System.out.println("缓冲区进入一个产品,还有" + items.size() + "个");
            return;
        }
        System.out.println("产品已满,程序错误");
    }

    /**
     * Removes the most recently added item, or prints an error message when the
     * buffer is empty.
     */
    public synchronized void popItem() {
        if (index != 0) {
            items.remove(--index);
            System.out.println("缓冲区减少一个产品,还有" + items.size() + "个");
            return;
        }
        System.out.println("没有产品无法取出,程序错误");
    }

    /**
     * @return the number of items currently stored
     */
    public synchronized int getCount() {
        return items.size();
    }
}
产品(Item)
/**
 * A product handed from a producer to a consumer through the buffer.
 */
public class Item {
    /** Name given to the item when it was created; stored but not exposed. */
    private final String itemName;

    /**
     * @param itemName name to store for this item
     */
    public Item(String itemName) {
        this.itemName = itemName;
    }
}
信号量(Semaphore)
/**
 * Three named counting semaphores ("empty", "mutex" and "full") that share a
 * single monitor: this object.
 *
 * <p>Each counter holds the number of currently available permits and never
 * goes below zero. Because all three counters share one monitor, a
 * {@code notifyAll()} issued for one counter also wakes threads waiting on the
 * others; every waiter therefore re-checks its own counter in a loop before
 * proceeding. The same loop protects against spurious wakeups.
 */
public class Semaphore {
    private int mutex;
    private int empty;
    private int full;
    private Buffer buffer;

    /**
     * @param buffer buffer associated with this semaphore set (stored only)
     * @param empty  initial number of "empty" permits
     * @param mutex  initial number of "mutex" permits
     */
    public Semaphore(Buffer buffer, int empty, int mutex) {
        this.buffer = buffer;
        this.empty = empty;
        this.mutex = mutex;
        this.full = 0;
    }

    /**
     * Acquires one permit of the named counter, blocking until one is available.
     * An unrecognised name is ignored.
     *
     * @param type "empty", "full" or "mutex"
     * @throws InterruptedException if the thread is interrupted while blocked;
     *                              no permit is consumed in that case
     */
    public synchronized void wait(String type) throws InterruptedException {
        if ("empty".equals(type)) {
            // Loop, not if: a wakeup may have been meant for another counter.
            while (empty <= 0) {
                this.wait();
            }
            empty--;
        } else if ("full".equals(type)) {
            while (full <= 0) {
                this.wait();
            }
            full--;
        } else if ("mutex".equals(type)) {
            while (mutex <= 0) {
                this.wait();
            }
            mutex--;
        }
    }

    /**
     * Releases one permit of the named counter and wakes all waiting threads so
     * that they can re-check their counters. An unrecognised name is ignored.
     *
     * @param type "empty", "full" or "mutex"
     * @throws InterruptedException never thrown by this implementation; declared
     *                              to keep the signature callers already compile against
     */
    public synchronized void signal(String type) throws InterruptedException {
        if ("empty".equals(type)) {
            empty++;
        } else if ("full".equals(type)) {
            full++;
        } else if ("mutex".equals(type)) {
            mutex++;
        } else {
            return;
        }
        // Waiters of all three counters share this monitor, so wake them all;
        // those whose counter is still zero go back to waiting.
        this.notifyAll();
    }
}
启动类(Start)
/**
 * Entry point of the demo: wires one shared buffer and semaphore set to five
 * producers and five consumers and starts a thread for each of them.
 */
public class Start {
    /**
     * Starts the producer and consumer threads.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        final int capacity = 10;
        final int pairCount = 5;

        Buffer sharedBuffer = new Buffer(capacity);
        // "empty" starts at the buffer capacity, "mutex" at 1.
        Semaphore sharedSemaphore = new Semaphore(sharedBuffer, capacity, 1);

        for (int id = 0; id < pairCount; id++) {
            Thread producerThread =
                    new Thread(new Producer("生产者" + id + "号", sharedBuffer, sharedSemaphore));
            Thread consumerThread =
                    new Thread(new Consumer("消费者" + id + "号", sharedBuffer, sharedSemaphore));
            producerThread.start();
            consumerThread.start();
        }
    }
}
运行结果: