<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
ConcurrentHashMap
是一個執行緒安全的集合,底層是通過對指定索引位置上的節點進行加鎖,而不是對整個陣列加鎖,當一個執行緒對指定索引位置上的節點加了鎖之後,其它執行緒就不能對該索引位置上的節點進行操作,但可以對其它的索引位置上的節點操作,ConcurrentHashMap與HashMap
有許多相似的地方,ConcurrentHashMap
只是在一些產生執行緒安全的地方加了鎖,如果對HashMap
瞭解的話,再來看ConcurrentHashMap
就簡單許多。
/** 陣列最大容量大小(1073741824) */ private static final int MAXIMUM_CAPACITY = 1 << 30; /** 預設的初始容量大小 */ private static final int DEFAULT_CAPACITY = 16; /** 預設的並行等級 */ private static final int DEFAULT_CONCURRENCY_LEVEL = 16; /** 負載因子 */ private static final float LOAD_FACTOR = 0.75f; /** 連結串列轉換為紅黑樹的閾值 */ static final int TREEIFY_THRESHOLD = 8; /** 紅黑樹轉換為連結串列的閾值 */ static final int UNTREEIFY_THRESHOLD = 6; /** * 最小樹化閾值 * 當連結串列中的節點數量大於等於8時,如果陣列的長度小於最小樹化閾值則不會將連結串列轉換為紅黑樹,而是對陣列進行擴容 * 將連結串列中的節點分散到別的索引節點中去,只有當陣列的長度大於等於最小樹化閾值則會將連結串列轉換為紅黑樹 */ static final int MIN_TREEIFY_CAPACITY = 64; /** 擴容時,每個cpu最少需要負責的區域長度 */ private static final int MIN_TRANSFER_STRIDE = 16; /** 擴容時生成標記的二進位制所在位置 */ private static int RESIZE_STAMP_BITS = 16; /** 擴容時記錄擴容容量的標記的二進位制所在位置 */ private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; //節點正在移動 static final int MOVED = -1; //節點已被樹化 static final int TREEBIN = -2; static final int RESERVED = -3; // hash for transient reservations //32位元二進位制中正數最大的值,主要用於對key的hash值進行二次運算 static final int HASH_BITS = 0x7fffffff; /** 當前機器的cpu數量 */ static final int NCPU = Runtime.getRuntime().availableProcessors(); /** * 存放元素的陣列物件 * 只有在第一次新增元素的時候進行初始化 * 如果沒有指定陣列容量則使用預設的容量16進行初始化 * 陣列容量為2的次方,如果指定的陣列容量不是2的次方則會進行計算獲取到大於指定容量並是2的次方的數 */ transient volatile Node<K, V>[] table; /** 擴容後的新陣列,如果該陣列不為空則說明當前有其它執行緒正在進行擴容 */ private transient volatile Node<K, V>[] nextTable; /** 基本計數器值,在沒有執行緒爭用的時候才會使用 */ private transient volatile long baseCount; /** * -1時則說明陣列物件正在初始化 * 如果陣列未被初始化時該值為陣列初始化時的大小 * 如果陣列已被初始化該值為陣列擴容的閾值 * -N:低16位元二進位制轉換為十進位制數M,M-1則是當前正在擴容的執行緒數量 */ private transient volatile int sizeCtl; /** * 擴容時需要轉移舊陣列中的剩餘的索引位置長度 * 每個執行緒最少負責16個索引位置上的節點轉移 */ private transient volatile int transferIndex; /** * 計數器陣列的鎖標識 * 用與初始化計數器陣列以及擴容和建立計數器物件的時候使用 */ private transient volatile int cellsBusy; /** 計數器陣列 */ private transient volatile CounterCell[] counterCells;
/** * 建立一個預設容量為16的陣列物件 * 該構造方法中並沒有執行任何操作 * 並沒有建立預設容量的陣列物件 * 只有在第一次新增元素的時候才會初始化陣列物件 */ public ConcurrentHashMap() { } /** * 建立一個指定初始容量的陣列物件 * @param initialCapacity 指定的初始容量 */ public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); //校驗指定的初始容量大小是否大於等於最大容量大小的一半 //如果大於等於最大容量大小的一半則在後續第一次新增元素的時候使用最大容量大小建立陣列 //如果小於最大容量大小的一半則根據指定容量來計算最接近指定容量大小並大於指定容量的2的次方 //此處有一個疑問,指定容量大小如果等於最大容量大小的一半則說明指定容量大小是2的29次方,為什麼不使用指定容量大小,而是使用最大的容量大小? //從tableSizeFor方法也能看出,如果指定的初始容量大小本來就是2的次方的話並不會使用指定的初始容量大小來建立陣列物件 //而是使用大於指定初始容量大小的2的次方來建立陣列物件,比如指定的初始容量大小為16,那就會使用32來建立陣列物件 //在HashMap中如果指定的初始容量大小本來就是2的次方的話則會使用指定的初始容量大小來建立陣列物件 //並不會像ConcurrentHashMap一樣會取比當前指定的2的次方的初始容量大小大的2的次方的容量大小來建立陣列物件 int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); //將計算後的容量大小賦值給sizeCtl等待初始化 this.sizeCtl = cap; } /** * 根據指定的集合中的元素來建立新的陣列物件 * @param m the map */ public ConcurrentHashMap(Map<? extends K, ? extends V> m) { //使用預設的初始容量大小等待初始化 this.sizeCtl = DEFAULT_CAPACITY; //初始化新的陣列物件並將指定集合中的元素新增到新的陣列物件中 putAll(m); } /** * 根據指定的初始容量大小和指定的負載因子來建立新的陣列物件 */ public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } /** * 根據指定的初始容量大小和指定的負載因子和並行數來建立新的陣列物件 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) //初始容量大小小於並行數則使用並行數來建立陣列物件 initialCapacity = concurrencyLevel; //通過計算獲取到擴容的閾值 long size = (long) (1.0 + (long) initialCapacity / loadFactor); //如果閾值大於等於最大容量大小則使用最大容量大小來建立新的陣列物件 //如果閾值小於最大容量大小則使用閾值來計算獲取到大於閾值並是2的次方的值 //使用計算後的值來建立新的陣列物件 int cap = (size >= (long) MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int) size); //將計算後的容量大小賦值給sizeCtl等待初始化 this.sizeCtl = cap; }
在構造方法中並沒有初始化陣列物件,而是在第一次新增元素的時候才會進行初始化,根據指定的初始容量大小和指定的負載因子進行初始化,如果沒有指定初始容量或負載因子則使用預設的,而陣列的大小必須是2的次方,最大的陣列大小則是2的30次方,如果大於2的30次方的話,陣列則不會將繼續進行擴容,當陣列的容量大小已到達最大時,後續新增的元素則會掛載到連結串列或紅黑樹上,如果說指定的初始容量不是2的次方時則會呼叫tableSizeFor
方法計算出大於指定初始容量並且是2的次方的值,如果說指定的初始容量本身就是2的次方時通過計算出的值還是本身。
此處就有一個疑問,本身的值不是2的次方的時候會通過計算獲取到大於指定的初始容量並且是最接近指定的初始容量的2的次方的值,如果本身的值就是2的次方的時候則會取本身的值,但是在第二個構造方法中的tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1))
方法是有問題的,假如initialCapacity 為16
,按理來說最終陣列初始化的容量大小應該也為16,但是經過該方法計算最終的陣列初始化的容量大小為32,雖然該構造方法有問題,但是並不影響。
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) //key和value不能為空 throw new NullPointerException(); //將key的hash值的高16位元二進位制參加運算獲取到新的hash值 int hash = spread(key.hashCode()); //指定索引位置上節點的數量 int binCount = 0; for (Node<K, V>[] tab = table; ; ) { //陣列中指定索引的節點 Node<K, V> f; //n 陣列長度 //i key所在的陣列索引位置 //fh 指定索引的節點的hash值 int n, i, fh; //校驗陣列是否已經初始化 if (tab == null || (n = tab.length) == 0) //陣列未初始化則初始化陣列 //並返回初始化完成的陣列物件 tab = initTable(); //通過陣列索引長度與key的hash值進行與運算獲取到指定索引位置上的元素節點,並校驗元素節點是否為空 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //指定索引位置上的節點不存在則使用傳遞進來的key和value建立新的節點 //並將新的節點放置到指定索引位置上 if (casTabAt(tab, i, null,new Node<K, V>(hash, key, value, null))) //節點新增成功則退出迴圈 break; //校驗索引位置上的節點是否正在移動 } else if ((fh = f.hash) == MOVED) //如果索引位置上的節點正在移動則幫忙移動節點到新的陣列中 //協助擴容 tab = helpTransfer(tab, f); else { V oldVal = null; //使用鎖鎖住指定索引位置上的節點 //這樣其它執行緒就不能對該索引位置上的節點進行操作 //其它執行緒可以對其它索引位置上的節點進行操作 synchronized (f) { //校驗指定索引位置上的節點是否被更改 if (tabAt(tab, i) == f) { //校驗節點的hash值是否大於等於0 //如果節點的hash值大於等於0則說明該索引位置上只有一個節點或節點是一個連結串列節點 if (fh >= 0) { binCount = 1; //從指定索引位置上的節點開始遍歷 for (Node<K, V> e = f; ; ++binCount) { K ek; //校驗遍歷到的節點的hash值以及key是否與傳遞的key和計算的hash值相同 //如果相同則根據onlyIfAbsent引數來決定是否需要使用新的value替換舊的value if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { //獲取舊的value oldVal = e.val; if (!onlyIfAbsent) //使用新的value替換舊的value e.val = value; break; } //遍歷到的節點的hash值以及key不相同 //則判斷當前遍歷到的節點是否是連結串列上的最後一個節點 //如果不是最後一個節點則繼續遍歷 //如果是最後一個節點則為key和value建立一個新的節點 //並將原尾節點的next指標指向新建立的節點 Node<K, V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K, V>(hash, key, value, null); break; } } //校驗節點是否是一個紅黑樹節點 } else if (f instanceof TreeBin) { Node<K, V> p; binCount = 2; //將指定的key和value封裝成一個樹節點並新增到紅黑樹中 //如果紅黑樹中已經存在相同key的節點則根據onlyIfAbsent引數來決定是否使用新的value替換紅黑樹中的節點的value if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,value)) != null) { //獲取節點中的舊值 oldVal = p.val; if (!onlyIfAbsent) //替換節點中的value p.val = value; } } } } if (binCount != 0) { //校驗索引位置上的節點的數量是否到達樹化的閾值 //如果該索引位置上的節點本來就是樹節點則不會繼續樹化 //只有當索引位置上的節點是連結串列節點並且節點的數量大於等於8的時候才會樹化 if (binCount >= TREEIFY_THRESHOLD) //索引位置上的節點數量已經到達樹化的閾值 //則將該索引位置上的所有節點轉換為樹節點 treeifyBin(tab, i); if (oldVal != null) //返回被替換的舊值 return oldVal; break; } } } //更新集合中元素節點的數量,並校驗是否需要擴容 addCount(1L, binCount); return null; }
其實putval
中的邏輯並不難,主要還是putval
中呼叫的其它一些方法比較難以理解,我們就一步步的來看。
首先如果key
和value
為null
的情況下是不能新增到當前集合中的,再看spread
方法,該方法是將key
本身的hash
值的高16位元參加運算獲取到新的hash
值,如果陣列的長度的二進位制是在低16位元二進位制中就會導致key
高16位元的二進位制沒有參加運算,容易導致運算後的key
的索引位置發生衝突,所以需要將高16位元二進位制無符號右移16位元與原hash
值的二進位制進行運算減少索引衝突。
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
為什麼要與HASH_BITS進行與運算呢? HASH_BITS
是32位元二進位制中最大的正整數,與HASH_BITS
進行與運算其實就是讓計算後的hash
值保持在正整數。
後續整個程式碼都被for
迴圈包裹著,只有當新增元素或替換元素成功時才會退出,而for
迴圈裡總共有4個大的分支。
1.陣列未初始化,此時就需要呼叫initTable
方法初始化陣列物件,之前也說過陣列物件是在第一次新增元素的時候初始化的,這是一種懶載入的思想,當你使用到的時候才會去初始化。
private final Node<K, V>[] initTable() { Node<K, V>[] tab; int sc; //校驗陣列是否未被初始化 while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) //陣列長度小於0則說明其它執行緒正在準備初始化陣列 //當前執行緒則需要讓出cpu讓其它執行緒初始化陣列 Thread.yield(); //通過cas操作將sizeCtl修改成-1,告知其它執行緒當前執行緒正在準備初始化陣列 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { //如果指定了初始容量則使用指定的初始容量來建立陣列 //如果沒有指定則使用預設的初始容量大小16來建立陣列 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //初始化陣列 @SuppressWarnings("unchecked") Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n]; //將初始化的陣列物件賦值給當前集合中的陣列物件 //並賦值給tab,下一次迴圈校驗則會發現tab不為空則會退出迴圈並退出當前方法 table = tab = nt; //計算陣列擴容的閾值 sc = n - (n >>> 2); } } finally { //如果try中的程式碼塊執行失敗時sizeCtl則是原先陣列的長度 //執行成功時sizeCtl則是擴容的閾值 sizeCtl = sc; } break; } } //陣列已被初始化返回陣列 return tab; }
我們來看initTable
方法中的程式碼,首先whlie
迴圈中校驗了陣列是否還沒被初始化,當陣列未被初始化的時候,當前執行緒則會去嘗試初始化陣列,直到陣列被當前執行緒或者其它執行緒初始化成功。
先看(sc = sizeCtl) < 0
這段程式碼,sizeCtl
小於0分為兩種情況,sizeCtl為-1
的時候則說明當前有其它執行緒正在初始化陣列物件,sizeCtl小於-1
的時候則說明當前陣列正在進行擴容,從當前情況來看陣列正在進行擴容的這種情況是不大可能存在的,所以說只有可能其它執行緒正在進行初始化陣列,當其它執行緒正在初始化陣列物件時,那當前執行緒則需要讓出cpu
的執行權,等待初始化陣列物件的執行緒執行完成。
再看後一個判斷,該判斷是一個cas
操作,將sizeCtl
修改成-1,告知其它執行緒當前執行緒正在初始化陣列,如果當前執行緒修改失敗,則說明有其它執行緒搶先修改了sizeCtl
的值,那當前執行緒則需要重新走while
迴圈來檢視陣列是否初始化完成,如果沒有完成那需要讓出cpu
的執行權,如果陣列已經被其它執行緒初始化完成,那當前執行緒則可以對陣列進行操作,如果說當前執行緒執行cas
操作成功,則會先檢視是否指定了陣列初始的容量大小,如果指定了則使用指定的初始容量大小來建立陣列物件,如果沒有指定則使用預設的初始容量大小16來建立陣列物件,該指定的初始容量並非是真正的初始容量,如果說指定的初始容量是2的次方則會使用該容量大小來建立陣列物件,如果說指定的初始容量不是2的次方的時候則會通過計算來獲取大於這個指定的初始容量並且是最接近該初始容量的2的次方,並使用計算後的值來初始化陣列物件,當初始化完成陣列物件時則會計算下一次陣列擴容的閾值。
在initTable
方法中我們看到程式碼中使用一個變數sizeCtl
來區分多種情況,其實sizeCtl
一共分為4種情況,那我們先了解一下分為那4種。
sizeCtl = N && table = null
:當陣列還未被初始化的時候,sizeCtl
則是陣列初始化的容量大小sizeCtl = N
:當陣列已經被初始化了的時候,sizeCtl
則是陣列下一次擴容的閾值sizeCtl = -1
:說明當前陣列物件正在被初始化sizeCtl = -N
:當前陣列正在進行擴容,-N
的低16位元二進位制轉換為十進位制數M
,M-1
則是當前正在進行擴容的執行緒數量,-N
的高16位元二進位制則是本次擴容的標記2.通過spread
方法計算後的hash
值與陣列索引長度進行與運算獲取到當前元素節點所在的索引位置,並呼叫tabAt
方法根據索引位置計算該索引中的元素節點在記憶體中的偏移量,再根據偏移量從table
陣列中獲取元素節點,如果該元素節點為空則根據指定的key
和value
建立一個新的元素節點並呼叫casTabAt
方法根據cas
操作將新的元素節點新增到陣列中指定的索引位置上。
static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class<?> ak = Node[].class; //獲取陣列中第一個元素在記憶體中開始的偏移量 ABASE = U.arrayBaseOffset(ak); //獲取陣列中元素在記憶體中的增量地址 int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); //獲取增量地址在32位元二進位制情況下高位連續包含0的個數 //使用31減去包含0的個數獲取到需要位移的次數 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); }
在看tabAt
方法和casTabAt
方法之前,我們先了解一下ConcurrentHashMap
中靜態程式碼塊中的一些程式碼,通過getUnsafe()
方法獲取到Unsafe
類,Unsafe
類是一個不安全的類,能通過該類直接對記憶體中的一些資源進行操作,U.objectFieldOffset
方法能獲取到指定名稱變數在記憶體中的偏移量,後續能根據該偏移量直接對變數進行操作,U.arrayBaseOffset
方法獲取指定型別陣列中第一個元素在記憶體中的偏移量,ABASE
則是陣列中第一個元素在記憶體中開始的偏移量,U.arrayIndexScale
方法獲取指定型別陣列中元素在記憶體中的增量地址,ASHIFT
則是通過增量地址在32位元二進位制的情況下高位連續為0的個數,並使用31減去連續為0的個數獲取到需要位移的次數。
static final <K, V> Node<K, V> tabAt(Node<K, V>[] tab, int i) { //根據指定的索引位置計算該索引中的節點在記憶體中的偏移量 //並從tab陣列中根據索引所在的記憶體的偏移量獲取節點 return (Node<K, V>) U.getObjectVolatile(tab, ((long) i << ASHIFT) + ABASE); }
i << ASHIFT
獲取到第一個索引位置到索引i
的增量地址,再加上ABASE
即可獲取到索引i在記憶體中的偏移量,然後在根據偏移量從volatile
型別的tab
陣列中獲取節點。
static final <K, V> boolean casTabAt(Node<K, V>[] tab, int i,Node<K, V> c, Node<K, V> v) { //根據指定的索引位置計算該索引中的節點在記憶體中的偏移量 //並根據偏移量獲取節點,並將獲取到的節點與傳遞的節點c進行比較 //如果兩個節點相同則使用節點v進行替換 return U.compareAndSwapObject(tab, ((long) i << ASHIFT) + ABASE, c, v); }
根據指定的索引位置的記憶體偏移量獲取節點,如果獲取到的節點與傳遞的節點c
相同則使用節點v進行替換,該方法用於新增一個新的節點或替換節點,新增新的節點的時候,只需要將節點c
設定為null
,替換節點的時候,節點c
則需要設定為原索引位置上的節點。
3.當指定索引位置上的節點不為null
的時候,但是節點的hash
值為-1,此時說明有其它執行緒正在對陣列進行擴容,並且指定索引位置上的節點已經轉移到了新的陣列中,此時當前執行緒就需要呼叫helpTransfer
方法協助其它執行緒進行擴容,當協助完成時則會對新的陣列進行操作。
final Node<K, V>[] helpTransfer(Node<K, V>[] tab, Node<K, V> f) { Node<K, V>[] nextTab; int sc; //f instanceof ForwardingNode 當前節點正在轉移 //nextTab = ((ForwardingNode<K, V>) f).nextTable) != null 擴容的新陣列是否不為空 //如果條件都為true則說明有執行緒正在擴容中,當前執行緒則需要協助擴容 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K, V>) f).nextTable) != null) { //擴容標記 int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //(sc >>> RESIZE_STAMP_SHIFT) != rs 校驗擴容標識是否一致,不一致則說明不是同一個陣列 //sc == rs + 1 應該是sc == (rs << 16) + 1 校驗擴容是否完成 //sc == rs + MAX_RESIZERS 應該是sc == (rs << 16) + MAX_RESIZERS 校驗擴容的執行緒的數量是否到達最大 //transferIndex <= 0 舊陣列中還有多少沒有轉移的索引節點長度,如果小於等於0則說明已經轉移完成則不需要協助 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //擴容執行緒加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //協助擴容 transfer(tab, nextTab); break; } } //擴容完成則返回新的陣列,對新的陣列進行操作 return nextTab; } //當前節點還未被轉移或新的陣列還沒有被初始化則對舊陣列進行操作 return table; }
我們來看一下協助擴容的方法,區域性變數nextTab
和sc
則是最新的擴容陣列和擴容標識以及擴容的執行緒數量。
tab != null和f instanceof FowardingNode
:該條件只是為了程式碼的健壯性,tab !=null
從程式碼來看,既然當前執行緒執行到了協助擴容的方法,那tab
肯定是不為null
的,再看f instanceof FowardingNode
條件,在之前的判斷中f的節點的hash值為-1
,-1就已經代表節點已經被轉移
,只有當節點已經轉移的時候才會執行到當前方法中,此時你就可以發現之前為什麼要將節點的hash
值控制在正整數,如果說沒有控制在正整數的時候,當節點的hash值恰巧為-1
的時候,就會導致執行緒執行一些沒有必要的程式碼,而且在後續也不好區分連結串列節點。(nextTab = ((ForwardingNode<K, V>) f).nextTable) != null
:該條件則是獲取最新的擴容陣列並校驗最新的陣列是否不為null
。當上述條件都為true
的時候則會通過resizeStamp
方法使用舊陣列的長度生成一個擴容標識,再看while
迴圈中的條件語句。
nextTab == nextTable
:校驗當前執行緒協助擴容的陣列是否是最新的陣列,如果此時nextTable == null
的話則說明已經完成了擴容,那當前執行緒則不需要協助擴容了,只需要執行自己該執行的操作,如果說當前執行緒協助擴容的陣列不是最新的擴容陣列的時候,則會從nextTable
中繼續獲取f
節點,再從f
節點中獲取nextTable
,繼續校驗是否是最新的擴容陣列。
table == tab
:校驗擴容是否完成。
(sc = sizeCtl) < 0
:校驗是否正在擴容中。
上述條件都為true
的時候則說明陣列正在進行擴容中,我們先來看一些在哪些條件下當前執行緒不會去協助擴容。
(sc >>> RESIZE_STAMP_SHIFT) != rs
:校驗擴容標識是否一致,不一致則說明不是對同一長度的陣列進行的擴容。
sc == rs + 1
:該條件應該是sc == (rs << 16) + 1
,校驗擴容是否完成,在擴容開始的時候sizeCtl為(rs << 16) + 2
,這個2代表著開始擴容的執行緒以及最後一個擴容完成的執行緒,在transfer
擴容方法中,每一個執行緒完成了所否則的區域的時候都會將sizeCtl
的值減1,當最後一個執行緒執行完成時減1,此時sizeCtl
的值就等於(rs<< 16) + 1
則說明整個陣列擴容都已經完成了,已經不需要執行緒協助了。
sc == rs + MAX_RESIZERS
:該條件應該是sc == (rs << 16) + MAX_RESIZERS
,校驗當前正在擴容的執行緒的數量是否到達最大值(65535)。
transferIndex <= 0
:校驗舊陣列中還未完成轉移的索引節點長度是否小於等於0,小於等於0則說明已經完成了轉移則不需要協助。
當上述條件都不成立的時候則說明陣列擴容需要協助,此時通過cas
操作將擴容執行緒的數量加1並呼叫transfer
方法來協助擴容,該擴容方法放在後面講解。
4.當上面3個分支都不成立的時候,則說明該索引位置上的節點不為空並且沒有被轉移,此時就需要使用synchronized
對指定索引位置上的節點加鎖,然後根據節點的型別來執行相關的操作。
hash
值都是大於等於0的,當指定索引位置上的節點是一個連結串列節點的時候,則會從連結串列的頭節點開始遍歷並且記錄連結串列中節點的數量,當連結串列中的節點的key
與指定的key
相同則根據onlyIfAbsent
引數來決定是否需要替換value
,如果沒有相同的key
時,則會為指定的key
和value
建立一個新的節點,並將新的節點新增到連結串列的尾部。hash
值固定為-2,此處直接校驗的是頭節點的型別是否是紅黑樹,當指定索引位置上的頭節點是一個紅黑樹節點,則會呼叫頭節點的putTreeVal
方法將指定的key
和value
新增到紅黑樹中,如果說指定的key
已經存在在紅黑樹中則會根據onlyIfAbsent
引數來決定是否需要替換value
,此處為什麼是呼叫頭節點的putTreeVal
方法呢?看到後面連結串列轉換為紅黑樹節點的時候就能明白了。final TreeNode<K, V> putTreeVal(int h, K k, V v) { Class<?> kc = null; boolean searched = false; for (TreeNode<K, V> p = root; ; ) { int dir, ph; K pk; if (p == null) { first = root = new TreeNode<K, V>(h, k, v, null, null); break; } else if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) { if (!searched) { TreeNode<K, V> q, ch; searched = true; if (((ch = p.left) != null && (q = ch.findTreeNode(h, k, kc)) != null) || ((ch = p.right) != null && (q = ch.findTreeNode(h, k, kc)) != null)) return q; } dir = tieBreakOrder(k, pk); } TreeNode<K, V> xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { TreeNode<K, V> x, f = first; first = x = new TreeNode<K, V>(h, k, v, f, xp); if (f != null) //將新新增的樹節點設定為連結串列的頭節點 f.prev = x; if (dir <= 0) xp.left = x; else xp.right = x; if (!xp.red) x.red = true; else { //獲取寫鎖,如果獲取寫鎖失敗則會將當前執行緒掛起進行等待 //如果有執行緒正在讀,那當前寫的執行緒就要掛起進行等待 //當讀的執行緒執行完成之後就會去喚醒掛起的執行緒 //如果說當前執行緒在寫,有執行緒準備讀 //那讀執行緒並不會去讀取紅黑樹中的節點 //而是讀取連結串列的節點,因為TreeBin物件中包含了紅黑樹的根節點並且也包含了連結串列的頭節點 //如果發生寫寫的問題呢? //其實並不會發生寫寫的問題,因為當前整個方法都被外部的synchronized鎖住了 lockRoot(); try { //平衡紅黑樹中的節點 root = balanceInsertion(root, x); } finally { //釋放鎖 unlockRoot(); } } break; } } assert checkInvariants(root); return null; }
putTreeVal
和balanceInsertion
方法在之前的HashMap
中已經講過,此處就不再講解,我們主要還是看一下lockRoot
和unlockRoot
方法。
//鎖狀態 volatile int lockState; //寫鎖 static final int WRITER = 1; // set while holding write lock //等待 static final int WAITER = 2; // set when waiting for write lock //讀鎖 static final int READER = 4; // increment value for setting read lock
private final void lockRoot() { //嘗試加寫鎖 if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER)) //加寫鎖失敗則嘗試將執行緒掛起進行等待 contendedLock(); }
通過cas
操作嘗試加寫鎖,當加寫鎖失敗則會呼叫contendedLock
方法將執行緒掛起進行等待。
private final void contendedLock() { boolean waiting = false; for (int s; ; ) { /** * ~WAITER = 1111 1111 1111 1111 1111 1111 1111 1101 * ((s = lockState) & ~WAITER) == 0 等於0則說明當前沒有執行緒對當前TreeBin物件下的紅黑樹進行加鎖 * 不等於則說明有執行緒對TreeBin物件下的紅黑樹加鎖 */ if (((s = lockState) & ~WAITER) == 0) { //沒有執行緒對TreeBin物件下的紅黑樹加鎖 //當前執行緒則可以對TreeBin物件下的紅黑樹嘗試加鎖 if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) { if (waiting) //將等待執行緒清空 waiter = null; return; } //(s & WAITER) == 0 執行當前判斷語句的時候則說明有執行緒對紅黑樹加了鎖 //如果(s & WAITER)等於0則說明沒有執行緒在等待加鎖 //此時當前執行緒就可以進行等待 } else if ((s & WAITER) == 0) { //將鎖狀態的32位元二進位制中的第2位設定為1 //代表當前有一個執行緒正在等待加鎖 if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) { waiting = true; //將TreeBin中的等待執行緒設定為當前執行緒 waiter = Thread.currentThread(); } } else if (waiting) //當前執行緒已經設定成了等待狀態了,此時就需要將執行緒掛起 LockSupport.park(this); } }
contendedLock
方法中總共有3個分支,我們就一個一個的開始講解。
((s = lockState) & ~WAITER) == 0
:校驗是否有執行緒對指定索引位置上的TreeBin
物件中的紅黑樹進行加鎖。
~WAITER = -3
,32位元二進位制為 1111 1111 1111 1111 1111 1111 1111 1101
WRITER = 1
,1為寫鎖,32位元二進位制為 0000 0000 0000 0000 0000 0000 0000 0001
READER = 4
,4為讀鎖,32位元二進位制為 0000 0000 0000 0000 0000 0000 0000 0100
使用最新的鎖狀態與~WAITER
進行與運算,當加了讀鎖或寫鎖時,讀鎖和寫鎖的二進位制標識與~WAITER
二進位制標識都為1的情況下則說明已經有執行緒加了鎖,如果有執行緒加了鎖,那當前執行緒則會執行後續的程式碼來將自己掛起並等待。
(s & WAITER) == 0
:執行到當前語句的時候就說明已經有執行緒加了鎖,此時就需要校驗一下是否有執行緒掛起並在等待,從程式碼上來看,並不會出現多個執行緒同時等待,只會存在一個等待的執行緒,如果沒有執行緒在等待,則會通過cas
操作將鎖狀態加上一個有執行緒在等待的標識,並將TreeBin
物件中的等待執行緒設定為當前執行緒。
waiting
:當有執行緒加了鎖,並且將當前執行緒設定為了等待的執行緒,此時就需要將當前執行緒掛起。
為什麼說只會存在一個等待的執行緒呢?
讀鎖與寫鎖本就互斥,在get
方法中,獲取紅黑樹中的節點的時候則會加上一個讀鎖,此時寫鎖就需要掛起進行等待,當讀鎖釋放完成之後就會喚醒加寫鎖的執行緒,如果已經有執行緒加了寫鎖,那get
方法則不會從紅黑樹中獲取節點,而是從TreeBin
物件中記錄的連結串列的頭節點開始遍歷進行匹配,那會不會發生多個執行緒同時去加寫鎖呢?其實並不會,因為加寫鎖的方法的外部整個都被synchronized
鎖住了,所以並不會存在多個執行緒同時加寫鎖,也不會存在多個等待的執行緒。
連結串列新增節點和紅黑樹新增節點都已經講解完畢,此時就要看一下後續程式碼,連結串列節點在什麼情況下會轉換為紅黑樹以及是怎麼轉換的。
if (binCount != 0) { //校驗索引位置上的節點的數量是否到達樹化的閾值 //如果該索引位置上的節點本來就是樹節點則不會繼續樹化 //只有當索引位置上的節點是連結串列節點並且節點的數量大於等於8的時候才會樹化 if (binCount >= TREEIFY_THRESHOLD) //索引位置上的節點數量已經到達樹化的閾值 //則將該索引位置上的所有節點轉換為樹節點 treeifyBin(tab, i); if (oldVal != null) //返回被替換的舊值 return oldVal; break; }
在新增一個節點到連結串列中去,會從連結串列的頭節點開始遍歷並記錄連結串列中的節點數量,該數量就是用binCount
記錄的,當連結串列中的節點數量大於等於TREEIFY_THRESHOLD(8)
的時候則會呼叫treeifyBin
方法來決定是否需要將連結串列中的所有節點樹化並轉換為紅黑樹。
private final void treeifyBin(Node<K, V>[] tab, int index) { //指定索引位置上的節點 Node<K, V> b; //n 陣列長度 int n, sc; if (tab != null) { //校驗陣列的長度是否小於最小的樹化閾值 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) /** * 陣列長度小於最小的樹化閾值 * 此時並不會將陣列中的連結串列節點轉換為樹節點 * 只會將陣列進行擴容 * * 如果仔細檢視tryPresize方法你會發現該方法中也對n進行了左移1位 * 當前tryPresize傳參的時候就已經對n進行了左移1位,然後在方法裡面又對n進行了左移1位 * 這樣是不是就對陣列進行了4倍擴容,並不是我們想的那樣2倍擴容呢? * 其實不是的,在tryPresize方法裡面並沒有使用2次位移後的n來擴容 * 而是使用最開始的陣列長度n來計算進行擴容 */ tryPresize(n << 1); //獲取指定索引位置上的元素節點並校驗元素節點是否不為空 //如果元素節點不為空則校驗該節點的hash值是大於等於0 //如果hash值大於等於0則說明該節點需要轉換為樹節點 //如果hash值小於0則說明該節點正在移動或已經被樹化了 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { //使用鎖鎖住指定索引位置上的節點 //在新增元素的時候也會對指定索引位置上的節點加鎖 //如果兩個索引位置都是相同的索引位置的時候,如果新增元素的執行緒先加了鎖 //那執行到當前方法的執行緒則會等待新增元素的執行緒釋放鎖 //當新增元素的執行緒釋放了鎖之後,當前方法的執行緒則會獲取鎖將索引位置上所有的節點都樹化,包括新新增的元素節點 //如果是執行到當前方法的執行緒獲取到了鎖,那新增元素的執行緒則會等待 //當指定索引位置上的所有節點都樹化了之後並釋放了鎖 //新增元素的執行緒則會去獲取鎖,此時新增元素的執行緒會發現當前索引位置上的節點已經是樹節點 //則會將元素節點新增到紅黑樹中並使紅黑樹平衡 synchronized (b) { //校驗在獲取鎖的時候指定索引位置上的節點是否被更改 if (tabAt(tab, index) == b) { //hd 頭節點 //tl 當前遍歷到的節點的上一個節點 TreeNode<K, V> hd = null, tl = null; //從指定索引位置上的節點開始遍歷,依次將連結串列上的所有節點轉換為樹節點 for (Node<K, V> e = b; e != null; e = e.next) { //將普通節點轉換為樹節點 TreeNode<K, V> p = new TreeNode<K, V>(e.hash, e.key, e.val, null, null); //校驗當前遍歷的節點是否是連結串列中的頭節點 //如果是頭節點則將該節點設定為了樹節點的頭節點 //如果不是頭節點則將當前轉換的樹節點與上一個樹節點進行關聯 if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } //建立一個TreeBin物件並將樹化的節點轉換為紅黑樹 //TreeBin物件中保留著紅黑樹的根節點以及連結串列的頭節點 //再呼叫setTabAt方法將TreeBin物件新增到指定的索引位置上 //在HashMap中並沒有建立一個TreeBin物件來存放紅黑樹的根節點 //而是直接將紅黑樹的根節點放置到指定的索引位置上 setTabAt(tab, index, new TreeBin<K, V>(hd)); } } } } }
(n = tab.length) < MIN_TREEIFY_CAPACITY
:校驗陣列的長度是否小於最小樹化的閾值,並不是說連結串列中的節點數量到達8了就要將連結串列節點轉換為紅黑樹節點,前提是需要陣列的長度到達了閾值(64)才會轉換為紅黑樹節點,如果陣列的長度沒有到達閾值則會進行擴容。
(b = tabAt(tab, index)) != null && b.hash >= 0
:校驗外部synchronized
釋放鎖之後,準備節點樹化的期間是否有其它執行緒對該節點進行操作了,該操作分為兩種,有執行緒將該索引位置上的所有節點都刪除了或者說有執行緒將該索引位置上的節點都樹化了,這兩種情況下就不需要再進行樹化了。 我們主要看一下是怎麼樹化的以及轉換成紅黑樹的,樹化的操作其實很簡單,就是對指定索引位置上的節點加鎖防止其它執行緒操作,依次從連結串列的頭節點開始遍歷,將連結串列中的Node
節點轉換為TreeNode
節點,就這樣樹化就完成了,而轉換為紅黑樹就是通過建立一個TreeBin
物件來完成的,在構造TreeBin
物件時會將樹化的節點轉換為紅黑樹中的節點。
TreeBin(TreeNode<K, V> b) { //給當前TreeBin物件設定一個樹化標識 super(TREEBIN, null, null, null); //樹化的頭節點的設定為連結串列的頭節點 this.first = b; //根節點 TreeNode<K, V> r = null; //從樹化的頭節點開始遍歷將節點轉換為紅黑樹中的節點,直到沒有下一個節點 for (TreeNode<K, V> x = b, next; x != null; x = next) { //下一個節點 next = (TreeNode<K, V>) x.next; //初始化每個樹節點的左右子節點 x.left = x.right = null; if (r == null) { //根節點為空則將當前遍歷到的節點設定為根節點 //並將該節點的顏色設定為黑色 x.parent = null; x.red = false; r = x; } else { //待新增到紅黑樹中的節點的key K k = x.key; //待新增到紅黑樹中的節點的hash值 int h = x.hash; Class<?> kc = null; //從紅黑樹的根節點開始遍歷來校驗節點放置的位置 for (TreeNode<K, V> p = r; ; ) { //dir 節點新增到紅黑樹中的位置 //ph 被遍歷到的紅黑樹中的節點的key int dir, ph; //被遍歷到的紅黑樹中的節點的key K pk = p.key; if ((ph = p.hash) > h) //被遍歷到的紅黑樹中的節點的hash值大於待新增到紅黑樹中的節點的hash值 //則需要將節點新增到紅黑樹中的左子節點 dir = -1; else if (ph < h) //被遍歷到的紅黑樹中的節點的hash值小於待新增到紅黑樹中的節點的hash值 //則需要將節點新增到紅黑樹中的右子節點 dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode<K, V> xp = p; //校驗待新增到紅黑樹中的節點所放置的位置是否有節點存在 //如果有節點存在則重新走回圈與子節點繼續比較直到沒有了子節點 if ((p = (dir <= 0) ? p.left : p.right) == null) { //放置待新增到紅黑樹中的節點的位置沒有了節點 //則會將節點新增到紅黑樹中 //將待新增節點的父節點的指標指向當前遍歷到的紅黑樹中的節點 x.parent = xp; if (dir <= 0) //將待新增的節點放置到被遍歷到的節點的左子節點 xp.left = x; else //將待新增的節點放置到被遍歷到的節點的右子節點 xp.right = x; //紅黑樹中新增了新的節點,可能讓紅黑樹不在平衡 //所以需要將紅黑樹中的節點進行平衡 r = balanceInsertion(r, x); break; } } } } //TreeBin物件記錄根節點 this.root = r; assert checkInvariants(root); }
TreeBin
的構造方法比較簡單,通過super
方法給當前的TreeBin
物件設定hash值為-2
,然後通過遍歷樹化後的連結串列節點,將連結串列的頭節點設定為紅黑樹的根節點,後續連結串列中的節點從紅黑樹的根節點開始進行比較,來獲取到連結串列的節點在紅黑樹中所存放的位置,紅黑樹按左小右大的方式存放節點,每新增一個節點到紅黑樹中,都有可能讓紅黑樹不平衡,所以每次都會嘗試是否需要平衡紅黑樹中的節點。
上圖是連結串列轉紅黑樹,轉為紅黑樹之後,索引位置上是一個TreeBin
物件,並且TreeBin
物件中包含了紅黑樹的根節點以及連結串列的頭節點,而紅黑樹中的節點中不僅有紅黑樹本身的指標,而且還有連結串列的一些指標,既能當做是紅黑樹也可以當做連結串列,上圖中並沒有將所有節點的連結串列指標都畫出來,只是畫了部分節點的連結串列指標。
當元素新增操作以及連結串列轉換為紅黑樹操作完成之後,我們再來看ConcurrentHashMap
是如何記錄元素數量的以及擴容的。
/** * 新增元素節點或刪除元素節點的時候需要將計數器中的值修改 * 如果在單執行緒的情況下直接對基本計數器值修改 * 如果在多執行緒的情況下對基本計數器修改失敗的話並且計數器陣列為空 * 則需要初始化計數器陣列,並使用執行緒生成亂數與計數器陣列的長度進行運算獲取到指定的索引位置 * 並將指定索引位置上的計數器物件進行初始化並將check值儲存在計數器物件中的value * 如果計數器陣列不為空,但是指定的索引位置上的計數器物件為空 * 則初始化計數器物件並將check值儲存在計數器物件中的value * 如果計數器物件也不為空則直接將check值累加到計數器物件中的value * 在將check值儲存之後則會校驗當前集合中的陣列是否需要擴容 * 如果需要擴容則會呼叫transfer方法來進行擴容 */ private final void addCount(long x, int check) { //計數器陣列 CounterCell[] as; //b 基本計數器值 long b, s; //計數器陣列counterCells不為空則說明之前已經發生過多執行緒對基本計數器baseCount進行操作 //計數器陣列counterCells為空則說明之前一直在單執行緒的情況下對基本計數器baseCount進行操作 //如果計數器陣列為空則會嘗試對基本計數器baseCount進行操作,如果對baseCount操作失敗則說明當前處於多執行緒的情況下 //此時就需要初始化counterCells if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; //as == null || (m = as.length - 1) < 0 校驗計數器陣列是否初始化,如果沒有初始化則呼叫fullAddCount方法進行初始化 //(a = as[ThreadLocalRandom.getProbe() & m]) == null 當前計數器陣列不為空的時候則使用執行緒生成的亂數 //與計數器陣列的長度進行與運算獲取到指定索引位置上的計數器物件,並校驗計數器物件是否為空 //如果計數器物件為空則呼叫fullAddCount方法對指定索引位置上的計數器物件進行初始化 //!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) 計數器陣列不為空並且運算獲取到的指定索引位置上的計數器物件也不為空 //此時就可以對計數器物件進行操作,如果操作失敗則說明有其它執行緒獲取到了這個索引位置上的計數器物件並對這個計數器進行操作中 //此時就需要呼叫fullAddCount方法選擇其它索引位置上的計數器物件進行操作 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //對計數器陣列以及計數器陣列中的計數器物件進行初始化 //一個執行緒對計數器物件操作失敗則會更換其它索引上的計數器物件進行操作 //如果這個執行緒多次對計數器物件操作失敗並且當前計數器陣列的長度小於cpu的數量 //此時就會嘗試進行擴容,然後對擴容之後的陣列中的計數器物件進行操作 //如果說計數器陣列的長度不小於cpu的數量則不會進行擴容 //只會一直更換計數器物件進行操作,直到操作成功 fullAddCount(x, uncontended); return; } //校驗check是否小於等於1 //check=-1的情況下則說明刪除了指定索引位置上的元素節點 //check=0的情況下則說明指定索引位置上沒有元素節點,直接將指定的key和value封裝成一個節點放置到指定的索引位置上 //check=1的情況在put方法下則說明指定索引位置上有元素節點,但是隻是對索引位置上的元素節點中的value進行了替換 //在其它方法中check=1可能表示在指定的索引位置上新增了1個元素節點 //如果是這三種情況的話則不會去嘗試進行擴容 //如果執行到了當前判斷語句的話則說明並沒有執行上面的fullAddCount方法 //而是通過上面判斷語句中的cas操作對指定索引位置上的計數器物件的值操作成功 //但是在check=1的情況下的put方法只是將原本的元素節點中的value替換了 //如果此時還要對計數器物件中的value加1,那不就會造成1個元素節點變成了2個元素節點? //其實並不會的,如果check=1的情況下put方法並不會走到當前的方法中 if (check <= 1) return; //將計數器陣列中所有的計數器物件值以及基本的計數器值合併獲取到當前集合中的元素節點的數量 s = sumCount(); } //校驗check是否大於等於0 //大於等於0則說明新增了新的元素節點 //此時就需要來校驗是否需要進行擴容 //check一共分為5種情況,有三種已經在上面說過,現在來看一下剩下的兩種情況 //check=2的情況下代表在紅黑樹種新增了一個樹節點,也有可能是在一個連結串列節點中新增了一個節點 //check>2的情況下代表在連結串列節點中新增了一個節點 if (check >= 0) { Node<K, V>[] tab, nt; int n, sc; //當前陣列中的元素節點的數量已經到達擴容的閾值並且陣列的長度小於陣列最大容量大小 //此時會嘗試將陣列進行擴容,如果擴容失敗則會一直進行嘗試,直到擴容成功 while (s >= (long) (sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //獲取擴容時的標記 int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //擴容執行緒加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) //協助擴容 transfer(tab, nt); //將sizeCtl修改成一個很大的負數,告知其它執行緒現在有執行緒正在擴容 //為什麼要+2而不是+1?+1代表當前擴容的執行緒數量+1,而+2可能是標識開啟擴容 } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //擴容 transfer(tab, null); //重新計算集合中元素節點的數量 s = sumCount(); } } }
addCount
中分為兩個分支,分別為元素數量計數以及陣列擴容,我們先看計數的分支。
在單執行緒的情況下只會對基本計數器baseCount
進行操作,如果在多執行緒的情況下同時對baseCount
進行操作,只會有一個執行緒操作成功,而其它執行緒並不會等待繼續對baseCount
進行操作,而是呼叫fullAddCount
方法初始化計數器陣列(CounterCells
)以及陣列中的計數器物件(CounterCell
)並對陣列中的計數器物件進行操作,只要出現過一次多執行緒的情況,往後都不會對baseCount
操作了,而是直接對計數器陣列中的計數器物件進行操作,fullAddCount
中程式碼具體的意思可以參考下面程式碼中的註釋,這裡不再多講。
private final void fullAddCount(long x, boolean wasUncontended) { int h; //使用當前執行緒生成亂數,如果亂數為0則說明ThreadLocalRandom中的一些引數還未被初始化 //此時就需要進行初始化並重新獲取亂數 if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); h = ThreadLocalRandom.getProbe(); wasUncontended = true; } boolean collide = false; for (; ; ) { //計數器陣列 CounterCell[] as; //計數器物件 CounterCell a; //計數器陣列長度 int n; //計數器物件中的value long v; //校驗計數器陣列是否已初始化 if ((as = counterCells) != null && (n = as.length) > 0) { //校驗當前執行緒生成的亂數與計數器陣列索引長度運算後獲取到的指定索引位置上的計數器物件是否為空 if ((a = as[(n - 1) & h]) == null) { //校驗計數器陣列的鎖狀態是否為0 //如果不為0則說明有其它執行緒正在對計數器陣列進行擴容或對計數器陣列中的計數器物件進行初始化 if (cellsBusy == 0) { //建立一個計數器物件,並將值新增到計數器物件中 CounterCell r = new CounterCell(x); //加鎖,防止多個執行緒同時對通一個索引位置上的計數器物件進行初始化 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { CounterCell[] rs; int m, j; //校驗一下運算後的索引位置上的計數器物件是否為空 //如果不為空則說明已經被其它執行緒初始化了,當前執行緒就不需要再去初始化 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { //將建立好的計數器物件新增到運算後的指定索引位置上 rs[j] = r; //初始化成功 created = true; } } finally { //修改計數器陣列鎖標識 cellsBusy = 0; } if (created) //初始化成功則退出 break; //初始化失敗,說明已經有其它執行緒初始化了計數器物件,此時就需要重新走回圈選擇初始化成功的計數器物件進行操作 continue; } } //其它執行緒正在對計數器陣列進行擴容或初始化指定索引位置上的計數器物件 collide = false; } else if (!wasUncontended) //wasUncontended為false則說明有其它執行緒正在對指定索引位置上的計數器物件進行操作 //此時當前執行緒則需要獲取其它索引位置上的計數器物件進行操作 wasUncontended = true; //嘗試對指定索引位置上的計數器物件進行操作 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) //執行成功則退出 break; //校驗當前是否有執行緒對計數器陣列進行了擴容,將計數器陣列中的計數器物件移動到了新的計數器陣列中 //如果已經將計數器物件移動到了新的計數器陣列中,那重新選擇新的計數器陣列中的計數器物件來進行操作 //如果沒有執行緒對計數器陣列進行擴容,那比較一下當前計數器陣列的長度是否大於等於cpu的數量 //如果大於等於cpu的數量則不會進行擴容,只會重新選擇計數器陣列中的計數器物件來操作 //如果計數器陣列的長度小於cpu的數量那就會對計數器陣列進行2倍的擴容 else if (counterCells != as || n >= NCPU) //此處將collide修改為false可能是想再次嘗試一下是否能對計數器陣列中的計數器物件進行操作 //如果能操作則不進行擴容,這樣能減少擴容帶來的效能問題 collide = false; else if (!collide) collide = true; //多次獲取計數器陣列中的計數器物件而不能操作,並計數器陣列的長度小於cpu的數量 //導致部分的cpu沒有被使用到,此時就需要對計數器陣列進行擴容,充分的使用cpu //對計數器陣列加鎖 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { //校驗計數器陣列是否變更,防止其它執行緒已經對計數器陣列進行了擴容 if (counterCells == as) { //初始化一個新的計數器陣列,大小是原來的2倍 CounterCell[] rs = new CounterCell[n << 1]; //迴圈將原來的計數器陣列中的計數器物件移動到擴容後的計數器陣列中 for (int i = 0; i < n; ++i) rs[i] = as[i]; //更新當前集合中的計數器陣列 counterCells = rs; } } finally { //修改計數器陣列鎖標識 cellsBusy = 0; } collide = false; //使用擴容後的計數器陣列進行操作 continue; } //計數器物件正在被其它執行緒操作,此時需要重新生成亂數,獲取別的索引位置上的計數器物件來進行操作 h = ThreadLocalRandom.advanceProbe(h); //計數器陣列未被初始化,此時就需要校驗一下當前是否有其它執行緒正在初始化計數器陣列 //如果沒有,那當前執行緒就需要將計數器陣列初始化的標識設定為1,代表當前有執行緒正在初始化計數器陣列 } else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { if (counterCells == as) { //建立計數器陣列,計數器陣列的長度為2的次方 CounterCell[] rs = new CounterCell[2]; //建立一個計數器物件,並將值新增到計數器物件中 //並使用執行緒生成的亂數與計數器陣列索引長度進行運算獲取到指定的索引位置 //並將計數器物件放置到指定的索引位置上 rs[h & 1] = new CounterCell(x); //更新當前集合中的計數器陣列 counterCells = rs; //初始化完成 init = true; } } finally { //修改計數器陣列鎖標識 cellsBusy = 0; } if (init) //計數器陣列初始化完成則退出 break; //計數器陣列正在被其它執行緒初始化或已經被其它執行緒初始化完成 //此時嘗試對基本的計數器值進行操作 //如果失敗則說明有其它執行緒也在對基本計數器值進行操作 //那就要重新走回圈,來看計數器陣列是否被初始化完成,如果被初始化完成那就對計數器陣列中的計數器物件進行操作 //如果還沒有初始化完成則會繼續嘗試對基本的計數器值進行操作 } else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; } }
我們再看addCount
中的第二個分支,該分支中主要是先校驗是否到達了擴容的閾值以及是否小於陣列最大的容量大小,條件成立則會校驗是否有執行緒在擴容,如果有執行緒在擴容,那當前執行緒則需要協助擴容如果沒有那當前執行緒則開啟擴容。
private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) { //n 陣列長度 //stride 每個cpu需要負責轉移的索引長度 int n = tab.length, stride; //使用陣列長度來計算每個cpu需要負責的長度 //每個cpu最少需要負責16的長度 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; //nextTab為空則說明當前沒有其它執行緒在初始化 //不為空則說明當前有其它執行緒正在初始化,當前執行緒則需要幫助初始化的執行緒將舊陣列中的元素節點轉移到新的陣列中 if (nextTab == null) { try { //建立新的陣列,容量是舊陣列的2倍 @SuppressWarnings("unchecked") Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1]; nextTab = nt; } catch (Throwable ex) { //出現異常時則說明擴容失敗,容量大小已經超出MAXIMUM_CAPACITY引數指定的最大的陣列容量 //因為MAXIMUM_CAPACITY引數指定的容量大小是int型別中正整數最大的2的次方 //當陣列的長度已經到達MAXIMUM_CAPACITY引數指定的容量大小時,如果再進行2倍的擴容就會導致陣列的長度變成負數 //此時就會導致擴容失敗,將int型別中最大的正整數設定成擴容的閾值來停止本次擴容 //如果陣列的長度已到達陣列最大的容量大小那就不會進行擴容了 sizeCtl = Integer.MAX_VALUE; return; } //擴容後的陣列,當其它擴容的執行緒發現nextTable不為空時則不會重複擴容 nextTable = nextTab; //擴容後,預設需要轉移整個舊陣列 transferIndex = n; } //獲取擴容後的陣列長度 int nextn = nextTab.length; //建立一個轉移的節點 ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab); //是否繼續轉移 boolean advance = true; //標識擴容之後舊陣列中的節點是否全部完成轉移 boolean finishing = false; for (int i = 0, bound = 0; ; ) { Node<K, V> f; int fh; while (advance) { //nextIndex 當前執行緒開始轉移的長度位置 //nextBound 當前執行緒負責轉移的索引位置邊界,如果到達這個邊界則說明當前執行緒已經完成了所負責的索引長度的節點 //轉移舊陣列中的元素節點是從陣列的尾部向陣列的頭部開始轉移 int nextIndex, nextBound; //每次轉移完一個索引位置上的節點的時候都會校驗一下下一次轉移的索引位置是否已經超出邊界 //或舊陣列中的元素節點都已經全部轉移完成 //如果下一次轉移的索引位置超出邊界,但是剩下需要轉移的節點沒有被其它執行緒協助轉移 //那麼當前執行緒則繼續選擇部分的索引長度來轉移 if (--i >= bound || finishing) advance = false; //校驗剩餘需要轉移的索引長度是否為0,如果為0則說明舊陣列中沒有需要轉移的元素節點了 else if ((nextIndex = transferIndex) <= 0) { //將轉移的節點的索引位置設定為-1,後續會根據該條件退出擴容方法 i = -1; advance = false; //根據開始轉移的長度位置與每個執行緒需要轉移的長度進行比較 //如果開始轉移的長度位置大於每個執行緒需要轉移的長度,那就使用開始轉移的長度位置減去每個執行緒需要轉移的長度 //獲取到當前執行緒需要負責轉移的索引位置邊界 //如果開始轉移的長度位置小於等於每個執行緒需要轉移的長度,那就說明當前需要轉移的長度不需要其它執行緒協助 //當前執行緒則會將剩下需要轉移的節點轉移到新的陣列中 } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { //將計算後的索引邊界賦予bound,用於執行緒每次轉移之後進行校驗 //如果已經到達了邊界說明執行緒已經完成了所負責的索引長度的節點轉移 //執行緒不再執行節點的轉移 bound = nextBound; //開始轉移的長度位置-1,獲取到開始轉移的節點的索引位置 i = nextIndex - 1; advance = false; } } //i < 0 小於0則說明當前執行緒所負責轉移的節點已經完成並且沒有其它需要轉移的節點了 //i >= n //i + n >= nextn if (i < 0 || i >= n || i + n >= nextn) { int sc; //當前finishing為true時則說明舊陣列中的所有節點都已經轉移 //此時就需要將新陣列設定為當前集合使用的陣列並計算下一次的擴容閾值 if (finishing) { //將擴容的陣列置為空,代表當前沒有執行緒正在進行擴容操作 nextTable = null; //將擴容後的新陣列設定為當前集合正在使用的陣列 table = nextTab; //計算下一次擴容的閾值 sizeCtl = (n << 1) - (n >>> 1); //擴容完成,退出 return; } //finishing為false則會走到當前語句,則說明當前執行緒並不知道舊陣列中的元素節點有沒有轉移完成 //但是當前執行緒所負責轉移的索引節點已經完成,此時就需要將並行擴容中的執行緒數量減1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //擴容標記左移16位元獲取到執行緒數量的標記 //使用sc-2獲取到當前還剩下的執行緒數量的標記 //如果兩個相等則說明當前執行緒是最後一個擴容結束的執行緒 //此時就需要當前執行緒執行收尾操作,需要從舊陣列中的尾部開始向頭部節點遍歷來檢查是否所有的元素節點都轉移了 //如果都轉移了,則將擴容後的新陣列設定為當前集合正在使用的陣列,並且計算下一次擴容的閾值 //如果還有元素節點沒有轉移,當前執行緒則會將剩下的元素節點進行轉移 //如果兩個不相等則說明當前執行緒不是最後一個擴容結束的執行緒 //當前執行緒已經完成了所負責的索引位置的元素節點,並且舊陣列中沒有其他需要轉移的節點了,當前執行緒直接退出擴容 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; //當前執行緒是最後一個結束擴容的執行緒,此時就需要檢查舊陣列中是否所有的元素節點都轉移了 finishing = advance = true; //從舊陣列的尾部開始檢查 i = n; } } else if ((f = tabAt(tab, i)) == null) //如果指定索引位置上的節點為空則直接將舊陣列中該索引位置上的節點設定成一個正在轉移的節點進行佔位 //當有新的執行緒要對該索引位置的節點操作的時候則會發現該索引位置上的節點是一個正在轉移的節點 //則不會對該索引位置上的節點進行操作而是先協助擴容執行緒進行轉移 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) //指定索引位置上的節點已經被其它執行緒處理過 advance = true; else { //加鎖,防止其它執行緒對當前需要移動的節點進行操作 synchronized (f) { //校驗節點是否變更 if (tabAt(tab, i) == f) { //ln 低位節點,該節點放置到新陣列中的索引位置與在舊陣列中的索引位置相同 //hn 高位節點,該節點放置到新陣列中的索引位置是在舊陣列中的索引位置加上舊陣列的長度 Node<K, V> ln, hn; //校驗節點的hash值是否大於等於0 //如果節點的hash值大於等於0則說明該索引位置上只有一個節點或節點是一個連結串列節點 if (fh >= 0) { //使用節點的hash值與舊陣列的長度進行與運算 //與運算後的結果分為0和1,0則將該節點放置到低位,1則將節點放置高位 int runBit = fh & n; //避免後續轉移節點的時候沒有必要的迴圈以及建立節點 Node<K, V> lastRun = f; //遍歷整個連結串列來決定從那個節點開始以及後續的節點都是沒有必要遍歷和建立的 //而是直接使用指標指向舊陣列中的節點,當擴容完成之後舊陣列以及舊陣列中部分沒有被指標參照的節點則會被回收 for (Node<K, V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } //沒有必要回圈和建立的節點應該放置到新陣列中的低為還是高位 if (runBit == 0) { //低位 ln = lastRun; hn = null; } else { //高位 hn = lastRun; ln = null; } //從頭節點開始遍歷將需要重新建立的節點進行建立並新增到高位或低位 //在新增到高位或低位的時候,新建立的節點使用的是頭插法 //如果有節點不需要重新建立,那不需要重新建立的節點則會放到高位或低位節點的尾部 //在上面的時候如果不需要重新建立的節點其實就已經放在了ln或hn中 //當有節點重新建立了的時候,則ln或hn設定為新建立的節點的下一個節點 //其實整體來說就是頭插法,如果說整個連結串列或部分連續的連結串列節點不需要重新建立的時候還是保持在舊陣列中一樣的順序 for (Node<K, V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) //低位 ln = new Node<K, V>(ph, pk, pv, ln); else //高位 hn = new Node<K, V>(ph, pk, pv, hn); } //將低位節點連結串列新增到新陣列中所在的索引位置與舊陣列所在的索引位置相同 setTabAt(nextTab, i, ln); //將高位節點連結串列新增到新陣列中所在的索引位置是在舊陣列中所在的索引位置加上舊陣列的長度 setTabAt(nextTab, i + n, hn); //將舊陣列中的索引位置上的節點設定為一個已經轉移的節點 setTabAt(tab, i, fwd); //繼續推進下一次節點轉移 advance = true; //節點是一個紅黑樹節點 } else if (f instanceof TreeBin) { TreeBin<K, V> t = (TreeBin<K, V>) f; //低位頭節點和尾節點 TreeNode<K, V> lo = null, loTail = null; //高位頭節點和尾節點 TreeNode<K, V> hi = null, hiTail = null; //低位節點和高位節點的數量 int lc = 0, hc = 0; //從TreeBin物件中記錄的連結串列的頭節點開始遍歷將每一個節點分為低位節點和高位節點 for (Node<K, V> e = t.first; e != null; e = e.next) { int h = e.hash; //構造新的樹節點 TreeNode<K, V> p = new TreeNode<K, V>(h, e.key, e.val, null, null); if ((h & n) == 0) { //將構造的樹節點的上一個節點指標指向原低位尾節點 //如果原低尾節點為空則說明當前樹節點是第一個節點 //那就將當前樹節點設定為低位頭節點 if ((p.prev = loTail) == null) lo = p; else //原低位尾節點不為空則將原尾節點的下一個節點指標指向當前樹節點 loTail.next = p; //將新構造的樹節點設定為低位尾節點 loTail = p; //低位節點的數量自增 ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //拆分後的高低位節點的數量是否小於等於紅黑樹轉連結串列的閾值 //如果小於等於則呼叫untreeify方法將樹節點轉換為連結串列節點 //如果高位節點數量或低位節點數量大於閾值則會校驗對方節點的數量是否不等於0 //如果說對方的節點數量等於0則說明節點並沒有拆分為高位或低位節點 //那就使用原本的TreeBin物件進行轉移 //如果對方的節點數量大於0則說明已經拆分為了高低位節點 //此時就需要將高低位節點轉換為紅黑樹並封裝成一個TreeBin物件 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K, V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K, V>(hi) : t; //將低位節點TreeBin物件新增到新陣列中所在的索引位置與舊陣列所在的索引位置相同 setTabAt(nextTab, i, ln); //將高位節點TreeBin新增到新陣列中所在的索引位置是在舊陣列中所在的索引位置加上舊陣列的長度 setTabAt(nextTab, i + n, hn); //將舊陣列中的索引位置上的節點設定為一個已經轉移的節點 setTabAt(tab, i, fwd); //繼續推進下一次節點轉移 advance = true; } } } } } }
transfer
方法中的程式碼比較多,我們就一段一段的講解。
int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; if (nextTab == null) { try { @SuppressWarnings("unchecked") Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1]; nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; }
首先會通過舊陣列的長度來計算每個cpu
在轉移舊陣列中的節點所需要負責的區域長度,每個cpu
最少需要負責16個區域的長度。 nextTab == null
則是校驗是否有執行緒已經在對新的陣列進行初始化了,如果沒有那當前執行緒則會去初始化新的陣列,新陣列的長度則是舊陣列的2倍,如果出現異常則說明舊陣列的長度已經到達了陣列最大的容量大小了,此時就不能再繼續擴容了,陣列最大的容量大小為2的30次方
,則是int
型別中正整數最大的2的次方,如果再次進行擴容的話就會導致陣列的容量大小變成負數。
ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab); ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; }
ForwardingNode
節點是一個佔位節點,當將舊陣列中指定索引位置上的所有節點都轉移到了新的陣列中,則會使用該節點進行佔位,告知其它執行緒該索引位置上的節點都已經被轉移到了新的陣列中。
while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } }
while
迴圈的作用主要是為每個執行緒分配所負責的區域以及推進轉移的進度。
--i >= bound || finishing
:--i >= bound
則是每次轉移完一個索引位置上的節點的時候都會校驗一下下一次轉移的索引位置的節點是否已經超出當前執行緒負責轉移的邊界了,而finishing
則是校驗舊陣列中所有的元素節點是否都已經完成了轉移。
(nextIndex = transferIndex) <= 0
:剩餘需要轉移的區域的長度是否小於等於0,小於等於0則說明舊陣列中的元素節點已經轉移完成了,在開始準備轉移的時候該值為舊陣列的長度,每當有一個執行緒來協助擴容的時候則會從該值中取一部分長度來負責轉移。
條件三則是根據開始轉移的長度位置與每個執行緒需要轉移的長度進行比較,如果開始轉移的長度位置大於每個執行緒需要轉移的長度,那就使用開始轉移的長度位置減去每個執行緒需要轉移的長度,獲取到當前執行緒需要負責轉移的索引位置的邊界,再將剩餘需要轉移的長度存放到transferIndex
中,等待其它執行緒協助或者說等待當前執行緒轉移完所負責的區域之後繼續轉移剩餘的長度,如果開始轉移的長度位置小於等於每個執行緒所需要轉移的長度,那就說明當前執行緒自己可以完成轉移,不需要其它執行緒協助,轉移節點的時候則是從陣列的尾部向前推進。
if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; } }
i小於0
則說明當前執行緒所負責轉移的區域節點已經轉移完成,並且陣列中沒有其它需要轉移的節點了,此時就需要通過下面的cas
操作將sizeCtl
中的擴容執行緒數量減1,執行緒數量減1之後會去校驗當前執行緒是否是最後一個擴容的執行緒,如果不是則說明當前執行緒已經完成了所負責的索引位置的元素節點轉移,並且舊陣列中沒有其他需要轉移的節點了,當前執行緒直接退出擴容,如果是最後一個擴容的執行緒,此時就需要當前執行緒執行收尾操作,需要從舊陣列中的尾部開始向頭部節點變數來檢查是否所有的元素節點都轉移了,如果還有沒被轉移的元素節點,那當前執行緒則會去進行轉移,當檢查完成之後會將新的陣列替換舊的陣列,並計算下一次擴容的閾值。
else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd);
如果指定索引位置上的節點為空則通過cas操作將舊陣列中該索引位置上的節點設定成ForwardingNode
節點進行佔位,告知其它執行緒該索引位置上的節點已經進行了轉移,其它執行緒則不會對該索引位置上的節點進行操作,而是先協助擴容執行緒進行節點轉移。
else if ((fh = f.hash) == MOVED) advance = true;
索引位置上的節點的hash
值為MOVED
,則說明該索引位置上的節點都已經轉移了,當前執行緒則繼續推進索引位置轉節點。
synchronized (f) { if (tabAt(tab, i) == f) { Node<K, V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K, V> lastRun = f; for (Node<K, V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K, V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K, V>(ph, pk, pv, ln); else hn = new Node<K, V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K, V> t = (TreeBin<K, V>) f; TreeNode<K, V> lo = null, loTail = null; TreeNode<K, V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K, V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K, V> p = new TreeNode<K, V>(h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K, V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K, V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } }
我們再看真正轉移節點的程式碼,分為轉移連結串列節點和轉移紅黑樹節點。
轉移連結串列節點:連結串列節點會被分為高位節點連結串列(hn)
和低位節點連結串列(ln)
,首先通過頭節點的hash
值與舊陣列的長度進行與運算,與運算後的結果分為0和1,0則將節點放置到低位,1將該節點放置到高位,然後遍歷整個連結串列來決定從那個節點開始以及後續的節點都是沒有必要重新建立的,而是直接使用指標指向舊陣列中的節點,lastRun
指標指向的節點以及後續的節點都是沒有必要建立的,因為你後續的連結串列節點沒有被拆分為高位或低位,是一個連續存放在高位或低位的連結串列節點,所以說不需要重新建立節點,比如說一個連結串列中的所有節點都是放在低位節點中的,那lastRun
指標指向的就是連結串列的頭節點,該連結串列中的節點並沒有改動,並不需要重新建立節點,而是直接將連結串列中的頭節點放置到新的陣列中即可,當區分了哪些節點是不需要重新建立的,則會將不需要重新建立的節點的頭節點lastRun
賦值給高位或低位,然後從連結串列的頭節點開始遍歷將需要建立的節點進行建立並新增到高位連結串列或低位連結串列中,在新增到高位連結串列或低位連結串列的時候,節點使用的是頭插法,建立完成之後,低位節點連結串列會放置到新陣列中所在的索引位置與節點在舊陣列中所在的索引位置相同,高位節點連結串列會放置到新陣列中所在的索引位置是節點在舊陣列中所在的索引位置加上舊陣列的長度,當高低位節點連結串列都轉移完成之後則會在舊陣列中該索引位置上新增到一個佔位節點。
下面的圖中ln
則是低位節點連結串列,hn
則是高位節點連結串列,在某些情況下有些節點並不需要重新建立,而是使用原來的節點,最差的情況下就是隻有連結串列節點的尾部的一個節點不需要重新建立。
紅黑樹節點:從TreeBin
物件中記錄的連結串列的頭節點開始遍歷,將每一個節點轉換為新的樹節點,並分為高低位連結串列節點,校驗高低位連結串列節點中的節點數量是否小於等於紅黑樹轉連結串列的閾值,如果小於等於則會將高低位連結串列節點中的所有樹節點都轉換為普通的Node
節點,如果不小於等於則會將高低位連結串列節點轉換為紅黑樹。 如果說高位或低位是一個連結串列節點的話,則會將連結串列的頭節點放置到新的陣列中,如果是紅黑樹的話,則會將TreeBin
物件放置到新的陣列中,然後再將舊陣列中的索引位置上新增一個佔位節點。
注意:其實在轉移舊陣列中的節點的時候是有問題的,有可能會造成節點資料的丟失
執行緒A獲取到了索引位置上的連結串列節點,頭節點型別為Node
,準備對該連結串列節點加synchronized
鎖進行轉移時,此時執行緒B先加了synchronized
鎖,對該索引位置上的連結串列節點新增了新的節點並將連結串列節點轉換為了紅黑樹,並將TreeBin
物件放置在了該索引位置上,當執行緒B釋放了鎖之後,執行緒A獲取到了鎖後去判斷之前獲取到的頭節點Node
是否與索引位置上最新的TreeBin
物件相同,顯然是不相同的,當不相同的情況下就會跳過該索引位置上的節點的轉移,在上面說過,當擴容完成時,最後一個執行緒會去檢查舊陣列中是否還有節點沒有轉移,如果有則會進行轉移,如果說在檢查轉移的時候也遇到了上面類似的問題,刪除節點的時候將TreeBin
轉換為Node
,是不是就會跳過該索引位置上的檢查,從而導致節點資料丟失。
以上就是Java並行原始碼分析ConcurrentHashMap執行緒集合的詳細內容,更多關於Java ConcurrentHashMap的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45