【问题标题】:Reactor delayElements within a GroupedFlux delays elements across all groupsGroupedFlux 中的 Reactor delayElements 延迟所有组中的元素
【发布时间】:2021-10-29 01:35:16
【问题描述】:

我有一个用例,我想通过 PartitionKey 创建一堆 GroupedFlux,并在每个组中延迟 100 毫秒的元素。但是,我希望多个组同时开始。因此,如果有 3 个组,我预计每 100 毫秒发出 3 条消息。但是,使用以下代码,我每 100 毫秒只看到 1 条消息。

这是我期望工作的代码。

final Flux<GroupedFlux<String, TData>> groupedFlux =
        flux.groupBy(Event::getPartitionKey);
groupedFlux.subscribe(g -> g.delayElements(Duration.ofMillis(100))
        .flatMap(this::doWork)
        .doOnError(throwable -> log.error("error: ", throwable))
        .onErrorResume(e -> Mono.empty())
        .subscribe());

这是日志。

21:24:29.318   parallel-5]  : GroupByKey : 2
21:24:29.424   parallel-6]  : GroupByKey : 3
21:24:29.529   parallel-7]  : GroupByKey : 1
21:24:29.634   parallel-8]  : GroupByKey : 2
21:24:29.739   parallel-9]  : GroupByKey : 3
21:24:29.844  parallel-10]  : GroupByKey : 1
21:24:29.953  parallel-11]  : GroupByKey : 2
21:24:30.059  parallel-12]  : GroupByKey : 3
21:24:30.167   parallel-1]  : GroupByKey : 1

(查看每个日志语句之间几乎 100 毫秒的差异。1s 列是时间戳。

【问题讨论】:

    标签: project-reactor


    【解决方案1】:

    经过更多分析,我发现它运行良好。我的测试有不正确的 PartitionKey 数据,导致 single GroupedFlux。

    回答我自己的问题,以防有人怀疑 delayElements 在 groupedFlux 上的工作方式不同。它没有。

    【讨论】:

      猜你喜欢
      • 2018-07-13
      • 2017-10-09
      • 2021-04-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-01-08
      • 2011-02-22
      相关资源
      最近更新 更多