【问题标题】:RxJava. Initial onNext when subscription starts?RxJava。订阅开始时的初始 onNext?
【发布时间】:2014-12-12 12:45:15
【问题描述】:

我正在尝试实现一个使用 Observable 发出更改的类。 当订阅这个 observable 时,我想发送一个开始/初始化事件。然后我想发送通常的事件。

例如。假设我有两个不同的订阅者 A 和 B。A 和 B 在不同的时间开始订阅。如果 MyClass.getChanges() 发出事件编号。 1,2,3,4 和 5。

如果 A 在事件 1,2 之间开始订阅,那么它应该收到以下事件: 初始事件,2、3、4、5。

如果 B 在事件 4 和 5 之间开始订阅,那么 B 应该会收到以下事件: 初始事件,5。

如何使用 RxJava 做到这一点?

谢谢!

编辑 1

我想我需要解释一下“InitialEvent”每次发出时都是不同的。每次新订阅者开始从 getChanged() 订阅时,MyClass 都会计算它。

我的场景是 MyClass 包含一个列表。 “initialEvent”包含订阅完成时的列表。然后从 getChanges() 发出对该列表的每个更改。

【问题讨论】:

    标签: java rx-java


    【解决方案1】:

    很抱歉在 2 年后发布此问题,但我有同样的需求,发现这个问题没有答案。

    我所做的如下:

    public Observable<Event> observe() {
        return Observable.defer(() -> 
            subject.startWith(createInitialEvent())
        );
    }
    

    思路如下:

    • defer() 在观察者订阅由方法 observe() 返回的 Observable 时执行传入的 lambda 表达式。所以基本上,它执行 subject.startWith(...),它返回一个 Observable,它是订阅者的实际事件源。
    • subject.startWith(...) 发出初始事件(由 startWith(...) 指定),然后是主题发出的事件。

    所以,如果我回到原来的帖子: 如果观察者在事件 1,2 之间开始订阅,那么它应该收到以下事件:InitialEvent, 2, 3, 4, 5。

    【讨论】:

      【解决方案2】:

      您正在寻找的是PublishSubject。对象很热门Observables,因为他们在开始发射他们的项目之前不会等待Observerssubscribe。这是Subjects的一些信息。

      这是您的用例的快速演示

          PublishSubject<String> subject = PublishSubject.create();
          Observable<String> InitEvent = Observable.just("init");
          Observable<String> A = subject.asObservable();
          Observable<String> B = subject.asObservable();
      
          subject.onNext("1");
      
          A.startWith(InitEvent)
                  .subscribe(s -> System.out.println("A: " + s));
      
          subject.onNext("2");
          subject.onNext("3");
          subject.onNext("4");
      
          B.startWith(InitEvent)
                  .subscribe(s -> System.out.println("B: " + s));
      
          subject.onNext("5");
      

      【讨论】:

      • 谢谢。但是 A 和 B 都没有在这里获得“初始事件”。输出应该是:A:init,A:2,A:3,A:4,B:Init,A:5,B:5。
      • 嘿 @ErikZ 我添加了一个额外的 Observable,它与 A 和 B 上的 startWith 一起使用。结果现在正如您在评论中显示的那样。那些对你更有效?
      • 快到了。 :) 请参阅我对我的问题的编辑。初始事件需要在我的 MyClass 类中计算。
      【解决方案3】:

      可能不是很优雅的方式,只使用一个标志怎么样?看起来您只想替换第一个发出的事件。

      例如对于一个订阅,以下逻辑:

      boolean firstTimeA = true;
      
      myCustomObservable.subscribe(s -> {
         System.out.println(firstTimeA ? "initEvent" : s.toString());
         if(firstTimeA) firstTimeA = false;
      });
      

      既然您想进行第二次订阅,只需创建一个 firstTimeB 并将其更新为您的 B 订阅即可。

      【讨论】:

      • 谢谢。请参阅我对我的问题的编辑。初始事件必须在 MyClass 中计算。
      【解决方案4】:

      如果我明白你在问什么,这样的东西应该适合你

      int last = 0;
      Observable obs;
      List<Integer> list = new ArrayList<>();
      
      public SimpleListObservable() {
          obs = Observable.create(new Observable.OnSubscribe<Integer>() {
              @Override
              public void call(Subscriber<? super Integer> subscriber) {
                  while(last < 30) {
                      last++;
                      list.add(last);
                      subscriber.onNext(last);
                  }
                  subscriber.onCompleted();
              }
          });
      }
      
      public Observable<Integer> process() {
          return Observable.from(list).concatWith(obs);
      }
      

      当源 observable 收集值时,它们被添加到 List(您可以根据需要转换项目,将它们过滤掉等),然后当 ObserverB 订阅时,它将获得项目的重播在继续源 observable 输出之前收集到 List

      这个简单的测试应该可以证明结果

      public void testSequenceNext() {
          final SimpleListObservable obs = new SimpleListObservable();
      
          final Observer<Integer> ob2 = Mockito.mock(Observer.class);
      
          obs.process().subscribe(new Observer<Integer>() {
              @Override
              public void onCompleted() {
                  ob1Complete = true;
              }
      
              @Override
              public void onError(Throwable e) {
                  e.printStackTrace();
              }
      
              @Override
              public void onNext(Integer integer) {
                  System.out.println("ob1: " + integer);
                  if (integer == 20) {
                      obs.process().subscribe(ob2);
                  }
              }
          });
      
          ArgumentCaptor<Integer> captor = ArgumentCaptor.forClass(Integer.class);
      
          Mockito.verify(ob2, Mockito.times(30)).onNext(captor.capture());
      
          for (Integer value : captor.getAllValues()) {
              System.out.println(value);
          }
      
      }
      

      【讨论】:

        【解决方案5】:

        您对此有何看法,我当然是在使用手机时制作了我的 API 的一部分:

        public class StreamOfSomething {
            new StreamOfSomething() {
                // source of events like
                events = Observable.range(0, 1_000_000_000)
                                   .doOnNext(set::add) // some operation there
                                   .map(Event::change)
                                   .publish()
                                   .refCount();
            }
        
            public Observable<Event> observeChanges() {
                return events.startWith(
                         Observable.just(Event.snapshot(set))); // start stream with generated event
            }
        }
        

        客户可以做类似的事情:

        Observable.timer(2, 4, TimeUnit.SECONDS)
                  .limit(2)
                  .flatMap(t -> theSourceToWatch.observeChanges().limit(10))
                  .subscribe(System.out::println);
        

        但是请注意,如果您处于多线程环境中,您可能必须在订阅阻止任何修改时进行同步,否则列表可能会在发出之前发生更改。或者完全围绕 observables 重新设计这个类,不过我还不知道如何实现这一点。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2015-01-11
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2017-12-30
          相关资源
          最近更新 更多