【问题标题】:Listen then send on subscribe with RxJava听然后发送订阅 RxJava
【发布时间】:2017-02-07 15:59:59
【问题描述】:

我经常遇到我希望能够通过 RxJava 监听对请求的响应的情况。问题是我不确定如何设置 Observable 以便我监听事件并以正确的顺序在订阅时发送消息。我不想发送消息然后收听,因为如果线程被挂起或响应超快,我可能会错过它。这是我自己能想到的最接近的了

connection.onReceivedMessage()
          .doOnSubscribe(() -> connection.send(message))
          .filter(message -> message.id == id)
          ... // do stuff

Observable.defer(() -> {
    connection.send(message);
    return connection.onReceivedMessage();
})... // do stuff

但是这些似乎我仍然可以发送消息而不是监听响应。有没有其他人尝试过这样做?我觉得我真的想要一种 afterCreate()。

【问题讨论】:

    标签: java android concurrency rx-java


    【解决方案1】:

    我不想发送消息然后听,因为如果线程 被暂停或响应超快,我可能会错过它。

    使用Subject。要么是BehaviorSubject(总是向新订阅者发送最新的可观察对象),要么是ReplySubject(向新订阅者发送所有Observable)。我不确定整个逻辑,但你可以有类似的东西:

     public BehaviorSubject mMessageBehaviorSubject = BehaviorSubject.create();
    
    
    private void sendMessage() {
        connection.onReceivedMessage()
              .doOnSubscribe(() -> connection.send(message))
              .filter(message -> message.id == id)
              .subscribe(mSubject::onNext, Throwable::printStackTrace);
    }
    
    
    public Observable<String> getMessageObservable() {
         return mMessageBehaviorSubject.asObservable();
    }
    

    这样你就可以发送消息,只要你准备好收听,你就会得到,在这种情况下,最新的消息发送

    【讨论】:

      【解决方案2】:

      doOnSubscribe 正是您所需要的,它是您正在寻找的“afterCreate”。在您的第一个示例中,第一条消息将在订阅后发送,此时 Observable 已经准备好处理响应。

      至于您的问题 - 有没有其他人尝试过这样做? - 答案是肯定的。我使用与您的第一个示例相同的技术来 rxify 一些代码。

      【讨论】:

      • doOnSubscribe 是这样工作的吗?我将不得不对此进行更多测试。我承认我从来没有做过单元测试来验证。
      猜你喜欢
      • 2016-12-11
      • 2020-10-08
      • 2016-07-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-03-19
      相关资源
      最近更新 更多