【问题标题】:Asynchronous, composable return value for vector/stream data in Java 9Java 9 中向量/流数据的异步、可组合返回值
【发布时间】:2018-03-31 22:41:57
【问题描述】:

假设我有一个 API,它会根据一些查询条件查找或构造一个小部件:

Widget getMatchingWidget(WidgetCriteria c) throws Throwable

(同步)客户端代码如下所示:

try {
  Widget w = getMatchingWidget(criteria);
  processWidget(w);
} catch (Throwable t) {
  handleError(t);
}

现在说查找或构建一个小部件的成本出乎意料地昂贵,我不希望客户在等待它时阻塞。所以我把它改成:

CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c)

然后客户端可以编写:

CompletableFuture<Widget> f = getMatchingWidget(criteria);
f.thenAccept(this::processWidget)
f.exceptionally(t -> { handleError(t); return null; })

或:

getMatchingWidget(criteria).whenComplete((t, w) -> {
  if (t != null) { handleError(t); }
  else { processWidget(t); }
});

现在,假设同步 API 可以返回 0 到 n 个小部件:

Stream<Widget> getMatchingWidgets(WidgetCriteria c)

天真地,我可以写:

CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c)

但是,这实际上并没有使代码成为非阻塞的,它只是推动了阻塞——Future 阻塞直到所有 Widgets 都可用,或者迭代 Stream 的代码块等待每个Widget。我想要的是能让我在每个小部件到达时对其进行处理的东西,例如:

void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor)

但这不提供错误处理,即使我添加了一个额外的Consumer&lt;Throwable&gt; errorHandler,它也不允许我使用其他查询组合我的小部件检索,或转换结果。

所以我正在寻找一些可组合的东西,它将Stream(可迭代性、可转换性)的特征与CompletableFuture(异步结果和错误处理)的特征相结合。 (而且,当我们这样做时,背压可能会很好。)

这是java.util.concurrent.Flow.Publisher吗? io.reactivex.Observable?更复杂的东西?更简单的?

【问题讨论】:

  • 我倾向于将事物推入队列,然后将事物拉出队列。

标签: java rx-java nonblocking java-9 reactive-streams


【解决方案1】:

您的用例自然而然地落入了 RxJava 正在处理的世界中。如果我们有一个 observable:

Observable<Widget> getMatchingWidgets(wc);

根据条件生成零个或多个小部件,然后您可以使用以下方法处理每个小部件:

getMatchingWidgets(wc)
  .subscribeOn( backgroundScheduler )
  .subscribe( w -> processWidget(w),
              error -> handleError(error) );

可观察链将在backgroundScheduler 上运行,这通常是线程池执行器服务的包装器。如果您需要对 UI 中的每个小部件进行最终处理,可以使用 observeOn() 运算符在处理前切换到 UI 调度程序:

getMatchingWidgets(wc)
  .subscribeOn( backgroundScheduler )
  .observeOn( uiScheduler )
  .subscribe( w -> processWidget(w),
              error -> handleError(error) );

对我来说,RxJava 方法的优雅之处在于它以流畅的方式处理管道管理的许多细节。看看那个观察者链,你就知道到底发生了什么以及在哪里。

【讨论】:

    猜你喜欢
    • 2020-09-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-04-29
    • 1970-01-01
    • 2013-09-27
    • 2014-09-17
    相关资源
    最近更新 更多