【问题标题】:mule reliable pattern with file streaming and JMS具有文件流和 JMS 的 mule 可靠模式
【发布时间】:2015-08-05 08:56:00
【问题描述】:

我使用的是 Mule 3.5.0 CE 版。 我有两个要求:

  • 0 条消息丢失
  • 能够管理从小到大的有效负载

为了使用 Mule 进行管理,我正在考虑使用 ActiveMQ BlobMessage 来管理有效负载和可靠的获取模式。 首先我想知道这是否是最好的方法?

这是我创建的:

  • 带有流和工作目录的文件连接器
  • 带有 URI 的 AMQ 连接器以持久模式将我的 blobmessages 放入 AMQ Jetty Web 服务器
  • 文件端点选择文件
  • 组件获取输入流并创建 blobmessage
  • jms 端点发送已创建的 blob 消息

但是,如果发生 AMQ 崩溃,我会丢失消息...

我在 mule 中有一些警告“尝试从正在处理的文件列表中删除文件 '...' 失败,并且出现以下错误:

错误 2015-05-23 12:55:38,291 [[opx].File.receiver.01] org.mule.exception.DefaultMessagingExceptionStrategy: ****************************************************** ****************************** 消息:无法处理事件,因为“Active_MQ”已停止 类型:org.mule.api.lifecycle.LifecycleException 代码:MULE_ERROR-70167 JavaDoc:http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/lifecycle/LifecycleException.html ****************************************************** ****************************** 异常堆栈是: 1.“Active_MQ”停止时无法处理事件(org.mule.api.lifecycle.LifecycleException) org.mule.lifecycle.processor.ProcessIfStartedMessageProcessor:38 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/lifecycle/LifecycleException.html) ****************************************************** ****************************** 根异常堆栈跟踪: org.mule.api.lifecycle.LifecycleException:无法处理事件,因为“Active_MQ”已停止 在 org.mule.lifecycle.processor.ProcessIfStartedMessageProcessor.handleUnaccepted(ProcessIfStartedMessageProcessor.java:38) 在 org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:44) 在 org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 在 org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58) 在 org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44) 在 org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 在 org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58) 在 org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44) 在 org.mule.endpoint.DefaultOutboundEndpoint.process(DefaultOutboundEndpoint.java:100) 在 org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 在 org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58) 在 org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44) 在 org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 在 org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44) 在 org.mule.construct.DynamicPipelineMessageProcessor.process(DynamicPipelineMessageProcessor.java:54) 在 org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 在 org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58) 在 org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44) 在 org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 在 org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44) 在 org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:51) 在 org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 在 org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58) 在 org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44) 在 org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:40) 在 org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 在 org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58) 在 org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44) 在 org.mule.construct.AbstractPipeline$1.process(AbstractPipeline.java:109) 在 org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 在 org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58) 在 org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44) 在 org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 在 org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58) 在 org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44) 在 org.mule.construct.AbstractPipeline$3.process(AbstractPipeline.java:207) 在 org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 在 org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58) 在 org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44) 在 org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 在 org.mule... ****************************************************** ******************************

编辑:这是配置。

流程:

<jms:activemq-connector name="Active_MQ" specification="1.1" brokerURL="tcp://localhost:61616?jms.redeliveryPolicy.initialRedeliveryDelay=3000&amp;jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/"  validateConnections="true" maxRedelivery="-1" cacheJmsSessions="false" persistentDelivery="true"   doc:name="Active MQ" >
    <reconnect frequency="60000" count="20"/>
</jms:activemq-connector>
<file:connector name="File" workDirectory="/home/fs/workDirectory" autoDelete="true" streaming="true"  validateConnections="true"/>
<flow name="ReceiveFromFS" processingStrategy="synchronous">
    <file:inbound-endpoint path="/home/fs/in" pollingFrequency="5000" fileAge="1000" connector-ref="File"/>
    <component class="mypackage.InputStreamToBlobMessage" doc:name="Java"/>
    <jms:outbound-endpoint queue="queue1" connector-ref="Active_MQ" doc:name="JMS"/>
</flow>

我的 java 组件:

公共类 InputStreamToBlobMessage 实现 Callable {

@Override
public Object onCall(MuleEventContext eventContext) throws Exception {

    MuleMessage muleMsg = eventContext.getMessage();
    InputStream is = (InputStream) muleMsg.getPayload();
    JmsConnector amqConnector = (JmsConnector) eventContext.getMuleContext().getRegistry().lookupConnector("Active_MQ");
    BlobMessage bm = null;
    if (amqConnector.isConnected())
    {
        ActiveMQSession session = (ActiveMQSession) amqConnector.getSession(false, false);
        bm = session.createBlobMessage(is);
    }

    return bm;
}

}

【问题讨论】:

  • 你能添加你的配置吗?
  • 刚刚用配置更新了问题。关于 java 组件,在创建 blob 消息后,我想关闭流,但如果这样做,我会关闭错误流...
  • 谢谢,我不知道 ActiveMQ 有这个功能,这看起来有点超出我的能力,但是我支持这个问题并期待解决方案

标签: streaming jms mule


【解决方案1】:

在这种情况下,您的重新连接尝试似乎已用尽,这就是您的连接器保持“停止”状态的原因。

请尝试用

替换您的重新连接策略
<reconnect-forever frequency="60000" />

【讨论】:

  • 我试过了,但不幸的是这对错误没有任何改变。一旦我终止了 activemq 服务,我的所有消息都会触发“无法处理事件,因为“Active_MQ”已停止”错误。我可以看到文件在工作目录文件夹中挂起。一旦所有消息都触发了错误,我就可以看到尝试重新连接的重新连接策略,但是当它重新连接时,未处理的未处理消息
  • 我想我找到了问题所在。它看起来与 MuleStudio 更相关,但我会做更多的测试,并会及时通知您我的发现。
【解决方案2】:

这是我用来使它工作的配置:

<jms:activemq-connector name="Active_MQ" specification="1.1" brokerURL="tcp://localhost:61616" connectionFactory-ref="connectionFactory" validateConnections="true" maxRedelivery="-1" persistentDelivery="true" doc:name="Active MQ">
</jms:activemq-connector>
    <spring:beans>
    <spring:bean name="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" doc:name="Bean">
        <spring:property name="redeliveryPolicy">
            <spring:bean name="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
                <spring:property name="redeliveryDelay" value="60000"/>
                <spring:property name="maximumRedeliveries" value="20"/>
                <spring:property name="initialRedeliveryDelay" value="10000"/>
            </spring:bean>
        </spring:property>
        <spring:property name="blobTransferPolicy">
            <spring:bean name="blobTransferPolicy" class="org.apache.activemq.blob.BlobTransferPolicy">
                <spring:property name="defaultUploadUrl" value="http://localhost:8161/fileserver/"/>
            </spring:bean>
        </spring:property>
    </spring:bean>
</spring:beans>
<file:connector name="File" workDirectory="/home/fs/workDirectory" autoDelete="true" streaming="true" validateConnections="true" doc:name="File"/>
<flow name="ReceiveFromFS" processingStrategy="synchronous">
  <file:inbound-endpoint path="/home/fs/in" pollingFrequency="5000" fileAge="1000" connector-ref="File"/>
  <component class="mypackage.InputStreamToBlobMessage" doc:name="Java"/>
    <jms:outbound-endpoint queue="queue1" connector-ref="Active_MQ" doc:name="JMS">
        <jms:transaction action="ALWAYS_BEGIN"/>
    </jms:outbound-endpoint>
</flow>

但是,当我在 MuleStudio 中使用 connectionFactory 时,它无法正常工作并且确实管理了重试。漏洞 ?? 在嵌入在 Tomcat 中的 Mule 中使用相同的配置效果很好。

还有一点,在 Mule 3.6 中,JMS 会话默认被缓存,因此可以像在组件中那样访问 Session,或者必须使用 cacheJmsSessions="false"。

瞧 :)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-02-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多