2019-10-28 17:45  阅读(2328)
文章分类:Tomcat 源码分析 文章标签:TomcatTomcat 源码
©  原文作者:clawhub 原文地址:http://clawhub.club

上一篇分析完Acceptor,将获取的Socket封装成NioChannel注册到Poller,在注册的过程中NioChannel会封装成PollerEvent。
本篇主要看PollerEvent是怎么处理的。

Poller类实现了Runnable接口,所以主要看其run方法:

          /**
                 * The background thread that adds sockets to the Poller, checks the
                 * poller for triggered events and hands the associated socket off to an
                 * appropriate processor as events occur.
                 * 后台线程将套接字添加到轮询器,检查轮询器是否触发事件,并在事件发生时将关联的套接字交给适当的处理器。
                 */
                @Override
                public void run() {
                    // Loop until destroy() is called
                    while (true) {
                        boolean hasEvents = false;
                        try {
                            //没有关闭
                            if (!close) {
                                //处理轮询器事件队列中的事件。
                                hasEvents = events();
                                if (wakeupCounter.getAndSet(-1) > 0) {
                                    //if we are here, means we have other stuff to do
                                    //do a non blocking select
                                    //非阻塞,只要有通道就绪就立刻返回。
                                    keyCount = selector.selectNow();
                                } else {
                                    //阻塞到至少有一个通道在你注册的事件上就绪了,最长阻塞时间为selectorTimeout毫秒。
                                    keyCount = selector.select(selectorTimeout);
                                }
                                wakeupCounter.set(0);
                            }
                            //关闭
                            if (close) {
                                //处理轮询器事件队列中的事件。
                                events();
                                //超时的处理
                                timeout(0, false);
                                try {
                                    selector.close();
                                } catch (IOException ioe) {
                                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                                }
                                break;
                            }
                        } catch (Throwable x) {
                            ExceptionUtils.handleThrowable(x);
                            log.error("", x);
                            continue;
                        }
                        //either we timed out or we woke up, process events first
                        //要么超时,要么醒来,先处理事件
                        if (keyCount == 0) hasEvents = (hasEvents | events());
    
                        //获取每一个SelectionKey,并处理
                        Iterator<SelectionKey> iterator =
                                keyCount > 0 ? selector.selectedKeys().iterator() : null;
                        // Walk through the collection of ready keys and dispatch
                        // any active event.
                        while (iterator != null && iterator.hasNext()) {
                            SelectionKey sk = iterator.next();
                            //附件,里面有数据
                            NioSocketWrapper attachment = (NioSocketWrapper) sk.attachment();
                            // Attachment may be null if another thread has called
                            // cancelledKey()
                            if (attachment == null) {
                                iterator.remove();
                            } else {
                                iterator.remove();
                                //处理数据
                                processKey(sk, attachment);
                            }
                        }//while
    
                        //process timeouts
                        timeout(keyCount, hasEvents);
                    }//while
                    // Poller销毁,计数器减一
                    getStopLatch().countDown();
                }
    

主要就是获取队列中的事件,处理事件,这里看两个方法:

events

处理轮询器事件队列中的事件。

          /**
                 * Processes events in the event queue of the Poller.
                 * 处理轮询器事件队列中的事件。
                 *
                 * @return <code>true</code> if some events were processed,
                 * <code>false</code> if queue was empty
                 */
                public boolean events() {
                    boolean result = false;
    
                    PollerEvent pe = null;
                    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) {
                        result = true;
                        try {
                            //PollerEvent也实现了Runnable接口
                            pe.run();
                            //重置
                            pe.reset();
                            if (running && !paused) {
                                eventCache.push(pe);
                            }
                        } catch (Throwable x) {
                            log.error("", x);
                        }
                    }
    
                    return result;
                }
    

继续看PollerEvent的run方法:

         @Override
                public void run() {
                    //注册操作
                    if (interestOps == OP_REGISTER) {
                        try {
                            //注册Selector,读操作
                            socket.getIOChannel().register(
                                    socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
                        } catch (Exception x) {
                            log.error(sm.getString("endpoint.nio.registerFail"), x);
                        }
                    } else {
                        final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                        try {
                            if (key == null) {
                                // The key was cancelled (e.g. due to socket closure)
                                // and removed from the selector while it was being
                                // processed. Count down the connections at this point
                                // since it won't have been counted down when the socket
                                // closed.
                                socket.socketWrapper.getEndpoint().countDownConnection();
                                ((NioSocketWrapper) socket.socketWrapper).closed = true;
                            } else {
                                final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                                if (socketWrapper != null) {
                                    //we are registering the key to start with, reset the fairness counter.
                                    int ops = key.interestOps() | interestOps;
                                    socketWrapper.interestOps(ops);
                                    //设置interest值
                                    key.interestOps(ops);
                                } else {
                                    socket.getPoller().cancelledKey(key);
                                }
                            }
                        } catch (CancelledKeyException ckx) {
                            try {
                                socket.getPoller().cancelledKey(key);
                            } catch (Exception ignore) {
                            }
                        }
                    }
                }
    

这一步可以说是为了后面处理数据做铺垫。

processKey(sk, attachment)

        protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
                    try {
                        if (close) {
                            //取消键,里面各种关闭
                            cancelledKey(sk);
                        } else if (sk.isValid() && attachment != null) {
                            //有效且有数据
                            if (sk.isReadable() || sk.isWritable()) {
                                //有文件时
                                if (attachment.getSendfileData() != null) {
                                    processSendfile(sk, attachment, false);
                                } else {
                                    //取消注册
                                    unreg(sk, attachment, sk.readyOps());
                                    boolean closeSocket = false;
                                    // Read goes before write
                                    if (sk.isReadable()) {
                                        //处理可读
                                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                            closeSocket = true;
                                        }
                                    }
                                    //可写
                                    if (!closeSocket && sk.isWritable()) {
                                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                            closeSocket = true;
                                        }
                                    }
                                    if (closeSocket) {
                                        cancelledKey(sk);
                                    }
                                }
                            }
                        } else {
                            //invalid key
                            cancelledKey(sk);
                        }
                    } catch (CancelledKeyException ckx) {
                        cancelledKey(sk);
                    } catch (Throwable t) {
                        ExceptionUtils.handleThrowable(t);
                        log.error("", t);
                    }
                }
    

可以看出主要是调用processSocket方法:

         /**
             * Process the given SocketWrapper with the given status. Used to trigger
             * processing as if the Poller (for those endpoints that have one)
             * selected the socket.
             *
             * @param socketWrapper The socket wrapper to process
             * @param event         The socket event to be processed
             * @param dispatch      Should the processing be performed on a new
             *                          container thread
             *
             * @return if processing was triggered successfully
             */
            public boolean processSocket(SocketWrapperBase<S> socketWrapper,
                    SocketEvent event, boolean dispatch) {
                try {
                    if (socketWrapper == null) {
                        return false;
                    }
                    SocketProcessorBase<S> sc = processorCache.pop();
                    if (sc == null) {
                        sc = createSocketProcessor(socketWrapper, event);
                    } else {
                        sc.reset(socketWrapper, event);
                    }
                    Executor executor = getExecutor();
                    if (dispatch && executor != null) {
                        executor.execute(sc);
                    } else {
                        sc.run();
                    }
                } catch (RejectedExecutionException ree) {
                    getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
                    return false;
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    // This means we got an OOM or similar creating a thread, or that
                    // the pool and its queue are full
                    getLog().error(sm.getString("endpoint.process.fail"), t);
                    return false;
                }
                return true;
            }
    

这里将attachment和SocketEvent封装成SocketProcessor,交给Executor(即工作线程)处理。
下一篇继续分析工作线程的操作流程。


来源:https://www.jianshu.com/u/9632919f32c3

点赞(0)
版权归原创作者所有,任何形式转载请联系作者; Java 技术驿站 >> Tomcat源码分析【十二】请求处理过程分析之Poller
上一篇
Tomcat源码分析【十一】请求处理过程分析之Acceptor
下一篇
Tomcat源码分析【十三】请求处理过程分析之SocketProcessor