2021-05-05 16:51
© Original author: 中间件兴趣圈  Original post: https://blog.csdn.net/prestigeding/article/details/53977445

1. Thread Model Summary

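Netty's thread model is based on the main/sub (master-slave) Reactor pattern. Each Channel is bound to one EventLoop taken from an EventLoopGroup. All of that channel's events, such as reads and writes, run on that same EventLoop, which avoids thread-safety problems when Handlers execute.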

Prerequisite articles on the thread model:

  1. Implementing the Reactor pattern with NIO
  2. The Netty thread model, illustrated

2. Source Analysis of NioEventLoopGroup Initialization

2.1 NioEventLoopGroup constructors

        /**
             * Create a new instance that uses twice as many {@link EventLoop}s as there processors/cores
             * available, as well as the default {@link Executor} and the {@link SelectorProvider} which
             * is returned by {@link SelectorProvider#provider()}.
             *
             * @see io.netty.util.concurrent.DefaultExecutorServiceFactory
             */
            public NioEventLoopGroup() {
                this(0);
            }
    
            /**
             * Create a new instance that uses the default {@link Executor} and the {@link SelectorProvider} which
             * is returned by {@link SelectorProvider#provider()}.
             *
             * @see io.netty.util.concurrent.DefaultExecutorServiceFactory
             *
             * @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
             *                      If {@code executor} is {@code null} this number will also be the parallelism
             *                      requested from the default executor. It is generally advised for the number
             *                      of {@link EventLoop}s and the number of {@link Thread}s used by the
             *                      {@code executor} to lie very close together.
             */
            public NioEventLoopGroup(int nEventLoops) {
                this(nEventLoops, (Executor) null);
            }
    
            /**
             * Create a new instance that uses the {@link SelectorProvider} which is returned by
             * {@link SelectorProvider#provider()}.
             *
             * @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
             *                      If {@code executor} is {@code null} this number will also be the parallelism
             *                      requested from the default executor. It is generally advised for the number
             *                      of {@link EventLoop}s and the number of {@link Thread}s used by the
             *                      {@code executor} to lie very close together.
             * @param executor   the {@link Executor} to use, or {@code null} if the default should be used.
             */
            public NioEventLoopGroup(int nEventLoops, Executor executor) {
                this(nEventLoops, executor, SelectorProvider.provider());
            }
    
            /**
             * Create a new instance that uses the {@link SelectorProvider} which is returned by
             * {@link SelectorProvider#provider()}.
             *
             * @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
             *                      If {@code executor} is {@code null} this number will also be the parallelism
             *                      requested from the default executor. It is generally advised for the number
             *                      of {@link EventLoop}s and the number of {@link Thread}s used by the
             *                      {@code executor} to lie very close together.
             * @param executorServiceFactory   the {@link ExecutorServiceFactory} to use, or {@code null} if the default
             *                                 should be used.
             */
            public NioEventLoopGroup(int nEventLoops, ExecutorServiceFactory executorServiceFactory) {
                this(nEventLoops, executorServiceFactory, SelectorProvider.provider());
            }
    
            /**
             * @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
             *                      If {@code executor} is {@code null} this number will also be the parallelism
             *                      requested from the default executor. It is generally advised for the number
             *                      of {@link EventLoop}s and the number of {@link Thread}s used by the
             *                      {@code executor} to lie very close together.
             * @param executor  the {@link Executor} to use, or {@code null} if the default should be used.
             * @param selectorProvider  the {@link SelectorProvider} to use. This value must not be {@code null}.
             */
            public NioEventLoopGroup(int nEventLoops, Executor executor, final SelectorProvider selectorProvider) {
                super(nEventLoops, executor, selectorProvider);
            }
    
            /**
             * @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
             *                      If {@code executor} is {@code null} this number will also be the parallelism
             *                      requested from the default executor. It is generally advised for the number
             *                      of {@link EventLoop}s and the number of {@link Thread}s used by the
             *                      {@code executor} to lie very close together.
             * @param executorServiceFactory   the {@link ExecutorServiceFactory} to use, or {@code null} if the
             *                                 default should be used.
             * @param selectorProvider  the {@link SelectorProvider} to use. This value must not be {@code null}.
             */
            public NioEventLoopGroup(
                    int nEventLoops, ExecutorServiceFactory executorServiceFactory, final SelectorProvider selectorProvider) {
                super(nEventLoops, executorServiceFactory, selectorProvider);
            }

Pay particular attention to the following constructor parameters:

  • int nEventLoops
    The number of EventLoops.
  • Executor executor
    The task executor.
  • SelectorProvider selectorProvider
    The provider used to open NIO Selectors.

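When a group is created with EventLoopGroup bossGroup = new NioEventLoopGroup();, the values before the parent constructor is called are as follows. nEventLoops is 0, executor is null, and selectorProvider is the platform-specific default returned by SelectorProvider.provider().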

2.2 The direct parent class: the MultithreadEventLoopGroup constructor

        /**
             * @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)}
             */
            protected MultithreadEventLoopGroup(int nEventLoops, Executor executor, Object... args) {
                super(nEventLoops == 0 ? DEFAULT_EVENT_LOOP_THREADS : nEventLoops, executor, args);
            }

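This constructor only normalizes nEventLoops. If it is 0, DEFAULT_EVENT_LOOP_THREADS is used instead, which defaults to twice the number of available CPU cores.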
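A minimal sketch of the "0 means default" rule. It assumes the default is simply max(1, 2 × availableProcessors()). The class EventLoopCountSketch is hypothetical. In real Netty, DEFAULT_EVENT_LOOP_THREADS is computed once in a static initializer, and the io.netty.eventLoopThreads system property can also override it. That override is left out here.

```java
// Hypothetical sketch of the "0 means default" rule in MultithreadEventLoopGroup.
final class EventLoopCountSketch {
    // Assumed default: twice the available cores, but never less than 1.
    static final int DEFAULT_EVENT_LOOP_THREADS =
            Math.max(1, Runtime.getRuntime().availableProcessors() * 2);

    static int resolve(int nEventLoops) {
        // Negative values are passed through unchanged; the real code rejects them
        // later in MultithreadEventExecutorGroup ("expected: > 0").
        return nEventLoops == 0 ? DEFAULT_EVENT_LOOP_THREADS : nEventLoops;
    }
}
```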

2.3 The grandparent class MultithreadEventExecutorGroup, where the actual initialization happens

        private MultithreadEventExecutorGroup(int nEventExecutors,
                                                  Executor executor,
                                                  boolean shutdownExecutor,
                                                  Object... args) {
                if (nEventExecutors <= 0) {
                    throw new IllegalArgumentException(
                            String.format("nEventExecutors: %d (expected: > 0)", nEventExecutors));
                }
    
                if (executor == null) {
                    executor = newDefaultExecutorService(nEventExecutors); // @1
                    shutdownExecutor = true;
                }
    
                children = new EventExecutor[nEventExecutors];                //@2
                if (isPowerOfTwo(children.length)) {                                     //@3
                    chooser = new PowerOfTwoEventExecutorChooser();
                } else {
                    chooser = new GenericEventExecutorChooser();
                }
    
                for (int i = 0; i < nEventExecutors; i ++) {
                    boolean success = false;
                    try {
                        children[i] = newChild(executor, args);                  //@4
                        success = true;
                    } catch (Exception e) {
                        // TODO: Think about if this is a good exception type
                        throw new IllegalStateException("failed to create a child event loop", e);
                    } finally {
                        if (!success) {                                       //@5
                            for (int j = 0; j < i; j ++) {
                                children[j].shutdownGracefully();
                            }
    
                            for (int j = 0; j < i; j ++) {
                                EventExecutor e = children[j];
                                try {
                                    while (!e.isTerminated()) {
                                        e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                    }
                                } catch (InterruptedException interrupted) {
                                    // Let the caller handle the interruption.
                                    Thread.currentThread().interrupt();
                                    break;
                                }
                            }
                        }
                    }
                }
    
                final boolean shutdownExecutor0 = shutdownExecutor;
                final Executor executor0 = executor;
                final FutureListener<Object> terminationListener = new FutureListener<Object>() {      //@6
                    @Override
                    public void operationComplete(Future<Object> future) throws Exception {
                        if (terminatedChildren.incrementAndGet() == children.length) {
                            terminationFuture.setSuccess(null);
                            if (shutdownExecutor0) {
                                // This cast is correct because shutdownExecutor0 is only true if
                                // executor0 is of type ExecutorService.
                                ((ExecutorService) executor0).shutdown();
                            }
                        }
                    }
                };
    
                for (EventExecutor e: children) {
                    e.terminationFuture().addListener(terminationListener);
                }
    
                Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
                Collections.addAll(childrenSet, children);
                readonlyChildren = Collections.unmodifiableSet(childrenSet);
            }

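Code @1: creates the thread executor for this group. Here it returns an io.netty.util.internal.chmv8.ForkJoinPool with a parallelism of nEventExecutors. Because the group created this pool itself, shutdownExecutor is set to true, so the pool is shut down together with the group.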

Code @2: allocates this group's array of worker EventExecutors. The actual instances stored in it are created at @4.

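Code @3: creates the chooser that picks one EventExecutor from the group each time one is needed. If the group size is a power of two, PowerOfTwoEventExecutorChooser is used. Otherwise GenericEventExecutorChooser is used.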
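The two chooser strategies can be sketched as a round-robin counter. The power-of-two variant uses a bitmask and the generic variant uses a modulo. This is an illustration with hypothetical names (ChooserSketch, Chooser), not Netty's classes. It assumes the chooser only needs to return an index into children.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the chooser selected at @3; returns indexes into children[].
final class ChooserSketch {
    interface Chooser {
        int next();
    }

    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    static Chooser newChooser(int n) {
        AtomicInteger idx = new AtomicInteger();
        if (isPowerOfTwo(n)) {
            // Bitmask: same result as modulo when n is a power of two, but cheaper.
            return () -> idx.getAndIncrement() & (n - 1);
        }
        // Generic: modulo; Math.abs keeps the index valid after the counter overflows.
        return () -> Math.abs(idx.getAndIncrement() % n);
    }
}
```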

Code @4: creates a concrete executor. newChild is implemented by the concrete EventLoopGroup subclass. This is what we focus on next: NioEventLoopGroup instantiates NioEventLoop objects:

[Figure: NioEventLoopGroup creating a NioEventLoop in newChild]

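Code @5: if creating any executor fails, the executors already created are shut down gracefully, and the loop waits for each of them to terminate.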

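Code @6: registers a termination listener on every child. Once all children have terminated, the group's terminationFuture is completed. If the group created the executor itself, that executor is also shut down.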
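The bookkeeping at @6 amounts to "complete the group once the last child has terminated." The sketch below uses JDK CompletableFuture as a stand-in for Netty's Future/Promise. TerminationSketch and onAllDone are hypothetical names, and onAllDone plays the role of shutting down the executor.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the termination listener at @6, using JDK futures.
final class TerminationSketch {
    static CompletableFuture<Void> groupTermination(
            List<CompletableFuture<Void>> children, Runnable onAllDone) {
        CompletableFuture<Void> group = new CompletableFuture<>();
        AtomicInteger terminated = new AtomicInteger();
        for (CompletableFuture<Void> child : children) {
            child.whenComplete((ignored, error) -> {
                // Only the last child to finish completes the group and runs cleanup.
                if (terminated.incrementAndGet() == children.size()) {
                    group.complete(null);
                    onAllDone.run();
                }
            });
        }
        return group;
    }
}
```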

At this point NioEventLoopGroup initialization is complete, and we can draw the following conclusions:

  • One NioEventLoopGroup holds multiple NioEventLoops, which take turns accepting client requests;
  • All NioEventLoops in the same NioEventLoopGroup share a single Executor: io.netty.util.internal.chmv8.ForkJoinPool, with a parallelism of nEventExecutors.

Next we take a closer look at NioEventLoop's class hierarchy and implementation.

3. A First Look at NioEventLoop

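NioEventLoop extends SingleThreadEventLoop. As the name suggests, each NioEventLoop in a NioEventLoopGroup is a single-threaded event loop. Yet the Executor inside each NioEventLoop is the shared ForkJoinPool with a parallelism of nEventLoops.
The NioEventLoop constructor is as follows: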

[Figure: the NioEventLoop constructor]

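Have you ever wondered why a single-threaded event loop (SingleThreadEventLoop) needs a thread pool inside it, and what is gained by using one thread as the event loop? My current understanding is as follows. An EventLoop is essentially an I/O thread. Implementing it with a single thread is simple and efficient: there is no thread-switching overhead and no multi-threaded access to worry about. Netty also binds all operations of a channel (reads, writes, and so on) to the same event loop, which makes state easier to keep. For example, Handlers (which run on the I/O thread) can conveniently use ThreadLocal variables, and thread switches are reduced. Using a thread pool rather than a plain thread mainly improves robustness: if the single thread dies because of an exception, the pool can start a new thread to keep serving. [However, as the later analysis shows, the thread running a NioEventLoop also rotates: each iteration of the loop is resubmitted to the pool, so successive iterations may run on different pool threads.]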

Next, we start from SingleThreadEventExecutor, the executor at the heart of the whole thread model, and examine its inner workings in detail.

4. Source Analysis of SingleThreadEventExecutor

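SingleThreadEventExecutor is the concrete executor behind NioEventLoopGroup. Each "thread" running in a NioEventLoopGroup is, in essence, a SingleThreadEventExecutor. This article dissects the class from three angles: its key fields, its constructor, and its core methods.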

4.1 Key fields

        private static final int ST_NOT_STARTED = 1;        // state: not started yet, no tasks being processed
            private static final int ST_STARTED = 2;            // started and running
            private static final int ST_SHUTTING_DOWN = 3;      // shutting down (graceful shutdown)
            private static final int ST_SHUTDOWN = 4;           // shut down
            private static final int ST_TERMINATED = 5;         // terminated
    
            private final Queue<Runnable> taskQueue;   // this executor's task queue; subclasses can supply their own
            @SuppressWarnings({ "FieldMayBeFinal", "unused" })
            private volatile Thread thread;            // the thread currently running this executor
            private final Executor executor;           // the underlying thread pool, here Netty's own ForkJoinPool
            private final Semaphore threadLock = new Semaphore(0);
            private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
            private final boolean addTaskWakesUp;      // true if and only if adding a task (addTask) by itself wakes up the executor thread;
                                                       // when false, execute() must wake it explicitly, e.g. to break out of a blocking select()
            @SuppressWarnings({ "FieldMayBeFinal", "unused" })
            private volatile int state = ST_NOT_STARTED;     // current state

4.2 The constructor

        /**
             * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it.
             * @param executor the {@link Executor} which will be used for executing.
             * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up
             * the executor thread.
             */
            protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
                super(parent);
                if (executor == null) {
                    throw new NullPointerException("executor");
                }
                this.addTaskWakesUp = addTaskWakesUp;
                this.executor = executor;
                taskQueue = newTaskQueue();
            }
            /**
             * Create a new {@link Queue} which will hold the tasks to execute. This default implementation will return a
             * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
             * calls on this {@link Queue} it may make sense to {@code @Override} this and return some more performant
             * implementation that does not support blocking operations at all.
             */
            protected Queue<Runnable> newTaskQueue() {
                return new LinkedBlockingQueue<Runnable>();
            }

Subclasses can override newTaskQueue to supply a different underlying task queue.
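A sketch of the newTaskQueue() hook as a template method. It uses hypothetical classes (BaseExecutorSketch and two subclasses) rather than SingleThreadEventExecutor itself. The non-blocking subclass swaps in a ConcurrentLinkedQueue. As the Javadoc above warns, that is reasonable only if nothing ever makes blocking calls on the queue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical classes illustrating the newTaskQueue() template method; not Netty's code.
abstract class BaseExecutorSketch {
    protected final Queue<Runnable> taskQueue;

    protected BaseExecutorSketch() {
        // Called from the constructor, so overrides must not rely on subclass fields.
        taskQueue = newTaskQueue();
    }

    // Default: a blocking queue, as in SingleThreadEventExecutor.
    protected Queue<Runnable> newTaskQueue() {
        return new LinkedBlockingQueue<>();
    }
}

final class DefaultQueueExecutorSketch extends BaseExecutorSketch {
}

final class NonBlockingExecutorSketch extends BaseExecutorSketch {
    // Never blocks on the queue, so a lock-free queue is enough.
    @Override
    protected Queue<Runnable> newTaskQueue() {
        return new ConcurrentLinkedQueue<>();
    }
}
```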

4.3 Core methods

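In the thread pools of the JDK's java.util.concurrent package, methods such as submit first wrap the work in a task object and then call execute. SingleThreadEventExecutor ultimately implements Executor as well, so we start with its execute method.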

4.3.1 The execute method

        @Override
            public void execute(Runnable task) {
                if (task == null) {
                    throw new NullPointerException("task");
                }
                boolean inEventLoop = inEventLoop();  //@1
                if (inEventLoop) {
                    addTask(task);                                     //@2
                } else {
                    startExecution();                                 //@3
                    addTask(task);                                   //@4
                    if (isShutdown() && removeTask(task)) {    //@5
                        reject();                                                       //@6
                    }
                }
                if (!addTaskWakesUp && wakesUpForTask(task)) {    //@7
                    wakeup(inEventLoop);                                               //@8
                }
            }

Code @1 checks whether the caller is running inside the event loop. The implementation is:

        @Override
        public boolean inEventLoop(Thread thread) {
            return thread == this.thread;
        }

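What does this mean? Suppose a task is submitted, for example a read or write task. If the calling thread is this EventLoop's own thread, the task is simply added to the task queue to wait for execution. If another thread is calling, the EventLoop thread is started (if it is not running yet) and the task is then put into the task queue. Here is a simple example. The I/O thread decodes a request and hands the work to a business thread pool, and that business thread then calls channel.write. The actual write is not performed on the business thread, because the business thread is not the EventLoop's thread. The call only enqueues a task and returns immediately to the business thread; this is the asynchronous behavior to understand here.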

Code @2: if the caller is the EventLoop thread itself, the task is added directly to the task queue.

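Code @3: if the caller is another thread, scheduling is started first (analyzed in detail below), and then the task is added at @4.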

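Code @5 and @6: if the executor has already been shut down and the task can still be removed from the queue, the task is rejected.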

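Code @7 and @8: wake up the selector. At first glance the addTaskWakesUp flag looks odd here. When it is true, adding a task already wakes the executor thread, so the explicit wakeup at @8 only runs when it is false. The exact wake-up logic matters a great deal to the event model and is overridden in the relevant subclasses, so we will look at it more closely when studying NioEventLoop.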
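To make the execute/inEventLoop behavior concrete, here is a deliberately tiny, hypothetical event loop (TinyEventLoop). It runs on a plain dedicated thread rather than a ForkJoinPool and has no wake-up, shutdown, or error handling. A call from an outside thread starts the loop thread lazily. Every task, whoever submits it, runs on that single thread. This is the thread confinement that lets a business thread's channel.write return immediately.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, heavily simplified event loop: one dedicated thread drains a task queue.
final class TinyEventLoop {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private final Thread thread = new Thread(this::loop, "tiny-event-loop");

    // Same idea as inEventLoop(): is the caller the loop's own thread?
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            // Caller is an outside thread: start the loop thread exactly once.
            thread.setDaemon(true);
            thread.start();
        }
        // In both cases the task is only enqueued; it runs later on the loop thread.
        taskQueue.add(task);
    }

    private void loop() {
        try {
            for (;;) {
                taskQueue.take().run(); // blocks until a task arrives
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // leave the loop when interrupted
        }
    }
}
```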

Next we study step @3, the implementation of startExecution.

4.3.2 startExecution in detail

        private void startExecution() {
                if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
                    if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // @1
                        schedule(new ScheduledFutureTask<Void>(
                                this, Executors.<Void>callable(new PurgeTask(), null),
                                ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));   // @2
                        scheduleExecution();   //@3
                    }
                }
            }
    
        protected final void scheduleExecution() {
                updateThread(null);
                executor.execute(asRunnable); // @4
            }
    
        private void updateThread(Thread t) { 
            THREAD_UPDATER.lazySet(this, t);     //@5
        }
    
        private final Runnable asRunnable = new Runnable() {  //@6
                @Override
                public void run() {
                    updateThread(Thread.currentThread());
                    // lastExecutionTime must be set on the first run
                    // in order for shutdown to work correctly for the
                    // rare case that the eventloop did not execute
                    // a single task during its lifetime.
                    if (firstRun) {
                        firstRun = false;
                        updateLastExecutionTime();
                    }
                    try {
                        SingleThreadEventExecutor.this.run();
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                        cleanupAndTerminate(false);
                    }
                }
            };

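Code @1: startExecution can safely be called many times, because the state is checked and set with a CAS. Startup continues only if the state is ST_NOT_STARTED and the caller succeeds in switching it to ST_STARTED.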

Code @2: schedules a periodic PurgeTask that removes cancelled tasks from the queue. The schedule method is analyzed below.

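Code @3: starts scheduling. It first clears the recorded running thread by setting it to null (see @5). lazySet is enough for this update because the visibility requirement is weak. Suppose a thread adding a task wrongly concludes that it is not the EventLoop thread (the thread running asRunnable). Nothing breaks: it simply calls startExecution first, which does nothing once the loop has started. It then puts the task into the queue, where asRunnable will pick it up.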

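Code @4 is the heart of it. This is what makes SingleThreadEventExecutor live up to its name as a single thread, even though each EventLoop's executor is a ForkJoinPool with a parallelism of nEventExecutors. Only one asRunnable is submitted at a time, so at most one pool thread runs the loop at any moment.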

Code @6: the task executed in the ForkJoinPool is asRunnable.run(). It first records the current thread and then calls SingleThreadEventExecutor's run method, which is implemented by subclasses.
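The start-once CAS and the "resubmit the loop each round" pattern can be sketched as follows. ResubmittingLoopSketch is hypothetical. Any java.util.concurrent.Executor stands in for Netty's ForkJoinPool, and maxIterations exists only so the example terminates. Because each round is a fresh task, the recorded thread may differ between rounds, yet only one round is in flight at a time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of startExecution()/scheduleExecution()/asRunnable, not Netty's code.
final class ResubmittingLoopSketch {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final AtomicReference<Thread> thread = new AtomicReference<>();
    private final Executor executor;
    private final AtomicInteger remaining;                 // only here so the demo terminates
    final AtomicInteger iterations = new AtomicInteger();
    final CountDownLatch finished = new CountDownLatch(1);

    ResubmittingLoopSketch(Executor executor, int maxIterations) {
        this.executor = executor;
        this.remaining = new AtomicInteger(maxIterations);
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread.get();
    }

    void startExecution() {
        // Safe to call repeatedly: only the caller that wins the CAS starts the loop.
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            scheduleExecution();
        }
    }

    private void scheduleExecution() {
        thread.lazySet(null);                 // like updateThread(null)
        executor.execute(this::runOnce);      // like executor.execute(asRunnable)
    }

    private void runOnce() {
        thread.lazySet(Thread.currentThread()); // like updateThread(Thread.currentThread())
        iterations.incrementAndGet();           // stands in for one select/process/run-tasks round
        if (remaining.decrementAndGet() > 0) {
            scheduleExecution();                // like the scheduleExecution() at the end of run()
        } else {
            finished.countDown();
        }
    }
}
```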

4.3.3 The schedule method used at @2

        <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
                if (inEventLoop()) {
                    scheduledTaskQueue().add(task);
                } else {
                    execute(new Runnable() {
                        @Override
                        public void run() {
                            scheduledTaskQueue().add(task);
                        }
                    });
                }
    
                return task;
            }

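Scheduling a task works as follows. First the task is wrapped in a ScheduledFutureTask. If the current thread is this executor's thread, the task is added directly to the priority queue. Otherwise execute is called, so that the executor thread itself adds the task to the scheduled-task priority queue. This is the core of Netty's thread model: I/O operations on a channel are ultimately queued and executed by the I/O thread, which avoids contention between threads.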
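The scheduled-task queue is a priority queue ordered by deadline. Below is a minimal single-threaded sketch. It assumes the caller is already on the loop thread, so the "hand it to execute() otherwise" branch is omitted. ScheduledQueueSketch and its methods are hypothetical. delayNanos mirrors the fallback behavior of the delayNanos method shown in 5.2.1.

```java
import java.util.PriorityQueue;

// Hypothetical sketch of a deadline-ordered scheduled-task queue owned by the loop thread.
final class ScheduledQueueSketch {
    static final class ScheduledTask implements Comparable<ScheduledTask> {
        final long deadlineNanos;
        final Runnable task;

        ScheduledTask(long deadlineNanos, Runnable task) {
            this.deadlineNanos = deadlineNanos;
            this.task = task;
        }

        @Override
        public int compareTo(ScheduledTask other) {
            return Long.compare(deadlineNanos, other.deadlineNanos); // earliest deadline first
        }
    }

    private final PriorityQueue<ScheduledTask> queue = new PriorityQueue<>();

    void schedule(ScheduledTask t) {
        queue.add(t);
    }

    // Runs every task whose deadline has passed; returns how many ran.
    int runDue(long nowNanos) {
        int ran = 0;
        ScheduledTask head;
        while ((head = queue.peek()) != null && head.deadlineNanos <= nowNanos) {
            queue.poll().task.run();
            ran++;
        }
        return ran;
    }

    // Time until the earliest deadline, or a fallback when nothing is scheduled.
    long delayNanos(long nowNanos, long fallbackNanos) {
        ScheduledTask head = queue.peek();
        return head == null ? fallbackNanos : Math.max(0, head.deadlineNanos - nowNanos);
    }
}
```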

5. Source Analysis of NioEventLoop

5.1 Key NioEventLoop fields

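        private static final int CLEANUP_INTERVAL = 256;     // once this many keys are cancelled, reset cancelledKeys to 0 and select again
        private static final boolean DISABLE_KEYSET_OPTIMIZATION =
                    SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false); // whether to DISABLE the selected-key-set optimization;
                                                                                             // default false, i.e. the optimization is on
    
        private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;   // works around the well-known JDK epoll bug where Selector.select() keeps returning
                                                                          // immediately with nothing selected (an empty poll), spinning the CPU.
                                                                          // Minimum number of premature returns: if SELECTOR_AUTO_REBUILD_THRESHOLD is
                                                                          // below this value, the Selector is never rebuilt.
        private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;  // rebuild the Selector after this many consecutive select() calls return no ready keys; default 512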
        /**
             * The NIO {@link Selector}.
             */
            Selector selector;                // NIO Selector
            private SelectedSelectionKeySet selectedKeys;      // selected keys: Netty's optimized replacement for the key set filled in by the JDK Selector's select()
    
            private final SelectorProvider provider;
    
            /**
             * Boolean that controls determines if a blocked Selector.select should
             * break out of its selection process. In our case we use a timeout for
             * the select method and the select method will block for that time unless
             * waken up.
             */
            private final AtomicBoolean wakenUp = new AtomicBoolean();    // whether selector.wakeup() needs to be called
    
            private volatile int ioRatio = 50;   // percentage of loop time to spend on I/O (the rest goes to tasks)
            private int cancelledKeys;            // number of cancelled keys
            private boolean needsToSelectAgain; // whether another select is needed

5.2 The core entry point: run

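Continuing from the analysis above: when a task is submitted to an EventExecutor, it first makes sure execution has started (startExecution). Once scheduling has started, the pool thread runs the loop logic in the run method. SingleThreadEventExecutor is an indirect superclass of NioEventLoop, and NioEventLoop implements run as follows: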

        @Override
            protected void run() {
                boolean oldWakenUp = wakenUp.getAndSet(false);      //@1
                try {
                    if (hasTasks()) {      // @2 
                        selectNow();
                    } else {
                        select(oldWakenUp);    //@3 
    
                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).
    
                        if (wakenUp.get()) {         // @4
                            selector.wakeup();
                        }
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {          // @51
                        processSelectedKeys();  //@52
                        runAllTasks();                //@53
                    } else { //@61
                        final long ioStartTime = System.nanoTime();
    
                        processSelectedKeys();//@62
    
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);    //@63
                    }
    
                    if (isShuttingDown()) {   //@7
                        closeAll();
                        if (confirmShutdown()) {
                            cleanupAndTerminate(true);
                            return;
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Unexpected exception in the selector loop.", t);
    
                    // TODO: After using a ForkJoinPool that is potentially shared with other software
                    // than Netty. The Thread.sleep might be problematic. Even though this is unlikely to ever
                    // happen anyways.
    
                    // Prevent possible consecutive immediate failures that lead to
                    // excessive CPU consumption.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore.
                    }
                }
    
                scheduleExecution();     //@7
            }

Code @1: reads the current wakenUp state and resets it to false.

Code @2: if the task queue has tasks, performs a quick selectNow(), which does not block.

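Code @3: if the task queue is empty, calls select(oldWakenUp). This does not block indefinitely. Internally it calls either selectNow() or select(timeoutMillis), so it blocks at most until the timeout, which is derived from the next scheduled task's deadline.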

Code @4: if wakenUp is set, calls selector.wakeup() again. The comment in the code explains the race condition this guards against.

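Code @5x and @6x: process the selected keys and run queued tasks; these methods are covered later. The logic depends on ioRatio. With ioRatio == 100, all selected keys are processed and then all tasks are run with no time limit (@51 to @53). Otherwise the time spent on I/O is measured, and tasks get a time budget of ioTime * (100 - ioRatio) / ioRatio (@61 to @63).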
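A worked example of the ioRatio arithmetic at @63. IoRatioSketch is a hypothetical helper. It assumes ioRatio is validated to 0 < ioRatio <= 100, the range Netty's setIoRatio accepts. It returns Long.MAX_VALUE to stand for the ioRatio == 100 branch, where runAllTasks() is called with no time limit. With the default of 50, tasks get as much time as the I/O phase just took. With 80 they get a quarter of it, and with 20 they get four times as much.

```java
// Hypothetical helper extracting the task time-budget arithmetic from NioEventLoop.run().
final class IoRatioSketch {
    /** Nanoseconds the task phase may use after an I/O phase that took ioTimeNanos. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
        }
        if (ioRatio == 100) {
            return Long.MAX_VALUE; // stands for runAllTasks() with no time limit
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```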

Code @7 (the isShuttingDown() check): the shutdown process.

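Code @7 (the final scheduleExecution() call): resubmits asRunnable to the underlying Executor (the ForkJoinPool), so the next round starts again with select.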


This design still deserves some thought: why not simply keep looping on a single thread instead of resubmitting each round?

Let us first examine the select(oldWakenUp) method called at @3.

5.2.1 The select(oldWakenUp) method in detail

        private void select(boolean oldWakenUp) throws IOException {
                Selector selector = this.selector;
                try {
                    int selectCnt = 0;
                    long currentTimeNanos = System.nanoTime();
                    long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);   // @1
                    for (;;) {
                        long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                        if (timeoutMillis <= 0) {
                            if (selectCnt == 0) {
                                selector.selectNow();
                                selectCnt = 1;
                            }
                            break;
                        }
    
                        int selectedKeys = selector.select(timeoutMillis);   // @2
                        selectCnt ++;
    
                        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {   // @3
                            // - Selected something,
                            // - waken up by user, or
                            // - the task queue has a pending task.
                            // - a scheduled task is ready for processing
                            break;
                        }
                        if (Thread.interrupted()) {
                            // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                            // As this is most likely a bug in the handler of the user or it's client library we will
                            // also log it.
                            //
                            // See https://github.com/netty/netty/issues/2426
                            if (logger.isDebugEnabled()) {
                                logger.debug("Selector.select() returned prematurely because " +
                                        "Thread.currentThread().interrupt() was called. Use " +
                                        "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                            }
                            selectCnt = 1;
                            break;
                        }
    
                        long time = System.nanoTime();
                        if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {   // @4
                            // timeoutMillis elapsed without anything selected.
                            selectCnt = 1;
                        } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {      //@5
                            // The selector returned prematurely many times in a row.
                            // Rebuild the selector to work around the problem.
                            logger.warn(
                                    "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                                    selectCnt);
    
                            rebuildSelector();                                                                     //@6
                            selector = this.selector;
    
                            // Select again to populate selectedKeys.
                            selector.selectNow();
                            selectCnt = 1;
                            break;
                        }
    
                        currentTimeNanos = time;
                    }
    
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
                        }
                    }
                } catch (CancelledKeyException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
                    }
                    // Harmless exception - log anyway
                }
            }

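Code @1: computes the deadline used for the select timeout. delayNanos peeks at the head of the priority (scheduled-task) queue and computes how long until that task is due. If no task is scheduled, it falls back to SCHEDULE_PURGE_INTERVAL: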

        protected long delayNanos(long currentTimeNanos) {
                ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
                if (scheduledTask == null) {
                    return SCHEDULE_PURGE_INTERVAL;
                }
                return scheduledTask.delayNanos(currentTimeNanos);
            }

Code @2: calls select with the computed timeout (in milliseconds).
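The timeoutMillis line in select() converts the remaining nanoseconds to milliseconds. It adds 0.5 ms first so that the integer division rounds to the nearest millisecond. A delay under 0.5 ms therefore becomes 0, which sends the loop down the selectNow() branch. The helper below is a hypothetical extraction of that one expression.

```java
// Hypothetical extraction of the timeoutMillis expression in NioEventLoop.select().
final class SelectTimeoutSketch {
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        // Adding 500_000 ns (0.5 ms) before dividing rounds to the nearest millisecond.
        return (selectDeadLineNanos - currentTimeNanos + 500_000L) / 1_000_000L;
    }
}
```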

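Code @3: breaks out of the loop to do the actual work if any keys were selected, a wake-up was requested, or there are pending tasks or due scheduled tasks.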

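Code @4: this check detects an empty poll. select is called with a timeout. If it returns before that timeout has elapsed without selecting any key, and none of the break conditions at @3 hold, it counts as a premature return and the @5 logic applies. Once this happens SELECTOR_AUTO_REBUILD_THRESHOLD (default 512) times in a row, the Selector is rebuilt by rebuildSelector() at @6.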
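The premature-return bookkeeping at @4 and @5 can be isolated as a small counter. This is a hypothetical sketch (SpinDetectorSketch, onEmptySelect). It assumes it is called only for select() calls that selected nothing and hit none of the break conditions at @3. It returns true where the real loop would call rebuildSelector(). A threshold of 0 disables rebuilding, matching the SELECTOR_AUTO_REBUILD_THRESHOLD > 0 check.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the empty-poll counter in NioEventLoop.select().
final class SpinDetectorSketch {
    private final int rebuildThreshold; // plays the role of SELECTOR_AUTO_REBUILD_THRESHOLD
    private int selectCnt;

    SpinDetectorSketch(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    /** Records one select(timeoutMillis) that selected nothing; returns true if a rebuild is due. */
    boolean onEmptySelect(long startNanos, long endNanos, long timeoutMillis) {
        selectCnt++;
        if (endNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= startNanos) {
            // The full timeout elapsed: a normal return, so restart the count.
            selectCnt = 1;
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            // Too many premature returns in a row: time to rebuild the Selector.
            selectCnt = 1;
            return true;
        }
        return false;
    }
}
```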

5.2.2 rebuildSelector in detail

        /**
             * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
             * around the infamous epoll 100% CPU bug.
             */
            public void rebuildSelector() {
                if (!inEventLoop()) {
                    execute(new Runnable() {
                        @Override
                        public void run() {
                            rebuildSelector();
                        }
                    });
                    return;
                }
    
                final Selector oldSelector = selector;
                final Selector newSelector;
    
                if (oldSelector == null) {
                    return;
                }
    
                try {
                    newSelector = openSelector();
                } catch (Exception e) {
                    logger.warn("Failed to create a new Selector.", e);
                    return;
                }
    
                // Register all channels to the new Selector.
                int nChannels = 0;
                for (;;) {
                    try {
                        for (SelectionKey key: oldSelector.keys()) {
                            Object a = key.attachment();
                            try {
                                if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                                    continue;
                                }
    
                                int interestOps = key.interestOps();
                                key.cancel();
                                SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                                if (a instanceof AbstractNioChannel) {
                                    // Update SelectionKey
                                    ((AbstractNioChannel) a).selectionKey = newKey;
                                }
                                nChannels ++;
                            } catch (Exception e) {
                                logger.warn("Failed to re-register a Channel to the new Selector.", e);
                                if (a instanceof AbstractNioChannel) {
                                    AbstractNioChannel ch = (AbstractNioChannel) a;
                                    ch.unsafe().close(ch.unsafe().voidPromise());
                                } else {
                                    @SuppressWarnings("unchecked")
                                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                                    invokeChannelUnregistered(task, key, e);
                                }
                            }
                        }
                    } catch (ConcurrentModificationException e) {
                        // Probably due to concurrent modification of the key set.
                        continue;
                    }
    
                    break;
                }
    
                selector = newSelector;
    
                try {
                    // time to close the old selector as everything else is registered to the new one
                    oldSelector.close();
                } catch (Throwable t) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Failed to close the old Selector.", t);
                    }
                }
    
                logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
            }

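The method works as follows:

It must run on the event loop's own thread: if it is called from any other thread, it submits itself to the event loop as a task and returns. It then opens a new Selector, walks the keys registered on the old Selector (oldSelector.keys()), and re-registers each valid channel on the new Selector with the same interest ops and attachment, cancelling the key on the old Selector first and updating the channel's selectionKey field. A channel that fails to re-register is closed (or, for an NioTask, notified through invokeChannelUnregistered). Finally it switches to the new Selector and closes the old one to release its resources; per the JDK Javadoc, closing a Selector invalidates the keys still registered with it and deregisters their channels, but does not close the channels themselves. I won't walk through the rest of the code line by line.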
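The re-registration loop can be reproduced with nothing but java.nio. The sketch below is a simplified, hypothetical helper (SelectorMigration.migrate is not Netty API): unlike rebuildSelector() it does not retry on ConcurrentModificationException or close channels that fail to re-register, and it assumes it is called from the only thread that uses either Selector.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

/** Hypothetical, simplified version of the migration step in rebuildSelector(). */
final class SelectorMigration {
    /**
     * Re-registers every valid key of oldSelector on newSelector with the same
     * interest ops and attachment, then closes oldSelector. Returns the number migrated.
     */
    static int migrate(Selector oldSelector, Selector newSelector) throws IOException {
        int migrated = 0;
        for (SelectionKey key : oldSelector.keys()) {
            // Skip dead keys and channels that are already on the new Selector.
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel(); // invalidate the old registration
            key.channel().register(newSelector, interestOps, attachment);
            migrated++;
        }
        // Closing the old Selector deregisters its channels but does not close them.
        oldSelector.close();
        return migrated;
    }
}
```

Because only the old Selector is closed, a migrated channel stays open and keeps its interest ops and attachment on the new Selector.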

5.2.3 processSelectedKeys() (called at @52 and @62 in the run method) in detail

        private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
                // check if the set is empty and if so just return to not create garbage by
                // creating a new Iterator every time even if there is nothing to process.
                // See https://github.com/netty/netty/issues/597
                if (selectedKeys.isEmpty()) {
                    return;
                }
    
                Iterator<SelectionKey> i = selectedKeys.iterator();  
                for (;;) {
                    final SelectionKey k = i.next();
                    final Object a = k.attachment();
                    i.remove();
    
                    if (a instanceof AbstractNioChannel) {       //@1
                        processSelectedKey(k, (AbstractNioChannel) a);
                    } else {                //@2
                        @SuppressWarnings("unchecked")
                        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                        processSelectedKey(k, task);
                    }
    
                    if (!i.hasNext()) {
                        break;
                    }
    
                    if (needsToSelectAgain) {     //@3
                        selectAgain();
                        selectedKeys = selector.selectedKeys();
    
                        // Create the iterator again to avoid ConcurrentModificationException
                        if (selectedKeys.isEmpty()) {
                            break;
                        } else {
                            i = selectedKeys.iterator();
                        }
                    }
                }
            }

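This method mainly iterates over the selected keys, removing each one from the set before handling it; the real per-key processing happens at @1 and @2. At @3, if needsToSelectAgain is set, it calls selectAgain() and restarts the iteration on the fresh selected-key set, creating a new iterator to avoid ConcurrentModificationException. Let's focus on the processing logic behind @1.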
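The iterate, remove, dispatch pattern of processSelectedKeysPlain() can be sketched as follows. The KeyTask interface stands in for NioTask and the Consumer stands in for processSelectedKey(k, channel); both are hypothetical, and the needsToSelectAgain branch is left out. The essential line is it.remove(): a Selector adds keys to its selected-key set but never removes them itself, so the loop has to.

```java
import java.nio.channels.SelectionKey;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch of the loop in processSelectedKeysPlain(); not Netty API. */
final class SelectedKeyLoop {
    /** Hypothetical stand-in for NioTask. */
    interface KeyTask {
        void onReady(SelectionKey key);
    }

    /** Drains the selected-key set, dispatching on the attachment type; returns keys handled. */
    static int process(Set<SelectionKey> selectedKeys, Consumer<SelectionKey> channelHandler) {
        if (selectedKeys.isEmpty()) {
            return 0; // avoid allocating an Iterator when there is nothing to do
        }
        int handled = 0;
        Iterator<SelectionKey> it = selectedKeys.iterator();
        while (it.hasNext()) {
            SelectionKey k = it.next();
            Object a = k.attachment();
            it.remove(); // the Selector never removes keys from this set by itself
            if (a instanceof KeyTask) {
                ((KeyTask) a).onReady(k);
            } else {
                channelHandler.accept(k); // e.g. processSelectedKey(k, channel)
            }
            handled++;
        }
        return handled;
    }
}
```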

        private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
                final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
                if (!k.isValid()) {                                                                             //@1
                    // close the channel if the key is not valid anymore
                    unsafe.close(unsafe.voidPromise());
                    return;
                }
    
                try {
                    int readyOps = k.readyOps();   //@2
                    // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                    // to a spin loop
                    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {     //@3
                        unsafe.read();
                        if (!ch.isOpen()) {
                            // Connection already closed - no need to handle write.
                            return;
                        }
                    }
                    if ((readyOps & SelectionKey.OP_WRITE) != 0) {   //@4
                        // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                        ch.unsafe().forceFlush();
                    }
                    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {   //@5
                        // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                        // See https://github.com/netty/netty/issues/924
                        int ops = k.interestOps();
                        ops &= ~SelectionKey.OP_CONNECT;
                        k.interestOps(ops);
    
                        unsafe.finishConnect();
                    }
                } catch (CancelledKeyException ignored) {
                    unsafe.close(unsafe.voidPromise());
                }
            }

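Code @1: if the key is no longer valid, close the channel immediately.

Code @2: get the set of operations this key is ready for (readyOps).

Code @3: if OP_READ or OP_ACCEPT is ready (or readyOps is 0, a workaround for a possible JDK bug that could otherwise cause a spin loop), call read() on the channel's unsafe object. If the channel was closed during the read, return, since there is nothing left to write.

Code @4: if OP_WRITE is ready, call ch.unsafe().forceFlush() to force the pending writes out; it also clears OP_WRITE once there is nothing left to write.

Code @5: if OP_CONNECT is ready, remove OP_CONNECT from the key's interest set (otherwise select() would keep returning immediately without blocking), then call finishConnect() to complete the connection.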
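The order of the checks in processSelectedKey() can be isolated as a pure function over the readyOps bit set. ReadyOpsDecoder is a hypothetical helper: it only returns the names of the actions the real method would take, in order, and omits the early return when the channel closes during read().

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that mirrors the order of checks in processSelectedKey(). */
final class ReadyOpsDecoder {
    /** Names the actions taken for a given readyOps value, in execution order. */
    static List<String> actions(int readyOps) {
        List<String> actions = new ArrayList<>();
        // @3: OP_READ or OP_ACCEPT, or readyOps == 0 (JDK spin-loop workaround) -> read.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        // @4: OP_WRITE -> flush whatever is pending.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // @5: OP_CONNECT -> drop OP_CONNECT from the interest set, then finish connecting.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        return actions;
    }

    /** Removes OP_CONNECT from an interest set so that select() can block again. */
    static int withoutConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }
}
```

For example, a key that is both readable and writable yields read followed by forceFlush, matching the order of @3 and @4.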

5.2.4 runAllTasks() (called at @53 and @63 in the run method) in detail

        /**
             * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
             *
             * @return {@code true} if and only if at least one task was run
             */
            protected boolean runAllTasks() {
                fetchFromScheduledTaskQueue();
                Runnable task = pollTask();
                if (task == null) {
                    return false;
                }
    
                for (;;) {
                    try {
                        task.run();
                    } catch (Throwable t) {
                        logger.warn("A task raised an exception.", t);
                    }
    
                    task = pollTask();
                    if (task == null) {
                        lastExecutionTime = ScheduledFutureTask.nanoTime();
                        return true;
                    }
                }
            }
    
            /**
             * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
             * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
             */
            protected boolean runAllTasks(long timeoutNanos) {
                fetchFromScheduledTaskQueue();
                Runnable task = pollTask();
                if (task == null) {
                    return false;
                }
    
                final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
                long runTasks = 0;
                long lastExecutionTime;
                for (;;) {
                    try {
                        task.run();
                    } catch (Throwable t) {
                        logger.warn("A task raised an exception.", t);
                    }
    
                    runTasks ++;
    
                    // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                    // XXX: Hard-coded value - will make it configurable if it is really a problem.
                    if ((runTasks & 0x3F) == 0) {
                        lastExecutionTime = ScheduledFutureTask.nanoTime();
                        if (lastExecutionTime >= deadline) {
                            break;
                        }
                    }
    
                    task = pollTask();
                    if (task == null) {
                        lastExecutionTime = ScheduledFutureTask.nanoTime();
                        break;
                    }
                }
    
                this.lastExecutionTime = lastExecutionTime;
                return true;
            }
    
        private void fetchFromScheduledTaskQueue() {
                if (hasScheduledTasks()) {
                    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
                    for (;;) {
                        Runnable scheduledTask = pollScheduledTask(nanoTime);
                        if (scheduledTask == null) {
                            break;
                        }
                        taskQueue.add(scheduledTask);
                    }
                }
            }

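The code above runs the tasks that have piled up in the queue. fetchFromScheduledTaskQueue() first moves every scheduled task that is already due into taskQueue; tasks are then taken with pollTask() rather than with any of the queue's blocking methods, so these methods never block. The timed variant runAllTasks(long timeoutNanos) additionally stops once its deadline has passed, checking the clock only every 64 tasks because nanoTime() is relatively expensive.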
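A minimal, single-threaded sketch of the same two steps: move due scheduled tasks into the plain queue, then drain the queue with a deadline checked every 64 tasks. TaskRunner, its nested Scheduled class and the injected clock are hypothetical; the clock is a LongSupplier only so that the deadline behaviour can be tested deterministically, and runAllTasks here returns the number of tasks run instead of a boolean.

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.function.LongSupplier;

/** Hypothetical, simplified task runner modelled on runAllTasks(timeout); not Netty's class. */
final class TaskRunner {
    static final class Scheduled {
        final long deadlineNanos;
        final Runnable task;

        Scheduled(long deadlineNanos, Runnable task) {
            this.deadlineNanos = deadlineNanos;
            this.task = task;
        }
    }

    final Queue<Runnable> taskQueue = new ArrayDeque<>();
    final PriorityQueue<Scheduled> scheduled =
            new PriorityQueue<>((Scheduled a, Scheduled b) -> Long.compare(a.deadlineNanos, b.deadlineNanos));
    private final LongSupplier clock; // nanosecond clock, injected for testability

    TaskRunner(LongSupplier clock) {
        this.clock = clock;
    }

    /** Moves every scheduled task whose deadline has passed into the plain task queue. */
    void fetchFromScheduledTaskQueue() {
        long now = clock.getAsLong();
        while (!scheduled.isEmpty() && scheduled.peek().deadlineNanos <= now) {
            taskQueue.add(scheduled.poll().task);
        }
    }

    /** Runs queued tasks, checking the deadline only every 64 tasks; returns how many ran. */
    int runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        final long deadline = clock.getAsLong() + timeoutNanos;
        int runTasks = 0;
        Runnable task;
        // poll() never blocks: it returns null once the queue is empty.
        while ((task = taskQueue.poll()) != null) {
            try {
                task.run();
            } catch (RuntimeException e) {
                // Netty logs the exception and carries on; this sketch just carries on.
            }
            runTasks++;
            if ((runTasks & 0x3F) == 0 && clock.getAsLong() >= deadline) {
                break; // budget used up; remaining tasks wait for the next loop iteration
            }
        }
        return runTasks;
    }
}
```

Because the deadline is only checked every 64 tasks, a batch can overrun its budget by up to 63 tasks; that is the trade-off the original comment accepts in exchange for fewer nanoTime() calls.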

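This is also the place to explain the ioRatio parameter more concretely. ioRatio is the percentage of time given to I/O and takes values from 1 to 100. Netty uses the time spent processing Selector.selectedKeys(), that is, the time spent in processSelectedKey(), as the base; call it t1. From the discussion of processSelectedKey above, t1 is essentially the time spent on network reads and writes for ready events. The other part is the time spent running tasks from the task queue. If ioRatio is 100, processSelectedKey and runAllTasks simply run one after the other, with no time limit on the tasks. If ioRatio is set below 100, the time runAllTasks may run is limited (weighted against t1), so ready network events (select) get back to being processed sooner. But consider how tasks get into the task queue in the first place: they are usually read and write operations too, just not triggered by the I/O thread (the EventLoop). When a business thread calls a network read/write API, the operation is first put into the queue and then scheduled for execution. Since both parts are really I/O time, I find it more intuitive to think of ioRatio as the weight given to running the task queue.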
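As a worked example of how ioRatio turns into a time budget: in Netty 4.x, NioEventLoop.run() measures ioTime around processSelectedKeys() and, when ioRatio is below 100, calls runAllTasks(ioTime * (100 - ioRatio) / ioRatio). That the version analysed here uses exactly the same expression is an assumption, and IoRatioBudget is a hypothetical helper that returns -1 to mean "no limit".

```java
/** Hypothetical helper: the task-time budget implied by ioRatio (not Netty API). */
final class IoRatioBudget {
    /**
     * Given how long the I/O phase took, returns how long runAllTasks may run.
     * ioRatio == 100 means no limit, reported here as -1.
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..100: " + ioRatio);
        }
        if (ioRatio == 100) {
            return -1; // run every queued task, like runAllTasks() without a timeout
        }
        // I/O should be ioRatio% of the total, so tasks get (100 - ioRatio)/ioRatio of ioTime.
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

With 10 ms of I/O, ioRatio = 50 gives the tasks 10 ms, ioRatio = 80 gives 2.5 ms and ioRatio = 20 gives 40 ms: the lower the ratio, the more time tasks get relative to I/O.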

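Summary: this article analyzed Netty's NIO event model in depth, examining how NioEventLoopGroup and SingleThreadEventExecutor work from several angles: the initialization of the event model (the whole construction process), the core fields, the constructors, and the core entry-point methods.

Netty's event model: the core idea is the master/worker (main/sub) Reactor pattern. A NioEventLoopGroup contains n NioEventLoops, and each NioEventLoop holds one Selector and one thread pool (the executor, EventExecutor, which is actually Netty's ForkJoinPool with parallelism n). Each time the NioEventLoop's run method completes a pass, execution is handed over to another thread of the NioEventLoop's pool. I don't really understand why it is designed this way; personally I think running in a single thread throughout would be better.

Question:

I don't understand why Netty is designed this way. Internally, a NioEventLoop holds an EventExecutor (a ForkJoinPool with parallelism nEventLoops). At first I assumed a NioEventLoop would only ever use one thread from that ForkJoinPool, and would switch to a new thread only if that one exited abnormally, which keeps things robust. But when I read the source carefully, I found that after every select, scheduleExecution is called again, which uses another thread from the ForkJoinPool. This design works against ThreadLocal usage, especially thread-local memory allocation. For example, a read on a channel allocates a ByteBuf from a thread-local variable, then the write runs on a second ForkJoinPool thread, so that ByteBuf cannot be reused. Why is it designed this way? Why not have the EventExecutor's thread pool (ForkJoinPool) use only one thread, and switch to another thread (handled automatically by the pool) only when that thread crashes?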
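The alternative the question proposes, an event loop pinned to one thread for its whole life and replaced only if that thread dies, can be illustrated with a JDK single-thread executor, whose Javadoc promises a replacement thread if the current one terminates due to a failure. This is a hypothetical demonstration, not Netty code: CACHE_USES stands in for a thread-local ByteBuf cache, and the point is only that every task sees the same thread and therefore the same ThreadLocal state.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: an event loop pinned to one thread keeps ThreadLocal state stable. */
final class PinnedLoopDemo {
    // Hypothetical stand-in for a thread-local buffer cache: counts uses per thread.
    static final ThreadLocal<int[]> CACHE_USES = ThreadLocal.withInitial(() -> new int[1]);

    /** Runs n tasks on one pinned thread; returns {distinct threads seen, uses seen by the last task}. */
    static int[] run(int n) throws InterruptedException {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        Set<Thread> threadsSeen = ConcurrentHashMap.newKeySet();
        AtomicInteger lastUses = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            loop.execute(() -> {
                threadsSeen.add(Thread.currentThread());
                // Same thread every time, so the same ThreadLocal slot keeps being reused.
                lastUses.set(++CACHE_USES.get()[0]);
            });
        }
        loop.shutdown();
        if (!loop.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        return new int[] {threadsSeen.size(), lastUses.get()};
    }
}
```

Running ten tasks this way reports a single thread and a thread-local counter of 10, which is exactly the reuse the question says is lost when consecutive passes hop between ForkJoinPool threads.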


Source: https://blog.csdn.net/prestigeding/article/details/53977445

Copyright belongs to the original author; please contact the author before reposting in any form.