【问题标题】:Switching threads multiple times in Rx chain在 Rx 链中多次切换线程
【发布时间】:2017-09-06 08:23:41
【问题描述】:

假设我有以下适用于 Android 的案例:

  1. 从网络请求组列表
  2. 为每个组显示一些 UI 元素
  3. 为每个组请求项目
  4. 显示 UI 元素 每个项目

我想用 RxJava 做到这一点:

webService.requestGroups()
        .flatMap(group -> {
            view.showGroup(group);
            return webService.requestItems(group);
        })
        .toList()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())

        .subscribe(items -> view.showItems(items));

如您所见,我有 2 个视图对象调用,每个调用都必须在主线程上执行。以及 2 次 webService 调用,必须在后台线程上执行。

此代码的问题:第一次调用视图将在后台执行,这会导致 Android RuntimeException(只有原始线程可能会接触视图或其他东西)如果我将.observeOn 转移到链的开头 - 第二个 webService 调用将是在主线程中执行。

如何在 RxJava 链中多次“游过”线程?

【问题讨论】:

    标签: android rx-java rx-java2


    【解决方案1】:

    来自Rx doc for SubscribeOn

    SubscribeOn 操作符指定 Observable 将在哪个线程上开始操作,无论在操作符链中的哪个点调用该操作符。另一方面,ObserveOn 会影响 Observable 将在该运算符出现的下方使用的线程。因此,您可以在 Observable 运算符链中的不同点多次调用 ObserveOn,以更改某些运算符在哪些线程上运行。

    SubscribeOn 运算符只能应用一次并设置起始线程。 ObserveOn 可用于在流中的任何点从一个线程转到另一个线程。所以我认为以下应该做你想要的:

    webService.requestGroups()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .flatMap(group -> {
            view.showGroup(group);
            return webService.requestItems(group)
                             .subscribeOn(Schedulers.io());
        })
        .toList()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(items -> view.showItems(items));
    

    但在我看来,这太复杂了。我只需订阅第一个 observable,然后为每个组创建一个新链,如下所示:

    webService.requestGroups()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(group -> { 
            view.showGroup(group);
            webService.requestItems(group)
                .subscribeOn(Schedulers.io()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(items -> view.showItems(items));
        });
    

    【讨论】:

    • 我们什么时候会在调用 subscribeOn 之前调用 observeOn
    • @IgorGanapolsky 在链中subscribeOn() 的位置无关紧要——第一个subscribeOn() 将是整个链用于启动订阅的线程(无论它是第一个链式方法调用,或最后一个)。相比之下,observeOn() 会更改链中 之后的所有内容的线程。但请注意,在 Samuel 的第二个代码块中,发生了两个 完全独立的 Rx 链。 requestItems() 链没有映射到 requestGroups() 链中。
    【解决方案2】:

    基于 Samuel 的回答,您可以使用更简单的非嵌套语法:

    webService.requestGroups()
    .subscribeOn(Schedulers.io()) // the first operator (requestGroups) on the IO thread
    .observeOn(AndroidSchedulers.mainThread()) //everything below on the main thread
    .map(group -> {
        view.showGroup(group);
        return group;
    })
    .observeOn(Schedulers.io()) //everything below on the IO thread
    .flatMap(group -> {
        return webService.requestItems(group);
    })
    .toList()
    .observeOn(AndroidSchedulers.mainThread()) //everything below on the main thread
    .subscribe(items -> view.showItems(items));
    

    这里有两条经验法则:

    1. subscribeOn 指示 observable 将在哪个线程上开始执行,它在链中的位置无关紧要,它应该只出现一次。
    2. observeOn 告诉所有后续操作符将在哪个线程上执行(直到遇到另一个 observeOn);它可能在链中出现多次,改变不同代码片段的执行线程(如上面的示例)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-02-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-06-14
      • 2018-01-16
      相关资源
      最近更新 更多