首頁 > 軟體

Java並行程式設計迴環屏障CyclicBarrier

2022-04-15 19:00:33

CyclicBarrier

前面介紹的CountDownLatch在解決多個執行緒同步方面相對於呼叫執行緒的join方法已經有了不少優化。但是CountDownLatch的計數器是一次性的,也就是等到計數器值變為0後,再呼叫CountDownLatch的await和countdown方法都會立刻返回,這就起不到執行緒同步的效果了。所以為了滿足計數器可以重置的需要,JDK開發組提供了CyclicBarrier類,並且CyclicBarrier類的功能並不限於CountDownLatch的功能。從字面意思理解 CyclicBarrier 是迴環屏障的意思,它可以讓一組執行緒全部達到一個狀態後再全部同時執行。這裡之所以叫作迴環是因為當所有等待執行緒執行完畢,並重置CyclicBarrier 的狀態後它可以被重用。之所以叫作屏障是因為執行緒呼叫await方法後就會被阻塞,這個阻塞點就稱為屏障點,等所有執行緒都呼叫了 await方法後,執行緒們就會衝破屏障,繼續向下執行。在介紹原理前先介紹幾個範例以便加深理解。在下面的例子中,我們要實現的是,使用兩個執行緒去執行一個被分解的任務 A,當兩個執行緒把自己的任務都執行完畢後再對它們的結果進行彙總處理。

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CycleBarrierTest {

   
   //建立一個執行緒數固定為2的執行緒池
   private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
       @Override
       public void run() {
           System.out.println(Thread.currentThread() + " task1 merge result");
       }
   });

   public static void main(String[] args) throws InterruptedException{
       ExecutorService executorService = Executors.newFixedThreadPool(2);

       
       //新增執行緒A到執行緒池
       executorService.submit(new Runnable() {
           @Override
           public void run() {
               try {
                   System.out.println(Thread.currentThread() + "task1");
                   System.out.println(Thread.currentThread() + "enter in  barrier");
                   cyclicBarrier.await();
                   System.out.println(Thread.currentThread() + "enter out  barrier");
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       });

       //新增執行緒B到執行緒池
       executorService.submit(new Runnable() {
           @Override
           public void run() {
               try {
                   System.out.println(Thread.currentThread() + "task2");
                   System.out.println(Thread.currentThread() + "enter in  barrier");
                   cyclicBarrier.await();
                   System.out.println(Thread.currentThread() + "enter out  barrier");
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       });

       //關閉執行緒池
       executorService.shutdown();

   }
}

如上程式碼建立了一個CyclicBarrier物件,其第一個引數為計數器初始值,第二個數Runable是當計數值為0時需要執行的任務。在main函數裡面首先建立了一個大小為2的執行緒池。然後新增兩個子任務到執行緒池,每個子任務在執行完自己的邏輯後會呼叫方法。一開始計數器值為2,當第一個執行緒呼叫await方法時,計數器值會遞減為1,由於此時計數器值不為0,所以當前執行緒就到了屏障點而被阻塞。然後第二個執行緒呼叫await時,會進入屏障,計數器值也會遞減,現在計數器值為0,這時就會去執行 CyclicBarrier建構函式中的任務,執行完畢後退出屏障點,並且喚醒被阻塞的第二個執行緒。這時候第一個執行緒也會退出屏障點繼續向下執行。

上面的例子說明了多個執行緒之間是相互等待的,假如計數器值為N,那麼隨後呼叫 await 方法的N1個執行緒都會因為到達屏障點而被阻塞,當第N個執行緒呼叫await後,計數器值為0了,這時候第N個執行緒才會發出通知喚醒前面的N1個執行緒。也就是當全部執行緒都到達屏障點時才能一塊繼續向下執行。對於這個例子來說,使用CountDownLatch也可以得到類似的輸出結果。下面再舉個例子來說明CyclicBarrier的可複用性。

假設一個任務由階段1、階段2和階段3組成,每個執行緒要序列地執行階段1、階段2和階段3,當多個執行緒執行該任務時,必須要保證所有執行緒的階段1全部完成後才能進入階段2執行,當所有執行緒的階段2全部完成後才能進入階段3執行。下面使用 CyclicBarrier 來完成這個需求。

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CycleBarrierTest1 {

    //建立一個執行緒數固定為2的執行緒池
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    public static void main(String[] args) throws InterruptedException{
        ExecutorService executorService = Executors.newFixedThreadPool(2);


        //新增執行緒A到執行緒池
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + "step1");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + "step2");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + "step3");
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        //新增執行緒B到執行緒池
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + "step1");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + "step2");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + "step3");
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        //關閉執行緒池
        executorService.shutdown();

    }
}

如上程式碼中,每個子執行緒在執行完階段1後都呼叫了await方法,等到所有執行緒都到達屏障點後才會一塊往下執行,這就保證了所有執行緒都完成了階段1後才會開始執行階段2。

到此這篇關於Java並行程式設計迴環屏障CyclicBarrier的文章就介紹到這了,更多相關Java 迴環屏障CyclicBarrier內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com