【发布时间】:2012-11-04 09:36:27
【问题描述】:
我想在Java中实现一个类,它将等待来自不同线程的新数据,当他得到它时,这个类将处理它并再次等待新数据。我想仅使用 synchronized、wait、notifyAll 命令来实现这一点。我尝试了一些变体:
1) 使用一个线程,通过命令 lockObject.wait() 等待。但是当所有活动线程完成工作时,该线程将永远等待。当然,我可以制作方法stopProcess(),但它不安全,因为另一个程序员可能会忘记调用它。
2) 使用一个守护线程,它不会工作,因为当所有活动线程完成它们的工作时,我的守护线程死了,但他可以有一些他必须处理的数据
3)当新数据到来时 - 创建新线程,该线程将处理数据。当线程处于活动状态时(他处理给定的数据),他将接收新数据。当没有数据到来并且所有旧数据都被处理时,线程完成工作。这个变体的减号是 - 当数据通过某个时期(当线程有时间处理旧数据并死亡时),将创建一个新线程。我认为这对性能或/和内存不利。我对吗?
是否可以只使用一个或两个(可能结合使用守护进程和活动线程)线程而不使用 stopProcess() 方法来解决我的问题??
这里有一些代码
我对阻塞队列的认识
public class BlockingQueue<T> {
private Queue<T> queue = new LinkedList<T>();
public void add(T el){
synchronized (queue){
queue.add(el);
}
}
public T getFirst(){
synchronized (queue){
return queue.poll();
}
}
public int getSize(){
synchronized (queue){
return queue.size();
}
}
}
数据类
public class Data {
//some data
public void process(){
//process this data
}
}
代码的第一个变体
public class ProcessData {
private BlockingQueue<Data> queue = new BlockingQueue<Data>();
private boolean run = false;
private Thread processThread;
private Object lock = new Object();
public synchronized void addData(Data data) throws Exception {
if (run){
if (data != null){
queue.add(data);
wakeUpToProcess();
}
}else{
throw new Exception("");
}
}
public synchronized void start() {
if (!run){
run = true;
processThread = new Thread(new Runnable() {
public void run() {
while (run || queue.getSize()!=0){
while(queue.getSize() == 0 && run){
//if stopProcess was not called
//and no active threads
//it will not die
waitForNewData();
}
Data cur;
while(queue.getSize() > 0){
cur = queue.getFirst();
cur.process();
}
}
}
});
processThread.start();
}
}
public synchronized void stopProcess() {
if (run){
run = false;
wakeUpToProcess();
}
}
private void waitForNewData(){
try{
synchronized (lock){
lock.wait();
}
}catch (InterruptedException ex){
ex.printStackTrace();
}
}
private void wakeUpToProcess(){
synchronized (lock){
lock.notifyAll();
}
}
}
在第二个变体中,我将 processThread 作为守护进程。但是当活动线程死亡时,processThread 完成工作,但队列中有一些数据,我必须处理。
第三种变体
public class ProcessData {
private BlockingQueue<Data> queue = new BlockingQueue<Data>();
private boolean run = false;
private Thread processThread = null;
public synchronized void addData(Data data) throws Exception {
if (run){
if (data != null){
queue.add(data);
wakeExecutor();
}
}else{
throw new Exception("ProcessData is stopped!");
}
}
public synchronized void start() {
if (!run){
run = true;
}
}
public synchronized void stopProcess() {
if (run){
run = false;
}
}
public boolean isRunning(){
return this.run;
}
protected void wakeExecutor(){
if (processThread ==null || !processThread.isAlive()){
processThread = new Thread(new Runnable() {
@Override
public void run() {
Data cur;
while(queue.getSize() > 0){
cur = queue.getFirst();
cur.process();
}
}
});
processThread.start();
}
}
}
重要的是,数据必须按照来自线程的顺序进行处理。
【问题讨论】:
-
请把你尝试过的写在代码里。
-
为每个请求生成一个新线程的守护进程有什么问题? (有点像网络服务器......)
-
@bdares 数据的处理顺序很重要。必须处理第一个数据,当处理完成时 - 可以处理第二个数据。等等……
-
这是一道作业题吗?您不能只使用
Executors.newSingleThreadExecutor有什么特别的原因吗? -
@IanRoberts 是的,这就像作业问题=)我不能使用并发包=(
标签: java multithreading wait synchronized