【问题标题】:Spring Cloud Gateway: Post Filter Web Client RequestSpring Cloud Gateway:后过滤 Web 客户端请求
【发布时间】:2021-05-09 01:24:38
【问题描述】:

我们使用 Spring Cloud Gateway 将请求路由到多个底层服务。对这些底层服务的调用将是连续的,并且可能会相互馈送(来自一个用于下一个请求的响应)。当我们需要在主请求之前按顺序发出这些请求时,我们有一个可行的解决方案,但在主请求之后,我们在将一个代理请求的响应提供给下一个请求的请求时遇到问题。

我们计划将响应从一个请求提供给下一个请求的方式是使用 GatewayFilter 中的 WebClient 发出请求,并将响应字符串存储在交换的属性存储中。然后在下一个代理请求期间,我们提供一个属性名称来选择性地从中提取请求正文。这在使用“pre”过滤器时效果很好,因为第一个代理请求在构建和执行第二个请求之前构建、执行并缓存响应,因此属性链按预期工作。使用“发布”过滤器时会出现问题。在 post 代理中,Web 客户端请求都是在后续请求完成之前构建的。因此,属性存储永远不会收到上一个请求的响应,这意味着下一个请求无法按预期工作,因为它没有有效的请求正文。

我的理解是调用chain.filter(exchange).then(Mono.fromRunnable{ ... }) 将导致.then 逻辑仅在先前的过滤器完全完成后执行。情况似乎并非如此。在其他过滤器类型(如日志记录、响应操作等)中,后置过滤器以正确的顺序执行,但在创建 WebClient 时它们似乎没有。

有人对如何实现这种期望的行为有任何想法吗?

预代理过滤器代码(工作):

class PreProxyGatewayFilterFactory: AbstractGatewayFilterFactory<PreProxyGatewayFilterFactory.Params>(Params::class.java) {
    override fun apply(params: Params): GatewayFilter {
        return OrderedGatewayFilter(
            { exchange, chain ->
                ServerWebExchangeUtils.cacheRequestBody(exchange){
                    val cachedExchange = exchange.mutate().request(it).build()
                    executeRequest(cachedExchange, params)
                        .map { response ->
                            val body = response.body.toString()
                            cacheResponse(
                                response.body.toString(),
                                params.cachedResponseBodyAttributeName,
                                cachedExchange
                            )
                        }
                        .flatMap(chain::filter)
                }
            }, params.order)
    }

    private fun cacheResponse(response: String, attributeName: String?, exchange: ServerWebExchange): ServerWebExchange{
        if(!attributeName.isNullOrBlank()){
            exchange.attributes[attributeName] = response
        }
        return exchange
    }

    private fun executeRequest(exchange: ServerWebExchange, params: Params): Mono<ResponseEntity<String>>{
        val request = when(exchange.request.method){
            HttpMethod.PUT -> WebClient.create().put().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.POST -> WebClient.create().post().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.GET -> WebClient.create().get().uri(params.proxyPath)
            HttpMethod.DELETE -> WebClient.create().delete().uri(params.proxyPath)
            else -> throw Exception("Invalid request method passed in to the proxy filter")
        }
        return request.headers { headers ->
            headers.addAll(exchange.request.headers)
            headers.remove(CONTENT_LENGTH)
        }
        .exchange()
        .flatMap{ response ->
            response.toEntity(String::class.java)
        }
    }

    private fun createProxyRequestBody(exchange: ServerWebExchange, attributeName: String?): BodyInserter<out Flux<out Any>, ReactiveHttpOutputMessage> {
        val cachedBody = attributeName?.let { attrName ->
            exchange.getAttributeOrDefault<String>(attrName, "null")
        } ?: "null"
        return if(cachedBody != "null"){
            BodyInserters.fromPublisher(Flux.just(cachedBody), String::class.java)
        } else {
            BodyInserters.fromDataBuffers(exchange.request.body)
        }
    }

    data class Params(
            val proxyPath: String = "",
            val cachedRequestBodyAttributeName: String? = null,
            val cachedResponseBodyAttributeName: String? = null,
            val order: Int = 0
    )
}

后代理过滤器代码(不工作)

class PostProxyGatewayFilterFactory: AbstractGatewayFilterFactory<PostProxyGatewayFilterFactory.Params>(Params::class.java) {
    override fun apply(params: Params): GatewayFilter {
        return OrderedGatewayFilter(
            { exchange, chain ->
                ServerWebExchangeUtils.cacheRequestBody(exchange){
                    val cachedExchange = exchange.mutate().request(it).build()

                    //Currently using a cached body does not work in post proxy
                    chain.filter(cachedExchange).then( Mono.fromRunnable{
                        executeRequest(cachedExchange, params)
                            .map { response ->
                                cacheResponse(
                                    response.body.toString(),
                                    params.cachedResponseBodyAttributeName,
                                    cachedExchange
                                )
                            }
                            .flatMap {
                                Mono.empty<Void>()
                            }
                    })
                }
            }, params.order)
    }

    private fun cacheResponse(response: String, attributeName: String?, exchange: ServerWebExchange): ServerWebExchange{
        if(!attributeName.isNullOrBlank()){
            exchange.attributes[attributeName] = response
        }
        return exchange
    }

    private fun executeRequest(exchange: ServerWebExchange, params: Params): Mono<ResponseEntity<String>>{
        val request = when(exchange.request.method){
            HttpMethod.PUT -> WebClient.create().put().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.POST -> WebClient.create().post().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.GET -> WebClient.create().get().uri(params.proxyPath)
            HttpMethod.DELETE -> WebClient.create().delete().uri(params.proxyPath)
            else -> throw Exception("Invalid request method passed in to the proxy filter")
        }
        return request.headers { headers ->
            headers.addAll(exchange.request.headers)
            headers.remove(CONTENT_LENGTH)
        }
        .exchange()
        .flatMap{ response ->
            response.toEntity(String::class.java)
        }
    }

    private fun createProxyRequestBody(exchange: ServerWebExchange, attributeName: String?): BodyInserter<out Flux<out Any>, ReactiveHttpOutputMessage> {
        val cachedBody = attributeName?.let { attrName ->
            exchange.getAttributeOrDefault<String>(attrName, "null")
        } ?: "null"
        return if(cachedBody != "null"){
            BodyInserters.fromPublisher(Flux.just(cachedBody), String::class.java)
        } else {
            BodyInserters.fromDataBuffers(exchange.request.body)
        }
    }


    data class Params(
            val proxyPath: String = "",
            val cachedRequestBodyAttributeName: String? = null,
            val cachedResponseBodyAttributeName: String? = null,
            val order: Int = 0
    )
}

【问题讨论】:

    标签: kotlin reactive-programming spring-cloud spring-webflux spring-cloud-gateway


    【解决方案1】:

    终于能够为后过滤器代理从属性中提取其请求正文找到一个可行的解决方案。这是一个相对简单的修复,我只是找不到答案。而不是使用chain.filter(exchange).then(Mono.fromRunnable { ...execute proxy request...}),我只需要使用chain.filter(exchange).then(Mono.defer { ...execute proxy request...})

    【讨论】:

      猜你喜欢
      • 2022-11-08
      • 2020-11-14
      • 2021-10-22
      • 2020-05-31
      • 2019-10-07
      • 2019-01-15
      • 2020-01-14
      • 2020-04-30
      • 2023-03-11
      相关资源
      最近更新 更多