【发布时间】:2019-03-05 08:32:07
【问题描述】:
我们正在尝试使用响应式编程(大约 100 万个文档)将数据从一个存储桶复制到另一个存储桶。我们在这段代码中得到了 OOM。我不是 rxjava 专家,需要帮助以防止 OOM。我认为读取发生的速度比写入快,并且由于缓冲区已满而导致 OOM。代码如下:
CountDownLatch countDownLatch5 = new CountDownLatch(1);
Observable
.from(n1qlKeysForDocsGPC)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(String key) {
return readPrimaryMainAsyncBucket
.get(key, 10, TimeUnit.SECONDS)
.onErrorResumeNext(readPrimaryMainAsyncBucket.get(key, 10, TimeUnit.SECONDS))
.retry(50)
.switchIfEmpty(Observable.empty())
.onErrorResumeNext(Observable.empty());
}
})
.flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(JsonDocument jsonDocument) {
return readPrimaryBackupAsyncBucket.upsert(jsonDocument, 10, TimeUnit.SECONDS).retry(50);
}
})
.last()
.doOnTerminate(new Action0() {
@Override
public void call() {
countDownLatch5.countDown();
}
})
.subscribe();
try {
countDownLatch5.await();
logger.info("DataRecoverySchedulers | Completed countDownLatch5");
} catch (InterruptedException e) {
e.printStackTrace();
}
【问题讨论】:
标签: rx-java couchbase reactive