【问题标题】:Spring reactive file integrationSpring响应式文件集成
【发布时间】:2020-11-03 02:33:03
【问题描述】:

我正在尝试使用 spring-integration-file 来轮询目录并从放置在该目录中的文件创建反应流。这在大多数情况下都有效,但是当我放置文件但没有订户时,我会遇到异常。为了演示这个问题,我编写了一个小型演示应用程序:

import org.reactivestreams.Publisher;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.io.File;

@SpringBootApplication
@RestController
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public Publisher<Message<File>> reactiveSource() {
        return IntegrationFlows
                .from(Files.inboundAdapter(new File("."))
                                .patternFilter("*.csv"),
                        e -> e.poller(Pollers.fixedDelay(1000)))
                .channel("processFileChannel")
                .toReactivePublisher();
    }

    @GetMapping(value = "/files", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> files() {
        return Flux.from(reactiveSource())
                .map(message -> message.getPayload().getAbsolutePath());
    }
}

因此,如果我现在对 localhost:8080/files 进行 curl,然后将 csv 文件放在项目的根目录中,一切都很好,我会看到文件的路径作为对我的 curl 的响应。但是当我不做卷曲然后将文件放在根目录中时,我得到以下异常:

java.lang.IllegalStateException: The [bean 'reactiveSource.channel#0'; defined in: 'com.example.demo.DemoApplication'; 
from source: 'bean method reactiveSource'] doesn't have subscribers to accept messages
        at org.springframework.util.Assert.state(Assert.java:97)
        at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:61)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
        ... 38 more

我认为响应式流的属性之一是,当没有订阅者时,由于流是惰性的,因此流不会启动。但显然情况并非如此。如果没有订阅者,有人可以解释我需要做什么才能让流无法启动吗?

【问题讨论】:

    标签: spring-integration reactive


    【解决方案1】:

    如果您使用最新版本之一,那么您可以使用FluxMessageChannel 频道而不是DirectChannel 来代替"processFileChannel"。这样,SourcePollingChannel 适配器将变为反应式,并且确实不会轮询源,直到该 FluxMessageChannel 发生订阅。

    然后,您可以从此FluxMessageChannel 在您的files() API 中创建一个Flux - 在.toReactivePublisher() 中不需要。

    在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#source-polling-channel-adapter

    关键是.toReactivePublisher() 恰好在此时将集成流作为Publisher。在此之前的一切都是常规的、命令式的,并且独立于下游逻辑工作。

    更新

    类似这样的:

    @Bean
    FluxMessageChannel filesChannel() {
        return new FluxMessageChannel();
    }
    
    @Bean
    public IntegrationFlow reactiveSource() {
            return IntegrationFlows
                    .from(Files.inboundAdapter(new File("."))
                                    .patternFilter("*.csv"),
                            e -> e.poller(Pollers.fixedDelay(1000)))
                    .channel(filesChannel())
                    .get();
        }
    
    @GetMapping(value = "/files", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> files() {
            return Flux.from(filesChannel())
                    .map(message -> ((File) message.getPayload()).getAbsolutePath());
        }
    

    【讨论】:

    • 您好 Artem,非常感谢您的回复。您能否发布一些示例代码,我应该如何做到这一点?我已经非常深入地阅读了文档,但我无法从中获得帮助。谢谢!
    • 在我的回答中查看更新。
    • 感谢 Artem,非常感谢!
    • 嗨,Artem,如果可以的话,还有一个后续问题。我注意到在我关闭订阅者之后,例如通过终止 curl 调用,如果我在轮询目录中放置另一个 csv 文件,那么我再次收到关于没有订阅者接受消息的相同错误消息。似乎当客户端关闭其连接时,这并不意味着订阅被取消。您能否说明我将如何取消上游订阅?
    • 是的......这就是FluxMessageChannel 的实现方式:一旦订阅 - 直到应用程序结束才会取消。我们可能应该考虑在那里进行一些改进......随意提出GH问题!
    猜你喜欢
    • 1970-01-01
    • 2018-11-18
    • 2021-07-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-12-20
    • 1970-01-01
    相关资源
    最近更新 更多