2019-10-28 17:45  阅读(1896)
文章分类:Tomcat 源码分析 文章标签:TomcatTomcat 源码
©  原文作者:clawhub 原文地址:http://clawhub.club

定义

Connector官方文档

HTTP连接器元素表示支持HTTP/1.1协议的连接器组件。除了能够执行servlet和JSP页面之外,它还使Catalina能够作为独立的web服务器运行。
此组件的特定实例侦听服务器上特定TCP端口号上的连接。可以将一个或多个这样的连接器配置为单个服务的一部分,
每个连接器都转发到关联的引擎来执行请求处理并创建响应。
如果希望使用AJP协议(如mod_jk 1.2)配置用于连接到web服务器的连接器。 (such as the mod_jk 1.2.x connector for Apache 1.3),请参阅AJP连接器文档。
每个传入请求在请求期间都需要一个线程。如果接收到的并发请求多于当前可用的请求处理线程所能处理的,那么将创建更多的线程,
直到配置的最大值(maxThreads属性的值)。如果还同时接收到更多的请求,则将它们堆积在连接器创建的服务器套接字中,
直到配置的最大值(acceptCount属性的值)。任何进一步的同步请求都会收到“连接拒绝”错误,直到有可用的资源来处理它们。

我们使用Tomcat,一般只用到Web服务器、Servlet容器这两个特性,现在用JSP的太少了,使用AJP协议的地方也没接触过。
所以可以简单的将Connector理解为,监听特定TCP端口,接收用户请求,封装成Request和Response,传递到Servlet处理业务,最后返回结果给客户端。

Connector容器,涉及到Http11NioProtocol(ProtocolHandler)、Http11Processor(Processor)、CoyoteAdapter(Adapter)、
NioEndpoint、ConnectionHandler(AbstractEndpoint.Handler)、Poller、Acceptor等,这些类相互协作,监听、处理、封装、传递请求,响应客户。

下面根据生命周期来看Connector类:

Connector的构造

         public Connector(String protocol) {
                boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
                        AprLifecycleListener.getUseAprConnector();
    
                if ("HTTP/1.1".equals(protocol) || protocol == null) {
                    if (aprConnector) {
                        protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol";
                    } else {
                        protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
                    }
                } else if ("AJP/1.3".equals(protocol)) {
                    if (aprConnector) {
                        protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol";
                    } else {
                        protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol";
                    }
                } else {
                    protocolHandlerClassName = protocol;
                }
    
                // Instantiate protocol handler
                ProtocolHandler p = null;
                try {
                    Class<?> clazz = Class.forName(protocolHandlerClassName);
                    p = (ProtocolHandler) clazz.getConstructor().newInstance();
                } catch (Exception e) {
                    log.error(sm.getString(
                            "coyoteConnector.protocolHandlerInstantiationFailed"), e);
                } finally {
                    this.protocolHandler = p;
                }
    
                // Default for Connector depends on this system property
                setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"));
            }
    

首先通过协议名称创建ProtocolHandler,这里假设为Http11NioProtocol,这个类将引领全局,再来看Http11NioProtocol的构造:

         public Http11NioProtocol() {
                super(new NioEndpoint());
            }
         public AbstractHttp11JsseProtocol(AbstractJsseEndpoint<S,?> endpoint) {
                super(endpoint);
            }
         public AbstractHttp11Protocol(AbstractEndpoint<S,?> endpoint) {
                super(endpoint);
                setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
                ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
                setHandler(cHandler);
                getEndpoint().setHandler(cHandler);
            }
         public AbstractProtocol(AbstractEndpoint<S,?> endpoint) {
                this.endpoint = endpoint;
                setConnectionLinger(Constants.DEFAULT_CONNECTION_LINGER);
                setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
            }
    

创建了NioEndpoint,和ConnectionHandler。

initInternal初始化

         @Override
            protected void initInternal() throws LifecycleException {
                //父类初始化
                super.initInternal();
                if (protocolHandler == null) {
                    throw new LifecycleException(
                            sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"));
                }
                // Initialize adapter
                //初始化Adapter
                adapter = new CoyoteAdapter(this);
                //adapter设置到protocolHandler中
                protocolHandler.setAdapter(adapter);
                // Make sure parseBodyMethodsSet has a default
                if (null == parseBodyMethodsSet) {
                    setParseBodyMethods(getParseBodyMethods());
                }
                if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
                    throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                            getProtocolHandlerClassName()));
                }
                if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
                        protocolHandler instanceof AbstractHttp11JsseProtocol) {
                    AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                            (AbstractHttp11JsseProtocol<?>) protocolHandler;
                    if (jsseProtocolHandler.isSSLEnabled() &&
                            jsseProtocolHandler.getSslImplementationName() == null) {
                        // OpenSSL is compatible with the JSSE configuration, so use it if APR is available
                        jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
                    }
                }
                try {
                    //初始化protocolHandler
                    protocolHandler.init();
                } catch (Exception e) {
                    throw new LifecycleException(
                            sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
                }
            }
    

这里首先初始化CoyoteAdapter,这个类是用来和Container(Servlet)交互的,之后初始化protocolHandler。再来看protocolHandler.init()方法:

         @Override
            public void init() throws Exception {
                // Upgrade protocols have to be configured first since the endpoint
                // init (triggered via super.init() below) uses this list to configure
                // the list of ALPN protocols to advertise
                for (UpgradeProtocol upgradeProtocol : upgradeProtocols) {
                    configureUpgradeProtocol(upgradeProtocol);
                }
    
                super.init();
            }
    

协议升级操作,再调用上一层:

         @Override
            public void init() throws Exception {
                if (getLog().isInfoEnabled()) {
                    getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
                }
    
                if (oname == null) {
                    // Component not pre-registered so register it
                    oname = createObjectName();
                    if (oname != null) {
                        Registry.getRegistry(null, null).registerComponent(this, oname, null);
                    }
                }
    
                if (this.domain != null) {
                    rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
                    Registry.getRegistry(null, null).registerComponent(
                            getHandler().getGlobal(), rgOname, null);
                }
    
                String endpointName = getName();
                endpoint.setName(endpointName.substring(1, endpointName.length()-1));
                endpoint.setDomain(domain);
    
                endpoint.init();
            }
    

这里重要的是endpoint.init()方法:

         public final void init() throws Exception {
                if (bindOnInit) {
                    bind();
                    bindState = BindState.BOUND_ON_INIT;
                }
                if (this.domain != null) {
                    // Register endpoint (as ThreadPool - historical name)
                    oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
                    Registry.getRegistry(null, null).registerComponent(this, oname, null);
    
                    for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
                        registerJmx(sslHostConfig);
                    }
                }
            }
    

是否要在初始化的时候创建连接。默认为true,进入bind方法:

         /**
             * Initialize the endpoint.
             */
            @Override
            public void bind() throws Exception {
                //初始化服务套接字
                initServerSocket();
                // Initialize thread count defaults for acceptor, poller
                if (acceptorThreadCount == 0) {
                    // FIXME: Doesn't seem to work that well with multiple accept threads
                    acceptorThreadCount = 1;
                }
                if (pollerThreadCount <= 0) {
                    //minimum one poller thread
                    pollerThreadCount = 1;
                }
                //计数器
                setStopLatch(new CountDownLatch(pollerThreadCount));
                // Initialize SSL if needed
                initialiseSsl();
                //打开selector池
                selectorPool.open();
            }
         /**
             * Separated out to make it easier for folks that extend NioEndpoint to implement custom [server]sockets
             */
            protected void initServerSocket() throws Exception {
                if (!getUseInheritedChannel()) {
                    serverSock = ServerSocketChannel.open();
                    socketProperties.setProperties(serverSock.socket());
                    InetSocketAddress addr = (getAddress() != null ? new InetSocketAddress(getAddress(), getPort()) : new InetSocketAddress(getPort()));
                    serverSock.socket().bind(addr, getAcceptCount());
                } else {
                    // Retrieve the channel provided by the OS
                    Channel ic = System.inheritedChannel();
                    if (ic instanceof ServerSocketChannel) {
                        serverSock = (ServerSocketChannel) ic;
                    }
                    if (serverSock == null) {
                        throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
                    }
                }
                //mimic APR behavior
                serverSock.configureBlocking(true);
            }
    

初始化服务套接字等操作。

startInternal开启

          /**
             * Begin processing requests via this Connector.
             *
             * @exception LifecycleException if a fatal startup error occurs
             */
            @Override
            protected void startInternal() throws LifecycleException {
                // Validate settings before starting
                if (getPort() < 0) {
                    throw new LifecycleException(sm.getString(
                            "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
                }
                setState(LifecycleState.STARTING);
                try {
                    protocolHandler.start();
                } catch (Exception e) {
                    throw new LifecycleException(
                            sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
                }
            }
    

Connector的startInternal最后调用到了protocolHandler.start()方法,从上一步的initInternal到这一步,已经能简单的看出,
Connector组件的核心就是这个protocolHandler。

            @Override
            public void start() throws Exception {
                if (getLog().isInfoEnabled()) {
                    getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
                }
                //AbstractEndpoint
                endpoint.start();
                // Start async timeout thread
                asyncTimeout = new AsyncTimeout();
                Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
                int priority = endpoint.getThreadPriority();
                if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
                    priority = Thread.NORM_PRIORITY;
                }
                timeoutThread.setPriority(priority);
                timeoutThread.setDaemon(true);
                timeoutThread.start();
            }
    

这里有两步,开启endpoint,初始化AsyncTimeout,这两个部分以前分析过,这里再简单过一遍:
AsyncTimeout的作用是周期检测异步请求是否超时,重要的还是endpoint.start()方法:

        public final void start() throws Exception {
                //bind状态为未绑定的时候,执行bind
                if (bindState == BindState.UNBOUND) {
                    //子类实现
                    bind();
                    bindState = BindState.BOUND_ON_START;
                }
                //这由子类实现
                startInternal();
            }
    

这里只需看endpoint的startInternal方法:

         /**
             * Start the NIO endpoint, creating acceptor, poller threads.
             */
            @Override
            public void startInternal() throws Exception {
                if (!running) {
                    running = true;
                    paused = false;
                    //用于SocketProcessor对象的缓存
                    processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getProcessorCache());
                    //为轮询器事件缓存PollerEvent
                    eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getEventCache());
                    //Bytebuffer缓存,每个通道持有一组缓冲区(两个,SSL持有四个)
                    //NioChannel
                    nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getBufferPool());
                    // Create worker collection
                    if (getExecutor() == null) {
                        createExecutor();
                    }
                    //初始化连接限制
                    initializeConnectionLatch();
                    // Start poller threads
                    //轮询器启动线程,都为守护线程
                    pollers = new Poller[getPollerThreadCount()];
                    for (int i = 0; i < pollers.length; i++) {
                        pollers[i] = new Poller();
                        Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
                        pollerThread.setPriority(threadPriority);
                        pollerThread.setDaemon(true);
                        pollerThread.start();
                    }
                    //开始所有的Acceptor线程,用于监听套接字的
                    startAcceptorThreads();
                }
            }
    

这里主要是创建了acceptor和poller线程,下面分别看一下:

Acceptor

主要作用是获取新的连接,组装后注册到Poller中。

Poller

首先看一下构造函数:

         public Poller() throws IOException {
                    this.selector = Selector.open();
                }
    

可以看到一个Poller对应一个selector, 用于检测已就绪的 Socket。 默认最多不超过 2 个,这里简单的介绍,后期后详细分析。

stopInternal关闭

          /**
             * Terminate processing requests via this Connector.
             *
             * @exception LifecycleException if a fatal shutdown error occurs
             */
            @Override
            protected void stopInternal() throws LifecycleException {
                setState(LifecycleState.STOPPING);
                try {
                    if (protocolHandler != null) {
                        protocolHandler.stop();
                    }
                } catch (Exception e) {
                    throw new LifecycleException(
                            sm.getString("coyoteConnector.protocolHandlerStopFailed"), e);
                }
            }
    

依然调用protocolHandler方法:

            @Override
            public void stop() throws Exception {
                if(getLog().isInfoEnabled()) {
                    getLog().info(sm.getString("abstractProtocolHandler.stop", getName()));
                }
                if (asyncTimeout != null) {
                    asyncTimeout.stop();
                }
                endpoint.stop();
            }
    

终止异步检测线程,关闭endpoint.AbstractEndpoint中的stop方法:

         public final void stop() throws Exception {
                stopInternal();
                if (bindState == BindState.BOUND_ON_START || bindState == BindState.SOCKET_CLOSED_ON_STOP) {
                    unbind();
                    bindState = BindState.UNBOUND;
                }
            }
    

分别调用NioEndpoint的stopInternal与unbind,这些方法后期会分析,这里先略过。

destroyInternal销毁

         @Override
            protected void destroyInternal() throws LifecycleException {
                try {
                    if (protocolHandler != null) {
                        protocolHandler.destroy();
                    }
                } catch (Exception e) {
                    throw new LifecycleException(
                            sm.getString("coyoteConnector.protocolHandlerDestroyFailed"), e);
                }
    
                if (getService() != null) {
                    getService().removeConnector(this);
                }
    
                super.destroyInternal();
            }
    

除了调用 protocolHandler.destroy()方法外,还解除了Service容器与当前Connector之间的关系。
protocolHandler.destroy():

         @Override
            public void destroy() throws Exception {
                if(getLog().isInfoEnabled()) {
                    getLog().info(sm.getString("abstractProtocolHandler.destroy", getName()));
                }
    
                try {
                    endpoint.destroy();
                } finally {
                    if (oname != null) {
                        if (mserver == null) {
                            Registry.getRegistry(null, null).unregisterComponent(oname);
                        } else {
                            // Possibly registered with a different MBeanServer
                            try {
                                mserver.unregisterMBean(oname);
                            } catch (MBeanRegistrationException | InstanceNotFoundException e) {
                                getLog().info(sm.getString("abstractProtocol.mbeanDeregistrationFailed",
                                        oname, mserver));
                            }
                        }
                    }
    
                    if (rgOname != null) {
                        Registry.getRegistry(null, null).unregisterComponent(rgOname);
                    }
                }
            }
         public final void destroy() throws Exception {
                if (bindState == BindState.BOUND_ON_INIT) {
                    unbind();
                    bindState = BindState.UNBOUND;
                }
                Registry registry = Registry.getRegistry(null, null);
                registry.unregisterComponent(oname);
                for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
                    unregisterJmx(sslHostConfig);
                }
            }
    

本篇文章就分析到这里吧,有个重要的逻辑没有分析,就是Acceptor接收请求,传递到Poller中处理,再转给worker线程包装,
最后调用容器的 pipeline 进行处理。


来源:https://www.jianshu.com/u/9632919f32c3

点赞(0)
版权归原创作者所有,任何形式转载请联系作者; Java 技术驿站 >> Tomcat源码分析【九】Connector连接器
上一篇
Tomcat源码分析【八】启动过程分析之注册关闭钩子、阻塞监听关闭指令
下一篇
Tomcat源码分析【十】请求处理过程分析之NIO网络操作