【问题标题】:How to process first n items and remaining one differently in a observable stream如何在可观察流中处理前 n 个项目并以不同方式保留一个
【发布时间】:2017-03-16 05:51:51
【问题描述】:

例如,

给定一个特定数量 (m) 的数字流 (m1, m2, m3, m4, m5, m6...),并将转换 (2 * i) 应用于前 n 个项目(n 可以小于, 等于或大于 m),对其余项目应用另一个变换 (3 * i)。和

返回结果:m1*2, m2*2, m3*3, m4*3, m5*3, m6*3...(这里假设n=2)。

我试图使用 take(n) 和 skip(n) 然后 concatwith,但看起来 take(n) 会丢弃序列中的剩余项目,并且之后让 skip(n) 什么都不返回。

【问题讨论】:

  • 这是一个有代表性的例子吗?在这种情况下,你不想用第二个 n 2s 的流压缩你的 m 流,然后根据需要尽可能多的 3s 吗?

标签: java rx-java


【解决方案1】:

您可以共享您的 m 的流,然后将 take()skip() 流重新合并在一起,如下所示:

    int m = 10;
    int n = 8;
    Observable<Integer> numbersStream = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .publish();

    Observable<Integer> firstNItemsStream = numbersStream.take(n)
            .map(i -> i * 2);

    Observable<Integer> remainingItemsStream = numbersStream.skip(n)
            .map(i -> i * 3);

    Observable.merge(firstNItemsStream, remainingItemsStream)
            .subscribe(integer -> System.out.println("result = " + integer));
    numbersStream.connect();

编辑:
正如@A.E.所指出的。 Daphne, share() 将开始与第一个订阅者一起发送,因此如果 Observable 已经开始发送项目/秒,第二个订阅者可能会错过通知/秒,所以在这种情况下还有其他可能性:
cache() - 会回复所有缓存发出的项目并回复给每个新订阅者,但会牺牲退订能力,因此需要谨慎使用。
reply().refCount() - 将创建@ 987654327@ 表示reply() 之前的所有项目到每个新订阅者(类似于缓存),但会在最后一个订阅者取消订阅时取消订阅。

在这两种情况下,内存都应该考虑在内,因为Observable 会将所有发出的项目缓存在内存中。

publish() - 在不缓存所有先前项目的情况下,另一种可能性是使用publish() 创建ConnectableObservable,并调用它的connect() 方法以在所有必需的订阅者之后开始发射订阅,因此将获得同步,所有订阅者将正确获得所有通知。

【讨论】:

  • share() 的使用会引入竞争条件吗?
  • 你怎么看?在哪里比赛?
  • 哦,我明白了,你说得对,share 运算符不适合这里,你可以使用 publish 和 connect 方法或 reply().refCount()
猜你喜欢
  • 2016-07-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-08-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多