【问题标题】:Camel as a JMS to JMS bridge - Not showing as an AMQ consumer(Camel 作为 JMS 到 JMS 的桥接 - 未显示为 AMQ 消费者)
【发布时间】:2016-05-04 17:45:17
【问题描述】:

我对 JMS 很陌生,并且需要在 AMQ 和 WMQ 之间架起桥梁,我在 Camel 文档中看到他们建议使用 Camel,而不是使用 JMS 到 JMS 桥接器。

首先,我试图让我的应用程序从 AMQ 中获取消息,并简单地记录一条日志表明它正在这样做。但是每当我在 Jetty 中启动应用程序时,它都不会显示为 apiToTopsQueue 上的消费者,因此也不会把消息从队列中取出。

我的 application-context.xml(加载骆驼上下文)

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Root Spring application context for the bridge application.
  It resolves property placeholders, pulls in the Camel configuration and
  initialises Log4j.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Resolves ${...} placeholders from tops-bridge.properties on the classpath. -->
    <context:property-placeholder location="classpath:tops-bridge.properties" />

    <!-- Camel context, route builder and ActiveMQ beans are defined in camel-context.xml. -->
    <import resource="classpath:camel-context.xml" />

    <!-- Calls Log4jConfigurer.initLogging(configLocation, refreshInterval) when the context starts. -->
    <bean id="log4jInitialization"
          class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
        <property name="targetClass" value="org.springframework.util.Log4jConfigurer" />
        <property name="targetMethod" value="initLogging" />
        <property name="arguments">
            <list>
                <value>classpath:log4j.xml</value>
                <value>60000</value> <!-- Refresh Log4j config every 60 seconds -->
            </list>
        </property>
    </bean>
</beans>

然后这是 camel-context.xml(我一直在注释掉从各个教程里拿来的零散代码,所以它看起来可能有些奇怪)

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Camel and ActiveMQ wiring for the bridge: the Camel context, the route
  builder bean, ActiveMQ connection factories, an embedded broker and the
  Camel "jms" component.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xmlns:broker="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://camel.apache.org/schema/spring
                           http://camel.apache.org/schema/spring/camel-spring.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Camel context whose routes come from the bridgeRouteConfig route builder bean. -->
    <camel:camelContext id="defaultCamelContext">
        <camel:routeBuilder ref="bridgeRouteConfig"/>
        <!--<camel:jmxAgent id="agent" createConnector="true"/>-->
    </camel:camelContext>

    <!-- Route builder; each constructor argument is an endpoint URI read from the properties file. -->
    <bean id="bridgeRouteConfig" class="com.caci.asg.rail.tops.bridge.TopsBridgeRouteBuilder">
        <constructor-arg name="amqToBridgeQueue" value="${topsBridgeRouteBuilder.route.amqToBridgeRoute}"/>
        <constructor-arg name="bridgeToWmqQueue" value="${topsBridgeRouteBuilder.route.bridgeToWmqRoute}"/>
        <constructor-arg name="wmqToBridgeQueue" value="${topsBridgeRouteBuilder.route.wmqToBridgeRoute}"/>
        <constructor-arg name="bridgeToAmqQueue" value="${topsBridgeRouteBuilder.route.bridgeToAmqRoute}"/>
    </bean>

    <!-- Plain ActiveMQ connection factory pointing at application.activemq.url. -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${application.activemq.url}"/>
        <property name="useAsyncSend" value="true"/>
    </bean>

    <!-- Pooling wrapper around jmsFactory; started and stopped with the Spring context. -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8"/>
        <property name="connectionFactory" ref="jmsFactory"/>
    </bean>

    <!--
      Camel JMS configuration backed by the pooled connection factory.
      NOTE(review): no bean in this file references jmsConfig, so the "jms"
      component below does not appear to use this pooled connection. Confirm
      whether it was meant to be set as that component's configuration.
    -->
    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="concurrentConsumers" value="10"/>
    </bean>


    <!--
      Embedded, non-persistent ActiveMQ broker named "myBroker".
      NOTE(review): its TCP connector binds to the same application.activemq.url
      that jmsFactory connects to. If a standalone broker is already listening
      on that address this presumably conflicts. Confirm whether an embedded
      broker is wanted at all.
    -->
    <broker:broker useJmx="true" persistent="false" brokerName="myBroker">
        <broker:transportConnectors>
            <!-- expose a VM transport for in-JVM transport between AMQ and Camel on the server side -->
            <broker:transportConnector name="vm" uri="vm://myBroker"/>
            <!-- expose a TCP transport for clients to use -->
            <broker:transportConnector name="tcp" uri="${application.activemq.url}"/>
        </broker:transportConnectors>
    </broker:broker>

    <!--
      Camel component registered as "jms"; it backs the "jms:" endpoint URIs
      passed to the route builder and connects through vm://myBroker, i.e. the
      embedded broker declared above.
      NOTE(review): as written, "jms:" endpoints talk to the embedded broker
      rather than directly to application.activemq.url. Verify this is intended.
    -->
    <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="brokerURL" value="vm://myBroker"/>
    </bean>

</beans>

camel-context.xml 使用了如下一些属性(在我让 AMQ 工作之前,引用 wmq 的属性目前是占位符)。

# URL used by the ActiveMQ connection factory and by the embedded broker's TCP connector.
application.activemq.url=tcp://localhost:61616

# Endpoint URIs injected into TopsBridgeRouteBuilder (scheme:queueName).
# NOTE(review): the "mq:" scheme has no matching component bean in the Camel
# configuration shown; the surrounding text describes these as placeholders.
topsBridgeRouteBuilder.route.amqToBridgeRoute=jms:apiToTopsQueue
topsBridgeRouteBuilder.route.bridgeToWmqRoute=mq:toWmq
topsBridgeRouteBuilder.route.wmqToBridgeRoute=mq:fromWmq
topsBridgeRouteBuilder.route.bridgeToAmqRoute=jms:topsToApiQueue

路由构建器在Java中如下

import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;

/**
 * Camel route definitions for the AMQ/WMQ bridge.
 *
 * <p>All four constructor arguments are Camel endpoint URIs. Only
 * {@code amqToBridgeQueue} is used by the currently active route; the other
 * three are stored for the bridge routes that are still disabled in
 * {@link #configure()}.
 */
public class TopsBridgeRouteBuilder extends RouteBuilder {

    private final String amqToBridgeQueue;
    private final String bridgeToWmqQueue;
    private final String wmqToBridgeQueue;
    private final String bridgeToAmqQueue;

    /**
     * @param amqToBridgeQueue endpoint URI the bridge consumes from on the AMQ side
     * @param bridgeToWmqQueue endpoint URI the bridge forwards AMQ messages to
     * @param wmqToBridgeQueue endpoint URI the bridge consumes from on the WMQ side
     * @param bridgeToAmqQueue endpoint URI the bridge forwards WMQ messages to
     */
    public TopsBridgeRouteBuilder(String amqToBridgeQueue, String bridgeToWmqQueue, String wmqToBridgeQueue,  String bridgeToAmqQueue) {
        this.amqToBridgeQueue = amqToBridgeQueue;
        this.bridgeToWmqQueue = bridgeToWmqQueue;
        this.wmqToBridgeQueue = wmqToBridgeQueue;
        this.bridgeToAmqQueue = bridgeToAmqQueue;
    }

    /**
     * Registers the routes. For now this is a single route that consumes from
     * {@code amqToBridgeQueue} and logs each message at WARN level.
     */
    @Override
    public void configure() throws Exception {
        // Full two-way bridge routes, kept disabled until the AMQ side is confirmed working:
//        from(amqToBridgeQueue).to(bridgeToWmqQueue).log(LoggingLevel.INFO, "Message moving to " + bridgeToWmqQueue);
//        from(wmqToBridgeQueue).to(bridgeToAmqQueue).log(LoggingLevel.INFO, "Message moving to " + bridgeToAmqQueue);

        // Trailing space after "from" keeps the endpoint URI readable in the log line.
        from(amqToBridgeQueue).log(LoggingLevel.WARN, "Consuming message from " + amqToBridgeQueue);
    }
}

所以我不太确定为什么在我启动 Jetty 时它没有被列为 apiToTopsQueue 的消费者。我的 pom 包含 AMQ/WMQ/camel 库的依赖项。依赖如下(从父pom继承version/scope)

<!-- No version or scope is declared here; per the surrounding text they are inherited from the parent POM. -->
<!-- Apache Camel: core, Spring integration and the JMS component. -->
<dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-spring</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-jms</artifactId>
        </dependency>
        <!-- IBM MQ (WMQ) client libraries. -->
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mq.jmqi</artifactId>
        </dependency>
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mqjms</artifactId>
        </dependency>
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>connector</artifactId>
        </dependency>
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>dhbcore</artifactId>
        </dependency>

        <!-- Apache ActiveMQ (AMQ) libraries, including connection pooling. -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>

我编写了这个测试,用来验证我可以从我的桥接程序中把消息放入 AMQ 并再从 AMQ 中取出——测试可以通过,并且在 AMQ 管理页面上可以看到入队/出队的消息计数增加了。

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

import javax.jms.*;

/**
 * Integration test that round-trips one text message through an ActiveMQ
 * queue. It requires a broker listening on {@code tcp://localhost:61616}.
 */
public class TopsBridgeRouteBuilderTest {

    /** Upper bound on the wait for the message, so a missing message fails the test instead of hanging it. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 5000L;

    /**
     * Sends a non-persistent text message to {@code apiToTopsQueue}, then
     * reads it back over a second connection and checks the payload.
     *
     * @throws JMSException if the broker cannot be reached or a JMS call fails
     */
    @Test
    public void testAMessageAddedToAmqCanBeRetrieved() throws JMSException {

        String brokerURL = "tcp://localhost:61616";
        String amqQueue = "apiToTopsQueue";
        String messageToSend = "Test message";

        // Put a message on the AMQ
        Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(amqQueue);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            TextMessage tm = session.createTextMessage(messageToSend);
            producer.send(tm);
        } finally {
            // Closing the connection also closes its sessions and producers.
            connection.close();
        }

        // Create read only consumer to take the message off the queue
        Connection connectionReadOnly = new ActiveMQConnectionFactory(brokerURL).createConnection();
        try {
            connectionReadOnly.start();
            Session sessionReadOnly = connectionReadOnly.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = sessionReadOnly.createConsumer(sessionReadOnly.createQueue(amqQueue));

            // receive(timeout) returns null on expiry; the no-arg receive() would block forever.
            Message received = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
            assertThat("Expected a TextMessage within " + RECEIVE_TIMEOUT_MILLIS + " ms",
                    received instanceof TextMessage, is(true));

            String receivedText = ((TextMessage) received).getText();
            System.out.println("Message retrieved = " + receivedText);
            assertThat(receivedText, is(messageToSend));
        } finally {
            connectionReadOnly.close();
        }
    }
}

是不是我的配置有问题,导致启动 Jetty 时 Camel 路由没有生效,或者根本没有去监听 AMQ?

谢谢。

【问题讨论】:

    标签: java jms activemq ibm-mq bridge


    【解决方案1】:

    原来我的 web.xml 中缺少这部分。为未来吸取的教训!

    <!--
      Registers Spring's ContextLoaderListener so the root application context
      is created when the web application starts. Per the answer text, this
      entry was missing from web.xml.
      NOTE(review): the context-param naming the context file
      (contextConfigLocation) is not shown here. Confirm it points at
      application-context.xml.
    -->
    <listener>
        <listener-class>
            org.springframework.web.context.ContextLoaderListener
        </listener-class>
    </listener>
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-06-04
      • 1970-01-01
      • 1970-01-01
      • 2013-12-29
      • 2014-10-02
      • 2021-01-14
      相关资源
      最近更新 更多