首頁 > 軟體

Java實現手寫一個執行緒池的範例程式碼

2022-10-20 14:03:14

概述

執行緒池技術想必大家都不陌生把,相信在平時的工作中沒有少用,而且這也是面試頻率非常高的一個知識點,那麼大家知道它的實現原理和細節嗎?如果直接去看jdk原始碼的話,可能有一定的難度,那麼我們可以先通過手寫一個簡單的執行緒池框架,去掌握執行緒池的基本原理後,再去看jdk的執行緒池原始碼就會相對容易,而且不容易忘記。

執行緒池框架設計

我們都知道,執行緒資源的建立和銷燬並不是沒有代價的,甚至開銷是非常高的。同時,執行緒也不是任意多建立的,因為活躍的執行緒會消耗系統資源,特別是記憶體,在一定的範圍內,增加執行緒可以提高系統的吞吐率,如果超過了這個範圍,反而會降低程式的執行速度。

因此,設計一個容納多個執行緒的容器,容器中的執行緒可以重複使用,省去了頻繁建立和銷燬執行緒物件的操作, 達到下面的目標:

  • 降低資源消耗,減少了建立和銷燬執行緒的次數,每個工作執行緒都可以被重複利用,可執行多個任務
  • 提高響應速度,當任務到達時,如果有執行緒可以直接用,不會出現系統僵死
  • 提高執行緒的可管理性,如果無限制的建立執行緒,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控

執行緒池的核心思想: 執行緒複用,同一個執行緒可以被重複使用,來處理多個任務。

為了實現執行緒池功能,需要考慮下面幾個設計要點:

  • 執行緒池可以介面外部提交的任務執行
  • 執行緒池有工作執行緒的數量,有任務執行,沒有任務也空閒在那,等待任務過來,這樣既避免執行緒頻繁建立銷燬帶來的開銷,同時也可以避免執行緒池無限制的建立執行緒
  • 如果執行緒池接受提交的任務超過工作執行緒的數量了,該怎麼辦?可以用一個佇列把任務存下來,等工作執行緒完成任務後去佇列中獲取任務,執行
  • 那如果任務實在是太多太多了,達到了我們認為的佇列最大值,怎麼辦,我們可以設計一種任務太多的策略,可以進行切換,比如直接丟棄任務、報錯等等

看了上面的設計目標和要點,是不是能立刻想到一個非常經典的設計模型——生產者消費者模型。

  • 阻塞佇列儲存執行任務,比如外部main函數作為生產者向佇列生產任務。
  • 執行緒池中的工作執行緒作為消費者獲取任務執行。

現在我們將我們的設計思路轉換為程式碼。

程式碼實現

阻塞佇列的實現

  • 阻塞佇列主要存放任務,有容量限制
  • 阻塞佇列提供新增和刪除任務的API, 如果超過容量,阻塞不能新增任務,如果沒有任務,阻塞無法獲取任務。
/**
 * <p>自定義任務佇列, 用來存放任務 </p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  10:15
 * @version: 1.0.0
 */
@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
    // 容量
    private int capcity;
    // 雙端任務佇列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入鎖
    private ReentrantLock lock = new ReentrantLock();
    // 生產者條件變數
    private Condition fullWaitSet = lock.newCondition();
    // 生產者條件變數
    private Condition emptyWaitSet = lock.newCondition();

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 阻塞的方式新增任務
    public void put(T task) {
        lock.lock();
        try {
            // 通過while的方式
            while (deque.size() >= capcity) {
                log.debug("wait to add queue");
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            deque.offer(task);
            log.debug("task add successfully");
            emptyWaitSet.signal();
        }  finally {
            lock.unlock();
        }
    }

    // 阻塞獲取任務
    public T take() {
        lock.lock();
        try {
            // 通過while的方式
            while (deque.isEmpty()) {
                try {
                    log.debug("wait to take task");
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            T task = deque.poll();
            log.debug("take task successfully");
            // 從佇列中獲取元素
            return task;
        } finally {
            lock.unlock();
        }
    }
}
  • put()方法是向阻塞佇列中新增任務
  • take()方法是向阻塞佇列中獲取任務

執行緒池消費端實現

1.定義執行器介面

/**
 * <p>定義一個執行器的介面:</p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  12:31
 * @version: 1.0.0
 */
public interface Executor {

    /**
     * 提交任務執行
     * @param task 任務
     */
    void execute(Runnable task);
}

2.定義執行緒池類實現該介面

@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {

    /**
     * 任務佇列
     */
    private BlockingQueue<Runnable> taskQueue;

    /**
     * 核心工作執行緒數
     */
    private int coreSize;

    /**
     * 工作執行緒集合
     */
    private Set<Worker> workers = new HashSet<>();

    /**
     *  建立執行緒池
     * @param coreSize 工作執行緒數量
     * @param capcity 阻塞佇列容量
     */
    public ThreadPool(int coreSize, int capcity) {
        this.coreSize = coreSize;
        this.taskQueue = new BlockingQueue<>(capcity);
    }

    /**
     * 提交任務執行
     */
    @Override
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作執行緒數小於閾值,直接開始任務執行
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // 如果超過了閾值,加入到佇列中
                taskQueue.put(task);
            }
        }
    }

    /**
     * 工作執行緒,對執行的任務做了一層包裝處理
     */
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 如果任務不為空,或者可以從佇列中獲取任務
            while (task != null || (task = taskQueue.take()) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 執行完後,設定任務為空
                    task = null;
                }
            }

              // 移除工作執行緒
            synchronized (workers){
                log.debug("remove worker successfully");
                workers.remove(this);
            }
        }
    }
}
  • Worker類是工作執行緒類,包裝了執行任務,裡面實現了從佇列獲取任務,然後執行任務。
  • execute方法的實現中,如果工作執行緒數量小於閾值的話,直接建立新的工作執行緒,否則將任務新增到佇列中。

3.演示

@Test
    public void testThreadPool1() throws InterruptedException {
        Executor executor = new ThreadPool(2, 4);
        // 提交任務
        for (int i = 0; i < 6; i++) {
            final  int j = i;
            executor.execute(() -> {
                try {
                    Thread.sleep(10);
                    log.info("run task {}", j);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            Thread.sleep(10);
        }

        Thread.sleep(10000);
    }

執行結果:

獲取任務超時設計

目前從佇列中獲取任務是永久阻塞等待的,可以改成阻塞一段時間沒有獲取任務,丟棄的策略。

@Slf4j(topic = "c.TimeoutBlockingQueue")
public class TimeoutBlockingQueue<T> {
    // 容量
    private int capcity;
    // 雙端任務佇列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入鎖
    private ReentrantLock lock = new ReentrantLock();
    // 生產者條件變數
    private Condition fullWaitSet = lock.newCondition();
    // 生產者條件變數
    private Condition emptyWaitSet = lock.newCondition();

    public TimeoutBlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 帶超時時間的獲取
    public T poll(long timeout, TimeUnit unit){
        lock.lock();
        try{
            // 將 timeout 統一轉換為 納秒
            long nanos = unit.toNanos(timeout);
            while (deque.isEmpty()){
                try {
                    if (nanos<=0){
                        return null;
                    }
                    // 返回的是剩餘的等待時間,更改navos的值,使虛假喚醒的時候可以繼續等待
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            return deque.getFirst();
        }finally {
            lock.unlock();
        }
    }

    // 帶超時時間的增加
    public boolean offer(T task , long timeout , TimeUnit unit){
        lock.lock();
        try{
            // 將 timeout 統一轉換為 納秒
            long nanos = unit.toNanos(timeout);
            while (deque.size() == capcity){
                try {
                    if (nanos<=0){
                        return false;
                    }
                    // 更新剩餘需要等待的時間
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任務佇列 {}", task);
            deque.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }
}

新加TimeoutBlockingQueue類,新增offer和poll待超時的新增和獲取任務的方法。

拒絕策略設計

目前的實現還是有個漏洞,無法自定義任務超出閾值的一個拒絕策略,我們可以通過利用函數語言程式設計+策略模式去實現。

1.定義策略模式的函數式介面

/**
 * <p>拒絕策略的函數式介面:</p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  13:15
 * @version: 1.0.0
 */
@FunctionalInterface
public interface RejectPolicy<T> {

    /**
     * 拒絕策略的介面
     * @param queue
     * @param task
     */
    void reject(BlockingQueue<T> queue, T task);
}

2.新增函數式介面的呼叫入口

我們可以在阻塞佇列新增任務新加一個api, 新增任務如果超過容量,呼叫函數式介面。

@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
    ........

    /**
     * 嘗試新增任務
     * @param rejectPolicy
     * @param task
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try{
            // 如果佇列超過容量
            if (deque.size()> capcity){
                log.debug("task too much, do reject");
                rejectPolicy.reject(this, task);
            }else {
                deque.offer(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}

3.修改ThreadPool類

@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {
    .....

    /**
     * 拒絕策略
     */
    private RejectPolicy rejectPolicy;

    // 通過構造方法傳入執行的拒絕策略
    public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) {
        this.coreSize = coreSize;
        this.taskQueue = new BlockingQueue<>(capcity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 提交任務執行
     */
    @Override
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作執行緒數小於閾值,直接開始任務執行
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // 如果超過了閾值,加入到佇列中
                //taskQueue.put(task);
                
                // 呼叫tryPut的方式
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

   ....
}

通過構造方法的方式傳入要執行的拒絕策略

呼叫tryPut方法新增任務

4.演示

以上就是Java實現手寫一個執行緒池的範例程式碼的詳細內容,更多關於Java執行緒池的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com