首頁 > 軟體

Netty分散式編碼器及寫資料事件處理使用場景

2022-03-29 13:01:18

概述

上一小章我們介紹了解碼器, 這一章我們介紹編碼器

其實編碼器和解碼器比較類似, 編碼器也是一個handler, 並且屬於outbounfHandle, 就是將準備發出去的資料進行攔截, 攔截之後進行相應的處理之後再次進傳送處理, 如果理解了解碼器, 那麼編碼器的相關內容理解起來也比較容易

編碼器

第一節: writeAndFlush的事件傳播

我們之前在學習pipeline的時候, 講解了write事件的傳播過程, 但在實際使用的時候, 我們通常不會呼叫channel的write方法, 因為該方法只會寫入到傳送資料的快取中, 並不會直接寫入channel中, 如果想寫入到channel中, 還需要呼叫flush方法

實際使用過程中, 我們用的更多的是writeAndFlush方法, 這方法既能將資料寫到傳送快取中, 也能重新整理到channel中

我們看一個最簡單的使用的場景

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.channel().writeAndFlush("test data");
}

學過netty的同學們對此肯定不陌生, 通過這種方式, 可以將資料傳送到channel中, 對方可以收到響應

我們跟到writeAndFlush方法中

首先會走到AbstractChannel的writeAndFlush:

public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}

繼續跟到DefualtChannelPipeline中的writeAndFlush方法中:

public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

這裡我們看到, writeAndFlush是從tail節點進行傳播, 有關事件傳播, 我們再pipeline中進行過剖析, 相信這個不會陌生

繼續跟, 會跟到AbstractChannelHandlerContext中的writeAndFlush方法:

public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

繼續跟:

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }
    if (!validatePromise(promise, true)) {
        ReferenceCountUtil.release(msg);
        // cancelled
        return promise;
    } 
    write(msg, true, promise);
    return promise;
}

繼續跟write方法:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    //findContextOutbound()尋找前一個outbound節點
    //最後到head節點結束
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            //沒有調flush
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

這裡的邏輯我們也不陌生, 找到下一個節點, 因為writeAndFlush是從tail節點開始的, 並且是outBound的事件, 所以這裡會找到tail節點的上一個outBoundHandler, 有可能是編碼器, 也有可能是我們業務處理的handler

 if (executor.inEventLoop()) 判斷是否是eventLoop執行緒, 如果不是, 則封裝成task通過nioEventLoop非同步執行, 我們這裡先按照是eventLoop執行緒分析

首先, 這裡通過flush判斷是否呼叫了flush, 這裡顯然是true, 因為我們呼叫的方法是writeAndFlush

我們跟到invokeWriteAndFlush中

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        //寫入
        invokeWrite0(msg, promise);
        //重新整理
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

這裡就真相大白了, 其實在writeAndFlush中, 首先呼叫write, write完成之後再呼叫flush方法進行的重新整理

首先跟到invokeWrite0方法中:

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        //呼叫當前handler的wirte()方法
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

該方法我們在pipeline中已經進行過分析, 就是呼叫當前handler的write方法, 如果當前handler中write方法是繼續往下傳播, 在會繼續傳播寫事件, 直到傳播到head節點, 最後會走到HeadContext的write方法中

跟到HeadContext的write方法中:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

這裡通過當前channel的unsafe物件對將當前訊息寫到快取中, 寫入的過程, 我們之後的小節進行分析

回到到invokeWriteAndFlush方法中:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        //寫入
        invokeWrite0(msg, promise);
        //重新整理
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

我們再看invokeFlush0方法

private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

同樣, 這裡會呼叫當前handler的flush方法, 如果當前handler的flush方法是繼續傳播flush事件, 則flush事件會繼續往下傳播, 直到最後會呼叫head節點的flush方法, 如果我們熟悉pipeline的話, 對這裡的邏輯不會陌生

跟到HeadContext的flush方法中:

public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

這裡同樣, 會通過當前channel的unsafe物件通過呼叫flush方法將快取的資料重新整理到channel中, 有關重新整理的邏輯, 我們會在以後的小節進行剖析

以上就是writeAndFlush的相關邏輯, 整體上比較簡單, 熟悉pipeline的同學應該很容易理解

更多關於Netty分散式編碼器及寫資料事件的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com