【问题标题】:WSO2ESB 4.8.1 PassThroughHttpSender does not return thread to worker pool on exceptionWSO2ESB 4.8.1 PassThroughHttpSender 不会在异常时将线程返回到工作池
【发布时间】:2015-12-29 11:48:05
【问题描述】:

我认为 org.apache.synapse.transport.passthru.PassThroughHttpSender 是 wso2esb 4.8.1 中 http 的默认传输发送器(不确定 4.9.0 稍后会检查它)不会将借用的线程返回到工作池在某些情况下例外。

在我看来,当外部序列中发生异常时会发生这种情况(而不是在直接接受传入请求的代理中)。例如,当您使用存储和消息处理器实现存储转发模式时,就会发生这种情况。

这是我重现问题的简单 WSO2ESB 4.8.1 配置:

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://ws.apache.org/ns/synapse">
   <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
      <parameter name="cachableDuration">15000</parameter>
   </registry>
   <import name="fileconnector"
           package="org.wso2.carbon.connector"
           status="enabled"/>
   <proxy name="ProxyTest"
          transports="http https"
          startOnLoad="true"
          trace="disable">
      <target>
         <inSequence>
            <property name="FORCE_SC_ACCEPTED"
                      value="true"
                      scope="axis2"
                      type="STRING"/>
            <log level="custom">
               <property name="text" value="store message"/>
            </log>
            <store messageStore="TestXMS"/>
            <log level="custom">
               <property name="text" value="message stored"/>
            </log>
         </inSequence>
         <outSequence/>
         <faultSequence/>
      </target>
   </proxy>
   <localEntry key="ESBInstance">Test<description/>
   </localEntry>
   <endpoint name="HTTPEndpoint">
      <http method="post" uri-template="http://localhost/index.php">
         <timeout>
            <duration>10</duration>
            <responseAction>fault</responseAction>
         </timeout>
         <suspendOnFailure>
            <errorCodes>-1</errorCodes>
            <initialDuration>0</initialDuration>
            <progressionFactor>1.0</progressionFactor>
            <maximumDuration>0</maximumDuration>
         </suspendOnFailure>
         <markForSuspension>
            <errorCodes>-1</errorCodes>
         </markForSuspension>
      </http>
   </endpoint>
   <sequence name="fault">
      <log level="full">
         <property name="MESSAGE" value="Executing default 'fault' sequence"/>
         <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
         <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
      </log>
      <drop/>
   </sequence>
   <sequence name="TestSequence">
      <log level="full">
         <property name="text" value="message recieved"/>
      </log>
      <call>
         <endpoint key="HTTPEndpoint"/>
      </call>
      <log level="full">
         <property name="text" value="message processed"/>
      </log>
   </sequence>
   <sequence name="main">
      <in>
         <log level="full"/>
         <filter source="get-property('To')" regex="http://localhost:9000.*">
            <send/>
         </filter>
      </in>
      <out>
         <send/>
      </out>
      <description>The main sequence for the message mediation</description>
   </sequence>
   <messageStore name="TestXMS"/>
   <messageProcessor class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor"
                     name="TestMP"
                     messageStore="TestXMS">
      <parameter name="interval">1000</parameter>
      <parameter name="sequence">TestSequence</parameter>
      <parameter name="concurrency">1</parameter>
      <parameter name="is.active">true</parameter>
   </messageProcessor>
</definitions>

将端点配置为无处发送请求,只需调用 TestProxy 20 次或更多次,以耗尽内部工作线程池...

收到第 20 个请求后,将接受新请求并将其存储在存储中,但工作池将耗尽,并且不会从消息存储中重试消息。

我认为 PassThroughHttpSender 必须有源代码可用...但是反编译 repository/components/plugins/synapse-nhttp-transport_2.1.2.wso2v4.jar 并查看内部以查看原因会更快问题。

我们应该看看 sendRequestContent 方法的内部:

synchronized (msgContext)
      {
        while ((!Boolean.TRUE.equals(msgContext.getProperty("WAIT_BUILDER_IN_STREAM_COMPLETE"))) && (!Boolean.TRUE.equals(msgContext.getProperty("PASSTHRU_CONNECT_ERROR")))) {
          try
          {
            log.info("msgContext before wait");
            msgContext.wait();
            log.info("msgContext after wait");
          }
          catch (InterruptedException e)
          {
            e.printStackTrace();
          }
        }

这是进程堆栈出错的地方。当异常发生时(在另一个线程中)它“忘记”通知当前线程并且 msgContext 永远等待(实际上直到服务器重新启动)

所以我稍微修改了同一个包中的另一个类——DeliveryAgent,方法errorConnecting。此方法用于捕获来自用于连接目标主机的线程的回调...所以当我们捕获回调时,我们会通知 msgContext 并通过在 targetErrorHandler 之后添加新的同步块来通知它继续...

public void errorConnecting(HttpRoute route, int errorCode, String message)
  {
    Queue<MessageContext> queue = (Queue)this.waitingMessages.get(route);
    if (queue != null)
    {
      MessageContext msgCtx = (MessageContext)queue.poll();
      if (msgCtx != null) {
        this.targetErrorHandler.handleError(msgCtx, errorCode, "Error connecting to the back end", null, ProtocolState.REQUEST_READY);
        synchronized (msgCtx)
        {
            log.info("errorConnecting: notify message context about error");
            msgCtx.setProperty("PASSTHRU_CONNECT_ERROR", Boolean.TRUE);
            msgCtx.notifyAll();
        }
      }
    }
    else
    {
      throw new IllegalStateException("Queue cannot be null for: " + route);
    }
  }

我做了一些测试,看起来它解决了“死”线程的问题。但是,我不确定这是否是一个适当的修复...欢迎任何建议..

链接到反编译和修改的源文件 - src.zip

【问题讨论】:

    标签: java multithreading thread-safety wso2esb endpoint


    【解决方案1】:

    我找到了问题的原因。如果后端应用程序接受请求并且不向 ESB 回复任何内容,则会发生这种情况。结果,连接因超时而关闭。在这种情况下,ESB 工作线程不会返回到线程池中。

    为了克服这个问题,需要修补 org.apache.synapse.transport.passthru.TargetErrorHandler java 类

    synchronized (mc){ 
       log.info("notify message context about error");
       mc.setProperty("PASSTHRU_CONNECT_ERROR", Boolean.TRUE); 
       mc.notifyAll(); 
    }
    

    把它放在 catch 块之前。

    它将正确地将消息上下文设置为有错误并通知所有等待的对象,这反过来又允许 ESB 在线程池中返回请求处理线程。这一切都适用于 4.8.1。

    我已对其进行了修补并将其置于生产环境中,该环境在 2016 年 1 月每天处理数百万个请求。从那以后没有任何问题

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-12-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-10-11
      • 2016-01-08
      相关资源
      最近更新 更多