【发布时间】:2018-07-27 05:38:29
【问题描述】:
我有一个发出文件行的 Observable(从 GCS 读取的许多 GB)。
return StringObservable.byLine(
Observable.using(
() -> storage.get(blobId).reader(),
reader -> Observable.create(
new OnSubscribeReadChannel(reader, 64 * 1024)
),
ReadChannel::close
)
)
每一行都会导致多次(在某些情况下多次)调用各种数据库,所有这些调用都包含在 Hystrix 命令中。显然,这些线路最终压倒了 Hystrix 命令,电路开始打开,每个人都度过了糟糕的一天。
这大概就是我正在做的事情:
readLinesFromCloudStorageFile.readLines(blobInfo.getBlobId()))
.map(this::deserializeLine)
.flatMap(this::addDataToObjectFromSomeDb)
.flatMap(this::writeObj)
.map(Set::size)
.reduce(0, (a, b) -> a + b)
.toBlocking().single()
有没有办法可以施加一些背压,或者限制一次处理的行数或其他什么?
【问题讨论】:
-
也许
maxConcurrent有帮助,在this answer 中描述。 -
这很有趣。使用
concatMap而不是flatMap会有效地序列化写入吗? -
从输出的角度来看,
concatMap和flatMap(maxConcurrency=1)看起来是一样的。从输入的角度来看,concatMap会预先获取至少 1 个项目,即使有 1 个内部源正在处理,而flatMap(1)仅在最后一个完成时才要求下一个内部源。 -
你使用的是 RxJava 1.x 还是 2.x?
-
RxJava 1.x - Hystrix 被锁定了。
标签: java-8 rx-java observable hystrix