<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
1)建立一個執行緒的執行緒池。 Executors.newSingleThreadExecutor(); //建立的原始碼 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 2)建立固定大小的執行緒池,引數為int,是執行緒池核心執行緒和最大執行緒的數量 Executors.newFixedThreadPool(2); //建立的原始碼 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 3)建立一個執行緒數不設限的執行緒池, //建立的原始碼,核心執行緒是0,最大執行緒是Integer.MAX_VALUE Executors.newCachedThreadPool(); public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
使用方法,使用同步程式碼塊,保證執行緒池範例是唯一的。
使用方法: private static ExecutorService sSingleThreadExecutor = null; // lazy, guarded by class public static ExecutorService singleThreadExecutor() { //當前的類物件為鎖 synchronized (ThreadPool.class) { if (sSingleThreadExecutor == null) { sSingleThreadExecutor = Executors.newSingleThreadExecutor(); } return sSingleThreadExecutor; } }
通過以上三種方式,可以建立一個簡單的執行緒池。
但是有弊端:
newSingleThreadExecutor和newFixedThreadPool,執行的請求佇列是長度為Integer.MAX_VALUE,可能會堆積大量的請求,從而造成oom。
而newCachedThreadPool允許的執行緒數量為最大值Integer.MAX_VALUE,也會造成oom。
下面是OkHttp中Dispatcher.java執行緒池:
ExecutorService executorService; public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; }
OkHttp中ConnectionPool.java
private static final Executor executor = new ThreadPoolExecutor(0 , Integer.MAX_VALUE , 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
使用方式:
//call 實現 Runnable 介面。呼叫execute方法即可將入執行緒池,執行run方法中的程式碼。 executorService().execute(call);
corePoolSize:核心執行緒數,即使是空閒執行緒也不會銷燬。這樣做的目的是為了降低執行任務時建立執行緒的時間和效能開銷。
maximumPoolSize:最大執行緒數。當核心執行緒被用完時,會建立新的執行緒來執行任務,但是建立的數量不能超過這個最大值。
keepAliveTime:執行緒的存活時間。除核心執行緒外,其他執行緒一旦執行完任務,就會處於空閒狀態,超過這個時間就會被銷燬。
unit:keepAliveTime設定的時間單位。
workQueue:任務的阻塞佇列。執行緒數量有限,當任務過多來不及執行時,就會加入到這個阻塞佇列中,等到有空閒程序,
就會從這個佇列取出任務去執行。佇列都是先進先出的FIFO。
threadFactory:新執行緒產生的方式。
handler:拒絕策略,超過任務佇列設定的最大值時。再有新的任務進來,就會執行這個拒絕策略。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
執行緒池的阻塞佇列:
ArrayBlockingQueue:
是基於陣列的任務佇列。裡面用一個陣列來存放任務。當我們new的時候,需要指定陣列大小。
還有兩個int變數putIndex和takeIndex用來表示佇列的頭部和尾部在陣列中的位置。
LinkedBlockingQueue:
是基於連結串列的,內部用一個單向連結串列來存放任務。建立時可以指定大小,如果不指定則是Integer.MAX_VALUE
PriorityBlockingQueue:
基於優先順序的阻塞佇列。
SynchronousQueue:
一種無緩衝的等待佇列。有新任務進來直接交給執行緒執行。
OkHttp中使用的就是這種佇列,他的最大執行緒數為Integer.MAX_VALUE。保證有任務進來就能馬上執行。
RejectedExecutionHandler拒絕策略,這是一個介面。不同的實現執行不同的策略。
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); } AbortPolicy:拒絕行為直接丟擲異常 RejectedExecutionException public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } DiscardPolicy:保持靜默,什麼也不做。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } DiscardOldestPolicy:丟棄任務隊裡中最老的任務,嘗試將新任務加入佇列 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } CallerRunsPolicy:直接由提交任務這執行這個任務。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } 如果在建立執行緒池的時候,不知道具體的拒絕策略。那麼ThreadPoolExecutor預設的策略是AbortPolicy。 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
執行緒池可以執行兩種型別的任務:Runable和Callable
class MyRunable implements Runnable{ @Override public void run() { } } class MyCallable implements Callable{ @Override public Object call() throws Exception { return null; } } Runnable 沒有返回值,返回的是void,不允許丟擲異常。 Callable 有返回值,返回的是Object,允許丟擲異常。
執行緒池的狀態:
//執行狀態,可以接受新任務,並且處理排隊任務。 private static final int RUNNING = -1 << COUNT_BITS; //關閉狀態,不再接受新任務,不過仍然會處理排隊任務。 private static final int SHUTDOWN = 0 << COUNT_BITS; //停止狀態,不再接受新任務,也不處理排隊任務,同時中斷處理中的任務 private static final int STOP = 1 << COUNT_BITS; //整理狀態,當前所有任務終止,workerCount計數為0,執行緒切換為TIDYING狀態,並且執行terminal()方法 private static final int TIDYING = 2 << COUNT_BITS; //終止狀態,說明terminal()方法執行完成。 private static final int TERMINATED = 3 << COUNT_BITS;
ctlof是得到新的ctl值。通過ctl可以計算執行緒池的狀態和數量
runStateOf 計算當前執行緒池的狀態。
workerCountOf計算執行緒池的數量。
// ctlOf計算ctl的新值,也就是執行緒池狀態和執行緒池中執行緒數量。 private static int ctlOf(int rs, int wc) { return rs | wc; } //獲取ctl的高三位,也就是執行緒池的狀態。 private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取ctl的低29位,也就是執行緒池中的執行緒數。 private static int workerCountOf(int c) { return c & CAPACITY; } 其中runStateOf(int c)和workerCountOf(int c)的引數c就是通過ctlOf(int rs, int wc)獲得的ctl值。
向執行緒池中新增一個任務:executorService().execute(call);
然後看看原始碼中是如何執行的,是如何新增任務的。
ctl 用來表示執行緒池的狀態和執行緒數量, 在ThreadPoolExcutor中使用32位元二進位制數來表示執行緒池的狀態和執行緒中執行緒數量。 其中前3位表示執行緒池的狀態,後29位表示執行緒池中的執行緒數。 public void execute(Runnable command) { int c = ctl.get(); //如果工作執行緒數量小於核心執行緒數, //提交的任務會通過addWorker(command, true)建立一個新的核心執行緒來執行, 這個引數傳的是true,表示去新增核心執行緒。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)){ //新增成功則return return; } //新增核心執行緒失敗則重新獲取執行緒池的狀態和數量 c = ctl.get(); } //進入到下面說明當前工作執行緒大於或等於核心執行緒。 //如果執行緒池處於執行狀態,則加入佇列 if (isRunning(c) && workQueue.offer(command)) { //如果入隊成功,則重新獲取執行緒池的狀態 int recheck = ctl.get(); //如果執行緒池不處於執行狀態,則從佇列中remove if (!isRunning(recheck) && remove(command)){ //成功刪除,則執行拒絕策略 reject(command); }else if (workerCountOf(recheck) == 0){ //進入這個分支有兩種情況1.執行緒池處於執行狀態 2.執行緒從不處於執行狀態,但是remove失敗 則會判斷workerCountOf如果工作執行緒為0,則會建立非核心執行緒去執行任務。 addWorker為null,和false。false表示非核心執行緒。null說明建立的執行緒去執行佇列裡的任務。 addWorker(null, false); } //進入到這個分支有兩種情況1.執行緒池處於非執行狀態2.執行狀態但是入隊失敗了。 這時候建立非核心執行緒去執行任務 }else if (!addWorker(command, false)){ 如果建立非核心執行緒失敗了,則執行拒絕策略。 reject(command); } }
通過以上原始碼分析,執行緒池的執行原理可以總結為一下幾點:
1.通過execute方法提交任務時,執行執行緒小於corePoolSize時,則會建立新的核心執行緒來執行這個任務。
2.通過excute方法提交任務時,執行執行緒大於等於corePoolSize時,則會加入到佇列中,等待執行緒排程執行。
3.通過excuete方法提交任務時,執行執行緒大於等於corePoolSize時,並且加入佇列失敗(佇列滿了),新提交的任務將會通過建立新的執行緒執行。
4.通過excute方法提交任務時,執行執行緒大於maximumPoolSize時,佇列也滿了,則會執行拒絕策略。
5.當執行緒池中的執行緒執行完任務處於空閒狀態時,則會嘗試從任務佇列中取頭結點任務執行。
接下來看addWorker如何新增任務。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果執行緒池處於非執行狀態,則不會建立執行緒。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())){ return false; } //如果執行緒池處於執行狀態,則直接走下面的建立新增邏輯。 for (;;) { //獲取工作執行緒數量 int wc = workerCountOf(c); //wc >= CAPACITY 工作執行緒大於最大容量 // wc >= (core ? corePoolSize : maximumPoolSize) 如果工作執行緒大於了核心執行緒或最大執行緒, //只要這兩個條件有一個成立則return。 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){ return false; } //建立執行緒數量+1,這裡用到了CAS。關於CAS後面再寫文章分析。 if (compareAndIncrementWorkerCount(c)){ break retry; } //如果CAS操作失敗,執行緒數量沒有加1,則重新獲取執行緒的狀態。 c = ctl.get(); // Re-read ctl //判斷當前狀態和之前狀態,如果不同,說明執行緒池狀態發生了變化。重新跳到retry的外層迴圈。 //如果相同,則說明執行緒池沒有變化,繼續進行內層迴圈。 if (runStateOf(c) != rs){ continue retry; } // else CAS failed due to workerCount change; retry inner loop } } //執行到這說明執行緒數量已經完成+1,接下來進行執行緒的建立。 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //這個建立一個worker物件。在worker構造方法中,會利用ThreadPoolExecutor中傳遞過了的ThreadFactory建立一個Thread //預設是通過Executors.defaultThreadFactory(),建立一個執行緒。 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //拿到一個重入鎖物件。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //拿到執行緒池的狀態 int rs = runStateOf(ctl.get()); //如果執行緒池處於執行狀態或者處於關閉狀態並且firstTask == null if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) { throw new IllegalThreadStateException(); } //新增到work集合 workers.add(w); int s = workers.size(); if (s > largestPoolSize){ //更新一下最大執行緒數 largestPoolSize = s; } //標誌位,新增成功 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //新增成功則啟動執行緒 t.start(); //啟動成功 workerStarted = true; } } } finally { //如果沒有啟動成功則從執行緒池中移除。 if (! workerStarted){ addWorkerFailed(w); } } return workerStarted; }
關鍵程式碼看看 w = new Worker(firstTask);
做了啥
Worker(Runnable firstTask) { setState(-1); //將傳進來的任務賦值給成員變數 this.firstTask = firstTask; //建立一個執行緒,並把Worker本身當做Runnable傳進了Thread中去。 this.thread = getThreadFactory().newThread(this); } public interface ThreadFactory { Thread newThread(Runnable r); }
注意newThread(this)。Worker把自己當做Runnable傳到了執行緒中去。當呼叫t.start()方法時會呼叫Worker的run方法。
public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //如果task不為null,則先執行當前任務 //如果task傳進來是null則從佇列中取任務,執行佇列裡的任務。 //getTask()就是從任務佇列中提取在等待的隊伍。 while (task != null || (task = getTask()) != null) { w.lock(); //(runStateAtLeast(ctl.get(), STOP) 執行緒池處於STOP,TIDYING,TERMINATED狀態 處於這些狀態的執行緒池是無法執行任務的。 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()){ //中斷執行緒 wt.interrupt(); } //執行到下面說明執行緒池處於RUNNING或SHUTDOWN狀態 //由此也可以看出SHUTDOWN狀態的執行緒池,是可以執行佇列裡的任務的,但是佇列不在接收新的任務新增 try { beforeExecute(wt, task); Throwable thrown = null; try { //執行任務的 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
getTask()從任務佇列中,提取任務。
private Runnable getTask() { boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; try { //從任務佇列中取出任務 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }
通過以上原始碼分析,可以總結一下幾點。
addWorker(Runnable firstTask, boolean core)
1.如果firstTask為null,則會建立執行緒去執行佇列裡的任務。
2.如果不為null,則會去執行當前任務,然後再執行佇列裡的任務。
3.core 如果為true,則會建立核心執行緒,如果為false,則會建立非核心執行緒。
4.addWorker 會建立執行緒,啟動執行緒,執行任務。
在建立執行緒之前會判斷執行緒池的狀態、以及核心執行緒或最大執行緒數。
如果建立成功啟動執行緒的start方法,然後呼叫worker的runWorker()方法。
到此這篇關於Java執行緒池ThreadPoolExecutor原始碼深入分析的文章就介紹到這了,更多相關Java ThreadPoolExecutor內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45