<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
ReentrantReadWriteLock
讀寫鎖是使用AQS的集大成者,用了獨佔模式和共用模式。本文和大家一起理解下ReentrantReadWriteLock
讀寫鎖的實現原理。在這之前建議大家閱讀下下面3篇關聯文章:
通俗易懂讀寫鎖ReentrantReadWriteLock的使用
上圖是ReentrantReadWriteLock
讀寫鎖的類結構圖:
ReadWriteLock
介面,該介面提供了獲取讀鎖和寫鎖的API。ReentrantReadWriteLock
讀寫鎖內部的成員變數readLock是讀鎖,指向內部類ReadLock。ReentrantReadWriteLock
讀寫鎖內部的成員變數writeLock是寫鎖,指向內部類WriteLock。ReentrantReadWriteLock
讀寫鎖內部的成員變數sync是繼承AQS的同步器,他有兩個子類FairSync
公平同步器和NoFairSync
非公平同步器,讀寫鎖內部也有一個sync,他們使用的是同一個sync。讀寫鎖用的同一個sync同步器,那麼他們共用同一個state, 這樣不會混淆嗎?
不會,ReentrantReadWriteLock
讀寫鎖使用了AQS中state值得低16位元表示寫鎖得計數,用高16位元表示讀鎖得計數,這樣就可以使用同一個AQS同時管理讀鎖和寫鎖。
1.ReentrantReadWriteLock類重要成員變數
// 讀鎖 private final ReentrantReadWriteLock.ReadLock readerLock; // 寫鎖 private final ReentrantReadWriteLock.WriteLock writerLock; // 同步器 final Sync sync;
2.ReentrantReadWriteLock構造方法
//預設是非公平鎖,可以指定引數建立公平鎖 public ReentrantReadWriteLock(boolean fair) { // true 為公平鎖 sync = fair ? new FairSync() : new NonfairSync(); // 這兩個 lock 共用同一個 sync 範例,都是由 ReentrantReadWriteLock 的 sync 提供同步實現 readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
3.Sync類重要成員變數
// 用來移位 static final int SHARED_SHIFT = 16; // 高16位元的1 static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 65535,16個1,代表寫鎖的最大重入次數 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 低16位元掩碼:0b 1111 1111 1111 1111,用來獲取寫鎖重入的次數 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 獲取讀寫鎖的讀鎖分配的總次數 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 寫鎖(獨佔)鎖的重入次數 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
設計一個加鎖場景,t1執行緒加寫鎖,t2執行緒加讀鎖,我們看下它們整個加鎖得流程。
1.t1 加寫鎖w.lock()
成功,佔了 state 的低 16 位。
0_1
,0表示高16位元的值,1表示低16位元的值。exclusiveOwnerThread
屬性指向t1執行緒。2.t2執行緒執行加讀鎖 r.lock()
,嘗試獲取鎖,發現已經被寫鎖佔據了,加鎖失敗。
3.t2執行緒被封裝成一個共用模式Node.SHARED的節點,加入到AQS的佇列中。
4.在阻塞前,t2執行緒發現自己是佇列中的老二,會嘗試再次獲取讀鎖,因為t1沒有釋放,它會失敗,然後它會把佇列的前驅節點的狀態改為-1,然後阻塞自身,也就是t2執行緒。
5.後面如過有其他執行緒如t3,t4加讀鎖或者寫鎖,由於t1執行緒沒有釋放鎖,會變成下面的狀態。
上面是整個解鎖的流程,下面深入原始碼驗證這個流程。
1.寫鎖加鎖原始碼
WriteLock類的lock()方法是加寫鎖的入口方法。
static final class NonfairSync extends Sync { // ... 省略無關程式碼 // 外部類 WriteLock 方法, 方便閱讀, 放在此處 public void lock() { sync.acquire(1); } // AQS 繼承過來的方法, 方便閱讀, 放在此處 public final void acquire(int arg) { if ( // 嘗試獲得寫鎖失敗 !tryAcquire(arg) && // 將當前執行緒關聯到一個 Node 物件上, 模式為獨佔模式 // 進入 AQS 佇列阻塞 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } protected final boolean tryAcquire(int acquires) { // 獲取當前執行緒 Thread current = Thread.currentThread(); //獲得鎖的狀態 int c = getState(); // 獲得低 16 位, 代表寫鎖的 state 計數 int w = exclusiveCount(c); // c不等於0表示加了讀鎖或者寫鎖 if (c != 0) { if ( // c != 0 and w == 0 表示有讀鎖返回錯誤,讀鎖不支援鎖升級, 或者 w == 0 || // w != 0 說明有寫鎖,寫鎖的擁有者不是自己,獲取失敗 current != getExclusiveOwnerThread() ) { // 獲得鎖失敗 return false; } // 寫鎖計數超過低 16 位最大數量, 報異常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 寫鎖重入, 獲得鎖成功,沒有並行,所以不使用 CAS setState(c + acquires); return true; } if ( // c == 0,說明沒有任何鎖,判斷寫鎖是否該阻塞,是 false 就嘗試獲取鎖,失敗返回 false writerShouldBlock() || // 嘗試更改計數失敗 !compareAndSetState(c, c + acquires) ) { // 獲得鎖失敗 return false; } // 獲得鎖成功,設定鎖的持有執行緒為當前執行緒 setExclusiveOwnerThread(current); return true; } // 非公平鎖 writerShouldBlock 總是返回 false, 無需阻塞 final boolean writerShouldBlock() { return false; } // 公平鎖會檢查 AQS 佇列中是否有前驅節點, 沒有(false)才去競爭 final boolean writerShouldBlock() { return hasQueuedPredecessors(); } }
tryAcquire()
方法是模板方法,由子類自定義實現獲取鎖的邏輯。acquireQueued()
方法封裝成獨佔Node加入到AQS佇列中。2.讀鎖加鎖原始碼
ReadLock
類的lock()
方法是加讀鎖的入口方法,呼叫tryAcquireShared()
方法嘗試獲取讀鎖,返回負數,失敗,加入到佇列中。
// 加讀鎖的方法入口 public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { // tryAcquireShared 返回負數, 表示獲取讀鎖失敗,加入到佇列中 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared()
方法是一個模板方法,AQS類中定義語意,子類實現,如果返回1,表示獲取鎖成功,還有剩餘資源,返回0表示獲取成功,沒有剩餘資源,返回-1表示失敗。
// 嘗試以共用模式獲取,返回1表示獲取鎖成功,還有剩餘資源,返回0表示獲取成功,沒有剩餘資源,返回-1,表示失敗 protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // exclusiveCount(c) 代表低 16 位, 寫鎖的 state,成立說明有執行緒持有寫鎖 // 寫鎖的持有者不是當前執行緒,則獲取讀鎖失敗,【寫鎖允許降級】 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 高 16 位,代表讀鎖的 state,共用鎖分配出去的總次數 int r = sharedCount(c); // 讀鎖是否應該阻塞 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 嘗試增加讀鎖計數 // 加鎖成功 // 加鎖之前讀鎖為 0,說明當前執行緒是第一個讀鎖執行緒 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; // 第一個讀鎖執行緒是自己就發生了讀鎖重入 } else if (firstReader == current) { firstReaderHoldCount++; } else { // cachedHoldCounter 設定為當前執行緒的 holdCounter 物件,即最後一個獲取讀鎖的執行緒 HoldCounter rh = cachedHoldCounter; // 說明還沒設定 rh if (rh == null || rh.tid != getThreadId(current)) // 獲取當前執行緒的鎖重入的物件,賦值給 cachedHoldCounter cachedHoldCounter = rh = readHolds.get(); // 還沒重入 else if (rh.count == 0) readHolds.set(rh); // 重入 + 1 rh.count++; } // 讀鎖加鎖成功 return 1; } // 邏輯到這 應該阻塞,或者 cas 加鎖失敗 // 會不斷嘗試 for (;;) 獲取讀鎖, 執行過程中無阻塞 return fullTryAcquireShared(current); } // 非公平鎖 readerShouldBlock 偏向寫鎖一些,看 AQS 阻塞佇列中第一個節點是否是寫鎖,是則阻塞,反之不阻塞 // 防止一直有讀鎖執行緒,導致寫鎖執行緒飢餓 // true 則該阻塞, false 則不阻塞 final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } // 下面是公平鎖的readerShouldBlock // 公平鎖會檢查 AQS 佇列中是否有前驅節點, 沒有(false)才去競爭 final boolean readerShouldBlock() { return hasQueuedPredecessors(); }
fullTryAcquireShared()
方法是通過自旋的方式不斷獲取讀鎖,因為由於前面的readerShouldBlock
返回false或者cas失敗,導致沒有獲取到鎖,需要不斷重試。
final int fullTryAcquireShared(Thread current) { // 當前讀鎖執行緒持有的讀鎖次數物件 HoldCounter rh = null; for (;;) { int c = getState(); // 說明有執行緒持有寫鎖 if (exclusiveCount(c) != 0) { // 寫鎖不是自己則獲取鎖失敗 if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { // 條件成立說明當前執行緒是 firstReader,當前鎖是讀忙碌狀態,而且當前執行緒也是讀鎖重入 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { // 最後一個讀鎖的 HoldCounter rh = cachedHoldCounter; // 說明當前執行緒也不是最後一個讀鎖 if (rh == null || rh.tid != getThreadId(current)) { // 獲取當前執行緒的 HoldCounter rh = readHolds.get(); // 條件成立說明 HoldCounter 物件是上一步程式碼新建的 // 當前執行緒不是鎖重入,在 readerShouldBlock() 返回 true 時需要去排隊 if (rh.count == 0) // 防止記憶體漏失 readHolds.remove(); } } if (rh.count == 0) return -1; } } // 越界判斷 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 讀鎖加鎖,條件內的邏輯與 tryAcquireShared 相同 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
doAcquireShared()
是在獲取讀鎖失敗的時候加入AQS佇列的邏輯。
private void doAcquireShared(int arg) { // 將當前執行緒關聯到一個 Node 物件上, 模式為共用模式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 獲取前驅節點 final Node p = node.predecessor(); // 如果前驅節點就頭節點就去嘗試獲取鎖 if (p == head) { // 再一次嘗試獲取讀鎖 int r = tryAcquireShared(arg); // r >= 0 表示獲取成功 if (r >= 0) { //【這裡會設定自己為頭節點,喚醒相連的後序的共用節點】 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 是否在獲取讀鎖失敗時阻塞 park 當前執行緒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
setHeadAndPropagate()
方法是在後續讀鎖被喚醒後,搶到鎖要處理的邏輯,包括修改佇列的頭結點,以及喚醒佇列中的下一個共用節點。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 設定自己為 head 節點 setHead(node); // propagate 表示有共用資源(例如共用讀鎖或號誌),為 0 就沒有資源 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 獲取下一個節點 Node s = node.next; // 如果當前是最後一個節點,或者下一個節點是【等待共用讀鎖的節點】 if (s == null || s.isShared()) // 喚醒後繼節點 doReleaseShared(); } }
由於上面t1執行緒加的寫鎖,所有其他的執行緒都被阻塞了,只有在t1執行緒解鎖以後,其他執行緒才能被喚醒,我們現在看下t1執行緒被喚醒了,會發生什麼?
1.t1執行緒執行解鎖w.unlock()
成功,修改AQS中的state。
2.t1執行緒喚醒佇列中等待的老二, 為什麼不是老大,因為老大是一個空節點,不會設定任何的執行緒。t2執行緒被喚醒後,搶鎖成功,修改state中高16位元為1。
3.t2執行緒恢復執行,設定原來的老二節點為頭節點
4.t2執行緒要做的事情還沒結束呢,因為是共用模式,它現在釋放了,就此時也喚醒佇列中的下一個共用節點。
5.t3執行緒恢復去競爭讀鎖成功,這時state的高位+1,變成2。
6.這時候t3執行緒所在的Node設定為頭節點,同時發現對列的下一個節點不是共用節點,而是獨佔節點,就不會喚醒後面的節點了。
7.之後t2執行緒和t3執行緒進入尾聲,執行r.unlock
操作,state的計數減一,直到變為0。
8.最後寫鎖執行緒t4被喚醒,去搶佔鎖成功,整個流程結束。
上面是整個解鎖的流程,下面深入原始碼驗證這個流程。
1.寫鎖釋放流程
WriteLock
類的unlock()
方法是入口方法,呼叫tryRelease()方法釋放鎖,如果成功,呼叫unparkSuccessor()
方法喚醒執行緒。
public void unlock() { // 釋放鎖 sync.release(1); } public final boolean release(int arg) { // 嘗試釋放鎖 if (tryRelease(arg)) { Node h = head; // 頭節點不為空並且不是等待狀態不是 0,喚醒後繼的非取消節點 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease()
方法是AQS提供的模板方法,返回true表示成功,false失敗,由自定義同步器實現。
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; // 因為可重入的原因, 寫鎖計數為 0, 才算釋放成功 boolean free = exclusiveCount(nextc) == 0; if (free) // 設定佔用執行緒為null setExclusiveOwnerThread(null); setState(nextc); return free; }
2.讀鎖釋放流程
ReadLock
類的unlock()
方法是釋放共用鎖的入口方法。
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared()
方法是由AQS提供的模板方法,由自定義同步器實現。
protected final boolean tryReleaseShared(int unused) { //自選 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; // 讀鎖的計數不會影響其它獲取讀鎖執行緒, 但會影響其它獲取寫鎖執行緒,計數為 0 才是真正釋放 if (compareAndSetState(c, nextc)) // 返回是否已經完全釋放了 return nextc == 0; } }
呼叫doReleaseShared()
方法喚醒等待的執行緒,這個方法呼叫的地方有兩處,還記得嗎,一個這是裡的解鎖,還有一個是前面加共用鎖阻塞的地方,喚醒後獲取鎖成功,也會呼叫doReleaseShared()
方法。
private void doReleaseShared() { // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一個節點 unpark // 如果 head.waitStatus == 0 ==> Node.PROPAGATE for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // SIGNAL 喚醒後繼 if (ws == Node.SIGNAL) { // 因為讀鎖共用,如果其它執行緒也在釋放讀鎖,那麼需要將 waitStatus 先改為 0 // 防止 unparkSuccessor 被多次執行 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 喚醒後繼節點 unparkSuccessor(h); } // 如果已經是 0 了,改為 -3,用來解決傳播性 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // 條件不成立說明被喚醒的節點非常積極,直接將自己設定為了新的 head, // 此時喚醒它的節點(前驅)執行 h == head 不成立,所以不會跳出迴圈,會繼續喚醒新的 head 節點的後繼節點 if (h == head) break; } }
以上就是詳解Java ReentrantReadWriteLock讀寫鎖的原理與實現的詳細內容,更多關於Java ReentrantReadWriteLock讀寫鎖的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45