结果输出

netty写出结果,总需要让channel进行write,然后配合flush刷出。

如果一次性写好的话,writeAndFlush就可以一步完成。

但是这里有两个方法

  1. ctx.writeAndFlush
  2. ctx.channel().writeAndFlush

底层自然都是通过channel进行的writeAndFlush

但是中间部分的context和作为全局的channel,经由pipeline的管理,两种写入方式有稍许的不同。

ctx.channel().writeAndFlush

        ctx.channel().writeAndFlush(msg);

netty-writeAndFlush的输出顺序
可以看到,虽然都是channel进行的操作,但是由于都包含了channelcontextpipeline也可以写。

这里选择的当然是AbstractChannel

    public ChannelFuture writeAndFlush(Object msg) {
        return pipeline.writeAndFlush(msg);
    }

AbstractChannel是基本的SocketChannel的封装,也就是说,AbstractChannel是封装过的channel

并不是channel直接的信息写入。

在这里,经手的,实际上是pipelinewriteAndFlush

    public final ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
    }

pipelinecontext的双向链表,tail就是最后一个,pipeline的写好像要全部遍历。

    public ChannelFuture writeAndFlush(Object msg) {
        return writeAndFlush(msg, newPromise());
    }
    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(channel(), executor());
    }

操作对象就是channel,底层最真实的操作对象,真正的写操作的执行者。

多线程中,总是executor来进行的异步操作,这两个都是必须的,也不用继续探讨。

    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            return promise;
        }
        write(msg, true, promise);
        return promise;
    }

经过channel的有效检测,然后进行真实的写操作

   private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

findContextOutbound

这里看到了findContextOutbound,不得不想起findContextInbound的循环遍历

    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

只要能够找到循环的证据,那就能够证明它是outbound的遍历执行了。

线程检测

executor.inEventLoop(),多线程里面总是会这样检测,如果就是自身,就不用再挂任务。

至于执行器的选择,这个就不提了。

集成操作

			if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }

单纯的写入口,就集成了刷出flush的操作

invokeWriteAndFlush

    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }

invokeWrite

    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }

遍历传播

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

继续
netty-writeAndFlush的输出顺序
当然是选择outbound

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

然后
netty-writeAndFlush的输出顺序
这回轮到了context

    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg)
                return promise;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        write(msg, false, promise);
        return promise;
    }

继续

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

这就循环了,findContextOutbound不停遍历outboundContext传播事件。

具体操作

netty-writeAndFlush的输出顺序
最后必然就是传播到HeadContext

        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }

unsafe就直接写入了

        public final void write(Object msg, ChannelPromise promise) {
        	...
            outboundBuffer.addMessage(msg, size, promise);
        }

ctx.writeAndFlush

    ctx.writeAndFlush(msg);
    public ChannelFuture writeAndFlush(Object msg) {
        return writeAndFlush(msg, newPromise());
    }
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            return promise;
        }
        write(msg, true, promise);
        return promise;
    }
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

也不用多说,最后调用的方法都是一样的,好像都会遍历context传播,直到HeadContext写操作。

差异分析

其实真正的差异在于

ctx.channel().writeAndFlush

public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}
    public final ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
    }

对于channel()的写入,其实是pipeline的写入,指定的contexttail

它会完整的遍历整个pipeline

但是ctx.writeAndFlush,从当前的context向后传播。

画图描述一下
netty-writeAndFlush的输出顺序

例子验证

inbound

public class SimpleInboundHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("write now");
        ctx.channel().writeAndFlush("ctx.channel().writeAndFlush()");
        ctx.writeAndFlush("ctx.writeAndFlush");
    }
}

outbound

public class SimpleOutboundHandler extends ChannelOutboundHandlerAdapter {
    private String name ;
    public SimpleOutboundHandler(String name){
        this.name = name;
    }
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        super.write(ctx, msg, promise);
        System.out.println(this.name + " : " + msg);
    }
}

pipeline

public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch2) throws Exception {
        ChannelPipeline pipeline = ch2.pipeline();
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new StringEncoder());
        pipeline.addLast(new SimpleOutboundHandler("first-outbound"));
        pipeline.addLast(new SimpleInboundHandler());
        pipeline.addLast(new SimpleOutboundHandler("last-outbound"));
    }
}

运行结果
netty-writeAndFlush的输出顺序

逻辑分析

pipeline
outbound
first-outbound
inbound
last-outbound
last-outbound
first-outbound

全遍历的话,应该会经过全部的outbound

但是可以发现,ctx.writeAndFlush并没有经过last-outbound,只是经过了first-outbound

和我们的追踪相符合。

小结

所以,为了减少无谓的步骤,我们开发过程中可以更多使用ctx.writeAndFlush

如果觉得ctx.channel().writeAndFlush更高大上,更好的话,那就得不偿失了。

毕竟就编程而言,好东西应该唾手可得。

但同样的,如果我们的完整流程是经由outbound而实现的话。

ctx.writeAndFlush就会使得流程变得不完整,甚至出错。

所以,具体使用哪种方式,得因地制宜,合理的使用方为上策。

相关文章:

  • 2021-11-22
  • 2021-09-04
  • 2021-10-19
  • 2021-06-17
  • 2021-11-13
  • 2022-03-05
  • 2021-07-02
猜你喜欢
  • 2022-03-08
  • 2022-12-23
  • 2021-12-12
  • 2021-07-24
  • 2022-02-10
  • 2021-09-27
相关资源
相似解决方案