<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
前文傳送門 :使用者端接入流程初始化原始碼分析
上一小節我們剖析完成了與channel繫結的ChannelConfig初始化相關的流程,
這一小節繼續剖析使用者端連線事件的處理
回到上一章NioEventLoop的processSelectedKey ()方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //獲取到channel中的unsafe final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //如果這個key不是合法的, 說明這個channel可能有問題 if (!k.isValid()) { //程式碼省略 } try { //如果是合法的, 拿到key的io事件 int readyOps = k.readyOps(); //連結事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } //寫事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } //讀事件和接受連結事件 //如果當前NioEventLoop是work執行緒的話, 這裡就是op_read事件 //如果是當前NioEventLoop是boss執行緒的話, 這裡就是op_accept事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
我們看其中的if判斷:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
上一小節我們分析過, 如果當前NioEventLoop是work執行緒的話, 這裡就是op_read事件, 如果是當前NioEventLoop是boss執行緒的話, 這裡就是op_accept事件, 這裡我們以boss執行緒為例進行分析
之前我們講過, 無論處理op_read事件還是op_accept事件, 都走的unsafe的read()方法, 這裡unsafe是通過channel拿到, 我們知道如果是處理accept事件, 這裡的channel是NioServerSocketChannel, 這裡與之繫結的unsafe是NioMessageUnsafe
我們跟到NioMessageUnsafe的read()方法:
public void read() { //必須是NioEventLoop方法呼叫的, 不能通過外部執行緒呼叫 assert eventLoop().inEventLoop(); //伺服器端channel的config final ChannelConfig config = config(); //伺服器端channel的pipeline final ChannelPipeline pipeline = pipeline(); //處理伺服器端接入的速率 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //設定設定 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //建立jdk底層的channel //readBuf用於臨時承載讀到連結 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器將讀到的連結進行計數 allocHandle.incMessagesRead(localRead); //連線數是否超過最大值 } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍歷每一條使用者端連線 for (int i = 0; i < size; i ++) { readPending = false; //傳遞事件, 將建立NioSokectChannel進行傳遞 //最終會呼叫ServerBootstrap的內部類ServerBootstrapAcceptor的channelRead()方法 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //程式碼省略 } finally { //程式碼省略 } }
首先獲取與NioServerSocketChannel繫結config和pipeline, config我們上一小節進行分析過, pipeline我們將在下一章進行剖析
我們看這一句:
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
這裡通過RecvByteBufAllocator介面呼叫了其內部介面Handler
public interface RecvByteBufAllocator { Handle newHandle(); interface Handle { int guess(); void reset(ChannelConfig config); void incMessagesRead(int numMessages); void lastBytesRead(int bytes); int lastBytesRead(); void attemptedBytesRead(int bytes); int attemptedBytesRead(); boolean continueReading(); void readComplete(); } }
我們看到RecvByteBufAllocator介面只有一個方法newHandle(), 顧名思義就是用於建立Handle物件的方法, 而Handle中的方法, 才是實際用於操作的方法
在RecvByteBufAllocator實現類中包含Handle的子類, 具體實現關係如下:
回到read()方法中再看這段程式碼:
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
unsafe()返回當前channel繫結的unsafe物件, recvBufAllocHandle()最終會呼叫AbstractChannel內部類AbstractUnsafe的recvBufAllocHandle()方法
跟進AbstractUnsafe的recvBufAllocHandle()方法:
public RecvByteBufAllocator.Handle recvBufAllocHandle() { //如果不存在, 則建立一個recvHandle的範例 if (recvHandle == null) { recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle; }
如果如果是第一次執行到這裡, 自身屬性recvHandle為空, 會建立一個recvHandle範例, config()返回NioServerSocketChannel繫結的ChannelConfig, getRecvByteBufAllocator()獲取其RecvByteBufAllocator物件, 這兩部分上一小節剖析過了, 這裡通過newHandle()建立一個Handle, 這裡會走到AdaptiveRecvByteBufAllocator類中的newHandle()方法中
public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); }
這裡建立HandleImpl傳入了三個引數, 這三個引數我們上一小節剖析過, minIndex為最小記憶體在SIZE_TABLE中的下標, maxIndex為最大記憶體在SEIZE_TABEL中的下標, initial是初始記憶體, 我們跟到HandleImpl的構造方法中:
public HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex; index = getSizeTableIndex(initial); nextReceiveBufferSize = SIZE_TABLE[index]; }
初始化minIndex和maxIndex, 根據initial找到當前的下標, nextReceiveBufferSize是根據當前的下標找到對應的記憶體
這樣, 我們就建立了個Handle物件
在這裡我們需要知道, 這個handle, 是和channel唯一系結的屬性, 而AdaptiveRecvByteBufAllocator物件是和ChannelConfig物件唯一系結的, 間接也是和channel進行唯一系結
public void read() { //必須是NioEventLoop方法呼叫的, 不能通過外部執行緒呼叫 assert eventLoop().inEventLoop(); //伺服器端channel的config final ChannelConfig config = config(); //伺服器端channel的pipeline final ChannelPipeline pipeline = pipeline(); //處理伺服器端接入的速率 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //設定設定 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //建立jdk底層的channel //readBuf用於臨時承載讀到連結 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器將讀到的連結進行計數 allocHandle.incMessagesRead(localRead); //連線數是否超過最大值 } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍歷每一條使用者端連線 for (int i = 0; i < size; i ++) { readPending = false; //傳遞事件, 將建立NioSokectChannel進行傳遞 //最終會呼叫ServerBootstrap的內部類ServerBootstrapAcceptor的channelRead()方法 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //程式碼省略 } finally { //程式碼省略 } }
繼續往下跟:
allocHandle.reset(config);
這個段程式碼是重新設定設定, 也就是將之前的設定資訊進行初始化, 最終會走到, DefaultMaxMessagesRecvByteBufAllocator中的內部類MaxMessageHandle的reet中
public void reset(ChannelConfig config) { this.config = config; maxMessagePerRead = maxMessagesPerRead(); totalMessages = totalBytesRead = 0; }
這裡僅僅對幾個屬性做了賦值, 簡單介紹下這幾個屬性:
config
:當前channelConfig物件
maxMessagePerRead
:表示讀取訊息的時候可以讀取幾次(迴圈次數), maxMessagesPerRead()返回的是RecvByteBufAllocator的maxMessagesPerRead屬性, 上一小節已經做過剖析
totalMessages
:代表目前讀迴圈已經讀取的訊息個數, 在NIO傳輸模式下也就是已經執行的迴圈次數, 這裡初始化為0
totalBytesRead
:代表目前已經讀取到的訊息位元組總數, 這裡同樣也初始化為0
我們繼續往下走, 這裡首先是一個do-while迴圈, 迴圈體裡通過int localRead = doReadMessages(readBuf)這種方式將讀取到的連線數放入到一個List集合中, 這一步我們下一小節再分析, 我們繼續往下走:
我們首先看allocHandle.incMessagesRead(localRead)這一步, 這裡的localRead表示這次迴圈往readBuf中放入的連線數, 在Nio模式下這, 如果讀取到一條連線會返回1
跟到中的MaxMessageHandle的incMessagesRead(int amt)方法中:
public final void incMessagesRead(int amt) { totalMessages += amt; }
這裡將totalMessages增加amt, 也就是+1
這裡totalMessage, 剛才已經剖析過, 在NIO傳輸模式下也就是已經執行的迴圈次數, 這裡每次執行一次迴圈都會加一
再去看回圈終止條件allocHandle.continueReading()
跟到MaxMessageHandle的continueReading()方法中:
public boolean continueReading() { //config.isAutoRead()預設返回true // totalMessages < maxMessagePerRead //totalMessages代表當前讀到的連結, 預設是1 //maxMessagePerRead每一次最大讀多少連結(預設16) return config.isAutoRead() && attemptedBytesRead == lastBytesRead && totalMessages < maxMessagePerRead && totalBytesRead < Integer.MAX_VALUE; }
我們逐個分析判斷條件:
config.isAutoRead(): 這裡預設為true
attemptedBytesRead == lastBytesRead: 表示本次讀取的位元組數和最後一次讀取的位元組數相等, 因為到這裡都沒有進行位元組陣列的讀取操作, 所以預設都為0, 這裡也返回true
totalMessages < maxMessagePerRead
表示當前讀取的次數是否小於最大讀取次數, 我們知道totalMessages每次迴圈都會自增, 而maxMessagePerRead預設值為16, 所以這裡會限制迴圈不能超過16次, 也就是最多一次只能讀取16條連線
totalBytesRead < Integer.MAX_VALUE
表示讀取的位元組數不能超過int型別的最大值
這裡就剖析完了Handle的建立和初始化過程, 並且剖析了迴圈終止條件等相關的邏輯
以上就是Netty分散式使用者端處理接入事件handle原始碼解析的詳細內容,更多關於Netty分散式使用者端接入事件handle的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45