【问题标题】:Connecting to a Server message queue using Reactor Netty使用 Reactor Netty 连接到服务器消息队列
【发布时间】:2020-09-13 17:25:29
【问题描述】:

我正在尝试使用 Reactor Netty 连接到在 docker 容器上运行的消息队列。由于依赖问题,我将其作为独立的,而不是使用 SpringFlux。

从 Reactor Netty 文档中的示例中,我看到有一种方法可以连接到服务器并获得响应:

public static void main(String[] args) {
        String response =
                HttpClient.create()
                          .headers(h -> h.add("my header", my_header)
                          .get()
                          .uri(my_uri)
                          .responseContent() 
                          .aggregate()       
                          .asString()        
                          .block();
    }

但是当我之后尝试通过 System.out.println() 显示输出时,没有任何反应。

我也尝试了解如何使用:

Flux<V> response(BiFunction<HttpClientResponse,ByteBufFlux,Publisher<V>> receiver)

但我不确定该怎么做。 我在文档中看到有一个名为 Connection 的类,它使用 TCPClient 并有一个方法 subscribe。

我有点迷茫,你能指出我在不使用 spring-flux 的情况下在 Reactor Netty 中实现这个的正确方向吗?

谢谢

编辑:

经过一些实验,我得到了这个:

private Disposable subscribe() {
    return HttpClient.create()
               .headers(h -> h.add("my header", my_header)
               .get()
               .uri(my_uri)
               .response((res, bytes) - > {
                   System.out.println(bytes.asString());
                   return bytes.asString();})
               .subscribe();
}

这给了我一个 FluxHandle,我如何使用它来实际读取响应的正文?

【问题讨论】:

  • block 的示例应该可以工作,如果不是这样,请打开一个问题。您还可以通过添加.wiretap(true) 来跟踪网络流量。

标签: java httpclient project-reactor reactor-netty


【解决方案1】:

所以我想出了如何订阅和读取从服务器接收的数据,甚至使用jackson 库将数据转换为 JSON,以便我的代码更容易读取。

private Disposable subscribe() {
    return HttpClient.create()
               .headers(h -> h.add("my header", my_header)
               .get()
               .uri(my_uri)
               .response((resp, bytes) -> {
                    return bytes.asString();
                })
                .subscribe(response -> {
                    try {
                        consumeData(new ObjectMapper()
                                .readValue(response, MyData.class));
                    } catch (IOException ex) {
                        System.out.println("ERROR converting to json: " + ex);
                    }
                    });
}

似乎在使用 subscribe() 方法时,我可以收听传入的响应并对其进行处理。当服务器停止或消息队列关闭时,我仍然需要添加一种关闭连接的方法,这样客户端就不会挂在不存在的消息队列上。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-05-24
    • 2012-12-05
    • 2018-03-30
    • 1970-01-01
    • 1970-01-01
    • 2014-02-23
    • 2019-09-25
    • 2011-05-23
    相关资源
    最近更新 更多