【发布时间】:2022-01-02 02:46:03
【问题描述】:
我需要将日期从一个来源(并行)批量复制到另一个来源。
我这样做了:
Flux.generate((SynchronousSink<String> sink) -> {
try {
String val = dataSource.getNextItem();
if (val == null) {
sink.complete();
return;
}
sink.next(val);
} catch (InterruptedException e) {
sink.error(e);
}
})
.parallel(4)
.runOn(Schedulers.parallel())
.doOnNext(dataTarget::write)
.sequential()
.blockLast();
class dataSource{
public Item getNextItem(){
//...
}
}
class dataTarget{
public void write(List<Item> items){
//...
}
}
它并行接收数据,但一次写入一个。
我需要分批收集它们(比如按 10 个项目),然后编写批处理。
我该怎么做?
更新:
主要思想来源是适合高效发送消息的消息系统(即rabbitmq或nats),但目标是插入批处理效率更高的数据库。
所以最终的结果应该是——我并行接收消息,直到缓冲区没有填满,然后我将所有缓冲区一次性写入数据库。
在常规 java 中很容易做到,但在流的情况下——我不知道该怎么做。如何缓冲数据以及如何暂停读取器,直到写入器尚未准备好获取下一部分。
【问题讨论】:
-
“来源”是什么意思。什么样的来源?如果您显示 write 方法会有所帮助。
-
@lkatiforis 我从消息系统 (nats.io) 中并行读取消息,我需要每次插入将 1000 条记录放入数据库。更新了问题
标签: java reactive-programming project-reactor