首頁 > 軟體

淺析Disruptor高效能執行緒訊息傳遞並行框架

2022-03-01 16:00:45

前言碎語

Disruptor是英國LMAX公司開源的高效能的執行緒間傳遞訊息的並行框架,和jdk中的BlockingQueue非常類似,但是效能卻是BlockingQueue不能比擬的,下面是官方給出的一分測試報告,可以直觀的看出兩者的效能區別:


Disruptor 專案地址:https://github.com/LMAX-Exchange/disruptor

核心概念?

這麼效能炸裂的框架肯定要把玩一番,試用前,我們先了解下disruptor的主要的概念,然後結合樓主的weblog專案(之前使用的BlockingQueue),來實踐下

RingBuffer:環形的緩衝區,訊息事件資訊的載體。曾經 RingBuffer 是 Disruptor 中的最主要的物件,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的資料(事件)進行儲存和更新。在一些更高階的應用場景中,Ring Buffer 可以由使用者的自定義實現來完全替代。

Event:定義生產者和消費者之間進行交換的資料型別。

EventFactory:建立事件的工廠類介面,由使用者實現,提供具體的事件

EventHandler:事件處理介面,由使用者實現,用於處理事件。

目前為止,我們瞭解以上核心內容即可,更多的詳情,可以移步wiki檔案:https://github.com/LMAX-Exchange/disruptor

核心架構圖:

實踐Disruptor

改造boot-websocket-log專案,這是一個典型的生產者消費者模式的範例。然後將BlockingQueue替換成Disruptor,完成功能,有興趣的可以對比下。

第一步,定義事件型別

/**
 * Created by kl on 2018/8/24.
 * Content :程序紀錄檔事件內容載體
 */
public class LoggerEvent {
    private LoggerMessage log;
    public LoggerMessage getLog() {
        return log;
    }
    public void setLog(LoggerMessage log) {
        this.log = log;
    }
}

第二步,定義事件工廠

/**
 * Created by kl on 2018/8/24.
 * Content :程序紀錄檔事件工廠類
 */
public class LoggerEventFactory implements EventFactory{
    @Override
    public LoggerEvent newInstance() {
        return new LoggerEvent();
    }
}

第三步,定義資料處理器

/**
 * Created by kl on 2018/8/24.
 * Content :程序紀錄檔事件處理器
 */
@Component
public class LoggerEventHandler implements EventHandler{
    @Autowired
    private SimpMessagingTemplate messagingTemplate;
    @Override
    public void onEvent(LoggerEvent stringEvent, long l, boolean b) {
        messagingTemplate.convertAndSend("/topic/pullLogger",stringEvent.getLog());
    }
}

第四步,建立Disruptor實操類,定義事件釋出方法,釋出事件

/**
 * Created by kl on 2018/8/24.
 * Content :Disruptor 環形佇列
 */
@Component
public class LoggerDisruptorQueue {
    private Executor executor = Executors.newCachedThreadPool();
    // The factory for the event
    private LoggerEventFactory factory = new LoggerEventFactory();
    private FileLoggerEventFactory fileLoggerEventFactory = new FileLoggerEventFactory();
    // Specify the size of the ring buffer, must be power of 2.
    private int bufferSize = 2 * 1024;
    // Construct the Disruptor
    private Disruptordisruptor = new Disruptor<>(factory, bufferSize, executor);;
    private DisruptorfileLoggerEventDisruptor = new Disruptor<>(fileLoggerEventFactory, bufferSize, executor);;
    private static  RingBufferringBuffer;
    private static  RingBufferfileLoggerEventRingBuffer;
    @Autowired
    LoggerDisruptorQueue(LoggerEventHandler eventHandler,FileLoggerEventHandler fileLoggerEventHandler) {
        disruptor.handleEventsWith(eventHandler);
        fileLoggerEventDisruptor.handleEventsWith(fileLoggerEventHandler);
        this.ringBuffer = disruptor.getRingBuffer();
        this.fileLoggerEventRingBuffer = fileLoggerEventDisruptor.getRingBuffer();
        disruptor.start();
        fileLoggerEventDisruptor.start();
    }
    public static void publishEvent(LoggerMessage log) {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try {
            LoggerEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
            // for the sequence
            event.setLog(log);  // Fill with data
        } finally {
            ringBuffer.publish(sequence);
        }
    }
    public static void publishEvent(String log) {
        if(fileLoggerEventRingBuffer == null) return;
        long sequence = fileLoggerEventRingBuffer.next();  // Grab the next sequence
        try {
            FileLoggerEvent event = fileLoggerEventRingBuffer.get(sequence); // Get the entry in the Disruptor
            // for the sequence
            event.setLog(log);  // Fill with data
        } finally {
            fileLoggerEventRingBuffer.publish(sequence);
        }
    }
}

文末結語

以上四步已經完成了Disruptor的使用,啟動專案後就會不斷的釋出紀錄檔事件,處理器會將事件內容通過websocket傳送到前端頁面上展示,

boot-websocket-log專案地址:https://gitee.com/kailing/boot-websocket-log

Disruptor是高效能的程序內執行緒間的資料交換框架,特別適合紀錄檔類的處理。Disruptor也是從https://github.com/alipay/sofa-tracer瞭解到的,這是螞蟻金服 團隊開源的分散式鏈路追蹤專案,其中紀錄檔處理部分就是使用了Disruptor。

以上就是淺析Disruptor高效能執行緒訊息傳遞並行框架的詳細內容,更多關於Disruptor執行緒訊息傳遞並行框架的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com