Java 多线程实战案例之消费者,生产者

0. 背景

学习多线程编程时,肯定会接触到生产者,消费者 这一案例,操作系统中会有银行家就餐排队看电影等问题,但是这些案例如何在java中展示呢?这里笔者使用生产者,消费者 来演示基本的多线程案例。

1. 基本思想

这里先实现一个简单的多线程案例—— 一个生产者,一个消费者。实现生成者生成数据,消费者消费数据。

2. 代码

2.1 单生产者 + 单消费者

因为消费者和生成者均不止一个,所以抽象出其消费、生产类,分别如下:

  • Consume
package multi.thread.consume;

import multi.thread.result.ResultList;

/**
 * 01.代表抽象消费者
 */
public abstract class Consumer {
    private String name ;//the name of Consumer
    private ResultList resultList;// the shared resultList to read/write

    public Consumer(String name, ResultList resultList) {
        this.name = name;
        this.resultList = resultList;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public ResultList getResultList() {
        return resultList;
    }

    public void setResultList(ResultList resultList) {
        this.resultList = resultList;
    }

    public void comsum(){}
}
  • Producer
package multi.thread.produce;


import multi.thread.result.ResultList;

/**
 * 01.抽象类,代表生产者
 */
public abstract class Producer {
    private String name;//the name of producer
    private ResultList resultList;// the share resultList to write

    public Producer(String name, ResultList resultList) {
        this.name = name;
        this.resultList = resultList;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public ResultList getResultList() {
        return resultList;
    }

    public void setResultList(ResultList resultList) {
        this.resultList = resultList;
    }

    //生产方法
    public void produce(){
    }
}
  • ConsumerOne
    这个类是具体的消费者类。用于消费ResultList类实例的 List 中存放值。
package multi.thread.consume;

import multi.thread.result.ResultList;

/**
 * 01.factual consumer
 */
public class ConsumerOne extends Consumer {
    public ConsumerOne(String name, ResultList resultList) {
        super(name, resultList);
    }

    @Override
    public void comsum() {
        while(true) {
            synchronized (this.getResultList()) {//lock the list
                if (this.getResultList().getLists().size() == 0) {
                    try {
                        System.out.println("consumer wait...");
                        this.getResultList().wait();
                        System.out.println("consumer is waking...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //if have else,consume
                // the size is more than index [size = peek + 1]
                int index = this.getResultList().getLists().size() - 1;
                System.out.println("start consume:" + this.getResultList().getLists().get(index));
                System.out.println("   before consume: " + this.getResultList().getLists().size());
                this.getResultList().getLists().remove(index);// remove one element
                System.out.println("   after consume: " + this.getResultList().getLists().size());
                this.getResultList().notify();//notify the producer
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 01. wait 和 notify的顺序写反了,导致出现错误。
 **/
  • ProducerOne
    这个类是具体的生产者类。用于往ResultList类实例的 List 中存放值。
package multi.thread.produce;

import multi.thread.produce.Producer;
import multi.thread.result.ResultList;

public class ProducerOne extends Producer {

    public ProducerOne(String name, ResultList resultList) {
        super(name, resultList);
    }

    @Override
    public void produce() {
        while(true) {
            synchronized (this.getResultList()) {//the lock of result
                //System.out.println("produce's size: " + this.getResultList().getLists().size());
                if (this.getResultList().getLists().size() > 0) {
                    try {
                        System.out.println("producer wait...");
                        this.getResultList().wait();//wait the consumer to consume
                        System.out.println("producer is waking...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                if (this.getResultList().getLists().size() == 0) {
                    System.out.println("start produce...");
                    System.out.println("   before produce: "+this.getResultList().getLists().size());
                    this.getResultList().getLists().add("1");// add the string to resultList
                    System.out.println("   after produce: "+this.getResultList().getLists().size());
                    this.getResultList().notify();//notify the cosumer
                }
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • ConsumeThread
package multi.thread.threadpool;

import multi.thread.consume.Consumer;

public class ConsumeThread implements Runnable{
    private Consumer consumer ;

    public ConsumeThread(Consumer consumer){
        this.consumer = consumer;
    }

    public void run() {
        consumer.comsum();
    }
}
  • ProduceThread
package multi.thread.threadpool;

import multi.thread.produce.Producer;

public class ProduceThread implements Runnable {
    private Producer producer;

    public ProduceThread(Producer producer) {
        this.producer = producer;
    }
    public void run() {
        producer.produce();
    }
}

3. 执行结果

Java 多线程实战案例之消费者,生产者

2.3 主要问题

当然,并不是写完代码就可以跑成功的,在本案例中需要注意如下几个点:

  • ConsumeThread类实现Runnable接口之后,并不能直接使用其实例运行run方法。而是需要将这个实现了Runnable的实例作为Thread的构造参数,然后运行run方法。主要如下图所示:
    Java 多线程实战案例之消费者,生产者
  • 因为要模拟一个不停的消费、生产过程,所以需要使用到while(true)循环,但是这个while(true)循环的使用是有很大讲究的,我们希望这个生产者线程不停地生产;这个消费者线程不停的消耗。但是我之前的写法却是下面这个样子:
    Java 多线程实战案例之消费者,生产者

这么写的结果就会导致IllegalThreadStateException,原因就是本来就处于running的线程,再次被执行start方法。
Java 多线程实战案例之消费者,生产者

  • 同样,还有的问题是,执行wait(), notify()方法时,需要注意保证其在持有对象锁的时候执行。如下标注:
    Java 多线程实战案例之消费者,生产者
  • 同时,还有一个问题是,执行wait()方法虽然是当前运行这个代码的线程,但是调用wait()的对象是锁对象。而不是一个单wait()方法。即如下的写法将会出错:

Java 多线程实战案例之消费者,生产者

  • 同时,需要注意使用wait,notify方法的顺序,否则不会得到正确的结果

4. 其它

完整项目可见我的 github ,如果有帮助,别忘了star 哦。

相关文章: