【问题标题】:Supply value to Observable为 Observable 提供价值
【发布时间】:2014-09-28 22:46:53
【问题描述】:

我是 rxjava 新手,遇到以下问题:

外部系统不规则地将对象放入 FIFO 队列中。我需要一个每秒运行的 Observable,从队列中获取一个项目(如果有的话)并将其发送给订阅者。

两个问题:

  • 队列项是在 Observable 存活时生成的,不可能预先提供所有项。队列可能是空的,在这种情况下,Observable 必须等待并且不发出任何东西。 (如果 Observable 会在暂停后队列中的项目可用时立即启动会很好,但是如果我们不想更频繁地轮询,那么队列可能也需要是 Observable,不想法如何。)

  • 外部系统必须能够完成 Observable。我可以设置一个变量并从 Observable 中读取它,但我想知道是否有更优雅的方法来做到这一点。

    LinkedList<Layer> queue = new LinkedList<Layer>(); // the queue
    boolean stopObservable = false; // the variable to stop the observable
    
    Observable.create(new Observable.OnSubscribe<Layer>() {
    
        @Override public void call(Subscriber<? super Layer> subscriber) {
            try {
                if (!queue.isEmpty()) {
                    Layer layer = queue.poll();
                    subscriber.onNext(layer);
                } else {
                    if (stopObservable) { subscriber.onCompleted(); }
                }
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }
    
    }).somethingThatCreatesTheInterval().subscribeOnEtc.
    

对于间隔,我不能使用 .sample(),因为它会丢弃项目,并且发射所有项目很重要。

.throttleWithTimeout() 看起来更好,但它似乎也丢弃了项目。

rx 非常酷,但很难进入。任何意见表示赞赏。

【问题讨论】:

  • 因此,您不介意输入项的队列是否会随着时间的推移而增加 - 您只想每秒发出一个(或者如果在输入队列)?我的第一直觉是看一下计时器(提供“脉冲”)和地图(它不映射任何东西,只是简单地丢弃计时器发出的每个 Long ,而是从输入队列中发出下一个项目 - 或者调用 onCompleted 如果stop 变量设置为 true)。但也许有更优雅的选择......
  • 实际上我认为您可能需要在第二步中使用 flatMap(而不是 map) - 以便能够处理输入队列为空的情况。所以你要么发出 Observable.just() 要么和 Observable.empty()。

标签: java rx-java


【解决方案1】:

当我需要定期轮询外部 Web 服务时,我做了类似的事情。

  1. 对于时间间隔,您可以继续使用 timer ;在每个以 1s 为粒度的刻度上,可观察链将轮询并可能选择一层,如果该层为空,则不会发出任何内容

    Observable.timer(0, 1, TimeUnit.SECOND)
        .flatMap(tick -> Observable.just(queue.poll()).filter(layer -> layer != null))
        .subscribe(layer -> System.out.format("The layer is : %s",  layer));
    
  2. 现在如果你想中止整个链,你可以添加takeUntil。因此,当您的外部系统想要停止时,它会在stopObservable 中提交一些内容,这将停止后续订阅:

    // somewhere before
    PublishSubject stopNotifier = PublishSubject.create();
    
    // somewhere process the queue
    Observable.timer(0, 1, TimeUnit.SECOND)
        .takeUntil(stopNotifier)
        .flatMap(tick -> Observable.just(queue.poll()))
        .subscribe(layer -> System.out.format("The layer is : %s",  layer));
    
    // when not anymore interested (calling onComplete works too)
    stopNotifier.onNext("cancel everything about the queue");
    

我正在用平板电脑写这个回复,所以你可能会认为我可能拼错了一些单词或犯了幼稚的编程错误;)

【讨论】:

    【解决方案2】:

    如果可能,您应该使用PublishSubject&lt;Layer&gt; 而不是LinkedList&lt;Layer&gt;。然后,外部系统可以通过调用publishSubject.onNext 提供新项目,并且由于PublishSubjectObservable 的子类,您的系统可以将其视为Observable,并且取决于您想要的时间语义,将以下运算符之一应用于它:

    • sample
    • debounce
    • throttleFirst/throttleLast/throttleWithTimeout
    • .zipWith(Observable.timer(1, TimeUnit.SECONDS), (value, tick) -&gt; value)(可能会做很多缓冲!)
    • 根本没有时间修改(也请考虑这一点)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-05-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-16
      • 1970-01-01
      相关资源
      最近更新 更多