【发布时间】:2014-09-18 23:27:10
【问题描述】:
我正在构建的简化版本具有以下内容:
- 一个线程池(一个 ExecutorService),包含线程,每个线程
- 向服务器发送请求消息
- 等待通过持续检查共享数据结构完成的请求
- 在这个线程池之外,我有一个 MessageListener 线程
- 监听服务器。
- 一旦请求被满足,它将在共享数据结构中引发一个“标志”
- 前面提到的池中的线程会注意到这一点。并因此继续自我完善。
我发现 MessageListener 线程仅在 ExecutorService 终止后执行,而不是与其同时运行,即 ExecutorService 中的线程阻止 MessageListener 线程运行。这显然不符合我的目的。
我想知道是否有人可以向我提供一些指示,说明我的结论是否有效,如果有效,为什么?什么是绕过它的好方法。
我在下面粘贴了一些半伪代码以进一步解释。
public static final int NUM_TASK = 10;
public static final int TIME_OUT = 5000;
public static boolean[] shared_data = new boolean[NUM_TASK];
public static void main(String[] args) throws InterruptedException{
// CREATE the receiver which will be set as an LISTENER for a JMS consumer queue that
// later the tasks in the Executor Service will send request to
setupJMSProducerConsumer();
// CREATE the Executor Service (pool)
ExecutorService fixThreadPoolES = Executors.newFixedThreadPool(10);
List<Future<String>> futures = new ArrayList<Future<String>>();
for(int i = 0; i < NUM_TASK; i++){
// Submit a Callable task to Replay the file
MyTask task = new MyTask(i);
futures.add(fixThreadPoolES.submit(task));
}
// GATHER the result here, based on futures
// gatherResult(futures);
// TERMINATE the ExecutorService
fixThreadPoolES.shutdown();
fixThreadPoolES.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}
private static class MyTask implements Callable<String> {
private static final int WAIT_TIME_SLOT = 1000;
int _idx = -1;
public MyTask (int idx){
_idx = idx;
}
@Override
public String call() throws Exception {
return run();
}
private String run() throws InterruptedException{
// SEND request
// sendRequest(_idx);
// WAIT until the corresponding result in the shared data is set to 'true' by the corresponding LISTENER
int timeWaited = 0;
while (getSharedData(_idx) != true){
timeWaited += WAIT_TIME_SLOT;
Thread.sleep(WAIT_TIME_SLOT);
if (timeWaited > TIME_OUT){
return "Listener timed out, flag not raised, task incompleted for: " + _idx;
}
}
return "Listener raised the flag, task completed for: " + _idx;
}
}
public static class MyMsgReceiver implements MessageListener{
@Override
public void onMessage(Message msg) {
String msgTxt = null;
try {
msgTxt = ((javax.jms.TextMessage) msg).getText();
} catch (JMSException e) {
e.printStackTrace();
return;
}
for (int i = 0; i < NUM_TASK; i++){
if (msgTxt.contains(String.valueOf(i))){
setSharedData(i, true);;
};
}
}
}
private static void setupJMSProducerConsumer(){
com.tibco.tibjms.TibjmsConnectionFactory factory = new com.tibco.tibjms.TibjmsConnectionFactory(TIBCO_URL);
Connection connection = factory.createConnection(TIBCO_USER, TIBCO_PASS);
Session session = connection.createSession(false, 24);
MessageConsumer consumer = session.creatConsumer(session.createQueue(QUEUE_NAME));
MyMsgReceiver receiver = new MyMsgReceiver();
consumer.setMessageListener(receiver);
}
public static synchronized boolean getSharedData(int idx) {
return shared_data[idx];
}
public static synchronized void setSharedData(int idx, boolean val) {
shared_data[idx] = val;
}
【问题讨论】:
-
为什么?未来已经存在。你在这里重新发明了几个轮子。这里根本没有证据表明 Executor 之外还有第二个线程。执行程序可以阻止该线程是不正确的。不完整。
-
嗨,@EJP,很抱歉我可能没有在这个简化的伪代码 sn-p 中说清楚。在 ExecutorService 的“外部”存在线程的方式是通过 MessageListener 在服务器上的 JMS 队列上“侦听”。和“// sendRequest(_idx);”在 ExecutorService 的“内部”线程的 run() 中,旨在发送请求以触发服务器将新消息放入该 JMS 队列,因此触发该侦听器(外部线程)上的 onMessage()。而且我不想在这里重新实现未来。如果看起来是这样,请道歉。
标签: java multithreading jms threadpool java.util.concurrent