【问题标题】:How to browse messages from a queue using Apache Camel?如何使用 Apache Camel 浏览队列中的消息?
【发布时间】:2018-06-27 09:41:01
【问题描述】:

我需要使用 Camel 路由浏览来自活动 mq 的消息而不消耗消息。

JMS 队列中的消息将被读取(仅浏览而不是消费)并移动到数据库,同时确保原始队列保持不变。

public class CamelStarter {

   private static CamelContext camelContext;

            public static void main(String[] args) throws Exception {
                            camelContext = new DefaultCamelContext();
                            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);

                            camelContext.addComponent("jms",  JmsComponent.jmsComponent(connectionFactory));

                            camelContext.addRoutes(new RouteBuilder() {

                                            @Override
                                            public void configure() throws Exception {
                                                from("jms:queue:testQueue").to("browse:orderReceived") .to("jms:queue:testQueue1");
                                            }

                                            }
                            );

                            camelContext.start();

                            Thread.sleep(1000);

                             inspectReceivedOrders();

                            camelContext.stop();

            }

public static void inspectReceivedOrders() {

BrowsableEndpoint browse = camelContext.getEndpoint("browse:orderReceived", BrowsableEndpoint.class);
List<Exchange> exchanges = browse.getExchanges();
System.out.println("Browsing queue: "+ browse.getEndpointUri() + " size: " + exchanges.size());
for (Exchange exchange : exchanges) {
  String payload = exchange.getIn().getBody(String.class);
  String msgId = exchange.getIn().getHeader("JMSMessageID", String.class);
  System.out.println(msgId + "=" +payload);

}

【问题讨论】:

    标签: apache-camel


    【解决方案1】:

    Apache 骆驼浏览组件正是为此而设计的。检查here 获取文档。

    由于您没有提供任何其他信息,因此不能多说。

    假设你有这样的路线

    from("activemq:somequeue).to("bean:someBean")  
    

    from("activemq:somequeue).process(exchange -> {})  
    

    你所要做的就是像这样在两者之间放置一个浏览端点

    from("activemq:somequeue).to("browse:someHandler").to("bean:someBean")   
    

    那就这样写一个类

    @Component
    public class BrowseQueue {
    
      @Autowired
      CamelContext camelContext;
    
      public void inspect() {
        BrowsableEndpoint browse = camelContext.getEndpoint("browse:someHandler", BrowsableEndpoint.class);
        List<Exchange> exchanges = browse.getExchanges();
    
    
        for (Exchange exchange : exchanges) {
          ...... 
        }
      }
    
    }
    

    【讨论】:

    • 我试过这样做,但没有奏效。请阅读更新后的问题描述。
    • 这段代码 sn-p 正在创建两个队列之间交换的副本。但是,队列中的消息在读取时会丢失。
    • 谁应该消费队列中的消息,消费者是如何消费的?你能显示你的路线吗
    • 什么是 jms:queue:testQueue1 以及你在 inspectOrders 中做什么?需要解释一下吗?
    • jms:queue:testQueue1 : 它是活动 mq 中的队列名称
    【解决方案2】:

    据我所知,在 Camel 中无法阅读(不消耗!)JMS 消息 :-( 我发现的唯一解决方法(在 JEE 应用程序中)是定义一个带有计时器的启动 EJB,持有 QueueBrowser,并将 msg 处理委托给 Camel 路由:

    @Singleton
    @Startup
    public class MyQueueBrowser  {
    
        private TimerService timerService;
    
        @Resource(mappedName="java:/jms/queue/com.company.myqueue")
        private Queue sourceQueue;
    
        @Inject
        @JMSConnectionFactory("java:/ConnectionFactory")
        private JMSContext jmsContext;  
    
        @Inject
        @Uri("direct:readMessage")
        private ProducerTemplate camelEndpoint;
    
    
        @PostConstruct
        private void init() {       
            TimerConfig timerConfig = new TimerConfig(null, false);
            ScheduleExpression se = new ScheduleExpression().hour("*").minute("*/"+frequencyInMin);
            timerService.createCalendarTimer(se, timerConfig);
        }
    
    
        @Timeout
        public void scheduledExecution(Timer timer) throws Exception {      
            QueueBrowser browser = null;
            try {                       
                browser = jmsContext.createBrowser(sourceQueue);                                           
                Enumeration<Message> msgs = browser.getEnumeration();
                while ( msgs.hasMoreElements() ) { 
                    Message jmsMsg = msgs.nextElement(); 
                    // + here: read body and/or properties of jmsMsg                                            
                    camelEndpoint.sendBodyAndHeader(body, myHeaderName, myHeaderValue);
                }                                                                               
            } catch (JMSRuntimeException jmsException) {
                ...
            } finally {        
                browser.close();
            }
        }
    
    
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-02-02
      • 2021-01-30
      • 1970-01-01
      • 2016-05-21
      相关资源
      最近更新 更多