首頁 > 軟體

Java執行緒池 ThreadPoolExecutor 詳解

2022-07-29 14:03:19

一 為什麼要使用執行緒池

對於作業系統而言,建立一個執行緒的代價是十分昂貴的, 需要給它分配記憶體列入排程,同時線上程切換時要執行記憶體換頁清空 CPU 快取,切換回來時還要重新從記憶體中讀取資訊,破壞了資料的區域性性。因此在並行程式設計中,當執行緒建立過多時,會影響程式效能,甚至引起程式崩潰。

而執行緒池屬於池化管理模式,具有以下優點:

  • 降低資源消耗:通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的效能消耗。
  • 提高響應速度:當任務到達時,任務可以不需要等到執行緒建立就能立即執行。
  • 提高執行緒的可管理性:能夠對執行緒進行統一分配、調優和監控。

二 執行緒池原理詳解

2.1 執行緒池核心組成

執行緒池包含 3 個核心部分:

  • 執行緒集合:核心執行緒和工作執行緒
  • 阻塞佇列:用於待執行任務排隊
  • 拒絕策略處理器:阻塞佇列滿後,對任務處理進行 

2.2 Execute 原理

當一個新任務提交至執行緒池之後,執行緒池的處理流程如下: 

  • 首先判斷當前執行的執行緒數量是否小於 corePoolSize。如果是,則建立一個工作執行緒來執行任務;如果都在執行任務,則進入步驟 2。
  • 判斷 BlockingQueue 是否已經滿了,若沒滿,則將任務放入 BlockingQueue;若滿了,則進入步驟 3。
  • 判斷當前執行的匯流排程數量是否小於 maximumPoolSize,如果是則建立一個新的工作執行緒來執行任務。
  • 否則交給 RejectedExecutionHandler 來處理任務。

當 ThreadPoolExecutor 建立新執行緒時,通過 CAS 來更新執行緒池的狀態 ctl。

三 執行緒池的使用

執行緒池的使用主要分為以下三個步驟: 

3.1 建立執行緒池

3.1.1 自定義執行緒池

執行緒池的真正實現類是 ThreadPoolExecutor其構造方法有如下 4 種:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

下面詳細來看建構函式需要傳入的重點引數:

  • corePoolSize (必需)執行緒池中的核心執行緒數,當提交一個任務時,執行緒池建立一個新執行緒執行任務,直到當前執行緒數等於 corePoolSize, 即使有其他空閒執行緒能夠執行新來的任務, 也會繼續建立執行緒;如果當前執行緒數為 corePoolSize,繼續提交的任務被儲存到阻塞佇列中,等待被執行;如果執行了執行緒池的 **prestartAllCoreThreads()**方法,執行緒池會提前建立並啟動所有核心執行緒。
  • workQueue (必需)用來儲存等待被執行的任務的阻塞佇列。
    • ArrayBlockingQueue: 基於陣列結構的有界阻塞佇列,按 FIFO 排序任務;
    • LinkedBlockingQueue: 基於連結串列結構的阻塞佇列,按 FIFO 排序任務,吞吐量通常要高於 ArrayBlockingQueue;
    • SynchronousQueue: 一個不儲存元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於 LinkedBlockingQueue;
    • PriorityBlockingQueue: 具有優先順序的無界阻塞佇列;
  • maximumPoolSize (必需)執行緒池中能容納的最大執行緒數。如果當前阻塞佇列滿了,且繼續提交任務,則建立新的執行緒執行任務,前提是當前執行緒數小於 maximumPoolSize;當阻塞佇列是無界佇列, 則 maximumPoolSize 則不起作用, 因為無法提交至核心執行緒池的執行緒會一直持續地放入 workQueue。
  • keepAliveTime (必需)執行緒閒置超時時長。如果超過該時長,非核心執行緒就會被回收。如果將 allowCoreThreadTimeout 設定為 true 時,核心執行緒也會超時回收。
  • unit (必需)keepAliveTime 的單位,常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)
  • threadFactory (可選)建立執行緒的工廠,通過自定義的執行緒工廠可以給每個新建的執行緒設定一個具有識別度的執行緒名。預設為 DefaultThreadFactory
  • handler (可選)執行緒池的飽和策略,當阻塞佇列滿了,且沒有空閒的工作執行緒,如果繼續提交任務,必須採取一種策略處理該任務,執行緒池提供了 4 種策略:
    • AbortPolicy: 直接丟擲異常,預設策略;
    • CallerRunsPolicy: 用呼叫者所在的執行緒來執行任務;
    • DiscardOldestPolicy: 丟棄阻塞佇列中靠最前的任務,並執行當前任務;
    • DiscardPolicy: 直接丟棄任務; 也可以根據應用場景實現 RejectedExecutionHandler 介面,自定義飽和策略,如記錄紀錄檔或持久化儲存不能處理的任務。

3.1.2 功能執行緒池

除了呼叫 ThreadPoolExecutor 自定義執行緒池的方式,其實 Executors 也已經為我們封裝好了 4 種常見的功能執行緒池,如下:

  • 定長執行緒池(FixedThreadPool)
  • 定時執行緒池(ScheduledThreadPool)
  • 可快取執行緒池(CachedThreadPool)
  • 單執行緒化執行緒池(SingleThreadExecutor)

定長執行緒池(FixedThreadPool)

  • 特點:只有核心執行緒,執行緒數量固定,執行完立即回收,任務佇列為連結串列結構的有界佇列。
  • 應用場景:控制執行緒最大並行數。
  • 使用範例:
// 1. 建立定長執行緒池物件 & 設定執行緒池執行緒數量固定為 3
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
// 2. 建立好 Runnable 類執行緒物件 & 需執行的任務
Runnable task = new Runnable(){
  public void run() {
     ...//待執行的耗時任務
  }
};
// 3. 向執行緒池提交任務
fixedThreadPool.execute(task);

定時執行緒池(ScheduledThreadPool)

  • 特點:核心執行緒數量固定,非核心執行緒數量無限,執行完閒置 10ms 後回收,任務佇列為延時阻塞佇列。
  • 應用場景:執行定時或週期性的任務。
  • 使用範例:
// 1. 建立定時執行緒池物件 & 設定執行緒池執行緒數量固定為 5
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 2. 建立好 Runnable 類執行緒物件 & 需執行的任務
Runnable task =new Runnable(){
  public void run() {
     ...//待執行的耗時任務
  }
};
// 3. 向執行緒池提交任務
scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // 延遲 1s 後執行任務

可快取執行緒池(CachedThreadPool)

  • 特點:無核心執行緒,非核心執行緒數量無限,執行完閒置 60s 後回收,任務佇列為不儲存元素的阻塞佇列。
  • 應用場景:執行大量、耗時少的任務。
  • 使用範例:
// 1. 建立可快取執行緒池物件
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 2. 建立好 Runnable 類執行緒物件 & 需執行的任務
Runnable task =new Runnable(){
  public void run() {
     ...//待執行的耗時任務
  }
};
// 3. 向執行緒池提交任務
cachedThreadPool.execute(task);

單執行緒化執行緒池(SingleThreadExecutor)

  • 特點:只有 1 個核心執行緒,無非核心執行緒,執行完立即回收,任務佇列為連結串列結構的有界佇列。
  • 應用場景:不適合並行但可能引起 IO 阻塞性及影響 UI 執行緒響應的操作,如資料庫操作、檔案操作等。
  • 使用範例:
// 1. 建立單執行緒化執行緒池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 2. 建立好 Runnable 類執行緒物件 & 需執行的任務
Runnable task =new Runnable(){
  public void run() {
     ...//待執行的耗時任務
  }
};
// 3. 向執行緒池提交任務
singleThreadExecutor.execute(task);

3.1.3 功能執行緒池存在的問題

目前已不推薦使用功能執行緒池,而是通過自定義 ThreadPoolExecutor 的方式。因為直接使用功能執行緒池具有資源耗盡的風險。

  • newFixedThreadPool 和 newSingleThreadExecutor: 主要問題是堆積的請求處理佇列均採用 LinkedBlockingQueue,可能會耗費非常大的記憶體,甚至 OOM。
  • newCachedThreadPool 和 newScheduledThreadPool: 主要問題是執行緒數最大數是 Integer.MAX_VALUE,可能會建立數量非常多的執行緒,甚至 OOM。

3.2 向執行緒池提交任務

向執行緒池提交任務的流程非常簡單,只需要向執行緒池的 execute 方法傳入 Runnable 物件即可。

// 向執行緒池提交任務
threadPool.execute(new Runnable() {
    @Override
    public void run() {
        ... //待執行的任務
    }
});

3.3 關閉執行緒池

當執行緒池不再使用時,需要手動關閉以釋放資源。執行緒池關閉的原理是:遍歷執行緒池中的所有執行緒,然後逐個呼叫執行緒的 interrupt 方法來中斷執行緒。一般通過呼叫以下兩個方法:

  • shutdown():將執行緒池裡的執行緒狀態設定成 SHUTDOWN 狀態, 然後中斷所有沒有正在執行任務的執行緒。
  • shutdownNow():將執行緒池裡的執行緒狀態設定成 STOP 狀態, 然後停止所有正在執行或暫停任務的執行緒. 只要呼叫這兩個關閉方法中的任意一個, isShutDown() 返回 true. 當所有任務都成功關閉了, isTerminated()返回 true。

3.4 自定義執行緒池需要考慮因素

使用 ThreadPoolExecutor 自定義執行緒池時,需要從任務的優先順序,任務的執行時間長短,任務的性質(CPU 密集/ IO 密集),任務的依賴關係這四個角度來分析。並且近可能地使用有界的工作佇列。

性質不同的任務可用使用不同規模的執行緒池分開處理:

  • CPU 密集型: 儘可能少的執行緒,核心執行緒數 = CPU 核心數 + 1
  • IO 密集型: 儘可能多的執行緒, 核心執行緒數 = CPU 核心數 * 2
  • 混合型: CPU 密集型的任務與 IO 密集型任務的執行時間差別較小,拆分為兩個執行緒池;否則沒有必要拆分。

到此這篇關於Java執行緒池 ThreadPoolExecutor 詳解的文章就介紹到這了,更多相關JavaThreadPoolExecutor內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com