【问题标题】:Project Reactor - combine two Publishers statefuly and emit the resultProject Reactor - 有状态地组合两个发布者并发出结果
【发布时间】:2019-05-29 19:39:28
【问题描述】:

我想用 Reactor 设计一个处理管道,如下所示。

我有两个输入发布者orderEntries(冷)和hotBroadcasts(热)。我想将hotBroadcasts 发出的项目聚合到(可变)内存数据结构中 - 比如说HashMap - 对于来自orderEntries 的每个项目我想从该地图中选择一个相应的元素,创建结果项目并推送到下游订阅者。

来自hotBroadcasts 的事件以任意顺序出现,这就是为什么我想将它们存储在内存中以便于检索。

从概念上讲,它应该像这样工作:

       orderEntries                      hotBroadcasts
           |                                   | 
           |                                   | 
           |                                   | 
           \                                   / 
            ----------------> <----------------
                   (aggregate events from hotBroadcasts)     
                             |
                             |
                        resulting item
                             |
                             |
                            \/
                      downstream subcriber  

到目前为止,我设法用 ReplayProcessor 勾勒出一个解决方案,由 Kotlin 伪示例说明:

val orderEntries = Flux.interval(Duration.of(1, ChronoUnit.SECONDS))
val hotBroadcasts = ReplayProcessor.create<String>(1000, false)

orderEntries.concatMap { entryId ->
    // problematic filter - skims through all that ReplayProcessor has cached
    hotBroadcasts.filter { broadcastId ->
        "Broadcast:$entryId" == broadcastId
    }
    .take(1)
    .map { "EntryId: $entryId, BroadcastId: $it" }
}.subscribe { LOG.info(it) }

Flux.interval(Duration.of(200, ChronoUnit.MILLIS))
        .concatMap { Flux.just(it, it - 100000) }
        .map { "Broadcast:$it" }
        .subscribe {
            hotBroadcasts.onNext(it)
        }

这里的问题是hotBroadcast 的过滤会浏览来自orderEntries 的每个项目的所有项目。因此我的想法是将它们存储在 HashMap 中。

谁能指点我正确的方向?

【问题讨论】:

  • 你怎么知道你什么时候聚合了所有东西?或者你会一直聚合,直到你收到关于冷通量的消息?无论哪种方式,似乎 takeWhile 都是您想要的?
  • 我不知道。我只是想使用一个时间窗口。
  • 为什么不那么windowcombineLatest?对于所述要求,您的解决方案看起来非常复杂,那么是什么让您的案例与众不同?
  • 能否请您发布一个代码示例?我不认为我的情况很特别,我实际上认为这一定是一个非常普遍的问题,但我不能真正解决这个问题。真的,困扰我的是hotBroadcasts.filter...的效率低下,仅此而已。
  • orderEntry 如何匹配 hotBroadcast?按他们的创建顺序,或者他们都有一些必须匹配的标签?

标签: java kotlin reactive-programming project-reactor


【解决方案1】:

可以聚合来自两个不同发布者的消息的对象是具有 2 个参数的异步过程调用。这样的调用可以在 rxjava 中使用io.reactivex.Single.zip(SingleSource arg1, SingleSource arg2, BiFunction func) 构造,或者在纯Java 中使用java.util.concurrent.CompletableFuture.thenCombine(CompletionStage arg2, BiFunction func)

您需要一个特殊的 HashMap 来保存异步过程调用。第一次使用给定标签访问此 HashMap 时,应自动创建调用。

所以一个 Publicher 调用了

asyncProc=callMap.get(label); // asyncProc is created and stored with the label as a key
asyncProc.arg1.complete(value);

和其他 Publicher 调用

asyncProc=callMap.get(label); // previously created instance returned
asyncProc.arg2.complete(value);

在两个发布者都提供了他们的参数之后,异步过程被执行。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-12-06
    • 1970-01-01
    • 2021-04-24
    • 1970-01-01
    • 2019-11-30
    • 1970-01-01
    相关资源
    最近更新 更多