【Question title】:How to: Implement a BatchMessageListenerContainer for bulk consuming a JMS queue
【Posted】:2015-05-12 10:38:05
【Question】:

I recently needed a JMS consumer in Spring Integration that can consume high-volume bursts of messages without stressing the target Oracle database with too many commits.

DefaultMessageListenerContainer (DMLC) does not seem to support anything other than message-by-message transactions.

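I searched for solutions and found a few, but many of them were implemented not by inheriting from DMLC but by cloning and modifying its original source code, which makes them likely to break if I later want to move to a newer version of spring-jms. In addition, the cloned code referenced private properties of DMLC, which therefore had to be left out. To make it all work, a couple of interfaces and a custom message listener were also needed. All in all, I did not feel comfortable with it.

So - what to do?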

【Comments】:

    Tags: performance jms spring-integration bulk consuming


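    【Solution 1】:

    Well - here is a simple and compact solution, based entirely on a single class derived from DefaultMessageListenerContainer.

    However, I have only tested it with a message-driven-channel-adapter and a ChainedTransactionManager, as this is the basic scenario when you need to do something like this.

    Here is the code: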

    package dk.itealisten.myservice.spring.components;
    
    import org.springframework.jms.listener.DefaultMessageListenerContainer;
    
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import java.util.ArrayList;
    import java.util.Enumeration;
    
    public class BatchMessageListenerContainer extends DefaultMessageListenerContainer {
    
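        public static final int DEFAULT_BATCH_SIZE = 100;
    
        // Maximum number of messages collected into one BatchMessage.
        private int batchSize = DEFAULT_BATCH_SIZE;
    
        public int getBatchSize() {
            return batchSize;
        }
    
        public void setBatchSize(int batchSize) {
            this.batchSize = batchSize;
        }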
    
        /**
         * Overrides receiveMessage to return a BatchMessage (the inner class declared further down)
         * holding up to batchSize messages, or null if nothing arrived before the receive timed out.
         */
        @Override
        protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
            BatchMessage batch = new BatchMessage();
            // Keep receiving until the batch is full or a receive returns null (timeout).
            while (!batch.releaseAfterMessage(super.receiveMessage(consumer))) {
                // all the work happens in the loop condition
            }
            return batch.messages.isEmpty() ? null : batch;
        }
    
        /**
         * As BatchMessage implements the javax.jms.Message interface, it fits into the DMLC as is. The only caveat is that SimpleMessageConverter does not know how to convert it to a Spring Integration Message, but that can be handled downstream.
         * BatchMessage only serves as a container carrying the actual javax.jms.Message instances from the DMLC to the MessageListener, so it does not need meaningful implementations of the interface methods as long as they are present.
         */
        protected class BatchMessage implements Message {
    
            public ArrayList<Message> messages = new ArrayList<Message>();
    
            /**
             * Add message to the collection of messages and return true if the batch meets the criteria for releasing it to the MessageListener.
             */
            public boolean releaseAfterMessage(Message message) {
                if (message != null) {
                    messages.add(message);
                }
                // Are we ready to release?
                return message == null || messages.size() >= batchSize;
            }
    
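            // Below are only dummy implementations of the abstract methods of javax.jms.Message.
            // getJMSDeliveryTime/setJMSDeliveryTime, getBody and isBodyAssignableTo exist only in JMS 2.0;
            // drop them when compiling against the JMS 1.1 API.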
    
            @Override
            public String getJMSMessageID() throws JMSException {
                return null;
            }
    
            @Override
            public void setJMSMessageID(String s) throws JMSException {
    
            }
    
            @Override
            public long getJMSTimestamp() throws JMSException {
                return 0;
            }
    
            @Override
            public void setJMSTimestamp(long l) throws JMSException {
    
            }
    
            @Override
            public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
                return new byte[0];
            }
    
            @Override
            public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException {
    
            }
    
            @Override
            public void setJMSCorrelationID(String s) throws JMSException {
    
            }
    
            @Override
            public String getJMSCorrelationID() throws JMSException {
                return null;
            }
    
            @Override
            public Destination getJMSReplyTo() throws JMSException {
                return null;
            }
    
            @Override
            public void setJMSReplyTo(Destination destination) throws JMSException {
    
            }
    
            @Override
            public Destination getJMSDestination() throws JMSException {
                return null;
            }
    
            @Override
            public void setJMSDestination(Destination destination) throws JMSException {
    
            }
    
            @Override
            public int getJMSDeliveryMode() throws JMSException {
                return 0;
            }
    
            @Override
            public void setJMSDeliveryMode(int i) throws JMSException {
    
            }
    
            @Override
            public boolean getJMSRedelivered() throws JMSException {
                return false;
            }
    
            @Override
            public void setJMSRedelivered(boolean b) throws JMSException {
    
            }
    
            @Override
            public String getJMSType() throws JMSException {
                return null;
            }
    
            @Override
            public void setJMSType(String s) throws JMSException {
    
            }
    
            @Override
            public long getJMSExpiration() throws JMSException {
                return 0;
            }
    
            @Override
            public void setJMSExpiration(long l) throws JMSException {
    
            }
    
            @Override
            public long getJMSDeliveryTime() throws JMSException {
                return 0;
            }
    
            @Override
            public void setJMSDeliveryTime(long l) throws JMSException {
    
            }
    
            @Override
            public int getJMSPriority() throws JMSException {
                return 0;
            }
    
            @Override
            public void setJMSPriority(int i) throws JMSException {
    
            }
    
            @Override
            public void clearProperties() throws JMSException {
    
            }
    
            @Override
            public boolean propertyExists(String s) throws JMSException {
                return false;
            }
    
            @Override
            public boolean getBooleanProperty(String s) throws JMSException {
                return false;
            }
    
            @Override
            public byte getByteProperty(String s) throws JMSException {
                return 0;
            }
    
            @Override
            public short getShortProperty(String s) throws JMSException {
                return 0;
            }
    
            @Override
            public int getIntProperty(String s) throws JMSException {
                return 0;
            }
    
            @Override
            public long getLongProperty(String s) throws JMSException {
                return 0;
            }
    
            @Override
            public float getFloatProperty(String s) throws JMSException {
                return 0;
            }
    
            @Override
            public double getDoubleProperty(String s) throws JMSException {
                return 0;
            }
    
            @Override
            public String getStringProperty(String s) throws JMSException {
                return null;
            }
    
            @Override
            public Object getObjectProperty(String s) throws JMSException {
                return null;
            }
    
            @Override
            public Enumeration getPropertyNames() throws JMSException {
                return null;
            }
    
            @Override
            public void setBooleanProperty(String s, boolean b) throws JMSException {
    
            }
    
            @Override
            public void setByteProperty(String s, byte b) throws JMSException {
    
            }
    
            @Override
            public void setShortProperty(String s, short i) throws JMSException {
    
            }
    
            @Override
            public void setIntProperty(String s, int i) throws JMSException {
    
            }
    
            @Override
            public void setLongProperty(String s, long l) throws JMSException {
    
            }
    
            @Override
            public void setFloatProperty(String s, float v) throws JMSException {
    
            }
    
            @Override
            public void setDoubleProperty(String s, double v) throws JMSException {
    
            }
    
            @Override
            public void setStringProperty(String s, String s1) throws JMSException {
    
            }
    
            @Override
            public void setObjectProperty(String s, Object o) throws JMSException {
    
            }
    
            @Override
            public void acknowledge() throws JMSException {
    
            }
    
            @Override
            public void clearBody() throws JMSException {
    
            }
    
            @Override
            public <T> T getBody(Class<T> aClass) throws JMSException {
                return null;
            }
    
            @Override
            public boolean isBodyAssignableTo(Class aClass) throws JMSException {
                return false;
            }
        }
    }
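
The batching rule in receiveMessage and releaseAfterMessage can be tried out without a broker. The following is only a minimal sketch under stated assumptions: the class BatchDrainSketch and its methods are hypothetical names, and the Supplier stands in for super.receiveMessage(consumer), returning null when no message arrives in time. It assumes batchSize is at least 1.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

/**
 * Standalone sketch of the batching rule (hypothetical class, not part of Spring).
 * The Supplier is a stand-in for super.receiveMessage(consumer): it returns the
 * next message, or null when nothing is available.
 */
final class BatchDrainSketch {

    /** Collects messages until the source returns null or batchSize is reached. */
    static <T> List<T> receiveBatch(Supplier<T> source, int batchSize) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < batchSize) {
            T message = source.get();
            if (message == null) {
                break; // nothing more available right now: release what we have
            }
            batch.add(message);
        }
        return batch;
    }

    /** Drains the source completely and returns the batches in delivery order. */
    static <T> List<List<T>> drainAll(Supplier<T> source, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        List<T> batch;
        while (!(batch = receiveBatch(source, batchSize)).isEmpty()) {
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < 250; i++) {
            queue.add(i);
        }
        // Queue.poll() returns null on an empty queue, like a timed-out receive.
        for (List<Integer> b : drainAll(queue::poll, 100)) {
            System.out.println("batch of " + b.size());
        }
    }
}
```

With 250 queued messages and a batch size of 100 this prints batches of 100, 100 and 50. If each released batch is handled in one transaction, that would mean three commits rather than 250.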
    

    Here is an example showing how to use it in a Spring application context:

    <beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:int="http://www.springframework.org/schema/integration"
      xmlns:jms="http://www.springframework.org/schema/integration/jms"
      xmlns:p="http://www.springframework.org/schema/p"
      xsi:schemaLocation="
        http://www.springframework.org/schema/beans     
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-4.0.xsd
        http://www.springframework.org/schema/integration/jms
        http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.0.xsd">
    
    <!-- Plug in the BatchMessageListenerContainer in a message-driven-channel-adapter -->
    <jms:message-driven-channel-adapter container-class="dk.itealisten.myservice.spring.components.BatchMessageListenerContainer"
      acknowledge="transacted"
      channel="from.mq"
      concurrent-consumers="5"
      max-concurrent-consumers="15"
      connection-factory="jmsConnectionFactory"
      transaction-manager="transactionManager"
      destination="my.mq.queue"
      />
    
    <!-- Flow processing the BatchMessages being posted on the "from.mq" channel -->
    <int:chain input-channel="from.mq" output-channel="nullChannel">
      <int:splitter expression="payload.messages" />
      <!-- This is where we deal with conversion to spring messages as the payload is now a single standard javax.jms.Message implementation -->
      <int:transformer ref="smc" method="fromMessage"/>
      <!-- And finally we persist -->
      <int:service-activator ref="jdbcPersistor" method="persist"/>
    </int:chain>
    
    <!-- Various supporting beans -->
    
    <!-- A bean to handle the database persistence -->
    <bean id="jdbcPersistor" class="dk.itealisten.myservice.spring.components.JdbcPersistor" p:dataSource-ref="dataSource" />
    
    <!-- A bean to handle the conversion that could not take place in the MessageListener, as it doesn't know how to convert a BatchMessage -->
    <bean id="smc" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    
    <!-- The transaction manager must make sure messages are committed outbound (JDBC) before they are cleaned up inbound (JMS). -->
    <bean id="transactionManager" class="org.springframework.data.transaction.ChainedTransactionManager">
      <constructor-arg name="transactionManagers">
        <list>
          <bean class="org.springframework.jms.connection.JmsTransactionManager" p:connectionFactory-ref="jmsConnectionFactory" />
          <bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager" p:dataSource-ref="dataSource" />
        </list>
      </constructor-arg>
    </bean>
    
    </beans>
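
The order of the two transaction managers in the list matters. According to its Javadoc, ChainedTransactionManager starts transactions in the order given and commits them in reverse order, so listing JMS first and JDBC second means the database commit happens before the JMS commit. The sketch below only illustrates that ordering in plain Java. It does not use Spring, and ChainedCommitOrderSketch and runChained are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java illustration of the ordering rule only (hypothetical class).
 * Each String is a stand-in for one transaction manager in the chain.
 */
final class ChainedCommitOrderSketch {

    /** Begins in list order, commits in reverse list order; returns the event log. */
    static List<String> runChained(List<String> resources) {
        List<String> log = new ArrayList<>();
        for (String resource : resources) {
            log.add("begin " + resource);
        }
        log.add("process batch");
        for (int i = resources.size() - 1; i >= 0; i--) {
            log.add("commit " + resources.get(i));
        }
        return log;
    }

    public static void main(String[] args) {
        // Same order as the <list> in the XML above: JMS first, JDBC second.
        runChained(Arrays.asList("JMS", "JDBC")).forEach(System.out::println);
    }
}
```

This prints "begin JMS", "begin JDBC", "process batch", "commit JDBC", "commit JMS". Note that this is a best-effort arrangement rather than a two-phase commit: if the JMS commit fails after the JDBC commit has succeeded, the batch may be redelivered, so the persisting step should tolerate duplicates.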
    

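    【Comments】:

    • Can you post the whole BatchMessageListenerContainer class? Or can you share any reference?
    • @AlagammalP I have added the full implementation of BatchMessageListenerContainer :-)
    • Thanks Jens Krogsboell for the update. I also want to insert messages into a database after consuming them from the queue. But I am not using spring-integration, only the container, and I am wondering whether with this approach I would be able to get BatchMessage.messages in the onMessage() of the message listener. stackoverflow.com/questions/55107244/…
    • @AlagammalP I cannot give you any details, as I have no experience working with JMS without spring-integration. But it should be possible. Only - I am not sure it is really necessary for you to use BatchMessage at all - I just needed it because spring-integration did not offer the level of control I needed out of the box.
    • Have you faced any time delay in messages reaching the MessageListener? I am facing the time delay mentioned in stackoverflow.com/questions/55750449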