2021-05-12 14:32:11
Linux下epoll使用原始碼樣例分析
前言
在Linux的高效能網路程式設計中,繞不開的就是epoll。和select、poll等系統呼叫相比,epoll在需要監視大量檔案描述符並且其中只有少數活躍的時候,表現出無可比擬的優勢。epoll能讓核心記住所關注的描述符,並在對應的描述符事件就緒的時候,在epoll的就緒連結串列中新增這些就緒元素,並喚醒對應的epoll等待進程。
本文就是筆者在探究epoll原始碼過程中,對kernel將就緒描述符新增到epoll並喚醒對應進程的一次原始碼分析(基於linux-2.6.32核心版本)。由於篇幅所限,筆者聚焦於tcp協定下socket可讀事件的原始碼分析。
簡單的epoll例子
下面的例子,是從筆者本人用c語言寫的dbproxy中的一段程式碼。由於細節過多,所以做了一些刪減。
int init_reactor(int listen_fd,int worker_count){
......
// 建立多個epoll fd,以充分利用多核
for(i=0;i<worker_count;i++){
reactor->worker_fd = epoll_create(EPOLL_MAX_EVENTS);
}
/* epoll add listen_fd and accept */
// 將accept後的事件加入到對應的epoll fd中
int client_fd = accept(listen_fd,(struct sockaddr *)&client_addr,&client_len)));
// 將連線描述符註冊到對應的worker裡面
epoll_ctl(reactor->client_fd,EPOLL_CTL_ADD,epifd,&event);
}
// reactor的worker執行緒
static void* rw_thread_func(void* arg){
......
for(;;){
// epoll_wait等待事件觸發
int retval = epoll_wait(epfd,events,EPOLL_MAX_EVENTS,500);
if(retval > 0){
for(j=0; j < retval; j++){
// 處理讀事件
if(event & EPOLLIN){
handle_ready_read_connection(conn);
continue;
}
/* 處理其它事件 */
}
}
}
......
}
上述程式碼事實上就是實現了一個reactor模式中的accept與read/write處理執行緒,如下圖所示:
epoll_create
Unix的萬物皆檔案的思想在epoll裡面也有體現,epoll_create呼叫返回一個檔案描述符,此描述符掛載在anon_inode_fs(匿名inode檔案系統)的根目錄下面。讓我們看下具體的epoll_create系統呼叫原始碼:
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}
由上述原始碼可見,epoll_create的引數是基本沒有意義的,kernel簡單的判斷是否為0,然後就直接就呼叫了sys_epoll_create1。由於linux的系統呼叫是通過(SYSCALL_DEFINE1,SYSCALL_DEFINE2......SYSCALL_DEFINE6)定義的,那麼sys_epoll_create1對應的原始碼即是SYSCALL_DEFINE(epoll_create1)。
(注:受限於暫存器數量的限制,(80x86下的)kernel限制系統呼叫最多有6個引數。據ulk3所述,這是由於32位元80x86暫存器的限制)
接下來,我們就看下epoll_create1的原始碼:
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
// kzalloc(sizeof(*ep), GFP_KERNEL),用的是核心空間
error = ep_alloc(&ep);
// 獲取尚未被使用的檔案描述符,即描述符陣列的槽位
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
// 在匿名inode檔案系統中分配一個inode,並得到其file結構體
// 且file->f_op = &eventpoll_fops
// 且file->private_data = ep;
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
// 將file填入到對應的檔案描述符陣列的槽裡面
fd_install(fd,file);
ep->file = file;
return fd;
}
最後epoll_create生成的檔案描述符如下圖所示:
struct eventpoll
所有的epoll系統呼叫都是圍繞eventpoll結構體做操作,現簡要描述下其中的成員:
/*
* 此結構體儲存在file->private_data中
*/
struct eventpoll {
// 自旋鎖,在kernel內部用自旋鎖加鎖,就可以同時多線(進)程對此結構體進行操作
// 主要是保護ready_list
spinlock_t lock;
// 這個互斥鎖是為了保證在eventloop使用對應的檔案描述符的時候,檔案描述符不會被移除掉
struct mutex mtx;
// epoll_wait使用的等待佇列,和進程喚醒有關
wait_queue_head_t wq;
// file->poll使用的等待佇列,和進程喚醒有關
wait_queue_head_t poll_wait;
// 就緒的描述符佇列
struct list_head rdllist;
// 通過紅黑樹來組織當前epoll關注的檔案描述符
struct rb_root rbr;
// 在向使用者空間傳輸就緒事件的時候,將同時發生事件的檔案描述符鏈入到這個連結串列裡面
struct epitem *ovflist;
// 對應的user
struct user_struct *user;
// 對應的檔案描述符
struct file *file;
// 下面兩個是用於環路檢測的優化
int visited;
struct list_head visited_list_link;
};
本文講述的是kernel是如何將就緒事件傳遞給epoll並喚醒對應進程上,因此在這裡主要聚焦於(wait_queue_head_t wq)等成員。
epoll_ctl(add)
我們看下epoll_ctl(EPOLL_CTL_ADD)是如何將對應的檔案描述符插入到eventpoll中的。 借助於spin_lock(自旋鎖)和mutex(互斥鎖),epoll_ctl呼叫可以在多個KSE(核心排程實體,即進程/執行緒)中並行執行。
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
/* 校驗epfd是否是epoll的描述符 */
// 此處的互斥鎖是為了防止並行呼叫epoll_ctl,即保護內部資料結構
// 不會被並行的新增修改刪除破壞
mutex_lock_nested(&ep->mtx, 0);
switch (op) {
case EPOLL_CTL_ADD:
...
// 插入到紅黑樹中
error = ep_insert(ep, &epds, tfile, fd);
...
break;
......
}
mutex_unlock(&ep->mtx);
}
上述過程如下圖所示:
ep_insert
在ep_insert中初始化了epitem,然後初始化了本文關注的焦點,即事件就緒時候的回撥函數,程式碼如下所示:
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
/* 初始化epitem */
// &epq.pt->qproc = ep_ptable_queue_proc
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 在這裡將回撥函數注入
revents = tfile->f_op->poll(tfile, &epq.pt);
// 如果當前有事件已經就緒,那麼一開始就會被加入到ready list
// 例如可寫事件
// 另外,在tcp內部ack之後呼叫tcp_check_space,最終呼叫sock_def_write_space來喚醒對應的epoll_wait下的進程
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
// wake_up ep對應在epoll_wait下的進程
if (waitqueue_active(&ep->wq)){
wake_up_locked(&ep->wq);
}
......
}
// 將epitem插入紅黑樹
ep_rbtree_insert(ep, epi);
......
}
tfile->f_op->poll的實現
向kernel更底層註冊回撥函數的是tfile->f_op->poll(tfile, &epq.pt)這一句,我們來看一下對於對應的socket檔案描述符,其fd=>file->f_op->poll的初始化過程:
// 將accept後的事件加入到對應的epoll fd中
int client_fd = accept(listen_fd,(struct sockaddr *)&client_addr,&client_len)));
// 將連線描述符註冊到對應的worker裡面
epoll_ctl(reactor->client_fd,EPOLL_CTL_ADD,epifd,&event);
回顧一下上述user space程式碼,fd即client_fd是由tcp的listen_fd通過accept呼叫而來,那麼我們看下accept呼叫鏈的關鍵路徑:
accept
|->accept4
|->sock_attach_fd(newsock, newfile, flags & O_NONBLOCK);
|->init_file(file,...,&socket_file_ops);
|->file->f_op = fop;
/* file->f_op = &socket_file_ops */
|->fd_install(newfd, newfile); // 安裝fd
那麼,由accept獲得的client_fd的結構如下圖所示:
注:由於是tcp socket,所以這邊sock->ops=inet_stream_ops
回撥函數的安裝
kernel的呼叫路徑如下:
sock_poll /*tfile->f_op->poll(tfile, &epq.pt)*/;
|->sock->ops->poll
|->tcp_poll
/* 這邊重要的是拿到了sk_sleep用於KSE(進程/執行緒)的喚醒 */
|->sock_poll_wait(file, sk->sk_sleep, wait);
|->poll_wait
|->p->qproc(filp, wait_address, p);
/* p為&epq.pt,而且&epq.pt->qproc= ep_ptable_queue_proc*/
|-> ep_ptable_queue_proc(filp,wait_address,p);
繞了一大圈之後,我們的回撥函數的安裝其實就是呼叫了eventpoll.c中的ep_ptable_queue_proc,而且向其中傳遞了sk->sk_sleep作為其waitqueue的head,其原始碼如下所示:
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
// 取出當前client_fd對應的epitem
struct epitem *epi = ep_item_from_epqueue(pt);
// &pwq->wait->func=ep_poll_callback,用於回撥喚醒
// 注意,這邊不是init_waitqueue_entry,即沒有將當前KSE(current,當前進程/執行緒)寫入到
// wait_queue當中,因為不一定是從當前安裝的KSE喚醒,而應該是喚醒epoll_wait的KSE
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
// 這邊的whead是sk->sk_sleep,將當前的waitqueue鏈入到socket對應的sleep列表
add_wait_queue(whead, &pwq->wait);
}
這樣client_fd的結構進一步完善,如下圖所示:
ep_poll_callback函數是喚醒對應epoll_wait的地方,我們將在後面一起講述。
epoll_wait
epoll_wait主要是呼叫了ep_poll:
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
/* 檢查epfd是否是epoll_create建立的fd */
// 呼叫ep_poll
error = ep_poll(ep, events, maxevents, timeout);
...
}
緊接著,我們看下ep_poll函數:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
......
retry:
// 獲取spinlock
spin_lock_irqsave(&ep->lock, flags);
// 將當前task_struct寫入到waitqueue中以便喚醒
// wq_entry->func = default_wake_function;
init_waitqueue_entry(&wait, current);
// WQ_FLAG_EXCLUSIVE,排他性喚醒,配合SO_REUSEPORT從而解決accept驚群問題
wait.flags |= WQ_FLAG_EXCLUSIVE;
// 鏈入到ep的waitqueue中
__add_wait_queue(&ep->wq, &wait);
for (;;) {
// 設定當前進程狀態為可打斷
set_current_state(TASK_INTERRUPTIBLE);
// 檢查當前執行緒是否有信號要處理,有則返回-EINTR
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
// schedule排程,讓出CPU
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock, flags);
}
// 到這裡,表明超時或者有事件觸發等動作導致進程重新排程
__remove_wait_queue(&ep->wq, &wait);
// 設定進程狀態為running
set_current_state(TASK_RUNNING);
......
// 檢查是否有可用事件
eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
......
// 向使用者空間拷貝就緒事件
ep_send_events(ep, events, maxevents)
}
上述邏輯如下圖所示:
ep_send_events
ep_send_events函數主要就是呼叫了ep_scan_ready_list,顧名思義ep_scan_ready_list就是掃描就緒列表:
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv,
int depth)
{
...
// 將epfd的rdllist鏈入到txlist
list_splice_init(&ep->rdllist, &txlist);
...
/* sproc = ep_send_events_proc */
error = (*sproc)(ep, &txlist, priv);
...
// 處理ovflist,即在上面sproc過程中又到來的事件
...
}
其主要呼叫了ep_send_events_proc:
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents;) {
// 遍歷ready list
epi = list_first_entry(head, struct epitem, rdllink);
list_del_init(&epi->rdllink);
// readylist只是表明當前epi有事件,具體的事件資訊還是得呼叫對應file的poll
// 這邊的poll即是tcp_poll,根據tcp本身的資訊設定掩碼(mask)等資訊 & 上興趣事件掩碼,則可以得知當前事件是否是epoll_wait感興趣的事件
revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
epi->event.events;
if(revents){
/* 將event放入到使用者空間 */
/* 處理ONESHOT邏輯 */
// 如果不是邊緣觸發,則將當前的epi重新加回到可用列表中,這樣就可以下一次繼續觸發poll,如果下一次poll的revents不為0,那麼使用者空間依舊能感知 */
else if (!(epi->event.events & EPOLLET)){
list_add_tail(&epi->rdllink, &ep->rdllist);
}
/* 如果是邊緣觸發,那麼就不加回可用列表,因此只能等到下一個可用事件觸發的時候才會將對應的epi放到可用列表裡面*/
eventcnt++
}
/* 如poll出來的revents事件epoll_wait不感興趣(或者本來就沒有事件),那麼也不會加回到可用列表 */
......
}
return eventcnt;
}
上述程式碼邏輯如下所示:
事件到來新增到epoll就緒佇列(rdllist)的過程
經過上述章節的詳述之後,我們終於可以闡述,tcp在資料到來時是怎麼加入到epoll的就緒佇列的了。
可讀事件到來
首先我們看下tcp封包從網絡卡驅動到kernel內部tcp協定處理呼叫鏈:
step1:
網路分組到來的核心路徑,網絡卡發起中斷後呼叫netif_rx將事件掛入CPU的等待佇列,並喚起軟中斷(soft_irq),再通過linux的軟中斷機制呼叫net_rx_action,如下圖所示:
註:上圖來自PLKA(<<深入Linux核心架構>>)
step2:
緊接著跟蹤next_rx_action
next_rx_action
|-process_backlog
......
|->packet_type->func 在這裡我們考慮ip_rcv
|->ipprot->handler 在這裡ipprot過載為tcp_protocol
(handler 即為tcp_v4_rcv)
我們再看下對應的tcp_v4_rcv
tcp_v4_rcv
|->tcp_v4_do_rcv
|->tcp_rcv_state_process
|->tcp_data_queue
|-> sk->sk_data_ready(sock_def_readable)
|->wake_up_interruptible_sync_poll(sk->sleep,...)
|->__wake_up
|->__wake_up_common
|->curr->func
/* 這裡已經被ep_insert新增為ep_poll_callback,而且設定了排它標識WQ_FLAG_EXCLUSIVE*/
|->ep_poll_callback
這樣,我們就看下最終喚醒epoll_wait的ep_poll_callback函數:
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
// 獲取wait對應的epitem
struct epitem *epi = ep_item_from_wait(wait);
// epitem對應的eventpoll結構體
struct eventpoll *ep = epi->ep;
// 獲取自旋鎖,保護ready_list等結構
spin_lock_irqsave(&ep->lock, flags);
// 如果當前epi沒有被鏈入ep的ready list,則鏈入
// 這樣,就把當前的可用事件加入到epoll的可用列表了
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
// 如果有epoll_wait在等待的話,則喚醒這個epoll_wait進程
// 對應的&ep->wq是在epoll_wait呼叫的時候通過init_waitqueue_entry(&wait, current)而生成的
// 其中的current即是對應呼叫epoll_wait的進程資訊task_struct
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
}
上述過程如下圖所示:
最後wake_up_locked呼叫__wake_up_common,然後呼叫了在init_waitqueue_entry註冊的default_wake_function,呼叫路徑為:
wake_up_locked
|->__wake_up_common
|->default_wake_function
|->try_wake_up (wake up a thread)
|->activate_task
|->enqueue_task running
將epoll_wait進程推入可執行佇列,等待核心重新排程進程,然後epoll_wait對應的這個進程重新執行後,就從schedule恢復,繼續下面的ep_send_events(向使用者空間拷貝事件並返回)。
wake_up過程如下圖所示:
可寫事件到來
可寫事件的執行過程和可讀事件大同小異:
首先,在epoll_ctl_add的時候預先會呼叫一次對應檔案描述符的poll,如果返回事件裡有可寫掩碼的時候直接呼叫wake_up_locked以喚醒對應的epoll_wait進程。 然後,在tcp在底層驅動有資料到來的時候可能攜帶了ack從而可以釋放部分已經被對端接收的資料,於是觸發可寫事件,這一部分的呼叫鏈為:
tcp_input.c
tcp_v4_rcv
|-tcp_v4_do_rcv
|-tcp_rcv_state_process
|-tcp_data_snd_check
|->tcp_check_space
|->tcp_new_space
|->sk->sk_write_space
/* tcp下即是sk_stream_write_space*/
最後在此函數裡面sk_stream_write_space喚醒對應的epoll_wait進程
void sk_stream_write_space(struct sock *sk)
{
// 即有1/3可寫空間的時候才觸發可寫事件
if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk) && sock) {
clear_bit(SOCK_NOSPACE, &sock->flags);
if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
wake_up_interruptible_poll(sk->sk_sleep, POLLOUT |
POLLWRNORM | POLLWRBAND)
......
}
}
關閉描述符(close fd)
值得注意的是,我們在close對應的檔案描述符的時候,會自動呼叫eventpoll_release將對應的file從其關聯的epoll_fd中刪除,kernel關鍵路徑如下:
close fd
|->filp_close
|->fput
|->__fput
|->eventpoll_release
|->ep_remove
所以我們在關閉對應的檔案描述符後,並不需要通過epoll_ctl_del來刪掉對應epoll中相應的描述符。
總結
epoll作為linux下非常優秀的事件觸發機制得到了廣泛的運用。其原始碼還是比較複雜的,本文只是闡述了epoll讀寫事件的觸發機制,探究linux kernel原始碼的過程非常快樂^_^。
相關文章