【问题标题】:How to read Message in netty in other class如何在其他课程中阅读netty中的消息
【发布时间】:2016-04-30 20:52:21
【问题描述】:

我想在除InboundHandler 之外的类中的特定位置阅读消息。我无法在 channelRead0 方法中找到读取它的方法,该方法是从 netty 框架调用的。

例如:

context.writeMessage("message");
String msg = context.readMessage;

如果这不可能,我如何将我在channelRead0 方法中获得的结果映射到我在另一个类中进行的特定调用?

【问题讨论】:

    标签: java netty


    【解决方案1】:

    Netty 框架设计为异步驱动。使用这个类比,它可以以最少的线程使用处理大量连接。如果您正在创建一个使用 netty 框架将呼叫分派到远程位置的 api,您应该对您的呼叫使用相同的类比。

    不要让你的 api 直接返回值,而是让它返回 Future<?>Promise<?>。在您的应用程序中实现此系统有多种方法,最简单的方法是创建一个自定义处理程序,将传入请求映射到 FIFO 队列中的Promises。

    一个例子如下:

    这很大程度上基于我过去提交的this 答案。

    我们从将请求映射到管道中的请求的处理程序开始:

    public class MyLastHandler extends SimpleInboundHandler<String> {
        private final SynchronousQueue<Promise<String>> queue;
    
        public MyLastHandler (SynchronousQueue<Promise<String>> queue) {
            super();
            this.queue = queue;
        }
    
        // The following is called messageReceived(ChannelHandlerContext, String) in 5.0.
        @Override
        public void channelRead0(ChannelHandlerContext ctx, String msg) {
            this.queue.remove().setSuccss(msg); 
            // Or setFailure(Throwable)
        }
    }
    

    然后我们需要一种将命令发送到远程服务器的方法:

    Channel channel = ....;
    SynchronousQueue<Promise<String>> queue = ....;
    
    public Future<String> sendCommandAsync(String command) {
        return sendCommandAsync(command, new DefaultPromise<>());
    }
    
    public Future<String> sendCommandAsync(String command, Promise<String> promise) {
        synchronized(channel) {
            queue.offer(promise);
            channel.write(command);
        }
        channel.flush();
    }
    

    在我们完成方法之后,我们需要一个方法来调用它:

    sendCommandAsync("USER anonymous", 
        new DefaultPromise<>().addListener(
            (Future<String> f) -> {
                String response = f.get();
                if (response.startWidth("331")) {
                    // do something
                }
                // etc
            }
        )
    );
    

    如果被调用者想使用我们的api作为阻塞调用,他也可以这样做:

    String response = sendCommandAsync("USER anonymous").get();
    if (response.startWidth("331")) {
        // do something
    }
    // etc
    

    请注意,如果线程状态被中断,Future.get() 可以抛出InterruptedException,这与套接字读取操作不同,后者只能通过套接字上的某些交互来取消。这个异常在FutureListener应该不是问题。

    【讨论】:

    • 我很高兴有带有 async/await 的 C#.. 多么可怕的代码编写方式
    猜你喜欢
    • 2015-07-02
    • 1970-01-01
    • 2012-07-14
    • 1970-01-01
    • 2019-12-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-05-06
    相关资源
    最近更新 更多