【问题标题】:TCP Inbound Endpoint - Mule ESB - multithreadingTCP 入站端点 - Mule ESB - 多线程
【发布时间】:2017-01-31 15:19:29
【问题描述】:

我有一个引用 TCP 连接器的 TCP 入站端点。这是一个请求-响应端点。 TCP 客户端是第 3 方应用程序,它仅在一个套接字上发送请求。这就是我设置 TCP 端点的方式。 端点:

<tcp:inbound-endpoint exchange-pattern="request-response"
            responseTimeout="10000" doc:name="TCP" address="${Endpoint}" encoding="ISO-8859-1" connector-ref="TCP"/>

连接器:

<tcp:connector name="TCP" doc:name="TCP connector"
    clientSoTimeout="${Client_SO_Timeout}" receiveBacklog="0" receiveBufferSize="0"
    sendBufferSize="0" serverSoTimeout="${Server_SO_Timeout}" socketSoLinger="0"
    validateConnections="true" keepAlive="true" sendTcpNoDelay="true">
            <receiver-threading-profile maxThreadsActive="${TCP_MaxThreadsActive}" maxThreadsIdle = "${TCP_MaxThreadsIdle}" />
    <reconnect-forever />
    <service-overrides messageReceiver="CustomMessageReceiver" />
    <tcp:custom-protocol ref="CustomLengthProtocol" />
</tcp:connector>

流程运行良好。但是当必须处理并发请求时,最后几个请求会超时。我从中了解到的是,消息正在等待接收方处理(因为只使用一个 TCP 会话),直到前一个请求被 mule 流完成。

为了调整这一点,我正在寻找一种方法来更改 mule 流,如下所示:收到客户端的请求后,我需要将其发送到 mule-flow,它可能会异步处理并推送响应到同一个插座。一旦在端点接收到请求,它就不需要等待前一个请求的流程完成才能处理下一个请求。没有要求从 mule 流中保留请求/响应的顺序。 有没有办法通过扩展 mule TCP 端点功能来实现这一点?这类似于队列异步流处理策略,不同之处在于必须将响应发送回原始 TCP 套接字。

【问题讨论】:

    标签: tcp mule mule-studio mule-component


    【解决方案1】:

    这就是我解决这个问题的方法: (i) 我没有使用 tcp 请求-响应入站端点,而是使用 TCP 入站(单向)和 TCP 出站(单向) (ii) TCP-Inbound 接收请求,自定义长度协议将消息拆分并提供给流。 (iii) 使用带有 VM 端点的请求-回复范围(入站和出站) (iv) vm 端点将消息引导到一个单独的流,该流的处理策略为“排队异步”。我计划在此流级别上设置 maxThreads 以进行线程池。 (v) 第二个流程执行业务逻辑,响应被发送回主流程以发送到套接字。 (vi) 为了从入站端点访问套接字,我重写了 TCPMessageReceiver 类中的 preRouteMuleMessage 方法,并向 mulemessage 添加了一个名为“ClientSocket”的出站属性。然后,我将此属性传播到出站端点的主要流程的末尾。在 TCPOutbound 端点,我创建了自己的 TCPMessageDispatcher 并扩展了 doDispatch 方法。我没有使用出站端点线程池中的套接字,而是使用作为 mulemessage 一部分共享的套接字对象。 示例流程:

    <tcp:connector name="TCP" doc:name="TCP connector"
        clientSoTimeout="70000" receiveBacklog="0" receiveBufferSize="0"
        sendBufferSize="0" serverSoTimeout="70000" socketSoLinger="0"
        validateConnections="true" keepAlive="true" sendTcpNoDelay="true" keepSendSocketOpen="true">
        <receiver-threading-profile
            maxThreadsActive="1" maxThreadsIdle="1" />
        <reconnect-forever />
        <service-overrides messageReceiver="CustomMessageReceiver" />
    
        <tcp:custom-protocol ref="CustomLengthProtocol" />
    </tcp:connector>
    
    <tcp:connector name="TCP2" doc:name="TCP connector"
        clientSoTimeout="70000" receiveBacklog="0" receiveBufferSize="0"
        sendBufferSize="0" serverSoTimeout="70000" socketSoLinger="0"
        validateConnections="true" keepAlive="true" sendTcpNoDelay="true"
        keepSendSocketOpen="true">
        <receiver-threading-profile
            maxThreadsActive="1" maxThreadsIdle="1" />
        <reconnect-forever />
        <service-overrides dispatcherFactory="CustomMessageDispatcherFactory"/>
        <tcp:custom-protocol ref="CustomLengthProtocol" />
    
    </tcp:connector>
    
    
    <spring:beans>
        <spring:bean id="CustomLengthProtocol" name="CustomLengthProtocol"
            class="CustomLengthProtocol" />
    </spring:beans>
    <flow name="tcptestFlow" doc:name="tcptestFlow">
        <tcp:inbound-endpoint address="tcp://localhost:4444" 
            responseTimeout="100000"  doc:name="TCP" connector-ref="TCP" />
        <byte-array-to-string-transformer
            doc:name="Byte Array to String" />
        <logger level="INFO" category="Expression" doc:name="Logger" />
    
        <set-session-variable variableName="Socket"
            value="#[message.outboundProperties['ClientSocket']]"
            doc:name="Session Variable" />
        <logger message="#[payload] - #[Socket]" level="INFO" category="Request"
            doc:name="Logger" />
    
        <request-reply doc:name="Request-Reply">
            <vm:outbound-endpoint exchange-pattern="one-way"
                path="/qin" doc:name="VM" >
                <message-properties-transformer scope="outbound"> 
                    <delete-message-property key="MULE_REPLYTO"></delete-message-property>  
                </message-properties-transformer> 
              </vm:outbound-endpoint>
            <vm:inbound-endpoint exchange-pattern="one-way"
                path="/qout" doc:name="VM" />
        </request-reply>
        <logger message="#[payload]" level="INFO" doc:name="Logger"
            category="Response" />
        <string-to-byte-array-transformer
            doc:name="String to Byte Array" />
        <tcp:outbound-endpoint address="tcp://localhost:4444" 
            responseTimeout="10000" doc:name="TCP" connector-ref="TCP2" />
    
    </flow>
    <flow name="tcptestFlow1" processingStrategy="queued-asynchronous"
        doc:name="tcptestFlow1">
    
        <vm:inbound-endpoint exchange-pattern="one-way"
            path="/qin" doc:name="VM" />
        <logger message="Inside VM Flow" level="INFO" doc:name="Logger" />
        <set-payload value="Appended Response - #[payload]"
            doc:name="Set Payload" />
        <vm:outbound-endpoint exchange-pattern="one-way"
            path="/qout" doc:name="VM" />
    
    </flow>
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-12-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-04-15
      • 1970-01-01
      • 2013-02-22
      • 1970-01-01
      相关资源
      最近更新 更多