首頁 > 軟體

AQS同步元件Semaphore號誌案例剖析

2022-08-07 22:01:07

基本概念

Semaphore也是一個執行緒同步的輔助類,可以維護當前存取自身的執行緒個數,並提供了同步機制。使用Semaphore可以控制並行存取資源的執行緒個數。

例如排隊買票的情況,如果只有三個視窗,那麼同一時間最多也只能有三個人買票。第四個人來了之後就必須在後面等著,只有其他人買好了,才可以去相應的視窗進行買票 。

作用和使用場景

  • 用於保證同一時間並行存取執行緒的數目。
  • 號誌在作業系統中是很重要的概念,Java並行庫裡的Semaphore就可以很輕鬆的完成類似作業系統號誌的控制。Semaphore可以很容易控制系統中某個資源被同時存取的執行緒個數。
  • 在資料結構中我們學過連結串列,連結串列正常是可以儲存無限個節點的,而Semaphore可以實現有限大小的列表。

使用場景:僅能提供有限存取的資源。比如資料庫連線。

原始碼分析

建構函式

/**
*接受一個整型的數位,表示可用的許可證數量。Semaphore(10)表*示允許10個執行緒獲取許可證,
*也就是最大並行數是10。 
*
* @param permits 可用許可證的初始數量。
**/
public Semaphore(int permits) {
        sync = new NonfairSync(permits);
}
/**
 * 使用給定的許可數量和給定的公平性設定
 *
 * @param permits 可用許可證的初始數量。
 *
 * @param fair 指定是公平模式還是非公平模式,預設非公平模式 . 公平模式:先啟動的執行緒優先得到
 * 許可。 非公平模式:先啟動的執行緒並不一定先獲得許可,誰搶到誰就獲得許可。
 */
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

常用方法

acquire() 獲取一個許可

acquire(int permits) 獲取指定個數的許可

tryAcquire()方法嘗試獲取1個許可證

tryAcquire(long timeout, TimeUnit unit) 最大等待許可的時間

tryAcquire(int permits) 獲取指定個數的許可

tryAcquire(int permits, long timeout, TimeUnit unit) 最大等待許可的時間

availablePermits() : 返回此號誌中當前可用的許可證數

release() 釋放許可

release(int permits) 釋放指定個數的許可

int getQueueLength() 返回正在等待獲取許可證的執行緒數。

boolean hasQueuedThreads() 是否有執行緒正在等待獲取許可證。

void reducePermits(int reduction) 減少reduction個許可證。是個protected方法。

Collection getQueuedThreads() 返回所有等待獲取許可證的執行緒集合。是個protected方法。

使用案例

acquire()獲取單個許可

/**
     * 執行緒數量
     */
    private final static int threadCount = 15;
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    //獲取一個許可
                    semaphore.acquire();
                    test(threadNum);
                    //釋放一個許可
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }
    private static void test(int threadNum) throws Exception {
        // 模擬請求的耗時操作
        Thread.sleep(1000);
        log.info("{}", threadNum);
    }

輸出結果:

根據輸出結果的時間可以看出來同一時間最多隻能3個執行緒執行,符合預期

acquire(int permits)獲取多個許可

/**
     * 執行緒數量
     */
    private final static int threadCount = 15;
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        //號誌設定為3,也就是最大並行量為3,同時只允許3個執行緒獲得許可
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    //獲取多個許可
                    semaphore.acquire(3);
                    test(threadNum);
                    //釋放多個許可
                    semaphore.release(3);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }
    private static void test(int threadNum) throws Exception {
        // 模擬請求的耗時操作
        Thread.sleep(1000);
        log.info("{}", threadNum);
    }

輸出結果:

設定了3個許可,每個執行緒每次獲取3個許可,因此同一時間只能有1個執行緒執行 。

tryAcquire()獲取許可

tryAcquire()嘗試獲取一個許可,如果未獲取到,不等待,將直接丟棄該執行緒不執行

/**
     * 執行緒數量
     */
    private final static int threadCount = 15;
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        //號誌設定為3,也就是最大並行量為3,同時只允許3個執行緒獲得許可
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    //嘗試獲取一個許可,如果未獲取到,不等待,將直接丟棄該執行緒不執行
                    if(semaphore.tryAcquire()) {
                        test(threadNum);
                        //釋放許可
                        semaphore.release();
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }
    private static void test(int threadNum) throws Exception {
        // 模擬請求的耗時操作
        Thread.sleep(1000);
        log.info("{}", threadNum);
    }

輸出結果:

從輸出可以看到,在3個執行緒獲取到3個許可後,因為每個執行緒呼叫的方法要執行1秒中,最早的一個許可也要在1S後釋放,剩下的17個執行緒未獲取到許可,使用了semaphore.tryAcquire()方法,沒有設定等待時間,所以便直接被丟棄,不執行了。

tryAcquire(long timeout, TimeUnit unit)

tryAcquire(long timeout, TimeUnit unit)未獲取到許可,設定等待時長

/**
     * 執行緒數量
     */
    private final static int threadCount = 15;
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        //號誌設定為3,也就是最大並行量為3,同時只允許3個執行緒獲得許可
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    //設定了獲取許可等待時間為2秒,如果兩秒後還是未獲得許可的執行緒便得不到執行
                    if(semaphore.tryAcquire(2000, TimeUnit.MILLISECONDS)) {
                        test(threadNum);
                        //釋放許可
                        semaphore.release();
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }
    private static void test(int threadNum) throws Exception {
        // 模擬請求的耗時操作
        Thread.sleep(1000);
        log.info("{}", threadNum);
    }

輸出結果:

tryAcquire通過引數指定了2秒的等待時間。 上述程式碼中同一時間最多執行3個。第4個執行緒因前3個執行緒執行需要耗時一秒未釋放許可,因此需要等待。

但是由於設定了2秒的等待時間,所以在5秒內等待到了釋放的許可,繼續執行,迴圈往復。

但是15個執行緒 ,每秒並行3個,2S是執行不完的。所以上面執行到第6個(0開始,顯示是5)就結束了,【每次執行結果會有差異,取決於CPU】,並沒有全部執行完15個執行緒。

以上就是AQS同步元件Semaphore號誌案例剖析的詳細內容,更多關於AQS同步元件Semaphore的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com