【Question title】: Spring Integration: call another handler method after aggregation
【Posted】: 2018-09-07 22:03:53
【Question】:

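I am developing a system that reads and processes files from a directory. Once all files have been processed, it calls a method that in turn generates a file. It also has to route/process files based on the file name, for which I use a Spring Integration router. Below is the integration code snippet. My problem is that if I remove either of the lines .channel(aggregatorOutputChannel()) or .channel(confirmChannel()), the flow stops working, and I also have to keep the same channel, .channel(aggregatorOutputChannel()), both before and after the aggregator. Why do I need all three channel declarations? If this is wrong, how do I correct it?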

I am using JDK 8, Spring 5, and Spring Boot 2.0.4.

@Configuration
@EnableIntegration
public class IntegrationConfig {

    @Value("${agent.demographic.input.directory}")
    private String inputDir;

    @Value("${agent.demographic.output.directory}")
    private String outputDir;

    @Value("${confirmationfile.directory}")
    private String confirmDir;

    @Value("${input.scan.frequency: 2}")
    private long scanFrequency;

    @Value("${processing.waittime: 6000}")
    private long messageGroupWaiting;

    @Value("${thread.corepoolsize: 10}")
    private int corepoolsize;

    @Value("${thread.maxpoolsize: 20}")
    private int maxpoolsize;

    @Value("${thread.queuecapacity: 1000}")
    private int queuedepth;

    @Bean
    public MessageSource<File> inputFileSource() {
        FileReadingMessageSource src = new FileReadingMessageSource();

        src.setDirectory(new File(inputDir));
        src.setAutoCreateDirectory(true);

        ChainFileListFilter<File> chainFileListFilter = new ChainFileListFilter<>();
        chainFileListFilter.addFilter(new AcceptOnceFileListFilter<>() );
        chainFileListFilter.addFilter(new RegexPatternFileListFilter("(?i)^.+\\.xml$"));
        src.setFilter(chainFileListFilter);
        return src;
    }

    @Bean
    public UnZipTransformer unZipTransformer() {
        UnZipTransformer unZipTransformer = new UnZipTransformer();
        unZipTransformer.setExpectSingleResult(false);
        unZipTransformer.setZipResultType(ZipResultType.FILE);
        unZipTransformer.setDeleteFiles(true);

        return unZipTransformer;
    }

    @Bean("agentdemographicsplitter")
    public UnZipResultSplitter splitter() {
        UnZipResultSplitter splitter = new UnZipResultSplitter();
        return splitter;
    }

    @Bean
    public DirectChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel aggregatorOutputChannel() {
        return new DirectChannel();
    }

    @Bean("confirmChannel")
    public DirectChannel confirmChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageHandler fileOutboundChannelAdapter() {
        FileWritingMessageHandler adapter = new FileWritingMessageHandler(new File(outputDir));
        adapter.setDeleteSourceFiles(true);
        adapter.setAutoCreateDirectory(true);
        adapter.setExpectReply(true);
        adapter.setLoggingEnabled(true);
        return adapter;
    }


    @Bean
    public MessageHandler confirmationfileOutboundChannelAdapter() {
        FileWritingMessageHandler adapter = new FileWritingMessageHandler(new File(confirmDir));
        adapter.setDeleteSourceFiles(true);
        adapter.setAutoCreateDirectory(true);
        adapter.setExpectReply(false);
        adapter.setFileNameGenerator(defaultFileNameGenerator() );
        return adapter;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corepoolsize);
        executor.setMaxPoolSize(maxpoolsize);
        executor.setQueueCapacity(queuedepth);
        return executor;
    }

    @Bean
    public DefaultFileNameGenerator defaultFileNameGenerator() {
        DefaultFileNameGenerator defaultFileNameGenerator = new DefaultFileNameGenerator();
        defaultFileNameGenerator.setExpression("payload.name");
        return defaultFileNameGenerator;
    }

    @Bean
    public IntegrationFlow confirmGeneration() {
        return IntegrationFlows.
                from("confirmChannel")
                .handle(confirmationfileOutboundChannelAdapter())
                .get();
    }

    @Bean
    public IntegrationFlow individualProcessor() {
        return flow -> flow.handle("thirdpartyIndividualAgentProcessor","processfile").channel(outputChannel()).handle(fileOutboundChannelAdapter());
    }

    @Bean
    public IntegrationFlow firmProcessor() {
        return flow -> flow.handle("thirdpartyFirmAgentProcessor","processfile").channel(outputChannel()).handle(fileOutboundChannelAdapter());
    }

    @Bean
    public IntegrationFlow thirdpartyAgentDemographicFlow() {
        return IntegrationFlows
                .from(inputFileSource(), spec -> spec.poller(Pollers.fixedDelay(scanFrequency,TimeUnit.SECONDS)))
                .channel(MessageChannels.executor(taskExecutor()))
                .<File, Boolean>route(f -> f.getName().contains("individual"), m -> m
                        .subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
                        .subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
                        )
                .channel(aggregatorOutputChannel())
                .aggregate(aggregator -> aggregator.groupTimeout(messageGroupWaiting).correlationStrategy(new CorrelationStrategy() {

                    @Override
                    public Object getCorrelationKey(Message<?> message) {
                        return "xyz";
                    }
                }))
                .channel(aggregatorOutputChannel())
                .handle("agentDemograpicOutput","generateAgentDemographicFile")
                .channel(confirmChannel())
                .get();
    }
}

Below is the log:

2018-09-07 17:29:20.003 DEBUG 10060 --- [ taskExecutor-2] o.s.integration.channel.DirectChannel    : preSend on channel 'outputChannel', message: GenericMessage [payload=C:\thirdpartyintg\input\18237232_firm.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, file_name=18237232_firm.xml, file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=dd70999a-8b8d-93d2-1a43-a961ac2c339f, file_relativePath=18237232_firm.xml, timestamp=1536366560003}]
2018-09-07 17:29:20.003 DEBUG 10060 --- [ taskExecutor-2] o.s.i.file.FileWritingMessageHandler     : fileOutboundChannelAdapter received message: GenericMessage [payload=C:\thirdpartyintg\input\18237232_firm.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, file_name=18237232_firm.xml, file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=dd70999a-8b8d-93d2-1a43-a961ac2c339f, file_relativePath=18237232_firm.xml, timestamp=1536366560003}]
2018-09-07 17:29:20.006 DEBUG 10060 --- [ taskExecutor-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'outputChannel', message: GenericMessage [payload=C:\thirdpartyintg\input\18237232_firm.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, file_name=18237232_firm.xml, file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=dd70999a-8b8d-93d2-1a43-a961ac2c339f, file_relativePath=18237232_firm.xml, timestamp=1536366560003}]
2018-09-07 17:29:20.006 DEBUG 10060 --- [ taskExecutor-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'firmProcessor.input', message: GenericMessage [payload=C:\thirdpartyintg\input\18237232_firm.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1a867ae7, file_name=18237232_firm.xml, file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=0e6dcb75-db99-1740-7b58-e9b42bfbf603, file_relativePath=18237232_firm.xml, timestamp=1536366559761}]
2018-09-07 17:29:20.007 DEBUG 10060 --- [ taskExecutor-2] o.s.integration.channel.DirectChannel    : preSend on channel 'thirdpartyintgAgentDemographicFlow.channel#2', message: GenericMessage [payload=C:\thirdpartyintg\output\18237232_firm.xml, headers={file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=e6e2a30a-60b9-7cdd-84cc-4977d4c21c97, file_name=18237232_firm.xml, file_relativePath=18237232_firm.xml, timestamp=1536366560007}]
2018-09-07 17:29:20.008 DEBUG 10060 --- [ taskExecutor-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'thirdpartyintgAgentDemographicFlow.channel#2', message: GenericMessage [payload=C:\thirdpartyintg\output\18237232_firm.xml, headers={file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=e6e2a30a-60b9-7cdd-84cc-4977d4c21c97, file_name=18237232_firm.xml, file_relativePath=18237232_firm.xml, timestamp=1536366560007}]
2018-09-07 17:29:20.009 DEBUG 10060 --- [ taskExecutor-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'thirdpartyintgAgentDemographicFlow.subFlow#1.channel#0', message: GenericMessage [payload=C:\thirdpartyintg\input\18237232_firm.xml, headers={file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=13713de8-91ce-b1fa-f52d-450d3038cf9c, file_name=18237232_firm.xml, file_relativePath=18237232_firm.xml, timestamp=1536366559757}]
2018-09-07 17:29:26.009  INFO 10060 --- [ask-scheduler-9] o.s.i.a.AggregatingMessageHandler        : Expiring MessageGroup with correlationKey[processdate]
2018-09-07 17:29:26.011 DEBUG 10060 --- [ask-scheduler-9] o.s.integration.channel.NullChannel      : message sent to null channel: GenericMessage [payload=C:\thirdpartyintg\output\17019222_individual.xml, headers={file_originalFile=C:\thirdpartyintg\input\17019222_individual.xml, id=c654076b-696f-25d4-bded-0a43d1a8ca97, file_name=17019222_individual.xml, file_relativePath=17019222_individual.xml, timestamp=1536366559927}]
2018-09-07 17:29:26.011 DEBUG 10060 --- [ask-scheduler-9] o.s.integration.channel.NullChannel      : message sent to null channel: GenericMessage [payload=C:\thirdpartyintg\output\18237232_firm.xml, headers={file_originalFile=C:\thirdpartyintg\input\18237232_firm.xml, id=e6e2a30a-60b9-7cdd-84cc-4977d4c21c97, file_name=18237232_firm.xml, file_relativePath=18237232_firm.xml, timestamp=1536366560007}]

【Discussion】:

    Tags: spring spring-boot spring-integration


    【Solution 1】:

    First of all, RegexPatternFileListFilter should come first in the ChainFileListFilter. That way AcceptOnceFileListFilter does not hold memory for files you are not interested in.
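
    A minimal sketch of the reordered filter chain, assuming the same input-directory property as in the question. The class name InputFileSourceSketch is made up for illustration; the Spring Integration classes are the ones the question already uses.

```java
import java.io.File;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;

// Hypothetical class name; only the filter order differs from the question's code.
@Configuration
public class InputFileSourceSketch {

    @Value("${agent.demographic.input.directory}")
    private String inputDir;

    @Bean
    public MessageSource<File> inputFileSource() {
        FileReadingMessageSource src = new FileReadingMessageSource();
        src.setDirectory(new File(inputDir));
        src.setAutoCreateDirectory(true);

        ChainFileListFilter<File> chain = new ChainFileListFilter<>();
        // 1. Drop everything that is not an .xml file first ...
        chain.addFilter(new RegexPatternFileListFilter("(?i)^.+\\.xml$"));
        // 2. ... so the accept-once filter only has to remember matching files.
        chain.addFilter(new AcceptOnceFileListFilter<>());
        src.setFilter(chain);
        return src;
    }
}
```

    ChainFileListFilter passes each filter's output to the next one, so the order decides which files AcceptOnceFileListFilter ever sees.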

    You need .channel(confirmChannel()) at the end of thirdpartyAgentDemographicFlow because it is the input of the confirmGeneration flow.

    I don't think you need .channel(aggregatorOutputChannel()) at all. You also don't need .channel(outputChannel()) in the sub-flows.
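
    As a rough sketch of a sub-flow without the explicit channel: when two endpoints follow each other in the Java DSL, the framework connects them with an implicit DirectChannel, so the processor's reply should still reach the file-writing handler. The class name ProcessorFlowSketch and the literal "output" directory are placeholders, not taken from the question.

```java
import java.io.File;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.messaging.MessageHandler;

// Hypothetical class name; mirrors firmProcessor() from the question.
@Configuration
public class ProcessorFlowSketch {

    @Bean
    public MessageHandler fileOutboundChannelAdapter() {
        // Placeholder directory; the question injects it from a property.
        FileWritingMessageHandler adapter = new FileWritingMessageHandler(new File("output"));
        adapter.setAutoCreateDirectory(true);
        adapter.setExpectReply(true); // reply with the written File
        return adapter;
    }

    @Bean
    public IntegrationFlow firmProcessor() {
        // No .channel(outputChannel()) here: the DSL wires an implicit
        // DirectChannel between the two handle() endpoints.
        return flow -> flow
                .handle("thirdpartyFirmAgentProcessor", "processfile")
                .handle(fileOutboundChannelAdapter());
    }
}
```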

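    "This is not working"

    Please elaborate: what error do you get, how does it behave, and so on. You could also share some DEBUG logs for org.springframework.integration so we can see how your messages travel.

    It would also help a lot if you shared a simple Spring Boot project on GitHub that we could run and reproduce by following your instructions.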

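    UPDATE

    I also noticed that your aggregator is based on groupTimeout(). For it to send the aggregated message downstream, you also need to configure this option: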

    /**
     * @param sendPartialResultOnExpiry the sendPartialResultOnExpiry.
     * @return the handler spec.
     * @see AbstractCorrelatingMessageHandler#setSendPartialResultOnExpiry(boolean)
     */
    public S sendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) {
    

    It is false by default, so your messages do indeed go to the NullChannel. See the documentation for more: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-routing-chapter.html#agg-and-group-to
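
    A sketch of how the option could be set on the aggregator from the question, shown as a standalone lambda flow for brevity. The class and bean names AggregationFlowSketch and aggregateAndConfirm are made up, and the literal 6000 stands in for the processing.waittime property.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;

// Hypothetical class and bean names; the aggregator settings follow the question.
@Configuration
public class AggregationFlowSketch {

    @Bean
    public IntegrationFlow aggregateAndConfirm() {
        return flow -> flow
                .aggregate(a -> a
                        // Same constant key as in the question: one group for all files.
                        .correlationStrategy(message -> "xyz")
                        // Assumed timeout in milliseconds (processing.waittime in the question).
                        .groupTimeout(6000)
                        // Release the partial group downstream on timeout
                        // instead of discarding it to the NullChannel.
                        .sendPartialResultOnExpiry(true))
                .handle("agentDemograpicOutput", "generateAgentDemographicFile")
                // Hands the result to the confirmGeneration flow.
                .channel("confirmChannel");
    }
}
```

    With the default aggregation settings the released message carries the collected payloads as a list, so generateAgentDemographicFile would need to accept that.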

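    【Comments】:

    • If I don't have .channel(outputChannel()), how will fileOutboundChannelAdapter() archive the files?
    • If I comment out .channel(aggregatorOutputChannel()), the handler .handle("agentDemograpicOutput","generateAgentDemographicFile") is not called. I don't get any error, but nothing happens.
    • I don't know what archive means in your flow, but if they are just DirectChannels, see: docs.spring.io/spring-integration/docs/current/reference/html/…
    • Archiving copies the file from the processing location to another location. fileOutboundChannelAdapter() takes the File object from the channel.
    • Well, you need to be more explicit in your code; I just don't see the word archive anywhere in it...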