首頁 > 軟體

詳解Java ScheduledThreadPoolExecutor的踩坑與解決方法

2022-10-22 14:00:14

概述

最近專案上反饋某個重要的定時任務突然不執行了,很頭疼,開發環境和測試環境都沒有出現過這個問題。定時任務採用的是ScheduledThreadPoolExecutor,後來一看程式碼發現踩了一個大坑....

還原"大坑"

這個坑就是如果ScheduledThreadPoolExecutor中執行的任務出錯丟擲異常後,不僅不會列印異常堆疊資訊,同時還會取消後面的排程, 直接看例子。

@Test
public void testException() throws InterruptedException {
    // 建立1個執行緒的排程任務執行緒池
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    // 建立一個任務
    Runnable runnable = new Runnable() {

        volatile int num = 0;

        @Override
        public void run() {
            num ++;
            // 模擬執行報錯
            if(num > 5) {
                throw new RuntimeException("執行錯誤");
            }
            log.info("exec num: [{}].....", num);
        }
    };

    // 每隔1秒鐘執行一次任務
    scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS);
    Thread.sleep(10000);
}

執行結果:

  • 只執行了5次後,就不列印,不執行了,因為報錯了
  • 任務報錯,也沒有列印一次堆疊,更導致排程任務取消,後果十分嚴重。

解決方案

解決方法也非常簡單,只要通過try catch捕獲異常即可。

執行結果:

看到不僅列印了異常堆疊,而且也會進行週期性的排程。

更推薦的做法

更好的建議可以在自己的專案中封裝一個包裝類,要求所有的排程都提交通過我們統一的包裝類, 如下程式碼:

@Slf4j
public class RunnableWrapper implements Runnable {
    // 實際要執行的執行緒任務
    private Runnable task;
    // 執行緒任務被建立出來的時間
    private long createTime;
    // 執行緒任務被執行緒池執行的開始時間
    private long startTime;
    // 執行緒任務被執行緒池執行的結束時間
    private long endTime;
    // 執行緒資訊
    private String taskInfo;

    private boolean showWaitLog;

    /**
     * 執行間隔時間多久,列印紀錄檔
     */
    private long durMs = 1000L;

    // 當這個任務被建立出來的時候,就會設定他的建立時間
    // 但是接下來有可能這個任務提交到執行緒池後,會進入執行緒池的佇列排隊
    public RunnableWrapper(Runnable task, String taskInfo) {
        this.task = task;
        this.taskInfo = taskInfo;
        this.createTime = System.currentTimeMillis();
    }

    public void setShowWaitLog(boolean showWaitLog) {
        this.showWaitLog = showWaitLog;
    }

    public void setDurMs(long durMs) {
        this.durMs = durMs;
    }

    // 當任務線上程池排隊的時候,這個run方法是不會被執行的
    // 但是當任務結束了排隊,得到執行緒池執行機會的時候,這個方法會被呼叫
    // 此時就可以設定執行緒任務的開始執行時間
    @Override
    public void run() {
        this.startTime = System.currentTimeMillis();

        // 此處可以通過呼叫監控系統的API,實現監控指標上報
        // 用執行緒任務的startTime-createTime,其實就是任務排隊時間
        // 這邊列印紀錄檔輸出,也可以輸出到監控系統中
        if(showWaitLog) {
            log.info("任務資訊: [{}], 任務排隊時間: [{}]ms", taskInfo, startTime - createTime);
        }

        // 接著可以呼叫包裝的實際任務的run方法
        try {
            task.run();
        } catch (Exception e) {
            log.error("run task error", e);
            throw e;
        }

        // 任務執行完畢以後,會設定任務執行結束的時間
        this.endTime = System.currentTimeMillis();

        // 此處可以通過呼叫監控系統的API,實現監控指標上報
        // 用執行緒任務的endTime - startTime,其實就是任務執行時間
        // 這邊列印任務執行時間,也可以輸出到監控系統中
        if(endTime - startTime > durMs) {
            log.info("任務資訊: [{}], 任務執行時間: [{}]ms", taskInfo, endTime - startTime);
        }

    }
}

使用:

我們還可以在包裝類裡面封裝各種監控行為,如本例列印紀錄檔執行時間等。

原理探究

那大家有沒有想過為什麼任務出錯會導致異常無法列印,甚至排程都取消了呢?讓我們從原始碼出發,一探究竟。

1.下面是排程任務的入口方法。

// ScheduledThreadPoolExecutor#scheduleAtFixedRate
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // 將執行任務和引數包裝成ScheduledFutureTask物件
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 延遲執行
    delayedExecute(t);
    return t;
}

這個方法主要做了兩個事情:

  • 將執行任務和引數包裝成ScheduledFutureTask物件
  • 呼叫delayedExecute方法延遲執行任務

2.延遲或週期性任務的主要執行方法, 主要是將任務丟到佇列中,後續由工作執行緒獲取執行。

// ScheduledThreadPoolExecutor#delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            // 將任務丟到阻塞佇列中
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                // 開啟工作執行緒,去執行任務,或者從佇列中獲取任務執行
                ensurePrestart();
        }
    }

3.現在任務已經在佇列中了,我們看下任務執行的內容是什麼,還記得前面的包裝物件ScheduledFutureTask類,它的實現類是ScheduledFutureTask,繼承了Runnable類。

// ScheduledFutureTask#run方法
public void run() {
    // 是不是週期性任務
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 不是週期性任務的話, 直接呼叫一次下面的run    
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 如果是週期性任務,則呼叫runAndReset方法,如果返回true,繼續執行
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 設定下次排程時間
        setNextRunTime();
        // 重新執行排程任務
        reExecutePeriodic(outerTask);
    }
}

這裡的關鍵就是看ScheduledFutureTask.super.runAndReset()方法是否返回true,如果是true的話繼續排程。

4.runAndReset方法也很簡單,關鍵就是看報異常如何處理。

// FutureTask#runAndReset
protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    // 是否繼續下次排程,預設false
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                // 執行任務
                c.call(); 
                // 執行成功的話,設定為true
                ran = true;

                // 例外處理,關鍵點
            } catch (Throwable ex) {
                // 不會修改ran的值,最終是false,同時也不列印異常堆疊
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    // 返回結果
    return ran && s == NEW;
}
  • 關鍵點ran變數,最終返回是不是下次繼續排程執行
  • 如果丟擲異常的話,可以看到不會修改ran為true。

總結

Java的ScheduledThreadPoolExecutor定時任務執行緒池所排程的任務中如果丟擲了異常,並且異常沒有捕獲直接拋到框架中,會導致ScheduledThreadPoolExecutor定時任務不排程了。這個結論希望大家一定要記住,不然非常坑,關鍵是有時候測試環境、開發環境還無法復現,有一定的隨機性,真的到了生產就完蛋了。

到此這篇關於詳解Java ScheduledThreadPoolExecutor的踩坑與解決方法的文章就介紹到這了,更多相關Java ScheduledThreadPoolExecutor內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com