首頁 > 軟體

Java並行包執行緒池ThreadPoolExecutor的實現

2022-04-02 13:01:48

執行緒池主要解決兩個問題:一是當執行大量非同步任務時執行緒池能夠提供較好的效能。在不使用執行緒池時,每當需要執行非同步任務時直接new一個執行緒來執行,而執行緒的建立和銷燬都是需要開銷的。執行緒池裡面的執行緒是可複用的,不需要每次執行非同步任務時都重新建立和銷燬執行緒。二是執行緒池提供了一種資源限制和管理手段,比如可以限制執行緒的個數,動態新增執行緒等。每個ThreadPoolExecutor也保留了一些基本的統計資料,比如當前執行緒池完成的任務數目等。

我們首先來看一下類圖

Excecutor是一個工具類,裡面提供了許多靜態方法,這些方法根據使用者選擇返回不同的執行緒池範例。ThreadPool繼承了AbstractExecutorService。

下面我們看一下原始碼,

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

成員變數ctl是一個Integer的原子變數,用來記錄當前執行緒池狀態和執行緒池中的執行緒個數,有點類似於ReentrantReadWriteLock使用一個變數來儲存兩種資訊。

執行緒池一共有五種狀態:

  • RUNNING:能接受新任務,並且處理阻塞佇列裡面的任務
  • SHUTDOWN:拒絕接受新任務但是處理阻塞佇列裡的任務
  • STOP:拒絕新任務並且拋棄阻塞佇列裡的任務
  • TIDYING:所有任務都執行完(包含阻塞佇列裡面的任務)後當前執行緒池活動執行緒數為0,將要呼叫terminated方法。
  • TERMINATED:終止狀態。terminated方法呼叫完成以後的狀態。

執行緒池狀態轉換如下:

  • RUNNING->SHUTDOWN:顯示呼叫shutdown方法,或者隱式呼叫finalize()方法裡的shutdown()方法。
  • RUNNINGSHUTDOWN->STOP:顯示呼叫shutdown方法
  • SHUTDOWN->TIDYING:當執行緒池和任務佇列都為空時
  • STOP->TIDYING:當執行緒池為空時
  • TIDYING->TERMINATED:Terminated() hook方法執行完畢時

執行緒池的使用

合理利用執行緒池能夠帶來三個好處:

  • 降低資源消耗。減少了建立和銷燬執行緒的次數,每個工作執行緒都可以被重複利用,可執行多個任務。
  • 提高響應速度。當任務到達時,任務可以不需要的等到執行緒建立就能立即執行。
  • 提高執行緒的可管理性。可以根據系統的承受能力,調整執行緒池中工作線執行緒的數目,防止因為消耗過多的記憶體,而把伺服器累趴下(每個執行緒需要大約1MB記憶體,執行緒開的越多,消耗的記憶體也就越大,最後宕機)。

在java.util.concurrent.Executors執行緒工廠類裡面提供了一些靜態工廠,生成一些常用的執行緒池。官方建議使用Executors工程類來建立執行緒池物件。
Executors類中有個建立執行緒池的方法如下:

  • public static ExecutorService newFixedThreadPool(int nThreads):返回執行緒池物件。(建立的是有界執行緒池,也就是池中的執行緒個數可以指定最大數量)

獲取到了一個執行緒池ExecutorService 物件,那麼怎麼使用呢,在這裡定義了一個使用執行緒池物件的方法如下:

  • public Future<?> submit(Runnable task):獲取執行緒池中的某一個執行緒物件,並執行

Future介面:用來記錄執行緒任務執行完畢後產生的結果。

使用執行緒池中執行緒物件的步驟:

  • 建立執行緒池物件。
  • 建立Runnable介面子類物件。(task)
  • 提交Runnable介面子類物件。(take task)
  • 關閉執行緒池(一般不做)。

Runnable實現類程式碼:

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("我要一個教練");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("教練來了: " + Thread.currentThread().getName());
        System.out.println("教我游泳,交完後,教練回到了游泳池");
    }
}

public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 建立執行緒池物件
        ExecutorService service = Executors.newFixedThreadPool(2);//包含2個執行緒物件
        // 建立Runnable範例物件
        MyRunnable r = new MyRunnable();

        //自己建立執行緒物件的方式
        // Thread t = new Thread(r);
        // t.start(); ---> 呼叫MyRunnable中的run()

        // 從執行緒池中獲取執行緒物件,然後呼叫MyRunnable中的run()
        service.submit(r);
        // 再獲取個執行緒物件,呼叫MyRunnable中的run()
        service.submit(r);
        service.submit(r);
        // 注意:submit方法呼叫結束後,程式並不終止,是因為執行緒池控制了執行緒的關閉。
        // 將使用完的執行緒又歸還到了執行緒池中
        // 關閉執行緒池
        //service.shutdown();
    }
}

Callable測試程式碼:

<T> Future<T> submit(Callable<T> task) : 獲取執行緒池中的某一個執行緒物件,並執行.
Future : 表示計算的結果.

V get()   : 獲取計算完成的結果。

public class ThreadPoolDemo2 {
    public static void main(String[] args) throws Exception {
        // 建立執行緒池物件
      ExecutorService service = Executors.newFixedThreadPool(2);//包含2個執行緒物件

        // 建立Runnable範例物件
        Callable<Double> c = new Callable<Double>() {
            @Override
            public Double call() throws Exception {
                return Math.random();
            }
        };

        // 從執行緒池中獲取執行緒物件,然後呼叫Callable中的call()
        Future<Double> f1 = service.submit(c);
        // Futur 呼叫get() 獲取運算結果
        System.out.println(f1.get());

        Future<Double> f2 = service.submit(c);
        System.out.println(f2.get());

        Future<Double> f3 = service.submit(c);
        System.out.println(f3.get());
    }
}

執行緒池的練習

public class Demo04 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);

        SumCallable sc = new SumCallable(100);
        Future<Integer> fu = pool.submit(sc);
        Integer integer = fu.get();
        System.out.println("結果: " + integer);
        
        SumCallable sc2 = new SumCallable(200);
        Future<Integer> fu2 = pool.submit(sc2);
        Integer integer2 = fu2.get();
        System.out.println("結果: " + integer2);

        pool.shutdown();
    }
}
public class SumCallable implements Callable<Integer> {
    private int n;

    public SumCallable(int n) {
        this.n = n;
    }

    @Override
    public Integer call() throws Exception {
        // 求1-n的和?
        int sum = 0;
        for (int i = 1; i <= n; i++) {
            sum += i;
        }
        return sum;
    }
}

到此這篇關於Java並行包執行緒池ThreadPoolExecutor的實現的文章就介紹到這了,更多相關Java並行包執行緒池ThreadPoolExecutor內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com