【问题标题】:Rxjava - Group/Batch bursts of elements in an observable sequenceRxjava - 可观察序列中的元素组/批量突发
【发布时间】:2019-12-12 14:23:29
【问题描述】:

我有一个可观察的序列。插入第一个元素时,我想启动一个计时器并在计时器的时间跨度内批处理后续插入的元素。然后,在序列中插入另一个元素之前,计时器不会再次启动。

--------|=====timespan====|---------------|=====timespan====|-------------->
        1  2 3 4    5                     6 

会产生:

[1,2,3,4,5], [6] 

我尝试使用Observable.buffer()timespan,但从我的实验中,我可以看到,一旦我们订阅了可观察序列,计时器就会启动,并且在前一个计时器完成后立即重新启动。

因此,与前面的示例具有相同的序列并使用buffer()timespan,我会得到这样的结果:

|=====timespan====|=====timespan====|=====timespan====|=====timespan====|-->
        1  2 3 4                          5 6           

会产生这个:

[1,2,3,4], [], [5,6], []

这与29858974 本质上是相同的问题,但针对的是java。

所以问题是,由于我不想过多地延迟我的流,我希望有一个非常短的计时器,并且该计时器会非常密集。我可以简单地过滤空列表,但我认为这对 CPU 影响太大。

【问题讨论】:

  • 这听起来有点像 window() 函数。您是否检查过这是否满足您的需求?
  • 不是 window 和 buffer() 完全一样,而是用 observables 作为输出吗?

标签: java rx-java rx-java2


【解决方案1】:

window 运算符将充当buffer,您不能直接使用它。

这个想法是通过第一个可观察的(我称之为insertions)的发射来控制timer。为此,您必须包含第三个参数来链接两个 observables(stopWatchSubject 在下面的解决方案中)。

    @Test
    public void stop_watch_observable() {

        Subject<Long> stopWatch = PublishSubject.create();

        Observable<Long> insertions = insertions();

        //share to use it as a timer (looking for the first emission)
        //and to recieve the items
        Observable<Long> shared = insertions.share();

        //for each emission of insertions we start a new timer
        //but only the first one is emitted
        //the others are stopped by the takeUntil(stopWatch)
        Observable<Long> window = shared
                .flatMap(e -> Observable.timer(3, TimeUnit.SECONDS).takeUntil(stopWatch));

        shared.buffer(window)
                //each time a window is generated we kill all the current timers
                .doOnNext(e -> stopWatch.onNext(0L))
                .blockingSubscribe(System.out::println);
    }

    // insertions generator which is comming randomly
    private Observable<Long> insertions() {
        AtomicLong al = new AtomicLong(0);
        return Observable.generate((Emitter<Long> emitter) -> {
            if (al.getAndIncrement() % 4 == 0) {
                Long timeToWait = Long.parseLong(RandomStringUtils.randomNumeric(1));
                System.out.println("sleeping for: " + timeToWait);
                sleep(timeToWait * 1000);
            } else {
                sleep(500);
            }
            emitter.onNext(al.get());
        }).subscribeOn(Schedulers.newThread());
    }

【讨论】:

    【解决方案2】:

    第一个解决方案的缺点是每次发出插入时都会启动一个timer(它可能是 CPU 密集型的)。这是另一种只启动一个计时器的解决方案(我认为这种方式更有效:

    @Test
        public void stop_watch_observable() {
    
            Observable<Long> insertions = insertions();
            Observable<Long> shared = insertions.share();
    
            AtomicBoolean timerOn = new AtomicBoolean(false);
    
            Observable<Long> window = shared
                    .flatMap(e -> timerOn.get() ? Observable.empty() : Observable.timer(3, TimeUnit.SECONDS)
                            .doOnSubscribe(x -> timerOn.set(true))
                    );
    
            shared.buffer(window)
                    //each time a window is generated we kill all the current timers
                    .doOnNext(e -> timerOn.set(false))
                    .blockingSubscribe(System.out::println);
        }
    

    【讨论】:

      猜你喜欢
      • 2015-07-03
      • 1970-01-01
      • 2018-11-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-01-23
      相关资源
      最近更新 更多