【问题标题】:map vs flatMap in reactor反应堆中的地图与平面地图
【发布时间】:2018-08-13 09:13:16
【问题描述】:

我找到了很多关于 RxJava 的答案,但我想了解它在 Reactor 中是如何工作的。

我目前的理解非常模糊,我倾向于认为 map 是同步的,而 flatMap 是异步的,但我无法真正理解它。

这是一个例子:

files.flatMap { it ->
    Mono.just(Paths.get(UPLOAD_ROOT, it.filename()).toFile())
        .map {destFile ->
            destFile.createNewFile()
            destFile    
        }               
        .flatMap(it::transferTo)
}.then()  

我有文件(Flux<FilePart>),我想将它复制到服务器上的某个UPLOAD_ROOT

这个例子取自一本书。

我可以将所有.map 更改为.flatMap,反之亦然,一切仍然有效。我想知道有什么区别。

【问题讨论】:

  • 你能更具体地谈谈你的实际问题吗?您实际指的是哪些方法,Java 中有多个mapflatMap 方法。
  • 是的,现在请更具体地说明这个问题。你到底在哪里理解它有困难?为什么不看一下源代码和文档来消除混乱?有什么你不明白的具体内容吗?
  • 我从文档中了解到,两者都是迭代给定 Flux 的方法,map 是同步的,flatMap 不是。但我也明白我给map的函数是异步执行的,我不知道什么时候用哪个。
  • ...我会用一个具体的例子来更新。

标签: java project-reactor


【解决方案1】:

flatMap 方法类似于 map 方法,主要区别在于您提供给它的供应商应该返回 Mono<T>Flux<T>

使用 map 方法会产生Mono<Mono<T>> 而使用 flatMap 会导致 Mono<T>

例如,当您必须使用返回 Mono 的 java API 进行网络调用来检索数据时,它很有用,然后另一个网络调用需要第一个结果的结果。

// Signature of the HttpClient.get method
Mono<JsonObject> get(String url);

// The two urls to call
String firstUserUrl = "my-api/first-user";
String userDetailsUrl = "my-api/users/details/"; // needs the id at the end

// Example with map
Mono<Mono<JsonObject>> result = HttpClient.get(firstUserUrl).
  map(user -> HttpClient.get(userDetailsUrl + user.getId()));
// This results with a Mono<Mono<...>> because HttpClient.get(...)
// returns a Mono

// Same example with flatMap
Mono<JsonObject> bestResult = HttpClient.get(firstUserUrl).
  flatMap(user -> HttpClient.get(userDetailsUrl + user.getId()));
// Now the result has the type we expected

此外,它还允许精确地处理错误:

public UserApi {
  
  private HttpClient httpClient;
    
  Mono<User> findUser(String username) {
    String queryUrl = "http://my-api-address/users/" + username;
    
    return Mono.fromCallable(() -> httpClient.get(queryUrl)).
      flatMap(response -> {
        if (response.statusCode == 404) return Mono.error(new NotFoundException("User " + username + " not found"));
        else if (response.statusCode == 500) return Mono.error(new InternalServerErrorException());
        else if (response.statusCode != 200) return Mono.error(new Exception("Unknown error calling my-api"));
        return Mono.just(response.data);
      });
  }
                                           
}

【讨论】:

    【解决方案2】:
    • map 用于同步、非阻塞、一对一的转换
    • flatMap 用于异步(非阻塞)1 对 N 转换

    差异在方法签名中可见:

    • map 接受 Function&lt;T, U&gt; 并返回 Flux&lt;U&gt;
    • flatMap 接受 Function&lt;T, Publisher&lt;V&gt;&gt; 并返回 Flux&lt;V&gt;

    这是主要提示:您可以Function&lt;T, Publisher&lt;V&gt;&gt; 传递给map,但它不知道如何处理Publishers,这将导致@ 987654333@,一系列惰性发布者。

    另一方面,flatMap 期望每个T 都有一个Publisher&lt;V&gt;。它知道如何处理它:订阅它并在输出序列中传播它的元素。因此,返回类型为Flux&lt;V&gt;flatMap 会将每个内部Publisher&lt;V&gt; 展平为所有Vs 的输出序列。

    关于 1-N 方面:

    对于每个&lt;T&gt; 输入元素,flatMap 将其映射到Publisher&lt;V&gt;。在某些情况下(例如 HTTP 请求),该发布者只会发出一个项目,在这种情况下,我们非常接近异步 map

    但这是退化的情况。一般情况是Publisher 可以发出多个元素,flatMap 也可以。

    例如,假设您有一个反应式数据库,并且您从一系列用户 ID 中进行平面映射,请求返回用户的 Badge 集。在所有这些用户的所有徽章中,您最终会得到一个 Flux&lt;Badge&gt;

    map 真的是同步且非阻塞吗?

    是的:它在操作符应用它的方式上是同步的(一个简单的方法调用,然后操作符发出结果)并且在函数本身不应该阻塞调用它的操作符的意义上是非阻塞的。换句话说,它不应该引入延迟。那是因为Flux 作为一个整体仍然是异步的。如果它阻塞了中间序列,它将影响Flux 处理的其余部分,甚至是其他Flux

    如果您的 map 函数正在阻塞/引入延迟,但无法转换为返回 Publisher,请考虑使用 publishOn/subscribeOn 在单独的线程上抵消该阻塞工作。

    【讨论】:

    • 你的意思是地图是blocking,对吧?我也没有真正得到1-to-N。你能举一个例子什么时候一个比另一个有用吗?我知道,当我期望结果是异步的时,我会使用 flatMap,因为一旦结果出现,它就会使发布者变平 - 对吗?
    • 不,map 函数应该是非阻塞的(除非您还使用 publishOn/subscribeOn 抵消了单独线程上的工作)。也就是说,它是同步执行的,但不应该有延迟。 flatMap 函数是异步的,实际上操作员会在结果可用时将其展平
    • 编辑了答案以解释这两个方面 + flatMap 1-N 示例
    • 这个答案过时了吗?我认为你现在所说的 flatMap 是“flatMapMany”,而 flatMap 做了一些不同的事情——github.com/reactor/reactor-core/issues/516
    • 只是 Mono 现在增加了 flatMap(异步 1 到 1 转换)和 flatMapMany(异步 1 到 n)的微妙之处
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-10-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-30
    • 1970-01-01
    相关资源
    最近更新 更多