【问题标题】:OOM issue while copying couchbase documents from one bucket to another bucket using reactive programming使用反应式编程将 couchbase 文档从一个存储桶复制到另一个存储桶时出现 OOM 问题
【发布时间】:2019-03-05 08:32:07
【问题描述】:

我们正在尝试使用响应式编程(大约 100 万个文档)将数据从一个存储桶复制到另一个存储桶。我们在这段代码中得到了 OOM。我不是 rxjava 专家,需要帮助以防止 OOM。我认为读取发生的速度比写入快,并且由于缓冲区已满而导致 OOM。代码如下:

CountDownLatch countDownLatch5 = new CountDownLatch(1);
Observable
        .from(n1qlKeysForDocsGPC)
        .flatMap(new Func1<String, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(String key) {
                return readPrimaryMainAsyncBucket
                        .get(key, 10, TimeUnit.SECONDS)
                        .onErrorResumeNext(readPrimaryMainAsyncBucket.get(key, 10, TimeUnit.SECONDS))
                        .retry(50)
                        .switchIfEmpty(Observable.empty())
                        .onErrorResumeNext(Observable.empty());
            }
        })
        .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(JsonDocument jsonDocument) {
                return readPrimaryBackupAsyncBucket.upsert(jsonDocument, 10, TimeUnit.SECONDS).retry(50);
            }
        })
        .last()
        .doOnTerminate(new Action0() {
            @Override
            public void call() {
                countDownLatch5.countDown();
            }
        })
        .subscribe();
try {
    countDownLatch5.await();
    logger.info("DataRecoverySchedulers | Completed countDownLatch5");
} catch (InterruptedException e) {
    e.printStackTrace();
}

【问题讨论】:

    标签: rx-java couchbase reactive


    【解决方案1】:

    3.x 之前的 Couchbase Java SDK 版本(在撰写本文时尚未发布)使用 RxJava 版本 1。

    flatmap 调用,正如您现在所拥有的那样,会将操作发布到内部缓冲区以异步执行,返回一个 Observable 以跟踪每个操作。这意味着第一个 flatmap 将以无限制的方式消耗您的 from 调用的输出。换句话说,它将比操作发生得更快地读取整个列表。我预计您看到的 OOM 错误是由于 Couchbase 内部缓冲区溢出所致。

    要更正此问题,您可以使用flatmap 的变体来限制未完成订阅的数量。您只需在 flatmap 调用中添加第二个整数参数。因此,您需要 .flatmap(new Func1&lt;~&gt;..., 10) 一次将自己限制为 10 个出色的操作。

    Couchbase 中的默认缓冲区约为 16000 个未完成的操作,但这远远超过了大多数系统饱和所需的数量。

    作为参考,请参阅此相关的Stack Overflow post 关于限制文件上传的吞吐量。

    【讨论】:

      猜你喜欢
      • 2013-11-01
      • 2017-03-17
      • 2018-11-22
      • 2023-02-01
      • 1970-01-01
      • 2021-07-28
      • 1970-01-01
      • 2014-03-03
      • 1970-01-01
      相关资源
      最近更新 更多