首頁 > 軟體

InterProcessMutex實現zookeeper分散式鎖原理

2022-03-21 19:01:12

原理簡介:

zookeeper實現分散式鎖的原理就是多個節點同時在一個指定的節點下面建立臨時對談順序節點,誰建立的節點序號最小,誰就獲得了鎖,並且其他節點就會監聽序號比自己小的節點,一旦序號比自己小的節點被刪除了,其他節點就會得到相應的事件,然後檢視自己是否為序號最小的節點,如果是,則獲取鎖

zookeeper節點圖分析

InterProcessMutex實現的鎖機制是公平且互斥的,公平的方式是按照每個請求的順序進行排隊的。

InterProcessMutex實現的InterProcessLock介面,InterProcessLock主要規範瞭如下幾個方法:

// 獲取互斥鎖
public void acquire() throws Exception;
// 在給定的時間內獲取互斥鎖
public boolean acquire(long time, TimeUnit unit) throws Exception;
// 釋放鎖處理
public void release() throws Exception;
// 如果此JVM中的執行緒獲取了互斥鎖,則返回true
boolean isAcquiredInThisProcess();

接下來我們看看InterProcessMutex中的實現,它究竟有哪些屬性,以及實現細節

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
    // LockInternals是真正實現操作zookeeper的類,它內部包含連線zookeeper使用者端的CuratorFramework
    // LockInternals的具體實現後面我會講到
    private final LockInternals internals;
  	// basePath是鎖的根結點,所有的臨時有序的節點都是basePath的子節點,
    private final String basePath;
    // 
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
	// LockData封裝了請求對應的執行緒(owningThread)、鎖的重入的次數(lockCount)、執行緒對應的臨時節點(lockPath)
    private static class LockData {
        final Thread owningThread;
        final String lockPath;
      	// 原子性的
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }
  
    private static final String LOCK_NAME = "lock-";
    // 獲取互斥鎖,阻塞【InterProcessLock的實現】
    @Override
    public void acquire() throws Exception {
      	// 獲取鎖,一直等待
        if ( !internalLock(-1, null) ) {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
    // 獲取互斥鎖,指定時間time【InterProcessLock的實現】
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return internalLock(time, unit);
    }
    // 當前執行緒是否佔用鎖中【InterProcessLock的實現】
    @Override
    public boolean isAcquiredInThisProcess() {
        return (threadData.size() > 0);
    }
    //如果呼叫執行緒與獲取互斥鎖的執行緒相同,則執行一次互斥鎖釋放。如果執行緒已多次呼叫acquire,當此方法返回時,互斥鎖仍將保留 【InterProcessLock的實現】
    @Override
    public void release() throws Exception {
        Thread currentThread = Thread.currentThread(); //當前執行緒
        LockData lockData = threadData.get(currentThread); //執行緒對應的鎖資訊
        if ( lockData == null ) {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
      	// 因為獲取到的鎖是可重入的,對lockCount進行減1,lockCount=0時才是真正釋放鎖
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 ) {
            return;
        }
        if ( newLockCount < 0 ) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try {
          	// 到這裡時lockCount=0,具體釋放鎖的操作交給LockInternals中的releaseLock方法實現
            internals.releaseLock(lockData.lockPath);
        }
        finally {
            threadData.remove(currentThread);
        }
    }
  	// 獲取basePath根結點下的所有臨時節點的有序集合
  	public Collection<String> getParticipantNodes() throws Exception {
        return LockInternals.getParticipantNodes(internals.getClient(), basePath, internals.getLockName(), internals.getDriver());
    }
    
  	boolean isOwnedByCurrentThread() {
        LockData lockData = threadData.get(Thread.currentThread());
        return (lockData != null) && (lockData.lockCount.get() > 0);
    }
  	protected String getLockPath() {
        LockData lockData = threadData.get(Thread.currentThread());
        return lockData != null ? lockData.lockPath : null;
    }
  	// acquire()中呼叫的internalLock()方法
  	private boolean internalLock(long time, TimeUnit unit) throws Exception {
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null ) {
            // 如果當前執行緒已經獲取到了鎖,那麼將重入次數lockCount+1,返回true
            lockData.lockCount.incrementAndGet();
            return true;
        }
      	// attemptLock方法是獲取鎖的真正實現,lockPath是當前執行緒成功在basePath下建立的節點,若lockPath不為空代表成功獲取到鎖
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null ) {
          	// lockPath封裝到當前執行緒對應的鎖資訊中
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }
}

接下來我們看看InterProcessMutex中使用的LockInternals類的實現細節

public class LockInternals {
    private final CuratorFramework                  client; // 連線zookeeper的使用者端
    private final String                            path;	// 等於basePath,InterProcessMutex中傳進來的
    private final String                            basePath; // 根結點
    private final LockInternalsDriver               driver; // 操作zookeeper節點的driver
    private final String                            lockName; // "lock-"
    private final AtomicReference<RevocationSpec>   revocable = new AtomicReference<RevocationSpec>(null);
  
    private final CuratorWatcher                    revocableWatcher = new CuratorWatcher() {
        @Override
        public void process(WatchedEvent event) throws Exception {
            if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) {
                checkRevocableWatcher(event.getPath());
            }
        }
    };
    // 監聽節點的監聽器,若被監聽的節點有動靜,則喚醒 notifyFromWatcher()=>notifyAll();
   	private final Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            notifyFromWatcher();
        }
    };
   	private volatile int    maxLeases;
  	// 獲取basePath的子節點,排序後的
  	public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception
    {
        List<String> children = client.getChildren().forPath(basePath);
        List<String> sortedList = Lists.newArrayList(children);
        Collections.sort
        (
            sortedList,
            new Comparator<String>()
            {
                @Override
                public int compare(String lhs, String rhs)
                {
                    return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
                }
            }
        );
        return sortedList;
    }
  	// 嘗試獲取鎖【internalLock=>attemptLock】
  	String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {	// 開始時間
        final long      startMillis = System.currentTimeMillis(); 
      	// 記錄等待時間
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
      	// 重試次數
        int             retryCount = 0;
      	// 當前節點
        String          ourPath = null;
      	// 是否獲取到鎖的標誌
        boolean         hasTheLock = false;
      	// 是否放棄獲取到標誌
        boolean         isDone = false;
      	// 不停嘗試獲取
        while ( !isDone )
        {
            isDone = true;

            try
            {	// 建立當前執行緒對應的節點
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
              	// internalLockLoop中獲取
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {	// 是否可再次嘗試
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }
		// 獲取到鎖後,返回當前執行緒對應建立的節點路徑
        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }
  	// 迴圈獲取【attemptLock=>internalLockLoop】
  	private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false; // 是否擁有分散式鎖
        boolean     doDelete = false;	// 是否需要刪除當前節點
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
			// 迴圈嘗試獲取鎖
            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {	// 得到basePath下排序後的臨時子節點
                List<String>        children = getSortedChildren();
              	// 獲取之前建立的當前執行緒對應的子節點
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
				// 判斷是否獲取到鎖,沒有就返回監聽路徑
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
              	// 成功獲取到
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {	// 沒有獲取到鎖,監聽前一個臨時順序節點
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {                           
					// 上一個臨時順序節點如果被刪除,會喚醒當前執行緒繼續競爭鎖  
                          client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                              	// 獲取鎖超時
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
              	// 因為獲取鎖超時,所以刪除之前建立的臨時子節點
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }
  
  	private void deleteOurPath(String ourPath) throws Exception {
        try
        {
          	// 刪除
            client.delete().guaranteed().forPath(ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // ignore - already deleted (possibly expired session, etc.)
        }
    }
  
 }

StandardLockInternalsDriver implements LockInternalsDriver

	// 前面internalLockLoop方法中driver.getsTheLock執行的方法
	@Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
    {
    	// 獲取子節點在臨時順序節點列表中的位置
        int             ourIndex = children.indexOf(sequenceNodeName);
        // 檢驗子節點在臨時順序節點列表中是否有效
        validateOurIndex(sequenceNodeName, ourIndex);
        // 若當前子節點的位置<maxLeases,代表可獲取鎖【maxLeases預設=1,若ourIndex=0,代筆自己位置最小】
        boolean         getsTheLock = ourIndex < maxLeases;
        // getsTheLock=true,則不需要監聽前maxLeases的節點【maxLeases預設=1,代表監聽前面最靠近自己的節點】
        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

        return new PredicateResults(pathToWatch, getsTheLock);
    }

用InterProcessMutex在自己業務實現分散式鎖,請點選此連結閱讀點我

到此這篇關於InterProcessMutex實現zookeeper分散式鎖原理的文章就介紹到這了,更多相關InterProcessMutex實現zookeeper分散式鎖內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com