<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
上一小節簡單介紹了ByteBufAllocator以及其子類UnPooledByteBufAllocator的緩衝區分類的邏輯, 這一小節開始帶大家剖析更為複雜的PooledByteBufAllocator, 我們知道PooledByteBufAllocator是通過自己取一塊連續的記憶體進行ByteBuf的封裝, 所以這裡更為複雜, 在這一小節簡單講解有關PooledByteBufAllocator分配邏輯
友情提示: 從這一節開始難度開始加大, 請各位戰友做好心理準備
PooledByteBufAllocator同樣也重寫了AbstractByteBuf的newDirectBuffer和newHeapBuffer兩個抽象方法, 我們這一小節以newDirectBuffer為例, 先簡述一下其邏輯
首先看UnPooledByteBufAllocator中newDirectBuffer這個方法
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } return toLeakAwareBuffer(buf); }
首先 PoolThreadCache cache = threadCache.get() 這一步是拿到一個執行緒區域性快取物件, 執行緒區域性快取, 顧明思議, 就是同一個執行緒共用的一個快取
threadCache是PooledByteBufAllocator類的一個成員變數, 型別是PoolThreadLocalCache(這兩個非常容易混淆, 切記):
private final PoolThreadLocalCache threadCache;
再看其型別PoolThreadLocalCache的定義:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //程式碼省略 }
這裡繼承了一個FastThreadLocal類, 這個類相當於jdk的ThreadLocal, 只是效能更快, 有關FastThreadLocal, 我們在後面的章節會詳細剖析, 這裡我們只要知道, 繼承FastThreadLocal類並且重寫了initialValue方法, 則通過其get方法就能獲得initialValue返回的物件, 並且這個物件是執行緒共用的
在這裡我們看到, 在重寫的initialValue方法中, 初始化了heapArena和directArena兩個屬性之後, 通過new PoolThreadCache()這種方式建立了PoolThreadCache物件
這裡注意, PoolThreadLocalCache是一個FastThreadLocal, 而PoolThreadCache才是執行緒區域性快取, 這兩個類名非常非常像, 千萬別搞混了(我當初讀這段程式碼時因為搞混所以懵逼了)
其中heapArena和directArena是分別是用來分配堆和堆外記憶體用的兩個物件, 以directArena為例, 我們看到是通過leastUsedArena(directArenas)這種方式獲得的, directArenas是一個directArena型別的陣列, leastUsedArena(directArenas)這個方法是用來獲取陣列中一個使用最少的directArena物件
directArenas是PooledByteBufAllocator的成員變數, 是在其構造方法中初始化的:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { //程式碼省略 if (nDirectArena > 0) { directArenas = newArenaArray(nDirectArena); List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length); for (int i = 0; i < directArenas.length; i ++) { PoolArena.DirectArena arena = new PoolArena.DirectArena( this, pageSize, maxOrder, pageShifts, chunkSize); directArenas[i] = arena; metrics.add(arena); } directArenaMetrics = Collections.unmodifiableList(metrics); } else { directArenas = null; directArenaMetrics = Collections.emptyList(); } }
我們看到這裡通過directArenas = newArenaArray(nDirectArena)初始化了directArenas, 其中nDirectArena, 預設是cpu核心數的2倍, 這點我們可以跟蹤構造方法的呼叫鏈可以分析到
這樣保證了每一個執行緒會有一個獨享的arena
我們看newArenaArray(nDirectArena)這個方法:
private static <T> PoolArena<T>[] newArenaArray(int size) { return new PoolArena[size]; }
這裡只是建立了一個陣列, 預設長度為nDirectArena
繼續跟PooledByteBufAllocator的構造方法, 建立完了陣列, 後面在for迴圈中為陣列賦值:
首先通過new PoolArena.DirectArena建立一個DirectArena範例, 然後再為新建立的directArenas陣列賦值
再回到PoolThreadLocalCache的構造方法中:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //程式碼省略 }
方法最後, 建立PoolThreadCache的一個物件, 我們跟進構造方法中:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { //程式碼省略 //儲存成兩個成員變數 this.heapArena = heapArena; this.directArena = directArena; //程式碼省略 }
這裡省略了大段程式碼, 只需要關注這裡將兩個值儲存在PoolThreadCache的成員變數中
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } return toLeakAwareBuffer(buf); }
簡單分析的執行緒區域性快取初始化相關邏輯, 我們再往下跟:
PoolArena<ByteBuffer> directArena = cache.directArena;
通過上面的分析, 這步我們應該不陌生, 在PoolThreadCache構造方法中將directArena和heapArena中儲存在成員變數中, 這樣就可以直接通過cache.directArena這種方式拿到其成員變數的內容
從以上邏輯, 我們可以大概的分析一下流程, 通常會建立和執行緒數量相等的arena, 並以陣列的形式儲存在PooledByteBufAllocator的成員變數中, 每一個PoolThreadCache建立的時候, 都會在當前執行緒拿到一個arena, 並儲存在自身的成員變數中
PoolThreadCache除了維護了一個arena之外, 還維護了一個快取列表, 我們在重複分配ByteBuf的時候, 並不需要每次都通過arena進行分配, 可以直接從快取列表中拿一個ByteBuf
在PooledByteBufAllocator中維護了三個值:
1. tinyCacheSize
2. smallCacheSize
3. normalCacheSize
tinyCacheSize代表tiny型別的ByteBuf能快取多少個
smallCacheSize代表small型別的ByteBuf能快取多少個
normalCacheSize代表normal型別的ByteBuf能快取多少個
具體tiny型別, small型別, normal是什麼意思, 我們會在後面講解
我們回到PoolThreadLocalCache類中看其構造方法:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //程式碼省略 }
我們看到這三個屬性是在PoolThreadCache的構造方法中傳入的
這三個屬性是通過PooledByteBufAllocator的構造方法中初始化的, 跟隨構造方法的呼叫鏈會走到這個構造方法:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE); }
這裡仍然呼叫了一個過載的構造方法, 這裡我們關注這幾個引數:
DEFAULT_TINY_CACHE_SIZE,
DEFAULT_SMALL_CACHE_SIZE,
DEFAULT_NORMAL_CACHE_SIZE
這裡對應著幾個靜態的成員變數:
private static final int DEFAULT_TINY_CACHE_SIZE; private static final int DEFAULT_SMALL_CACHE_SIZE; private static final int DEFAULT_NORMAL_CACHE_SIZE;
static{ //程式碼省略 DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512); DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256); DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64); //程式碼省略 }
在這裡我們看到, 這三個屬性分別初始化的大小是512, 256, 64, 這三個屬性就對應了PooledByteBufAllocator另外的幾個成員變數, tinyCacheSize, smallCacheSize, normalCacheSize
也就是說, tiny型別的ByteBuf在每個快取中預設快取的數量是512個, small型別的ByteBuf在每個快取中預設快取的數量是256個, normal型別的ByteBuf在每個快取中預設快取的數量是64個
我們再到PooledByteBufAllocator中過載的構造方法中:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { super(preferDirect); threadCache = new PoolThreadLocalCache(); this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize; //程式碼省略 }
篇幅原因, 這裡也省略了大段程式碼, 大家可以通過構造方法引數找到原始碼中相對的位置進行閱讀
我們關注這段程式碼:
this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize;
在這裡將將引數的
DEFAULT_TINY_CACHE_SIZE,
DEFAULT_SMALL_CACHE_SIZE,
DEFAULT_NORMAL_CACHE_SIZE
的三個值儲存到了成員變數
tinyCacheSize,
smallCacheSize,
normalCacheSize
PooledByteBufAllocator中將這三個成員變數初始化之後, 在PoolThreadLocalCache的initialValue方法中就可以使用這三個成員變數的值了
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //程式碼省略 }
這裡就可以在建立PoolThreadCache物件的的構造方法中傳入tinyCacheSize, smallCacheSize, normalCacheSize這三個成員變數了
我們再跟到PoolThreadCache的構造方法中:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { //程式碼省略 this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; this.heapArena = heapArena; this.directArena = directArena; if (directArena != null) { tinySubPageDirectCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageDirectCaches = createSubPageCaches( smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalDirect = log2(directArena.pageSize); normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena); directArena.numThreadCaches.getAndIncrement(); } else { //程式碼省略 } //程式碼省略 ThreadDeathWatcher.watch(thread, freeTask); }
其中tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches就代表了三種型別的快取陣列, 陣列元素是MemoryRegionCache型別的物件, MemoryRegionCache就代表一個ByeBuf的快取
以tinySubPageDirectCaches為例, 我們看到tiny型別的快取是通過createSubPageCaches這個方法建立的
這裡傳入了三個引數tinyCacheSize我們之前分析過是512, PoolArena.numTinySubpagePools這裡是32(這裡不同型別的快取大小不一樣, small型別是4, normal型別是3) , SizeClass.Tiny代表其型別是tiny型別
private static <T> MemoryRegionCache<T>[] createSubPageCaches( int cacheSize, int numCaches, SizeClass sizeClass) { if (cacheSize > 0) { //建立陣列, 長度為32 @SuppressWarnings("unchecked") MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches]; for (int i = 0; i < cache.length; i++) { //每一個節點是ubPageMemoryRegionCache物件 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass); } return cache; } else { return null; } }
這裡首先建立了MemoryRegionCache, 長度是我們剛才分析過的32
然後通過for迴圈, 為陣列賦值, 賦值的物件是SubPageMemoryRegionCache型別的, SubPageMemoryRegionCache就是MemoryRegionCache型別的子類, 同樣也是一個快取物件, 構造方法中, cacheSize, 就是其中快取物件的數量, 如果是tiny型別就是512, sizeClass, 代表其型別, 比如tiny, small或者normal
再簡單跟到其構造方法:
SubPageMemoryRegionCache(int size, SizeClass sizeClass) { super(size, sizeClass); }
這裡呼叫了父類別的構造方法, 我們繼續跟進去:
MemoryRegionCache(int size, SizeClass sizeClass) { //size會進行規格化 this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); //佇列大小 queue = PlatformDependent.newFixedMpscQueue(this.size); this.sizeClass = sizeClass; }
首先會對其進行規格化, 其實就是查詢大於等於當前size的2的冪次方的數, 這裡如果是512那麼規格化之後還是512, 然後初始化一個佇列, 佇列大小就是傳入的大小, 如果是tiny, 這裡大小就是512
這裡我們不難看出, 其實每個快取的物件, 裡面是通過一個佇列儲存的, 有關快取佇列和ByteBuf之間的邏輯, 後面的小節會進行剖析
從上面剖析我們不難看出, PoolThreadCache中維護了三種型別的快取陣列, 每個快取陣列中的每個值中, 又通過一個佇列進行物件的儲存
當然這裡只舉了Direct型別的物件關係, heap型別其實都是一樣的, 這裡不再贅述
這一小節邏輯較為複雜, 同學們可以自己在原始碼中跟蹤一遍加深印象
以上就是Netty分散式ByteBuf中PooledByteBufAllocator剖析的詳細內容,更多關於Netty分散式ByteBuf PooledByteBufAllocato的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45