【问题标题】:How Can We Use RxJava for Paginated Web Service Calls, Where Each Page Depends on Previous Page Responses, Without Recursion?我们如何使用 RxJava 进行分页 Web 服务调用,其中每个页面都依赖于前一页面响应,而无需递归?
【发布时间】:2018-02-06 05:48:11
【问题描述】:

Web 服务 API 有时使用分页,其中 Web 服务调用的参数指示要检索的页面。这些大致可以分为两种:

  • 请求页面的参数独立于任何给定的页面响应(例如,“给我第 3 页,页面大小为 10”)

  • 请求页面的参数依赖于之前的页面响应(例如,“在标识符为 foo 的项目之后给我接下来的 10 个项目)

This SO answer 很好地涵盖了第一个场景,其中 Web 服务只需要一个页码,而我们需要从任何给定页面的响应中确定我们是否完成了。

This SO answer 涵盖了第二种情况,但它依赖于递归,因此对于大型数据集,我们将死于 StackOverflowError

Relay 兼容的 GraphQL 支持的 Web 服务(例如 GitHub 的 API)将大量使用第二种情况,因为 Relay's specification for pagination 要求您从先前的响应中提供“光标”以获取之后的下一个项目光标位置。因此,我正在尝试为此找出一种非递归方法,它仍然将所有内容都包装到一个主 Observable 中,就像这两个答案一样。

【问题讨论】:

  • 您仍然可以使用“递归”解决方案,并增加一些异步边界运算符 - 例如Flowable.observeOn(Schedulers.io()) - 内部Flowable 调用
  • Web 服务 API 是阻塞还是返回 Observable?问题在于,为了使用 API,需要等待上一个结果完成才能请求下一个结果。如果您愿意阻止,那么解决方案很容易。

标签: rx-java rx-java2


【解决方案1】:

如果 Web Service API 被阻塞或者您愿意阻塞,那么解决方案很简单

Observable.generate(() -> new ApiResponse(page), (s, emitter) -> {
    ApiResponse r = getResults(s.next);            
    emitter.onNext(r);
    if (r.next == null) emitter.onComplete();
    return r;
});

使用递归答案中的符号。

如果不希望阻塞,您可以像这样使用RxJava2Extensions 中的FlowableTransformers.expand

Flowable
    .just(new ApiResponse(page))
    .compose(FlowableTransformers.expand(r -> r.next == null ? Flowable.empty() : getResults(r.next)));

【讨论】:

  • 这是一个有趣的观点。我一直坚持使用现有的基于 Observable 的 API,但我可以切换到使用阻塞调用并按照您的说明包装它。我会试一试——非常感谢!
  • 请参阅更新答案的第二部分,它不会阻止但需要额外的库
【解决方案2】:

我不知道我是否正确,但我相信你可以在 RxJava 上使用SyncOnSubscribe.createStateful 来解决这个问题。

查看我的示例:

class SimpleTest {
  @Test
  fun testRequestTenPages() {
    getPaginatedDataFromApi()
        .take(10)
        .subscribe { println(it) }
  }

  fun apiCall(previous: Response? = null) : Response {
    return previous?.let {
      val newPage = it.page + 1
      previous.copy(id = "${it.id}_$newPage", page = newPage)
    } ?: Response("1", 1)
  }    

  fun getPaginatedDataFromApi(): Observable<Response> {
    val syncOnSubscribe = SyncOnSubscribe.createStateful<Response?, Response>(
        { null },
        { previous, observer ->
          val response = apiCall(previous)
          observer.onNext(response)
          return@createStateful response
        }
    )

    return Observable.create(syncOnSubscribe)
  }

  data class Response(val id: String, val page: Int)
}

我正在创建一个有状态的可观察对象,它将最后一个响应保持为状态,并使用它来生成下一个响应。

运行此测试,您将看到以下输出:

Response(id=1, page=1)
Response(id=1_2, page=2)
Response(id=1_2_3, page=3)
Response(id=1_2_3_4, page=4)
Response(id=1_2_3_4_5, page=5)
Response(id=1_2_3_4_5_6, page=6)
Response(id=1_2_3_4_5_6_7, page=7)
Response(id=1_2_3_4_5_6_7_8, page=8)
Response(id=1_2_3_4_5_6_7_8_9, page=9)
Response(id=1_2_3_4_5_6_7_8_9_10, page=10)

【讨论】:

  • 这相当于另一个答案中的 Observable.generate 并且类似于它也需要 API 调用被阻塞
猜你喜欢
  • 1970-01-01
  • 2011-05-11
  • 2018-01-16
  • 2019-06-29
  • 2019-08-27
  • 1970-01-01
  • 2016-09-16
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多