【问题标题】:Spring integration MessageGroupStoreReaper global errorChannelSpring集成MessageGroupStoreReaper全局errorChannel
【发布时间】:2021-06-03 19:09:59
【问题描述】:

我正在使用 spring 集成聚合器和 MessageGroupStoreReaper,但不知何故错误没有到达全局 errorChannel。

    <int:aggregator id="agg"
                    ref="MyMsgsAggregator"
                    input-channel="myAggInputChannel"
                    output-channel="processInputChannel"
                    discard-channel="errorChannel"
                    method="aggMessages"
                    message-store="messageGroupStore"
                    send-partial-result-on-expiry="true"
                    expire-groups-upon-completion="true" />
                    
<bean id="messageGroupStore" class="org.springframework.integration.store.SimpleMessageStore" />

<task:scheduled-tasks>
        <task:scheduled ref="multipartAggregatorReaper" method="run" fixed-rate="5000" />
</task:scheduled-tasks>

如果在“processInputChannel”后有任何异常(例如到期时的部分结果),则异常未到达全局“errorChannel”。

即使我尝试用轮询器将任务计划的作业替换为入站通道适配器(如 @Gary 所建议的那样),但它仍然不起作用:

<int:inbound-channel-adapter channel="reaperChannel" ref="MyMsgsAggregator" method="triggerReaper"> 
<int:poller error-channel="**errorChannel**" fixed-rate="5000">         
</int:poller>

入站通道适配器>

请推荐

谢谢

【问题讨论】:

  • 第一步是打开o.s.integration的调试日志并跟踪流经通道的消息。
  • 请参阅我的答案以获取一些发现和解释。

标签: spring spring-integration integration aggregator


【解决方案1】:

您的问题在这里:send-partial-result-on-expiry="true"。这个选项确实和discard-channel是互斥的:

protected void expireGroup(Object correlationKey, MessageGroup group, Lock lock) {
    ...
    if (this.sendPartialResultOnExpiry) {
        ...
        completeGroup(correlationKey, group, lock);
    }
    else {
        ...
        group.getMessages()
                .forEach(this::discardMessage);
    }
    ...
}

因此,收割后您的消息转到processInputChannel,而不是errorChannel,这并不奇怪。

另外,discardChannel 与错误无关。当我们收割并且没有从那里发送ErrorMessage 时,不会抛出异常。来自 reaped 组的所有单个消息都作为常规消息发送到配置的 discardChannel

【讨论】:

  • 您好 Artem,感谢您的回复。该问题与“丢弃通道”无关。我期望的是在任何异常(部分或完全聚合)的情况下的正常流程.. 来自“processInputChannel”的部分/完整聚合后的任何异常都不会进入全局错误通道。
  • 我还尝试按照 spring 文档中的建议添加 errorChannel 标头发布通道“processInputChannel”,但这也不起作用..与其他 spring 相比,此流程唯一不同的地方集成流程是通过任务调度器触发 MessageGroupStoreReaper(即使我尝试了使用 Poller 的入站适配器,但也没有用)
  • 嗯,另一种方法是建议使用ExpressionEvaluatingRequestHandlerAdvice 分隔:docs.spring.io/spring-integration/docs/current/reference/html/…。那里不会出现错误,因为 TaskScheduler 没有使用 MessagePublishingErrorHandler 增强。
  • 另见setForceReleaseAdviceChain()AbstractCorrelatingMessageHandler(虽然可能不适用于 XML 配置)...好吧,我明白你的意思了。需要考虑更多,但这确实取决于您如何启动该过程。最后,我们可能只是在聚合器上有一个errorChannel 选项来处理每个组的个别错误。因为现在看起来一个过期和错误组将跳过所有剩余的收获。在该拆分器上进行错误处理确实更好:将其视为常规 Java 循环和每个项目的处理。
  • 哇!我已经为聚合器建议了errorChannel :smile:!请提出一个 GH 问题,我们将与 Gary 一起看看我们能做些什么。感谢您的耐心等待!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-01-08
  • 2019-08-24
  • 1970-01-01
  • 2019-09-13
  • 1970-01-01
  • 2021-04-01
  • 2018-03-09
相关资源
最近更新 更多