【问题标题】:Ensuring method execution in Spring Webflux确保 Spring Webflux 中的方法执行
【发布时间】:2020-06-20 06:29:57
【问题描述】:
override fun uploadFileAndNotifyCadmium(
        validatedPages: List<KycFile>,
        documentId: UUID,
    ): Mono<Unit> {

        validatedPages.forEach { it ->
            s3Service.uploadToToxicBucket(
                documentId, it.fileName })
                .then()
        }

        return cadmiumClient.notifyCadmium(documentId) // API call to other microservice
    }

我有这个方法,我试图调用一个方法 (s3Service.uploadToToxicBucket()),它在迭代列表时返回 Unit 的 Mono。我想确保为每个列表元素调用此方法。因此,我将 then() 添加到生成的 Mono 中。此外,我希望方法 cadmiumClient.notifyCadmium() 仅在所有文件上传后才执行。这是这样做的正确方法还是我可以使用其他运算符。 cadmiumClient.notifyCadmium() 也返回 Unit 的 Mono。

当我打电话给uploadFileAndNotifyCadmium()时,我也在打电话给uploadFileAndNotifyCadmium().then()

我使用 then() 是因为执行是惰性的,我不想调用 subscribe() 来确保这些方法的执行。此外,在控制器中调用了方法 uploadFileAndNotifyCadmium(),因此它由 Spring Webflux 自动订阅。我对 then() 用法的理解可能是错误的。

方法2:我也想过这样做:

override fun uploadFileAndNotifyCadmium(
        validatedPages: List<KycFile>,
        documentId: UUID,
    ): Mono<Unit> {

        val list = mutableListOf<Mono<Unit>>()

        validatedPages.forEach { it ->
            val result =s3Service.uploadToToxicBucket(
                documentId, it.fileName })

        list.add(result)

        }

        //Zip all the elements in list together into some variable zippedResult

        return zippedResult.then(cadmiumClient.notifyCadmium(documentId)) // API call to other microservice
    }

对于这种方法,当列表的大小以前不知道时,我找不到任何运算符将所有元素压缩在一起。

【问题讨论】:

  • 你的理解是错误的,不管是谁发起的,都是订阅的。因此,调用客户端将在后台调用 subscribe 并启动链,如果您有一个列表,您应该将其设为通量,并且对于通量中的每个发射对象,您都可以做您想做的事情。当您刚刚给我们一段代码时,这非常困难。由于我们不知道您发布的任何函数的任何返回类型,因此无法为您提供帮助。他们是否返回 Mono、字符串、通量、列表等?
  • @ThomasAndolf 我已经用返回类型更新了问题。另外我知道调用客户端订阅了该方法,在我的情况下,这个方法是从控制器调用的,因此它是由 Webflux 订阅的。我不确定我是否正确使用 then() 并且方法 2 似乎是一种更好的方法,但是当事先不知道列表的大小时,我不知道如何组合多个单声道。
  • 你应该把你的清单放在一个变化中

标签: kotlin reactive-programming spring-webflux project-reactor


【解决方案1】:

我试图理解你想要做什么,因为你的解释很不清楚。

我不会在 Kotlin 中编码,但如果我做对了,你想要做的是:

  • 包含 n 个对象的列表
  • 对列表中的每个对象执行某种类型的副作用
  • 收集所有副作用的某种形式的结果
  • 返回此结果

这就是我在普通反应式 java 中的做法

    return Flux.fromIterable(List.of("one", "two", "three"))
            .flatMap( // do some side effect )
            .collect(Collectors.toList());
  • 将列表转换为Flux&lt;T&gt;
  • flatMap 中为每个发出的项目执行副作用
  • 将所有结果收集为一个列表
  • 返回Mono&lt;List&lt;T&gt;&gt;

【讨论】:

  • 听起来应该是flatMap,因为目标不是简单的副作用,而是异步处理(上传到s3,它返回Mono以表示完成/错误)
  • 是的,我可以同意,这取决于他心目中的副作用类型,你比我有更多的经验西蒙
猜你喜欢
  • 2020-09-27
  • 2019-12-28
  • 2019-06-12
  • 2021-09-25
  • 2017-09-20
  • 2017-12-31
  • 2019-07-18
  • 1970-01-01
  • 2019-05-14
相关资源
最近更新 更多