首頁 > 軟體

Netty分散式抽象編碼器MessageToByteEncoder邏輯分析

2022-03-29 13:00:03

前文回顧:Netty分散式編碼器及寫資料事件處理

MessageToByteEncoder

同解碼器一樣, 編碼器中也有一個抽象類叫MessageToByteEncoder, 其中定義了編碼器的骨架方法, 具體編碼邏輯交給子類實現

解碼器同樣也是個handler, 將寫出的資料進行擷取處理, 我們在學習pipeline中我們知道, 寫資料的時候會傳遞write事件, 傳遞過程中會呼叫handler的write方法, 所以編碼器碼器可以重寫write方法, 將資料編碼成二進位制位元組流然後再繼續傳遞write事件

首先看MessageToByteEncoder的類宣告

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter{
    //省略類體
}

這裡繼承ChannelOutboundHandlerAdapter, 說明是個outBoundhandler, 我們知道write事件是個outBound事件, 而outBound事件只能通過outBoundHandler進行傳輸

write事件傳播過程中要呼叫handler的write方法

我們跟到MessageToByteEncoder的write方法中:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                encode(ctx, cast, buf);
            } finally {
                ReferenceCountUtil.release(cast);
            }

            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}

首先通過 if (acceptOutboundMessage(msg)) 判斷當前物件是否可處理

如果可處理, 則進入if塊中的邏輯, 如果不能處理, 則進入else塊, 通過ctx.write(msg, promise)繼續傳遞write事件

我們看if塊中

 I cast = (I) msg 這裡是強制型別轉換, 轉換成I型別, I型別是個泛型, 具體型別由使用者定義

 buf = allocateBuffer(ctx, cast, preferDirect) 這裡進行緩衝區分配

跟到allocateBuffer方法中

protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, 
                           boolean preferDirect) throws Exception {
    if (preferDirect) {
        return ctx.alloc().ioBuffer();
    } else {
        return ctx.alloc().heapBuffer();
    }
}

這裡會直接通過ctx的記憶體分配器進行記憶體分配, 通過判斷preferDirect來分配堆記憶體或者堆外記憶體, 預設情況下是分配堆外記憶體

有關記憶體分配, 我們之前已經做過相關的剖析

回到write方法中:

記憶體分配結束之後會呼叫encode(ctx, cast, buf)方法進行編碼, 該類由子類實現

子類可以通過繼承該類, 重寫encode方法, 將引數物件cast編碼成位元組寫入到傳入的ByteBuf中, 就完成了編碼工作

編碼完成後後, 會通過ReferenceCountUtil.release(cast)將cast物件釋放

 if (buf.isReadable()) 這裡判斷buf是否有可讀位元組, 如果有可讀位元組, 則繼續傳遞write事件

如果沒有可讀位元組, 則將buf進行釋放, 繼續傳播write事件, 傳遞一個空的ByteBuf

最後將buf設定為空

以上就是有關抽象編碼器的抽象邏輯, 具體的編碼邏輯還需要其子類去做,更多關於Netty分散式抽象編碼器MessageToByteEncoder的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com