【问题标题】:JAVA: Execution Order between Callables submitted to ExecutorService and Threads Outside of the ExecutorServiceJAVA:提交给ExecutorService的Callables和ExecutorService之外的线程之间的执行顺序
【发布时间】:2014-09-18 23:27:10
【问题描述】:

我正在构建的简化版本具有以下内容:

  1. 一个线程池(一个 ExecutorService),包含线程,每个线程
    • 向服务器发送请求消息
    • 等待通过持续检查共享数据结构完成的请求
  2. 在这个线程池之外,我有一个 MessageListener 线程
    • 监听服务器。
    • 一旦请求被满足,它将在共享数据结构中引发一个“标志”
    • 前面提到的池中的线程会注意到这一点。并因此继续自我完善。

我发现 MessageListener 线程仅在 ExecutorService 终止后执行,而不是与其同时运行,即 ExecutorService 中的线程阻止 MessageListener 线程运行。这显然不符合我的目的。

我想知道是否有人可以向我提供一些指示,说明我的结论是否有效,如果有效,为什么?什么是绕过它的好方法。

我在下面粘贴了一些半伪代码以进一步解释。

public static final int NUM_TASK = 10;
public static final int TIME_OUT = 5000;
public static boolean[] shared_data = new boolean[NUM_TASK];

public static void main(String[] args) throws InterruptedException{

    // CREATE the receiver which will be set as an LISTENER for a JMS consumer queue that
    //  later the tasks in the Executor Service will send request to
    setupJMSProducerConsumer();

    // CREATE the Executor Service (pool)
    ExecutorService fixThreadPoolES = Executors.newFixedThreadPool(10);
    List<Future<String>> futures = new ArrayList<Future<String>>();

    for(int i = 0; i < NUM_TASK; i++){

            // Submit a Callable task to Replay the file
            MyTask task = new MyTask(i);                        
            futures.add(fixThreadPoolES.submit(task));
    }

    // GATHER the result here, based on futures
    // gatherResult(futures);

    // TERMINATE the ExecutorService      
    fixThreadPoolES.shutdown();
    fixThreadPoolES.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}

private static class MyTask implements Callable<String> {

    private static final int WAIT_TIME_SLOT = 1000;
    int _idx = -1;

    public MyTask (int idx){
        _idx = idx;
    }

    @Override
    public String call() throws Exception {
        return run();
    }       

    private String run() throws InterruptedException{

        // SEND request
        // sendRequest(_idx);

        // WAIT until the corresponding result in the shared data is set to 'true' by the corresponding LISTENER 
        int timeWaited = 0;
        while (getSharedData(_idx) != true){
            timeWaited += WAIT_TIME_SLOT;
            Thread.sleep(WAIT_TIME_SLOT);
            if (timeWaited > TIME_OUT){
                return "Listener timed out, flag not raised, task incompleted for: " + _idx;
            }
        }

        return "Listener raised the flag, task completed for: " + _idx;                     
    }
}

public static class MyMsgReceiver implements MessageListener{

    @Override
    public void onMessage(Message msg) {
        String msgTxt = null;
        try {
            msgTxt = ((javax.jms.TextMessage) msg).getText();
        } catch (JMSException e) {
            e.printStackTrace();
            return;
        }

        for (int i = 0; i < NUM_TASK; i++){
            if (msgTxt.contains(String.valueOf(i))){
                setSharedData(i, true);;
            };
        }
    }           
}

private static void setupJMSProducerConsumer(){
    com.tibco.tibjms.TibjmsConnectionFactory factory = new com.tibco.tibjms.TibjmsConnectionFactory(TIBCO_URL);     
    Connection connection = factory.createConnection(TIBCO_USER, TIBCO_PASS);
    Session session = connection.createSession(false, 24);
    MessageConsumer consumer = session.creatConsumer(session.createQueue(QUEUE_NAME));

    MyMsgReceiver receiver = new MyMsgReceiver();
    consumer.setMessageListener(receiver);
}

public static synchronized boolean getSharedData(int idx) {
    return shared_data[idx];
}

public static synchronized void setSharedData(int idx, boolean val) {
    shared_data[idx] = val;
}

【问题讨论】:

  • 为什么?未来已经存在。你在这里重新发明了几个轮子。这里根本没有证据表明 Executor 之外还有第二个线程。执行程序可以阻止该线程是不正确的。不完整。
  • 嗨,@EJP,很抱歉我可能没有在这个简化的伪代码 sn-p 中说清楚。在 ExecutorService 的“外部”存在线程的方式是通过 MessageListener 在服务器上的 JMS 队列上“侦听”。和“// sendRequest(_idx);”在 ExecutorService 的“内部”线程的 run() 中,旨在发送请求以触发服务器将新消息放入该 JMS 队列,因此触发该侦听器(外部线程)上的 onMessage()。而且我不想在这里重新实现未来。如果看起来是这样,请道歉。

标签: java multithreading jms threadpool java.util.concurrent


【解决方案1】:

我发现您的代码中有两处错误:

1) 您没有同步对shared_data 的访问,因此您没有对数组执行原子读写,这将导致不可预测的结果。对共享数组的任何访问都应该在 synchronized(shared_data) { } 块内,或者更方便地使用同步的 getSharedData(int i)setSharedData(int i, boolean value) 方法

2) MyTask.run() 中的等待循环也不完全正确。它的实现方式,如果布尔标志不为真,那么任务将系统地等待直到超时到期,然后报告任务完成,而实际上它并不知道任务已经完成。 相反,您应该这样做:

long start = System.currentTimeMillis();
long elapsed = 0L;
boolean flag = false;
while (!(flag = getSharedData(_idx)) &&
  ((elapsed = System.currentTimeMillis() - start) < TIME_OUT)) {
  Thread.sleep(1L);
}
if (flag) {
  // handle completion
} else {
  // handle timeout expired
}

【讨论】:

  • 1.非常感谢你的评论 2。你在这两个方面都是对的!我相应地修改了伪代码。 3. 但是,我不得不说,这里的主要问题是是否应该阻塞 ExecutorService 外部的侦听器线程,直到 ExecutorService 内部的所有线程都像我所经历的那样完成,如果是,为什么以及如何解决它。代码 sn-p 只是为了澄清我要问的问题。对于其中的不准确之处,我深表歉意。
猜你喜欢
  • 2011-04-25
  • 1970-01-01
  • 1970-01-01
  • 2017-12-01
  • 1970-01-01
  • 2015-11-09
  • 1970-01-01
  • 2020-07-14
相关资源
最近更新 更多