<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
週六在公司寫Reactor模型,
一女同事問我為啥都2023年了還在學習Reactor模型呀,
我問她為啥快30的年紀了,週六還在公司看我寫Reactor呀,
一時間辦公室裡,男的,女的,都沉默了。
在網路IO設計中,有兩種高效能模型:Reactor模型和Proactor模型。Reactor基於同步IO模式,Proactor基於非同步IO模式。
Netty網路框架,Redis等中介軟體中都有使用到Reactor模型。本文將對Reactor模型的如下三種分類進行學習和實現。
如果不具備網路IO的相關知識,建議先閱讀Java網路IO模型分析與實現。
Reactor翻譯過來的意思是:反應堆,所以Reactor設計模式本質是基於事件驅動的。在Reactor設計模式中,存在如下幾個角色。
Reactor設計模式的一個簡單類圖,如下所示。
通常,Reactor設計模式中的Reactor,可以理解為上述圖中的Synchronous Event Demultiplexer + Initiation Dispatcher。
單Reactor單執行緒模型中,只有一個Reactor在監聽事件和分發事件,並且監聽事件,分發事件和處理事件都在一個執行緒中完成。示意圖如下所示。
上述示意圖中,一次完整的處理流程可以概括如下。
下面將基於Java語言,實現一個簡單的單Reactor單執行緒模型的伺服器端,整體程式碼實現完全符合上述示意圖,大家可以進行參照閱讀。
首先實現Reactor,如下所示。
public class Reactor implements Runnable { private final Selector selector; public Reactor(int port) throws IOException { // 開啟多路複用 selector = Selector.open(); // 伺服器端建立listen-socket管道 ServerSocketChannel listenSocketChannel = ServerSocketChannel.open(); // 繫結埠 listenSocketChannel.socket().bind(new InetSocketAddress(port)); // 設定為非阻塞模式 listenSocketChannel.configureBlocking(false); // ACCEPT事件的附加器是Acceptor listenSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(selector, listenSocketChannel)); } @Override public void run() { while (!Thread.interrupted()) { try { // 獲取發生的事件 selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterable = selectionKeys.iterator(); while (iterable.hasNext()) { // 對事件進行分發 dispatch(iterable.next()); iterable.remove(); } } catch (IOException e) { e.printStackTrace(); } LockSupport.parkNanos(1000 * 1000 * 1000); } } private void dispatch(SelectionKey selectionKey) { // 獲取事件的附加器 // ACCEPT事件的附加器是Acceptor,故由Acceptor來處理ACCEPT事件 // READ事件的附加器是Handler,故由Handler來處理READ事件 Runnable attachment = (Runnable) selectionKey.attachment(); if (attachment != null) { attachment.run(); } } }
已知Reactor會監聽使用者端連線的ACCEPT事件,還已知ACCEPT事件由Acceptor處理,所以在向多路複用器註冊伺服器端用於監聽使用者端連線的listen-socket管道時,新增了一個Acceptor作為附加器,那麼當發生ACCEPT事件時,就能夠獲取到作為ACCEPT事件附加器的Acceptor來處理ACCEPT事件。
下面看一下Acceptor的實現,如下所示。
public class Acceptor implements Runnable { private final Selector selector; private final ServerSocketChannel listenSocketChannel; public Acceptor(Selector selector, ServerSocketChannel listenSocketChannel) { this.selector = selector; this.listenSocketChannel = listenSocketChannel; } @Override public void run() { try { // 為連線的使用者端建立client-socket管道 SocketChannel clientSocketChannel = listenSocketChannel.accept(); // 設定為非阻塞 clientSocketChannel.configureBlocking(false); // READ事件的附加器是Handler clientSocketChannel.register(selector, SelectionKey.OP_READ, new Handler(clientSocketChannel)); } catch (IOException e) { e.printStackTrace(); } } }
在Acceptor中就是在伺服器端建立與使用者端通訊的client-socket管道,然後註冊到多路複用器上並指定監聽READ事件,同時又因為READ事件由Handler處理,所以還新增了一個Handler作為附加器,當READ事件發生時可以獲取到作為READ事件附加器的Handler來處理READ事件。
下面看一下Handler的實現,如下所示。
public class Handler implements Runnable { private final SocketChannel clientSocketChannel; public Handler(SocketChannel clientSocketChannel) { this.clientSocketChannel = clientSocketChannel; } @Override public void run() { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); try { // 讀取資料 int read = clientSocketChannel.read(byteBuffer); if (read <= 0) { clientSocketChannel.close(); } else { System.out.println(new String(byteBuffer.array())); } } catch (IOException e1) { try { clientSocketChannel.close(); } catch (IOException e2) { e2.printStackTrace(); } e1.printStackTrace(); } } }
在Handler中就是簡單的讀取資料並列印,當讀取資料為空或者發生異常時,需要及時將管道關閉。
最後編寫一個主程式將Reactor執行起來,如下所示。
public class MainServer { public static void main(String[] args) throws IOException { Thread reactorThread = new Thread(new Reactor(8080)); reactorThread.start(); } }
現在來思考一下,單Reactor單執行緒模型有什麼優點和缺點。優點其實就是模型簡單,實現方便。缺點有兩點,如下所示。
但是儘管單Reactor單執行緒模型有上述的缺點,但是著名的快取中介軟體Redis的伺服器端,就是使用的單Reactor單執行緒模型,示意圖如下。
那為什麼以效能著稱的Redis會採取單Reactor單執行緒模型呢,其實就是因為Redis的操作都在記憶體中,讀寫都非常快速,所以單Reactor單執行緒模型也能執行得很流暢,同時還避免了多執行緒下的各種並行問題。
在理解了單Reactor單執行緒模型後,那麼肯定就能想到,假如在Handler中處理READ事件的這個事情能夠使用一個執行緒池來完成,從而就可以實現READ事件的處理不會阻塞主執行緒。而這樣的一個模型,其實就是單Reactor多執行緒模型,示意圖如下所示。
和單Reactor單執行緒模型唯一的不同,就是在Handler中多了一個執行緒池。
單Reactor多執行緒模型的程式碼實現,除了Handler以外,其餘和單Reactor單執行緒模型一摸一樣,所以下面就看一下單Reactor多執行緒模型中的Handler實現,如下所示。
public class Handler implements Runnable { private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(16, 32, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200)); private final SocketChannel clientSocketChannel; public Handler(SocketChannel clientSocketChannel) { this.clientSocketChannel = clientSocketChannel; } @Override public void run() { threadPool.execute(() -> { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); try { // 讀取資料 int read = clientSocketChannel.read(byteBuffer); if (read <= 0) { clientSocketChannel.close(); } else { System.out.println(new String(byteBuffer.array())); } // 睡眠10S,演示任務執行耗時長也不會阻塞處理其它使用者端請求 LockSupport.parkNanos(1000 * 1000 * 1000 * 10L); } catch (IOException e1) { try { clientSocketChannel.close(); } catch (IOException e2) { e2.printStackTrace(); } e1.printStackTrace(); } }); } }
其實就是每一個READ事件的處理會作為一個任務被扔到執行緒池中去處理。
單Reactor多執行緒模型雖然解決了只有一個執行緒的問題,但是可以發現,仍舊是隻有一個Reactor在同時監聽ACCEPT事件和READ事件。
那麼現在思考一下,為什麼一個Reactor同時監聽ACCEPT事件和READ事件是不好的。其實就是因為通常使用者端連線的建立是不頻繁的,但是連線建立後資料的收發是頻繁的,所以如果能夠將監聽READ事件這個動作拆分出來,讓多個子Reactor來監聽READ事件,而原來的主Reactor只監聽ACCEPT事件,那麼整體的效率,會進一步提升,而這,就是主從Reactor多執行緒模型。
主從Reactor模型中,有一個主Reactor,專門監聽ACCEPT事件,然後有多個從Reactor,專門監聽READ事件,示意圖如下所示。
上述示意圖中,一次完整的處理流程可以概括如下。
下面將基於Java語言,實現一個簡單的主從Reactor多執行緒模型的伺服器端,整體程式碼實現完全符合上述示意圖,大家可以進行參照閱讀。
首先是主Reactor的實現,如下所示。
public class MainReactor implements Runnable { private final Selector selector; public MainReactor(int port) throws IOException { // 開多路複用器 selector = Selector.open(); // 伺服器端建立listen-socket管道 ServerSocketChannel listenSocketChannel = ServerSocketChannel.open(); // 設定為非阻塞 listenSocketChannel.configureBlocking(false); // 繫結監聽埠 listenSocketChannel.socket().bind(new InetSocketAddress(port)); // 將listen-socket管道繫結到主Reactor的多路複用器上 // 並且主Reactor上只會註冊listen-socket管道,用於監聽ACCEPT事件 listenSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(listenSocketChannel)); } @Override public void run() { while (!Thread.interrupted()) { try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterable = selectionKeys.iterator(); while (iterable.hasNext()) { // 對事件進行分發 dispatch(iterable.next()); iterable.remove(); } } catch (IOException e) { e.printStackTrace(); } LockSupport.parkNanos(1000 * 1000 * 1000); } } private void dispatch(SelectionKey selectionKey) { // 獲取事件附加器,只會是Acceptor Runnable attachment = (Runnable) selectionKey.attachment(); if (attachment != null) { attachment.run(); } } }
主Reactor的實現中,還是先建立伺服器端監聽使用者端連線的listen-socket管道,然後註冊到主Reactor的IO多路複用器上,並監聽ACCEPT事件,同時我們現在知道,主Reactor的IO多路複用器上只會註冊listen-socket管道且只會監聽ACCEPT事件。同樣,也新增了一個Acceptor作為附加器,那麼當發生ACCEPT事件時,就能夠獲取到作為ACCEPT事件附加器的Acceptor來處理ACCEPT事件。
下面是Acceptor的實現,如下所示。
public class Acceptor implements Runnable { // 指定從Reactor一共有16個 private static final int TOTAL_SUBREACTOR_NUM = 16; // 伺服器端的listen-socket管道 private final ServerSocketChannel listenSocketChannel; // 用於執行從Reactor private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( TOTAL_SUBREACTOR_NUM, TOTAL_SUBREACTOR_NUM * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200)); // 從Reactor集合 private final List<SubReactor> subReactors = new ArrayList<>(TOTAL_SUBREACTOR_NUM); public Acceptor(ServerSocketChannel listenSocketChannel) throws IOException { this.listenSocketChannel = listenSocketChannel; // 將從Reactor初始化出來並執行 for (int i = 0; i < TOTAL_SUBREACTOR_NUM; i++) { SubReactor subReactor = new SubReactor(Selector.open()); subReactors.add(subReactor); threadPool.execute(subReactor); } } @Override public void run() { try { // 為連線的使用者端建立client-socket管道 SocketChannel clientSocketChannel = listenSocketChannel.accept(); // 設定為非阻塞 clientSocketChannel.configureBlocking(false); // 任意選擇一個從Reactor,讓其監聽連線的使用者端的READ事件 Optional<SubReactor> anySubReactor = subReactors.stream().findAny(); if (anySubReactor.isPresent()) { SubReactor subReactor = anySubReactor.get(); // 從Reactor的多路複用器會阻塞在select()方法上 // 這裡需要先喚醒多路複用器,立即從select()方法返回 subReactor.getSelector().wakeup(); // 讓從Reactor負責處理使用者端的READ事件 clientSocketChannel.register(subReactor.getSelector(), SelectionKey.OP_READ, new Handler(clientSocketChannel)); } } catch (IOException e) { e.printStackTrace(); } } }
首先在Acceptor的建構函式中,會將所有從Reactor初始化出來,並且每一個從Reactor都會持有一個IO多路複用器。當一個從Reactor建立出來後就會立即執行,此時從Reactor的IO多路複用器就會開始監聽,即阻塞在select() 方法上。
然後在Acceptor的主體邏輯中,會為連線的使用者端建立client-socket管道,然後從所有從Reactor中基於某種策略(隨機)選擇一個從Reactor,並將client-socket管道註冊在選擇的從Reactor的IO多路複用器上,有一點需要注意,此時從Reactor的IO多路複用器可能會阻塞在select() 方法上,所以註冊前需要先通過wakeup() 方法進行喚醒。
接下來繼續看從Reactor的實現,如下所示。
public class SubReactor implements Runnable { private final Selector selector; public SubReactor(Selector selector) { this.selector = selector; } @Override public void run() { while (!Thread.interrupted()) { try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { // 對事件進行分發 dispatch(iterator.next()); iterator.remove(); } } catch (IOException e) { e.printStackTrace(); } LockSupport.parkNanos(1000 * 1000 * 1000); } } private void dispatch(SelectionKey selectionKey) { // 獲取事件附加器,只會是Handler Runnable runnable = (Runnable) selectionKey.attachment(); if (runnable != null) { runnable.run(); } } public Selector getSelector() { return selector; } }
從Reactor的實現中,會監聽伺服器端為連線的使用者端建立的client-socket管道上的READ事件,一旦有READ事件發生,就會使用作為附加器的Handler來處理READ事件。同樣,從Reactor的IO多路複用器上只會註冊client-socket管道且只會監聽READ事件。
然後是Handler,因為是多執行緒模型,所以其實現和第三節中的Handler完全一樣,下面再貼一下程式碼。
public class Handler implements Runnable { private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(16, 32, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200)); private final SocketChannel clientSocketChannel; public Handler(SocketChannel clientSocketChannel) { this.clientSocketChannel = clientSocketChannel; } @Override public void run() { threadPool.execute(() -> { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); try { // 讀取資料 int read = clientSocketChannel.read(byteBuffer); if (read <= 0) { clientSocketChannel.close(); } else { System.out.println(new String(byteBuffer.array())); } // 睡眠10S,演示任務執行耗時長也不會阻塞處理其它使用者端請求 LockSupport.parkNanos(1000 * 1000 * 1000 * 10L); } catch (IOException e1) { try { clientSocketChannel.close(); } catch (IOException e2) { e2.printStackTrace(); } e1.printStackTrace(); } }); } }
最後編寫一個主程式將主Reactor執行起來,如下所示。
public class MainServer { public static void main(String[] args) throws IOException { Thread mainReactorThread = new Thread(new MainReactor(8080)); mainReactorThread.start(); } }
Reactor模型主要就是監聽事件,分發事件和處理事件。其中Reactor角色會負責監聽事件 和分發事件,Handler角色和Acceptor角色會負責處理事件。
Reactor模型雖然分為:單Reactor單執行緒模型,單Reactor多執行緒模型和主從Reactor多執行緒模型,但是其本質就是NIO的實現,是不過套了Reactor設計模式的外殼。
在網路通訊框架Netty中,三種Reactor模型都有使用到,所以想要學習Netty的精髓,理解Reactor模型是必不可少的。
以上就是一文詳解Reactor模型與實現範例的詳細內容,更多關於Reactor模型實現的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45