【发布时间】:2020-06-20 06:29:57
【问题描述】:
override fun uploadFileAndNotifyCadmium(
validatedPages: List<KycFile>,
documentId: UUID,
): Mono<Unit> {
validatedPages.forEach { it ->
s3Service.uploadToToxicBucket(
documentId, it.fileName })
.then()
}
return cadmiumClient.notifyCadmium(documentId) // API call to other microservice
}
我有这个方法,我试图调用一个方法 (s3Service.uploadToToxicBucket()),它在迭代列表时返回 Unit 的 Mono。我想确保为每个列表元素调用此方法。因此,我将 then() 添加到生成的 Mono 中。此外,我希望方法 cadmiumClient.notifyCadmium() 仅在所有文件上传后才执行。这是这样做的正确方法还是我可以使用其他运算符。 cadmiumClient.notifyCadmium() 也返回 Unit 的 Mono。
当我打电话给uploadFileAndNotifyCadmium()时,我也在打电话给uploadFileAndNotifyCadmium().then()。
我使用 then() 是因为执行是惰性的,我不想调用 subscribe() 来确保这些方法的执行。此外,在控制器中调用了方法 uploadFileAndNotifyCadmium(),因此它由 Spring Webflux 自动订阅。我对 then() 用法的理解可能是错误的。
方法2:我也想过这样做:
override fun uploadFileAndNotifyCadmium(
validatedPages: List<KycFile>,
documentId: UUID,
): Mono<Unit> {
val list = mutableListOf<Mono<Unit>>()
validatedPages.forEach { it ->
val result =s3Service.uploadToToxicBucket(
documentId, it.fileName })
list.add(result)
}
//Zip all the elements in list together into some variable zippedResult
return zippedResult.then(cadmiumClient.notifyCadmium(documentId)) // API call to other microservice
}
对于这种方法,当列表的大小以前不知道时,我找不到任何运算符将所有元素压缩在一起。
【问题讨论】:
-
你的理解是错误的,不管是谁发起的,都是订阅的。因此,调用客户端将在后台调用 subscribe 并启动链,如果您有一个列表,您应该将其设为通量,并且对于通量中的每个发射对象,您都可以做您想做的事情。当您刚刚给我们一段代码时,这非常困难。由于我们不知道您发布的任何函数的任何返回类型,因此无法为您提供帮助。他们是否返回 Mono、字符串、通量、列表等?
-
@ThomasAndolf 我已经用返回类型更新了问题。另外我知道调用客户端订阅了该方法,在我的情况下,这个方法是从控制器调用的,因此它是由 Webflux 订阅的。我不确定我是否正确使用 then() 并且方法 2 似乎是一种更好的方法,但是当事先不知道列表的大小时,我不知道如何组合多个单声道。
-
你应该把你的清单放在一个变化中
标签: kotlin reactive-programming spring-webflux project-reactor