【问题标题】:RxJava Operator that dynamically buffers backpressured elements and emits them in batchesRxJava Operator,动态缓冲背压元素并分批发出它们
【发布时间】:2018-11-14 15:09:04
【问题描述】:

我有一个 Flowable,它发出需要由需要元素数组的昂贵操作处理的事件:

Flowable<T> src
void expensiveOp(List<T> batch)

除了使用常量窗口之外,我想指定一个 max 元素的窗口,该窗口在下游繁忙时填充,而在满时只是背压:

int maxSize = 1024
src.dynamicWindow(maxSize).subscribe(expensiveOp)

因此,窗口的大小既不是常数时间也不是元素,而是取决于背压。当订阅者准备好处理下一个元素时,应该刷新缓冲区。

我缺少什么重载方法?

可能的扩展将是一个 minSize 参数和一个重试机制,该机制通过增加的窗口重试。

【问题讨论】:

标签: rx-java rx-java2 reactivex


【解决方案1】:

我最近遇到了这个问题,这是我的原始答案:https://stackoverflow.com/a/55136139/883330

快速回答。我使用基于时间和计数的缓冲区来支持背压的实现:https://gist.github.com/driventokill/c49f86fb0cc182994ef423a70e793a2d

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-02
    • 1970-01-01
    • 1970-01-01
    • 2020-04-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多