【发布时间】:2020-11-03 02:33:03
【问题描述】:
我正在尝试使用 spring-integration-file 来轮询目录并从放置在该目录中的文件创建反应流。这在大多数情况下都有效,但是当我放置文件但没有订户时,我会遇到异常。为了演示这个问题,我编写了一个小型演示应用程序:
import org.reactivestreams.Publisher;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.io.File;
@SpringBootApplication
@RestController
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public Publisher<Message<File>> reactiveSource() {
return IntegrationFlows
.from(Files.inboundAdapter(new File("."))
.patternFilter("*.csv"),
e -> e.poller(Pollers.fixedDelay(1000)))
.channel("processFileChannel")
.toReactivePublisher();
}
@GetMapping(value = "/files", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> files() {
return Flux.from(reactiveSource())
.map(message -> message.getPayload().getAbsolutePath());
}
}
因此,如果我现在对 localhost:8080/files 进行 curl,然后将 csv 文件放在项目的根目录中,一切都很好,我会看到文件的路径作为对我的 curl 的响应。但是当我不做卷曲然后将文件放在根目录中时,我得到以下异常:
java.lang.IllegalStateException: The [bean 'reactiveSource.channel#0'; defined in: 'com.example.demo.DemoApplication';
from source: 'bean method reactiveSource'] doesn't have subscribers to accept messages
at org.springframework.util.Assert.state(Assert.java:97)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:61)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
... 38 more
我认为响应式流的属性之一是,当没有订阅者时,由于流是惰性的,因此流不会启动。但显然情况并非如此。如果没有订阅者,有人可以解释我需要做什么才能让流无法启动吗?
【问题讨论】:
标签: spring-integration reactive