2021-05-12 14:32:11
NIO 在Tomcat中的應用
對NIO的理解
個人單方面認為,NIO與BIO的最大區別在於主動和被動,使用BIO的方式需要等待被呼叫方返回資料,很明顯此時呼叫者是被動的。
舉個例子
阻塞IO
假設你是一個膽小又害羞的男孩子,你約了隔壁測試的妹子,但你並不敢主動約會,所以你把自己的手機號碼給她,並暗示她想要約會的時候打電話給你。很明顯此時你陷入了被動,約不約會的結果需要妹子主動告知你,如果她忘了,那麼你要陷入長時間的等待中以及無盡的猜測和自我懷疑中(太慘了)。[如果你是一個膽小害羞又好色的男孩子,那就慘了]
非阻塞IO 我們知道,渣男通常有很多的備胎,我管這個叫做備胎池(SpareTirePool), 那麼當他想要約會的時候,只要群發問妹子要不要約會,如果要約會的話就和妹子約會,約會結束之後,處理其他約會事件,如果沒有繼續下一次詢問。在這個例子中約會可以視為IO事件,問妹子的過程可以視為備胎池的輪詢。
Tomcat 如何使用NIO
既然是網路通訊的I/O那必然有以下兩個步驟
- SeverSocket的啟動
- I/O事件的處理
關鍵程式碼在 package org.apache.tomcat.util.net.NioEndpoint 中
P.S. 文章太長,如果不想看可以直接閱讀結論
ServerSocket的啟動
在最開始看程式碼,是震驚的,真的,如果你看Reactor模型的話
以下bind方法程式碼是啟動ServerSocket的流程,主要流程如下
- 係結地址
- 設定接收新連線的方式為阻塞方式(關鍵點)
- 設定Acceptor和Poller的數量以及初始化SelectorPool
@Override
public void bind() throws Exception {
if (!getUseInheritedChannel()) {
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
serverSock.socket().bind(addr,getAcceptCount());
} else {
// Retrieve the channel provided by the OS
Channel ic = System.inheritedChannel();
if (ic instanceof ServerSocketChannel) {
serverSock = (ServerSocketChannel) ic;
}
if (serverSock == null) {
throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
}
}
// 以阻塞的方式來接收連線!!
serverSock.configureBlocking(true); //mimic APR behavior
// 設定Acceptor和Poller的數量
if (acceptorThreadCount == 0) {
// FIXME: Doesn't seem to work that well with multiple accept threads
// 顧名思義,Acceptor是用來處理新連線的
acceptorThreadCount = 1;
}
if (pollerThreadCount <= 0) {
// Poller 用來處理I/O事件
pollerThreadCount = 1;
}
setStopLatch(new CountDownLatch(pollerThreadCount));
// Initialize SSL if needed
initialiseSsl();
// 從此處可以看出tomcat池化了selector
selectorPool.open();
}
Tomcat NIO 如何處理I/O事件
先說結論,Tomcat NIO模型中有以下關鍵角色
- Acceptor 用於接收新連線,每個Acceptor一個執行緒,以阻塞的方式接收新連線
- Poller 當Acceptor接收到新連線,進行處理之後選擇一個Poller處理該連線上的I/O事件。
- LimitLatch 一個用來限制連線數的鎖
Acceptor
Acceptor的主要工作就是不斷接收來自用戶端的連線,在簡單處理之後將該連線交給Poller處理
接收來自用戶端連線, 如果你不想看程式碼,以下是其主要流程
- 接收來自用戶端的連線,並將其交給Poller處理
@Override
public void run() {
int errorDelay = 0;
// running的檢測貫穿了Accpetor的處理流程,在每次關鍵操作的時候都會執行檢測
while (running) {
// 如果進入暫停狀態則每隔一段時間檢測一下
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
// 再次檢測
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
//檢查是否達到最大連線數如果是則陷入等待,如果不是則增加當前連線數
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
//接收新連線
socket = serverSock.accept();
} catch (IOException ioe) {
// 發生異常,則減少連線數
countDownConnection();
if (running) {
handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (running && !paused) {
//setSocketOptions會導致將該連線交給Poller處理
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
} else {
closeSocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
再來看看setSocketOptions做了什麼,不想看程式碼的話,總結如下
- 將用戶端socket設定為非阻塞模式
- 將用戶端的socket封裝為NioChannel或SecureNioChannel(使用了物件池技術)
- 從Poller池中獲取一個Poller,將NioChannel註冊到Poller上
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//設定為非阻塞模式,以便通過selector進行查詢
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
//從物件池中獲取一個NioChannel,tomcat會複用一切可以複用的物件以減少建立新物件所帶來的消耗
NioChannel channel = nioChannels.pop();
if (channel == null) {
// 沒有獲取到,那就新建一個唄
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
// SSL這一塊還沒研究
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
//重新設定SocketBufferHandler,將其設定為可寫和可讀
channel.reset();
}
//從Poller池中獲取一個Poller(按照次序獲取,可以理解為一個圓環),並將Channel註冊到上面
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
Poller
從連線註冊到Poller說起
不加鎖的獲取一個Poller
具體說明見程式碼
關鍵點:對一個數A取餘會將餘數的結果限制在A的範圍內
/**
* Return an available poller in true round robin fashion.
* 很明顯,取餘的方式揭示了獲取Poller的方法。你可以理解為
* Poller會組成一個圓環,這樣我們就可以通過不斷遞增獲取
* 下一個Poller,但是資料會溢位所以我們要取絕對值
* @return The next poller in sequence
*/
public Poller getPoller0() {
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
channel的註冊
該方法會對新的建的連線進行封裝,並以PollerEvent的形式註冊到相應的Poller中
需要注意的是,真正的註冊讀事件並不是在此方法註冊的(當前方法呼叫者為Acceptor執行緒),而是在Poller執行緒中註冊讀事件的
/**
* Registers a newly created socket with the poller.
* 將新建的socket註冊到Poller上
* @param socket The newly created socket
*/
public void register(final NioChannel socket) {
//以下程式碼為設定各種引數,可以從方法名進行推測,不再進行敘述
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getSocketProperties().getSoTimeout());
ka.setWriteTimeout(getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
//從快取中獲取一個PollerEvent
PollerEvent r = eventCache.pop();
// 註冊讀事件
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
// 如果沒有從快取中獲取,那麼就新建一個
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);
}
Poller處理I/O 事件
Poller 處理I/O事件的的程式碼較長,而且細節也較多,總結其主要作用如下
- 檢測是否有Acceptor提交PollerEvent,如果有則呼叫PolllerEvent的run方法註冊讀事件
- 在執行關鍵操作的時候檢測該Poller是否被關閉如果是,則執行相應的資源釋放和關閉操作
- 呼叫selector.select() 輪詢事件,如果有讀事件則交給processKey處理
@Override
public void run() {
// Loop until destroy() is called
// 一直迴圈直到destroy方法被呼叫
while (true) {
boolean hasEvents = false;
try {
if (!close) {
// events 方法會處理Acceptor註冊到Poller中的PollerEvent
// 主要是註冊讀事件
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
// 檢測到關閉,則處理剩餘的事件並關閉selector
if (close) {
// 處理Acceptors註冊到Poller中的PollerEvent
events();
//selector time out 或者poller被關閉就會呼叫timeout方法
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
// 執行 select 操作,查詢I/O事件
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
// 處理檢測到的I/O事件
processKey(sk, attachment);
}
}//while
//timeout 會檢查是否關閉,如果已經關閉並且有事件未處理會呼叫cancelledKey方法
//cancelledKey:該方法主要是對和該連線相關的資源執行關閉操作
timeout(keyCount,hasEvents);
}//while
getStopLatch().countDown();
}
processKey 處理I/O事件
processKey主要工作如下
- 再次檢測Poller是否關閉,如果是則釋放資源
- 檢測查詢到事件是否合法,如果合法則取消已註冊到selector上的事件且被被本次輪詢所查詢到的事件
- 再呼叫processSocket處理讀事件,之後處理寫事件
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
// 如果Poller關閉則關閉和釋放和此連線相關的資源
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
// 取消註冊事件
// sk.interestOps()& (~readyOps)
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write 先讀後寫
if (sk.isReadable()) {
// 關鍵程式碼,呼叫processSocket方法處理讀事件
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
}
processSocket 真-處理I/O事件
processSocket定義在org.apache.tomcat.util.net.AbstractEndPoint中, 也就是意味著無論你採用的是BIO還是NIO或者NIO2最終讀寫資料都是呼叫此方法
從程式碼中可以看出,依然是物件池,依然是再次封裝(套娃),並將其提交到執行緒池中執行,接下來的內容就不再本次討論範圍內呢。
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
總結
Tomcat的NIO模型
手抖了,線不怎麼♂
LimitLatch 為所有的Acceptor共用,用來限制當前的最大連線數
Acceptor 以阻塞的形式來接收新連線,並將其封裝成PollerEvent物件提交到Poller中
Poller 接收來自Acceptor的PollerEvent並註冊讀事件,以及輪詢和其系結的用戶端Socket有無讀事件,如果有則執行進一步操作,將其提交到其他地方執行處理(解析Http協定)
思想遷移
學習原始碼就是為了學習其設計思想. -- 沃茲及.碩德
物件池化 池化物件、池化連線可以大大降低新建物件以及GC所帶來的消耗,當需要使用從池中取出來重新設定相關值即可
環形佇列 雖然這玩意不新鮮,但配合上原子類,就可以在高並行的情況,高效的獲取佇列中的下一個元素(環形佇列中索引溢位的處理在之前我是沒有考慮到的)
阻塞獲取連結,非阻塞處理IO事件 與Reactor模型形成強烈的對比,學習NIO的時候思維被限制住了,認為非阻塞的獲取連線會獲得更高的效能,但現在情況不一定了(還沒測試,哪位老哥試了告訴我一下)
關鍵操作時,對標誌位進行檢測 如果你要通過一個標誌變數來控制你的執行緒,且執行緒迴圈一次需要相對較長的時間(你程式碼太長,操作太多)那麼最好在執行關鍵操作之前對你的標誌變數進行檢查,來決定是否要改變執行緒的行為(康康poller和Acceptor的程式碼)
初次學習Tomcat的程式碼,有理解錯誤的地方還請大佬指出
相關文章