【问题标题】:Creating multithreading java class to process data创建多线程java类来处理数据
【发布时间】:2012-11-04 09:36:27
【问题描述】:

我想在Java中实现一个类,它将等待来自不同线程的新数据,当他得到它时,这个类将处理它并再次等待新数据。我想仅使用 synchronized、wait、notifyAll 命令来实现这一点。我尝试了一些变体:

1) 使用一个线程,通过命令 lockObject.wait() 等待。但是当所有活动线程完成工作时,该线程将永远等待。当然,我可以制作方法stopProcess(),但它不安全,因为另一个程序员可能会忘记调用它。

2) 使用一个守护线程,它不会工作,因为当所有活动线程完成它们的工作时,我的守护线程死了,但他可以有一些他必须处理的数据

3)当新数据到来时 - 创建新线程,该线程将处理数据。当线程处于活动状态时(他处理给定的数据),他将接收新数据。当没有数据到来并且所有旧数据都被处理时,线程完成工作。这个变体的减号是 - 当数据通过某个时期(当线程有时间处理旧数据并死亡时),将创建一个新线程。我认为这对性能或/和内存不利。我对吗?

是否可以只使用一个或两个(可能结合使用守护进程和活动线程)线程而不使用 stopProcess() 方法来解决我的问题??

这里有一些代码

我对阻塞队列的认识

public class BlockingQueue<T> {
    private Queue<T> queue = new LinkedList<T>();

    public void add(T el){
        synchronized (queue){
            queue.add(el);
        }
    }

    public T getFirst(){
        synchronized (queue){
            return queue.poll();
        }
    }

    public int getSize(){
        synchronized (queue){
            return queue.size();
        }
    }
}

数据类

public class Data {
    //some data

    public void process(){
        //process this data
    }
} 

代码的第一个变体

public class ProcessData {

    private BlockingQueue<Data> queue = new BlockingQueue<Data>();
    private boolean run = false;
    private Thread processThread;
    private Object lock = new Object();

    public synchronized void addData(Data data) throws Exception {
        if (run){
            if (data != null){
                queue.add(data);
                wakeUpToProcess();
            }
        }else{
            throw new Exception("");
        }
    }

    public synchronized void start() {
        if (!run){
            run = true;

            processThread = new Thread(new Runnable() {
                public void run() {

                    while (run || queue.getSize()!=0){

                        while(queue.getSize() == 0 && run){
                            //if stopProcess was not called
                            //and no active threads
                            //it will not die
                            waitForNewData();
                        }

                        Data cur;
                        while(queue.getSize() > 0){
                            cur = queue.getFirst();
                            cur.process();
                        }

                    }
                }
            });
            processThread.start();

        }
    }

    public synchronized void stopProcess() {
        if (run){
            run = false;
            wakeUpToProcess();
        }
    }

    private void waitForNewData(){
        try{
            synchronized (lock){
                lock.wait();
            }
        }catch (InterruptedException ex){
            ex.printStackTrace();
        }
    }

    private void wakeUpToProcess(){
        synchronized (lock){
            lock.notifyAll();
        }
    }
}

在第二个变体中,我将 processThread 作为守护进程。但是当活动线程死亡时,processThread 完成工作,但队列中有一些数据,我必须处理。

第三种变体

public class ProcessData {

    private BlockingQueue<Data> queue = new BlockingQueue<Data>();
    private boolean run = false;
    private Thread processThread = null;

    public synchronized void addData(Data data) throws Exception {
        if (run){
            if (data != null){
                queue.add(data);
                wakeExecutor();
            }
        }else{
            throw new Exception("ProcessData is stopped!");
        }
    }

    public synchronized void start() {
        if (!run){
            run = true;
        }
    }

    public synchronized void stopProcess() {
        if (run){
            run = false;
        }
    }

    public boolean isRunning(){
        return this.run;
    }

    protected void wakeExecutor(){
        if (processThread ==null || !processThread.isAlive()){
            processThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    Data cur;
                    while(queue.getSize() > 0){
                        cur = queue.getFirst();
                        cur.process();
                    }
                }
            });
            processThread.start();
        }
    }
}

重要的是,数据必须按照来自线程的顺序进行处理。

【问题讨论】:

  • 请把你尝试过的写在代码里。
  • 为每个请求生成一个新线程的守护进程有什么问题? (有点像网络服务器......)
  • @bdares 数据的处理顺序很重要。必须处理第一个数据,当处理完成时 - 可以处理第二个数据。等等……
  • 这是一道作业题吗?您不能只使用Executors.newSingleThreadExecutor 有什么特别的原因吗?
  • @IanRoberts 是的,这就像作业问题=)我不能使用并发包=(

标签: java multithreading wait synchronized


【解决方案1】:

您正在认真地重新发明轮子。您想要的一切都在 JDK 中的java.util.concurrent package 中提供。

通过BlockingQueue 实现producer-consumer pattern,您的生产者调用offer(),您的消费者线程调用take(),这会阻塞直到有东西可用。

就是这样。您不需要,也不应该编写您编写的所有这些课程。这些并发类为您完成了所有的锁定和同步,并且正确(这点不容小觑)

【讨论】:

  • 我知道,并发包解决了我所有的问题,但我不能使用它。这是我的作业问题
  • 我的第一个变体是否实现了生产者-消费者模式?(ProcessData 类作为消费者类)
【解决方案2】:

如果不允许您使用来自 java.util.concurrent 的任何内容,那么您将不得不基于类似 LinkedList 的内容实现自己的任务队列。我会将阻塞行为封装在队列中,例如(伪代码)

synchronized Data nextTask() {
  while(the linked list is empty) {
    wait()
  }
  remove and return head of the queue
}

synchronized void addTask(Data d) {
  add d to the queue
  notifyAll()
}

然后你可以有一个消费者线程不断地做这样的事情

while(true) {
  taskQueue.nextTask().process()
}

生产者线程调用taskQueue.addTask 将每个任务添加到队列中。如果您需要在最后正常关闭,那么您要么需要一些“哨兵值”来告诉消费者线程完成,要么找到某种方式在正确的时间调用Thread.interrupt()

【讨论】:

    猜你喜欢
    • 2017-05-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-12
    • 1970-01-01
    • 2021-10-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多