【发布时间】:2015-12-29 11:48:05
【问题描述】:
我认为 org.apache.synapse.transport.passthru.PassThroughHttpSender 是 wso2esb 4.8.1 中 http 的默认传输发送器(不确定 4.9.0 稍后会检查它)不会将借用的线程返回到工作池在某些情况下例外。
在我看来,当外部序列中发生异常时会发生这种情况(而不是在直接接受传入请求的代理中)。例如,当您使用存储和消息处理器实现存储转发模式时,就会发生这种情况。
这是我重现问题的简单 WSO2ESB 4.8.1 配置:
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://ws.apache.org/ns/synapse">
<registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
<parameter name="cachableDuration">15000</parameter>
</registry>
<import name="fileconnector"
package="org.wso2.carbon.connector"
status="enabled"/>
<proxy name="ProxyTest"
transports="http https"
startOnLoad="true"
trace="disable">
<target>
<inSequence>
<property name="FORCE_SC_ACCEPTED"
value="true"
scope="axis2"
type="STRING"/>
<log level="custom">
<property name="text" value="store message"/>
</log>
<store messageStore="TestXMS"/>
<log level="custom">
<property name="text" value="message stored"/>
</log>
</inSequence>
<outSequence/>
<faultSequence/>
</target>
</proxy>
<localEntry key="ESBInstance">Test<description/>
</localEntry>
<endpoint name="HTTPEndpoint">
<http method="post" uri-template="http://localhost/index.php">
<timeout>
<duration>10</duration>
<responseAction>fault</responseAction>
</timeout>
<suspendOnFailure>
<errorCodes>-1</errorCodes>
<initialDuration>0</initialDuration>
<progressionFactor>1.0</progressionFactor>
<maximumDuration>0</maximumDuration>
</suspendOnFailure>
<markForSuspension>
<errorCodes>-1</errorCodes>
</markForSuspension>
</http>
</endpoint>
<sequence name="fault">
<log level="full">
<property name="MESSAGE" value="Executing default 'fault' sequence"/>
<property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
<property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
</log>
<drop/>
</sequence>
<sequence name="TestSequence">
<log level="full">
<property name="text" value="message recieved"/>
</log>
<call>
<endpoint key="HTTPEndpoint"/>
</call>
<log level="full">
<property name="text" value="message processed"/>
</log>
</sequence>
<sequence name="main">
<in>
<log level="full"/>
<filter source="get-property('To')" regex="http://localhost:9000.*">
<send/>
</filter>
</in>
<out>
<send/>
</out>
<description>The main sequence for the message mediation</description>
</sequence>
<messageStore name="TestXMS"/>
<messageProcessor class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor"
name="TestMP"
messageStore="TestXMS">
<parameter name="interval">1000</parameter>
<parameter name="sequence">TestSequence</parameter>
<parameter name="concurrency">1</parameter>
<parameter name="is.active">true</parameter>
</messageProcessor>
</definitions>
将端点配置为无处发送请求,只需调用 TestProxy 20 次或更多次,以耗尽内部工作线程池...
收到第 20 个请求后,将接受新请求并将其存储在存储中,但工作池将耗尽,并且不会从消息存储中重试消息。
我认为 PassThroughHttpSender 必须有源代码可用...但是反编译 repository/components/plugins/synapse-nhttp-transport_2.1.2.wso2v4.jar 并查看内部以查看原因会更快问题。
我们应该看看 sendRequestContent 方法的内部:
synchronized (msgContext)
{
while ((!Boolean.TRUE.equals(msgContext.getProperty("WAIT_BUILDER_IN_STREAM_COMPLETE"))) && (!Boolean.TRUE.equals(msgContext.getProperty("PASSTHRU_CONNECT_ERROR")))) {
try
{
log.info("msgContext before wait");
msgContext.wait();
log.info("msgContext after wait");
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
这是进程堆栈出错的地方。当异常发生时(在另一个线程中)它“忘记”通知当前线程并且 msgContext 永远等待(实际上直到服务器重新启动)
所以我稍微修改了同一个包中的另一个类——DeliveryAgent,方法errorConnecting。此方法用于捕获来自用于连接目标主机的线程的回调...所以当我们捕获回调时,我们会通知 msgContext 并通过在 targetErrorHandler 之后添加新的同步块来通知它继续...
public void errorConnecting(HttpRoute route, int errorCode, String message)
{
Queue<MessageContext> queue = (Queue)this.waitingMessages.get(route);
if (queue != null)
{
MessageContext msgCtx = (MessageContext)queue.poll();
if (msgCtx != null) {
this.targetErrorHandler.handleError(msgCtx, errorCode, "Error connecting to the back end", null, ProtocolState.REQUEST_READY);
synchronized (msgCtx)
{
log.info("errorConnecting: notify message context about error");
msgCtx.setProperty("PASSTHRU_CONNECT_ERROR", Boolean.TRUE);
msgCtx.notifyAll();
}
}
}
else
{
throw new IllegalStateException("Queue cannot be null for: " + route);
}
}
我做了一些测试,看起来它解决了“死”线程的问题。但是,我不确定这是否是一个适当的修复...欢迎任何建议..
链接到反编译和修改的源文件 - src.zip
【问题讨论】:
标签: java multithreading thread-safety wso2esb endpoint