首頁 > 軟體

一文詳解Reactor模型與實現範例

2023-03-17 06:05:41

前言

週六在公司寫Reactor模型,

一女同事問我為啥都2023年了還在學習Reactor模型呀,

我問她為啥快30的年紀了,週六還在公司看我寫Reactor呀,

一時間辦公室裡,男的,女的,都沉默了。

在網路IO設計中,有兩種高效能模型:Reactor模型和Proactor模型。Reactor基於同步IO模式,Proactor基於非同步IO模式。

Netty網路框架,Redis等中介軟體中都有使用到Reactor模型。本文將對Reactor模型的如下三種分類進行學習和實現。

  • 單Reactor單執行緒模型;
  • 單Reactor多執行緒模型;
  • 主從Reactor多執行緒模型。

如果不具備網路IO的相關知識,建議先閱讀Java網路IO模型分析與實現

正文

一. Reactor設計模式

Reactor翻譯過來的意思是:反應堆,所以Reactor設計模式本質是基於事件驅動的。在Reactor設計模式中,存在如下幾個角色。

  • Handle(事件)。Reactor整體是基於Handle進行驅動,這裡的Handle叫做事件,可以類比為BIO中的Socket,NIO中的Socket管道。比如當Socket管道有連線建立,或者有資料可讀,那麼此時就稱作事件發生;
  • EventHandler(事件處理器)。有事件發生,就需要有相應的元件來處理事件,那麼這裡的元件就叫做事件處理器。EventHandler是一個抽象概念,其會有不同的具體實現,因為事件會有不同的型別,那麼不同型別的事件,肯定都需要有相應的具體處理邏輯,這裡的具體處理邏輯,就由EventHandler的具體實現來承載;
  • Concrete Event Handler(具體事件處理器)。是EventHandler的具體實現,用於處理不同型別的事件;
  • Synchronous Event Demultiplexer(事件多路分解器)。(這裡將Synchronous Event Demultiplexer簡稱為Demultiplexer)Demultiplexer用於監聽事件並得到所有發生事件的集合,在監聽的狀態下是阻塞的,直到有事件發生為止。Demultiplexer有一個很好的類比,就是NIO中的多路複用器Selector,當呼叫Selector的select() 方法後,會進入監聽狀態,當從select() 方法返回時,會得到SelectionKey的一個集合,而每一個SelectionKey中就儲存著有事件發生的Socket管道;
  • Initiation Dispatcher(事件分發器)。現在已經有Concrete Event Handler(具體事件處理器)來處理不同的事件,也能通過Synchronous Event Demultiplexer(事件多路分解器)拿到發生的事件,那麼最後需要做的事情,肯定就是將事件分發到正確的事件處理器上進行處理,而Initiation Dispatcher就是完成這個分發的事情。

Reactor設計模式的一個簡單類圖,如下所示。

通常,Reactor設計模式中的Reactor,可以理解為上述圖中的Synchronous Event Demultiplexer + Initiation Dispatcher。

二. 單Reactor單執行緒模型

單Reactor單執行緒模型中,只有一個Reactor在監聽事件和分發事件,並且監聽事件,分發事件和處理事件都在一個執行緒中完成。示意圖如下所示。

上述示意圖中,一次完整的處理流程可以概括如下。

  • Reactor監聽到ACCEPT事件發生,表示此時有使用者端建立連線;
  • Reactor將ACCEPT事件分發給Acceptor處理;
  • Acceptor會在伺服器端建立與使用者端通訊的client-socket管道,然後註冊到IO多路複用器selector上,並監聽READ事件;
  • Reactor監聽到READ事件發生,表示此時使用者端資料可讀;
  • Reactor將ACCEPT事件分發給Handler處理,Handler處理READ事件就會基於client-socket管道完成使用者端資料的讀取。

下面將基於Java語言,實現一個簡單的單Reactor單執行緒模型的伺服器端,整體程式碼實現完全符合上述示意圖,大家可以進行參照閱讀。

首先實現Reactor,如下所示。

public class Reactor implements Runnable {
    private final Selector selector;
    public Reactor(int port) throws IOException {
        // 開啟多路複用
        selector = Selector.open();
        // 伺服器端建立listen-socket管道
        ServerSocketChannel listenSocketChannel = ServerSocketChannel.open();
        // 繫結埠
        listenSocketChannel.socket().bind(new InetSocketAddress(port));
        // 設定為非阻塞模式
        listenSocketChannel.configureBlocking(false);
        // ACCEPT事件的附加器是Acceptor
        listenSocketChannel.register(selector, SelectionKey.OP_ACCEPT,
                new Acceptor(selector, listenSocketChannel));
    }
    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                // 獲取發生的事件
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterable = selectionKeys.iterator();
                while (iterable.hasNext()) {
                    // 對事件進行分發
                    dispatch(iterable.next());
                    iterable.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            LockSupport.parkNanos(1000 * 1000 * 1000);
        }
    }
    private void dispatch(SelectionKey selectionKey) {
        // 獲取事件的附加器
        // ACCEPT事件的附加器是Acceptor,故由Acceptor來處理ACCEPT事件
        // READ事件的附加器是Handler,故由Handler來處理READ事件
        Runnable attachment = (Runnable) selectionKey.attachment();
        if (attachment != null) {
            attachment.run();
        }
    }
}

已知Reactor會監聽使用者端連線的ACCEPT事件,還已知ACCEPT事件由Acceptor處理,所以在向多路複用器註冊伺服器端用於監聽使用者端連線的listen-socket管道時,新增了一個Acceptor作為附加器,那麼當發生ACCEPT事件時,就能夠獲取到作為ACCEPT事件附加器的Acceptor來處理ACCEPT事件。

下面看一下Acceptor的實現,如下所示。

public class Acceptor implements Runnable {
    private final Selector selector;
    private final ServerSocketChannel listenSocketChannel;
    public Acceptor(Selector selector, ServerSocketChannel listenSocketChannel) {
        this.selector = selector;
        this.listenSocketChannel = listenSocketChannel;
    }
    @Override
    public void run() {
        try {
            // 為連線的使用者端建立client-socket管道
            SocketChannel clientSocketChannel = listenSocketChannel.accept();
            // 設定為非阻塞
            clientSocketChannel.configureBlocking(false);
            // READ事件的附加器是Handler
            clientSocketChannel.register(selector, SelectionKey.OP_READ,
                    new Handler(clientSocketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

在Acceptor中就是在伺服器端建立與使用者端通訊的client-socket管道,然後註冊到多路複用器上並指定監聽READ事件,同時又因為READ事件由Handler處理,所以還新增了一個Handler作為附加器,當READ事件發生時可以獲取到作為READ事件附加器的Handler來處理READ事件。

下面看一下Handler的實現,如下所示。

public class Handler implements Runnable {
    private final SocketChannel clientSocketChannel;
    public Handler(SocketChannel clientSocketChannel) {
        this.clientSocketChannel = clientSocketChannel;
    }
    @Override
    public void run() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        try {
            // 讀取資料
            int read = clientSocketChannel.read(byteBuffer);
            if (read <= 0) {
                clientSocketChannel.close();
            } else {
                System.out.println(new String(byteBuffer.array()));
            }
        } catch (IOException e1) {
            try {
                clientSocketChannel.close();
            } catch (IOException e2) {
                e2.printStackTrace();
            }
            e1.printStackTrace();
        }
    }
}

在Handler中就是簡單的讀取資料並列印,當讀取資料為空或者發生異常時,需要及時將管道關閉。

最後編寫一個主程式將Reactor執行起來,如下所示。

public class MainServer {
    public static void main(String[] args) throws IOException {
        Thread reactorThread = new Thread(new Reactor(8080));
        reactorThread.start();
    }
}

現在來思考一下,單Reactor單執行緒模型有什麼優點和缺點。優點其實就是模型簡單,實現方便。缺點有兩點,如下所示。

  • 一個Reactor同時負責監聽ACCEPT事件和READ事件;
  • 只有一個執行緒在工作,處理效率低,無法利用多核CPU的優勢。

但是儘管單Reactor單執行緒模型有上述的缺點,但是著名的快取中介軟體Redis的伺服器端,就是使用的單Reactor單執行緒模型,示意圖如下。

那為什麼以效能著稱的Redis會採取單Reactor單執行緒模型呢,其實就是因為Redis的操作都在記憶體中,讀寫都非常快速,所以單Reactor單執行緒模型也能執行得很流暢,同時還避免了多執行緒下的各種並行問題。

三. 單Reactor多執行緒模型

在理解了單Reactor單執行緒模型後,那麼肯定就能想到,假如在Handler中處理READ事件的這個事情能夠使用一個執行緒池來完成,從而就可以實現READ事件的處理不會阻塞主執行緒。而這樣的一個模型,其實就是單Reactor多執行緒模型,示意圖如下所示。

和單Reactor單執行緒模型唯一的不同,就是在Handler中多了一個執行緒池。

單Reactor多執行緒模型的程式碼實現,除了Handler以外,其餘和單Reactor單執行緒模型一摸一樣,所以下面就看一下單Reactor多執行緒模型中的Handler實現,如下所示。

public class Handler implements Runnable {
    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(16, 32,
            60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));
    private final SocketChannel clientSocketChannel;
    public Handler(SocketChannel clientSocketChannel) {
        this.clientSocketChannel = clientSocketChannel;
    }
    @Override
    public void run() {
        threadPool.execute(() -> {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            try {
                // 讀取資料
                int read = clientSocketChannel.read(byteBuffer);
                if (read <= 0) {
                    clientSocketChannel.close();
                } else {
                    System.out.println(new String(byteBuffer.array()));
                }
                // 睡眠10S,演示任務執行耗時長也不會阻塞處理其它使用者端請求
                LockSupport.parkNanos(1000 * 1000 * 1000 * 10L);
            } catch (IOException e1) {
                try {
                    clientSocketChannel.close();
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
                e1.printStackTrace();
            }
        });
    }
}

其實就是每一個READ事件的處理會作為一個任務被扔到執行緒池中去處理。

單Reactor多執行緒模型雖然解決了只有一個執行緒的問題,但是可以發現,仍舊是隻有一個Reactor在同時監聽ACCEPT事件和READ事件。

那麼現在思考一下,為什麼一個Reactor同時監聽ACCEPT事件和READ事件是不好的。其實就是因為通常使用者端連線的建立是不頻繁的,但是連線建立後資料的收發是頻繁的,所以如果能夠將監聽READ事件這個動作拆分出來,讓多個子Reactor來監聽READ事件,而原來的主Reactor只監聽ACCEPT事件,那麼整體的效率,會進一步提升,而這,就是主從Reactor多執行緒模型。

四. 主從Reactor多執行緒模型

主從Reactor模型中,有一個主Reactor,專門監聽ACCEPT事件,然後有多個從Reactor,專門監聽READ事件,示意圖如下所示。

上述示意圖中,一次完整的處理流程可以概括如下。

  • 主Reactor監聽到ACCEPT事件發生,表示此時有使用者端建立連線;
  • 主Reactor將ACCEPT事件分發給Acceptor處理;
  • Acceptor會在伺服器端建立與使用者端通訊的client-socket管道,然後註冊到從Reactor的IO多路複用器selector上,並監聽READ事件;
  • 從Reactor監聽到READ事件發生,表示此時使用者端資料可讀;
  • 從Reactor將ACCEPT事件分發給Handler處理,Handler處理READ事件就會基於client-socket管道完成使用者端資料的讀取。

下面將基於Java語言,實現一個簡單的主從Reactor多執行緒模型的伺服器端,整體程式碼實現完全符合上述示意圖,大家可以進行參照閱讀。

首先是主Reactor的實現,如下所示。

public class MainReactor implements Runnable {
    private final Selector selector;
    public MainReactor(int port) throws IOException {
        // 開多路複用器
        selector = Selector.open();
        // 伺服器端建立listen-socket管道
        ServerSocketChannel listenSocketChannel = ServerSocketChannel.open();
        // 設定為非阻塞
        listenSocketChannel.configureBlocking(false);
        // 繫結監聽埠
        listenSocketChannel.socket().bind(new InetSocketAddress(port));
        // 將listen-socket管道繫結到主Reactor的多路複用器上
        // 並且主Reactor上只會註冊listen-socket管道,用於監聽ACCEPT事件
        listenSocketChannel.register(selector, SelectionKey.OP_ACCEPT,
                new Acceptor(listenSocketChannel));
    }
    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterable = selectionKeys.iterator();
                while (iterable.hasNext()) {
                    // 對事件進行分發
                    dispatch(iterable.next());
                    iterable.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            LockSupport.parkNanos(1000 * 1000 * 1000);
        }
    }
    private void dispatch(SelectionKey selectionKey) {
        // 獲取事件附加器,只會是Acceptor
        Runnable attachment = (Runnable) selectionKey.attachment();
        if (attachment != null) {
            attachment.run();
        }
    }
}

主Reactor的實現中,還是先建立伺服器端監聽使用者端連線的listen-socket管道,然後註冊到主Reactor的IO多路複用器上,並監聽ACCEPT事件,同時我們現在知道,主Reactor的IO多路複用器上只會註冊listen-socket管道且只會監聽ACCEPT事件。同樣,也新增了一個Acceptor作為附加器,那麼當發生ACCEPT事件時,就能夠獲取到作為ACCEPT事件附加器的Acceptor來處理ACCEPT事件。

下面是Acceptor的實現,如下所示。

public class Acceptor implements Runnable {
    // 指定從Reactor一共有16個
    private static final int TOTAL_SUBREACTOR_NUM = 16;
    // 伺服器端的listen-socket管道
    private final ServerSocketChannel listenSocketChannel;
    // 用於執行從Reactor
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            TOTAL_SUBREACTOR_NUM, TOTAL_SUBREACTOR_NUM * 2,
            60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));
    // 從Reactor集合
    private final List<SubReactor> subReactors = new ArrayList<>(TOTAL_SUBREACTOR_NUM);
    public Acceptor(ServerSocketChannel listenSocketChannel) throws IOException {
        this.listenSocketChannel = listenSocketChannel;
        // 將從Reactor初始化出來並執行
        for (int i = 0; i < TOTAL_SUBREACTOR_NUM; i++) {
            SubReactor subReactor = new SubReactor(Selector.open());
            subReactors.add(subReactor);
            threadPool.execute(subReactor);
        }
    }
    @Override
    public void run() {
        try {
            // 為連線的使用者端建立client-socket管道
            SocketChannel clientSocketChannel = listenSocketChannel.accept();
            // 設定為非阻塞
            clientSocketChannel.configureBlocking(false);
            // 任意選擇一個從Reactor,讓其監聽連線的使用者端的READ事件
            Optional<SubReactor> anySubReactor = subReactors.stream().findAny();
            if (anySubReactor.isPresent()) {
                SubReactor subReactor = anySubReactor.get();
                // 從Reactor的多路複用器會阻塞在select()方法上
                // 這裡需要先喚醒多路複用器,立即從select()方法返回
                subReactor.getSelector().wakeup();
                // 讓從Reactor負責處理使用者端的READ事件
                clientSocketChannel.register(subReactor.getSelector(), SelectionKey.OP_READ,
                        new Handler(clientSocketChannel));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

首先在Acceptor的建構函式中,會將所有從Reactor初始化出來,並且每一個從Reactor都會持有一個IO多路複用器。當一個從Reactor建立出來後就會立即執行,此時從Reactor的IO多路複用器就會開始監聽,即阻塞在select() 方法上。

然後在Acceptor的主體邏輯中,會為連線的使用者端建立client-socket管道,然後從所有從Reactor中基於某種策略(隨機)選擇一個從Reactor,並將client-socket管道註冊在選擇的從Reactor的IO多路複用器上,有一點需要注意,此時從Reactor的IO多路複用器可能會阻塞在select() 方法上,所以註冊前需要先通過wakeup() 方法進行喚醒。

接下來繼續看從Reactor的實現,如下所示。

public class SubReactor implements Runnable {
    private final Selector selector;
    public SubReactor(Selector selector) {
        this.selector = selector;
    }
    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    // 對事件進行分發
                    dispatch(iterator.next());
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            LockSupport.parkNanos(1000 * 1000 * 1000);
        }
    }
    private void dispatch(SelectionKey selectionKey) {
        // 獲取事件附加器,只會是Handler
        Runnable runnable = (Runnable) selectionKey.attachment();
        if (runnable != null) {
            runnable.run();
        }
    }
    public Selector getSelector() {
        return selector;
    }
}

從Reactor的實現中,會監聽伺服器端為連線的使用者端建立的client-socket管道上的READ事件,一旦有READ事件發生,就會使用作為附加器的Handler來處理READ事件。同樣,從Reactor的IO多路複用器上只會註冊client-socket管道且只會監聽READ事件。

然後是Handler,因為是多執行緒模型,所以其實現和第三節中的Handler完全一樣,下面再貼一下程式碼。

public class Handler implements Runnable {
    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(16, 32,
            60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));
    private final SocketChannel clientSocketChannel;
    public Handler(SocketChannel clientSocketChannel) {
        this.clientSocketChannel = clientSocketChannel;
    }
    @Override
    public void run() {
        threadPool.execute(() -> {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            try {
                // 讀取資料
                int read = clientSocketChannel.read(byteBuffer);
                if (read <= 0) {
                    clientSocketChannel.close();
                } else {
                    System.out.println(new String(byteBuffer.array()));
                }
                // 睡眠10S,演示任務執行耗時長也不會阻塞處理其它使用者端請求
                LockSupport.parkNanos(1000 * 1000 * 1000 * 10L);
            } catch (IOException e1) {
                try {
                    clientSocketChannel.close();
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
                e1.printStackTrace();
            }
        });
    }
}

最後編寫一個主程式將主Reactor執行起來,如下所示。

public class MainServer {
    public static void main(String[] args) throws IOException {
        Thread mainReactorThread = new Thread(new MainReactor(8080));
        mainReactorThread.start();
    }
}

總結

Reactor模型主要就是監聽事件,分發事件和處理事件。其中Reactor角色會負責監聽事件 和分發事件,Handler角色和Acceptor角色會負責處理事件。

Reactor模型雖然分為:單Reactor單執行緒模型,單Reactor多執行緒模型和主從Reactor多執行緒模型,但是其本質就是NIO的實現,是不過套了Reactor設計模式的外殼。

在網路通訊框架Netty中,三種Reactor模型都有使用到,所以想要學習Netty的精髓,理解Reactor模型是必不可少的。

以上就是一文詳解Reactor模型與實現範例的詳細內容,更多關於Reactor模型實現的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com