首頁 > 軟體

Netty分散式pipeline管道異常傳播事件原始碼解析

2022-03-28 13:00:08

講完了inbound事件和outbound事件的傳輸流程, 這一小節剖析異常事件的傳輸流程

傳播異常事件

簡單的例外處理的場景

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    throw new Exception("throw Exception");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println(cause.getMessage());
}

我們在handler的channelRead方法中主動丟擲異常, 模擬程式中出現異常的場景, 經測試會發現, 程式最終會走到exceptionCaught方法中, 獲取異常物件並列印其資訊

那麼丟擲異常之後, 是如何走到exceptionCaught方法的呢?

我們回顧之前小節channelRead事件的傳播流程, channelRead方法是在AbstractChannelHandlerContext類的invokeChannelRead方法中被呼叫

我們跟到invokeChannelRead這個方法

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            //呼叫了當前handler的channelRead方法, 其實就是head物件呼叫自身的channelRead方法
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            //發生異常的時候在這裡捕獲異常
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

這裡不難看出, 當呼叫戶自定義的handler的channelRead方法發生異常之後, 會被捕獲, 並呼叫notifyHandlerException方法, 並傳入異常物件, 也就是我們範例中丟擲的異常

我們跟到fireChannelRead方法中:

private void notifyHandlerException(Throwable cause) {
    //程式碼省略
    invokeExceptionCaught(cause);
}

再繼續跟到invokeExceptionCaught方法中:

private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            //當前handler呼叫exceptionCaught()方法
            handler().exceptionCaught(this, cause);
        } catch (Throwable error) {
            //程式碼省略
        }
    } else {
        fireExceptionCaught(cause);
    }
}

走到這裡一切都明白了, 這裡呼叫了當前handler的exceptionCaught方法, 也就是我們重寫的exceptionCaught方法

知道了為什麼會走到exceptionCaught方法之後, 我們再進行剖析異常事件的傳播流程

我還是通過兩種寫法來進行剖析

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println(cause.getMessage());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    //寫法1
    ctx.fireChannelRead(cause);
    //寫法2
    ctx.pipeline().fireExceptionCaught(cause);
}

這兩種寫法我們並不陌生, 可能我們能直接猜到, 第一種寫法是從當前節點進行傳播, 第二種寫法則從頭結點或者尾節點進行轉播, 那麼和傳播inbound事件或outbound事件有什麼區別呢?我們先以第二種寫法為例, 剖析異常事件傳輸的整個流程

跟到DefualtChannelPipeline的fireExceptionCaught方法中:

public final ChannelPipeline fireExceptionCaught(Throwable cause) {
    AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
    return this;
}

我們看到invokeExceptionCaught傳入了head節點, 我們可以猜測, 異常事件的傳播是從head節點開始的

跟進invokeExceptionCaught方法

static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
    ObjectUtil.checkNotNull(cause, "cause");
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        //執行下一個節點的異常方法
        next.invokeExceptionCaught(cause);
    } else {
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeExceptionCaught(cause);
                }
            });
        } catch (Throwable t) {
            //忽略程式碼
        }
    }
}

因為這裡是傳入的是head節點, 所以這裡的next指向head節點

我們跟到invokeExceptionCaught方法中, 這裡其實是headContext的父類別AbstractChannelHandlerContext中的方法:

private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            //當前handler呼叫exceptionCaught()方法
            handler().exceptionCaught(this, cause);
        } catch (Throwable error) {
            //程式碼省略
        }
    } else {
        fireExceptionCaught(cause);
    }
}

這裡又是我們熟悉的邏輯, 呼叫當前handler的exceptionCaught方法, 因為當前handler是head, 所以首先會呼叫headContext的exceptionCaught方法

跟進exceptionCaught方法:

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.fireExceptionCaught(cause);
}

這裡僅僅是繼續傳播異常事件, 這時候我們發現, 這個寫法和我們剛才提到傳播異常事件的兩種寫法的第一種寫法一樣:

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    //寫法1
    ctx.fireChannelRead(cause);
    //寫法2
    ctx.pipeline().fireExceptionCaught(cause);
}

根據我們之前的學習, 我們知道第一種寫法是從當前節點傳播, 而第二種寫法是從頭傳播, 並且要求傳播事件一定要使用第一種寫法, 否則事件到這裡會重新從頭傳播進而引發不可預知錯誤, 這個結論在異常傳播同樣適用, 同學們一定要注意這點

我們繼續跟fireExceptionCaught方法, 這裡會走到AbstractChannelHandlerContex類的fireExceptionCaught方法:

public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
    //傳播異常事件的時候, 直接拿了當前節點的下一個節點
    invokeExceptionCaught(next, cause);
    return this;
}

這個時候我們發現, 這裡並沒有去獲取下一個的inbound節點還是outbound節點, 而是直接通過next拿到下一個節點, 這就說明在異常事件傳播的過程中是不區分inbound事件還是outbound事件的, 都是直接從head節點按照連結串列結構往下傳播,

跟到invokeExceptionCaught方法中

static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
    ObjectUtil.checkNotNull(cause, "cause");
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) { 
        next.invokeExceptionCaught(cause);
    } else {
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeExceptionCaught(cause);
                }
            });
        } catch (Throwable t) {
            //程式碼省略
        }
    }
}

這裡又是我們熟悉的邏輯, 我們知道invokeExceptionCaught中執行了next的exceptionCaught, 這裡的next, 因為我們是從head節點開始剖析的, 所以這裡很有可能就是使用者自定義的handler, 如果使用者沒有重寫exceptionCaught方法, 則會交給使用者handler的父類別處理

我們以ChannelInboundHandlerAdapter為例看它的該方法實現:

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception {
    ctx.fireExceptionCaught(cause);
}

我們看到這裡繼續向下傳播了異常事件

走到這裡我們會知道, 如果我們沒有重寫exceptionCaught方法, 異常事件會一直傳播到連結串列的底部, 就是tail節點

我們跟到TailConext的exceptionCaught方法:

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    onUnhandledInboundException(cause);
}

我們看到最終這裡釋放了異常物件

以上就是有關異常事件的傳播,更多關於Netty分散式pipeline管道異常傳播的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com