【发布时间】:2021-03-02 14:05:21
【问题描述】:
我正在向网络服务发送请求,将结果转换为大的 csv 并将 csv 行保存到数据库中。
由于请求长时间运行(10-20 秒),我想并行化请求。
我将所有数据收集在一个包含转换后的 csv 行的 StringBuilder 中。
问题:如果在 csv 中达到我的 1000 行块,我如何将数据取出以进行持久化,而任何其他并发响应将写入新的 StringBuilder?
因为,流的最终变量无法重新初始化。
final StringBuilder sb = new StringBuilder();
AtomicInteger count = new AtomicInteger();
Flux.fromIterable(requests)
.flatMap(req -> {
return webClientService.send(req); //assume long running response
}, 8) //send 8 requests in parallel, as response takes up to 10s
.map(rsp -> {
//convert response to csv values and add to StringBuilder
int c = addCsv(sb, rsp);
if (count.addAndGet(c) > 1000) {
//TODO how can I assign a new StringBuilder,
//so that all further finished responses will append the csv to the new builder?
//same problem with the counter.
databaseWriter.write(sb.build()); //writes the content so far to db, but not threadsafe so far
}
return c;
})
.blockLast();
【问题讨论】:
-
addCsv做了什么,存储的格式是什么样的。你得到了你不满意的答案,因为你的解释不够清楚。如果您想避免某些事情等,请写出ALL的要求,这样我们就不必猜测了。 -
如所写,
addCsv将响应的结果作为 csv 行添加到StringBuilder。
标签: java spring java-stream spring-webflux project-reactor