1、由于篇幅过长难以发布,所以本章节接着上一节来的,上一章节为【原理剖析(第 010 篇)Netty之服务端启动工作原理分析(上)】;
2、那么本章节就继续分析Netty的服务端启动,分析Netty的源码版本为:netty-netty-4.1.22.Final;

二、三、四章节请看上一章节

四、源码分析Netty服务端启动

上一章节,我们主要分析了一下线程管理组对象是如何被实例化的,并且还了解到了每个线程管理组都有一个子线程数组来处理任务; 那么接下来我们就直接从4.6开始分析了:

4.6、为serverBootstrap添加配置参数

1、源码:
	// NettyServer.java
	// 将 Boss、Worker 设置到 ServerBootstrap 服务端引导类中
	serverBootstrap.group(bossGroup, workerGroup)
			.channel(NioServerSocketChannel.class)
			// 指定通道类型为NioServerSocketChannel,一种异步模式,OIO阻塞模式为OioServerSocketChannel
			.localAddress("localhost", port)//设置InetSocketAddress让服务器监听某个端口已等待客户端连接。
			.childHandler(new ChannelInitializer<Channel>() {//设置childHandler执行所有的连接请求
				@Override
				protected void initChannel(Channel ch) throws Exception {
					ch.pipeline().addLast(new PacketHeadDecoder());
					ch.pipeline().addLast(new PacketBodyDecoder());

					ch.pipeline().addLast(new PacketHeadEncoder());
					ch.pipeline().addLast(new PacketBodyEncoder());

					ch.pipeline().addLast(new PacketHandler());
				}
			});

2、主要为后序的通信设置了一些配置参数而已,指定构建的Channel为NioServerSocketChannel,说明需要启动的是服务端Netty;
   而后面的服务端Channel实例化,就是需要通过这个参数反射实例化得到;

3、同时还设置childHandler,这个childHandler也是有顺序的,服务端读数据时执行的顺序是PacketHeadDecoder、PacketBodyDecoder、PacketHandler;
   而服务端写数据时执行的顺序是PacketHandler、PacketBodyEncoder、PacketHeadEncoder;
   所以在书写方式大家千万别写错了,按照本示例代码的方式书写即可;

4.7、serverBootstrap调用bind绑定注册

1、源码:
	// NettyServer.java
	// 最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭,因为使用sync(),所以关闭操作也会被阻塞。
	ChannelFuture channelFuture = serverBootstrap.bind().sync();

2、这里其实没什么好看的,接下来我们就主要看看这个bind()方法主要干了些啥,就这么简简单单一句代码就把服务端给启动起来了,有点神气了;

4.8、bind()操作

1、源码:
	// AbstractBootstrap.java
    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind() {
        validate();
        SocketAddress localAddress = this.localAddress;
        if (localAddress == null) {
            throw new IllegalStateException("localAddress not set");
        }
        return doBind(localAddress); // 创建一个Channel,并且绑定它
    }

	// AbstractBootstrap.java
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister(); // 初始化和注册

        // 执行到此,服务端大概完成了以下几件事情:
        // 1、实例化NioServerSocketChannel,并为Channel配备了pipeline、config、unsafe对象;
        // 2、将多个handler添加至pipeline双向链表中,并且等待Channel注册成功后需要给每个handler触发添加或者移除事件;
        // 3、将NioServerSocketChannel注册到NioEventLoop的多路复用器上;

        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        // 既然NioServerSocketChannel的Channel绑定到了多路复用器上,那么接下来就是绑定地址,绑完地址就可以正式进行通信了
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

2、大致一看,原来doBind方法主要干了两件事情,initAndRegister与doBind0;

3、initAndRegister主要做的事情就是初始化服务端Channel,并且将服务端Channel注册到bossGroup子线程的多路复用器上;

4、doBind0则主要完成服务端启动的最后一步,绑定地址,绑定完后就可以正式进行通信了;

4.9、initAndRegister()初始化和注册

1、源码:
	// AbstractBootstrap.java
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 反射调用clazz.getConstructor().newInstance()实例化类
            // 同时也实例化了Channel,如果是服务端的话则为NioServerSocketChannel实例化对象
            // 在实例化NioServerSocketChannel的构造方法中,也为每个Channel创建了一个管道属性对象DefaultChannelPipeline=pipeline对象
            // 在实例化NioServerSocketChannel的构造方法中,也为每个Channel创建了一个配置属性对象NioServerSocketChannelConfig=config对象
            // 在实例化NioServerSocketChannel的构造方法中,也为每个Channel创建了一个unsafe属性对象NioMessageUnsafe=unsafe对象
            channel = channelFactory.newChannel(); // 调用ReflectiveChannelFactory的newChannel方法

            // 初始化刚刚被实例化的channel
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        // config().group()=bossGroup或parentGroup,然后利用parentGroup去注册NioServerSocketChannel=channel
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

2、逐行分析后会发现,首先通过反射实例化服务端channel对象,然后将服务端channel初始化一下;

3、然后调用bossGroup的注册方法,将服务端channel作为参数传入;

4、至此,方法名也表明该段代码的意图,实例化并初始化服务端Channel,然后注册到bossGroup子线程的多路复用器上;

4.10、init服务端Channel

1、源码:
	// ServerBootstrap.java
    @Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

		// 服务端ServerSocketChannel的管道对象,Channel实例化的时候就被创建出来了
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        ChannelInitializer<Channel> tempHandler = new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("initAndRegister.init.initChannel-->ch.eventLoop().execute");
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        };

        // 这里我将addLast的参数剥离出来了,方便查看阅读
        p.addLast(tempHandler);
    }

	// DefaultChannelPipeline.java
    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }	
	
	// DefaultChannelPipeline.java
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }	
	
	// DefaultChannelPipeline.java
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        // 这里加了synchronized关键字,因此说addLast的新增动作都是线程安全的
        // 然后再细看一下其它的方法,只要涉及到的handler的增删改动作的方法,那些方法的代码块都是经过synchronized修饰了,保证操作过程中线程安全
        synchronized (this) {
            // 检查handler的一些基本信息,若不是被Sharable注解过的话,而且已经被添加到其他pipeline时则会抛出异常
            checkMultiplicity(handler);

            // 通过一系列参数的封装,最后封装成DefaultChannelHandlerContext对象
            newCtx = newContext(group, filterName(name, handler), handler);

            // 将newCtx添加到倒数第二的位置,即tail的前面一个位置
            // 这里的pipeline中的handler的构成方式是一个双向链表式的结构
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            // 该addLast方法可能会被其它各个地方调用,但是又为了保证handler的线程安全,则采用了synchronized来保证addLast的线程安全
            // 在Channel未注册到多路复用器之前,registered肯定为false,那么则把需要添加的handler封装成AbstractChannelHandlerContext对象,
            // 然后调用setAddPending方法,pengding意味着在将来的某个时刻调用,那到底在什么时刻被调用呢?
            // 英文解释中提到一旦Channel注册成功了的话则会被调用,所以Channel后续注册完毕,再调用ChannelHandler.handlerAdded
            if (!registered) {
                newCtx.setAddPending();

                // 将newCtx追加到PendingHandlerCallback单向链表的队尾,以便将来回调时用到
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(

相关文章: