【发布时间】:2020-05-11 16:34:58
【问题描述】:
我有一个ParallelFlux,想在所有rails 中的所有组件都被消耗完时执行副作用操作。我试图使用.then()。
但无法理解如何使用它。
任何人都可以分享它的用法或在所有元素都通过 OnError,OnComplete 后执行副作用的方法吗?
指示代码:
RunTransformation 在转换中提供了 Parallel Flux,
OnCompletion 将记录标记为在单独的注册表中完成。
RunAction 对每个转换后的记录执行一些操作(彼此独立)。
RunError 处理错误。
这里我只想在最终完成时运行 RunCompletion,但必须按顺序执行,尽管消费者可以并行完成。
Mono.just(record)
.flatMap(RunTransformation::tranformParallel) //gives back ParallelFlux running on Schedulers.random()
.sequential()
.doOnTerminate(OnCompletion::markRecordProcessed)
.subscribe(
RunAction::execute,
RunError::handleError);
【问题讨论】:
-
顺序排列有什么问题?
-
消费者 RunAction.execute 本来可以并行运行,但现在它需要按顺序运行。
-
如果您需要在运行之前完成所有操作,它如何并行运行?如果它不需要一切,你可以在没有顺序的情况下运行 onComplete 。您需要提供 MRE,因为您所说的没有任何意义。
-
RunAction.execute() 可以并行运行。完成后,如果我收到一个信号,表明所有轨道的处理已完成,那么想要将原始记录标记为在注册表中处理。希望我现在说得通。
-
为了提供背景信息,我正在使用一条 kafka 消息并在所有处理完成后执行多个任务(RunAction.execute())(彼此没有链接),它需要发送一份手册感谢卡夫卡。如果未添加顺序,它将发送与并行轨道一样多的手动确认。如果添加了顺序,它将多次执行 RunAction.execute()。将检查我是否可以为此制作 MRE。
标签: java spring-webflux project-reactor