首頁 > 軟體

非常適合新手學生的Java執行緒池優化升級版

2022-03-30 19:02:31

升級版執行緒池的優化

1:新增了4種拒絕策略。分別為:MyAbortPolicy、MyDiscardPolicy、MyDiscardOldestPolicy、MyCallerRunsPolicy

2:對執行緒池MyThreadPoolExecutor的構造方法進行優化,增加了引數校驗,防止亂傳引數現象。

3:這是最重要的一個優化。

  • 移除執行緒池的執行緒預熱功能。因為執行緒預熱會極大的耗費記憶體,當我們不用執行緒池時也會一直在執行狀態。
  • 換來的是在呼叫execute方法新增任務時通過檢查workers執行緒集合目前的大小與corePoolSize的值去比較,再通過new MyWorker()去建立新增執行緒到執行緒池,這樣好處就是當我們建立執行緒池如果不使用的話則對當前記憶體沒有一點影響,當使用了才會建立執行緒並放入執行緒池中進行復用。

執行緒池構造器

    public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
        this.workers=new HashSet<>(corePoolSize);
        if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
            this.corePoolSize=corePoolSize;
            this.waitingQueue=waitingQueue;
            this.threadFactory=threadFactory;
            this.handle=handle;
        }else {
            throw new NullPointerException("執行緒池引數不合法");
        }
    }

執行緒池拒絕策略

策略介面:MyRejectedExecutionHandle

package com.springframework.concurrent;

/**
 * 自定義拒絕策略
 * @author 遊政傑
 */
public interface MyRejectedExecutionHandle {

    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);

}

策略內部實現類

/**
     * 實現自定義拒絕策略
     */
    //拋異常策略(預設)
    public static class MyAbortPolicy implements MyRejectedExecutionHandle{
        public MyAbortPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
            throw new MyRejectedExecutionException("任務-> "+r.toString()+"被執行緒池-> "+t.toString()+" 拒絕");
        }
    }
    //默默丟棄策略
    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{

        public MyDiscardPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

        }
    }
    //丟棄掉最老的任務策略
    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
        public MyDiscardOldestPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ //如果執行緒池沒被關閉
                threadPoolExecutor.getWaitingQueue().poll();//丟掉最老的任務,此時就有位置當新任務了
                threadPoolExecutor.execute(runnable); //把新任務加入到佇列中
            }
        }
    }
    //由呼叫者呼叫策略
    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
        public MyCallerRunsPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){//判斷執行緒池是否被關閉
                runnable.run();
            }
        }
    }

封裝拒絕方法

    protected final void reject(Runnable runnable){
        this.handle.rejectedExecution(runnable, this);
    }

    protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
        this.handle.rejectedExecution(runnable, threadPoolExecutor);
    }

execute方法

    @Override
    public boolean execute(Runnable runnable)
    {
        if (!this.waitingQueue.offer(runnable)) {
            this.reject(runnable);
            return false;
        }
        else {
            if(this.workers!=null&&this.workers.size()<corePoolSize){//這種情況才能新增執行緒
                MyWorker worker = new MyWorker(); //通過構造方法新增執行緒
            }
            return true;
        }
    }

可以看出只有當往執行緒池放任務時才會建立執行緒物件。

手寫執行緒池原始碼

MyExecutorService

package com.springframework.concurrent;

import java.util.concurrent.BlockingQueue;

/**
 * 自定義執行緒池業務介面
 * @author 遊政傑
 */
public interface MyExecutorService {

    boolean execute(Runnable runnable);

    void shutdown();

    void shutdownNow();

    boolean isShutdown();

    BlockingQueue<Runnable> getWaitingQueue();

}

MyRejectedExecutionException

package com.springframework.concurrent;

/**
 * 自定義拒絕異常
 */
public class MyRejectedExecutionException extends RuntimeException {

    public MyRejectedExecutionException() {
    }
    public MyRejectedExecutionException(String message) {
        super(message);
    }

    public MyRejectedExecutionException(String message, Throwable cause) {
        super(message, cause);
    }

    public MyRejectedExecutionException(Throwable cause) {
        super(cause);
    }

}

MyRejectedExecutionHandle

package com.springframework.concurrent;

/**
 * 自定義拒絕策略
 * @author 遊政傑
 */
public interface MyRejectedExecutionHandle {

    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);

}

核心類MyThreadPoolExecutor

package com.springframework.concurrent;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 純手擼執行緒池框架
 * @author 遊政傑
 */
public class MyThreadPoolExecutor implements MyExecutorService{

    private static final AtomicInteger taskcount=new AtomicInteger(0);//執行任務次數
    private static final AtomicInteger threadNumber=new AtomicInteger(0); //執行緒編號
    private static volatile int corePoolSize; //核心執行緒數
    private final HashSet<MyWorker> workers; //工作執行緒
    private final BlockingQueue<Runnable> waitingQueue; //等待佇列
    private static final String THREADPOOL_NAME="MyThread-Pool-";//執行緒名稱
    private volatile boolean isRunning=true; //是否執行
    private volatile boolean STOPNOW=false; //是否立刻停止
    private volatile ThreadFactory threadFactory; //執行緒工廠
    private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy();//預設拒絕策略
    private volatile MyRejectedExecutionHandle handle; //拒絕紫略

    public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
        this.workers=new HashSet<>(corePoolSize);
        if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
            this.corePoolSize=corePoolSize;
            this.waitingQueue=waitingQueue;
            this.threadFactory=threadFactory;
            this.handle=handle;
        }else {
            throw new NullPointerException("執行緒池引數不合法");
        }
    }
    /**
     * 實現自定義拒絕策略
     */
    //拋異常策略(預設)
    public static class MyAbortPolicy implements MyRejectedExecutionHandle{
        public MyAbortPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
            throw new MyRejectedExecutionException("任務-> "+r.toString()+"被執行緒池-> "+t.toString()+" 拒絕");
        }
    }
    //默默丟棄策略
    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{

        public MyDiscardPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

        }
    }
    //丟棄掉最老的任務策略
    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
        public MyDiscardOldestPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ //如果執行緒池沒被關閉
                threadPoolExecutor.getWaitingQueue().poll();//丟掉最老的任務,此時就有位置當新任務了
                threadPoolExecutor.execute(runnable); //把新任務加入到佇列中
            }
        }
    }
    //由呼叫者呼叫策略
    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
        public MyCallerRunsPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){//判斷執行緒池是否被關閉
                runnable.run();
            }
        }
    }
    //call拒絕方法
    protected final void reject(Runnable runnable){
        this.handle.rejectedExecution(runnable, this);
    }

    protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
        this.handle.rejectedExecution(runnable, threadPoolExecutor);
    }

    /**
     * MyWorker就是我們每一個執行緒物件
     */
    private final class MyWorker implements Runnable{

        final Thread thread; //為每個MyWorker

        MyWorker(){
            Thread td = threadFactory.newThread(this);
            td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());
            this.thread=td;
            this.thread.start();
            workers.add(this);
        }

        //執行任務
        @Override
        public void run() {
            //迴圈接收任務
                while (true)
                {
                    //迴圈退出條件:
                    //1:當isRunning為false並且waitingQueue的佇列大小為0(也就是無任務了),會優雅的退出。
                    //2:當STOPNOW為true,則說明呼叫了shutdownNow方法進行暴力退出。
                    if((!isRunning&&waitingQueue.size()==0)||STOPNOW)
                    {
                        break;
                    }else {
                        //不斷取任務,當任務!=null時則呼叫run方法處理任務
                        Runnable runnable = waitingQueue.poll();
                        if(runnable!=null){
                            runnable.run();
                            System.out.println("task==>"+taskcount.incrementAndGet());
                        }
                    }
                }
        }
    }

    //往執行緒池中放任務
    @Override
    public boolean execute(Runnable runnable)
    {
        if (!this.waitingQueue.offer(runnable)) {
            this.reject(runnable);
            return false;
        }
        else {
            if(this.workers!=null&&this.workers.size()<corePoolSize){//這種情況才能新增執行緒
                MyWorker worker = new MyWorker(); //通過構造方法新增執行緒
            }
            return true;
        }
    }
    //優雅的關閉
    @Override
    public void shutdown()
    {
        this.isRunning=false;
    }
    //暴力關閉
    @Override
    public void shutdownNow()
    {
        this.STOPNOW=true;
    }

    //判斷執行緒池是否關閉
    @Override
    public boolean isShutdown() {
        return !this.isRunning||STOPNOW;
    }

    //獲取等待佇列
    @Override
    public BlockingQueue<Runnable> getWaitingQueue() {
        return this.waitingQueue;
    }
}

執行緒池測試類

package com.springframework.test;

import com.springframework.concurrent.MyThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;

public class ThreadPoolTest {

  public static void main(String[] args) {


//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyAbortPolicy());

//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardPolicy());

//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardOldestPolicy());

      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyCallerRunsPolicy());


      for(int i=0;i<11;i++){

          int finalI = i;
          myThreadPoolExecutor.execute(()->{
              System.out.println(Thread.currentThread().getName()+">>>>"+ finalI);
          });

      }

      myThreadPoolExecutor.shutdown();

//      myThreadPoolExecutor.shutdownNow();




  }
}

好了升級版執行緒池就優化到這了,後面可能還會出完善版,不斷進行優化。

到此這篇關於非常適合新手學生的Java執行緒池升級版的文章就介紹到這了,更多相關Java 執行緒池內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com