【问题标题】:Fork join pattern for parallel processing mule esb throwing NullPointer exception用于并行处理 mule esb 的叉连接模式抛出 NullPointer 异常
【发布时间】:2016-06-23 10:58:35
【问题描述】:

我正在使用 fork-join 模式来实现批处理中的并行处理。

我参考了以下问题: Mule File Inbound Flow : Control Number of threads

由于我的输入文件夹中确实有太多文件,但我需要实现并行处理。因此,想到了使用这种模式。这是我的配置流程。

            <quartz:connector name="Quartz1" validateConnections="true" doc:name="Quartz">
                    <receiver-threading-profile maxThreadsActive="1"/>
           </quartz:connector>
            <flow name="Mainflow" processingStrategy="synchronous">
          <quartz:inbound-endpoint jobName="EventGeneration" repeatInterval="1000" connector-ref="Quartz1" responseTimeout="10000" doc:name="Quartz">
                    <quartz:event-generator-job/>
                </quartz:inbound-endpoint>

                <mulerequester:request-collection config-ref="Mule_Requester" resource="file:///FileLocation?connector=FileMRTransformer" count="3" doc:name="Mule Requester"/>
                <expression-filter expression="#[payload.size() != 0]" doc:name="Expression"/>
        <request-reply doc:name="Request-Reply" timeout="300000">
                    <processor-chain doc:name="Processor Chain">
                        <collection-splitter doc:name="Collection Splitter"/>
                        <vm:outbound-endpoint exchange-pattern="one-way"  doc:name="VM" connector-ref="VM" path="Batchinput" />
                    </processor-chain>
                    <vm:inbound-endpoint exchange-pattern="one-way"  doc:name="VM" connector-ref="VM" path="Batchoutput">
                        <message-properties-transformer>
                            <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="3" />
                        </message-properties-transformer>
                        <collection-aggregator />
                    </vm:inbound-endpoint>
    </request-reply>
</flow>

<batch:job name="BatchDemo" max-failed-records="-1">
        <batch:input>
            <vm:inbound-endpoint exchange-pattern="one-way" path="Batchinput" connector-ref="VM" doc:name="VM"/>
....
required processing.....
.
.
<batch:on-complete>
 <vm:outbound-endpoint exchange-pattern="one-way"  doc:name="VM" connector-ref="VM" path="Batchoutput"/>
</batch:on-complete>

一旦控件进入请求-回复范围,就会抛出以下异常:

ERROR 2016-06-23 10:32:56,190 [scheduler-multithreadint019.1.2_productindexing_hybris_fh_Worker-1] org.mule.exception.CatchMessagingExceptionStrategy: 
********************************************************************************
Message               : null (java.lang.NullPointerException). Message payload is of type: CopyOnWriteArrayList
Type                  : org.mule.api.MessagingException
Code                  : MULE_ERROR--2
Payload               : [[B@26589e4d, [B@400e4e6, [B@56b3ba17]
JavaDoc               : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/MessagingException.html
********************************************************************************
Exception stack is:
1. null (java.lang.NullPointerException)
  java.util.concurrent.ConcurrentHashMap:-1 (null)
2. null (java.lang.NullPointerException). Message payload is of type: CopyOnWriteArrayList (org.mule.api.MessagingException)
  org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor:32 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/MessagingException.html)
********************************************************************************
Root Exception stack trace:
java.lang.NullPointerException
    at java.util.concurrent.ConcurrentHashMap.hash(Unknown Source)
    at java.util.concurrent.ConcurrentHashMap.put(Unknown Source)
    at org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.process(AbstractAsyncRequestReplyRequester.java:85)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107)

为请求-回复处理器中的请求部分尝试了备用配置:

 <vm:outbound-endpoint exchange-pattern="one-way"  doc:name="VM" connector-ref="VM" path="Batchinput">
                <collection-splitter />
            </vm:outbound-endpoint>

但它导致了同样的异常。

只有在使用 MuleRequester 时,我才会收到此异常。如果我使用一些 java sn-p 返回 文件对象的集合,我没有收到此异常,并且控件按预期进入批处理流。但是,我在批处理的输入阶段确实有一个转换器(DataWeave),并且我的转换器无法解析这个文件对象(比如 java.io .filejava.io.FileInputStream)。因此使用 MuleRequester 以便我可以启用流式传输。

我不确定在使用这个 Mulerquester 时出了什么问题??

【问题讨论】:

    标签: multithreading parallel-processing mule batch-processing fork-join


    【解决方案1】:

    如果有人感兴趣,请找到它的替代方案。

    使用轮询组件,使用 Java 返回文件名集合(而不是文件对象)。使用相同的请求回复模式。但是,这一次我使用 MuleRequester 在批处理的 input 阶段从文件中检索有效负载。将此 MuleRequester 的文件名作为输入路径。

    这样运行正常。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-04
      • 1970-01-01
      • 2014-11-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多