首頁 > 軟體

學生視角手把手帶你寫Java 執行緒池初版

2022-03-21 13:00:18

Java手寫執行緒池(第一代)

經常使用執行緒池,故今天突發奇想,手寫一個執行緒池,會有很多不足,請多多寬容。因為這也是第一代的版本,後續會更完善。

手寫執行緒池-定義引數

	private final AtomicInteger taskcount=new AtomicInteger(0);
    private final AtomicInteger threadNumber=new AtomicInteger(0);
    private volatile int corePoolSize; 
    private final Set<MyThreadPoolExecutor.MyWorker> workers; 
    private final BlockingQueue<Runnable> waitingQueue; 
    private final String THREADPOOL_NAME="MyThread-Pool-";
    private volatile boolean isRunning=true; 
    private volatile boolean STOPNOW=false; 
    private final ThreadFactory threadFactory; 
  • taskcount:執行任務次數
  • threadNumber:執行緒編號,從0開始依次遞增。
  • corePoolSize:核心執行緒數
  • workers:工作執行緒
  • waitingQueue:等待佇列
  • THREADPOOL_NAME:執行緒名稱
  • isRunning:是否執行
  • STOPNOW:是否立刻停止
  • threadFactory:執行緒工廠

手寫執行緒池-構造器

    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this.corePoolSize=corePoolSize;
        this.workers=new HashSet<>(corePoolSize);
        this.waitingQueue=waitingQueue;
        this.threadFactory=threadFactory;
        //執行緒預熱
        for (int i = 0; i < corePoolSize; i++) {
            new MyWorker();
        }
    }

該構造器作用:

1:對引數進行賦值。

2:執行緒預熱。根據corePoolSize的大小來呼叫MyWorker的構造器。我們可以看看MyWorker構造器做了什麼。

	final Thread thread; //為每個MyWorker

        MyWorker(){
            Thread td = threadFactory.newThread(this);
            td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());
            this.thread=td;
            this.thread.start();
            workers.add(this);
        }
  • MyWorker構造器通過執行緒工廠對當前物件生成Thread;
  • 並設定執行緒名為:MyThread-Pool-自增執行緒編號;
  • 然後呼叫執行緒的start方法啟動執行緒;
  • 最後存放在workers這個Set集合中,這樣就可以實現執行緒複用了。

手寫執行緒池-預設構造器

	public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory());
    }
  • 預設構造器的賦初始值:
  • corePoolSize:5
  • waitingQueue:new ArrayBlockingQueue<>(10),長度為10的有限阻塞佇列
  • threadFactory:Executors.defaultThreadFactory()

手寫執行緒池-execute方法

	public boolean execute(Runnable runnable)
    {
        return waitingQueue.offer(runnable);
    }
  • 本質上其實就是把Runnable(任務)放到waitingQueue中。

手寫執行緒池-處理任務

	   @Override
        public void run() {
            //迴圈接收任務
                while (true)
                {
                    if((!isRunning&&waitingQueue.size()==0)||STOPNOW)
                    {
                        break;
                    }else {
                        Runnable runnable = waitingQueue.poll();
                        if(runnable!=null){
                            runnable.run();
                            System.out.println("task==>"+taskcount.incrementAndGet());
                        }
                    }
                }
        }

本質上就是一個死迴圈接收任務,退出條件如下:

  • 優雅的退出。當isRunning為false並且waitingQueue的佇列大小為0(也就是無任務了)
  • 暴力退出。當STOPNOW為true,則說明呼叫了shutdownNow方法
  • else語句塊會不斷取任務,當任務!=null時則呼叫run方法處理任務

手寫執行緒池-優雅關閉執行緒池

	public void shutdown()
    {
        this.isRunning=false;
    }

手寫執行緒池-暴力關閉執行緒池

	public void shutdownNow()
    {
        this.STOPNOW=true;
    }

手寫執行緒池-原始碼

  • 手寫執行緒池類的原始碼
package com.springframework.concurrent;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 執行緒池類
 * @author 遊政傑
 */
public class MyThreadPoolExecutor {

    private final AtomicInteger taskcount=new AtomicInteger(0);//執行任務次數
    private final AtomicInteger threadNumber=new AtomicInteger(0); //執行緒編號
    private volatile int corePoolSize; //核心執行緒數
    private final Set<MyThreadPoolExecutor.MyWorker> workers; //工作執行緒
    private final BlockingQueue<Runnable> waitingQueue; //等待佇列
    private final String THREADPOOL_NAME="MyThread-Pool-";//執行緒名稱
    private volatile boolean isRunning=true; //是否執行
    private volatile boolean STOPNOW=false; //是否立刻停止
    private final ThreadFactory threadFactory; //執行緒工廠

    public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory());
    }

    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this.corePoolSize=corePoolSize;
        this.workers=new HashSet<>(corePoolSize);
        this.waitingQueue=waitingQueue;
        this.threadFactory=threadFactory;
        //執行緒預熱
        for (int i = 0; i < corePoolSize; i++) {
            new MyWorker();
        }
    }

    /**
     * MyWorker就是我們每一個執行緒物件
     */
    private final class MyWorker implements Runnable{

        final Thread thread; //為每個MyWorker

        MyWorker(){
            Thread td = threadFactory.newThread(this);
            td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());
            this.thread=td;
            this.thread.start();
            workers.add(this);
        }

        @Override
        public void run() {
            //迴圈接收任務
                while (true)
                {
                    //迴圈退出條件:
                    //1:當isRunning為false並且waitingQueue的佇列大小為0(也就是無任務了),會優雅的退出。
                    //2:當STOPNOW為true,則說明呼叫了shutdownNow方法進行暴力退出。
                    if((!isRunning&&waitingQueue.size()==0)||STOPNOW)
                    {
                        break;
                    }else {
                        //不斷取任務,當任務!=null時則呼叫run方法處理任務
                        Runnable runnable = waitingQueue.poll();
                        if(runnable!=null){
                            runnable.run();
                            System.out.println("task==>"+taskcount.incrementAndGet());
                        }
                    }
                }
        }
    }

    public boolean execute(Runnable runnable)
    {
        return waitingQueue.offer(runnable);
    }
    //優雅的關閉
    public void shutdown()
    {
        this.isRunning=false;
    }
    //暴力關閉
    public void shutdownNow()
    {
        this.STOPNOW=true;
    }
}
  • 測試使用手寫執行緒池程式碼
package com.springframework.test;

import com.springframework.concurrent.MyThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;

public class ThreadPoolTest {

  public static void main(String[] args) {

    
      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory());

      for(int i=0;i<10;i++){

          int finalI = i;
          myThreadPoolExecutor.execute(()->{
              System.out.println(Thread.currentThread().getName()+">>>>"+ finalI);
          });

      }

      myThreadPoolExecutor.shutdown();

//      myThreadPoolExecutor.shutdownNow();



  }
}

問題

為什麼自定義執行緒池的execute執行的任務有時會變少?

那是因為waitingQueue滿了放不下任務了,導致任務被丟棄,相當於DiscardPolicy拒絕策略

解決辦法有:

1:設定最大執行緒數,自動對執行緒池擴容。

2:調大waitingQueue的容量capacity

最後:因為這是我手寫的執行緒池的初代版本,基本實現執行緒池的複用功能,然而還有很多未完善,將來會多出幾篇完善後的文章,對目前手寫的執行緒池進行升級。

後續還會繼續出關於作者手寫Spring框架,手寫Tomcat等等框架的博文!!!!!

到此這篇關於學生視角手把手帶你寫Java 執行緒池的文章就介紹到這了,更多相關Java 執行緒池內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com