【发布时间】:2018-09-15 20:23:36
【问题描述】:
我刚刚开始使用 Netty,想慢慢接触它以真正了解它是如何工作的。我有一个基于独立套接字测试程序的初始用例:
- 从客户端连接到服务器时,立即发送消息并处理响应
很简单……至少我是这么想的。我已经看了好几天了,并没有真正理解为什么它的行为不像预期的那样。
这是原始测试程序,它再次简单地连接到远程服务器并立即将字节缓冲区写入服务器。然后服务器立即发送一个 ack 响应,该响应刚刚写入控制台。
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.net.*;
import java.util.StringTokenizer;
import java.nio.charset.StandardCharsets;
public class SocketTest {
public static void main(String[] args) throws IOException {
final String host = "remote host";
final int port = 123456;
final String msg = "hi";
final byte sync = (byte)0x02;
final short value1 = (short)70;
final byte value2 = (byte)12;
final byte value3 = (byte)0x4c;
final int value4 = 1;
final short value5 = (short)0x03;
final short value6 = (short)0xffff;
try {
SocketChannel socketChannel
= SocketChannel.open(new InetSocketAddress(host, port));
ByteBuffer buf = ByteBuffer.allocate(15);
buf.put(sync);
buf.putShort(value1);
buf.put(value2);
buf.put(value3);
buf.putInt(value4);
buf.putShort(value5);
buf.putShort(value6);
buf.put(msg.getBytes(StandardCharsets.UTF_8));
buf.flip();
//write
while(buf.hasRemaining()) {
socketChannel.write(buf);
}
//read
ByteBuffer inBuf = ByteBuffer.allocate(78);
while (socketChannel.read(inBuf) > 0) {
System.out.printf("[%s]:\t%s\n", Thread.currentThread().getName(), new String(inBuf.array(), StandardCharsets.UTF_8));
}
} catch(UnknownHostException e) {
System.exit(1);
} catch(IOException ioe) {
System.exit(1);
} finally {
socketChannel.close();
}
}
}
我用 Netty 进行了同样的测试并尝试了这个基本用例:
NettySocketTest
import java.net.InetSocketAddress;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
public class NettySocketTest {
private final String host;
private final int port;
private SocketChannel channelInstance;
public NettySocketTest(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel (SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ClientTestHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} catch(Exception e) {
e.printStackTrace();
}finally {
group.shutdownGracefully().sync();
}
}
}
ClientTestHandler
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class ClientTestHandler extends SimpleChannelInboundHandler {
final String msg = "hi";
@Override
public void channelActive(final ChannelHandlerContext ctx) {
//existing test code - [1]
ByteBuffer buf = ByteBuffer.allocate(15);
buf.put((byte)0x02);
buf.putShort((short)70);
buf.put((byte)12);
buf.put((byte)0x4c);
buf.putInt(101);
buf.putShort((short)0x03);
buf.putShort((short)0xffff);
buf.put(msg.getBytes(StandardCharsets.UTF_8));
buf.flip();
//preferred implementation - [2]
/**
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf syncBuf = ctx.alloc().buffer(1);
syncBuf.writeByte((byte)0x02);
ByteBuf headerBuf = ctx.alloc().buffer(12);
headerBuf.writeShort((short)70);
headerBuf.writeByte((byte)12);
headerBuf.writeByte((byte)0x4c);
headerBuf.writeInt(101);
headerBuf.writeShort((short)0x03);
headerBuf.writeShort((short)0xffff);
ByteBuf bodyBuf = ctx.alloc().buffer(2);
bodyBuf.writeBytes("hi".getBytes(StandardCharsets.UTF_8));
messageBuf.addComponents(syncBuf, headerBuf, bodyBuf);
**/
//DOESN'T WORK - [3]
final ChannelFuture f = ctx.writeAndFlush(buf);
//ALSO DOESN'T WORK
//final ChannelFuture f = ctx.channel().writeAndFlush(buf);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
});
}
@Override
public void channelRead0(ChannelHandlerContext ctx, Object resp) {
ByteBuf msg = (ByteBuf) resp;
System.out.println("Response received: " + msg.toString(StandardCharsets.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
在 ClientTestHandler 中:
- 在转换为 Netty 提供的便捷方法之前,我采用了现有的 ByteBuffer 代码并尝试按原样使用它。请参阅 #2。
- 这是尝试使用 CompositeByteBuf。 1 或 2 都不起作用。
- 尝试使用 ChannelHandlerContext 写出消息,另一次尝试直接在上下文中使用通道,希望两者中的一个会非法响应。
我的期望是消息被发送到服务器并得到响应并打印到控制台。
客户端代码:
- 成功启动/连接
- 成功调用 channelActive
- 在 ChannelFutureListener 上成功调用 operationComplete
- 永远不会调用 channelRead0 方法
我故意没有在管道中添加任何其他内容,因此我可以简单地开始并修改代码并边走边学。我将 CompositeByteBuf 还原为 nio.ByteBuffer,这样我就可以从另一个测试中的相同代码开始。
连接时使用channelActive立即向服务器发送字节是否正确?谁能帮我理解我在这个基本用例中做错了什么以及为什么没有捕获响应(假设消息已实际发送)?
【问题讨论】:
-
您对
channelActive所做的事情仅是正确的,例如它用于向服务器发送第一条消息(例如握手、建立连接等)。我怀疑服务器没有向客户端回复任何内容。也请分享您的服务器代码。 -
服务器是第三方提供商,他们的服务器代码是专有的。将 nio.ByteBuffer 传递给 write 是否正确,如图所示?无法解释为什么非 Netty 代码会得到响应,但内部的相同缓冲区定义却没有……
-
你能用 netty
ByteBuf代替吗? -
当然。如果您检查 ClientHandler 类,我尝试使用复合字节缓冲区进行测试(我无法使其工作),但从未尝试过直接的字节缓冲区。可以试一试。
-
ChannelFuture(我从示例代码中使用)是问题所在。关闭通道会阻止接收到来自服务器的响应。移除监听器,一切都按预期运行!
标签: netty