<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在前面的文章自己動手寫乞丐版執行緒池中,我們寫了一個非常簡單的執行緒池實現,這個只是一個非常簡單的實現,在本篇文章當中我們將要實現一個和JDK內部實現的執行緒池非常相似的執行緒池。
我們首先看一個JDK給我們提供的執行緒池ThreadPoolExecutor
的建構函式的引數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
引數解釋:
1.corePoolSize:這個引數你可以理解為執行緒池當中至少需要 corePoolSize 個執行緒,初始時執行緒池當中執行緒的個數為0,當執行緒池當中執行緒的個數小於 corePoolSize 每次提交一個任務都會建立一個執行緒,並且先執行這個提交的任務,然後再去任務佇列裡面去獲取新的任務,然後再執行。
2.maximumPoolSize:這個引數指的是執行緒池當中能夠允許的最大的執行緒的數目,當任務佇列滿了之後如果這個時候有新的任務想要加入佇列當中,當發現佇列滿了之後就建立新的執行緒去執行任務,但是需要滿足最大的執行緒的個數不能夠超過 maximumPoolSize 。
3.keepAliveTime 和 unit:這個主要是用於時間的表示,當佇列當中多長時間沒有資料的時候執行緒自己退出,前面談到了執行緒池當中任務過多的時候會超過 corePoolSize ,當執行緒池閒下來的時候這些多餘的執行緒就可以退出了。
4.workQueue:這個就是用於儲存任務的阻塞佇列。
5.threadFactory:這個引數倒不是很重要,執行緒工廠。
6.handler:這個表示拒絕策略,JDK給我們提供了四種策略:
如果上面的引數你不能夠理解,可以先閱讀這篇文章自己動手寫乞丐版執行緒池。基於上面談到的引數,執行緒池當中提交任務的流程大致如下圖所示:
根據前面的引數分析我們自己實現的執行緒池需要實現一下功能:
private AtomicInteger ct = new AtomicInteger(0); // 當前在執行任務的執行緒個數 private int corePoolSize; private int maximumPoolSize; private long keepAliveTime; private TimeUnit unit; private BlockingQueue<Runnable> taskQueue; private RejectPolicy policy; private ArrayList<Worker> workers = new ArrayList<>(); private volatile boolean isStopped; private boolean useTimed;
引數解釋如下:
public enum RejectPolicy { ABORT, CALLER_RUN, DISCARD_OLDEST, DISCARD }
workers:用於儲存工作執行緒。
isStopped:執行緒池是否被關閉了。
useTimed:主要是用於表示是否使用上面的 keepAliveTime 和 unit,如果使用就是在一定的時間內,如果沒有從任務佇列當中獲取到任務,執行緒就從執行緒池退出,但是需要保證執行緒池當中最小的執行緒個數不小於 corePoolSize 。
// 下面這個方法是向執行緒池提交任務 public void execute(Runnable runnable) throws InterruptedException { checkPoolState(); if (addWorker(runnable, false) // 如果能夠加入新的執行緒執行任務 加入成功就直接返回 || !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 說明提交任務失敗 任務佇列已經滿了 || addWorker(runnable, true)) // 使用能夠使用的最大的執行緒數 (maximumPoolSize) 看是否能夠產生新的執行緒 return; // 如果任務佇列滿了而且不能夠加入新的執行緒 則拒絕這個任務 if (!taskQueue.offer(runnable)) reject(runnable); }
在上面的程式碼當中:
checkPoolState函數是檢查執行緒池的狀態,當執行緒池被停下來之後就不能夠在提交任務:
private void checkPoolState() { if (isStopped) { // 如果執行緒池已經停下來了,就不在向任務佇列當中提交任務了 throw new RuntimeException("thread pool has been stopped, so quit submitting task"); } }
addWorker函數是往執行緒池當中提交任務並且產生一個執行緒,並且這個執行緒執行的第一個任務就是傳遞的引數。max表示執行緒的最大數目,max == true 的時候表示使用 maximumPoolSize 否則使用 corePoolSize,當返回值等於 true 的時候表示執行成功,否則表示執行失敗。
/** * * @param runnable 需要被執行的任務 * @param max 是否使用 maximumPoolSize * @return boolean */ public synchronized boolean addWorker(Runnable runnable, boolean max) { if (ct.get() >= corePoolSize && !max) return false; if (ct.get() >= maximumPoolSize && max) return false; Worker worker = new Worker(runnable); workers.add(worker); Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1)); thread.start(); return true; }
這個函數其實比較簡單,只需要將傳入的Callable物件封裝成一個FutureTask物件即可,因為FutureTask實現了Callable和Runnable兩個介面,然後將這個結果返回即可,得到這個物件,再呼叫物件的 get 方法就能夠得到結果。
public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException { checkPoolState(); FutureTask<V> futureTask = new FutureTask<>(task); execute(futureTask); return futureTask; }
根據前面提到的各種策略的具體實現方式,具體的程式碼實現如下所示:
private void reject(Runnable runnable) throws InterruptedException { switch (policy) { case ABORT: throw new RuntimeException("task queue is full"); case CALLER_RUN: runnable.run(); case DISCARD: // 直接放棄這個任務 return; case DISCARD_OLDEST: // 放棄等待時間最長的任務 也就是佇列當中的第一個任務 taskQueue.poll(); execute(runnable); // 重新執行這個任務 } }
一共兩種方式實現執行緒池關閉:
// 強制關閉執行緒池 public synchronized void stop() { isStopped = true; for (Worker worker : workers) { worker.stopWorker(); } } public synchronized void shutDown() { // 先表示關閉執行緒池 執行緒就不能再向執行緒池提交任務 isStopped = true; // 先等待所有的任務執行完成再關閉執行緒池 waitForAllTasks(); stop(); } private void waitForAllTasks() { // 當執行緒池當中還有任務的時候 就不退出迴圈 while (taskQueue.size() > 0) { Thread.yield(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
@Override public void run() { // 先執行傳遞過來的第一個任務 這裡是一個小的優化 讓執行緒直接執行第一個任務 不需要 // 放入任務佇列再取出來執行了 firstTask.run(); thisThread = Thread.currentThread(); while (!isStopped) { try { // 是否使用時間就在這裡顯示出來了 Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take(); if (task == null) { int i; boolean exit = true; // 如果當前執行緒數大於核心執行緒數 則使用 CAS 去退出 用於保證線上程安全下的退出 // 且保證執行緒的個數不小於 corePoolSize 下面這段程式碼需要仔細分析一下 if (ct.get() > corePoolSize) { do{ i = ct.get(); if (i <= corePoolSize) { exit = false; break; } }while (!ct.compareAndSet(i, i - 1)); if (exit) { return; } } }else { task.run(); } } catch (InterruptedException e) { // do nothing } } }
我們現在來仔細分析一下,執行緒退出執行緒池的時候是如何保證執行緒池當中總的執行緒數是不小於 corePoolSize 的!首先整體的框架是使用 CAS 進行實現,具體程式碼為 do ... while 操作,然後在 while 操作裡面使用 CAS 進行測試替換,如果沒有成功再次獲取 ,當執行緒池當中核心執行緒的數目小於等於 corePoolSize 的時候也需要退出迴圈,因為執行緒池當中執行緒的個數不能小於 corePoolSize 。因此使用 break 跳出迴圈的執行緒是不會退出執行緒池的。
在我們自己實現的執行緒池當中當執行緒退出的時候,workers 當中還儲存這指向這個執行緒的物件,但是當執行緒退出的時候我們還沒有在 workers 當中刪除這個物件,因此這個執行緒物件不會被垃圾回收器收集掉,但是我們這個只是一個執行緒池實現的例子而已,並不用於生產環境,只是為了幫助大家理解執行緒池的原理。
package cscore.concurrent.java.threadpoolv2; import java.util.ArrayList; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class ThreadPool { private AtomicInteger ct = new AtomicInteger(0); // 當前在執行任務的執行緒個數 private int corePoolSize; private int maximumPoolSize; private long keepAliveTime; private TimeUnit unit; private BlockingQueue<Runnable> taskQueue; private RejectPolicy policy; private ArrayList<Worker> workers = new ArrayList<>(); private volatile boolean isStopped; private boolean useTimed; public int getCt() { return ct.get(); } public ThreadPool(int corePoolSize, int maximumPoolSize, TimeUnit unit, long keepAliveTime, RejectPolicy policy , int maxTasks) { // please add -ea to vm options to make assert keyword enable assert corePoolSize > 0; assert maximumPoolSize > 0; assert keepAliveTime >= 0; assert maxTasks > 0; this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.unit = unit; this.policy = policy; this.keepAliveTime = keepAliveTime; taskQueue = new ArrayBlockingQueue<Runnable>(maxTasks); useTimed = keepAliveTime != 0; } /** * * @param runnable 需要被執行的任務 * @param max 是否使用 maximumPoolSize * @return boolean */ public synchronized boolean addWorker(Runnable runnable, boolean max) { if (ct.get() >= corePoolSize && !max) return false; if (ct.get() >= maximumPoolSize && max) return false; Worker worker = new Worker(runnable); workers.add(worker); Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1)); thread.start(); return true; } // 下面這個方法是向執行緒池提交任務 public void execute(Runnable runnable) throws InterruptedException { checkPoolState(); if (addWorker(runnable, false) // 如果能夠加入新的執行緒執行任務 加入成功就直接返回 || !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 說明提交任務失敗 任務佇列已經滿了 || addWorker(runnable, true)) // 使用能夠使用的最大的執行緒數 (maximumPoolSize) 看是否能夠產生新的執行緒 return; // 如果任務佇列滿了而且不能夠加入新的執行緒 則拒絕這個任務 if (!taskQueue.offer(runnable)) reject(runnable); } private void reject(Runnable runnable) throws InterruptedException { switch (policy) { case ABORT: throw new RuntimeException("task queue is full"); case CALLER_RUN: runnable.run(); case DISCARD: return; case DISCARD_OLDEST: // 放棄等待時間最長的任務 taskQueue.poll(); execute(runnable); } } private void checkPoolState() { if (isStopped) { // 如果執行緒池已經停下來了,就不在向任務佇列當中提交任務了 throw new RuntimeException("thread pool has been stopped, so quit submitting task"); } } public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException { checkPoolState(); FutureTask<V> futureTask = new FutureTask<>(task); execute(futureTask); return futureTask; } // 強制關閉執行緒池 public synchronized void stop() { isStopped = true; for (Worker worker : workers) { worker.stopWorker(); } } public synchronized void shutDown() { // 先表示關閉執行緒池 執行緒就不能再向執行緒池提交任務 isStopped = true; // 先等待所有的任務執行完成再關閉執行緒池 waitForAllTasks(); stop(); } private void waitForAllTasks() { // 當執行緒池當中還有任務的時候 就不退出迴圈 while (taskQueue.size() > 0) { Thread.yield(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } class Worker implements Runnable { private Thread thisThread; private final Runnable firstTask; private volatile boolean isStopped; public Worker(Runnable firstTask) { this.firstTask = firstTask; } @Override public void run() { // 先執行傳遞過來的第一個任務 這裡是一個小的優化 讓執行緒直接執行第一個任務 不需要 // 放入任務佇列再取出來執行了 firstTask.run(); thisThread = Thread.currentThread(); while (!isStopped) { try { Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take(); if (task == null) { int i; boolean exit = true; if (ct.get() > corePoolSize) { do{ i = ct.get(); if (i <= corePoolSize) { exit = false; break; } }while (!ct.compareAndSet(i, i - 1)); if (exit) { return; } } }else { task.run(); } } catch (InterruptedException e) { // do nothing } } } public synchronized void stopWorker() { if (isStopped) { throw new RuntimeException("thread has been interrupted"); } isStopped = true; thisThread.interrupt(); } } }
package cscore.concurrent.java.threadpoolv2; import java.util.concurrent.ExecutionException; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException { var pool = new ThreadPool(2, 5, TimeUnit.SECONDS, 10, RejectPolicy.ABORT, 100000); for (int i = 0; i < 10; i++) { RunnableFuture<Integer> submit = pool.submit(() -> { System.out.println(Thread.currentThread().getName() + " output a"); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } return 0; }); System.out.println(submit.get()); } int n = 15; while (n-- > 0) { System.out.println("Number Threads = " + pool.getCt()); Thread.sleep(1000); } pool.shutDown(); } }
上面測試程式碼的輸出結果如下所示:
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
Number Threads = 5
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 3
Number Threads = 2
Number Threads = 2
Number Threads = 2
Number Threads = 2
從上面的程式碼可以看出我們實現了正確的任務實現結果,同時執行緒池當中的核心執行緒數從 2 變到了 5 ,當執行緒池當中任務佇列全部別執行完成之後,執行緒的數目重新降下來了,這確實是我們想要達到的結果。
在本篇文章當中主要給大家介紹瞭如何實現一個類似於JDK中的執行緒池,裡面有非常多的實現細節,大家可以仔細捋一下其中的流程,對執行緒池的理解將會非常有幫助。
以上就是Java手寫執行緒池之向JDK執行緒池進發的詳細內容,更多關於Java手寫執行緒池的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45