<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
前一小結回顧:pipeline管道Handler刪除
有關於inbound事件, 在概述中做過簡單的介紹, 就是以自己為基準, 流向自己的事件, 比如最常見的channelRead事件, 就是對方發來資料流的所觸發的事件, 己方要對這些資料進行處理, 這一小節, 以啟用channelRead為例講解有關inbound事件的處理流程
在業務程式碼中, 我們自己的handler往往會通過重寫channelRead方法來處理對方發來的資料, 那麼對方發來的資料是如何走到channelRead方法中了呢, 也是我們這一小節要剖析的內容
在業務程式碼中, 傳遞channelRead事件方式是通過fireChannelRead方法進行傳播的
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1: ctx.fireChannelRead(msg); //寫法2 ctx.pipeline().fireChannelRead(msg); }
這裡重寫了channelRead方法, 並且方法體內繼續通過fireChannelRead方法進行傳播channelRead事件, 那麼這兩種寫法有什麼異同?
這裡首先獲取當前context的pipeline物件, 然後通過pipeline物件呼叫自身的fireChannelRead方法進行傳播, 因為預設建立的DefaultChannelpipeline
我們跟到DefaultChannelpipeline的fireChannelRead方法中:
public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
這裡首先呼叫的是AbstractChannelHandlerContext類的靜態方法invokeChannelRead, 引數傳入head節點和事件的訊息
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
這裡的Object m m通常就是我們傳入的msg, 而next, 目前是head節點, 然後再判斷是否為當前eventLoop執行緒, 如果不是則將方法包裝成task交給eventLoop執行緒處理
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }
首先通過invokeHandler()判斷當前handler是否已新增, 如果新增, 則執行當前handler的chanelRead方法, 其實這裡我們基本上就明白了, 通過fireChannelRead方法傳遞事件的過程中, 其實就是找到相關handler執行其channelRead方法, 由於我們在這裡的handler就是head節點, 所以我們跟到HeadContext的channelRead方法中:
HeadContext的channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //向下傳遞channelRead事件 ctx.fireChannelRead(msg); }
在這裡我們看到, 這裡通過fireChannelRead方法繼續往下傳遞channelRead事件, 而這種呼叫方式, 就是我們剛才分析使用者程式碼的第一種呼叫方式:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1: ctx.fireChannelRead(msg); //寫法2 ctx.pipeline().fireChannelRead(msg); }
這裡直接通過context物件呼叫fireChannelRead方法, 那麼和使用pipeline呼叫有什麼區別的, 我會回到HeadConetx的channelRead方法, 我們來剖析ctx.fireChannelRead(msg)這句, 大家就會對這個問題有答案了, 跟到ctx的fireChannelRead方法中, 這裡會走到AbstractChannelHandlerContext類中的fireChannelRead方法中
跟到AbstractChannelHandlerContext類中的fireChannelRead方法:
public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
這裡我們看到, invokeChannelRead方法中傳入了一個findContextInbound()引數, 而這findContextInbound方法其實就是找到當前Context的下一個節點
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
這裡的邏輯也比較簡單, 是通過一個doWhile迴圈, 找到當前handlerContext的下一個節點, 這裡要注意迴圈的終止條件, while (!ctx.inbound)表示下一個context標誌的事件不是inbound的事件, 則迴圈繼續往下找, 言外之意就是要找到下一個標註inbound事件的節點
有關事件的標註, 之前的小節已經剖析過了, 如果是使用者定義的handler, 是通過handler繼承的介面而定的, 如果tail或者head, 那麼是在初始化的時候就已經定義好, 這裡不再贅述
回到AbstractChannelHandlerContext類的fireChannelRead方法中:
public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
找到下一個節點後, 繼續呼叫invokeChannelRead方法, 傳入下一個和訊息物件:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); //第一次執行next其實就是head EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
這裡的邏輯我們又不陌生了, 因為我們傳入的是當前context的下一個節點, 所以這裡會呼叫下一個節點invokeChannelRead方法, 因我們剛才剖析的是head節點, 所以下一個節點有可能是使用者新增的handler的包裝類HandlerConext的物件
這裡我們跟進invokeChannelRead方法中去:
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { //發生異常的時候在這裡捕獲異常 notifyHandlerException(t); } } else { fireChannelRead(msg); } }
又是我們熟悉的邏輯, 呼叫了自身handler的channelRead方法, 如果是使用者自定義的handler, 則會走到使用者定義的channelRead()方法中去, 所以這裡就解釋了為什麼通過傳遞channelRead事件, 最終會走到使用者重寫的channelRead方法中去
同樣, 也解釋了該小節最初提到過的兩種寫法的區別:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1: ctx.fireChannelRead(msg); //寫法2 ctx.pipeline().fireChannelRead(msg); }
寫法1是通過當前節點往下傳播事件
寫法2是通過頭節點往下傳遞事件
所以, 在handler中如果如果要在channelRead方法中傳遞channelRead事件, 一定要採用寫法2的方式向下傳遞, 或者交給其父類別處理, 如果採用1的寫法則每次事件傳輸到這裡都會繼續從head節點傳輸, 從而陷入死迴圈或者發生異常
這裡有一點需要注意, 如果使用者程式碼中channelRead方法, 如果沒有顯示的呼叫ctx.fireChannelRead(msg)那麼事件則不會再往下傳播, 則事件會在這裡終止, 所以如果我們寫業務程式碼的時候要考慮有關資源釋放的相關操作
如果ctx.fireChannelRead(msg)則事件會繼續往下傳播, 如果每一個handler都向下傳播事件, 當然, 根據我們之前的分析channelRead事件只會在標識為inbound事件的HandlerConetext中傳播, 傳播到最後, 則最終會呼叫到tail節點的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); }
我們跟進到onUnhandledInboundMessage方法中:
protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { //釋放資源 ReferenceCountUtil.release(msg); } }
這裡做了釋放資源的相關的操作
至此, channelRead事件傳輸相關羅輯剖析完整, 其實對於inbound事件的傳輸流程都會遵循這一邏輯, 小夥伴們可以自行剖析其他inbound事件的傳輸流程,更多關於Netty分散式pipeline傳播inbound事件的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45