【问题标题】:Camel JMS request-reply with 'n' reply messages带有“n”条回复消息的 Camel JMS 请求-回复
【发布时间】:2020-10-11 21:31:18
【问题描述】:

我正在使用 Camel JMS 组件来请求-回复以与 MQ 进行通信。对于我的一些请求,我可以收到 n 条回复消息。如何汇总这些回复消息?

我曾想过将聚合器模式与聚合策略一起使用,但无法使用它,因为我不确定可以回复的消息数量。

社区可以帮助我了解正确的做法吗?我做了一些谷歌搜索,但找不到有用的东西。以下是我的示例路线代码

from("direct:"+routeName).routeId(routeName)
                        .setHeader("JMSCorrelationID", constant(UUID.randomUUID().toString()))
                        .circuitBreaker()
                            .resilience4jConfiguration()
                            .minimumNumberOfCalls(3)
                        .end()
                        .to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&replyTo=CAMELDEMOREPLY")
                            .log("${body}")
                            .unmarshal(customerDetailsOutBound)
                            .process(new Processor() {
                                    @Override
                                    public void process(Exchange exchange) throws Exception {
                                        System.out.println(exchange.getIn().getBody().toString());
                                    }
                            })
                        .onFallback().process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("Store this message to backup");
                            }
                        })
                        .end();

期待从社区中获得一些好的见解。谢谢。

【问题讨论】:

  • request-reply 在这里工作正常。唯一关心的是如何让线程等待其他回复或如何基于相同的correlationId聚合其他回复

标签: apache-camel ibm-mq camel-jms


【解决方案1】:

消息流

  1. 您的第一个路由将消息发送到CAMELDEMO 队列并开始等待新队列CAMELDEMO_AGGREGATED_REPLY 上的单个聚合消息
  2. CAMELDEMO 上收到消息的组件,开始向 CAMELDEMOREPLY 队列发送响应,并指示将发送多少响应
  3. 下面的第二条路由开始监听CAMELDEMOREPLY,聚合消息并将聚合消息发送到CAMELDEMO_AGGREGATED_REPLY
  4. 您在CAMELDEMO_AGGREGATED_REPLY 上等待回复的第一条路由获取汇总回复,接收单个消息并将其发回

原始路线已更新,等待CAMELDEMO_AGGREGATED_REPLY回复

...
.to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&
                replyTo=CAMELDEMO_AGGREGATED_REPLY")
.log("${body}")
.unmarshal(customerDetailsOutBound)
.process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            System.out.println(exchange.getIn().getBody().toString());
        }
})
....

聚合消息的第二条路径

from(mqComponentBeanName+"://CAMELDEMOREPLY?
                          exchangePattern=In&requestTimeout=10000)
.aggregate(header("JMSCorrelationID"), new MyAggregationStrategy())
.to(mqComponentBeanName+"://CAMELDEMO_AGGREGATED_REPLY?
                          exchangePattern=Out&requestTimeout=10000)
public final class MyCompletionStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExch, Exchange newExchange) 
    {
        ...
        //Here you check your flag regarding the number of responses
        // you were supposed to receive, and if it is met
        // complete the aggregation by setting it to true
        oldExch.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
                ...
         return oldExchange;
     }
}

【讨论】:

  • Kavitha,不能这样做,因为请求-回复在单线程上工作。一旦收到回复,它就会继续前进。
  • 是的,客户端无法接收多个流响应。对于客户端的 HTTP 请求。下游系统通过 IBM MQ 接收请求并在回复队列中发送响应。对于某些请求,下游系统可以为相同的correlationID 发送多个响应。因此,这条路线的责任是在向客户端返回响应时聚合所有这些请求。此外,来自 MQ 的回复/响应消息有一个字段来标识期望有多少消息作为响应。这也可以用于循环,但问题是如何使用剩余的消息,因为 req-repl 在单线程上工作。
  • 在您的第 3 点中,您如何获取必须为其聚合消息的 JMSCorrelationId?这在第 1 点中可用。
  • 是的回复者需要设置同意。但是第二条路线不知道它必须聚合消息的特定 CorrelationId。
  • 现在明白了。您试图根据相关性 ID 聚合第二条路由中的每条消息。是的,这可能是一个相当大的解决方案。但是,有创建新队列和读写操作的开销成本。作为它的 IBM MQ,它有自己的成本。因此,可以根据成本评估此解决方案。
【解决方案2】:

我能够通过单一路线解决这个问题。解决方案可能不是那么整洁,但有效并实现了目的。我使用了 loopDoWhile 并在 loopDoWhile 内部的处理器中使用纯 java 代码从队列中获取消息。

from("direct:"+routeName).routeId(routeName)
                    .setHeader("JMSCorrelationID", constant(UUID.randomUUID().toString()))
                    .circuitBreaker()
                        .resilience4jConfiguration()
                        .minimumNumberOfCalls(3)
                    .end()
                    .to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&replyTo=CAMELDEMOREPLY")
                        .log("${body}")
                        .unmarshal(customerDetailsOutBound)
                        .process(new Processor() {
                                @Override
                                public void process(Exchange exchange) throws Exception {
                                    System.out.println(exchange.getIn().getBody().toString());


int msgCount = getMsgCountfromFirstReposnse;
if (msgCount > 1) {
exchange.getIn().setHeader("COUNTER", 0);
exchange.getIn().setHeader("MSG_COUNT", msgCount-1);
exchange.setProperty("connectionFactory", connectionFactory);
}
                                }
                        })
                    .loopDoWhile(simple("${headers.COUNTER} != ${headers.MSG_COUNT}"))
                            .process(simpleJMSConsumerProcess)
                        .end().endCircuitBreaker()
                    .onFallback().process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println("Store this message to backup");
                        }
                    })

处理器内的代码:

ConnectionFactory connectionFactory = (ConnectionFactory) exchange.getProperty("connectionFactory");
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);

    try {
        Queue queue = session.createQueue("CAMELDEMOREPLY?consumer.priority=10");
        MessageConsumer consumer = session.createConsumer(queue, "JMSCorrelationID = '"+exchange.getIn().getHeader("JMSCorrelationID").toString()+"'");
        connection.start();
        TextMessage textMsg = (TextMessage) consumer.receive();
        System.out.println(textMsg);
        System.out.println("Received: " + textMsg.getText());
        exchange.getIn().setHeader("COUNTER", ((Integer)exchange.getIn().getHeader("COUNTER"))+1);
        if (connection != null) {
            connection.close();
        }
    } finally {
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

【讨论】:

    【解决方案3】:

    嗯,传统的请求-回复设计上只有 1 条回复消息。等待响应的线程在第一个回复到达后立即停止侦听。

    使用 JMS 相关 ID(每个请求没有专用线程)理论上可以接收同一请求的多个回复,但我不知道这在 JMS 中是否真的有效/是否允许。

    基于 cmets 的更新

    您在 cmets 中写道,您可以针对一个请求接收多个 JMS 回复,甚至可以获得预期的回复数量。

    如果这一切正常,您可以在 Camel 路由中使用 Aggregator EIP 来收集所有响应,然后再向调用者发送回复。

    聚合器是高度可配置的。您可以决定如何组合响应,还可以定义多个完成条件(超时、消息数等)。

    【讨论】:

    • 但是多回复解决方案的主要问题仍然相同。您要等多久才能将请求视为已完成并期望不再有回复? [我们为此设置了超时。所以这不是问题] 回到单一回复解决方案:您必须在创建回复消息的位置汇总所有回复消息。 [无法在发送方进行此聚合。因为那不在我们的控制范围内。]
    • 我更新了我的答案,因为您在 cmets 中提供了有关您的案例的更多信息。
    • @burik/@Kavitha 我对聚合器 EIP 也有了解,我可以使用它。我强制看到的唯一问题是如何解决这个问题:使用 JMS 相关 ID(每个请求没有专用线程)理论上可以接收同一请求的多个回复,但我不知道这是否真的有效/在 JMS 中是允许的。
    猜你喜欢
    • 2017-11-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-11-08
    • 1970-01-01
    • 1970-01-01
    • 2012-12-01
    • 2012-10-25
    相关资源
    最近更新 更多