netty作为一个被广泛应用的通信框架,有必要我们多了解一点。

  实际上netty的几个重要的技术亮点: 

    1. reactor的线程模型;
    2. 安全有效的nio非阻塞io模型应用;
    3. pipeline流水线式的灵活处理过程;
    4. channelHandler的灵活实现;
    5. 提供许多开箱即用的处理器和编解码器;

  我们可以从这些点去深入理解其过人之处。

 

1. 一个NettyServer的demo

  要想深入理解某个框架,一般还是要以demo作为一个抓手点的。以下,我们可以看到一个简单的nettyServer的创建过程,即netty的quick start样例吧。

@Slf4j
public class NettyServerHelloApplication {

    /**
     * 一个server的样例
     */
    public static void main(String[] args) throws Exception {
        // 1. 创建对应的EventLoop线程池备用, 分bossGroup和workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(4);
        try {
            // 2. 创建netty对应的入口核心类 ServerBootstrap
            ServerBootstrap b = new ServerBootstrap();
            // 3. 设置server的各项参数,以及应用处理器
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100) // 设置tcp协议的请求等待队列
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // 3.2. 最重要的,将各channelHandler绑定到netty的上下文中(暂且这么说吧)
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new LoggingHandler(LogLevel.INFO));
                            p.addLast("encoder", new MessageEncoder());
                            p.addLast("decoder", new MessageDecoder());
                            p.addLast(new EchoServerHandler());
                        }
                    });

            // 4. 绑定tcp端口开启服务端监听, sync() 保证执行完成所有任务
            ChannelFuture f = b.bind(ServerConstant.PORT).sync();

            // 5. 等待关闭信号,让业务线程去服务业务了
            f.channel().closeFuture().sync();
        } finally {
            // 6. 收到关闭信号后,优雅关闭server的线程池,保护应用
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

  以上,就是一个简版的nettyServer的整个框架了,这也基本上整个nettyServer的编程范式了。主要即分为这么几步:

    1. 创建对应的EventLoop线程池备用, 分bossGroup和workerGroup;
    2. 创建netty对应的入口核心类 ServerBootstrap;
    3. 设置server的各项参数,以及应用处理器(必备的channelHandler业务接入过程);
    4. 绑定tcp端口开启服务端监听;
    5. 等待关闭信号,让业务线程去服务业务了;
    6. 收到关闭信号后,优雅关闭server的线程池,保护应用;

  事实上,如果我们直接基于jdk提供的ServerSocketChannel是否也差不了多少呢?是的,至少表面看起来是的,但我们要处理许多的异常情况,且可能面对变化繁多的业务类型。又该如何呢?

  毕竟一个框架的成功,绝非偶然。下面我们就这几个过程来看看netty都是如何处理的吧!

 

2. EventLoop 的创建

  EventLoop 直译为事件循环,但在这里我们也可以理解为一个线程池,因为所有的事件都是提交给其处理的。那么,它倒底是个什么样的循环呢?

  首先来看下其类继承情况: 

Netty(一):server启动流程解析

 

  从类图可以看出,EventLoop也是一个executor或者说线程池的实现,它们也许有相通之处。

    // 调用方式如下
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup(4);
    // io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.ThreadFactory)
    /**
     * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
    }    
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
    // io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        // 默认线程是 cpu * 2
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    // io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object...)
    /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    // io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
    /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        // 创建一个执行器,该执行器每提交一个任务,就创建一个线程来运行,即并没有队列的概念
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        // 使用一个数组来保存整个可用的线程池
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 为每个child创建一个线程运行, 该方法由子类实现
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    // 如果创建失败,则把已经创建好的线程池关闭掉
                    // 不过值得注意的是,当某个线程池创建失败后,并没有立即停止后续创建工作,即无 return 操作,这是为啥?
                    // 实际上,发生异常时,Exeception 已经被抛出,此处无需关注
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
        // 创建选择器,猜测是做负载均衡时使用
        // 此处的chooser默认是 DefaultEventExecutorChooserFactory
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

    // io.netty.channel.nio.NioEventLoopGroup#newChild
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        // 注意此处的参数类型是由外部进行保证的,在此直接做强转操作
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
    
    // io.netty.channel.nio.NioEventLoop#NioEventLoop
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        // 此构造器会做很多事,比如创建队列,开启nio selector...
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }


    // io.netty.util.concurrent.DefaultEventExecutorChooserFactory#newChooser
    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        // 如: 1,2,4,8... 都会创建 PowerOfTwoEventExecutorChooser
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    // io.netty.util.concurrent.DefaultPromise#addListener
    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            addListener0(listener);
        }

        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

  以上,就是 NioEventLoopGroup 的创建过程了. 本质上其就是一个个的单独的线程组成的数组列表, 等待被调用.

 

3. ServerBootstrap 的创建

  ServerBootstrap是Netty的一个服务端核心入口类, 它可以很快速的创建一个稳定的netty服务.

  ServerBootstrap 的类图如下: 

Netty(一):server启动流程解析

 

  还是非常纯粹的啊!其中有意思是的, ServerBootstrap继承自 AbstractBootstrap, 而这个 AbstractBootstrap 是一个自依赖的抽象类: AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> , 这样,即父类可以直接返回子类的信息了。

  其默认构造方法为空,所以所以参数都使用默认值, 因为还有后续的参数设置过程,接下来,我们看看其一些关键参数的设置: 

    // 1. channel的设定
    // io.netty.bootstrap.AbstractBootstrap#channel
    /**
     * The {@link Class} which is used to create {@link Channel} instances from.
     * You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
     * {@link Channel} implementation has no no-args constructor.
     */
    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        // 默认使用构造器反射的方式创建 channel
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
    // io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.channel.ChannelFactory<? extends C>)
    /**
     * {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from
     * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
     * is not working for you because of some more complex needs. If your {@link Channel} implementation
     * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
     * simplify your code.
     */
    @SuppressWarnings({ "unchecked", "deprecation" })
    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }
    // io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.bootstrap.ChannelFactory<? extends C>)
    /**
     * @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead.
     */
    @Deprecated
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return self();
    }
    @SuppressWarnings("unchecked")
    private B self() {
        return (B) this;
    }

    // 2. option 参数选项设置, 它会承包各种特殊配置的设置, 是一个通用配置项设置的入口 
    /**
     * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
     * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
     */
    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        // options 是一个 new LinkedHashMap<ChannelOption<?>, Object>(), 即非线程安全的容器, 所以设置值时要求使用 synchronized 保证线程安全
        // value 为null时代表要将该选项设置删除, 如果key相同,后面的配置将会覆盖前面的配置
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return self();
    }
    
    // 3. childHandler 添加channelHandler, 这是一个最重要的一个方法, 它会影响到后面的业务处理统筹
    // 调用该方法仅将 channelHandler的上下文加入进来, 实际还未进行真正的添加操作 .childHandler(new ChannelInitializer<SocketChannel>() {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100) // 设置tcp协议的请求等待队列
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new LoggingHandler(LogLevel.INFO));
                    p.addLast("encoder", new MessageEncoder());
                    p.addLast("decoder", new MessageDecoder());
                    p.addLast(new EchoServerHandler());
                }
            });
    /**
     * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
     */
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        if (childHandler == null) {
            throw new NullPointerException("childHandler");
        }
        // 仅将 channelHandler 绑定到netty的上下文中
        this.childHandler = childHandler;
        return this;
    }
    
    // 4. bossGroup, workGroup 如何被分配 ?
    /**
     * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
     * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
     * {@link Channel}'s.
     */
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        // parentGroup 是给acceptor使用的, 主要用于对socket连接的接入,所以一般一个线程也够了
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        // childGroup 主要用于接入后的socket的事件的处理,一般要求数量较多,视业务属性决定
        this.childGroup = childGroup;
        return this;
    }

  bind 绑定tcp端口,这个是真正触发server初始化的一步,工作量比较大,我们另开一段讲解。

 

4. nettyServer 的初始化

  前面所有工作都是在准备, 都并未体现在外部, 而 bind 则会是开启一个对外服务, 对外可见, 真正启动server.

    // io.netty.bootstrap.AbstractBootstrap#bind(int)
    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    // io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(SocketAddress localAddress) {
        // 先验证各种参数是否设置完整, 如线程池是否设置, channelHandler 是否设置...
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        // 绑定tcp端口
        return doBind(localAddress);
    }
    private ChannelFuture doBind(final SocketAddress localAddress) {
        // 1. 创建一些channel使用, 与eventloop绑定, 统一管理嘛
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            // 2. 注册成功之后, 开始实际的 bind() 操作, 实际就是调用 channel.bind()
            // doBind0() 是一个异步的操作,所以使用的一个 promise 作为结果驱动
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

  所以,从整体来说,bind()过程分两大步走:1. 初始化channel,与nio关联; 2. 落实channel和本地端口的绑定工作; 我们来细看下:

 

4.1 初始化channel

  初始化channel, 并注册到 selector上, 这个操作实际上非常重要。

    // 以下我们先看下执行框架
    // io.netty.bootstrap.AbstractBootstrap#initAndRegister
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 即根据前面设置的channel 使用反射创建一个实例出来
            // 即此处将会实例化出一个 ServerSocketChannel 出来
            // 所以如果你想用jdk的nio实现,则设置channel时使用 NioServerSocketChannel.class即可, 而你想使用其他更优化的实现时比如EpollServerSocketChannel时,改变一下即可
            // 而此处的 channelFactory 就是一个反射的实现 ReflectiveChannelFactory, 它会调用如上channel的无参构造方法实例化
            // 重点工作就需要在这个无参构造器中完成,我们接下来看看
            channel = channelFactory.newChannel();
            // 初始化channel的一些公共参数, 相当于做一些属性的继承, 因为后续它将不再依赖 ServerBootstrap, 它需要有独立自主能力
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // 注册创建好的 channel 到eventLoop中
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }
    
    // 1. 先看看 NioServerSocketChannel 的构造过程
    // io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()
    /**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        // newSocket 简单说就是创建一个本地socket, api调用: SelectorProvider.provider().openServerSocketChannel()
        // 但此时本 socket 并未和任何端口绑定
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    /**
     * Create a new instance using the given {@link ServerSocketChannel}.
     */
    public NioServerSocketChannel(ServerSocketChannel channel) {
        // 注册 OP_ACCEPT 事件
        super(null, channel, SelectionKey.OP_ACCEPT);
        // 此处的 javaChannel() 实际就是 channel, 这样调用只是为统一吧
        // 创建一个新的 socket 传入 NioServerSocketChannelConfig 中
        // 主要用于一些 RecvByteBufAllocator 的设置,及channel的保存
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    // io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
    /**
     * Create a new instance
     *
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
     */
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        // 先让父类初始化必要的上下文
        super(parent);
        // 保留 channel 信息,并设置非阻塞标识
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
    // io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
    /**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        // 初始化上下文
        this.parent = parent;
        // DefaultChannelId
        id = newId();
        // NioMessageUnsafe
        unsafe = newUnsafe();
        // new DefaultChannelPipeline(this); 
        // 比较重要,将会初始化 head, tail 节点
        pipeline = newChannelPipeline();
    }
    // io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        // 初始化 head, tail
        tail = new TailContext(this);
        head = new HeadContext(this);
        // 构成双向链表
        head.next = tail;
        tail.prev = head;
    }



    // 2. 初始化channel, 有个最重要的动作是将 Acceptor 接入到 pipeline 中
    // io.netty.bootstrap.ServerBootstrap#init
    @Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        // 根据前面的设置, 将各种属性copy过来, 放到 config 字段中
        // 同样, 因为 options 和 attrs 都不是线程安全的, 所以都要上锁操作
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        // 此处的pipeline, 就是在 NioServerSocketChannel 中初始化好head,tail的pipeline
        ChannelPipeline p = channel.pipeline();
        // childGroup 实际就是外部的 workGroup
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        // 这个就比较重要了, 关联 ServerBootstrapAcceptor
        // 主动添加一个 initializer, 它将作为第一个被调用的 channelInitializer 存在 
        // 而 channelInitializer 只会被调用一次
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // 添加 Acceptor 到 pipeline 中, 形成一个 head -> ServerBootstrapAcceptor -> tail 的pipeline
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
        // 此操作过后,当前pipeline中,就只有此一handler
    }

  。。。

 

4.2 handler的添加过程

  addLast() 看起来只是一个添加元素的过程, 总体来说就是一个双向链表的添加, 但也蛮有意思的, 有兴趣可以戳开详情看看.

    // io.netty.channel.ChannelHandler
    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }
    // io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, io.netty.channel.ChannelHandler...)
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }
        // 支持同时添加多个 handler
        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }
    // io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 重复性检查 @Shareable 参数使用
            checkMultiplicity(handler);
            // 生成一个新的上下文, filterName()将会生成一个唯一的名称, 如 ServerBootstrap$1#0
            newCtx = newContext(group, filterName(name, handler), handler);
            // 将当前ctx添加到链表中
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                // 未注册情况下, 不会进行下一步了
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            // 而已注册情况下, 则会使用 executor 提交callHandlerAdded0, 即调用 pipeline 的头节点
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        // 一个双向链表保存上下文
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    // 添加ctx到队列尾部
    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;

        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }
    // 对每一次添加 handler, 则都会产生一个事件, 通知现有的handler, handlerAdded()
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
            // any pipeline events ctx.handler() will miss them because the state will not allow it.
            ctx.setAddComplete();
            ctx.handler().handlerAdded(ctx);
        } catch (Throwable t) {
            boolean removed = false;
            try {
                remove0(ctx);
                try {
                    ctx.handler().handlerRemoved(ctx);
                } finally {
                    ctx.setRemoved();
                }
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }
查看 handler 的添加过程

相关文章:

  • 2022-02-11
  • 2021-06-20
  • 2021-07-26
  • 2021-07-21
  • 2021-05-19
猜你喜欢
  • 2021-12-03
  • 2022-12-23
  • 2021-10-07
  • 2022-12-23
  • 2021-07-11
  • 2022-03-10
  • 2022-12-23
相关资源
相似解决方案