【问题标题】:Webflux WebClient java.lang.NullPointerExceptionWebflux WebClient java.lang.NullPointerException
【发布时间】:2020-05-18 01:18:40
【问题描述】:

我在将 webflux 网络客户端连接到我的端点时遇到问题。

在我的控制器中,我试图公开巧克力 DTO 的热流。 当我尝试使用邮递员调用端点时,它可以工作。

这句话的存在是因为Stackoverflow要我添加更多的文字,因为我的问题代码太多,不便之处见谅。

@RestController
public class HotChocolateController {

   @GetMapping(value = "/stream/chocolate", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
   public Flux<Chocolate> chocolateStream() {
       return Flux.interval(Duration.ofMillis(500))
               .map(l -> Chocolate.builder()
                    .id(String.valueOf(System.currentTimeMillis()))
                       .name("Hot Chocolate")
                       .build())
               .log();
   }
}

然后我尝试使用单元测试来测试我的设置:

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class ReactiveEndpointTests {

    private final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();


    @Test
    public void testHotChocolate(){
        Flux<Chocolate> chocolateSource = client.get().uri("/stream/chocolate").retrieve().bodyToFlux(Chocolate.class);
        chocolateSource.subscribe(chocolate -> System.out.println());
    }

当我运行这个测试时,我得到了以下异常:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException
Caused by: java.lang.NullPointerException: null
    at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:71) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Request to GET http://localhost:8080/stream/chocolate [DefaultWebClient]
Stack trace:
        at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:71) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:315) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator.lambda$connectChannel$0(PooledConnectionProvider.java:248) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4105) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4211) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4077) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4013) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool.drainLoop(SimplePool.java:201) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool.drain(SimplePool.java:172) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool.doAcquire(SimplePool.java:132) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:336) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onSubscribe(PooledConnectionProvider.java:504) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool$QueueBorrowerMono.subscribe(SimplePool.java:324) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider.disposableAcquire(PooledConnectionProvider.java:212) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$3(PooledConnectionProvider.java:169) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:319) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.resubscribe(FluxRetryPredicate.java:124) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onError(FluxRetryPredicate.java:99) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$TcpClientSubscriber.onError(HttpClientConnect.java:345) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onError(PooledConnectionProvider.java:496) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:381) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool.lambda$drainLoop$7(SimplePool.java:206) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.LambdaMonoSubscriber.doError(LambdaMonoSubscriber.java:152) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.LambdaMonoSubscriber.onError(LambdaMonoSubscriber.java:147) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:307) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:257) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.nio.AbstractNioChannel.doClose(AbstractNioChannel.java:502) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.socket.nio.NioSocketChannel.doClose(NioSocketChannel.java:342) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:759) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:736) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.nio.NioEventLoop.closeAll(NioEventLoop.java:762) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:524) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

您知道为什么会发生这种情况吗?

【问题讨论】:

    标签: java spring-webflux project-reactor reactor-netty


    【解决方案1】:

    您只是订阅流并立即退出测试方法。 您可以将方法更改为类似下面的代码,您将能够看到 get 请求的结果:

        @Test
        public void testHotChocolate(){
            Flux<String> chocolateSource =
                    client.get()
                            .uri("/stream/chocolate")
                            .retrieve()
                            .bodyToFlux(String.class)
                            .doOnNext(System.out::println);
            chocolateSource.blockLast();
        }
    

    【讨论】:

      猜你喜欢
      • 2018-05-09
      • 2019-08-28
      • 2018-08-18
      • 1970-01-01
      • 2019-04-22
      • 2017-12-31
      • 2021-08-06
      • 2019-12-28
      • 1970-01-01
      相关资源
      最近更新 更多