【问题标题】:Managing side-effects with multiple RxJava Observables使用多个 RxJava Observable 管理副作用
【发布时间】:2016-06-22 17:28:44
【问题描述】:

我正在使用具有两个协同工作的特性的 BLE GATT 服务。一个是只写特性,您可以将字符串值作为查询提交,另一个是只通知特性,您可以在其中接收对查询的响应。

通知服务有点慢,重要的是在读取通知之前不要继续进行下一个查询 - 否则会丢失响应。

为此,我一直在使用 RxAndroidBle Observables,具有用于写入和通知特性的单独通道。第三个 Observable 提供查询。但是,写入的速度太快了。

ConnectableObservable notifyObservable =
    createNotifyObservable(NOTIFY_UUID).publish();

queryObserverable
    .doOnSubscribe(notifyObservable::connect)
    .doOnNext(query -> Log.d(TAG, "Processing query: " + query))
    .flatMap(query -> createWriteObservable(WRITE_UUID, query)))
    .doOnNext(request -> Log.d(TAG, "Write initiated."))
    .flatMap(request -> notifyObservable)
    .doOnNext(response -> Log.d(TAG, "Query response: " + response));

所以在运行应用程序时,这是我在日志中看到的(包括时间戳、进程 ID 和线程 ID):

06-22 14:30:01.991 14085-15360 Processing query: Query1
06-22 14:30:02.011 14085-15360 Processing query: Query2
06-22 14:30:02.011 14085-15360 Processing query: Query3
06-22 14:30:07.261 14085-15443 Write initiated.
06-22 14:30:07.301 14085-15445 Write initiated
06-22 14:30:07.321 14085-15447 Query response: Response3
06-22 14:30:07.321 14085-15449 Write initiated.
06-22 14:30:07.321 14085-15447 Query response: Response3
06-22 14:30:07.351 14085-15453 Query response: Response3

RxJava 有没有办法确保下一次写入只发生在收到响应之后?

编辑: 在为两个flatMap 调用指定建议的maxConnection 参数时,调用以正确的顺序发生,但仅适用于来自 Observable 的第一个查询。这是此案例的日志:

06-22 14:39:09.841 22245-23079 Processing query: Query1
06-22 14:39:15.131 22245-23166 Write initiated.
06-22 14:39:15.201 22245-23169 Query response: Response1

【问题讨论】:

  • 能否包含每个日志行的线程 ID?
  • 好的,我已经从日志中添加了更多详细信息。

标签: bluetooth-lowenergy rx-java rx-android rxandroidble


【解决方案1】:

尝试将 maxConcurrent = 1 参数传递给您的 flatMap 调用:

...
.flatMap(query -> createWriteObservable(WRITE_UUID, query), 1)
...
.flatMap(request -> createNotifyObservable(NOTIFY_UUID), 1)
...

【讨论】:

  • 谢谢,我不知道 maxConcurrent 参数。这取得了部分成功,一切都以正确的顺序调用。现在的问题是只评估来自 Observable 的第一个查询。
  • 您的输出现在看起来如何?
  • 我已将日志输出添加到问题中。
  • 会不会是您的 notifyObservable 只发出一项?尝试用第二个 flatMap 中的 Observable.just(1) 之类的东西临时替换它。在这种情况下会卡住吗?
  • 是的,在那种情况下它仍然卡住了。
【解决方案2】:

如果每个查询都只有一个响应,那么您可以将查询打包到一个函数中:

Observable<String> query(String query) {
  return notifyObservable.flatMap(notifyBytesObservable -> // to be sure that when we will start writing we're already listening
    Observable.combineLatest( // with combineLatest both Observables will be subscribed at the same time
      notifyBytesObservable.first(), // to unsubscribe from notification after the first one
      createWriteObservable(uuid, query), 
      { notificationBytes, writtenBytes -> notificationBytes }
    )
  )
    .map(notificationBytes -> String(notificationBytes))
}

现在你应该可以像这样拨打电话了:

queryObserverable
  .flatMap(queryString -> query(queryString), 1)

希望对你有帮助。

最好的问候

【讨论】:

    猜你喜欢
    • 2018-04-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-11-18
    • 1970-01-01
    • 1970-01-01
    • 2017-10-11
    相关资源
    最近更新 更多