首頁 > 軟體

ThreadPoolExecutor引數含義及原始碼執行流程詳解

2022-11-09 14:02:17

背景

執行緒池是為了避免執行緒頻繁的建立和銷燬帶來的效能消耗,而建立的一種池化技術,它是把已建立的執行緒放入“池”中,當有任務來臨時就可以重用已有的執行緒,無需等待建立的過程,這樣就可以有效提高程式的響應速度。但如果要說執行緒池的話一定離不開 ThreadPoolExecutor ,在阿里巴巴的《Java 開發手冊》中是這樣規定執行緒池的:

執行緒池不允許使用 Executors 去建立,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的讀者更加明確執行緒池的執行規則,規避資源耗盡的風險。

說明:Executors 返回的執行緒池物件的弊端如下:

FixedThreadPool 和 SingleThreadPool:允許的請求佇列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。CachedThreadPool 和 ScheduledThreadPool:允許的建立執行緒數量為 Integer.MAX_VALUE,可能會建立大量的執行緒,從而導致 OOM。

其實當我們去看 Executors 的原始碼會發現,Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor() 和 Executors.newCachedThreadPool() 等方法的底層都是通過 ThreadPoolExecutor 實現的,所以本課時我們就重點來了解一下 ThreadPoolExecutor 的相關知識,比如它有哪些核心的引數?它是如何工作的?

典型回答

ThreadPoolExecutor 的核心引數指的是它在構建時需要傳遞的引數,其構造方法如下所示:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        // maximumPoolSize 必須大於 0,且必須大於 corePoolSize
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

第 1 個引數:corePoolSize 表示執行緒池的常駐核心執行緒數。如果設定為 0,則表示在沒有任何任務時,銷燬執行緒池;如果大於 0,即使沒有任務時也會保證執行緒池的執行緒數量等於此值。但需要注意,此值如果設定的比較小,則會頻繁的建立和銷燬執行緒(建立和銷燬的原因會在本課時的下半部分講到);如果設定的比較大,則會浪費系統資源,所以開發者需要根據自己的實際業務來調整此值。

第 2 個引數:maximumPoolSize 表示執行緒池在任務最多時,最大可以建立的執行緒數。官方規定此值必須大於 0,也必須大於等於 corePoolSize,此值只有在任務比較多,且不能存放在任務佇列時,才會用到。

第 3 個引數:keepAliveTime 表示執行緒的存活時間,當執行緒池空閒時並且超過了此時間,多餘的執行緒就會銷燬,直到執行緒池中的執行緒數量銷燬的等於 corePoolSize 為止,如果 maximumPoolSize 等於 corePoolSize,那麼執行緒池在空閒的時候也不會銷燬任何執行緒。

第 4 個引數:unit 表示存活時間的單位,它是配合 keepAliveTime 引數共同使用的。

第 5 個引數:workQueue 表示執行緒池執行的任務佇列,當執行緒池的所有執行緒都在處理任務時,如果來了新任務就會快取到此任務佇列中排隊等待執行。

第 6 個引數:threadFactory 表示執行緒的建立工廠,此引數一般用的比較少,我們通常在建立執行緒池時不指定此引數,它會使用預設的執行緒建立工廠的方法來建立執行緒,原始碼如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    // Executors.defaultThreadFactory() 為預設的執行緒建立工廠
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}
// 預設的執行緒建立工廠,需要實現 ThreadFactory 介面
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
    // 建立執行緒
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon()) 
            t.setDaemon(false); // 建立一個非守護執行緒
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY); // 執行緒優先順序設定為預設值
        return t;
    }
}

我們也可以自定義一個執行緒工廠,通過實現 ThreadFactory 介面來完成,這樣就可以自定義執行緒的名稱或執行緒執行的優先順序了。

第 7 個引數:RejectedExecutionHandler 表示指定執行緒池的拒絕策略,當執行緒池的任務已經在快取佇列 workQueue 中儲存滿了之後,並且不能建立新的執行緒來執行此任務時,就會用到此拒絕策略,它屬於一種限流保護的機制。

執行緒池的工作流程要從它的執行方法 execute() 說起,原始碼如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 當前工作的執行緒數小於核心執行緒數
    if (workerCountOf(c) < corePoolSize) {
        // 建立新的執行緒執行此任務
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 檢查執行緒池是否處於執行狀態,如果是則把任務新增到佇列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再出檢查執行緒池是否處於執行狀態,防止在第一次校驗通過後執行緒池關閉
        // 如果是非執行狀態,則將剛加入佇列的任務移除
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果執行緒池的執行緒數為 0 時(當 corePoolSize 設定為 0 時會發生)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); // 新建執行緒執行任務
    }
    // 核心執行緒都在忙且佇列都已爆滿,嘗試新啟動一個執行緒執行失敗
    else if (!addWorker(command, false)) 
        // 執行拒絕策略
        reject(command);
}

其中 addWorker(Runnable firstTask, boolean core) 方法的引數說明如下:

  • firstTask,執行緒應首先執行的任務,如果沒有則可以設定為 null;
  • core,判斷是否可以建立執行緒的閥值(最大值),如果等於 true 則表示使用 corePoolSize 作為閥值,false 則表示使用maximumPoolSize 作為閥值。

考點分析

執行緒池任務執行的主要流程,可以參考以下流程圖:

與 ThreadPoolExecutor 相關的面試題還有以下幾個:

  • ThreadPoolExecutor 的執行方法有幾種?它們有什麼區別?
  • 什麼是執行緒的拒絕策略?
  • 拒絕策略的分類有哪些?
  • 如何自定義拒絕策略?
  • ThreadPoolExecutor 能不能實現擴充套件?如何實現擴充套件?

知識拓展

execute() VS submit()

execute() 和 submit() 都是用來執行執行緒池任務的,它們最主要的區別是,submit() 方法可以接收執行緒池執行的返回值,而 execute() 不能接收返回值。

來看兩個方法的具體使用:

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L,
        TimeUnit.SECONDS, new LinkedBlockingQueue(20));
// execute 使用
executor.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("Hello, execute.");
    }
});
// submit 使用
Future<String> future = executor.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
        System.out.println("Hello, submit.");
        return "Success";
    }
});
System.out.println(future.get());

以上程式執行結果如下:

Hello, submit.
Hello, execute.
Success

從以上結果可以看出 submit() 方法可以配合 Futrue 來接收執行緒執行的返回值。它們的另一個區別是 execute() 方法屬於 Executor 介面的方法,而 submit() 方法則是屬於 ExecutorService 介面的方法,它們的繼承關係如下圖所示:

執行緒池的拒絕策略

當執行緒池中的任務佇列已經被存滿,再有任務新增時會先判斷當前執行緒池中的執行緒數是否大於等於執行緒池的最大值,如果是,則會觸發執行緒池的拒絕策略。

Java 自帶的拒絕策略有 4 種:

  • AbortPolicy,終止策略,執行緒池會丟擲異常並終止執行,它是預設的拒絕策略;
  • CallerRunsPolicy,把任務交給當前執行緒來執行;
  • DiscardPolicy,忽略此任務(最新的任務);
  • DiscardOldestPolicy,忽略最早的任務(最先加入佇列的任務)。

例如,我們來演示一個 AbortPolicy 的拒絕策略,程式碼如下:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
        new ThreadPoolExecutor.AbortPolicy()); // 新增 AbortPolicy 拒絕策略
for (int i = 0; i < 6; i++) {
    executor.execute(() -> {
        System.out.println(Thread.currentThread().getName());
    });
}

以上程式的執行結果:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.lagou.interview.ThreadPoolExample$$Lambda$1/1096979270@448139f0 rejected from java.util.concurrent.ThreadPoolExecutor@7cca494b[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
 at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
 at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
 at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
 at com.lagou.interview.ThreadPoolExample.rejected(ThreadPoolExample.java:35)
 at com.lagou.interview.ThreadPoolExample.main(ThreadPoolExample.java:26)

可以看出當第 6 個任務來的時候,執行緒池則執行了AbortPolicy 拒絕策略,丟擲了異常。因為佇列最多儲存 2 個任務,最大可以建立 3 個執行緒來執行任務(2+3=5),所以當第 6 個任務來的時候,此執行緒池就“忙”不過來了。

自定義拒絕策略

自定義拒絕策略只需要新建一個 RejectedExecutionHandler 物件,然後重寫它的 rejectedExecution() 方法即可,如下程式碼所示:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
        new RejectedExecutionHandler() {  // 新增自定義拒絕策略
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // 業務處理方法
                System.out.println("執行自定義拒絕策略");
            }
        });
for (int i = 0; i < 6; i++) {
    executor.execute(() -> {
        System.out.println(Thread.currentThread().getName());
    });
}

以上程式碼執行的結果如下:

執行自定義拒絕策略
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2

可以看出執行緒池執行了自定義的拒絕策略,我們可以在 rejectedExecution 中新增自己業務處理的程式碼。

ThreadPoolExecutor 擴充套件

ThreadPoolExecutor 的擴充套件主要是通過重寫它的 beforeExecute() 和 afterExecute() 方法實現的,我們可以在擴充套件方法中新增紀錄檔或者實現資料統計,比如統計執行緒的執行時間,如下程式碼所示:

public class ThreadPoolExtend {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 執行緒池擴充套件呼叫
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(2, 4, 10,
                TimeUnit.SECONDS, new LinkedBlockingQueue());
        for (int i = 0; i < 3; i++) {
            executor.execute(() -> {
                Thread.currentThread().getName();
            });
        }
    }
   /**
     * 執行緒池擴充套件
     */
    static class MyThreadPoolExecutor extends ThreadPoolExecutor {
        // 儲存執行緒執行開始時間
        private final ThreadLocal<Long> localTime = new ThreadLocal<>();
        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
        /**
         * 開始執行之前
         * @param t 執行緒
         * @param r 任務
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            Long sTime = System.nanoTime(); // 開始時間 (單位:納秒)
            localTime.set(sTime);
            System.out.println(String.format("%s | before | time=%s",
                    t.getName(), sTime));
            super.beforeExecute(t, r);
        }
        /**
         * 執行完成之後
         * @param r 任務
         * @param t 丟擲的異常
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            Long eTime = System.nanoTime(); // 結束時間 (單位:納秒)
            Long totalTime = eTime - localTime.get(); // 執行總時間
            System.out.println(String.format("%s | after | time=%s | 耗時:%s 毫秒",
                    Thread.currentThread().getName(), eTime, (totalTime / 1000000.0)));
            super.afterExecute(r, t);
        }
    }
}

以上程式的執行結果如下所示:

pool-1-thread-1 | before | time=4570298843700
pool-1-thread-2 | before | time=4570298840000
pool-1-thread-1 | after | time=4570327059500 | 耗時:28.2158 毫秒
pool-1-thread-2 | after | time=4570327138100 | 耗時:28.2981 毫秒
pool-1-thread-1 | before | time=4570328467800
pool-1-thread-1 | after | time=4570328636800 | 耗時:0.169 毫秒

小結

最後我們總結一下:執行緒池的使用必須要通過 ThreadPoolExecutor 的方式來建立,這樣才可以更加明確執行緒池的執行規則,規避資源耗盡的風險。同時,也介紹了 ThreadPoolExecutor 的七大核心引數,包括核心執行緒數和最大執行緒數之間的區別,當執行緒池的任務佇列沒有可用空間且執行緒池的執行緒數量已經達到了最大執行緒數時,則會執行拒絕策略,Java 自動的拒絕策略有 4 種,使用者也可以通過重寫 rejectedExecution() 來自定義拒絕策略,我們還可以通過重寫 beforeExecute() 和 afterExecute() 來實現 ThreadPoolExecutor 的擴充套件功能。

以上就是ThreadPoolExecutor引數含義及原始碼執行流程詳解的詳細內容,更多關於ThreadPoolExecutor執行流程的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com