【发布时间】:2014-09-28 22:46:53
【问题描述】:
我是 rxjava 新手,遇到以下问题:
外部系统不规则地将对象放入 FIFO 队列中。我需要一个每秒运行的 Observable,从队列中获取一个项目(如果有的话)并将其发送给订阅者。
两个问题:
队列项是在 Observable 存活时生成的,不可能预先提供所有项。队列可能是空的,在这种情况下,Observable 必须等待并且不发出任何东西。 (如果 Observable 会在暂停后队列中的项目可用时立即启动会很好,但是如果我们不想更频繁地轮询,那么队列可能也需要是 Observable,不想法如何。)
-
外部系统必须能够完成 Observable。我可以设置一个变量并从 Observable 中读取它,但我想知道是否有更优雅的方法来做到这一点。
LinkedList<Layer> queue = new LinkedList<Layer>(); // the queue boolean stopObservable = false; // the variable to stop the observable Observable.create(new Observable.OnSubscribe<Layer>() { @Override public void call(Subscriber<? super Layer> subscriber) { try { if (!queue.isEmpty()) { Layer layer = queue.poll(); subscriber.onNext(layer); } else { if (stopObservable) { subscriber.onCompleted(); } } } catch (Exception e) { subscriber.onError(e); } } }).somethingThatCreatesTheInterval().subscribeOnEtc.
对于间隔,我不能使用 .sample(),因为它会丢弃项目,并且发射所有项目很重要。
.throttleWithTimeout() 看起来更好,但它似乎也丢弃了项目。
rx 非常酷,但很难进入。任何意见表示赞赏。
【问题讨论】:
-
因此,您不介意输入项的队列是否会随着时间的推移而增加 - 您只想每秒发出一个(或者如果在输入队列)?我的第一直觉是看一下计时器(提供“脉冲”)和地图(它不映射任何东西,只是简单地丢弃计时器发出的每个 Long ,而是从输入队列中发出下一个项目 - 或者调用 onCompleted 如果stop 变量设置为 true)。但也许有更优雅的选择......
-
实际上我认为您可能需要在第二步中使用 flatMap(而不是 map) - 以便能够处理输入队列为空的情况。所以你要么发出 Observable.just() 要么和 Observable.empty()。