结果输出
netty写出结果,总需要让channel进行write,然后配合flush刷出。
如果一次性写好的话,writeAndFlush就可以一步完成。
但是这里有两个方法
ctx.writeAndFlushctx.channel().writeAndFlush
底层自然都是通过channel进行的writeAndFlush。
但是中间部分的context和作为全局的channel,经由pipeline的管理,两种写入方式有稍许的不同。
ctx.channel().writeAndFlush
ctx.channel().writeAndFlush(msg);
可以看到,虽然都是channel进行的操作,但是由于都包含了channel,context和pipeline也可以写。
这里选择的当然是AbstractChannel
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
AbstractChannel是基本的SocketChannel的封装,也就是说,AbstractChannel是封装过的channel。
并不是channel直接的信息写入。
在这里,经手的,实际上是pipeline的writeAndFlush
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
pipeline是context的双向链表,tail就是最后一个,pipeline的写好像要全部遍历。
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
public ChannelPromise newPromise() { return new DefaultChannelPromise(channel(), executor()); }操作对象就是
channel,底层最真实的操作对象,真正的写操作的执行者。多线程中,总是
executor来进行的异步操作,这两个都是必须的,也不用继续探讨。
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
return promise;
}
write(msg, true, promise);
return promise;
}
经过channel的有效检测,然后进行真实的写操作
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
findContextOutbound
这里看到了findContextOutbound,不得不想起findContextInbound的循环遍历
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
只要能够找到循环的证据,那就能够证明它是outbound的遍历执行了。
线程检测
executor.inEventLoop(),多线程里面总是会这样检测,如果就是自身,就不用再挂任务。
至于执行器的选择,这个就不提了。
集成操作
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
单纯的写入口,就集成了刷出flush的操作
invokeWriteAndFlushprivate void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
invokeWriteprivate void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } }
遍历传播
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
继续
当然是选择outbound
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
然后
这回轮到了context
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg)
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
write(msg, false, promise);
return promise;
}
继续
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
这就循环了,findContextOutbound不停遍历outboundContext传播事件。
具体操作
最后必然就是传播到HeadContext
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
unsafe就直接写入了
public final void write(Object msg, ChannelPromise promise) {
...
outboundBuffer.addMessage(msg, size, promise);
}
ctx.writeAndFlush
ctx.writeAndFlush(msg);
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
return promise;
}
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
也不用多说,最后调用的方法都是一样的,好像都会遍历context传播,直到HeadContext写操作。
差异分析
其实真正的差异在于
ctx.channel().writeAndFlush
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
对于channel()的写入,其实是pipeline的写入,指定的context是tail。
它会完整的遍历整个pipeline。
但是ctx.writeAndFlush,从当前的context向后传播。
画图描述一下
例子验证
inbound
public class SimpleInboundHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("write now");
ctx.channel().writeAndFlush("ctx.channel().writeAndFlush()");
ctx.writeAndFlush("ctx.writeAndFlush");
}
}
outbound
public class SimpleOutboundHandler extends ChannelOutboundHandlerAdapter {
private String name ;
public SimpleOutboundHandler(String name){
this.name = name;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
System.out.println(this.name + " : " + msg);
}
}
pipeline
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch2) throws Exception {
ChannelPipeline pipeline = ch2.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new SimpleOutboundHandler("first-outbound"));
pipeline.addLast(new SimpleInboundHandler());
pipeline.addLast(new SimpleOutboundHandler("last-outbound"));
}
}
运行结果
逻辑分析
全遍历的话,应该会经过全部的outbound。
但是可以发现,ctx.writeAndFlush并没有经过last-outbound,只是经过了first-outbound。
和我们的追踪相符合。
小结
所以,为了减少无谓的步骤,我们开发过程中可以更多使用ctx.writeAndFlush。
如果觉得ctx.channel().writeAndFlush更高大上,更好的话,那就得不偿失了。
毕竟就编程而言,好东西应该唾手可得。
但同样的,如果我们的完整流程是经由outbound而实现的话。
ctx.writeAndFlush就会使得流程变得不完整,甚至出错。
所以,具体使用哪种方式,得因地制宜,合理的使用方为上策。