<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
作為常用的http
協定伺服器,tomcat
應用非常廣泛。tomcat也是遵循Servelt協定的,Servelt
協定可以讓伺服器與真實服務邏輯程式碼進行解耦。各自只需要關注Servlet
協定即可。
對於tomcat是如何作為一個高效能的伺服器的呢?你是不是也會有這樣的疑問?
tomcat是如何接收網路請求?
如何做到高效能的http
協定伺服器?
tomcat從8.0往後開始使用了NIO非阻塞io模型,提高了吞吐量,本文的原始碼是tomcat 9.0.48版本
org.apache.tomcat.util.net.Acceptor
實現了Runnable
介面,在一個單獨的執行緒中以死迴圈的方式一直進行socket的監聽
執行緒的初始化及啟動是在方法org.apache.tomcat.util.net.AbstractEndpoint#startAcceptorThread
有個很重要的屬性org.apache.tomcat.util.net.AbstractEndpoint
;同時實現了run
方法,方法中主要有以下功能:
socketChannel
public void run() { int errorDelay = 0; try { // Loop until we receive a shutdown command while (!stopCalled) { ... if (stopCalled) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait // 如果連線超過了 8*1024,則執行緒阻塞等待; 是使用org.apache.tomcat.util.threads.LimitLatch類實現了分享鎖(內部實現了AbstractQueuedSynchronizer) // 請你注意到達最大連線數後作業系統底層還是會接收使用者端連線,但使用者層已經不再接收。 endpoint.countUpOrAwaitConnection(); // Endpoint might have been paused while waiting for latch // If that is the case, don't accept new connections if (endpoint.isPaused()) { continue; } U socket = null; try { // Accept the next incoming connection from the server // socket // 抽象方法,不同的endPoint有不同的實現方法。NioEndPoint為例,實現方法為serverSock.accept(),這個方法主要看serverSock範例化時如果為阻塞,accept方法為阻塞;反之為立即返回,如果沒有socket連結,則為null socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { // We didn't get a socket endpoint.countDownConnection(); if (endpoint.isRunning()) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (!stopCalled && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful // endPoint類的抽象方法,不同的endPoint有不同的實現。處理獲取到的socketChannel連結,如果該socket連結能正常處理,那麼該方法會返回true,否則為false if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); } } catch (Throwable t) { ... } } } finally { stopLatch.countDown(); } state = AcceptorState.ENDED; }
再來看下org.apache.tomcat.util.net.NioEndpoint#setSocketOptions
方法的具體實現(NioEndpoint為例)
這個方法中主要做的事:
protected boolean setSocketOptions(SocketChannel socket) { NioSocketWrapper socketWrapper = null; try { // Allocate channel and wrapper // 優先使用已有的快取nioChannel NioChannel channel = null; if (nioChannels != null) { channel = nioChannels.pop(); } if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(bufhandler, this); } else { channel = new NioChannel(bufhandler); } } // 將nioEndpoint與NioChannel進行包裝 NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this); channel.reset(socket, newWrapper); connections.put(socket, newWrapper); socketWrapper = newWrapper; // Set socket properties // Disable blocking, polling will be used // 設定當前連結的socket為非阻塞 socket.configureBlocking(false); if (getUnixDomainSocketPath() == null) { socketProperties.setProperties(socket.socket()); } socketWrapper.setReadTimeout(getConnectionTimeout()); socketWrapper.setWriteTimeout(getConnectionTimeout()); socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); // 將包裝後的nioChannel與nioEndpoint進行註冊,註冊到Poller,將對應的socket包裝類新增到Poller的佇列中,同時喚醒selector poller.register(socketWrapper); return true; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error(sm.getString("endpoint.socketOptionsError"), t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } if (socketWrapper == null) { destroySocket(socket); } } // Tell to close the socket if needed return false; }
上一小節是接收到了socket請求,進行包裝之後,將socket新增到了Poller
的佇列上,並可能喚醒了Selector
,本小節就來看看,Poller是如何進行socket的輪詢的。
首先org.apache.tomcat.util.net.NioEndpoint.Poller
也是實現了Runnable介面,是一個可以單獨啟動的執行緒
初始化及啟動是在org.apache.tomcat.util.net.NioEndpoint#startInternal
重要的屬性:
java.nio.channels.Selector
:在Poller物件初始化的時候,就會啟動輪詢器SynchronizedQueue<PollerEvent>
:同步的事件佇列再來看下具體處理邏輯,run方法的原始碼
public void run() { // Loop until destroy() is called while (true) { boolean hasEvents = false; try { if (!close) { // 去SynchronizedQueue事件佇列中拉去,看是否已經有了事件,如果有,則返回true // 如果從佇列中拉取到了event(即上一步將NioSocketWrapper封裝為PollerEvent新增到次佇列中),將socketChannel註冊到Selector上,標記為SelectionKey.OP_READ,新增處理常式attachment(為Accetpor新增到Poller時的 // NioSocketWrapper) hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { // If we are here, means we have other stuff to do // Do a non blocking select keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } // Either we timed out or we woke up, process events first if (keyCount == 0) { hasEvents = (hasEvents | events()); } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error(sm.getString("endpoint.nio.selectorLoopError"), x); continue; } Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. // selector輪詢獲取已經註冊的事件,如果有事件準備好,此時通過selectKeys方法就能拿到對應的事件 while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); // 獲取到事件後,從迭代器刪除事件,防止事件重複輪詢 iterator.remove(); // 獲取事件的處理器,這個attachment是在event()方法中註冊的,後續這個事件的處理,就交給這個wrapper去處理 NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (socketWrapper != null) { processKey(sk, socketWrapper); } } // Process timeouts timeout(keyCount,hasEvents); } getStopLatch().countDown(); }
在這裡,有一個很重要的方法,org.apache.tomcat.util.net.NioEndpoint.Poller#events()
,他是從Poller
的事件佇列中獲取Acceptor
接收到的可用socket,並將其註冊到Selector
上
/** * Processes events in the event queue of the Poller. * * @return <code>true</code> if some events were processed, * <code>false</code> if queue was empty */ public boolean events() { boolean result = false; PollerEvent pe = null; // 如果Acceptor將socket新增到佇列中,那麼events.poll()方法就能拿到對應的事件,否則拿不到就返回false for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) { result = true; NioSocketWrapper socketWrapper = pe.getSocketWrapper(); SocketChannel sc = socketWrapper.getSocket().getIOChannel(); int interestOps = pe.getInterestOps(); if (sc == null) { log.warn(sm.getString("endpoint.nio.nullSocketChannel")); socketWrapper.close(); } else if (interestOps == OP_REGISTER) { // 如果是Acceptor剛新增到佇列中的事件,那麼此時的ops就是OP_REGISTER try {, // 將次socket註冊到selector上,標記為OP_READ事件,新增事件觸發時處理常式socketWrapper sc.register(getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail"), x); } } else { // ??這裡的邏輯,不清楚什麼情況下會進入到這個分支裡面 final SelectionKey key = sc.keyFor(getSelector()); if (key == null) { // The key was cancelled (e.g. due to socket closure) // and removed from the selector while it was being // processed. Count down the connections at this point // since it won't have been counted down when the socket // closed. socketWrapper.close(); } else { final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment(); if (attachment != null) { // We are registering the key to start with, reset the fairness counter. try { int ops = key.interestOps() | interestOps; attachment.interestOps(ops); key.interestOps(ops); } catch (CancelledKeyException ckx) { cancelledKey(key, socketWrapper); } } else { cancelledKey(key, socketWrapper); } } } if (running && !paused && eventCache != null) { pe.reset(); eventCache.push(pe); } } return result; }
還有一個重要方法就是org.apache.tomcat.util.net.NioEndpoint.Poller#processKey
,上一個方法是獲取event,並註冊到selector,那這個方法就是通過Selector
獲取到的資料準備好的event,並開始封裝成對應的業務處理執行緒SocketProcessorBase
,扔到執行緒池裡開始處理
protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) { try { if (close) { cancelledKey(sk, socketWrapper); } else if (sk.isValid()) { if (sk.isReadable() || sk.isWritable()) { if (socketWrapper.getSendfileData() != null) { processSendfile(sk, socketWrapper, false); } else { unreg(sk, socketWrapper, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { //這裡如果是非同步的操作,就會走這裡 if (socketWrapper.readOperation != null) { if (!socketWrapper.readOperation.process()) { closeSocket = true; } } else if (socketWrapper.readBlocking) { // readBlocking預設為false synchronized (socketWrapper.readLock) { socketWrapper.readBlocking = false; socketWrapper.readLock.notify(); } } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) { // 處理正常的事件,這裡的processSocket就要正式開始處理請求了。 // 將對應的事件封裝成對應的執行緒,然後交給執行緒池去處理正式的請求業務 closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (socketWrapper.writeOperation != null) { if (!socketWrapper.writeOperation.process()) { closeSocket = true; } } else if (socketWrapper.writeBlocking) { synchronized (socketWrapper.writeLock) { socketWrapper.writeBlocking = false; socketWrapper.writeLock.notify(); } } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { cancelledKey(sk, socketWrapper); } } } } else { // Invalid key cancelledKey(sk, socketWrapper); } } catch (CancelledKeyException ckx) { cancelledKey(sk, socketWrapper); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.nio.keyProcessingError"), t); } }
上一步,Selector獲取到了就緒的請求socket,然後根據socket註冊的觸發處理常式等,將這些資料進行封裝,扔到了執行緒池裡,開始具體的業務邏輯處理。本節就是從工作執行緒封裝開始,org.apache.tomcat.util.net.SocketProcessorBase
為工作執行緒類的抽象類,實現了Runnable介面,不同的Endpoint實現具體的處理邏輯,本節以NioEndpoint為例
以下為org.apache.tomcat.util.net.AbstractEndpoint#processSocket
方法原始碼
/** * Process the given SocketWrapper with the given status. Used to trigger * processing as if the Poller (for those endpoints that have one) * selected the socket. * * @param socketWrapper The socket wrapper to process * @param event The socket event to be processed * @param dispatch Should the processing be performed on a new * container thread * * @return if processing was triggered successfully */ public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } // 優先使用已經存在的執行緒 SocketProcessorBase<S> sc = null; if (processorCache != null) { sc = processorCache.pop(); } if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } // 獲取執行緒池。執行緒池的初始化,是在Acceptor、Poller這兩個單獨執行緒啟動之前建立 // tomcat使用了自定義的org.apache.tomcat.util.threads.TaskQueue,這塊tomcat也進行了小的適配開發 // 核心執行緒為10個,最大200執行緒 Executor executor = getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
上面的方法是得到了處理業務邏輯的執行緒SocketProcessorBase,NioEndpoint內部類org.apache.tomcat.util.net.NioEndpoint.SocketProcessor
繼承了這個抽象類,也就是具體的業務處理邏輯在org.apache.tomcat.util.net.NioEndpoint.SocketProcessor#doRun
方法中,最終呼叫到我們的Servlet
protected void doRun() { /* * Do not cache and re-use the value of socketWrapper.getSocket() in * this method. If the socket closes the value will be updated to * CLOSED_NIO_CHANNEL and the previous value potentially re-used for * a new connection. That can result in a stale cached value which * in turn can result in unintentionally closing currently active * connections. */ Poller poller = NioEndpoint.this.poller; if (poller == null) { socketWrapper.close(); return; } try { int handshake = -1; try { // 握手相關判斷邏輯 ... } catch (IOException x) { ... } // 三次握手成功了 if (handshake == 0) { SocketState state = SocketState.OPEN; // Process the request from this socket // event為SocketEvent.OPEN_READ,這個變數是org.apache.tomcat.util.net.NioEndpoint.Poller#processKey方法賦值 if (event == null) { state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ); } else { // 這裡就開始正式處理請求了 state = getHandler().process(socketWrapper, event); } if (state == SocketState.CLOSED) { poller.cancelledKey(getSelectionKey(), socketWrapper); } } else if (handshake == -1 ) { getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL); poller.cancelledKey(getSelectionKey(), socketWrapper); } else if (handshake == SelectionKey.OP_READ){ socketWrapper.registerReadInterest(); } else if (handshake == SelectionKey.OP_WRITE){ socketWrapper.registerWriteInterest(); } } catch (CancelledKeyException cx) { poller.cancelledKey(getSelectionKey(), socketWrapper); } catch (VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); } catch (Throwable t) { log.error(sm.getString("endpoint.processing.fail"), t); poller.cancelledKey(getSelectionKey(), socketWrapper); } finally { socketWrapper = null; event = null; //return to cache if (running && !paused && processorCache != null) { processorCache.push(this); } } }
Tomcat
是如何接收網路請求?
使用java nio的同步非阻塞去進行網路監聽。
org.apache.tomcat.util.net.AbstractEndpoint#bindWithCleanup
中初始化網路監聽、SSL
{ .... serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset()); // 當應用層面的連線數到達最大值時,作業系統可以繼續接收連線,那麼作業系統能繼續接收的最大連線數就是這個佇列長度,可以通過acceptCount 引數設定,預設是 100 serverSock.bind(addr, getAcceptCount()); } serverSock.configureBlocking(true); //mimic APR behavior
org.apache.tomcat.util.net.NioEndpoint#startInternal
中初始化業務處理的執行緒池、連線限制器、Poller執行緒、Acceptor執行緒
如何做到高效能的http
協定伺服器?
Tomcat把接收連線、檢測 I/O 事件以及處理請求進行了拆分,用不同規模的執行緒去做對應的事情,這也是tomcat能高並行處理請求的原因。不讓執行緒阻塞,儘量讓CPU忙起來
是怎麼設計的呢?
通過介面、抽象類等,將不同的處理邏輯拆分,各司其職
org.apache.tomcat.util.net.AbstractEndpoint
:I/O事件的檢測、處理邏輯都在這個類的實現類裡面。使用模板方法,不同的協定有不同的實現方法。NioEndpoint/Nio2Endpoint/AprEndpoint
org.apache.tomcat.util.net.NioEndpoint.Poller
:參照了java.nio.channels.Selector
,內部有個事件佇列,監聽I/O事件具體就是在這裡做的org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper
org.apache.tomcat.util.net.NioEndpoint.SocketProcessor
: 具體處理請求的執行緒類NioEndpoint元件:Tomcat如何實現非阻塞I/O?
到此這篇關於Apache Tomcat如何高並行處理請求 的文章就介紹到這了,更多相關Apache Tomcat高並行請求 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45