消息监听器容器是一个用于查看JMS目标等待消息到达的特殊的bean,一旦消息到达它就可以获取到消息,并通过调用onMessage方法将消息传递一个MessageListener实现。Spring中消息监听器容器的类型如下:

  ❤ SimpleMessageListenerContainer:最简单的消息监听器容器,只能处理固定数量的JMS会话,且不支持事务。

  ❤ DefaultMessageListenerContainer:这个消息监听器容器建立在SimpleMessageListenerContainer容器之上,添加了对事物的支持。

  ❤ serversession.ServerSessionMessage.ListenerContainer:这是功能最强大的消息监听器,与DefaultMessageListenerContainer相同,它支持事务,但是它还允许动态地管理JMS会话。

下面以DefaultMessageListenerContainer为例进行分析,看看消息监听器容器的实现。使用消息监听器容器时一定要将自定义的消息监听器置于到容器中,这样才可以在收到消息时,容器把消息转向监听器处理。下面看一下它的类图:

Spring----监听器容器

同样我们看到了此类实现了InitializingBean接口,按照以往的风格我们还是首先查看接口方法afterPropertiesSet中的逻辑,其方法的实现在其父类AbstractJmsListeningContainer中。

public void afterPropertiesSet() {
        //验证connectionFactory
        super.afterPropertiesSet();
        //验证配置文件
        validateConfiguration();
        //初始化
        initialize();
    }

监听器容器的初始化只包含了三句代码,其中前两句只用于属性的验证,而真正用于初始化的操作委托在initialize中执行。

public void initialize() throws JmsException {
        try {
            synchronized (this.lifecycleMonitor) {
                this.active = true;
                this.lifecycleMonitor.notifyAll();
            }
            doInitialize();
        }
        catch (JMSException ex) {
            synchronized (this.sharedConnectionMonitor) {
                ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup);
                this.sharedConnection = null;
            }
            throw convertJmsAccessException(ex);
        }
    }
protected void doInitialize() throws JMSException {
        synchronized (this.lifecycleMonitor) {
            for (int i = 0; i < this.concurrentConsumers; i++) {
                scheduleNewInvoker();
            }
        }
    }

这里用到了concurrentConsumers属性,对于此属性的说明如下:

  消息监听器允许创建多个Session和MessageConsumer来接收消息。具体的个数由concurrentConsumers属性指定。需要注意的是,应该只是在Destination为Queue的时候才使用多个MessageConsumer(Queue中的一个消息只能被一个Consumer接收),虽然使用多个MessageConsumer会提高消息的处理性能,但是消息处理的顺序不能得到保证。消息被接收的顺序仍然是消息发送时的顺序,但是由于消息可能被并发处理,因此,消息的处理顺序可能和消息发送顺序不同,此外,不应该在Destination为Topic的时候使用多个MessageConsumer,因为多个MessageConsumer会接收到同样的消息。

对于具体的实现逻辑我们只能继续查看源码:

private void scheduleNewInvoker() {
        AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
        if (rescheduleTaskIfNecessary(invoker)) {
            // This should always be true, since we're only calling this when active.
            this.scheduledInvokers.add(invoker);
        }
    }
protected final boolean rescheduleTaskIfNecessary(Object task) {
        if (this.running) {
            try {
                doRescheduleTask(task);
            }
            catch (RuntimeException ex) {
                logRejectedTask(task, ex);
                this.pausedTasks.add(task);
            }
            return true;
        }
        else if (this.active) {
            this.pausedTasks.add(task);
            return true;
        }
        else {
            return false;
        }
    }

 分析源码得知,根据concurrentConsumers数量建立了对应数量的线程,而每个线程都作为一个独立的接收者在循环接收消息。

反向追踪rescheduleTaskIfNecessary传入的参数invoker,发现这个参数是AsyncMessageListenerInvoker类型的,于是我们把焦点转向这个类的实现,由于它是作为一个Runnable角色去执行,所以我们从这个类的分析从run方法开始。

public void run() {
            //并发控制
            synchronized (lifecycleMonitor) {
                activeInvokerCount++;
                lifecycleMonitor.notifyAll();
            }
            boolean messageReceived = false;
            try {
                //根据每个任务设置的最大处理消息数量而做不同处理,小于0默认为是无限制,一致接收消息
                if (maxMessagesPerTask < 0) {
                    messageReceived = executeOngoingLoop();
                }
                else {
                    int messageCount = 0;
                    //消息数量控制,一旦超出数量则停止循环
                    while (isRunning() && messageCount < maxMessagesPerTask) {
                        messageReceived = (invokeListener() || messageReceived);
                        messageCount++;
                    }
                }
            }
            catch (Throwable ex) {
                //清理操作,包括关闭Session等
                clearResources();
                if (!this.lastMessageSucceeded) {
                    // We failed more than once in a row or on startup -
                    // wait before first recovery attempt.
                    waitBeforeRecoveryAttempt();
                }
                this.lastMessageSucceeded = false;
                boolean alreadyRecovered = false;
                synchronized (recoveryMonitor) {
                    if (this.lastRecoveryMarker == currentRecoveryMarker) {
                        handleListenerSetupFailure(ex, false);
                        recoverAfterListenerSetupFailure();
                        currentRecoveryMarker = new Object();
                    }
                    else {
                        alreadyRecovered = true;
                    }
                }
                if (alreadyRecovered) {
                    handleListenerSetupFailure(ex, true);
                }
            }
            finally {
                synchronized (lifecycleMonitor) {
                    decreaseActiveInvokerCount();
                    lifecycleMonitor.notifyAll();
                }
                if (!messageReceived) {
                    this.idleTaskExecutionCount++;
                }
                else {
                    this.idleTaskExecutionCount = 0;
                }
                synchronized (lifecycleMonitor) {
                    if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
                        // We're shutting down completely.
                        scheduledInvokers.remove(this);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
                        }
                        lifecycleMonitor.notifyAll();
                        clearResources();
                    }
                    else if (isRunning()) {
                        int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
                        if (nonPausedConsumers < 1) {
                            logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
                                    "Check your thread pool configuration! Manual recovery necessary through a start() call.");
                        }
                        else if (nonPausedConsumers < getConcurrentConsumers()) {
                            logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
                                    "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
                                    "to be triggered by remaining consumers.");
                        }
                    }
                }
            }
        }
run

相关文章: