<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
reactor設計模式是event-driven architecture的一種實現方式,處理多個使用者端並行的向伺服器端請求服務的場景。每種服務在伺服器端可能由多個方法組成。reactor會解耦並行請求的服務並分發給對應的事件處理器來處理。
中心思想是將所有要處理的I/o事件註冊到一箇中心I/o多路複用器上,同時主執行緒/程序阻塞在多路複用器上;一旦有I/o事件到來或是準備就緒(檔案描述符或socket可讀、寫),多路複用器返回並將事先註冊的相應l/o事件分發到對應的處理器中。
處理機制為:主程式將事件以及對應事件處理的方法在Reactor上進行註冊, 如果相應的事件發生,Reactor將會主動呼叫事件註冊的介面,即 回撥函數.
前提準備:1單例模式:單例模式(Singleton Pattern,也稱為單件模式),使用最廣泛的設計模式之一。其意圖是保證一個類(結構體)僅有一個範例,並提供一個存取它的全域性存取點,該範例被所有程式模組共用。
2.回撥函數:把一段可執行的程式碼像引數傳遞那樣傳給其他程式碼,而這段程式碼會在某個時刻被呼叫執行,這就叫做回撥。
對epoll反應堆中結構體定義
/*fd包含的屬性*/ struct nitem { // fd int fd; //要監聽的檔案描述符 int status; //是否在監聽:1->在紅黑樹上(監聽),0->不在(不監聽) int events; //對應的監聽事件, EPOLLIN和EPOLLOUT(不同的事件,走不同的回撥函數) void *arg; //指向自己結構體指標 #if 0 NCALLBACK callback; #else NCALLBACK *readcb; // epollin NCALLBACK *writecb; // epollout NCALLBACK *acceptcb; // epollin #endif unsigned char sbuffer[BUFFER_LENGTH]; // int slength; unsigned char rbuffer[BUFFER_LENGTH]; int rlength; }; /*分塊儲存*/ struct itemblock { struct itemblock *next; struct nitem *items; }; /*epoll反應堆中包括通訊的fd以及epoll的(epfd)*/ struct reactor { int epfd; struct itemblock *head; };
單例模式,建立reactor的一個範例
/*單例模式*/ struct reactor *instance = NULL; int init_reactor(struct reactor *r) { if (r == NULL) return -1; int epfd = epoll_create(1); //int size r->epfd = epfd; // fd --> item r->head = (struct itemblock*)malloc(sizeof(struct itemblock)); if (r->head == NULL) { close(epfd); return -2; } memset(r->head, 0, sizeof(struct itemblock)); r->head->items = (struct nitem *)malloc(MAX_EPOLL_EVENT * sizeof(struct nitem)); if (r->head->items == NULL) { free(r->head); close(epfd); return -2; } memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem))); r->head->next = NULL; return 0; } struct reactor *getInstance(void) { //singleton if (instance == NULL) { instance = (struct reactor *)malloc(sizeof(struct reactor)); if (instance == NULL) return NULL; memset(instance, 0, sizeof(struct reactor)); if (0 > init_reactor(instance)) { free(instance); return NULL; } } return instance; }
事件註冊
/*nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);*/ /*nreactor_set_event(fd, read_callback, READ_CB, NULL);*/ /*fd找到對應事件*/ /*驅動註冊*/ int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg) { struct reactor *r = getInstance(); struct epoll_event ev = {0}; //1 if (event == READ_CB) { r->head->items[fd].fd = fd; r->head->items[fd].readcb = cb; r->head->items[fd].arg = arg; ev.events = EPOLLIN; } //2 else if (event == WRITE_CB) { r->head->items[fd].fd = fd; r->head->items[fd].writecb = cb; r->head->items[fd].arg = arg; ev.events = EPOLLOUT; } //3 else if (event == ACCEPT_CB) { r->head->items[fd].fd = fd; r->head->items[fd].acceptcb = cb; //回撥函數 r->head->items[fd].arg = arg; ev.events = EPOLLIN; } ev.data.ptr = &r->head->items[fd]; /*NOSET_CB 0*/ if (r->head->items[fd].events == NOSET_CB) { if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) { printf("epoll_ctl EPOLL_CTL_ADD failed, %dn", errno); return -1; } r->head->items[fd].events = event; } else if (r->head->items[fd].events != event) { if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) { printf("epoll_ctl EPOLL_CTL_MOD failedn"); return -1; } r->head->items[fd].events = event; } return 0; }
回撥函數書寫
int write_callback(int fd, int event, void *arg) { struct reactor *R = getInstance(); unsigned char *sbuffer = R->head->items[fd].sbuffer; int length = R->head->items[fd].slength; int ret = send(fd, sbuffer, length, 0); if (ret < length) { nreactor_set_event(fd, write_callback, WRITE_CB, NULL); } else { nreactor_set_event(fd, read_callback, READ_CB, NULL); } return 0; } // 5k qps int read_callback(int fd, int event, void *arg) { struct reactor *R = getInstance(); unsigned char *buffer = R->head->items[fd].rbuffer; #if 0 //ET int idx = 0, ret = 0; while (idx < BUFFER_LENGTH) { ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0); if (ret == -1) { break; } else if (ret > 0) { idx += ret; } else {// == 0 break; } } if (idx == BUFFER_LENGTH && ret != -1) { nreactor_set_event(fd, read_callback, READ_CB, NULL); } else if (ret == 0) { nreactor_set_event //close(fd); } else { nreactor_set_event(fd, write_callback, WRITE_CB, NULL); } #else //LT int ret = recv(fd, buffer, BUFFER_LENGTH, 0); if (ret == 0) { // fin nreactor_del_event(fd, NULL, 0, NULL); close(fd); } else if (ret > 0) { unsigned char *sbuffer = R->head->items[fd].sbuffer; memcpy(sbuffer, buffer, ret); R->head->items[fd].slength = ret; printf("readcb: %sn", sbuffer); nreactor_set_event(fd, write_callback, WRITE_CB, NULL); } #endif } // web server // ET / LT int accept_callback(int fd, int event, void *arg) { int connfd; struct sockaddr_in client; socklen_t len = sizeof(client); if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) { printf("accept socket error: %s(errno: %d)n", strerror(errno), errno); return 0; } nreactor_set_event(connfd, read_callback, READ_CB, NULL); }
監聽描述符變化
// accept --> EPOLL /*epoll_wait監聽0*/ int reactor_loop(int listenfd) { struct reactor *R = getInstance(); struct epoll_event events[POLL_SIZE] = {0}; while (1) { int nready = epoll_wait(R->epfd, events, POLL_SIZE, -1); if (nready == -1) { continue; } int i = 0; for (i = 0;i < nready;i ++) { struct nitem *item = (struct nitem *)events[i].data.ptr; int connfd = item->fd; if (connfd == listenfd) { // item->acceptcb(listenfd, 0, NULL); } else { if (events[i].events & EPOLLIN) { // item->readcb(connfd, 0, NULL); } if (events[i].events & EPOLLOUT) { item->writecb(connfd, 0, NULL); } } } } return 0; }
完整程式碼實現
#define MAXLNE 4096 #define POLL_SIZE 1024 #define BUFFER_LENGTH 1024 #define MAX_EPOLL_EVENT 1024 #define NOSET_CB 0 #define READ_CB 1 #define WRITE_CB 2 #define ACCEPT_CB 3 /*單例模式*/ typedef int NCALLBACK(int fd, int event, void *arg); /*fd包含的屬性*/ struct nitem { // fd int fd; //要監聽的檔案描述符 int status; //是否在監聽:1->在紅黑樹上(監聽),0->不在(不監聽) int events; //對應的監聽事件, EPOLLIN和EPOLLOUT(不同的事件,走不同的回撥函數) void *arg; //指向自己結構體指標 #if 0 NCALLBACK callback; #else NCALLBACK *readcb; // epollin NCALLBACK *writecb; // epollout NCALLBACK *acceptcb; // epollin #endif unsigned char sbuffer[BUFFER_LENGTH]; // int slength; unsigned char rbuffer[BUFFER_LENGTH]; int rlength; }; /*分塊儲存*/ struct itemblock { struct itemblock *next; struct nitem *items; }; /*epoll反應堆中包括通訊的fd以及epoll的(epfd)*/ struct reactor { int epfd; struct itemblock *head; }; /*初始化結構體*/ int init_reactor(struct reactor *r); int read_callback(int fd, int event, void *arg); int write_callback(int fd, int event, void *arg); int accept_callback(int fd, int event, void *arg); /*單例模式*/ struct reactor *instance = NULL; struct reactor *getInstance(void) { //singleton if (instance == NULL) { instance = (struct reactor *)malloc(sizeof(struct reactor)); if (instance == NULL) return NULL; memset(instance, 0, sizeof(struct reactor)); if (0 > init_reactor(instance)) { free(instance); return NULL; } } return instance; } /*nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);*/ /*nreactor_set_event(fd, read_callback, READ_CB, NULL);*/ /*fd找到對應事件*/ /*驅動註冊*/ int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg) { struct reactor *r = getInstance(); struct epoll_event ev = {0}; //1 if (event == READ_CB) { r->head->items[fd].fd = fd; r->head->items[fd].readcb = cb; r->head->items[fd].arg = arg; ev.events = EPOLLIN; } //2 else if (event == WRITE_CB) { r->head->items[fd].fd = fd; r->head->items[fd].writecb = cb; r->head->items[fd].arg = arg; ev.events = EPOLLOUT; } //3 else if (event == ACCEPT_CB) { r->head->items[fd].fd = fd; r->head->items[fd].acceptcb = cb; //回撥函數 r->head->items[fd].arg = arg; ev.events = EPOLLIN; } ev.data.ptr = &r->head->items[fd]; /*NOSET_CB 0*/ if (r->head->items[fd].events == NOSET_CB) { if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) { printf("epoll_ctl EPOLL_CTL_ADD failed, %dn", errno); return -1; } r->head->items[fd].events = event; } else if (r->head->items[fd].events != event) { if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) { printf("epoll_ctl EPOLL_CTL_MOD failedn"); return -1; } r->head->items[fd].events = event; } return 0; } /*nreactor_del_event(fd, NULL, 0, NULL);*/ /*下樹*/ /*nreactor_del_event(fd, NULL, 0, NULL);*/ int nreactor_del_event(int fd, NCALLBACK cb, int event, void *arg) { struct reactor *r = getInstance(); struct epoll_event ev = {0}; ev.data.ptr = arg; epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, &ev); r->head->items[fd].events = 0; return 0; } int write_callback(int fd, int event, void *arg) { struct reactor *R = getInstance(); unsigned char *sbuffer = R->head->items[fd].sbuffer; int length = R->head->items[fd].slength; int ret = send(fd, sbuffer, length, 0); if (ret < length) { nreactor_set_event(fd, write_callback, WRITE_CB, NULL); } else { nreactor_set_event(fd, read_callback, READ_CB, NULL); } return 0; } // 5k qps int read_callback(int fd, int event, void *arg) { struct reactor *R = getInstance(); unsigned char *buffer = R->head->items[fd].rbuffer; #if 0 //ET int idx = 0, ret = 0; while (idx < BUFFER_LENGTH) { ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0); if (ret == -1) { break; } else if (ret > 0) { idx += ret; } else {// == 0 break; } } if (idx == BUFFER_LENGTH && ret != -1) { nreactor_set_event(fd, read_callback, READ_CB, NULL); } else if (ret == 0) { nreactor_set_event //close(fd); } else { nreactor_set_event(fd, write_callback, WRITE_CB, NULL); } #else //LT int ret = recv(fd, buffer, BUFFER_LENGTH, 0); if (ret == 0) { // fin nreactor_del_event(fd, NULL, 0, NULL); close(fd); } else if (ret > 0) { unsigned char *sbuffer = R->head->items[fd].sbuffer; memcpy(sbuffer, buffer, ret); R->head->items[fd].slength = ret; printf("readcb: %sn", sbuffer); nreactor_set_event(fd, write_callback, WRITE_CB, NULL); } #endif } // web server // ET / LT int accept_callback(int fd, int event, void *arg) { int connfd; struct sockaddr_in client; socklen_t len = sizeof(client); if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) { printf("accept socket error: %s(errno: %d)n", strerror(errno), errno); return 0; } nreactor_set_event(connfd, read_callback, READ_CB, NULL); } int init_server(int port) { int listenfd; struct sockaddr_in servaddr; char buff[MAXLNE]; if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { printf("create socket error: %s(errno: %d)n", strerror(errno), errno); return 0; } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(port); if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) { printf("bind socket error: %s(errno: %d)n", strerror(errno), errno); return 0; } if (listen(listenfd, 10) == -1) { printf("listen socket error: %s(errno: %d)n", strerror(errno), errno); return 0; } return listenfd; } int init_reactor(struct reactor *r) { if (r == NULL) return -1; int epfd = epoll_create(1); //int size r->epfd = epfd; // fd --> item r->head = (struct itemblock*)malloc(sizeof(struct itemblock)); if (r->head == NULL) { close(epfd); return -2; } memset(r->head, 0, sizeof(struct itemblock)); r->head->items = (struct nitem *)malloc(MAX_EPOLL_EVENT * sizeof(struct nitem)); if (r->head->items == NULL) { free(r->head); close(epfd); return -2; } memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem))); r->head->next = NULL; return 0; } // accept --> EPOLL /*epoll_wait監聽0*/ int reactor_loop(int listenfd) { struct reactor *R = getInstance(); struct epoll_event events[POLL_SIZE] = {0}; while (1) { int nready = epoll_wait(R->epfd, events, POLL_SIZE, -1); if (nready == -1) { continue; } int i = 0; for (i = 0;i < nready;i ++) { struct nitem *item = (struct nitem *)events[i].data.ptr; int connfd = item->fd; if (connfd == listenfd) { // item->acceptcb(listenfd, 0, NULL); } else { if (events[i].events & EPOLLIN) { // item->readcb(connfd, 0, NULL); } if (events[i].events & EPOLLOUT) { item->writecb(connfd, 0, NULL); } } } } return 0; } int main(int argc, char **argv) { int connfd, n; int listenfd = init_server(9999); nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL); //nreactor_set_event(listenfd, accept_callback, read_callback, write_callback); reactor_loop(listenfd); return 0; }
到此這篇關於Reactor原理與實現的文章就介紹到這了,更多相關Reactor原理內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45