【问题标题】:Custom RxJava's Scheduler自定义 RxJava 的调度器
【发布时间】:2021-07-08 21:55:05
【问题描述】:

上下文

我现在遇到一个问题,我正在复制 Retrofit 的 HTTP 请求。

在我的应用程序中,我有一个显示ViewPagerFragmentStatePageAdapter 的主屏幕。我的应用程序架构 这是一个非常常见的 MVP,我使用 RxJava 和 Retrofit 和 RxJavaAdapterFactoryCall 转换为 rx.Observables

看起来像这样:

Page1Fragment 显示多个团队的帖子文章列表。问题是我为获取该文章列表而点击的服务器端端点并没有提供与每篇文章关联的团队,它只是提供与每篇文章关联的团队 ID。

所以我必须在访问服务器端端点以获取文章列表的方法和服务器端端点获取团队列表的方法之间执行Observable.zip,然后我将它们合并。

Page3Fragment 显示所有用户团队的列表(由我之前使用的同一端点提供)

问题

这里的问题是,当我打开应用程序时,3 个片段被实例化,Page1FragmentPage2Fragment 都在访问团队的端点以获取用户的团队列表。

我的问题是;有没有办法实现自定义调度程序,将其附加到改造的方法并避免这种东西?做一些验证,比如“好的,如果 N 个不同的订阅者试图进入这个 Retrofit Observable,则为所有订阅者返回相同的响应”,而不是点击 N 次服务器端 API。我目前正在使用 Scheduler.io() 来传递 .subscribeOn 方法。

如果有什么不清楚的地方,请告诉我。

【问题讨论】:

    标签: android retrofit rx-java


    【解决方案1】:

    您不需要一个时间表,而是一种管理对端点的访问的方法。对服务器的每个请求都应该产生一个 observable,它最终会提供结果。

    Map<TeamId, Observable<Team>> results = new ConcurrentHashMap<>();
    

    请求被映射到一个最终会为它发出一个值的可观察对象。

    Observable<Team> lookUpTeam( TeamId id ) {
      return results.compute( id, (id, res) -> 
            res == null ? makeApiRequest( id ) : res );
    }
    Observable<Team> makeApiRequest( TeamId id ) {
      BehaviorSubject<Team> team = BehaviorSubject.create();
      teamServer.getTeam( id )
        .subscribe( team );
      return team;
    }
    

    团队查找的缓存现在保存在保存最后返回值的并发哈希映射中。每个团队 ID 只能发出一个 API 请求。

    【讨论】:

    • 这是一个很好的方法,给了我一些想法,但我想知道我是否可以做这样的事情,但在 2 秒后使缓存无效。我的意思是团队的信息可能会在服务器端发生变化,我不想一直返回相同的响应。
    • 您可以轻松地添加一个运算符,该运算符在获得从地图中删除主题的结果后 2 秒触发。同样,请确保删除线程安全操作,例如 removeIf()
    【解决方案2】:

    具有唯一线程的唯一自定义调度程序(不同于 Schedulers.single())

    class SingleTask : Callable<Scheduler?> {
            @Throws(Exception::class)
            override fun call(): Scheduler {
                return SingleScheduler()
            }
        }
    
    
    val newCustomScheduler : Scheduler = RxJavaPlugins.initSingleScheduler(SingleTask())
    
    fun f() {
    Completable.fromAction {
                // Do an Action here
            }.subscribeOn(newCustomScheduler).subscribe()
    }
    
    

    结果是这样的:

    • 执行 f()
    • 再次执行 f()
    • 再次执行 f()

    第一次执行将处理线程 XYZ 上的操作 第二次执行将处理线程 XYZ 上的操作 第三次执行将处理线程 XYZ 上的操作

    所以所有都将在同一个线程上执行。

    如果您不想使用 Schedulers.single(),这是一种 MainThread 原则,并且是在 Schedulers 类上定义的全局范围。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2023-03-11
      • 1970-01-01
      • 1970-01-01
      • 2015-06-24
      • 2017-11-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多