首頁 > 軟體

Netty分散式pipeline傳播inbound事件原始碼分析

2022-03-28 13:01:15

前一小結回顧:pipeline管道Handler刪除

傳播inbound事件

有關於inbound事件, 在概述中做過簡單的介紹, 就是以自己為基準, 流向自己的事件, 比如最常見的channelRead事件, 就是對方發來資料流的所觸發的事件, 己方要對這些資料進行處理, 這一小節, 以啟用channelRead為例講解有關inbound事件的處理流程

在業務程式碼中, 我們自己的handler往往會通過重寫channelRead方法來處理對方發來的資料, 那麼對方發來的資料是如何走到channelRead方法中了呢, 也是我們這一小節要剖析的內容

在業務程式碼中, 傳遞channelRead事件方式是通過fireChannelRead方法進行傳播的

這裡給大家看兩種寫法

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //寫法1:
    ctx.fireChannelRead(msg);
    //寫法2
    ctx.pipeline().fireChannelRead(msg);
}

這裡重寫了channelRead方法, 並且方法體內繼續通過fireChannelRead方法進行傳播channelRead事件, 那麼這兩種寫法有什麼異同?

我們先以寫法2為例, 將這種寫法進行剖析

這裡首先獲取當前context的pipeline物件, 然後通過pipeline物件呼叫自身的fireChannelRead方法進行傳播, 因為預設建立的DefaultChannelpipeline

我們跟到DefaultChannelpipeline的fireChannelRead方法中:

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

這裡首先呼叫的是AbstractChannelHandlerContext類的靜態方法invokeChannelRead, 引數傳入head節點和事件的訊息

我們跟進invokeChannelRead方法:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

這裡的Object m m通常就是我們傳入的msg, 而next, 目前是head節點, 然後再判斷是否為當前eventLoop執行緒, 如果不是則將方法包裝成task交給eventLoop執行緒處理

我們跟到invokeChannelRead方法中

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

首先通過invokeHandler()判斷當前handler是否已新增, 如果新增, 則執行當前handler的chanelRead方法, 其實這裡我們基本上就明白了, 通過fireChannelRead方法傳遞事件的過程中, 其實就是找到相關handler執行其channelRead方法, 由於我們在這裡的handler就是head節點, 所以我們跟到HeadContext的channelRead方法中:

HeadContext的channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //向下傳遞channelRead事件
    ctx.fireChannelRead(msg);
}

在這裡我們看到, 這裡通過fireChannelRead方法繼續往下傳遞channelRead事件, 而這種呼叫方式, 就是我們剛才分析使用者程式碼的第一種呼叫方式:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //寫法1:
    ctx.fireChannelRead(msg);
    //寫法2
    ctx.pipeline().fireChannelRead(msg);
}

這裡直接通過context物件呼叫fireChannelRead方法, 那麼和使用pipeline呼叫有什麼區別的, 我會回到HeadConetx的channelRead方法, 我們來剖析ctx.fireChannelRead(msg)這句, 大家就會對這個問題有答案了, 跟到ctx的fireChannelRead方法中, 這裡會走到AbstractChannelHandlerContext類中的fireChannelRead方法中

跟到AbstractChannelHandlerContext類中的fireChannelRead方法:

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

這裡我們看到, invokeChannelRead方法中傳入了一個findContextInbound()引數, 而這findContextInbound方法其實就是找到當前Context的下一個節點

跟到findContextInbound方法

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

這裡的邏輯也比較簡單, 是通過一個doWhile迴圈, 找到當前handlerContext的下一個節點, 這裡要注意迴圈的終止條件, while (!ctx.inbound)表示下一個context標誌的事件不是inbound的事件, 則迴圈繼續往下找, 言外之意就是要找到下一個標註inbound事件的節點

有關事件的標註, 之前的小節已經剖析過了, 如果是使用者定義的handler, 是通過handler繼承的介面而定的, 如果tail或者head, 那麼是在初始化的時候就已經定義好, 這裡不再贅述

回到AbstractChannelHandlerContext類的fireChannelRead方法中:

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

找到下一個節點後, 繼續呼叫invokeChannelRead方法, 傳入下一個和訊息物件:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    //第一次執行next其實就是head
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

這裡的邏輯我們又不陌生了, 因為我們傳入的是當前context的下一個節點, 所以這裡會呼叫下一個節點invokeChannelRead方法, 因我們剛才剖析的是head節點, 所以下一個節點有可能是使用者新增的handler的包裝類HandlerConext的物件

這裡我們跟進invokeChannelRead方法中去:

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try { 
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            //發生異常的時候在這裡捕獲異常
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

又是我們熟悉的邏輯, 呼叫了自身handler的channelRead方法, 如果是使用者自定義的handler, 則會走到使用者定義的channelRead()方法中去, 所以這裡就解釋了為什麼通過傳遞channelRead事件, 最終會走到使用者重寫的channelRead方法中去

同樣, 也解釋了該小節最初提到過的兩種寫法的區別:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //寫法1:
    ctx.fireChannelRead(msg);
    //寫法2
    ctx.pipeline().fireChannelRead(msg);
}

寫法1是通過當前節點往下傳播事件

寫法2是通過頭節點往下傳遞事件

所以, 在handler中如果如果要在channelRead方法中傳遞channelRead事件, 一定要採用寫法2的方式向下傳遞, 或者交給其父類別處理, 如果採用1的寫法則每次事件傳輸到這裡都會繼續從head節點傳輸, 從而陷入死迴圈或者發生異常

這裡有一點需要注意, 如果使用者程式碼中channelRead方法, 如果沒有顯示的呼叫ctx.fireChannelRead(msg)那麼事件則不會再往下傳播, 則事件會在這裡終止, 所以如果我們寫業務程式碼的時候要考慮有關資源釋放的相關操作

如果ctx.fireChannelRead(msg)則事件會繼續往下傳播, 如果每一個handler都向下傳播事件, 當然, 根據我們之前的分析channelRead事件只會在標識為inbound事件的HandlerConetext中傳播, 傳播到最後, 則最終會呼叫到tail節點的channelRead方法

我們跟到tailConext的channelRead方法中

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    onUnhandledInboundMessage(msg);
}

我們跟進到onUnhandledInboundMessage方法中:

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        //釋放資源
        ReferenceCountUtil.release(msg);
    }
}

這裡做了釋放資源的相關的操作

至此, channelRead事件傳輸相關羅輯剖析完整, 其實對於inbound事件的傳輸流程都會遵循這一邏輯, 小夥伴們可以自行剖析其他inbound事件的傳輸流程,更多關於Netty分散式pipeline傳播inbound事件的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com