【问题标题】:UnsupportedOperationException when sending messages through ServerBootstrap ChannelPipeline in Netty通过 Netty 中的 ServerBootstrap ChannelPipeline 发送消息时出现 UnsupportedOperationException
【发布时间】:2016-04-14 19:49:00
【问题描述】:

我使用的是 Netty 5.0。

我有一个互补的客户端引导程序,我从 netty github 获取了 SecureChatClient.java 示例。 Wenn 我从客户端引导程序向服务器发送消息,它工作得很好。当我尝试从服务器引导程序向客户端发送消息时(首先通过客户端成功启动连接/通道后),我得到一个java.lang.UnsupportedOperationException,没有任何进一步的信息。从服务器向客户端发送消息是通过上面的代码完成的。

serverbootstrap 是否只用于接收?

serverbootstrap 是否不意味着能够将消息写回客户端,如上所示?我的意思是,消息可以从套接字向上通过 ChannelHandlers 进入 ChannelPipeline,但只有 ChannelHandlers 应该将响应写回 ChannelPipeline 并输出套接字。因此,在 ServerBootstrap 中,用户并不意味着能够从管道外部沿 ChannelPipeline 发送消息。 (希望这是有道理的)

或者我只是错过了什么?

我的代码如下:

    // Ports.
    int serverPort = 8080;

    EventLoopGroup bossGroup    = new NioEventLoopGroup();
    EventLoopGroup workerGroup  = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ch.pipeline().addLast("MyMessageHandler", new MyMessageHandler());
             }
         })
         .option(ChannelOption.SO_BACKLOG, 128)
         .childOption(ChannelOption.SO_KEEPALIVE, true);

        // Bind and start to accept incoming connections.
        ChannelFuture f = b.bind(serverPort).sync();
        Channel ch = f.channel();

        System.out.println("Server: Running!");

      // Read commands from the stdin.
      ChannelFuture lastWriteFuture = null;
      BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
      while(true)
      {
          String line = in.readLine();
          if (line == null) break;

          ByteBuf getOut = buffer(64);
          getOut.writeBytes(line.getBytes());

          // Sends the received line to the server.
          lastWriteFuture = ch.writeAndFlush(getOut);

          lastWriteFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture cf) throws Exception {
                    if(cf.isSuccess()) {
                        System.out.println("CFListener: SUCCESS! YEAH! HELL! YEAH!");
                    } else {
                        System.out.println("CFListener: failure! FAILure! FAILURE!");
                        System.out.println(cf.cause());
                    }
                }
            });

      }

               // Wait until all messages are flushed before closing the channel.
      if (lastWriteFuture != null) {
          lastWriteFuture.sync();
      }


        // Wait until the server socket is closed.
        // In this example, this does not happen, but you can do that to gracefully
        // shut down your server.
        f.channel().closeFuture().sync();
    } catch (InterruptedException | UnsupportedOperationException e) {
        e.printStackTrace();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }

我开始使用以下示例:https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/securechat

我的问题是调用ch.writeAndFlush时出现以下异常:

java.lang.UnsupportedOperationException
at io.netty.channel.socket.nio.NioServerSocketChannel.filterOutboundMessage(NioServerSocketChannel.java:184)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:784)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1278)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeWriteNow(ChannelHandlerInvokerUtil.java:158)
at io.netty.channel.DefaultChannelHandlerInvoker$WriteTask.run(DefaultChannelHandlerInvoker.java:440)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:328)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
at io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
at io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
at io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)

【问题讨论】:

  • 响应您的新 ChannelGroupException 错误:您可以将 f.cause() 转换为 ChannelGroupException,然后打印其 iterator() 中的每个值
  • 我把整件事都放到了我的问题中。我希望我做的迭代器是对的。我愿意改进或更正。
  • @Ferrybig 由于您的调试技巧解决了这个问题。太感谢了!把答案放在里面有更多的细节。如果有办法给予人们信任,请告诉我。想以某种方式给你点赞。
  • 我不确定你是如何解决后半部分的,但在stackoverflow,我们通常在一个问题中只保留一个问题,所以我们可以有一个可以接受的答案,目前你的问题是基于 2 个单独的部分,其中第一部分是我帮助您调试的原始问题,而后一个问题是您发布的答案,我认为最好删除第二部分和您的“一半”答案,然后将我的标记为已接受,因为它解决了您最初的问题,并阻止人们投票以“过于宽泛”来结束您的问题
  • 好吧,我明白你的意思了。将进行更改!

标签: java netty


【解决方案1】:

您不能写入 ServerChannel,只能连接到普通通道。由于这个原因,您对 writeAndFlush 的呼叫失败。

要向每个客户端发送消息,您应该将每个客户端的通道存储在 ChannelGroup 中并在其上调用 writeAndFlush()。

一个快速的方法是向你的 ServerBootstrap 添加另一个处理程序,将传入的连接放入 ChannelGroup,一个快速的实现是这样的:

// In your main:
ChannelGroup allChannels =
         new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

// In your ChannelInitializer<SocketChannel>
ch.pipeline().addLast("grouper", new GlobalSendHandler());

// New class:
public class MyHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelActive(ChannelHandlerContext ctx) {
         allChannels.add(ctx.channel());
         super.channelActive(ctx);
     }
 }

然后我们可以调用以下命令向每个连接发送消息,这将返回一个ChannelGroupFuture 而不是普通的ChannelFuture

allChannels.writeAndFlush(getOut);

经过上述修复后,您的总代码将如下所示:

// Ports.
int serverPort = 8080;

ChannelGroup allChannels =
         new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

EventLoopGroup bossGroup    = new NioEventLoopGroup();
EventLoopGroup workerGroup  = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ch.pipeline().addLast("MyMessageHandler", new MyMessageHandler());
             ch.pipeline().addLast("grouper", new GlobalSendHandler());
         }
     })
     .option(ChannelOption.SO_BACKLOG, 128)
     .childOption(ChannelOption.SO_KEEPALIVE, true);

    // Bind and start to accept incoming connections.
    ChannelFuture f = b.bind(serverPort).sync();
    Channel ch = f.channel();

    System.out.println("Server: Running!");

  // Read commands from the stdin.
  ChannelGroupFuture lastWriteFuture = null;
  BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
  while(true)
  {
      String line = in.readLine();
      if (line == null) break;

      ByteBuf getOut = buffer(64);
      getOut.writeBytes(line.getBytes());

      // Sends the received line to the server.
      lastWriteFuture = allChannels.writeAndFlush(getOut);

      lastWriteFuture.addListener(new ChannelGroupFutureListener() {
            @Override
            public void operationComplete(ChannelGroupFuture cf) throws Exception {
                if(cf.isSuccess()) {
                    System.out.println("CFListener: SUCCESS! YEAH! HELL! YEAH!");
                } else {
                    System.out.println("CFListener: failure! FAILure! FAILURE!");
                    System.out.println(cf.cause());
                }
            }
        });

  }

           // Wait until all messages are flushed before closing the channel.
  if (lastWriteFuture != null) {
      lastWriteFuture.sync();
  }


    // Wait until the server socket is closed.
    // In this example, this does not happen, but you can do that to gracefully
    // shut down your server.
    f.channel().closeFuture().sync();
} catch (InterruptedException | UnsupportedOperationException e) {
    e.printStackTrace();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

【讨论】:

  • 为我指明了正确的方向,非常有用,并且非常感谢我的问题的重组! lastWriteFuture 也需要是 ChannelGroupFuture 才能正常工作。此外,ChannelFutureListener 必须是 ChannelGroupFutureListener,其中“ChannelGroupFuture cf”作为 operationComplete(arg0) 的 arg0。将 1 条消息发送回一个客户端可以正常工作。发送另一条后续消息失败,并出现有关 ChannelGroupFuture 的异常。还不确定,这是我的代码还是网络。今天晚些时候,当我彻底了解它时,我会回复你。
  • 我将抛出的异常添加到问题中。现在将对其进行彻底调查。
【解决方案2】:

我认为 Netty Server 没有解码器、编码器。 如果要发送字符串数据,

serverBootstrap.group(bossGroup, workerGroup).childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline channelPipeline = channel.pipeline();
        channelPipeline.addLast("String Encoder", new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("String Decoder", new StringDecoder(CharsetUtil.UTF_8));
    }
});

添加服务器的初始化程序!

【讨论】:

  • 如果要运行“4.x示例”,则必须测试4.x版本
  • 我发送的字符串预先通过 getOut.writeBytes(line.getBytes()); 转换为 ByteBuf所以这不是它失败的原因。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-01-13
  • 1970-01-01
  • 2013-09-30
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多