[Posted]: 2021-01-04 09:18:22
[Question]:
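I'm trying to use Apache Camel (version 2.25.3) reactive streams together with Spring Boot to read large CSV files and unmarshal the lines with Bindy. This "works" in the sense that the application runs and detects files as they appear, but I only see the first line of each file in the stream. It seems to be related to Bindy, because if I take the unmarshalling out of the equation, I get all lines of the CSV file back in the stream just fine. I have simplified the problem to demonstrate it here on SO. I'm using Spring WebFlux to expose the resulting Publisher.
So my Camel route looks like this: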
import lombok.RequiredArgsConstructor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@RequiredArgsConstructor
@Component
public class TransactionLineCsvRoute extends RouteBuilder {

    private final CamelReactiveStreamsService camelRs;

    @Override
    public void configure() {
        var bindy = new BindyCsvDataFormat(LineItem.class);
        from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
            .unmarshal(bindy)
            .to("reactive-streams:lineItems");
    }

    public Flux<LineItem> getLineItemFlux() {
        Publisher<LineItem> lineItems = camelRs.fromStream("lineItems", LineItem.class);
        return Flux.from(lineItems);
    }
}
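The Bindy class:
import lombok.Getter;
import lombok.ToString;
import org.apache.camel.dataformat.bindy.annotation.CsvRecord;
import org.apache.camel.dataformat.bindy.annotation.DataField;

@ToString
@Getter
@CsvRecord(separator = ";", skipFirstLine = true, skipField = true)
public class LineItem {

    // Only the second column is mapped; skipField = true lets Bindy ignore the rest.
    @DataField(pos = 2)
    private String description;
}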
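To make the intended mapping explicit, here is a minimal plain-Java sketch of what I expect the annotations above to do for each file: skip the header line, split every remaining line on ";" and keep only the second field. It is a stand-in and does not use Bindy; the class names `LineItemMappingSketch` and `Item` and the sample rows are made up for illustration, and quoted fields are not handled.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java stand-in for the Bindy mapping; hypothetical names, not Bindy itself.
final class LineItemMappingSketch {

    // Hypothetical counterpart of LineItem with a single mapped field.
    static final class Item {
        final String description;

        Item(String description) {
            this.description = description;
        }
    }

    // Maps one CSV line: split on ';' and keep the 2nd field (pos = 2 is 1-based).
    static Item parseLine(String line) {
        String[] fields = line.split(";", -1); // -1 keeps trailing empty fields
        if (fields.length < 2) {
            throw new IllegalArgumentException("Expected at least 2 fields: " + line);
        }
        return new Item(fields[1]);
    }

    // Maps all lines of one file, skipping the header (skipFirstLine = true).
    static List<Item> parseAll(List<String> lines) {
        return lines.stream()
                .skip(1)
                .map(LineItemMappingSketch::parseLine)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Item> items = parseAll(List.of(
                "id;description;amount", // header, skipped
                "1;Coffee;2.50",
                "2;Tea;1.80"));
        // Prints "Coffee" and then "Tea".
        items.forEach(item -> System.out.println(item.description));
    }
}
```

So for a file with a header and N data rows, I expect N items in the stream, not one.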
And the endpoint that exposes the Flux:
@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
    return lineItemFlux;
}
So when I now run curl:
curl localhost:8080/lineItems
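I only get the first line back, whereas when I remove the ".unmarshal(bindy)" line (and change the stream type to String instead of LineItem), I get all rows of the CSV file back.
So I suspect I'm not using Bindy correctly in the reactive streams context. I followed the Camel documentation and tried rewriting my route as follows: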
from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
    .to("reactive-streams:rawLines");

from("reactive-streams:rawLines")
    .unmarshal(bindy)
    .to("reactive-streams:lineItems");
The log shows that the routes start correctly:
2021-01-04 10:13:26.798 INFO 26438 --- [ main] o.a.camel.spring.SpringCamelContext : Route: route1 started and consuming from: file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport
2021-01-04 10:13:26.800 INFO 26438 --- [ main] o.a.camel.spring.SpringCamelContext : Route: route2 started and consuming from: reactive-streams://rawLines
2021-01-04 10:13:26.801 INFO 26438 --- [ main] o.a.camel.spring.SpringCamelContext : Total 2 routes, of which 2 are started
But then I get an exception stating "The stream has no active subscriptions":
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport ] [ 9]
[route1 ] [to1 ] [reactive-streams:rawLines ] [ 5]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalStateException: The stream has no active subscriptions
at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:108) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:144) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:52) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
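Can anyone point me to how to use Bindy together with reactive streams? Thanks!
EDIT
Following burki's very helpful post, I was able to fix my code. The route definition now looks like the following. As you can see, I removed the unmarshal step, so the route just picks up files from the file system as they arrive and puts them on the reactive stream: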
@Override
public void configure() {
    from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
        .to("reactive-streams:extractedFile");
}
The stream of files is then exposed as a Flux:
public Flux<File> getFileFlux() {
    return Flux.from(camelRs.fromStream("extractedFile", File.class));
}
The code that parses the CSV looks like this (using OpenCSV as burki suggested, but a different part of its API):
private Flux<LineItem> readLineItems() {
    return fileFlux
        .flatMap(message -> Flux.using(
            // Resource: a stream of beans parsed from the file, header skipped
            () -> new CsvToBeanBuilder<LineItem>(createFileReader(message))
                .withSkipLines(1)
                .withSeparator(';')
                .withType(LineItem.class)
                .build()
                .stream(),
            // Emit the beans as a Flux
            Flux::fromStream,
            // Close the stream when the Flux terminates or is cancelled
            BaseStream::close)
        );
}

private FileReader createFileReader(File file) {
    System.out.println("Reading file from: " + file.getAbsolutePath());
    try {
        return new FileReader(file);
    } catch (FileNotFoundException e) {
        throw new RuntimeException(e);
    }
}
You can now expose the resulting Flux as an endpoint:
@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
    return readLineItems();
}
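Now, when you run the same curl as above, you get the complete set of unmarshalled LineItems from the CSV.
Whether this actually loads the entire file into memory is still on my to-do list. I don't think it does: I believe I only get a handle to the file, which I then stream into the OpenCSV bean reader. I still need to verify this, though. It may be that I am currently reading the whole file into memory first and only then streaming it, which would defeat the purpose.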
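As a baseline for that check, here is a minimal sketch of line-by-line reading with only the JDK, where laziness is documented: `BufferedReader.lines()` populates the stream as it is consumed, so memory use is bounded by the reader's buffer rather than by the file size. I have not verified how OpenCSV's `stream()` buffers internally, so this is a comparison point and not a description of what OpenCSV does. The class name `LazyCsvSketch` and the sample data are hypothetical, and quoted fields are not handled.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// JDK-only sketch of lazy CSV reading; hypothetical names, not OpenCSV or Bindy.
final class LazyCsvSketch {

    // Returns the 2nd ';'-separated field of every line after the header.
    // Lines are pulled from the reader only as the stream is consumed.
    // Closing the returned stream closes the underlying reader.
    static Stream<String> descriptions(Reader source) {
        BufferedReader reader = new BufferedReader(source);
        return reader.lines()
                .skip(1) // header line
                .map(line -> line.split(";", -1))
                .filter(fields -> fields.length >= 2) // ignore short lines
                .map(fields -> fields[1])
                .onClose(() -> {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
    }

    public static void main(String[] args) {
        Reader sample = new StringReader("id;description\n1;Coffee\n2;Tea\n3;Milk\n");
        try (Stream<String> lines = descriptions(sample)) {
            // Only as many lines as needed are pulled through the pipeline.
            List<String> firstTwo = lines.limit(2).collect(Collectors.toList());
            System.out.println(firstTwo); // prints [Coffee, Tea]
        }
    }
}
```

In the application, the `Reader` would be the `FileReader` created for each incoming file, and the returned stream could be wrapped with `Flux.using(..., Flux::fromStream, Stream::close)` in the same way as the OpenCSV stream above.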
[Comments]:
Tags: java spring-boot apache-camel spring-webflux reactive-streams