首頁 > 軟體

簡單聊一聊Java執行緒池ThreadPoolExecutor

2022-06-11 14:02:42

簡介

ThreadPoolExecutor是一個實現ExecutorService介面的執行緒池,ExecutorService是主要用來處理多執行緒任務的一個介面,通常比較簡單是用法是由Executors工廠類去建立。

執行緒池主要解決了兩個不同的問題:

  • 在執行大量非同步任務時,為了能夠提高效能,通常會減少每個任務的呼叫開銷。
  • 提供了一系列多執行緒任務的管理方法,便於多工執行時合理分配資源以及一些異常情況的處理。每個ThreadPoolExecutor還維護一些基本統計資訊。例如:已完成任務的數量,當前獲得執行緒數等。

引數說明

ThreadPoolExecutor提供了幾個核心引數,方便開發人員根據具體場景合理分配執行緒資源。

  • corePoolSize:核心執行緒數,線上程池建立時就已初始化好的n個核心執行緒,即使執行緒空閒著也會一直保留線上程池中不被銷燬,除非呼叫執行緒池方法設定了java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(true)(允許核心執行緒超時銷燬)。
  • maximumPoolSize:執行緒池允許存在最大執行緒數。
  • keepAliveTime:當執行緒數大於核心執行緒數時,多餘的執行緒在執行任務結束後等待新任務的最大等待時間。
  • unitTimeUnit型別,是keepAliveTime多餘執行緒最大空餘時間單位。
  • workQueue:必須指定一個阻塞佇列,線上程池執行execute方法時新進來的任務在執行前都會保留到此佇列裡進入等待。
  • threadFactory:建立執行緒的工廠,預設採用Executors.defaultThreadFactory()建立執行緒。
  • handler:拒絕策略,當最大執行緒數已佔滿,且佇列已滿,此時執行緒池將觸發拒絕策略,對新進來的任務做拒絕處理,具體的處理方案在後面詳細分析(預設使用java.util.concurrent.ThreadPoolExecutor.AbortPolicy直接丟擲異常拒絕處理)。

注:maximumPoolSize如果大於corePoolSize,則多出的部分執行緒數只有在阻塞佇列workQueue佔滿時才會建立核心執行緒之外的執行緒去執行任務,如果我們設定的阻塞佇列為無界佇列(預設大小為Integer.MAX_VALUE),則佇列永遠無法佔滿,就不會去建立額外的執行緒進行工作,一般情況如果任務數足夠,那麼也是在佇列大小還沒達到Integer.MAX_VALUE時就已經出現記憶體溢位了。Executors執行緒池工廠中的newFixedThreadPool()、newSingleThreadExecutor()方法就是使用了無界佇列LinkedBlockingQueue,防止記憶體溢位在日常開發過程中一般是不建議直接去使用Executors去建立執行緒池。

如何建立執行緒池

上面我們提到的可以使用Executors工廠直接建立執行緒池,但是Executors提供的建立執行緒池都是不可控的,我們還是得按自己的業務做好分析自定義一個執行緒池。

以下是執行緒池建立的一個案例:

@Slf4j
@Configuration
public class ThreadPoolConfig {

    @Value("${threadPool.corePoolSize:8}")
    private int corePoolSize;

    @Value("${threadPool.maximumPoolSize:16}")
    private int maximumPoolSize;

    @Value("${threadPool.keepAliveTime:60}")
    private int keepAliveTime;

    @Value("${threadPool.queueSize:99999}")
    private int queueSize;

    @Bean
    public ThreadPoolExecutor testExecutor() {
        LinkedBlockingQueue queue = new LinkedBlockingQueue(queueSize);
        return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, 
                TimeUnit.SECONDS, queue, getThreadFactory(), getRejectedExecutionHandler());
    }

    /**
     * 自定義執行緒池建立執行緒工廠,用於執行緒池建立執行緒的工廠
     * @return
     */
    private ThreadFactory getThreadFactory() {
        return new ThreadFactory() {

            @Override
            public Thread newThread(Runnable r) {
                log.info("===> Create new thread ...");
                return new Thread(r);
            }
        };
    }

    /**
     * 自定義拒絕策略,繼續往佇列裡新增任務進入等待
     * @return
     */
    private RejectedExecutionHandler getRejectedExecutionHandler() {
        return new RejectedExecutionHandler() {

            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // 繼續往佇列裡新增任務,這裡只是一個案例,這種方式並不友好,會丟擲佇列已滿的異常
                log.info("===> Handler runnable ......");
                executor.getQueue().add(r);
            }
        };
    }
}

application.properties組態檔

threadPool:
  corePoolSize: 8
  maximumPoolSize: 16
  keepAliveTime: 60
  # 為方便測試這裡我們設定佇列數小一點
  queueSize: 99

由以上的執行緒池設定,我們寫一個demo測試一下:

擷取部分執行紀錄檔:

  • 紅框我們可以看到執行到了執行緒建立工廠部分程式碼塊
  • 藍色框紀錄檔我們可以看到largestPoolSize=11,這是由於我們設定的maximumPoolSize=16 > corePoolSize=8,我們demo執行的是110個任務並行,佇列大小是99,由此分析得出(需要額外出建立執行緒數 = 並行任務總數110 - 核心執行緒數8 - 佇列大小99 = 3),所以執行緒池在佇列已滿時會多建立3個執行緒用於執行任務,在達到keepAliveTime設定的最大空閒時間後這3個執行緒即會自動銷燬。

注:可能有的同學會想執行緒池使用後需要銷燬嗎?在這裡補充一下,如果我們是作為區域性變數建立出來的執行緒池(如:在執行的方法內使用Executors.newFixedThreadPool(10)建立臨時的執行緒池),這種情況我們用完就必須將它立即銷燬,否則主執行緒就會一直處於執行狀態。如果是全域性設定的執行緒池,那麼就是為整個系統中諸多業務提供使用的,這種就不需要對執行緒池做銷燬,因為一旦銷燬了其他的任務就無法繼續使用該執行緒池執行任務。

  • 銷燬執行緒池主要有兩種方式:
    • shutdown():此方法對執行緒池做銷燬,執行緒池會優先將剩餘未完成的任務執行完才會執行銷燬任務。
    • shutdownNow():此方法會對執行緒池做立即銷燬,無論執行緒池中的任務是否執行完成。

拒絕策略

通常我們在設定好有限佇列大小後,就會有可能出現佇列佔滿的情況,這時候我們的拒絕策略就會起到作用,接下來我們就來分析一下RejectedExecutionHandler介面具體有哪一些實現方式:

  • AbortPolicy:執行緒池的預設拒絕策略,在JDK提供的ThreadPoolExecutor執行緒池中有一個預設執行緒池變數private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();作為預設拒絕策略,檢視如下圖原始碼可知它就是直接丟擲RejectedExecutionException異常,並且會直接丟棄當前任務,如果擔心異常影響後續任務執行開發人員需自行捕獲例外處理

  • CallerRunsPolicy:只要在當執行緒池未被銷燬的情況下,不丟棄任務直接使用主執行緒(呼叫執行緒池執行的執行緒)執行該任務。因為該策略是由主執行緒直接執行任務的,所以不建議在並行度高的情況下使用,建議在並行度較低且任務不允許失敗的情況下才使用此策略

  • DiscardPolicy:直接丟棄當前任務,不做任何處理。直接丟棄任務的情況下,開發人員也無法排查到哪些任務被丟棄掉,一般不建議使用,除非是無關緊要的任務即使丟棄也無所謂的。

  • DiscardOldestPolicy:線上程池未被銷燬的情況下,丟棄最早進入佇列的一個任務(即最久未執行的任務),然後再重新將此任務加入執行緒池,在此策略下需注意被丟棄的任務的重要性,如果任務不重要可直接丟棄。

  • 自定義策略:在以上JDK提供的四種預設拒絕策略之外,我們還可以通過自定義的方式來處理被拒絕的任務。如果擔心任務被拒絕或者被丟棄造成不可預估的問題,在時效性沒有太大要求的情況下我們可以先將任務內容轉換成資料入庫做好紀錄檔記錄,後續可以使用定時任務或者通過MQ訊息延遲處理。由以上的執行緒池設定Demo中的拒絕策略改造虛擬碼如下:
private RejectedExecutionHandler getRejectedExecutionHandler() {
    return new RejectedExecutionHandler() {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // 虛擬碼
            log.info("===> 可根據任務的重要性區分對待,將任務做轉換入庫延遲處理 ......");
        }
    };
}

總結

執行緒池是為了充分利用CPU資源,在合理分批使用的情況下能夠極大的提高我們程式的效能,以上的引數設定僅作為參考,並沒有一個標準的依據,只能在實際開發過程中開發人員自行多做一些測試來判斷引數如何設定更加合理。

在拒絕策略設定方面,如果被拒絕的任務相對緊急且重要不可丟棄的情況下,此類任務可獨立做一個執行緒池處理保證任務不丟失,程式只能慢慢優化變得越來越好,不可能有完美的程式即保證高效能又保證安全可靠。

到此這篇關於Java執行緒池ThreadPoolExecutor的文章就介紹到這了,更多相關Java執行緒池ThreadPoolExecutor內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com