【问题标题】:Spring Boot WebClient stops sending requestsSpring Boot WebClient 停止发送请求
【发布时间】:2022-01-18 06:41:16
【问题描述】:

我正在运行一个 Spring Boot 应用程序,该应用程序将 WebClient 用于非阻塞和阻塞 HTTP 请求。应用运行一段时间后,所有传出的 HTTP 请求似乎都卡住了。

WebClient 用于向多个主机发送请求,但作为示例,它是如何初始化并用于向 Telegram 发送请求的:

WebClientConfig:

    @Bean
    public ReactorClientHttpConnector httpClient() {
        HttpClient.create(ConnectionProvider.builder("connectionProvider").build())
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
                .responseTimeout(Duration.ofMillis(responseTimeout));
        return new ReactorClientHttpConnector(httpClient);
    }

相同的 ReactorClientHttpConnector 用于所有 WebClient。

电报客户端:

    @Autowired
    ReactorClientHttpConnector httpClient;

    WebClient webClient;

    RateLimiter rateLimiter;

    @PostConstruct
    public void init() {
        webClient = WebClient.builder()
                .clientConnector(httpClient)
                .baseUrl(telegramUrl)
                .build();

        rateLimiter = RateLimiter.of("telegram-rate-limiter",
                RateLimiterConfig.custom()
                        .limitRefreshPeriod(Duration.ofMinutes(1))
                        .limitForPeriod(20)
                        .build());
    }

    public void sendMessage(@PathVariable("token") String token, @RequestParam("chat_id") long chatId, @RequestParam("text") String message) {
        webClient.post().uri(String.format("/bot%s/sendMessage", token))
                .contentType(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromFormData("chat_id", String.valueOf(chatId))
                        .with("text", message))
                .retrieve()
                .bodyToMono(Void.class)
                .transformDeferred(RateLimiterOperator.of(rateLimiter))
                .block();
    }

RateLimiter 用于确保请求数量不超过 Telegram API 中指定的每分钟 20 个。

应用启动时,所有请求都按预期正常解决。但经过一段时间后,所有请求似乎都卡住了。发生这种情况所需的时间可能从几个小时到几天不等。对不同主机的所有请求都会发生这种情况,并且当来自 TelegramBot 的消息停止时很容易注意到。一旦请求被卡住,它们就会无限期地卡住,我必须重新启动应用程序才能让它再次工作。

日志中似乎没有异常导致此问题。由于我为我的电报消息维护了一个队列,因此当队列中的消息数量稳步增加以及等待请求解决的其他进程中发生错误时,我可以看到请求停止的时间点。

由于我设置的连接超时和响应超时没有生效,因此似乎连请求都没有发出。

我之前也尝试过将空闲时间设置为 0,但这并没有解决问题

    @Bean
    public ReactorClientHttpConnector httpClient() {
        HttpClient httpClient = HttpClient.create(ConnectionProvider.builder("connectionProvider").maxConnections(1000).maxIdleTime(Duration.ofSeconds(0)).build())
        HttpClient httpClient = HttpClient.newConnection()
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
                .responseTimeout(Duration.ofMillis(responseTimeout));
        return new ReactorClientHttpConnector(httpClient);
    }

更新:

我启用了指标并在卡住时使用千分尺查看它。有趣的是,它显示 Telegram 有一个连接,但也没有显示空闲、挂起或活动的连接。

reactor_netty_connection_provider_idle_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 0.0
reactor_netty_connection_provider_pending_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 0.0
reactor_netty_connection_provider_active_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 0.0
reactor_netty_connection_provider_total_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 1.0

问题可能是缺少连接吗?

更新 2:

我认为这可能与另一个问题有关:Closing Reactor Netty connection on error status codes

所以我将我的 HttpClient 更新为:

    @Bean
    public ReactorClientHttpConnector httpClient() {
        HttpClient httpClient = HttpClient.create(ConnectionProvider.builder("connectionProvider").metrics(true).build())
                .doAfterResponseSuccess((r, c) -> c.dispose())
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
                .responseTimeout(Duration.ofMillis(responseTimeout));
        return new ReactorClientHttpConnector(httpClient);
    }

但似乎所做的只是加速问题的发生。就像以前一样,活动连接、待处理连接和空闲连接不加总连接数。总和总是大于其他 3 个指标的总和。

更新 3: 问题发生时我做了一个线程转储。总共有 74 个线程,所以我认为应用程序没有线程不足。

Telegram 线程的转储:

"TelegramBot" #20 daemon prio=5 os_prio=0 cpu=14.65ms elapsed=47154.24s tid=0x00007f6b28e73000 nid=0x1c waiting on condition  [0x00007f6aed6fb000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@11.0.13/Native Method)
        - parking to wait for  <0x00000000fa865c80> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(java.base@11.0.13/LockSupport.java:194)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.13/AbstractQueuedSynchronizer.java:885)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.13/AbstractQueuedSynchronizer.java:1039)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.13/AbstractQueuedSynchronizer.java:1345)
        at java.util.concurrent.CountDownLatch.await(java.base@11.0.13/CountDownLatch.java:232)
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
        at reactor.core.publisher.Mono.block(Mono.java:1707)
        at com.moon.arbitrage.cm.feign.TelegramClient.sendMessage(TelegramClient.java:59)
        at com.moon.arbitrage.cm.service.TelegramService.lambda$sendArbMessage$0(TelegramService.java:53)
        at com.moon.arbitrage.cm.service.TelegramService$$Lambda$1092/0x000000084070f840.run(Unknown Source)
        at com.moon.arbitrage.cm.service.TelegramService.task(TelegramService.java:82)
        at com.moon.arbitrage.cm.service.TelegramService$$Lambda$920/0x0000000840665040.run(Unknown Source)
        at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)

   Locked ownable synchronizers:
        - None

反应堆工作线程:

"reactor-http-epoll-1" #15 daemon prio=5 os_prio=0 cpu=810.44ms elapsed=47157.07s tid=0x00007f6b281c4000 nid=0x17 runnable  [0x00007f6b0c46c000]
   java.lang.Thread.State: RUNNABLE
        at io.netty.channel.epoll.Native.epollWait0(Native Method)
        at io.netty.channel.epoll.Native.epollWait(Native.java:177)
        at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:286)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)

   Locked ownable synchronizers:
        - None

"reactor-http-epoll-2" #16 daemon prio=5 os_prio=0 cpu=1312.16ms elapsed=47157.07s tid=0x00007f6b281c5000 nid=0x18 waiting on condition  [0x00007f6b0c369000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@11.0.13/Native Method)
        - parking to wait for  <0x00000000fa865948> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.park(java.base@11.0.13/LockSupport.java:194)
        at java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.13/CompletableFuture.java:1796)
        at java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.13/ForkJoinPool.java:3128)
        at java.util.concurrent.CompletableFuture.waitingGet(java.base@11.0.13/CompletableFuture.java:1823)
        at java.util.concurrent.CompletableFuture.get(java.base@11.0.13/CompletableFuture.java:1998)
        at com.moon.arbitrage.cm.service.OrderService.reconcileOrder(OrderService.java:103)
        at com.moon.arbitrage.cm.service.BotService$BotTask.lambda$task$1(BotService.java:383)
        at com.moon.arbitrage.cm.service.BotService$BotTask$$Lambda$1161/0x00000008400af440.accept(Unknown Source)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:171)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
        at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
        at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
        at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
        at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:702)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
        at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)

   Locked ownable synchronizers:
        - None

"reactor-http-epoll-3" #17 daemon prio=5 os_prio=0 cpu=171.84ms elapsed=47157.07s tid=0x00007f6b28beb000 nid=0x19 runnable  [0x00007f6b0c26a000]
   java.lang.Thread.State: RUNNABLE
        at io.netty.channel.epoll.Native.epollWait0(Native Method)
        at io.netty.channel.epoll.Native.epollWait(Native.java:177)
        at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:281)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)

   Locked ownable synchronizers:
        - None

"reactor-http-epoll-4" #18 daemon prio=5 os_prio=0 cpu=188.10ms elapsed=47157.07s tid=0x00007f6b28b7d800 nid=0x1a runnable  [0x00007f6b0c169000]
   java.lang.Thread.State: RUNNABLE
        at io.netty.channel.epoll.Native.epollWait0(Native Method)
        at io.netty.channel.epoll.Native.epollWait(Native.java:177)
        at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:281)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)

   Locked ownable synchronizers:
        - None

似乎其中一个被另一个任务阻塞(甚至不是来自 Telegram 服务),但这应该不是问题,因为其他三个工作线程是可以运行的,对吧?

【问题讨论】:

  • 您是否能够在本地工作区中进行复制?尝试将您的 spring 应用程序指向一个电报 api 模拟。如果您无法重现错误,将很难为您提供帮助
  • @JRichardsz 我试图在本地复制它但无济于事,特别是因为这个问题在一段时间后随机发生(可能是几天)。另外,我以 Telegram 为例,但这个问题发生在我向其发送请求的所有主机上。所有主机的共同点是连接不会累加。我正在思考并希望这就是问题的原因。
  • 出现问题时尝试生成你的java进程的thread dump并分析内容。您也可能用完了线程。这是 server.tomcat.threads.max 的值。默认值为 200。
  • 您是否在日志中的任何位置看到“对等方重置连接”?询问是因为 netty 默认使用连接池,并且大多数 api 网关会在一段时间后关闭空闲连接。
  • @Saxon 虽然您的建议不是我问题的原因,但您建议生成线程转储导致我发现了实际问题。你能写一个答案,让我奖励你赏金吗?

标签: java spring-boot spring-webclient


【解决方案1】:

终于找到并解决了问题。问题是我有一个阻塞任务在反应器线程上被阻塞。由于线程转储,我才注意到这一点。阻塞任务正在等待事件,因此可能需要很长时间才能解决。所以最终,当所有四个反应器线程都被阻塞时,所有请求自然会卡在没有线程处理它们的情况下。

简而言之:不要阻塞您的反应器线程。

【讨论】:

    【解决方案2】:

    我建议看一下 RateLimiter 方向。 也许它不能按预期工作,具体取决于您的应用程序随着时间的推移执行的请求数量。 来自 Ratelimiter 的 Javadoc: “重要的是要注意,请求的许可数量永远不会影响请求本身的限制......但它会影响下一个请求的限制。即,如果一个昂贵的任务到达空闲的 RateLimiter,它将立即被授予,但下一个请求将经历额外的限制,从而支付昂贵任务的成本。” 这个讨论也可能有帮助: githubgithub

    我可以想象 RateLimiter 中有一些节流累加或其他效果,我会尝试使用它并确保这件事真的按照你想要的方式工作。 或者,考虑使用 Spring @Scheduled 从队列中读取。 您可能希望使用嵌入式 JMS 为其增添趣味,以获得更多好处(消息持久性等)。

    【讨论】:

      【解决方案3】:

      这些问题往往与spring-boot上的线程耗尽有关,应该用jstack在block发生时生成线程转储来分析:

      jstack <java pid> > ThredDump.txt
      

      生成的文件 ThredDump.txt 的内容通常会给出阻塞线程的信息。

      【讨论】:

      • 对不起,您发布得太晚了。赏金已过期。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-01-10
      • 1970-01-01
      • 1970-01-01
      • 2022-01-25
      • 2021-10-26
      • 2019-01-14
      相关资源
      最近更新 更多