<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
執行緒池技術想必大家都不陌生把,相信在平時的工作中沒有少用,而且這也是面試頻率非常高的一個知識點,那麼大家知道它的實現原理和細節嗎?如果直接去看jdk原始碼的話,可能有一定的難度,那麼我們可以先通過手寫一個簡單的執行緒池框架,去掌握執行緒池的基本原理後,再去看jdk的執行緒池原始碼就會相對容易,而且不容易忘記。
我們都知道,執行緒資源的建立和銷燬並不是沒有代價的,甚至開銷是非常高的。同時,執行緒也不是任意多建立的,因為活躍的執行緒會消耗系統資源,特別是記憶體,在一定的範圍內,增加執行緒可以提高系統的吞吐率,如果超過了這個範圍,反而會降低程式的執行速度。
因此,設計一個容納多個執行緒的容器,容器中的執行緒可以重複使用,省去了頻繁建立和銷燬執行緒物件的操作, 達到下面的目標:
執行緒池的核心思想: 執行緒複用,同一個執行緒可以被重複使用,來處理多個任務。
為了實現執行緒池功能,需要考慮下面幾個設計要點:
看了上面的設計目標和要點,是不是能立刻想到一個非常經典的設計模型——生產者消費者模型。
現在我們將我們的設計思路轉換為程式碼。
/** * <p>自定義任務佇列, 用來存放任務 </p> * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 10:15 * @version: 1.0.0 */ @Slf4j(topic = "c.BlockingQueue") public class BlockingQueue<T> { // 容量 private int capcity; // 雙端任務佇列容器 private Deque<T> deque = new ArrayDeque<>(); // 重入鎖 private ReentrantLock lock = new ReentrantLock(); // 生產者條件變數 private Condition fullWaitSet = lock.newCondition(); // 生產者條件變數 private Condition emptyWaitSet = lock.newCondition(); public BlockingQueue(int capcity) { this.capcity = capcity; } // 阻塞的方式新增任務 public void put(T task) { lock.lock(); try { // 通過while的方式 while (deque.size() >= capcity) { log.debug("wait to add queue"); try { fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } deque.offer(task); log.debug("task add successfully"); emptyWaitSet.signal(); } finally { lock.unlock(); } } // 阻塞獲取任務 public T take() { lock.lock(); try { // 通過while的方式 while (deque.isEmpty()) { try { log.debug("wait to take task"); emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); T task = deque.poll(); log.debug("take task successfully"); // 從佇列中獲取元素 return task; } finally { lock.unlock(); } } }
1.定義執行器介面
/** * <p>定義一個執行器的介面:</p> * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 12:31 * @version: 1.0.0 */ public interface Executor { /** * 提交任務執行 * @param task 任務 */ void execute(Runnable task); }
2.定義執行緒池類實現該介面
@Slf4j(topic = "c.ThreadPool") public class ThreadPool implements Executor { /** * 任務佇列 */ private BlockingQueue<Runnable> taskQueue; /** * 核心工作執行緒數 */ private int coreSize; /** * 工作執行緒集合 */ private Set<Worker> workers = new HashSet<>(); /** * 建立執行緒池 * @param coreSize 工作執行緒數量 * @param capcity 阻塞佇列容量 */ public ThreadPool(int coreSize, int capcity) { this.coreSize = coreSize; this.taskQueue = new BlockingQueue<>(capcity); } /** * 提交任務執行 */ @Override public void execute(Runnable task) { synchronized (workers) { // 如果工作執行緒數小於閾值,直接開始任務執行 if(workers.size() < coreSize) { Worker worker = new Worker(task); workers.add(worker); worker.start(); } else { // 如果超過了閾值,加入到佇列中 taskQueue.put(task); } } } /** * 工作執行緒,對執行的任務做了一層包裝處理 */ class Worker extends Thread { private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { // 如果任務不為空,或者可以從佇列中獲取任務 while (task != null || (task = taskQueue.take()) != null) { try { task.run(); } catch (Exception e) { e.printStackTrace(); } finally { // 執行完後,設定任務為空 task = null; } } // 移除工作執行緒 synchronized (workers){ log.debug("remove worker successfully"); workers.remove(this); } } } }
3.演示
@Test public void testThreadPool1() throws InterruptedException { Executor executor = new ThreadPool(2, 4); // 提交任務 for (int i = 0; i < 6; i++) { final int j = i; executor.execute(() -> { try { Thread.sleep(10); log.info("run task {}", j); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread.sleep(10); } Thread.sleep(10000); }
執行結果:
目前從佇列中獲取任務是永久阻塞等待的,可以改成阻塞一段時間沒有獲取任務,丟棄的策略。
@Slf4j(topic = "c.TimeoutBlockingQueue") public class TimeoutBlockingQueue<T> { // 容量 private int capcity; // 雙端任務佇列容器 private Deque<T> deque = new ArrayDeque<>(); // 重入鎖 private ReentrantLock lock = new ReentrantLock(); // 生產者條件變數 private Condition fullWaitSet = lock.newCondition(); // 生產者條件變數 private Condition emptyWaitSet = lock.newCondition(); public TimeoutBlockingQueue(int capcity) { this.capcity = capcity; } // 帶超時時間的獲取 public T poll(long timeout, TimeUnit unit){ lock.lock(); try{ // 將 timeout 統一轉換為 納秒 long nanos = unit.toNanos(timeout); while (deque.isEmpty()){ try { if (nanos<=0){ return null; } // 返回的是剩餘的等待時間,更改navos的值,使虛假喚醒的時候可以繼續等待 nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); return deque.getFirst(); }finally { lock.unlock(); } } // 帶超時時間的增加 public boolean offer(T task , long timeout , TimeUnit unit){ lock.lock(); try{ // 將 timeout 統一轉換為 納秒 long nanos = unit.toNanos(timeout); while (deque.size() == capcity){ try { if (nanos<=0){ return false; } // 更新剩餘需要等待的時間 nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任務佇列 {}", task); deque.addLast(task); emptyWaitSet.signal(); return true; }finally { lock.unlock(); } } }
新加TimeoutBlockingQueue類,新增offer和poll待超時的新增和獲取任務的方法。
目前的實現還是有個漏洞,無法自定義任務超出閾值的一個拒絕策略,我們可以通過利用函數語言程式設計+策略模式去實現。
1.定義策略模式的函數式介面
/** * <p>拒絕策略的函數式介面:</p> * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 13:15 * @version: 1.0.0 */ @FunctionalInterface public interface RejectPolicy<T> { /** * 拒絕策略的介面 * @param queue * @param task */ void reject(BlockingQueue<T> queue, T task); }
2.新增函數式介面的呼叫入口
我們可以在阻塞佇列新增任務新加一個api, 新增任務如果超過容量,呼叫函數式介面。
@Slf4j(topic = "c.BlockingQueue") public class BlockingQueue<T> { ........ /** * 嘗試新增任務 * @param rejectPolicy * @param task */ public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try{ // 如果佇列超過容量 if (deque.size()> capcity){ log.debug("task too much, do reject"); rejectPolicy.reject(this, task); }else { deque.offer(task); emptyWaitSet.signal(); } }finally { lock.unlock(); } } }
3.修改ThreadPool類
@Slf4j(topic = "c.ThreadPool") public class ThreadPool implements Executor { ..... /** * 拒絕策略 */ private RejectPolicy rejectPolicy; // 通過構造方法傳入執行的拒絕策略 public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) { this.coreSize = coreSize; this.taskQueue = new BlockingQueue<>(capcity); this.rejectPolicy = rejectPolicy; } /** * 提交任務執行 */ @Override public void execute(Runnable task) { synchronized (workers) { // 如果工作執行緒數小於閾值,直接開始任務執行 if(workers.size() < coreSize) { Worker worker = new Worker(task); workers.add(worker); worker.start(); } else { // 如果超過了閾值,加入到佇列中 //taskQueue.put(task); // 呼叫tryPut的方式 taskQueue.tryPut(rejectPolicy, task); } } } .... }
通過構造方法的方式傳入要執行的拒絕策略
呼叫tryPut方法新增任務
4.演示
以上就是Java實現手寫一個執行緒池的範例程式碼的詳細內容,更多關於Java執行緒池的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45