首頁 > 軟體

關於dubbo 自定義執行緒池的問題

2022-04-18 13:00:17

初識dubbo

一、什麼是dubbo?

Dubbo是阿里巴巴開源的基於 Java 的高效能 RPC(一種遠端呼叫) 分散式服務架構(SOA),致力於提供高效能和透明化的RPC遠端服務呼叫方案,以及SOA服務治理方案,其實就是一種遠端服務呼叫的分散式框架

二、為什麼要用dubbo

在網際網路的發展,網站應用的規模不斷擴大,常規的垂直應用架構已無法應對,分散式服務架構以及流動計算架構勢在必行,亟需一個治理系統確保架構有條不紊的演進,所以就出現了dubbo

單一應用框架:當網站流量很小時,只需一個應用,將所有功能都部署在一起。

垂直應用框架:當存取量逐漸增大,單一應用增加機器帶來的加速度越來越小,將應用拆成互不相干的幾個應用,以提升效率。

分散式服務架構:當垂直應用越來越多,應用之間互動不可避免,將核心業務抽取出來,作為獨立的服務,逐漸形成穩定的服務中心,使前端應用能更快速的響應多變的市場需求。此時,用於提高業務複用及整合的分散式服務架構(RPC)是關鍵。

流動計算架構:當服務越來越多,容量的評估,小服務資源的浪費等問題逐漸顯現,此時需增加一個排程中心基於存取壓力實時管理叢集容量,提高叢集利用率。此時,用於提高機器利用率的資源排程和治理中心(SOA)是關鍵。

前言

在日常開發中,執行緒池幾乎涉及到了所有的開發框架,或者一些中介軟體,像我們熟悉的JDK執行緒池,druid連線執行緒池等等,執行緒池的使用,大大降低了人工維護執行緒的成本,而且提升了執行緒資源在使用中的效率;

dubbo執行緒池

dubbo也不例外,預設情況下,當我們在說到dubbo執行緒池的時候,通常是指服務提供者一端的執行緒池,其常用設定引數如下:

spring.dubbo.protocol.threads = 2000
spring.dubbo.protocol.threadpool = cached
spring.dubbo.protocol.dispatcher = message

對應到dubbo的組態檔中如下:

 <dubbo:protocol name="dubbo" threadpool="cached" dispatcher="message" threads="50" port="20880"/>

dubbo執行緒池說明

dubbo在使用的時候,都是通過建立真實的業務執行緒來進行操作的。已知的執行緒池模型主要有2個,固定大小執行緒池和帶快取的執行緒池;

  • fix執行緒池,即固定大小執行緒池。也是dubbo預設的使用方式,預設情況下,不做任何設定,執行緒池最大的執行緒個數為200個,並且是沒有任何等待佇列的。所以這種執行緒池,在極端情況下,可能會存在問題,比如某個應用執行某個大批次操作時,可能因為執行緒堵塞造成其他應用無法呼叫的情況;
  • cached執行緒池,非固定大小執行緒池,當執行緒不足的時候,會自動建立新執行緒。這種型別的執行緒池的問題在於,如果有較高的TPS過來的時候,如果請求的dubbo介面比較耗時,未能及時響應,則會連續不斷的建立新執行緒,則對系統的CPU以及負載都是巨大的壓力,甚至可能造成系統宕機的風險; dubbo 自定義執行緒池

在真實的使用過程中,大多數開發人員是忽略這個設定的,也就是說通常情況下預設是使用fix模式的,如果對於那種TPS比較高,或者dubbo介面中執行的核心業務邏輯比較耗時,並且系統要應對的並行也是居高不下的場景下,fix模式最終因為執行緒數建立不足而產生錯誤;

在這種情況下,出了錯誤之後,通常來說也是無感知的,怎麼能快速定位因執行緒建立不足而導致的問題呢?這就需要一種機制,能夠監控dubbo執行緒池,對執行過程中執行緒池的狀況進行監控,當核心指標達到告警閾值時,及時給出預警,通知開發或運維人員快速做出響應和調整;

這就需要自定義執行緒池來解決這個問題

自定義執行緒池程式碼實現步驟

1、自定義一個maven模組並新增核心依賴

<dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo</artifactId>
            <version>2.7.0</version>
        </dependency>

2、自定義執行緒池類

自定義一個類,繼承FixedThreadPool 類,並實現Runnable介面,即該類本身作為一個執行緒;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.*;

public class WatchingPool extends FixedThreadPool implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(WatchingPool.class);

    // 執行緒池預警值【可以根據實際情況動態調整大小】
    private static final double ALARM_PERCENT = 0.70;

    private final Map<URL, ThreadPoolExecutor> theadPoolMap = new ConcurrentHashMap<>();

    public WatchingPool() {
        // 建立一個定時任務,每3秒執行一次【可以根據實際情況動態調整引數】
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this, 1, 3, TimeUnit.SECONDS);
    }

    @Override
    public Executor getExecutor(URL url) {
        // 重寫父類別getExecutor, 如果executor是ThreadPoolExecutor,則放入theadPoolMap中
        Executor executor = super.getExecutor(url);
        if (executor instanceof ThreadPoolExecutor) {
            theadPoolMap.put(url, (ThreadPoolExecutor) executor);
        }

        return executor;

    public void run() {
        for (Map.Entry<URL, ThreadPoolExecutor> entry : theadPoolMap.entrySet()) {
            URL url = entry.getKey();
            ThreadPoolExecutor threadPoolExecutor = entry.getValue();
            // 獲取正在活動的執行緒數
            int activeCount = threadPoolExecutor.getActiveCount();
            // 獲取總的執行緒數  (繼承的FixedThreadPool , 所以這裡獲取核心的執行緒數就是總的執行緒數)
            int corePoolSize = threadPoolExecutor.getCorePoolSize();

            double percent = activeCount / (corePoolSize * 1.0);

            LOGGER.info("執行緒池狀態:{}/{},: {}%", activeCount, corePoolSize, percent*100);
            if (percent > ALARM_PERCENT) {
                LOGGER.error("超出警戒線 : host:{}, 當前使用量 {}%, URL:{}", url.getHost(), percent*100, url);
            }

}

該類的邏輯,主要是設定了一個最大的閾值,當超出這個閾值時候,列印出相關的資訊,實際應用中,可以整合傳簡訊或郵件進行告警;

3、將上面的類設定到META-INF目錄中

4、在生產端工程中引入上面的模組

<dependency>
            <groupId>com.congge</groupId>
            <artifactId>common-pool</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

5、在生產端組態檔中,指定自定義執行緒池

<dubbo:protocol name="dubbo" threadpool="watchingPool" threads="50" port="20880"/>

6、改造消費端的啟動類,使用1000個執行緒不斷呼叫介面

import com.congge.service.HelloService;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.io.IOException;

public class ConsumerMain {
    public static void main(String[] args) throws Exception {

        ApplicationContext ac = new ClassPathXmlApplicationContext("spring-consumer.xml");
        HelloService service = (HelloService) ac.getBean("helloService");
        while (true){
            for(int i=0;i<1000;i++){
                Thread.sleep(6);
                new Thread( () ->{
                    String hello = service.hello("Hello Provider");
                    System.out.println(hello);
                }).start();
            }
        }
    }
}

為了看到效果,我們將組態檔這裡的執行緒數調整到了50個,下面啟動生產端和消費端的程式碼,觀察控制檯輸出紀錄檔(先啟動生產端,再啟動消費端)

生產端已就緒

啟動消費端,不斷髮起了呼叫

這時再次回到生產端控制檯,通過輸出紀錄檔可以看到整個執行緒池經歷了執行緒數不斷往上增長的過程,直到最後達到了警戒線;

到此這篇關於dubbo 自定義執行緒池的文章就介紹到這了,更多相關dubbo 自定義執行緒池內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com