【问题标题】:Spring WebClient: Retry with WebFlux.fn + reactor-addonsSpring WebClient:使用 WebFlux.fn + reactor-addons 重试
【发布时间】:2020-03-24 08:18:10
【问题描述】:

我正在尝试使用 Kotlin Coroutines + WebFlux.fn + reactor-addons 为 WebClient 添加条件重试:

suspend fun ClientResponse.asResponse(): ServerResponse =
    status(statusCode())
        .headers { headerConsumer -> headerConsumer.addAll(headers().asHttpHeaders()) }
        .body(bodyToMono(DataBuffer::class.java), DataBuffer::class.java)
        .retryWhen { 
            Retry.onlyIf { ctx: RetryContext<Throwable> -> (ctx.exception() as? WebClientResponseException)?.statusCode in retryableErrorCodes }
                .exponentialBackoff(ofSeconds(1), ofSeconds(5))
                .retryMax(3)
                .doOnRetry { log.error("Retry for {}", it.exception()) }
        )
        .awaitSingle()

在重试之前也添加一个条件

if (statusCode().isError) {
        body(
            BodyInserters.fromPublisher(
                Mono.error(StatusCodeError(status = statusCode())),
                StatusCodeException::class.java
            )
        )
    } else {
        body(bodyToMono(DataBuffer::class.java), DataBuffer::class.java)
    }

调用看起来像:

suspend fun request(): ServerResponse =
           webClient/*...*/
                    .awaitExchange()
                    .asResponse()

【问题讨论】:

    标签: spring kotlin spring-webflux spring-webclient spring-reactive


    【解决方案1】:

    这个spring webclient: retry with backoff on specific error给了我回答问题的提示:

    .awaitExchange() 返回 ClientResponse 而不是 Mono&lt;ClientReponse&gt; 这意味着我的重试是作用于bodyToMono 而不是exchange() 的操作。

    解决方案现在看起来像

    suspend fun Mono<ClientResponse>.asResponse(): ServerResponse =
        flatMap {
            if (it.statusCode().isError) {
                Mono.error(StatusCodeException(status = it.statusCode()))
            } else {
                it.asResponse()
            }
        }.retryWhen(
            Retry.onlyIf { ctx: RetryContext<Throwable> ->
                (ctx.exception() as? StatusCodeException)?.shouldRetry() ?: false
            }
                .exponentialBackoff(ofSeconds(1), ofSeconds(5))
                .retryMax(3)
                .doOnRetry { log.error { it.exception() } }
        ).awaitSingle()
    
    private fun ClientResponse.asResponse(): Mono<ServerResponse> =
        status(statusCode())
            .headers { headerConsumer -> headerConsumer.addAll(headers().asHttpHeaders()) }
            .body(bodyToMono(DataBuffer::class.java), DataBuffer::class.java)
    

    【讨论】:

      猜你喜欢
      • 2019-02-14
      • 2019-01-03
      • 1970-01-01
      • 2019-09-08
      • 2021-01-28
      • 1970-01-01
      • 2019-05-17
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多