【发布时间】:2016-07-07 17:21:43
【问题描述】:
我对项目反应器或反应式编程非常陌生,所以我可能做错了什么。我正在努力构建一个执行以下操作的流程:
给定一个类实体:
Entity {
private Map<String, String> items;
public Map<String, String> getItems() {
return items;
}
}
- 从数据库中读取实体 (
ListenableFuture<Entity> readEntity()) - 对每个项目执行一些并行异步处理 (
boolean processItem(Map.Entry<String, String> item)) - 当所有完成调用 doneProcessing (
void doneProcessing(boolean b))
目前我的代码是:
handler = this;
Mono
.fromFuture(readEntity())
.doOnError(t -> {
notifyError(“some err-msg” , t);
return;
})
.doOnSuccess(e -> log.info("Got the Entity: " + e))
.flatMap( e -> Flux.fromIterable(e.getItems().entrySet()))
.all(handler::processItem)
.consume(handler::doneProcessing);
事情有效,但 handler::processItem 调用不会在所有项目上同时运行。我尝试将dispatchOn 和publishOn 与io 和async SchedulerGroup 以及各种参数一起使用,但调用仍然在一个线程上串行运行。
我做错了什么?
除此之外,我相信总体上可以改进上述内容,因此我们将不胜感激。
谢谢
【问题讨论】:
标签: java project-reactor