【Question title】: How to limit requests per second with WebClient?
【Posted】: 2018-05-17 09:10:14
【Question】:

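I'm using a WebClient object to send HTTP POST requests to a server. It sends a huge number of requests very quickly (there are about 4000 messages in a QueueChannel). The problem is that the server apparently cannot respond fast enough, so I get a lot of server error 500 responses and prematurely closed connections.

Is there a way to limit the number of requests per second? Or to limit the number of threads it uses?

Edit:

The message endpoint processes messages from the QueueChannel: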

@MessageEndpoint
public class CustomServiceActivator {

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    IHttpService httpService;

    @ServiceActivator(
            inputChannel = "outputFilterChannel",
            outputChannel = "outputHttpServiceChannel",
            poller = @Poller( fixedDelay = "1000" )
    )
    public void processMessage(Data data) {
        httpService.push(data);
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

The WebClient service class:

@Service
public class HttpService implements IHttpService {

    private static final String URL = "http://www.blabla.com/log";

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    WebClient webClient;

    @Override
    public void push(Data data) {
        String body = constructString(data);
        Mono<ResponseEntity<Response>> res = webClient.post()
                .uri(URL + getLogType(data))
                .contentLength(body.length())
                .contentType(MediaType.APPLICATION_JSON)
                .syncBody(body)
                .exchange()
                .flatMap(response -> response.toEntity(Response.class));

        res.subscribe(new Consumer<ResponseEntity<Response>>() { ... });
    }
}

【Comments】:

  • Why not use Thread.sleep?
  • Can you post an example of how you use WebClient? I'm sure this is doable with Project Reactor, without any other library.
  • @MuratOzkan I edited the post

Tags: spring spring-webflux project-reactor


【Solution 1】:

Resilience4j has excellent support for non-blocking rate limiting with Project Reactor.

Required dependencies (besides Spring WebFlux):

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>1.6.1</version>
</dependency>

Example:

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicInteger;

public class WebClientRateLimit
{
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    private final WebClient webClient;
    private final RateLimiter rateLimiter;

    public WebClientRateLimit()
    {
        this.webClient = WebClient.create();

        // enables 3 requests every 5 seconds
        this.rateLimiter = RateLimiter.of("my-rate-limiter",
                RateLimiterConfig.custom()
                                 .limitRefreshPeriod(Duration.ofSeconds(5))
                                 .limitForPeriod(3)
                                 .timeoutDuration(Duration.ofMinutes(1)) // max wait time for a request, if reached then error
                                 .build());
    }

    public Mono<?> call()
    {
        return webClient.get()
                        .uri("https://jsonplaceholder.typicode.com/todos/1")
                        .retrieve()
                        .bodyToMono(String.class)
                        .doOnSubscribe(s -> System.out.println(COUNTER.incrementAndGet() + " - " + LocalDateTime.now()
                                + " - call triggered"))
                        .transformDeferred(RateLimiterOperator.of(rateLimiter));
    }

    public static void main(String[] args)
    {
        WebClientRateLimit webClientRateLimit = new WebClientRateLimit();

        long start = System.currentTimeMillis();

        Flux.range(1, 16)
            .flatMap(x -> webClientRateLimit.call())
            .blockLast();

        System.out.println("Elapsed time in seconds: " + (System.currentTimeMillis() - start) / 1000d);
    }
}

Sample output:

1 - 2020-11-30T15:44:01.575003200 - call triggered
2 - 2020-11-30T15:44:01.821134 - call triggered
3 - 2020-11-30T15:44:01.823133100 - call triggered
4 - 2020-11-30T15:44:04.462353900 - call triggered
5 - 2020-11-30T15:44:04.462353900 - call triggered
6 - 2020-11-30T15:44:04.470399200 - call triggered
7 - 2020-11-30T15:44:09.461199100 - call triggered
8 - 2020-11-30T15:44:09.463157 - call triggered
9 - 2020-11-30T15:44:09.463157 - call triggered
11 - 2020-11-30T15:44:14.461447700 - call triggered
10 - 2020-11-30T15:44:14.461447700 - call triggered
12 - 2020-11-30T15:44:14.461447700 - call triggered
13 - 2020-11-30T15:44:19.462098200 - call triggered
14 - 2020-11-30T15:44:19.462098200 - call triggered
15 - 2020-11-30T15:44:19.468059700 - call triggered
16 - 2020-11-30T15:44:24.462615 - call triggered
Elapsed time in seconds: 25.096
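
The timing in the sample output follows from the limiter's fixed refresh cycles: with 3 permits per 5-second cycle, request n cannot start before cycle (n - 1) / 3 begins. The sketch below is plain arithmetic, not Resilience4j code. The class and method names are hypothetical, and it assumes every request is already waiting when the first cycle starts.

```java
import java.time.Duration;

class RateLimitSchedule {

    // Earliest start offset of the n-th request (1-based), measured from the
    // beginning of the first refresh cycle. Assumes all requests are queued up front.
    static Duration earliestStart(int n, int limitForPeriod, Duration limitRefreshPeriod) {
        int cycle = (n - 1) / limitForPeriod; // integer division: requests 1..3 fall into cycle 0
        return limitRefreshPeriod.multipliedBy(cycle);
    }

    public static void main(String[] args) {
        Duration period = Duration.ofSeconds(5);
        for (int n : new int[] {1, 3, 4, 16}) {
            System.out.println("request " + n + " -> +" + earliestStart(n, 3, period).getSeconds() + " s");
        }
    }
}
```

For request 16 this gives an offset of 25 s, which is consistent with the elapsed time of about 25 s reported above.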

Documentation: https://resilience4j.readme.io/docs/examples-1#decorate-mono-or-flux-with-a-ratelimiter

【Comments】:

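    【Solution 2】:

    The question "Limiting rate of requests with Reactor" offers two answers (one of them in a comment).

    zipWith another Flux that acts as a rate limiter:

    .zipWith(Flux.interval(Duration.of(1, ChronoUnit.SECONDS)))

    or simply delay each web request

    with the delayElements function.

    Edit: the answer below is valid for the blocking RestTemplate, but it does not really fit the reactive pattern.

    WebClient has no built-in way to limit requests, but you can easily add this capability through composition.

    You can throttle your client externally with Guava's RateLimiter (https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)

    In this tutorial, http://www.baeldung.com/guava-rate-limiter, you will see how to use the rate limiter in a blocking way or with timeouts.

    I would decorate all calls that need to be throttled in a separate class that

    1. limits the number of calls per second
    2. performs the actual web call using WebClient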
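
The decorator described above can be sketched without Guava as a small wrapper that enforces a minimum spacing between calls and then delegates. Like the Guava approach, this is a blocking sketch, so it suits RestTemplate-style code rather than a reactive pipeline. `ThrottledCaller` and its clock and sleeper parameters are hypothetical names introduced for illustration, and the `Supplier` stands in for the actual WebClient call.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

class ThrottledCaller {
    private final long minIntervalNanos;
    private final LongSupplier clock;    // current time in nanoseconds
    private final LongConsumer sleeper;  // blocks for the given number of nanoseconds
    private long nextFreeSlot = Long.MIN_VALUE;

    ThrottledCaller(int callsPerSecond, LongSupplier clock, LongConsumer sleeper) {
        this.minIntervalNanos = 1_000_000_000L / callsPerSecond;
        this.clock = clock;
        this.sleeper = sleeper;
    }

    // Convenience factory that uses the system clock and a real sleep.
    static ThrottledCaller realTime(int callsPerSecond) {
        return new ThrottledCaller(callsPerSecond, System::nanoTime, nanos -> {
            try {
                TimeUnit.NANOSECONDS.sleep(nanos);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Step 1: wait for a free slot. Step 2: perform the actual call.
    <T> T call(Supplier<T> actualCall) {
        awaitSlot();
        return actualCall.get();
    }

    private synchronized void awaitSlot() {
        long now = clock.getAsLong();
        if (now < nextFreeSlot) {
            sleeper.accept(nextFreeSlot - now);
            now = Math.max(clock.getAsLong(), nextFreeSlot);
        }
        nextFreeSlot = now + minIntervalNanos;
    }

    public static void main(String[] args) {
        ThrottledCaller caller = ThrottledCaller.realTime(4);
        long start = System.nanoTime();
        for (int i = 1; i <= 4; i++) {
            int n = i;
            String result = caller.call(() -> "call " + n);
            System.out.println(result + " at +" + (System.nanoTime() - start) / 1_000_000 + " ms");
        }
    }
}
```

Passing the clock and the sleeper in through the constructor keeps the throttling rule testable without real waiting.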

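    【Comments】:

    • Thanks, I'll try this
    • The zipWith solution is basically a cumbersome variant of delayElements.
    • @Phhoste, did you find a solution? If so, please share it. I'm facing the same problem and badly need it.
    • Hmm, what about flux.limitRate(int)?
    【Solution 3】:

    I hope I'm not too late. Anyway, limiting the request rate was just one of the problems I ran into a week ago while building a crawler. These were the requirements:

    1. I have to perform recursive, paginated, sequential requests. The pagination parameter is part of the API I'm calling.
    2. After a response is received, pause for 1 second before making the next request.
    3. Retry on certain errors.
    4. When retrying, pause for a few seconds.

    The solution is as follows: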

    private Flux<HostListResponse> sequentialCrawl() {
        AtomicLong pageNo = new AtomicLong(2);
        // Solution for #1 - Flux.expand
        return getHosts(1)
            .doOnRequest(value -> LOGGER.info("Start crawling."))
            .expand(hostListResponse -> { 
                final long totalPages = hostListResponse.getData().getTotalPages();
                long currPageNo = pageNo.getAndIncrement();
                if (currPageNo <= totalPages) {
                    LOGGER.info("Crawling page " + currPageNo + " of " + totalPages);
                    // Solution for #2
                    return Mono.just(1).delayElement(Duration.ofSeconds(1)).then(
                        getHosts(currPageNo)
                    );
                }
                return Flux.empty();
            })
            .doOnComplete(() -> LOGGER.info("End of crawling."));
    }
    
    private Mono<HostListResponse> getHosts(long pageNo) {
        final String uri = hostListUrl + pageNo;
        LOGGER.info("Crawling " + uri);
    
        return webClient.get()
            .uri(uri)
            .exchange()
            // Solution for #3
            .retryWhen(companion -> companion
                .zipWith(Flux.range(1, RETRY + 1), (error, index) -> {
                    String message = "Failed to crawl uri: " + error.getMessage();
                    if (index <= RETRY && (error instanceof RequestIntervalTooShortException
                        || error instanceof ConnectTimeoutException
                        || "Connection reset by peer".equals(error.getMessage())
                    )) {
                        LOGGER.info(message + ". Retries count: " + index);
                        return Tuples.of(error, index);
                    } else {
                        LOGGER.warn(message);
                        throw Exceptions.propagate(error); //terminate the source with the 4th `onError`
                    }
                })
                .flatMap(tuple -> { // flatMap, not map: the delay Mono must be subscribed for the pause to take effect
                    // Solution for #4
                    Throwable e = tuple.getT1();
                    int delaySeconds = tuple.getT2();
                    // TODO: Adjust these values according to your needs
                    if (e instanceof ConnectTimeoutException) {
                        delaySeconds = delaySeconds * 5;
                    } else if ("Connection reset by peer".equals(e.getMessage())) {
                        // The API that this app is calling will sometimes think that the requests are SPAM. So let's rest longer before retrying the request.
                        delaySeconds = delaySeconds * 10;
                    }
                    LOGGER.info("Will retry crawling after " + delaySeconds + " seconds to " + uri + ".");
                    return Mono.delay(Duration.ofSeconds(delaySeconds));
                })
                .doOnNext(s -> LOGGER.warn("Request is too short - " + uri + ". Retried at " + LocalDateTime.now()))
            )
            .flatMap(clientResponse -> clientResponse.toEntity(String.class))
            .map(responseEntity -> {
                HttpStatus statusCode = responseEntity.getStatusCode();
                if (statusCode != HttpStatus.OK) {
                    Throwable exception;
                    // Convert json string to Java POJO
                    HostListResponse response = toHostListResponse(uri, statusCode, responseEntity.getBody());
                    // The API that I'm calling will return error code of 06 if request interval is too short
                    if (statusCode == HttpStatus.BAD_REQUEST && "06".equals(response.getError().getCode())) {
                        exception = new RequestIntervalTooShortException(uri);
                    } else {
                        exception = new IllegalStateException("Request to " + uri + " failed. Reason: " + responseEntity.getBody());
                    }
                    throw Exceptions.propagate(exception);
                } else {
                    return toHostListResponse(uri, statusCode, responseEntity.getBody());
                }
            });
    }
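
The delay rule from step #4 is easier to check in isolation. The sketch below pulls it into a pure function that uses only JDK types. `RetryDelays` and `delayFor` are hypothetical names, and `java.net.ConnectException` stands in for Netty's `ConnectTimeoutException` so that the snippet has no external dependencies.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.time.Duration;

class RetryDelays {

    // attempt is the 1-based retry index, like the values from Flux.range(1, RETRY + 1) above.
    static Duration delayFor(Throwable error, int attempt) {
        int delaySeconds = attempt;
        if (error instanceof ConnectException) {
            delaySeconds = attempt * 5;
        } else if ("Connection reset by peer".equals(error.getMessage())) {
            // The remote API may treat the requests as spam, so rest longer.
            delaySeconds = attempt * 10;
        }
        return Duration.ofSeconds(delaySeconds);
    }

    public static void main(String[] args) {
        Throwable reset = new IOException("Connection reset by peer");
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("attempt " + attempt + " -> wait "
                    + delayFor(reset, attempt).getSeconds() + " s");
        }
    }
}
```

With these multipliers, three consecutive "Connection reset by peer" failures wait 10, 20 and 30 seconds.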
    

    【Comments】:

      【Solution 4】:

      I use this to limit the number of active requests:

      public DemoClass(WebClient.Builder webClientBuilder) {
          AtomicInteger activeRequest = new AtomicInteger();
          this.webClient = webClientBuilder
                  .baseUrl("http://httpbin.org/ip")
                  .filter(
                          (request, next) -> Mono.just(next)
                                  .flatMap(a -> {
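                                      // Increment first, then check, so the test and the update are one atomic step
                                      if (activeRequest.incrementAndGet() <= 3) {
                                          return next.exchange(request)
                                                  // doFinally also runs on error and cancel, so the slot is always released
                                                  .doFinally(signal -> activeRequest.decrementAndGet());
                                      }
                                      activeRequest.decrementAndGet();
                                      return Mono.error(new RuntimeException("Too many requests"));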
                                  })
                                  .retryWhen(Retry.anyOf(RuntimeException.class)
                                          .randomBackoff(Duration.ofMillis(300), Duration.ofMillis(1000))
                                          .retryMax(50)
                                  )
                  )
                  .build();
      }
      
      public Mono<String> call() {
          return webClient.get()
                  .retrieve()
                  .bodyToMono(String.class);
      }
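
The same idea, capping the number of in-flight requests, can be shown with a `java.util.concurrent.Semaphore`, which acquires a slot in a single atomic step. This is a JDK-only sketch rather than a WebClient filter: `InFlightLimiter` is a hypothetical class, and the `Supplier<CompletableFuture<T>>` stands in for the asynchronous HTTP call. A rejected call would then be retried with backoff, as the filter above does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Starts the request if a slot is free, otherwise returns a failed future.
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> request) {
        if (!permits.tryAcquire()) {
            CompletableFuture<T> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(new IllegalStateException("Too many requests"));
            return rejected;
        }
        try {
            // Release the slot on success and on failure alike.
            return request.get().whenComplete((value, error) -> permits.release());
        } catch (RuntimeException e) {
            permits.release(); // the request could not even be started
            throw e;
        }
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        InFlightLimiter limiter = new InFlightLimiter(3);
        CompletableFuture<String> first = new CompletableFuture<>();
        limiter.submit(() -> first);
        limiter.submit(() -> new CompletableFuture<String>());
        limiter.submit(() -> new CompletableFuture<String>());

        CompletableFuture<String> fourth = limiter.submit(() -> new CompletableFuture<String>());
        System.out.println("fourth rejected: " + fourth.isCompletedExceptionally());

        first.complete("done"); // finishing a request frees its slot
        System.out.println("free slots: " + limiter.available());
    }
}
```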
      

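      【Comments】:

        【Solution 5】:

        We can customize the ConnectionProvider builder to limit the active connections on WebClient.

        pendingAcquireMaxCount needs to be set to the number of requests allowed to wait in the queue, because the default queue size is always 2 * maxConnections.

        This limits how many requests the WebClient serves at a time.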

        // maxConnections and maxPendingRequests are int values chosen by the caller
        ConnectionProvider provider = ConnectionProvider.builder("builder")
                                .maxConnections(maxConnections)
                                .pendingAcquireMaxCount(maxPendingRequests)
                                .build();
        TcpClient tcpClient = TcpClient.create(provider);

        WebClient client = WebClient.builder()
                                .baseUrl("url")
                                .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
                                .build();
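
To see how the two settings interact, the sketch below classifies the n-th simultaneous request under the rule stated above: up to `maxConnections` requests run, the next `pendingAcquireMaxCount` wait in the queue, and anything beyond that is assumed to fail fast. It models the arithmetic only and does not call Reactor Netty. `PoolAdmission` and its enum are hypothetical names.

```java
class PoolAdmission {
    enum Outcome { ACTIVE, PENDING, REJECTED }

    // Default queue size as described above: 2 * maxConnections.
    static int defaultPendingAcquireMaxCount(int maxConnections) {
        return 2 * maxConnections;
    }

    // n is 1-based and counts requests that are outstanding at the same time.
    static Outcome classify(int n, int maxConnections, int pendingAcquireMaxCount) {
        if (n <= maxConnections) {
            return Outcome.ACTIVE;
        }
        if (n <= maxConnections + pendingAcquireMaxCount) {
            return Outcome.PENDING;
        }
        return Outcome.REJECTED;
    }

    public static void main(String[] args) {
        int maxConnections = 10;
        int pending = defaultPendingAcquireMaxCount(maxConnections);
        for (int n : new int[] {10, 11, 30, 31}) {
            System.out.println("request " + n + " -> " + classify(n, maxConnections, pending));
        }
    }
}
```

With 10 connections and the default queue of 20, the 31st simultaneous request is the first one turned away, so a burst of about 4000 messages needs either a much larger queue or throttling upstream.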
        

        【Comments】:
