首頁 > 軟體

Java多執行緒ThreadPoolExecutor詳解

2022-08-12 18:01:20

前言:

根據ThreadPoolExecutor的構造方法,JDK提供了很多工廠方法來建立各種用途的執行緒池.

1 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue<Runnable>());
}

說明:

  • 核心執行緒數 == 最大執行緒數(沒有救急執行緒被建立),因此也無需超時時間
  • 阻塞佇列是無界的,可以放任意數量的任務(最大為Integer.MAX_VALUE)

適用於 任務量一已知,相對耗時的任務

2 newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 60L, TimeUnit.SECONDS,
 new SynchronousQueue<Runnable>());
}

說明:

  • 核心執行緒數是 0, 最大執行緒數是 Integer.MAX_VALUE,救急執行緒的空閒生存時間是 60s
    • 全部都是救急執行緒(60s 後可以回收)
    • 救急執行緒可以無限建立(最大是Integer.MAX_VALUE)
  • 佇列採用了 SynchronousQueue 實現特點是,它沒有容量,沒有執行緒來取是放不進去的(一手交錢、一手交 貨)

如下案例:

SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
     try {
         log.debug("putting {} ", 1);
         integers.put(1);
         log.debug("{} putted...", 1);
         log.debug("putting...{} ", 2);
         integers.put(2);
         log.debug("{} putted...", 2);
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
},"t1").start();
sleep(1);
new Thread(() -> {
     try {
         log.debug("taking {}", 1);
         integers.take();
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
},"t2").start();
sleep(1);
new Thread(() -> {
     try {
         log.debug("taking {}", 2);
         integers.take();
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
},"t3").start();
/*
執行結果:
11:48:15.500 c.TestSynchronousQueue [t1] - putting 1 
11:48:16.500 c.TestSynchronousQueue [t2] - taking 1 
11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted... 
11:48:16.500 c.TestSynchronousQueue [t1] - putting...2 
11:48:17.502 c.TestSynchronousQueue [t3] - taking 2 
11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted... 
*/

整個執行緒池表現為執行緒數會根據任務量不斷增長,沒有上限,當任務執行完畢,空閒 1分鐘後釋放執行緒。

適用於 任務數比較密集,但每個任務執行時間較短的情況

3 newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegatedExecutorService
 (new ThreadPoolExecutor(1, 1,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue<Runnable>()));
}

希望多個任務排隊執行。執行緒數固定為 1,任務數多於 1 時,會放入無界佇列排隊。任務執行完畢,這唯一的執行緒也不會被釋放.

與其他執行緒區別:

  • 自己建立一個單執行緒序列執行任務,如果任務執行失敗而終止那麼沒有任何補救措施,而執行緒池還會新建一 個執行緒,保證池的正常工作
  • Executors.newSingleThreadExecutor() 執行緒個數始終為1,不能修改
    • FinalizableDelegatedExecutorService 應用的是裝飾器模式,只對外暴露了 ExecutorService 介面,因此不能呼叫 ThreadPoolExecutor 中特有的方法.
  • Executors.newFixedThreadPool(1) 初始時為1,以後還可以修改
    • 對外暴露的是 ThreadPoolExecutor 物件,可以強轉後呼叫 setCorePoolSize 等方法進行修改

4 提交任務

// 執行任務
void execute(Runnable command);
// 提交任務 task,用返回值 Future 獲得任務執行結果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任務
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 throws InterruptedException;
// 提交 tasks 中所有任務,帶超時時間
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
 long timeout, TimeUnit unit)
 throws InterruptedException;
// 提交 tasks 中所有任務,哪個任務先成功執行完畢,返回此任務執行結果,其它任務取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
 throws InterruptedException, ExecutionException;

// 提交 tasks 中所有任務,哪個任務先成功執行完畢,返回此任務執行結果,其它任務取消,帶超時時間
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
 long timeout, TimeUnit unit)
 throws InterruptedException, ExecutionException, TimeoutException;

上述都是提供的提交任務的方法,根據不同的業務場景需求,選擇對應的提交方法.

5 關閉執行緒池

shutdown

/*
執行緒池狀態變為 SHUTDOWN
- 不會接收新任務
- 但已提交任務會執行完
- 此方法不會阻塞呼叫執行緒的執行
*/
void shutdown();
public void shutdown() {
     final ReentrantLock mainLock = this.mainLock;
     mainLock.lock();
     try {
     checkShutdownAccess();
     // 修改執行緒池狀態
     advanceRunState(SHUTDOWN);
     // 僅會打斷空閒執行緒
     interruptIdleWorkers();
     onShutdown(); // 擴充套件點 ScheduledThreadPoolExecutor
     } finally {
     mainLock.unlock();
     }
     // 嘗試終結(沒有執行的執行緒可以立刻終結,如果還有執行的執行緒也不會等)
     tryTerminate();
}

shutdownNow

/*
執行緒池狀態變為 STOP
- 不會接收新任務
- 會將佇列中的任務返回
- 並用 interrupt 的方式中斷正在執行的任務
*/
List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() {
     List<Runnable> tasks;
     final ReentrantLock mainLock = this.mainLock;
     mainLock.lock();
     try {
     checkShutdownAccess();
     // 修改執行緒池狀態
     advanceRunState(STOP);
     // 打斷所有執行緒
     interruptWorkers();
     // 獲取佇列中剩餘任務
     tasks = drainQueue();
     } finally {
     mainLock.unlock();
     }
     // 嘗試終結
     tryTerminate();
     return tasks;
}

其他打斷方法

// 不在 RUNNING 狀態的執行緒池,此方法就返回 true
boolean isShutdown();
// 執行緒池狀態是否是 TERMINATED
boolean isTerminated();
// 呼叫 shutdown 後,由於呼叫執行緒並不會等待所有任務執行結束,因此如果它想線上程池 TERMINATED 後做些事
情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

到此這篇關於Java多執行緒ThreadPoolExecutor詳解的文章就介紹到這了,更多相關Java ThreadPoolExecutor內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com