【问题标题】:Observable emitter generates too much pressure for HystrixCommandsObservable 发射器对 HystrixCommands 产生了太大的压力
【发布时间】:2018-07-27 05:38:29
【问题描述】:

我有一个发出文件行的 Observable(从 GCS 读取的许多 GB)。

return StringObservable.byLine(
    Observable.using(
        () -> storage.get(blobId).reader(),
            reader -> Observable.create(
                    new OnSubscribeReadChannel(reader, 64 * 1024)
                ),
            ReadChannel::close
    )
)

每一行都会导致多次(在某些情况下多次)调用各种数据库,所有这些调用都包含在 Hystrix 命令中。显然,这些线路最终压倒了 Hystrix 命令,电路开始打开,每个人都度过了糟糕的一天。

这大概就是我正在做的事情:

readLinesFromCloudStorageFile.readLines(blobInfo.getBlobId()))
            .map(this::deserializeLine)
            .flatMap(this::addDataToObjectFromSomeDb)
            .flatMap(this::writeObj)
            .map(Set::size)
            .reduce(0, (a, b) -> a + b)
            .toBlocking().single()

有没有办法可以施加一些背压,或者限制一次处理的行数或其他什么?

【问题讨论】:

  • 也许maxConcurrent 有帮助,在this answer 中描述。
  • 这很有趣。使用concatMap 而不是flatMap 会有效地序列化写入吗?
  • 从输出的角度来看,concatMapflatMap(maxConcurrency=1) 看起来是一样的。从输入的角度来看,concatMap 会预先获取至少 1 个项目,即使有 1 个内部源正在处理,而 flatMap(1) 仅在最后一个完成时才要求下一个内部源。
  • 你使用的是 RxJava 1.x 还是 2.x?
  • RxJava 1.x - Hystrix 被锁定了。

标签: java-8 rx-java observable hystrix


【解决方案1】:

你需要使用Emitter.BackpressureMode.BUFFER

BUFFER
Buffers (unbounded) all onNext calls until the downstream can consume them.

http://reactivex.io/RxJava/1.x/javadoc/index.html?rx/Emitter.BackpressureMode.html

【讨论】:

  • 这个问题是缓冲区可能会变得很多 GB,在某些时候必须限制阅读器。
猜你喜欢
  • 1970-01-01
  • 2016-12-19
  • 1970-01-01
  • 2014-08-06
  • 1970-01-01
  • 1970-01-01
  • 2011-12-05
  • 2023-04-08
相关资源
最近更新 更多