【问题标题】:Problems with sync AMQP connector in MuleMule 中同步 AMQP 连接器的问题
【发布时间】:2015-05-21 03:21:58
【问题描述】:

我正在尝试以持久的方式处理 Mule 中的 XML 对象队列,这些对象已从原始 xml 文件中拆分出来,然后使用“选择”组件进行路由。 选择组件的每个分支都通向具有不同队列的 AMQP 端点。每个队列的另一端是另一个 Mule 流,它应该读取队列,对 XML 执行某些操作并将其作为回复返回。所有的 AMQP 端点都设置为请求-响应。

流程似乎可以正常工作,直到将某些内容放入 AMQP 队列,但随后会立即继续,而不是等待消息。

这也是由似乎正确读取队列的远程流产生的,但随后似乎立即回复,然后继续正确处理。在此结束时,它应该回复消息,但似乎没有这样做。

这里有一些代码片段,以防有人指出我哪里出错了......

主路由流程

    <amqp:connector name="connector.amqp.mule.default" doc:name="AMQP Connector"  validateConnections="true"/>
    <flow name="routerFlow">
    <http:listener config-ref="HTTP_Listener_Configuration" path="/configtest" doc:name="HTTP" allowedMethods="POST" />
    <mulexml:dom-to-xml-transformer doc:name="DOM to XML"/>
    <splitter expression="#[xpath3('//cfg:Configuration', message.payload, 'NODESET')]" doc:name="Splitter" enableCorrelation="ALWAYS"/>
    <mulexml:dom-to-xml-transformer doc:name="DOM to XML"/>
    <set-variable variableName="firstElement" value="#[xpath3('name(/*/*[1])', message.payload, 'STRING')]" doc:name="GetConfigItem" />
    <choice doc:name="Route by Config Item Type">
        <when expression="#[flowVars['firstElement'] == 'dir:DirectoryObject']">
            <amqp:outbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout"  exchangeDurable="true" queueName="configurationDirectoryObject" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="DirectoryObject Queue" connector-ref="connector.amqp.mule.default"/>
            <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
            <logger message="&quot;Back from DirectoryQueue with &quot; + #[payload]" level="INFO" doc:name="Logger"/>
        </when>
        <when expression="#[flowVars['firstElement'] == 'gpo:GroupPolicyObject']">
            <amqp:outbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout"  exchangeDurable="true" queueName="configurationGroupPolicy" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="GroupPolicy Queue" connector-ref="connector.amqp.mule.default"/>
        </when>
    </choice>
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
    <collection-aggregator timeout="60000" failOnTimeout="true" doc:name="Collection Aggregator"/>

目录队列流程

    <flow name="directoryobjectFlow">
    <amqp:inbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationDirectoryObject" queueDurable="true"   routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP" connector-ref="connector.amqp.mule.default"/>
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
    <http:request config-ref="HTTP_Request_Configuration" path="/xml" method="POST" responseTimeout="60000" doc:name="HTTP">
        <http:request-builder>
            <http:header headerName="Content-Type" value="application/xml"/>
            <http:header headerName="Accept" value="application/xml"/>
        </http:request-builder>
    </http:request>
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
    <logger level="INFO" doc:name="Logger" message="Exit DirectoryFlow with #[payload]"/>
    </flow>

和组策略队列(当前设置为显示它什么都不做)

    <flow name="amiab-esb-grouppolicyFlow">
    <amqp:inbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout"  exchangeDurable="true" queueName="configurationGroupPolicy" queueDurable="true"   routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP" connector-ref="connector.amqp.mule.default"/>
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
    <set-payload doc:name="Set Payload" value="Nothing" />
    <logger level="INFO" doc:name="Logger" message="Exit GroupPolicy with #[payload]"/>
    </flow>

我对 Mule 很陌生,刚刚开始掌握它,所以我非常感谢任何想法或见解。

谢谢!

【问题讨论】:

  • 你好 Nich,我是 amqp connecto 的维护者。我应该能够提供帮助,但是我不完全理解这个问题。你能改写一下吗?
  • 嗨维克多,基本上我正在尝试使用 AMQP (RabbitMQ) 队列在流之间进行持久的请求/响应,但我似乎无法使其同步工作,并等待answer 来自 HTTP 请求并将其作为回复发送回来。我正在使用 AMQP 3.4.4.201409101602,这是通过云连接器站点提供的。
  • 有比这更多的新版本,但似乎存在问题,并且更新没有发布到社区用户的交易所。我们明天可能会发布一个新版本。我会带着回购地址回来。

标签: mule amqp


【解决方案1】:

请使用今天发布的连接器的最新版本 3.6.2。这将毫无问题地执行带有请求-响应端点的流程。

【讨论】:

  • 谢谢维克多,我会试一试的。 :)
  • 是的,效果很好!现在一切都按应有的方式运行。谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-05-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多