<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
zookeeper實現分散式鎖的原理就是多個節點同時在一個指定的節點下面建立臨時對談順序節點,誰建立的節點序號最小,誰就獲得了鎖,並且其他節點就會監聽序號比自己小的節點,一旦序號比自己小的節點被刪除了,其他節點就會得到相應的事件,然後檢視自己是否為序號最小的節點,如果是,則獲取鎖。
InterProcessMutex實現的鎖機制是公平且互斥的,公平的方式是按照每個請求的順序進行排隊的。
InterProcessMutex實現的InterProcessLock介面,InterProcessLock主要規範瞭如下幾個方法:
// 獲取互斥鎖 public void acquire() throws Exception; // 在給定的時間內獲取互斥鎖 public boolean acquire(long time, TimeUnit unit) throws Exception; // 釋放鎖處理 public void release() throws Exception; // 如果此JVM中的執行緒獲取了互斥鎖,則返回true boolean isAcquiredInThisProcess();
接下來我們看看InterProcessMutex中的實現,它究竟有哪些屬性,以及實現細節
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> { // LockInternals是真正實現操作zookeeper的類,它內部包含連線zookeeper使用者端的CuratorFramework // LockInternals的具體實現後面我會講到 private final LockInternals internals; // basePath是鎖的根結點,所有的臨時有序的節點都是basePath的子節點, private final String basePath; // private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap(); // LockData封裝了請求對應的執行緒(owningThread)、鎖的重入的次數(lockCount)、執行緒對應的臨時節點(lockPath) private static class LockData { final Thread owningThread; final String lockPath; // 原子性的 final AtomicInteger lockCount = new AtomicInteger(1); private LockData(Thread owningThread, String lockPath) { this.owningThread = owningThread; this.lockPath = lockPath; } } private static final String LOCK_NAME = "lock-"; // 獲取互斥鎖,阻塞【InterProcessLock的實現】 @Override public void acquire() throws Exception { // 獲取鎖,一直等待 if ( !internalLock(-1, null) ) { throw new IOException("Lost connection while trying to acquire lock: " + basePath); } } // 獲取互斥鎖,指定時間time【InterProcessLock的實現】 @Override public boolean acquire(long time, TimeUnit unit) throws Exception { return internalLock(time, unit); } // 當前執行緒是否佔用鎖中【InterProcessLock的實現】 @Override public boolean isAcquiredInThisProcess() { return (threadData.size() > 0); } //如果呼叫執行緒與獲取互斥鎖的執行緒相同,則執行一次互斥鎖釋放。如果執行緒已多次呼叫acquire,當此方法返回時,互斥鎖仍將保留 【InterProcessLock的實現】 @Override public void release() throws Exception { Thread currentThread = Thread.currentThread(); //當前執行緒 LockData lockData = threadData.get(currentThread); //執行緒對應的鎖資訊 if ( lockData == null ) { throw new IllegalMonitorStateException("You do not own the lock: " + basePath); } // 因為獲取到的鎖是可重入的,對lockCount進行減1,lockCount=0時才是真正釋放鎖 int newLockCount = lockData.lockCount.decrementAndGet(); if ( newLockCount > 0 ) { return; } if ( newLockCount < 0 ) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath); } try { // 到這裡時lockCount=0,具體釋放鎖的操作交給LockInternals中的releaseLock方法實現 internals.releaseLock(lockData.lockPath); } finally { threadData.remove(currentThread); } } // 獲取basePath根結點下的所有臨時節點的有序集合 public Collection<String> getParticipantNodes() throws Exception { return LockInternals.getParticipantNodes(internals.getClient(), basePath, internals.getLockName(), internals.getDriver()); } boolean isOwnedByCurrentThread() { LockData lockData = threadData.get(Thread.currentThread()); return (lockData != null) && (lockData.lockCount.get() > 0); } protected String getLockPath() { LockData lockData = threadData.get(Thread.currentThread()); return lockData != null ? lockData.lockPath : null; } // acquire()中呼叫的internalLock()方法 private boolean internalLock(long time, TimeUnit unit) throws Exception { Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // 如果當前執行緒已經獲取到了鎖,那麼將重入次數lockCount+1,返回true lockData.lockCount.incrementAndGet(); return true; } // attemptLock方法是獲取鎖的真正實現,lockPath是當前執行緒成功在basePath下建立的節點,若lockPath不為空代表成功獲取到鎖 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { // lockPath封裝到當前執行緒對應的鎖資訊中 LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; } }
接下來我們看看InterProcessMutex中使用的LockInternals類的實現細節
public class LockInternals { private final CuratorFramework client; // 連線zookeeper的使用者端 private final String path; // 等於basePath,InterProcessMutex中傳進來的 private final String basePath; // 根結點 private final LockInternalsDriver driver; // 操作zookeeper節點的driver private final String lockName; // "lock-" private final AtomicReference<RevocationSpec> revocable = new AtomicReference<RevocationSpec>(null); private final CuratorWatcher revocableWatcher = new CuratorWatcher() { @Override public void process(WatchedEvent event) throws Exception { if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) { checkRevocableWatcher(event.getPath()); } } }; // 監聽節點的監聽器,若被監聽的節點有動靜,則喚醒 notifyFromWatcher()=>notifyAll(); private final Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { notifyFromWatcher(); } }; private volatile int maxLeases; // 獲取basePath的子節點,排序後的 public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception { List<String> children = client.getChildren().forPath(basePath); List<String> sortedList = Lists.newArrayList(children); Collections.sort ( sortedList, new Comparator<String>() { @Override public int compare(String lhs, String rhs) { return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName)); } } ); return sortedList; } // 嘗試獲取鎖【internalLock=>attemptLock】 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { // 開始時間 final long startMillis = System.currentTimeMillis(); // 記錄等待時間 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; // 重試次數 int retryCount = 0; // 當前節點 String ourPath = null; // 是否獲取到鎖的標誌 boolean hasTheLock = false; // 是否放棄獲取到標誌 boolean isDone = false; // 不停嘗試獲取 while ( !isDone ) { isDone = true; try { // 建立當前執行緒對應的節點 ourPath = driver.createsTheLock(client, path, localLockNodeBytes); // internalLockLoop中獲取 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) { // 是否可再次嘗試 if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } } // 獲取到鎖後,返回當前執行緒對應建立的節點路徑 if ( hasTheLock ) { return ourPath; } return null; } // 迴圈獲取【attemptLock=>internalLockLoop】 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; // 是否擁有分散式鎖 boolean doDelete = false; // 是否需要刪除當前節點 try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } // 迴圈嘗試獲取鎖 while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { // 得到basePath下排序後的臨時子節點 List<String> children = getSortedChildren(); // 獲取之前建立的當前執行緒對應的子節點 String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash // 判斷是否獲取到鎖,沒有就返回監聽路徑 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); // 成功獲取到 if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else { // 沒有獲取到鎖,監聽前一個臨時順序節點 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // 上一個臨時順序節點如果被刪除,會喚醒當前執行緒繼續競爭鎖 client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); // 獲取鎖超時 if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } wait(millisToWait); } else { wait(); } } catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } } } } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); doDelete = true; throw e; } finally { if ( doDelete ) { // 因為獲取鎖超時,所以刪除之前建立的臨時子節點 deleteOurPath(ourPath); } } return haveTheLock; } private void deleteOurPath(String ourPath) throws Exception { try { // 刪除 client.delete().guaranteed().forPath(ourPath); } catch ( KeeperException.NoNodeException e ) { // ignore - already deleted (possibly expired session, etc.) } } }
StandardLockInternalsDriver implements LockInternalsDriver
// 前面internalLockLoop方法中driver.getsTheLock執行的方法 @Override public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { // 獲取子節點在臨時順序節點列表中的位置 int ourIndex = children.indexOf(sequenceNodeName); // 檢驗子節點在臨時順序節點列表中是否有效 validateOurIndex(sequenceNodeName, ourIndex); // 若當前子節點的位置<maxLeases,代表可獲取鎖【maxLeases預設=1,若ourIndex=0,代筆自己位置最小】 boolean getsTheLock = ourIndex < maxLeases; // getsTheLock=true,則不需要監聽前maxLeases的節點【maxLeases預設=1,代表監聽前面最靠近自己的節點】 String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); return new PredicateResults(pathToWatch, getsTheLock); }
用InterProcessMutex在自己業務實現分散式鎖,請點選此連結閱讀點我
到此這篇關於InterProcessMutex實現zookeeper分散式鎖原理的文章就介紹到這了,更多相關InterProcessMutex實現zookeeper分散式鎖內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45