【问题标题】:Handling JMS transactions and redelivery with multi-threaded listeners使用多线程侦听器处理 JMS 事务和重新传递
【发布时间】:2018-07-15 11:43:13
【问题描述】:

我正在使用 JMS 在 Java 1.8 SE 环境中处理消息。消息源自 Oracle 高级队列。因为处理一条消息可能需要一段时间,所以我决定拥有一个由 5 个工作线程(MessageHandler 对象)组成的池,这样就可以一次处理多个线程。我希望有保证的传递,没有重复的消息传递。

我用

queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);

创建 QueueSession。我使用下面的代码来处理传入的消息。基本上,onMessage 产生一个处理消息的线程。

public class JmsQueueListener implements MessageListener
{
    /** A pool of worker threads for handling requests. */
    private final ExecutorService pool;

    OracleJmsQueue queue;

    public void onMessage(Message msg)
    {
        pool.execute(new MessageHandler(msg));
        // can't commit here - the thread may still be processing
    }

    /**
     * This class provides a "worker thread" for processing a message
     * from the queue.
     */
    private class MessageHandler implements Runnable {

        /**
         * The message to process
         */
        Message message;

        /**
         * The constructor stores the passed in message as a field
         */
        MessageHandler(Message message) {
            this.message = message;
        }

        /**
         * Processes the message provided to the constructor by
         * calling the appropriate business logic.
         */
        public void run() {
            QueueSession queueSession = queue.getQueueSession();
            try {
                String result = requestManager.processMessage(message);

                if (result != null) {
                    queueSession.commit();
                }
                else {
                    queueSession.rollback();
                }
            }
            catch (Exception ex) {
                try {
                    queueSession.rollback();
                }
                catch (JMSException e) {
                }
            }
        }
    }   //  class MessageHandler

我的问题是我不知道如何向原始队列指示处理是否已成功完成。我无法在onMessage 的末尾提交,因为线程可能尚未完成处理。我认为我目前拥有commits 和rollbacks 的地方也没有什么好处。例如,如果 5 个工作线程处于不同的完成状态,那么正在提交的队列会话的状态是什么?

我想我一定错过了一些关于如何以多线程方式处理 JMS 的基本概念。任何帮助将不胜感激。

【问题讨论】:

    标签: jms advanced-queuing


    【解决方案1】:

    您正在使用异步消息处理,因此,除非您实施一种方法来确保每个消息处理都按时间顺序完成,否则您最终会遇到旧消息处理在最近的消息处理之后完成的情况。那么为什么要使用消息服务呢?

    解决您的问题的一个简单方法是在 onMessage 方法的末尾使用 commit,并在您的 messageHandler 正文中重新排队消息以防出错。但是,如果重新入队本身失败,则此解决方案可能会出现问题。

    【讨论】:

      猜你喜欢
      • 2015-06-07
      • 2020-12-25
      • 2017-03-16
      • 1970-01-01
      • 2021-10-09
      • 2021-11-16
      • 1970-01-01
      • 2011-04-19
      • 2012-06-12
      相关资源
      最近更新 更多