【问题标题】:Communication in Netty Nio javaNetty Nio java中的通信
【发布时间】:2018-04-01 19:25:35
【问题描述】:

我想在 Netty nio 中创建一个包含两个客户端和一个服务器的通信系统。更具体地说,首先,我希望当两个客户端与服务器连接时从服务器发送消息,然后能够在两个客户端之间交换数据。我正在使用code provided from this example。我对代码的修改可以在这里找到:link

似乎 serverHandler 中的 channelRead 在第一个客户端连接时工作,所以它总是返回 1,但是当第二个客户端连接时不会更改为 2。当两个客户端都连接到时,如何从服务器正确检查服务器?如何从客户端的主要功能中动态读取此值?那么让双方客户交流的最佳方式是什么?

EDIT1: 显然,客户端服务似乎正在运行并直接关闭,所以每次我运行一个新的 NettyClient 时都会连接,但之后连接就会关闭。所以计数器总是从零变为一。正如我在下面的 cmets 中被告知的那样,我在同一个端口使用 telnet 对其进行了测试,但是计数器似乎正常增加,但是使用 NettyClient 服务号。

EDIT2:看来我得到的问题来自future.addListener(ChannelFutureListener.CLOSE); 中的channelRead 中的ProcessingHandler class。当我评论它时,似乎代码有效。但是,我不确定评论出来的后果是什么。此外,我想从客户端的主要功能中检查返回消息何时是特定的两个。如何,我可以创建一个方法来等待来自服务器的特定消息,同时它会阻止主要功能。

 static EventLoopGroup workerGroup = new NioEventLoopGroup();
 static Promise<Object> promise = workerGroup.next().newPromise(); 
 public static void callClient() throws Exception {
    String host = "localhost";
    int port = 8080;
    try {
        Bootstrap b = new Bootstrap();
        b.group(workerGroup);
        b.channel(NioSocketChannel.class);
        b.option(ChannelOption.SO_KEEPALIVE, true);
        b.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler(promise));
            }
        });
        ChannelFuture f = b.connect(host, port).sync();
    } finally {
        //workerGroup.shutdownGracefully();
    }
}

我想在主函数内部调用方法并返回结果,当它是 2 时继续主功能。但是,我不能在 while 内调用 callClient,因为它会多次运行同一个客户端。

   callBack();
    while (true) {
        Object msg = promise.get();
        System.out.println("Case1: the connected clients is not two");
        int ret = Integer.parseInt(msg.toString());
        if (ret == 2){
            break;
        }
    }
   System.out.println("Case2: the connected clients is two");
   // proceed with the main functionality

如何更新第一个客户的承诺变量。当我运行两个客户端时,对于第一个客户端,我总是收到消息:

案例1:连接的客户端不是两个

似乎承诺没有正常更新,而对于第二个客户,我总是收到:

案例2:连接的客户端是两个

【问题讨论】:

  • 能否提供一个示例 git repo 进行调试?
  • 在这个例子中,在 channelActive 的 processingHandler 中,我试图计算活动通道的数量,然而,这个数字总是 1。
  • 即使我将其声明为静态私有,该组也总是以某种方式初始化。

标签: java sockets web communication


【解决方案1】:

如果我没记错的话,ChannelHandlerContext 是每个通道一个,它的管道中可以有多个 ChannelHandler。您的 channels 变量是处理程序类的 instance 变量。然后为每个连接创建一个 new ProcessingHandler 实例。因此,一旦初始化,每个变量在channels 变量中将有一个且只有一个连接 - 它是为其创建的。

参见服务器代码 (NettyServer.java) 中 initChannel 函数中的new ProcessingHandler()

您可以将 channels 变量设为静态,以便在 ProcessingHandler 实例之间共享。或者,您可以在别处创建单个 ProcessingHandler 实例(例如,作为 run() 函数中的局部变量),然后将该实例传递给 addLast 调用而不是 new ProcessingHandler()

【讨论】:

  • 不能简单地重用ChannelHandler的实例,除非你指定注解@io.netty.channel.ChannelHandler.Sharable,见netty.io/4.0/api/io/netty/channel/ChannelHandler.Sharable.html
  • 也不确定 NettyClient f.channel().closeFuture().sync() 中的以下几行;和workerGroup.shutdownGracefully();他们是否停止客户端套接字的运行?
  • 您的测试客户端立即断开连接,这就是组大小保持为 1 的原因 - 当第二个客户端连接时,第一个客户端已经断开连接。我已经从 github 下载了您的示例,将 ProcessingHandler 替换为您问题的副本,并将频道更改为静态。使用 telnet 进行测试时,服务器报告大小随着每次连接而增加。
  • 关闭套接字时。我不确定您要做什么。代码中的 ctx.close() 已经关闭了客户端套接字。 workerGroup.shutdownGracefully() 应该关闭组中的所有通道。但这在客户端代码中没有多大意义——你只有一个连接(它显然不会关闭其他客户端连接,因为它们不属于你的应用实例)。
  • 事实上,在您当前的代码中, ctx.close() 关闭客户端的套接字(或者更确切地说是启动关闭),然后 f.channel().closeFuture().sync() 在关闭过程中得到通知完成,然后才调用 workerGroup.shutdownGracefully() (此时您的应用程序中唯一的套接字已经关闭,因此,已经没有什么可以关闭了)。
【解决方案2】:

为什么 ChannelGroup 频道的大小始终为 1。即使我连接 更多客户?

因为每个新的Channel(客户端)都会调用子ChannelInitializer。您正在创建ProcessingHandler 的新实例,因此每个频道查看它自己的ChannelGroup 实例。

解决方案 1 - 渠道属性

使用Attribute 并将其与Channel 关联。

在某处创建属性(假设在Constants 类中):

public static final AttributeKey<ChannelGroup> CH_GRP_ATTR = 
       AttributeKey.valueOf(SomeClass.class.getName());

现在,创建将被ProcessingHandler 的所有实例使用的 ChannelGroup:

final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

在 NettyServer 中更新您的孩子 ChannelInitializer

@Override
public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(
        new RequestDecoder(), 
        new ResponseDataEncoder(), 
        new ProcessingHandler());

    ch.attr(Constants.CH_GRP_ATTR).set(channels);
}

现在您可以像这样在处理程序中访问 ChannelGroup 的实例:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    final ChannelGroup channels = ctx.channel().attr(Constants.CH_GRP_ATTR).get();
    channels.add(ctx.channel());

这会起作用,因为每次新客户端连接时,都会使用对 ChannelGroup 的相同引用调用 ChannelInitializer。

解决方案 2 - 静态字段

如果您将ChannelGroup 声明为静态,则所有类实例将看到相同的ChannelGroup 实例:

private static final ChannelGroup channels =
     new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

解决方案 3 - 传播共享实例

ProcessingHandler的构造函数中引入参数:

private final ChannelGroup channels;
public ProcessingHandler(ChannelGroup chg) {
    this.channels = chg;
}

现在,在您的 NettyServer 类中创建 ChannelGroup 的实例并将其传播到 ProcessingHandler 构造函数:

final ChannelGroup channels = new 
      DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

@Override
public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(
        new RequestDecoder(), 
        new ResponseDataEncoder(), 
        new ProcessingHandler(channels)); // <- here
}

就个人而言,我会选择第一个解决方案,因为

  • 它清楚地将 ChannelGroup 与 Channel 上下文相关联
  • 您可以在其他处理程序中访问相同的 ChannelGroup
  • 您可以拥有多个服务器实例(在不同端口上运行,在同一个 JVM 中)

【讨论】:

  • 也不确定 NettyClient f.channel().closeFuture().sync() 中的以下几行;和workerGroup.shutdownGracefully();它们是否停止了客户端套接字的运行?
  • @JoseRamon :你到底想要达到什么目的?只需关闭客户端连接?
  • 实际上我想运行我的服务器,当在服务器中我看到我有两个活跃的客户端从服务器发送一个真实的消息。我首先尝试遵循您的第二个解决方案,将 ChannelGroup 声明为私有静态最终结果,但通道大小仍然是一个。
  • @JoseRamon :如果add 被调用两次,则在正常情况下不可能将通道大小设为 1。您能否确认(例如通过调试器?)
  • 我运行一次 NettyServer 和两次 NettyClient。是的,我在 ProcessingHandler 的 channelActive 中添加了一个断点,我将每个通道添加到通道组,并且 ChannelGroup 的大小始终为一个。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-07-04
  • 2018-03-30
  • 1970-01-01
  • 1970-01-01
  • 2018-04-11
  • 1970-01-01
相关资源
最近更新 更多