【问题标题】:retry buffer in rxjavarxjava中的重试缓冲区
【发布时间】:2019-10-28 23:19:12
【问题描述】:

一个热的 Observable 发射项目。我想将这些项目上传到服务器。 有两个考虑:

  1. 由于 io 操作的开销,我想批量处理这些项目并作为数组上传
  2. 由于 io 操作不可靠,我希望将失败的批次上传添加到下一批。
Uploads succeed:
1 - 2 - 3 - 4 - 5
------------------
u(1,2,3) - u(4,5)

First upload fails:
1 - 2 - 3 - 4 - 5
------------------
u(1,2,3) - u(1,2,3,4,5)

我可以使用buffer 运算符处理第一个要求,但不知道如何满足第二个要求。

【问题讨论】:

    标签: rx-java rx-java2 buffering


    【解决方案1】:

    这是我将失败存储在队列中的想法

    public class StackOverflow {
    
        public static void main(String[] args) {
            // store any failures that may have occurred
            LinkedBlockingQueue<String> failures = new LinkedBlockingQueue<>();
    
            toUpload()
                    // buffer however you want
                    .buffer(5)
                    // here is the interesting part
                    .flatMap(strings -> {
                        // add any previous failures
                        List<String> prevFailures = new ArrayList<>();
                        failures.drainTo(prevFailures);
                        strings.addAll(prevFailures);
    
                        return Flowable.just(strings);
                    })
                    .flatMapCompletable(strings -> {
                        // upload the data
                        return upload(strings).doOnError(throwable -> {
                            // if its an upload failure:
                            failures.addAll(strings);
                        });
                    }).subscribe();
        }
    
        // whatever your source flowable is
        private static Flowable<String> toUpload() {
            return Flowable.fromIterable(Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i"));
        }
    
        // some upload operation
        private static Completable upload(List<String> strings) {
            return Completable.complete();
        }
    }
    

    这里的一些边缘情况是事实,如果最后一个可流动的缓冲组失败,这将不会重试那些。这可以通过retryWhen 运算符来实现,但基本思想与使用队列相同

    【讨论】:

    • 这可行,但我不会接受它作为答案,因为它违反了很多关于 Observables 和组合的理解
    • 违反是什么意思?如果您正在寻找仅 Rx 的答案,我想不出一个
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-09-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-05-27
    • 2012-11-14
    相关资源
    最近更新 更多