【问题标题】:Error sending message to a QueueChannel from chain in spring integration在 Spring 集成中从链向 QueueChannel 发送消息时出错
【发布时间】:2015-11-30 02:20:42
【问题描述】:

我正在我的项目中使用 spring 集成。接下来是我的应用程序中的流程。当我向 "domainEventChannel" 发布消息时,它会通过转换器,然后将消息发送到 inputFromTransformer 通道,并且由于消息在链中,它会通过拆分器、转换器和过滤器。我已经配置了 output-channel="domainEventQueueChannel" ,这是一个队列通道。

我能够在过滤器之前跟踪消息,但它们永远不会到达 QueueChannel。

我不能从链中向队列通道发送消息吗?

<int:transformer ref="partitionlessTransformer" method="transform"
        input-channel="domainEventChannel" output-channel="inputFromTransformer" />

   <!-- Chain which has the output channel as queue channel-->
    <int:chain input-channel="inputFromTransformer"
        output-channel="domainEventQueueChannel">
        <int:splitter ref="messageSplitter" method="split" />
        <int:transformer ref="jsonToObjectTransformer" />
        <int:filter ref="autopayModelProcessTransFilter"></int:filter>
    </int:chain>

   <!-- Queue Channel-->
    <int:channel id="domainEventQueueChannel">
        <int:queue capacity="10" />
    </int:channel>

     <!-- service activator which polls queueChannel -->
    <int:service-activator id="domainEventReconProcessServiceActivator"
         input-channel="domainEventQueueChannel" ref="domainEventReconProcessServiceActivator" method="intiateAutopayProcessTransRecon">
        <int:poller task-executor="domainEventReconProcessServicetaskExecutor" fixed-rate="10" >
            </int:poller>
    </int:service-activator>

    <task:executor id="domainEventReconProcessServicetaskExecutor"
        pool-size="10" queue-capacity="10" />

更新 2#: 当我添加以下配置时,一切正常。消息从队列通道流向服务激活器:

    <int:chain input-channel="domainEventQueueChannel"
        output-channel="nullChannel">
        <int:service-activator id="domainEventReconProcessServiceActivator"
         ref="domainEventReconProcessServiceActivator" method="intiateAutopayProcessTransRecon" />
     <int:poller task-executor="domainEventReconProcessServicetaskExecutor"  fixed-rate="10"></int:poller> 
 </int:chain>

更新 3#: 链到位后,我看到服务激活器的轮询器轮询 queueChannel。但是没有链,轮询器根本没有启动,我根本看不到轮询器日志。

链条到位:

2015-09-08 09:29:31.438 DEBUG 12817 --- [ask-scheduler-8] o.s.integration.channel.QueueChannel     : preSend on channel 'domainEventQueueChannel', message: GenericMessage [payload=com.autopayprocesstransrecon.resource.DomainEvent@6ac4fcc0, headers={sequenceNumber=1, correlationId=7a91b5fb-b1aa-464a-2e1b-5309652a0520, id=25ae85aa-e0df-6c72-9a54-2e8dac3c5ddc, sequenceSize=1, timestamp=1441718971437}]
    2015-09-08 09:29:31.438 DEBUG 12817 --- [ask-scheduler-8] o.s.integration.channel.QueueChannel     : postSend (sent=true) on channel 'domainEventQueueChannel', message: GenericMessage [payload=com.autopayprocesstransrecon.resource.DomainEvent@6ac4fcc0, headers={sequenceNumber=1, correlationId=7a91b5fb-b1aa-464a-2e1b-5309652a0520, id=25ae85aa-e0df-6c72-9a54-2e8dac3c5ddc, sequenceSize=1, timestamp=1441718971437}]
    2015-09-08 09:29:31.438 DEBUG 12817 --- [ask-scheduler-8] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'inputFromTransformer', message: GenericMessage [payload=[{"eventInfo":{"eventId":"4535345353","eventName":"AutopayModelsExtracted","parentEventId":"4535345353"},"numOfAutopayModels":20,"message":"Published autopaymodels needed to be processed"}], headers={id=7a91b5fb-b1aa-464a-2e1b-5309652a0520, timestamp=1441718971410}]
    2015-09-08 09:29:31.438 DEBUG 12817 --- [ask-scheduler-8] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'domainEventChannel', message: GenericMessage [payload={domainevents={0=[{"eventInfo":{"eventId":"4535345353","eventName":"AutopayModelsExtracted","parentEventId":"4535345353"},"numOfAutopayModels":20,"message":"Published autopaymodels needed to be processed"}]}}, headers={id=438c63c8-1ed1-6516-110e-c3478aca35c8, timestamp=1441718971409}]
    2015-09-08 09:29:31.438 DEBUG 12817 --- [etaskExecutor-1] o.s.integration.channel.QueueChannel     : postReceive on channel 'domainEventQueueChannel', message: GenericMessage [payload=com.autopayprocesstransrecon.resource.DomainEvent@6ac4fcc0, headers={sequenceNumber=1, correlationId=7a91b5fb-b1aa-464a-2e1b-5309652a0520, id=25ae85aa-e0df-6c72-9a54-2e8dac3c5ddc, sequenceSize=1, timestamp=1441718971437}]
    2015-09-08 09:29:31.438 DEBUG 12817 --- [etaskExecutor-1] o.s.i.endpoint.PollingConsumer           : Poll resulted in Message: GenericMessage [payload=com.autopayprocesstransrecon.resource.DomainEvent@6ac4fcc0, headers={sequenceNumber=1, correlationId=7a91b5fb-b1aa-464a-2e1b-5309652a0520, id=25ae85aa-e0df-6c72-9a54-2e8dac3c5ddc, sequenceSize=1, timestamp=1441718971437}]
    2015-09-08 09:29:31.438 DEBUG 12817 --- [etaskExecutor-1] o.s.i.handler.MessageHandlerChain        : org.springframework.integration.handler.MessageHandlerChain#1 received message: GenericMessage [payload=com.autopayprocesstransrecon.resource.DomainEvent@6ac4fcc0, headers={sequenceNumber=1, correlationId=7a91b5fb-b1aa-464a-2e1b-5309652a0520, id=25ae85aa-e0df-6c72-9a54-2e8dac3c5ddc, sequenceSize=1, timestamp=1441718971437}]
    2015-09-08 09:29:31.438 DEBUG 12817 --- [etaskExecutor-1] o.s.i.handler.ServiceActivatingHandler   : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@72193fb5] (org.springframework.integration.handler.MessageHandlerChain#1$child.domainEventReconProcessServiceActivator) received message: GenericMessage [payload=com.autopayprocesstransrecon.resource.DomainEvent@6ac4fcc0, headers={sequenceNumber=1, correlationId=7a91b5fb-b1aa-464a-2e1b-5309652a0520, id=25ae85aa-e0df-6c72-9a54-2e8dac3c5ddc, sequenceSize=1, timestamp=1441718971437}]`

无链:

2015-09-08 09:48:02.572 DEBUG 13043 --- [ask-scheduler-1] o.s.i.splitter.MethodInvokingSplitter    : org.springframework.integration.splitter.MethodInvokingSplitter@53abfc07 received message: GenericMessage [payload=[{"eventInfo":{"eventId":"4535345353","eventName":"AutopayModelsExtracted","parentEventId":"4535345353"},"numOfAutopayModels":20,"message":"Published autopaymodels needed to be processed"}], headers={id=f477f025-e92f-f896-9ac4-2ce1b91fb895, timestamp=1441720082572}]
2015-09-08 09:48:02.574 DEBUG 13043 --- [ask-scheduler-1] o.s.i.t.MessageTransformingHandler       : org.springframework.integration.transformer.MessageTransformingHandler@2c8c16c0 received message: GenericMessage [payload={"eventInfo":{"eventId":"4535345353","eventName":"AutopayModelsExtracted","parentEventId":"4535345353"},"numOfAutopayModels":20,"message":"Published autopaymodels needed to be processed"}, headers={sequenceNumber=1, correlationId=f477f025-e92f-f896-9ac4-2ce1b91fb895, id=1364077a-6d5c-87f9-7d7c-3d144e45d661, sequenceSize=1, timestamp=1441720082574}]
2015-09-08 09:48:02.599 DEBUG 13043 --- [ask-scheduler-1] o.s.integration.filter.MessageFilter     : org.springframework.integration.filter.MessageFilter@80bfa9d received message: GenericMessage [payload=com.autopayprocesstransrecon.resource.DomainEvent@fd57501, headers={sequenceNumber=1, correlationId=f477f025-e92f-f896-9ac4-2ce1b91fb895, id=c2a324c8-9eb0-66e0-73a1-5595ed87b3ae, sequenceSize=1, timestamp=1441720082599}]
hey I am in filter and my result would betrue
2015-09-08 09:48:02.599 DEBUG 13043 --- [ask-scheduler-1] o.s.integration.channel.QueueChannel     : preSend on channel 'domainEventQueueChannel', message: GenericMessage [payload=com.autopayprocesstransrecon.resource.DomainEvent@fd57501, headers={sequenceNumber=1, correlationId=f477f025-e92f-f896-9ac4-2ce1b91fb895, id=c2a324c8-9eb0-66e0-73a1-5595ed87b3ae, sequenceSize=1, timestamp=1441720082599}]
2015-09-08 09:48:02.599 DEBUG 13043 --- [ask-scheduler-1] o.s.integration.channel.QueueChannel     : postSend (sent=true) on channel 'domainEventQueueChannel', message: GenericMessage [payload=com.autopayprocesstransrecon.resource.DomainEvent@fd57501, headers={sequenceNumber=1, correlationId=f477f025-e92f-f896-9ac4-2ce1b91fb895, id=c2a324c8-9eb0-66e0-73a1-5595ed87b3ae, sequenceSize=1, timestamp=1441720082599}]
2015-09-08 09:48:02.599 DEBUG 13043 --- [ask-scheduler-1] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'inputFromTransformer', message: GenericMessage [payload=[{"eventInfo":{"eventId":"4535345353","eventName":"AutopayModelsExtracted","parentEventId":"4535345353"},"numOfAutopayModels":20,"message":"Published autopaymodels needed to be processed"}], headers={id=f477f025-e92f-f896-9ac4-2ce1b91fb895, timestamp=1441720082572}]
2015-09-08 09:48:02.599 DEBUG 13043 --- [ask-scheduler-1] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'domainEventChannel', message: GenericMessage [payload={domainevents={0=[{"eventInfo":{"eventId":"4535345353","eventName":"AutopayModelsExtracted","parentEventId":"4535345353"},"numOfAutopayModels":20,"message":"Published autopaymodels needed to be processed"}]}}, headers={id=bf6f4f20-52bb-27e4-df97-4112ad4be3b5, timestamp=1441720082571}]
2015-09-08 09:48:03.616 DEBUG 13043 --- [ask-scheduler-3] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'
2015-09-08 09:48:04.634 DEBUG 13043 --- [ask-scheduler-4] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'

【问题讨论】:

    标签: spring-integration


    【解决方案1】:

    这仅仅意味着消息没有通过过滤器。

    (过滤器正在丢弃消息)。

    【讨论】:

    • 嗨,加里。过滤器返回“true”,但消息仍然没有通过。当我在过滤器之后在链中添加服务激活器时,消息将发送到服务激活器
    • 在这种情况下,消息应该没有问题地到达队列通道;打开 DEBUG 日志记录并按照流中的消息进行操作。
    • 嗨,加里。经过进一步调查,我发现消息被传递到队列,但服务激活器从未接收到它们。我创建了服务激活器,以便多个线程可以处理队列通道中的消息,但看起来服务激活器不处理 queueChannel 中的这些消息。从服务激活器配置的角度来看,我做错了吗?
    • 当我将服务激活器移动到上面提到的 Update2# 下的链时,事情开始正常工作。那么我是否需要一个服务激活器链才能与 queueChannel 一起使用?
    • 否;它不需要在链中;调试日志是您的朋友。
    猜你喜欢
    • 2015-07-13
    • 2017-12-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-17
    • 2021-03-11
    • 2016-11-22
    相关资源
    最近更新 更多