【问题标题】:Execute a task on completion of ParallelFlux without executing sequential (introduce a side affect)在 ParallelFlux 完成时执行任务而不执行顺序(引入副作用)
【发布时间】:2020-05-11 16:34:58
【问题描述】:

我有一个ParallelFlux,想在所有rails 中的所有组件都被消耗完时执行副作用操作。我试图使用.then()

但无法理解如何使用它。

任何人都可以分享它的用法或在所有元素都通过 OnError,OnComplete 后执行副作用的方法吗?

指示代码:

RunTransformation 在转换中提供了 Parallel Flux,

OnCompletion 将记录标记为在单独的注册表中完成。

RunAction 对每个转换后的记录执行一些操作(彼此独立)。

RunError 处理错误。

这里我只想在最终完成时运行 RunCompletion,但必须按顺序执行,尽管消费者可以并行完成。

   Mono.just(record)
       .flatMap(RunTransformation::tranformParallel)   //gives back ParallelFlux running on Schedulers.random()
       .sequential()
       .doOnTerminate(OnCompletion::markRecordProcessed)
       .subscribe(
            RunAction::execute, 
            RunError::handleError);

【问题讨论】:

  • 顺序排列有什么问题?
  • 消费者 RunAction.execute 本来可以并行运行,但现在它需要按顺序运行。
  • 如果您需要在运行之前完成所有操作,它如何并行运行?如果它不需要一切,你可以在没有顺序的情况下运行 onComplete 。您需要提供 MRE,因为您所说的没有任何意义。
  • RunAction.execute() 可以并行运行。完成后,如果我收到一个信号,表明所有轨道的处理已完成,那么想要将原始记录标记为在注册表中处理。希望我现在说得通。
  • 为了提供背景信息,我正在使用一条 kafka 消息并在所有处理完成后执行多个任务(RunAction.execute())(彼此没有链接),它需要发送一份手册感谢卡夫卡。如果未添加顺序,它将发送与并行轨道一样多的手动确认。如果添加了顺序,它将多次执行 RunAction.execute()。将检查我是否可以为此制作 MRE。

标签: java spring-webflux project-reactor


【解决方案1】:

如下使用 .then()。

Mono.just(record)
        .flatMap(RunTransformation::tranformParallel)   //gives back ParallelFlux running on Schedulers.random()
        .doOnNext(RunAction::execute)
        .doOnError(RunError::handleError)
        .then()
        .doOnTerminate(() -> {System.out.println("all rails completed");})
        .subscribe(); 

【讨论】:

  • 取决于你想在哪里执行副作用。如果在主线程中执行,那么肯定需要主线程阻塞
  • 并行数未知,所以没有运气。我认为 ParallelFlux 有一个 .then()。但找不到任何实现。
  • 是的,使用 .then() 你可以做到。用 then() 编辑了答案,希望它有效。
  • 编辑了答案以匹配有效的解决方案删除了​​注释的顺序和额外的 doOnTerminate()
【解决方案2】:

取自文档

如果在并行处理序列后,您想恢复到“正常”Flux 并以顺序方式应用运算符链的其余部分,则可以在 ParallelFlux 上使用 sequence() 方法。

我认为doOnComplete 是您正在寻找的。

Flux.range(1, 10)
        .parallel(3)
        .runOn(Schedulers.parallel())
        .doOnNext(i -> System.out.println(Thread.currentThread().getName() + " -> " + i))
        .sequential()
        .doOnComplete(() -> System.out.println("All parallel work is done"))
        .subscribe()

这会产生输出:

parallel-1 -> 1
parallel-2 -> 2
parallel-3 -> 3
parallel-2 -> 5
parallel-3 -> 6
parallel-1 -> 4
parallel-1 -> 7
parallel-2 -> 8
parallel-3 -> 9
parallel-1 -> 10
All parallel work is done

Reactor documentation on parallel flux

【讨论】:

    猜你喜欢
    • 2011-08-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-16
    相关资源
    最近更新 更多