首頁 > 軟體

Netty分散式使用者端處理接入事件handle原始碼解析

2022-03-25 19:00:20

前文傳送門 :使用者端接入流程初始化原始碼分析

上一小節我們剖析完成了與channel繫結的ChannelConfig初始化相關的流程,

這一小節繼續剖析使用者端連線事件的處理

處理接入事件建立handle

回到上一章NioEventLoop的processSelectedKey ()方法

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    //獲取到channel中的unsafe
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    //如果這個key不是合法的, 說明這個channel可能有問題
    if (!k.isValid()) {
        //程式碼省略
    }
    try {
        //如果是合法的, 拿到key的io事件
        int readyOps = k.readyOps();
        //連結事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        //寫事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        //讀事件和接受連結事件
        //如果當前NioEventLoop是work執行緒的話, 這裡就是op_read事件
        //如果是當前NioEventLoop是boss執行緒的話, 這裡就是op_accept事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

我們看其中的if判斷:

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)

上一小節我們分析過, 如果當前NioEventLoop是work執行緒的話, 這裡就是op_read事件, 如果是當前NioEventLoop是boss執行緒的話, 這裡就是op_accept事件, 這裡我們以boss執行緒為例進行分析

之前我們講過, 無論處理op_read事件還是op_accept事件, 都走的unsafe的read()方法, 這裡unsafe是通過channel拿到, 我們知道如果是處理accept事件, 這裡的channel是NioServerSocketChannel, 這裡與之繫結的unsafe是NioMessageUnsafe

我們跟到NioMessageUnsafe的read()方法:

public void read() {
    //必須是NioEventLoop方法呼叫的, 不能通過外部執行緒呼叫
    assert eventLoop().inEventLoop();
    //伺服器端channel的config
    final ChannelConfig config = config();
    //伺服器端channel的pipeline
    final ChannelPipeline pipeline = pipeline();
    //處理伺服器端接入的速率
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    //設定設定
    allocHandle.reset(config);
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                //建立jdk底層的channel
                //readBuf用於臨時承載讀到連結
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                //分配器將讀到的連結進行計數
                allocHandle.incMessagesRead(localRead);
                //連線數是否超過最大值
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        int size = readBuf.size();
        //遍歷每一條使用者端連線
        for (int i = 0; i < size; i ++) {
            readPending = false;
            //傳遞事件, 將建立NioSokectChannel進行傳遞
            //最終會呼叫ServerBootstrap的內部類ServerBootstrapAcceptor的channelRead()方法
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        //程式碼省略
    } finally {
        //程式碼省略
    }
}

首先獲取與NioServerSocketChannel繫結config和pipeline, config我們上一小節進行分析過, pipeline我們將在下一章進行剖析

我們看這一句:

final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

這裡通過RecvByteBufAllocator介面呼叫了其內部介面Handler

我們看其RecvByteBufAllocator介面

public interface RecvByteBufAllocator {
    Handle newHandle();
    interface Handle {
        int guess();
        void reset(ChannelConfig config);
        void incMessagesRead(int numMessages);
        void lastBytesRead(int bytes);
        int lastBytesRead();
        void attemptedBytesRead(int bytes);
        int attemptedBytesRead();
        boolean continueReading();
        void readComplete();    
    }
}

我們看到RecvByteBufAllocator介面只有一個方法newHandle(), 顧名思義就是用於建立Handle物件的方法, 而Handle中的方法, 才是實際用於操作的方法

在RecvByteBufAllocator實現類中包含Handle的子類, 具體實現關係如下:

回到read()方法中再看這段程式碼:

final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

unsafe()返回當前channel繫結的unsafe物件, recvBufAllocHandle()最終會呼叫AbstractChannel內部類AbstractUnsafe的recvBufAllocHandle()方法

跟進AbstractUnsafe的recvBufAllocHandle()方法:

public RecvByteBufAllocator.Handle recvBufAllocHandle() {
    //如果不存在, 則建立一個recvHandle的範例
    if (recvHandle == null) {
        recvHandle = config().getRecvByteBufAllocator().newHandle();
    }
    return recvHandle;
}

如果如果是第一次執行到這裡, 自身屬性recvHandle為空, 會建立一個recvHandle範例, config()返回NioServerSocketChannel繫結的ChannelConfig, getRecvByteBufAllocator()獲取其RecvByteBufAllocator物件, 這兩部分上一小節剖析過了, 這裡通過newHandle()建立一個Handle, 這裡會走到AdaptiveRecvByteBufAllocator類中的newHandle()方法中

跟進newHandle()方法中

public Handle newHandle() {
    return new HandleImpl(minIndex, maxIndex, initial);
}

這裡建立HandleImpl傳入了三個引數, 這三個引數我們上一小節剖析過, minIndex為最小記憶體在SIZE_TABLE中的下標, maxIndex為最大記憶體在SEIZE_TABEL中的下標, initial是初始記憶體, 我們跟到HandleImpl的構造方法中:

public HandleImpl(int minIndex, int maxIndex, int initial) {
    this.minIndex = minIndex;
    this.maxIndex = maxIndex;
    index = getSizeTableIndex(initial);
    nextReceiveBufferSize = SIZE_TABLE[index];
}

初始化minIndex和maxIndex, 根據initial找到當前的下標, nextReceiveBufferSize是根據當前的下標找到對應的記憶體

這樣, 我們就建立了個Handle物件

在這裡我們需要知道, 這個handle, 是和channel唯一系結的屬性, 而AdaptiveRecvByteBufAllocator物件是和ChannelConfig物件唯一系結的, 間接也是和channel進行唯一系結

繼續回到read()方法

public void read() {
    //必須是NioEventLoop方法呼叫的, 不能通過外部執行緒呼叫
    assert eventLoop().inEventLoop();
    //伺服器端channel的config
    final ChannelConfig config = config();
    //伺服器端channel的pipeline
    final ChannelPipeline pipeline = pipeline();
    //處理伺服器端接入的速率
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    //設定設定
    allocHandle.reset(config);
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                //建立jdk底層的channel
                //readBuf用於臨時承載讀到連結
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                //分配器將讀到的連結進行計數
                allocHandle.incMessagesRead(localRead);
                //連線數是否超過最大值
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        int size = readBuf.size();
        //遍歷每一條使用者端連線
        for (int i = 0; i < size; i ++) {
            readPending = false;
            //傳遞事件, 將建立NioSokectChannel進行傳遞
            //最終會呼叫ServerBootstrap的內部類ServerBootstrapAcceptor的channelRead()方法
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        //程式碼省略
    } finally {
        //程式碼省略
    }
}

繼續往下跟:

allocHandle.reset(config);

這個段程式碼是重新設定設定, 也就是將之前的設定資訊進行初始化, 最終會走到, DefaultMaxMessagesRecvByteBufAllocator中的內部類MaxMessageHandle的reet中

我們跟進reset中

public void reset(ChannelConfig config) {
    this.config = config;
    maxMessagePerRead = maxMessagesPerRead();
    totalMessages = totalBytesRead = 0;
}

這裡僅僅對幾個屬性做了賦值, 簡單介紹下這幾個屬性:

config:當前channelConfig物件

maxMessagePerRead:表示讀取訊息的時候可以讀取幾次(迴圈次數), maxMessagesPerRead()返回的是RecvByteBufAllocator的maxMessagesPerRead屬性, 上一小節已經做過剖析

totalMessages:代表目前讀迴圈已經讀取的訊息個數, 在NIO傳輸模式下也就是已經執行的迴圈次數, 這裡初始化為0

totalBytesRead:代表目前已經讀取到的訊息位元組總數, 這裡同樣也初始化為0

我們繼續往下走, 這裡首先是一個do-while迴圈, 迴圈體裡通過int localRead = doReadMessages(readBuf)這種方式將讀取到的連線數放入到一個List集合中, 這一步我們下一小節再分析, 我們繼續往下走:

我們首先看allocHandle.incMessagesRead(localRead)這一步, 這裡的localRead表示這次迴圈往readBuf中放入的連線數, 在Nio模式下這, 如果讀取到一條連線會返回1

跟到中的MaxMessageHandle的incMessagesRead(int amt)方法中:

public final void incMessagesRead(int amt) {
    totalMessages += amt;
}

這裡將totalMessages增加amt, 也就是+1

這裡totalMessage, 剛才已經剖析過, 在NIO傳輸模式下也就是已經執行的迴圈次數, 這裡每次執行一次迴圈都會加一

再去看回圈終止條件allocHandle.continueReading()

跟到MaxMessageHandle的continueReading()方法中:

public boolean continueReading() {
    //config.isAutoRead()預設返回true
    // totalMessages < maxMessagePerRead
    //totalMessages代表當前讀到的連結, 預設是1
    //maxMessagePerRead每一次最大讀多少連結(預設16)
    return config.isAutoRead() &&
           attemptedBytesRead == lastBytesRead &&
           totalMessages < maxMessagePerRead &&
           totalBytesRead < Integer.MAX_VALUE;
}

我們逐個分析判斷條件:

config.isAutoRead(): 這裡預設為true

attemptedBytesRead == lastBytesRead: 表示本次讀取的位元組數和最後一次讀取的位元組數相等, 因為到這裡都沒有進行位元組陣列的讀取操作, 所以預設都為0, 這裡也返回true

totalMessages < maxMessagePerRead

表示當前讀取的次數是否小於最大讀取次數, 我們知道totalMessages每次迴圈都會自增, 而maxMessagePerRead預設值為16, 所以這裡會限制迴圈不能超過16次, 也就是最多一次只能讀取16條連線

totalBytesRead < Integer.MAX_VALUE

表示讀取的位元組數不能超過int型別的最大值

這裡就剖析完了Handle的建立和初始化過程, 並且剖析了迴圈終止條件等相關的邏輯

以上就是Netty分散式使用者端處理接入事件handle原始碼解析的詳細內容,更多關於Netty分散式使用者端接入事件handle的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com