【问题标题】:Concurrent processing of project reactor's flux项目反应堆通量的并发处理
【发布时间】:2016-07-07 17:21:43
【问题描述】:

我对项目反应器或反应式编程非常陌生,所以我可能做错了什么。我正在努力构建一个执行以下操作的流程:

给定一个类实体:

Entity {
    private Map<String, String> items;
    public Map<String, String> getItems() {
        return items;
    }
}
  1. 从数据库中读取实体 (ListenableFuture&lt;Entity&gt; readEntity())
  2. 对每个项目执行一些并行异步处理 (boolean processItem(Map.Entry&lt;String, String&gt; item))
  3. 当所有完成调用 doneProcessing (void doneProcessing(boolean b))

目前我的代码是:

handler = this;
Mono
    .fromFuture(readEntity())
    .doOnError(t -> {
        notifyError(“some err-msg” , t);
        return;
    })
    .doOnSuccess(e -> log.info("Got the Entity: " + e))
    .flatMap( e -> Flux.fromIterable(e.getItems().entrySet()))
    .all(handler::processItem)
    .consume(handler::doneProcessing);

事情有效,但 handler::processItem 调用不会在所有项目上同时运行。我尝试将dispatchOnpublishOnioasync SchedulerGroup 以及各种参数一起使用,但调用仍然在一个线程上串行运行。 我做错了什么?

除此之外,我相信总体上可以改进上述内容,因此我们将不胜感激。

谢谢

【问题讨论】:

    标签: java project-reactor


    【解决方案1】:

    您需要另一个 flatMap 来为每个单独的地图元素分叉和连接计算:

    Mono.fromFuture(readEntity())
    .flatMap(v -> Flux.fromIterable(v.getItems().entrySet()))
    .flatMap(v -> Flux.just(v)
                    .publishOn(SchedulerGroup.io())
                    .doOnNext(handler::processItem))
    .consume(handler::doneProcessing);
    

    【讨论】:

    • 感谢 @akarnokd 的建议,这听起来很有希望,但由于时间压力,我转向了 CompletableFutures,它看起来更简单且有合理的文档说明。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-06-11
    • 2021-11-15
    • 2019-02-17
    • 1970-01-01
    • 1970-01-01
    • 2018-09-01
    • 2019-10-23
    相关资源
    最近更新 更多