【发布时间】:2021-05-09 01:24:38
【问题描述】:
我们使用 Spring Cloud Gateway 将请求路由到多个底层服务。对这些底层服务的调用将是连续的,并且可能会相互馈送(来自一个用于下一个请求的响应)。当我们需要在主请求之前按顺序发出这些请求时,我们有一个可行的解决方案,但在主请求之后,我们在将一个代理请求的响应提供给下一个请求的请求时遇到问题。
我们计划将响应从一个请求提供给下一个请求的方式是使用 GatewayFilter 中的 WebClient 发出请求,并将响应字符串存储在交换的属性存储中。然后在下一个代理请求期间,我们提供一个属性名称来选择性地从中提取请求正文。这在使用“pre”过滤器时效果很好,因为第一个代理请求在构建和执行第二个请求之前构建、执行并缓存响应,因此属性链按预期工作。使用“发布”过滤器时会出现问题。在 post 代理中,Web 客户端请求都是在后续请求完成之前构建的。因此,属性存储永远不会收到上一个请求的响应,这意味着下一个请求无法按预期工作,因为它没有有效的请求正文。
我的理解是调用chain.filter(exchange).then(Mono.fromRunnable{ ... }) 将导致.then 逻辑仅在先前的过滤器完全完成后执行。情况似乎并非如此。在其他过滤器类型(如日志记录、响应操作等)中,后置过滤器以正确的顺序执行,但在创建 WebClient 时它们似乎没有。
有人对如何实现这种期望的行为有任何想法吗?
预代理过滤器代码(工作):
class PreProxyGatewayFilterFactory: AbstractGatewayFilterFactory<PreProxyGatewayFilterFactory.Params>(Params::class.java) {
override fun apply(params: Params): GatewayFilter {
return OrderedGatewayFilter(
{ exchange, chain ->
ServerWebExchangeUtils.cacheRequestBody(exchange){
val cachedExchange = exchange.mutate().request(it).build()
executeRequest(cachedExchange, params)
.map { response ->
val body = response.body.toString()
cacheResponse(
response.body.toString(),
params.cachedResponseBodyAttributeName,
cachedExchange
)
}
.flatMap(chain::filter)
}
}, params.order)
}
private fun cacheResponse(response: String, attributeName: String?, exchange: ServerWebExchange): ServerWebExchange{
if(!attributeName.isNullOrBlank()){
exchange.attributes[attributeName] = response
}
return exchange
}
private fun executeRequest(exchange: ServerWebExchange, params: Params): Mono<ResponseEntity<String>>{
val request = when(exchange.request.method){
HttpMethod.PUT -> WebClient.create().put().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
HttpMethod.POST -> WebClient.create().post().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
HttpMethod.GET -> WebClient.create().get().uri(params.proxyPath)
HttpMethod.DELETE -> WebClient.create().delete().uri(params.proxyPath)
else -> throw Exception("Invalid request method passed in to the proxy filter")
}
return request.headers { headers ->
headers.addAll(exchange.request.headers)
headers.remove(CONTENT_LENGTH)
}
.exchange()
.flatMap{ response ->
response.toEntity(String::class.java)
}
}
private fun createProxyRequestBody(exchange: ServerWebExchange, attributeName: String?): BodyInserter<out Flux<out Any>, ReactiveHttpOutputMessage> {
val cachedBody = attributeName?.let { attrName ->
exchange.getAttributeOrDefault<String>(attrName, "null")
} ?: "null"
return if(cachedBody != "null"){
BodyInserters.fromPublisher(Flux.just(cachedBody), String::class.java)
} else {
BodyInserters.fromDataBuffers(exchange.request.body)
}
}
data class Params(
val proxyPath: String = "",
val cachedRequestBodyAttributeName: String? = null,
val cachedResponseBodyAttributeName: String? = null,
val order: Int = 0
)
}
后代理过滤器代码(不工作)
class PostProxyGatewayFilterFactory: AbstractGatewayFilterFactory<PostProxyGatewayFilterFactory.Params>(Params::class.java) {
override fun apply(params: Params): GatewayFilter {
return OrderedGatewayFilter(
{ exchange, chain ->
ServerWebExchangeUtils.cacheRequestBody(exchange){
val cachedExchange = exchange.mutate().request(it).build()
//Currently using a cached body does not work in post proxy
chain.filter(cachedExchange).then( Mono.fromRunnable{
executeRequest(cachedExchange, params)
.map { response ->
cacheResponse(
response.body.toString(),
params.cachedResponseBodyAttributeName,
cachedExchange
)
}
.flatMap {
Mono.empty<Void>()
}
})
}
}, params.order)
}
private fun cacheResponse(response: String, attributeName: String?, exchange: ServerWebExchange): ServerWebExchange{
if(!attributeName.isNullOrBlank()){
exchange.attributes[attributeName] = response
}
return exchange
}
private fun executeRequest(exchange: ServerWebExchange, params: Params): Mono<ResponseEntity<String>>{
val request = when(exchange.request.method){
HttpMethod.PUT -> WebClient.create().put().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
HttpMethod.POST -> WebClient.create().post().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
HttpMethod.GET -> WebClient.create().get().uri(params.proxyPath)
HttpMethod.DELETE -> WebClient.create().delete().uri(params.proxyPath)
else -> throw Exception("Invalid request method passed in to the proxy filter")
}
return request.headers { headers ->
headers.addAll(exchange.request.headers)
headers.remove(CONTENT_LENGTH)
}
.exchange()
.flatMap{ response ->
response.toEntity(String::class.java)
}
}
private fun createProxyRequestBody(exchange: ServerWebExchange, attributeName: String?): BodyInserter<out Flux<out Any>, ReactiveHttpOutputMessage> {
val cachedBody = attributeName?.let { attrName ->
exchange.getAttributeOrDefault<String>(attrName, "null")
} ?: "null"
return if(cachedBody != "null"){
BodyInserters.fromPublisher(Flux.just(cachedBody), String::class.java)
} else {
BodyInserters.fromDataBuffers(exchange.request.body)
}
}
data class Params(
val proxyPath: String = "",
val cachedRequestBodyAttributeName: String? = null,
val cachedResponseBodyAttributeName: String? = null,
val order: Int = 0
)
}
【问题讨论】:
标签: kotlin reactive-programming spring-cloud spring-webflux spring-cloud-gateway