<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
多執行緒程式設計是現代軟體開發中不可或缺的一部分,但是手動管理執行緒可能會變得非常複雜,因為需要考慮許多並行問題,例如執行緒安全和資源競爭。為了避免這些問題,Java提供了ThreadPoolExecutor類,它是一種高度優化的多執行緒執行器,可以管理執行緒池、執行執行緒任務和控制執行緒池的大小和生命週期等
public class CustomThreadPoolDemo { public static void main(String[] args) { // 建立執行緒池,大小為3,最大執行緒數為6,空閒執行緒存活時間為5秒,使用自定義執行緒工廠和拒絕策略 ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), new CustomThreadFactory(), new CustomRejectedExecutionHandler()); // 提交10個任務 for (int i = 0; i < 10; i++) { executor.submit(new Task(i)); } // 關閉執行緒池 executor.shutdown(); } static class Task implements Runnable { private int taskId; public Task(int taskId) { this.taskId = taskId; } @Override public void run() { System.out.println("Task " + taskId + " is running in thread " + Thread.currentThread().getName()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task " + taskId + " is done."); } } static class CustomThreadFactory implements java.util.concurrent.ThreadFactory { private int count = 1; @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("CustomThreadPool-" + count++); return t; } } static class CustomRejectedExecutionHandler implements java.util.concurrent.RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("Task " + ((Task) r).taskId + " is rejected."); } } }
該範例程式碼使用ThreadPoolExecutor
類建立了一個大小為3,最大執行緒數為6,空閒執行緒存活時間為5秒的執行緒池,任務佇列的大小為10,使用了自定義的執行緒工廠和拒絕策略。然後提交了10個任務,每個任務輸出了當前執行緒的名稱,並休眠了3秒鐘。當程式執行時,可能會出現任務被拒絕執行的情況,拒絕策略會輸出任務被拒絕的資訊。
ThreadPoolExecutor提供了兩種執行任務的方法:
Future<?> submit(Runnable task) void execute(Runnable command)
實際上submit中也是呼叫了execute方法
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
private final AtomicInteger ctl
執行緒池原始碼中使用ctl通過高低位的方式來記錄執行緒池的狀態和當前執行緒池中的工作執行緒數量。
Integer佔用4個位元組也就是32位元,執行緒池有5種狀態,要標識5種狀態需要3位
前三位
private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
Integer.SIZE為32,所以COUNT_BITS為29,最終各個狀態對應的二級製為:
RUNNING:11100000 00000000 00000000 00000000
SHUTDOWN:00000000 00000000 00000000 00000000
STOP:00100000 00000000 00000000 00000000
TIDYING:01000000 00000000 00000000 00000000
TERMINATED:01100000 00000000 00000000 00000000
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //ctl初始值是ctlOf(RUNNING, 0),表示執行緒池處於執行中,工作執行緒數為0 int c = ctl.get(); //判斷工作執行緒是否小於核心執行緒數 if (workerCountOf(c) < corePoolSize) { //小於核心執行緒要新增工作執行緒 if (addWorker(command, true)) return; //新增失敗重新獲取一次ctl c = ctl.get(); } //執行緒池是否處於Running狀態 && 入隊是否成功 if (isRunning(c) && workQueue.offer(command)) {//入隊成功 //重新獲取ctl int recheck = ctl.get(); //如果執行緒池不是Running狀態就需要移除掉這個任務 if (! isRunning(recheck) && remove(command)) //觸發拒絕策略 reject(command); //工作執行緒為0時要去建立新的工作執行緒 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果執行緒池狀態不是RUNNING,或者執行緒池狀態是RUNNING但是佇列滿了,則去新增一個非核心工作執行緒。false表示非核心執行緒 else if (!addWorker(command, false)) reject(command); }
//core:true核心執行緒 false非核心執行緒 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //獲取ctl值 int c = ctl.get(); //獲取高3位 int rs = runStateOf(c); // 執行緒池如果是SHUTDOWN狀態並且佇列非空則建立執行緒,如果佇列為空則不建立執行緒 // 執行緒池如果是STOP狀態則直接不建立執行緒 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //獲取工作執行緒數 int wc = workerCountOf(c); //工作執行緒數超過規定數量則不建立執行緒 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //修改工作執行緒 if (compareAndIncrementWorkerCount(c)) //成功則退出 retry這個迴圈 break retry; //CAS失敗說明有其他執行緒也在增加工作執行緒數量,此時重新獲取ctl值 c = ctl.get(); // Re-read ctl //如果發現執行緒池的狀態發生了變化,則繼續回到retry,重新判斷執行緒池的狀態是不是SHUTDOWN或STOP // 如果狀態沒有變化,則繼續利用cas來增加工作執行緒數,直到cas成功 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //到了這裡說明ctl新增成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //Worker實現了Runnable介面 在構造一個Worker物件時,就會利用ThreadFactory新建一個執行緒 w = new Worker(firstTask); //拿出執行緒物件此時執行緒還沒有start啟動 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 獲取高三位 int rs = runStateOf(ctl.get()); // 如果執行緒池的狀態是RUNNING // 或者執行緒池的狀態變成了SHUTDOWN,但是當前執行緒沒有自己的第一個任務,那就表示當前呼叫addWorker方法是為了從佇列中獲取任務來執行 // 正常情況下執行緒池的狀態如果是SHUTDOWN,是不能建立新的工作執行緒的,但是佇列中如果有任務,那就是上面說的特例情況 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 如果Worker物件對應的執行緒已經在執行了,那就有問題,直接拋異常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers用來記錄當前執行緒池中工作執行緒,呼叫執行緒池的shutdown方法時會遍歷worker物件中斷對應執行緒 workers.add(w); int s = workers.size(); // largestPoolSize用來跟蹤執行緒池在執行過程中工作執行緒數的峰值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //啟動執行緒 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 在上述過程中如果拋了異常,需要從works中移除所新增的work,並且還要修改ctl,工作執行緒數-1,表示新建工作執行緒失敗 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorker核心邏輯:
剛剛有說到Worker實現了Runnable介面,看看他重寫的Run方法中執行過什麼
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
final void runWorker(Worker w) { //獲取當前工作執行緒 Thread wt = Thread.currentThread(); //獲取第一個任務 Runnable task = w.firstTask; //置空 w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //判斷當前第一個任務是否為空,為空的話從阻塞佇列獲取一個任務,阻塞佇列也為空就會阻塞在getTask()方法中 //也不會一直阻塞下去,keepAliveTime超時後還沒有獲取到任務就會返回null,退出迴圈,這個執行緒也就是中止了 while (task != null || (task = getTask()) != null) { w.lock(); //執行緒池狀態為STOP,則要中斷自己,但是如果發現中斷標記為true,那是不對的,因為執行緒池狀態不是STOP,工作執行緒仍然是要正常工作的,不能中斷掉,算是SHUTDOWN,也要等任務都執行完之後,執行緒才結束,而目前執行緒還在執行任務的過程中,不能中斷 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //空方法給自定義執行緒池實現 beforeExecute(wt, task); Throwable thrown = null; try { //執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //空方法給自定義執行緒池實現 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } //正常退出了while迴圈 // completedAbruptly=false,表示執行緒正常退出 completedAbruptly = false; } finally { //如果執行緒正常退出這個執行緒會自然死亡 //但是如果是由於執行任務的時候拋了異常,那麼這個執行緒不應該直接結束,而應該繼續從佇列中獲取下一個任務 processWorkerExit(w, completedAbruptly); } }
private void processWorkerExit(Worker w, boolean completedAbruptly) { //如果completedAbruptly為true,表示是執行任務的時候拋了異常,那就修改ctl,工作執行緒數-1 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 將當前Work物件從workers中移除 workers.remove(w); } finally { mainLock.unlock(); } // 因為當前是處理執行緒退出流程中,所以要嘗試去修改執行緒池的狀態為TINDYING tryTerminate(); //獲取當前ctl值 int c = ctl.get(); // 如果執行緒池的狀態為RUNNING或者SHUTDOWN,則可能要替補一個執行緒 if (runStateLessThan(c, STOP)) { // completedAbruptly為false,表示執行緒是正常要退出了,則看是否需要保留執行緒 if (!completedAbruptly) { // 如果allowCoreThreadTimeOut為true,但是阻塞佇列中還有任務,那就至少得保留一個工作執行緒來處理阻塞佇列中的任務 // 如果allowCoreThreadTimeOut為false,那min就是corePoolSize,表示至少得保留corePoolSize個工作執行緒活著 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; // 如果當前工作執行緒數大於等於min,則表示符合所需要保留的最小執行緒數,那就直接return,不會呼叫下面的addWorker方法新開一個工作執行緒了 if (workerCountOf(c) >= min) return; // replacement not needed } //新開工作執行緒 addWorker(null, false); } }
某個工作執行緒正常情況下會不停的迴圈從阻塞佇列中獲取任務來執行,正常情況下就是通過阻塞來保證執行緒永遠活著,但是會有一些特殊情況:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果執行緒池狀態是STOP,表示當前執行緒不需要處理任務了,那就修改ctl工作執行緒數-1 // 如果執行緒池狀態是SHUTDOWN,但是阻塞佇列中為空,表示當前任務沒有任務要處理了,那就修改ctl工作執行緒數-1 // return null表示當前執行緒無需處理任務,執行緒退出 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //當前工作執行緒數 int wc = workerCountOf(c); // 用來判斷當前執行緒是無限阻塞還是超時阻塞,如果一個執行緒超時阻塞,那麼一旦超時了,那麼這個執行緒最終就會退出 // 如果是無限阻塞,那除非被中斷了,不然這個執行緒就一直等著獲取佇列中的任務 // allowCoreThreadTimeOut為true,表示執行緒池中的所有執行緒都可以被回收掉,則當前執行緒應該直接使用超時阻塞,一旦超時就回收 // allowCoreThreadTimeOut為false,則要看當前工作執行緒數是否超過了corePoolSize,如果超過了,則表示超過部分的執行緒要用超時阻塞,一旦超時就回收 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果工作執行緒數超過了工作執行緒的最大限制或者執行緒超時了,則要修改ctl,工作執行緒數減1,並且return null // return null就會導致外層的while迴圈退出,從而導致執行緒直接執行結束 // 直播課程裡會細講timed && timedOut if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 要麼超時阻塞,要麼無限阻塞 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 表示沒有超時,在阻塞期間獲取到了任務 if (r != null) return r; // 超時了,重新進入迴圈,上面的程式碼會判斷出來當前執行緒阻塞超時了,最後return null,執行緒會執行結束 timedOut = true; } catch (InterruptedException retry) { // 如果執行緒池的狀態變成了STOP或者SHUTDOWN,最終也會return null,執行緒會執行結束 // 但是如果執行緒池的狀態仍然是RUNNING,那當前執行緒會繼續從佇列中去獲取任務,表示忽略了本次中斷 // 只有通過呼叫執行緒池的shutdown方法或shutdownNow方法才能真正中斷執行緒池中的執行緒 timedOut = false; } } }
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改ctl,將執行緒池狀態改為SHUTDOWN advanceRunState(SHUTDOWN); // 中斷工作執行緒 interruptIdleWorkers(); // 空方法,給子類擴充套件使用 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍歷所有正在工作的執行緒,要麼在執行任務,要麼在阻塞等待任務 for (Worker w : workers) { Thread t = w.thread; // 如果執行緒沒有被中斷,並且能夠拿到鎖,就中斷執行緒 // Worker在執行任務時會先加鎖,執行完任務之後會釋放鎖 // 所以只要這裡拿到了鎖,就表示執行緒空出來了,可以中斷了 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
ThreadPoolExecutor是Java並行程式設計中非常重要的一個類,它可以優化多執行緒程式設計的效率和可靠性。在本文中,我們深入探討了ThreadPoolExecutor的實現原理、工作機制和使用方法,總結如下:
首先,ThreadPoolExecutor是一種高度優化的多執行緒執行器,它可以管理執行緒池、執行執行緒任務和控制執行緒池的大小和生命週期等。ThreadPoolExecutor的實現基於生產者-消費者模型,它可以根據任務佇列中的任務數量自動調整執行緒池的大小,從而實現對系統資源的最優利用。
其次,ThreadPoolExecutor的使用非常靈活,可以通過設定ThreadPoolExecutor的引數來實現不同的執行緒池策略,例如核心執行緒數、最大執行緒數、任務佇列型別、拒絕策略等。此外,ThreadPoolExecutor還提供了一些重要的方法,例如submit()、execute()和shutdown()等,用於提交任務、執行任務和關閉執行緒池。
最後,在高並行環境下,應儘可能避免使用無界佇列,以防止記憶體漏失和系統資源耗盡。此外,還可以通過使用執行緒池監視器和執行緒池飽和策略來監控執行緒池的狀態和效能,以確保系統的穩定性和可靠性。
以上就是Java多執行緒程式設計基石ThreadPoolExecutor範例詳解的詳細內容,更多關於Java多執行緒ThreadPoolExecutor的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45