首頁 > 軟體

Netty分散式使用者端接入流程初始化原始碼分析

2022-03-25 19:00:42

前文概述:

前文傳送門:NioEventLoop任務佇列執行

之前的章節學習了server啟動以及eventLoop相關的邏輯, eventLoop輪詢到使用者端接入事件之後是如何處理的?這一章我們循序漸進, 帶大家繼續剖析使用者端接入之後的相關邏輯

第一節:初始化NioSockectChannelConfig

建立channel

在剖析接入流程之前我們首先補充下第一章有關建立channel的知識:

我們在第一章剖析過channel的建立, 其中NioServerSocketChannel中有個構造方法:

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

當時我們並沒有剖析config相關知識, 在這一章首先對此做一個補充, 這裡我們看到每一個NioServerSocketChannel都擁有一個config屬性, 這個屬性存放著NioServerSocketChannel的相關設定, 這裡建立一個NioServerSocketChannelConfig物件, 並將當前channel, 和channel對應的java底層的socket物件進行了傳入, NioServerSocketChannelConfig其實是NioServerSocketChannel的內部類

我們跟到NioServerSocketChannelConfig類的構造方法中:

private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
    super(channel, javaSocket);
}

我們繼續跟入其父類別DefaultServerSocketChannelConfig的構造方法中:

public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
    super(channel);
    if (javaSocket == null) {
        throw new NullPointerException("javaSocket");
    }
    this.javaSocket = javaSocket;
}

這裡繼續呼叫了其父類別的構造方法, 並儲存了jdk底層的socket物件, 並且呼叫其父類別DefaultChannelConfig的構造方法

跟到其父類別DefaultChannelConfig的構造方法中

public DefaultChannelConfig(Channel channel) {
    this(channel, new AdaptiveRecvByteBufAllocator());
}

這裡呼叫了自身的構造方法, 傳入了channel和一個AdaptiveRecvByteBufAllocator物件

AdaptiveRecvByteBufAllocator是一個緩衝區分配器, 用於分配一個緩衝區Bytebuf的, 有關Bytebuf的相關內容會在後面的章節詳細講解, 這裡可以簡單介紹作為了解, 就當對於之後知識的預習

Bytebuf相當於jdk的ByetBuffer, Netty對其做了重新的封裝, 用於讀寫channel中的位元組流, 熟悉Nio的同學對此應該並不陌生, AdaptiveRecvByteBufAllocator就是用於分配netty中ByetBuff的緩衝區分配器, 根據名字, 我們不難看出這個緩衝區是一個可變大小的位元組緩衝區

我們跟到AdaptiveRecvByteBufAllocator的構造方法中:

public AdaptiveRecvByteBufAllocator() {
    //DEFAULT_MINIMUM:最小緩衝區長度64位元組
    //DEFAULT_INITIAL:初始容量1024位元組
    //最大容量65536位元組
    this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}

這裡呼叫自身的構造方法並且傳入了三個屬性, 這三個屬性的含義分別為:

DEFAULT_MINIMUM:代表要分配的緩衝區長度最少為64個位元組

DEFAULT_INITIAL:代表要分配的緩衝區的初始容量為1024個位元組

DEFAULT_MAXIMUM:代表要分配的緩衝區最大容量為65536個位元組

我們跟到this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM)方法中

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
    //忽略驗證程式碼
    //最小容量在table中的下標
    int minIndex = getSizeTableIndex(minimum);
    if (SIZE_TABLE[minIndex] < minimum) {
        this.minIndex = minIndex + 1;
    } else {
        this.minIndex = minIndex;
    }
    //最大容量在table中的下標
    int maxIndex = getSizeTableIndex(maximum);
    if (SIZE_TABLE[maxIndex] > maximum) {
        this.maxIndex = maxIndex - 1;
    } else {
        this.maxIndex = maxIndex;
    }
    this.initial = initial;
}

其中這裡初始化了三個屬性, 分別是:

minIndex:最小容量在size_table中的下標

maxIndex:最大容量在table中的下標

initial:初始容量1024個位元組

這裡的size_table就是一個陣列, 裡面盛放著byteBuf可分配的記憶體大小的集合, 分配的bytebuf無論是擴容還是收縮, 記憶體大小都屬於size_table中的元素, 那麼這個陣列是如何初始化的, 我們跟到這個屬性中:

private static final int[] SIZE_TABLE;

我們看到是一個final修飾的靜態成員變數, 我們跟到static塊中看它的初始化過程:

static {
    //List集合
    List<Integer> sizeTable = new ArrayList<Integer>();
    //從16開始, 每遞增16新增到List中, 直到大於等於512
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }
    //從512開始, 倍增新增到List中, 直到記憶體溢位
    for (int i = 512; i > 0; i <<= 1) {
        sizeTable.add(i);
    }
    //初始化陣列
    SIZE_TABLE = new int[sizeTable.size()];
    //將list的內容放入陣列中
    for (int i = 0; i < SIZE_TABLE.length; i ++) {
        SIZE_TABLE[i] = sizeTable.get(i);
    }
}

首先建立一個Integer型別的list用於盛放記憶體元素

這裡通過兩組迴圈為list新增元素

首先看第一組迴圈:

for (int i = 16; i < 512; i += 16) {
    sizeTable.add(i);
}

這裡是通過16平移的方式, 直到512個位元組, 將每次平移之後的記憶體大小新增到list中

再看第二組迴圈

for (int i = 512; i > 0; i <<= 1) {
    sizeTable.add(i);
}

超過512之後, 再通過倍增的方式迴圈, 直到int型別記憶體溢位, 將每次倍增之後大小新增到list中

最後初始化SIZE_TABLE陣列, 將list中的元素按下表存放到陣列中

這樣就初始化了記憶體陣列

再回到AdaptiveRecvByteBufAllocator的構造方法中

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
    //忽略驗證程式碼
    //最小容量在table中的下標
    int minIndex = getSizeTableIndex(minimum);
    if (SIZE_TABLE[minIndex] < minimum) {
        this.minIndex = minIndex + 1;
    } else {
        this.minIndex = minIndex;
    }
    //最大容量在table中的下標
    int maxIndex = getSizeTableIndex(maximum);
    if (SIZE_TABLE[maxIndex] > maximum) {
        this.maxIndex = maxIndex - 1;
    } else {
        this.maxIndex = maxIndex;
    }
    this.initial = initial;
}

這裡分別根據傳入的最小和最大容量去SIZE_TABLE中獲取其下標

我們跟到getSizeTableIndex(minimum)中:

private static int getSizeTableIndex(final int size) {
    for (int low = 0, high = SIZE_TABLE.length - 1;;) {
        if (high < low) {
            return low;
        }
        if (high == low) {
            return high;
        }
        int mid = low + high >>> 1;
        int a = SIZE_TABLE[mid];
        int b = SIZE_TABLE[mid + 1];
        if (size > b) {
            low = mid + 1;
        } else if (size < a) {
            high = mid - 1;
        } else if (size == a) {
            return mid;
        } else {
            return mid + 1;
        }
    }
}

這裡是通過二分查詢去獲取其下表

if (SIZE_TABLE[minIndex] < minimum)這裡判斷最小容量下標所屬的記憶體大小是否小於最小值, 如果小於最小值則下標+1

最大容量的下標獲取原理同上, 判斷最大容量下標所屬記憶體大小是否大於最大值, 如果是則下標-1

我們回到DefaultChannelConfig的構造方法:

public DefaultChannelConfig(Channel channel) {
    this(channel, new AdaptiveRecvByteBufAllocator());
}

剛才我們剖析過了AdaptiveRecvByteBufAllocator()的建立過程, 我們繼續跟到this()中:

protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
    setRecvByteBufAllocator(allocator, channel.metadata());
    this.channel = channel;
}

我們看到這裡初始化了channel, 在channel初始化之前, 呼叫了setRecvByteBufAllocator(allocator, channel.metadata())方法, 顧名思義, 這是用於設定緩衝區分配器的方法, 第一個引數是我們剛剛分析過的新建的AdaptiveRecvByteBufAllocator物件, 第二個傳入的是與channel繫結的ChannelMetadata物件, ChannelMetadata物件是什麼?

我們跟進到metadata()方法當中, 由於是channel是NioServerSocketChannel, 所以呼叫到了NioServerSocketChannel的metadata()方法:

public ChannelMetadata metadata() {
    return METADATA;
}

這裡返回了一個成員變數METADATA, 跟到這個成員變數中:

private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);

這裡建立了一個ChannelMetadata物件, 並在構造方法中傳入false和16

繼續跟到ChannelMetadata的構造方法中

public ChannelMetadata(boolean hasDisconnect, int defaultMaxMessagesPerRead) {
    //省略驗證程式碼
    //false
    this.hasDisconnect = hasDisconnect;
    //16
    this.defaultMaxMessagesPerRead = defaultMaxMessagesPerRead;
}

這裡做的事情非常簡單, 只初始化了兩個屬性:

hasDisconnect=false

defaultMaxMessagesPerRead=16

defaultMaxMessagesPerRead=16代表在讀取對方的連結或者channel的位元組流時(無論server還是client), 最多隻迴圈16次, 後面的講解將會看到

剖析完了ChannelMetadata物件的建立, 我們回到DefaultChannelConfig的構造方法:

protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
    setRecvByteBufAllocator(allocator, channel.metadata());
    this.channel = channel;
}

跟到setRecvByteBufAllocator(allocator, channel.metadata())方法中:

private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
    if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
        ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
    } else if (allocator == null) {
        throw new NullPointerException("allocator");
    }
    rcvBufAllocator = allocator;
}

首先會判斷傳入的緩衝區分配器是不是MaxMessagesRecvByteBufAllocator型別的, 因為AdaptiveRecvByteBufAllocator實現了MaxMessagesRecvByteBufAllocator介面, 所以此條件成立

之後將其轉換成MaxMessagesRecvByteBufAllocator型別,

然後呼叫其maxMessagesPerRead(metadata.defaultMaxMessagesPerRead())方法,

這裡會走到其子類DefaultMaxMessagesRecvByteBufAllocator的maxMessagesPerRead(int maxMessagesPerRead)方法中,

其中引數metadata.defaultMaxMessagesPerRead()返回就是ChannelMetadata的屬性defaultMaxMessagesPerRead,

也就是16

跟到maxMessagesPerRead(int maxMessagesPerRead)方法中:

public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) {
    //忽略驗證程式碼

    //初始化為16
    this.maxMessagesPerRead = maxMessagesPerRead;
    return this;
}

這裡將自身屬性maxMessagesPerRead設定為16, 然後返回自身

回到DefaultChannelConfig的構造方法

private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
    if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
        ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
    } else if (allocator == null) {
        throw new NullPointerException("allocator");
    }
    rcvBufAllocator = allocator;
}

設定完了記憶體分配器的maxMessagesPerRead屬性, 最後將DefaultChannelConfig自身的成員變數rcvBufAllocator設定成我們初始化完畢的allocator物件

至此, 有關channelConfig有關的初始化過程剖析完成

以上就是Netty分散式使用者端接入流程初始化原始碼分析的詳細內容,更多關於Netty分散式使用者端接入流程初始化的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com