<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
瞭解了inbound事件的傳播過程, 對於學習outbound事件傳輸的流程, 也不會太困難
在我們業務程式碼中, 有可能使用wirte方法往寫資料:
public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().write("test data"); }
當然, 直接呼叫write方法是不能往對方channel中寫入資料的, 因為這種方式只能寫入到緩衝區, 還要呼叫flush方法才能將緩衝區資料刷到channel中, 或者直接呼叫writeAndFlush方法, 有關邏輯, 我們會在後面章節中詳細講解, 這裡只是以wirte方法為例為了演示outbound事件的傳播的流程
public void channelActive(ChannelHandlerContext ctx) throws Exception { //寫法1 ctx.channel().write("test data"); //寫法2 ctx.write("test data"); }
這兩種寫法有什麼區別, 我們首先跟到第一種寫法中去:
ctx.channel().write("test data");
這裡獲取ctx所繫結的channel
我們跟到AbstractChannel的write方法中:
public ChannelFuture write(Object msg) { return pipeline.write(msg); }
這裡pipeline是DefaultChannelPipeline
public final ChannelFuture write(Object msg) { //從tail節點開始(從最後的節點往前寫) return tail.write(msg); }
這裡呼叫tail節點write方法, 這裡我們應該能分析到, outbound事件, 是通過tail節點開始往上傳播的, 帶著這點猜想, 我們繼往下看
其實tail節點並沒有重寫write方法, 最終會呼叫其父類別AbstractChannelHandlerContext的write方法
AbstractChannelHandlerContext的write方法:
public ChannelFuture write(Object msg) { return write(msg, newPromise()); }
我們看到這裡有個newPromise()這個方法, 這裡是建立一個Promise物件, 有關Promise的相關知識我們會在以後的章節剖析
我們繼續跟write:
public ChannelFuture write(final Object msg, final ChannelPromise promise) { //程式碼省略 write(msg, false, promise); return promise; }
繼續跟write:
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒有調flush next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
這裡跟我們上一小節剖析過channelRead方法有點類似, 但是事件傳輸的方向有所不同, 這裡findContextOutbound()是獲取上一個標註outbound事件的HandlerContext
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
這裡的邏輯我們似曾相識, 跟我們上一小節的findContextInbound()方法有點像, 只是過程是反過來的
在這裡, 會找到當前context的上一個節點, 如果標註的事件不是outbound事件, 則繼續往上找, 意思就是找到上一個標註outbound事件的節點
AbstractChannelHandlerContext next = findContextOutbound();
這裡將找到節點賦值到next屬性中
因為我們之前分析的write事件是從tail節點傳播的, 所以上一個節點就有可能是使用者自定的handler所屬的context
然後判斷是否為當前eventLoop執行緒, 如果是不是, 則封裝成task非同步執行, 如果不是, 則繼續判斷是否呼叫了flush方法, 因為我們這裡沒有呼叫, 所以會執行到next.invokeWrite(m, promise),
我們繼續跟invokeWrite
private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } }
這裡會判斷當前handler的狀態是否是新增狀態, 這裡返回的是true, 將會走到invokeWrite0(msg, promise)這一步
private void invokeWrite0(Object msg, ChannelPromise promise) { try { //呼叫當前handler的wirte()方法 ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
這裡的邏輯也似曾相識, 呼叫了當前節點包裝的handler的write方法, 如果使用者沒有重寫write方法, 則會交給其父類別處理
我們跟到ChannelOutboundHandlerAdapter的write方法中看:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); }
這裡呼叫了當前ctx的write方法, 這種寫法和我們小節開始的寫法是相同的, 我們回顧一下:
public void channelActive(ChannelHandlerContext ctx) throws Exception { //寫法1 ctx.channel().write("test data"); //寫法2 ctx.write("test data"); }
我們跟到其write方法中, 這裡走到的是AbstractChannelHandlerContext類的write方法:
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒有調flush next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
又是我們所熟悉邏輯, 找到當前節點的上一個標註事件為outbound事件的節點, 繼續執行invokeWrite方法, 根據之前的剖析, 我們知道最終會執行到上一個handler的write方法中
走到這裡已經不難理解, ctx.channel().write("test data")其實是從tail節點開始傳播寫事件, 而ctx.write("test data")是從自身開始傳播寫事件
所以, 在handler中如果重寫了write方法要傳遞write事件, 一定採用ctx.write("test data")這種方式或者交給其父類別處理處理, 而不能採用ctx.channel().write("test data")這種方式, 因為會造成每次事件傳輸到這裡都會從tail節點重新傳輸, 導致不可預知的錯誤
如果用程式碼中沒有重寫handler的write方法, 則事件會一直往上傳輸, 當傳輸完所有的outbound節點之後, 最後會走到head節點的wirte方法中
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
我們看到write事件最終會流向這裡, 通過unsafe物件進行最終的寫操作
有關inbound事件和outbound事件的傳輸, 可通過下圖進行說明:
以上就是Netty分散式pipeline管道傳播outBound事件原始碼解析的詳細內容,更多關於Netty分散式pipeline管道傳播outBound的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45