首頁 > 軟體

Redis原始碼設計剖析之事件處理範例詳解

2022-09-20 22:03:44

1. Redis事件介紹

Redis伺服器是一個事件驅動程式,所謂事件驅動就是輸入一條命令並且按下回車,然後訊息被組裝成Redis協定的格式傳送給Redis伺服器,這個時候就會產生一個事件,Redis伺服器會接收改命令,處理該命令和傳送回復,而當我們沒有與伺服器進行互動時,伺服器就會處於阻塞等待狀態,它會讓出CPU然後進入睡眠狀態,當事件觸發時,就會被作業系統喚醒.

而Redis伺服器需要處理以下兩類事件:

檔案事件:Redis 伺服器通過通訊端與使用者端(或者其他Redis伺服器)進行連線,而檔案事件就是伺服器對通訊端操作的抽象. 伺服器與使用者端(或者其他伺服器)的通訊會產生相應的檔案事件,而伺服器則通過監聽並處理這些事件來完成一系列網路通訊操作.

時間事件:Redis 伺服器中的一些操作(比如serverCron函數)需要在給定的時間點執行,而時間事件就是伺服器對這類定時操作的抽象.

2. 事件的抽象

Redis把檔案事件時間事件分別抽象成一個資料結構來管理.

2.1 檔案事件結構

typedef struct aeFileEvent {
    // 檔案時間型別:AE_NONE,AE_READABLE,AE_WRITABLE
    int mask;
    // 可讀處理常式
    aeFileProc *rfileProc;
    // 可寫處理常式
    aeFileProc *wfileProc;
    // 使用者端傳入的資料
    void *clientData;
} aeFileEvent;  //檔案事件

其中rfileProcwfileProc成員分別為兩個函數指標,他們的原型為:

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

該函數是回撥函數,如果當前檔案事件所指定的事件型別發生時,則會呼叫對應的回撥函數來處理該事件.

當事件就緒的時候,我們需要知道檔案事件的檔案描述符還有事件型別才能對於鎖定該事件,因此定義了aeFiredEvent結構統一管理:

typedef struct aeFiredEvent {
    // 就緒事件的檔案描述符
    int fd;
    // 就緒事件型別:AE_NONE,AE_READABLE,AE_WRITABLE
    int mask;
} aeFiredEvent; //就緒事件

檔案事件的型別:

#define AE_NONE 0           //未設定
#define AE_READABLE 1       //事件可讀
#define AE_WRITABLE 2       //事件可寫

2.2 時間事件結構

typedef struct aeTimeEvent {
    // 時間事件的id
    long long id;
    // 時間事件到達的時間的秒數
    long when_sec; /* seconds */
    // 時間事件到達的時間的毫秒數
    long when_ms; /* milliseconds */
    // 時間事件處理常式
    aeTimeProc *timeProc;
    // 時間事件終結函數
    aeEventFinalizerProc *finalizerProc;
    // 使用者端傳入的資料
    void *clientData;
    // 指向下一個時間事件
    struct aeTimeEvent *next;
} aeTimeEvent;  //時間事件

可以看出,時間事件的結構就是一個連結串列的節點,因為struct aeTimeEvent *next是指向下一個時間事件的指標.

和檔案事件一樣,當時間事件所指定的事件發生時,也會呼叫對應的回撥函數,結構成員timeProcfinalizerProc都是回撥函數,函數原型如下:

typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);

雖然對檔案事件和時間事件都做了抽象,Redis仍然需要對事件做一個整體的抽象,用來描述一個事件的狀態. 也就是下面要介紹的事件狀態結構:aeEventLoop.

2.3 事件狀態結構

typedef struct aeEventLoop {
    // 當前已註冊的最大的檔案描述符
    int maxfd;   /* highest file descriptor currently registered */
    // 檔案描述符監聽集合的大小
    int setsize; /* max number of file descriptors tracked */
    // 下一個時間事件的ID
    long long timeEventNextId;
    // 最後一次執行事件的時間
    time_t lastTime;     /* Used to detect system clock skew */
    // 註冊的檔案事件表
    aeFileEvent *events; /* Registered events */
    // 已就緒的檔案事件表
    aeFiredEvent *fired; /* Fired events */
    // 時間事件的頭節點指標
    aeTimeEvent *timeEventHead;
    // 事件處理開關
    int stop;
    // 多路複用庫的事件狀態資料
    void *apidata; /* This is used for polling API specific data */
    // 執行處理事件之前的函數
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;  //事件輪詢的狀態結構

aeEventLoop結構儲存了一個void *型別的萬能指標apidata,用來儲存輪詢事件的狀態,也就是儲存底層呼叫的多路複用庫的事件狀態.

RedisI/O多路複用程式的所有功能都是通過包裝常見的selectepollevportkqueue這些I/O多路複用函數庫來實現的,每個I/O多路複用函數庫在Redis原始碼中都對應著一個單獨的檔案,比如ae_select.cae_epoll.c等等.

他們在編譯階段,會根據不同的系統選擇效能最高的一個多路複用庫作為Redis的多路複用程式實現,而且所有庫的API都是相同的,這就可以讓Redis多路複用程式的底層可以互換.

下面是具體選擇庫的原始碼:

// IO複用的選擇,效能依次下降,Linux支援 "ae_epoll.c" 和 "ae_select.c"
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

也可以通過命令INFO server來檢視當前使用的是哪個多路複用庫:

可以看到Linux下預設使用的是epoll多路複用庫,那麼apidata儲存的就是epoll模型的事件狀態結構,它在ae_epoll.c原始檔中:

typedef struct aeApiState {
    // epoll事件的檔案描述符
    int epfd;
    // 事件表
    struct epoll_event *events;
} aeApiState;   // 事件的狀態

epoll模型的struct epoll_event結構中定義著epoll事件的型別,比如EPOLLINEPOLLOUT等等,但是Redis的檔案結構aeFileEvent中也在mask中定義了自己的事件型別,例如:AE_READABLEAE_WRITABLE等等,於是就需要實現一箇中間層將兩者的事件型別相聯絡起來,這就是之前提到的ae_epoll.c檔案中實現的相同的API:

// 建立一個epoll範例,儲存到eventLoop中
static int aeApiCreate(aeEventLoop *eventLoop)
// 調整事件表的大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize)  
// 釋放epoll範例和事件表空間
static void aeApiFree(aeEventLoop *eventLoop)
// 在epfd標識的事件表上註冊fd的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
// 在epfd標識的事件表上注刪除fd的事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
// 等待所監聽檔案描述符上有事件發生
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
// 返回正在使用的IO多路複用庫的名字
static char *aeApiName(void)

這些API會講epoll的底層函數封裝起來,Redis實現事件時,只需要呼叫這些介面即可.

我們以下面兩個API的原始碼舉例:

aeApiAddEvent

該函數會向Redis事件狀態結構aeEventLoop的事件表event註冊一個事件,對應的是epoll_ctl函數.

// 在epfd標識的事件表上註冊fd的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    // EPOLL_CTL_ADD,向epfd註冊fd的上的event
    // EPOLL_CTL_MOD,修改fd已註冊的event
    // #define AE_NONE 0           //未設定
    // #define AE_READABLE 1       //事件可讀
    // #define AE_WRITABLE 2       //事件可寫
    // 判斷fd事件的操作,如果沒有設定事件,則進行關聯mask型別事件,否則進行修改
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    // struct epoll_event {
    //      uint32_t     events;      /* Epoll events */
    //      epoll_data_t data;        /* User data variable */
    // };
    ee.events = 0;
    // 如果是修改事件,合併之前的事件型別
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    // 根據mask對映epoll的事件型別
    if (mask & AE_READABLE) ee.events |= EPOLLIN;   //讀事件
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;  //寫事件
    ee.data.fd = fd;    //設定事件所從屬的目標檔案描述符
    // 將ee事件註冊到epoll中
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

aeApiPoll

等待所監聽檔案描述符上有事件發生,對應著底層的epoll_wait函數.

// 等待所監聽檔案描述符上有事件發生
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // 監聽事件表上是否有事件發生
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    // 至少有一個就緒的事件
    if (retval > 0) {
        int j;
        numevents = retval;
        // 遍歷就緒的事件表,將其加入到eventLoop的就緒事件表中
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;
            // 根據就緒的事件型別,設定mask
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            // 新增到就緒事件表中
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    // 返回就緒的事件個數
    return numevents;
}

3. 事件的實現

事件的所有原始碼都定義在ae.c原始檔中,先從aeMain函數說起.

// 事件輪詢的主函數
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    // 一直處理事件
    while (!eventLoop->stop) {
        // 執行處理事件之前的函數
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        //處理到時的時間事件和就緒的檔案事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

可以看到,如果伺服器一直處理事件,那麼就是一個死迴圈,而一個最典型的事件驅動,就是一個死迴圈. 在迴圈中,程式會呼叫處理事件的函數aeProcessEvents(),它的引數是一個事件狀態結構aeEventLoopAE_ALL_EVENTS.

事件型別的宏定義,在ae.h標頭檔案中:

#define AE_FILE_EVENTS 1                                //檔案事件
#define AE_TIME_EVENTS 2                                //時間事件
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)   //檔案和時間事件
#define AE_DONT_WAIT 4
// 處理到時的時間事件和就緒的檔案事件
// 如果flags = 0,函數什麼都不做,直接返回
// 如果flags設定了 AE_ALL_EVENTS ,則執行所有型別的事件
// 如果flags設定了 AE_FILE_EVENTS ,則執行檔案事件
// 如果flags設定了 AE_TIME_EVENTS ,則執行時間事件
// 如果flags設定了 AE_DONT_WAIT ,那麼函數處理完事件後直接返回,不阻塞等待
// 函數返回執行的事件個數
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    // 如果什麼事件都沒有設定則直接返回
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    // 請注意,既然我們要處理時間事件,即使沒有要處理的檔案事件,我們仍要呼叫select(),以便在下一次事件準備啟動之前進行休眠
    // 當前還沒有要處理的檔案事件,或者設定了時間事件但是沒有設定不阻塞標識
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        // 如果設定了時間事件而沒有設定不阻塞標識
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // 獲取最近到時的時間事件
            shortest = aeSearchNearestTimer(eventLoop);
        // 獲取到了最早到時的時間事件
        if (shortest) {
            long now_sec, now_ms;
            // 獲取當前時間
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            // 等待該時間事件到時所需要的時長
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;
            // 如果沒到時
            if (ms > 0) {
                // 儲存時長到tvp中
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            // 如果已經到時,則將tvp的時間設定為0
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        // 沒有獲取到了最早到時的時間事件,時間事件連結串列為空
        } else {
            // 如果設定了不阻塞標識
            if (flags & AE_DONT_WAIT) {
                // 將tvp的時間設定為0,就不會阻塞
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                // 阻塞到第一個時間事件的到來
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
        // 等待所監聽檔案描述符上有事件發生
        // 如果tvp為NULL,則阻塞在此,否則等待tvp設定阻塞的時間,就會有時間事件到時
        // 返回了就緒檔案事件的個數
        numevents = aeApiPoll(eventLoop, tvp);
        // 遍歷就緒檔案事件表
        for (j = 0; j < numevents; j++) {
            // 獲取就緒檔案事件的地址
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            // 獲取就緒檔案事件的型別,檔案描述符
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
            // 如果是檔案可讀事件發生
            if (fe->mask & mask & AE_READABLE) {
                // 設定讀事件標識 且 呼叫讀事件方法處理讀事件
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 如果是檔案可寫事件發生
            if (fe->mask & mask & AE_WRITABLE) {
                // 讀寫事件的執行發法不同,則執行寫事件,避免重複執行相同的方法
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;    //執行的事件次數加1
        }
    }
    /* Check time events */
    // 執行時間事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed; /* return the number of processed file/time events */
}

Redis伺服器在沒有被事件觸發時,如果沒有設定AE_DONT_WAIT標識,就會開始阻塞等待. 但是它不會死等待,因為還需要處理時間事件,所以在呼叫aeApiPoll進行監聽之前,會先從時間事件表中獲取一個最近到達的時間,根據需要等待的時間構建一個struct timeval tv, *tvp結構的變數,這個變數儲存著伺服器阻塞等待檔案事件的最長時間,一旦時間到達而沒有觸發檔案事件aeApiPoll函數就會停止阻塞,進而呼叫processTimeEvents函數處理時間事件.

如果在阻塞等待的最長時間之間,觸發了檔案事件,就會先執行檔案事件,後執行時間事件,因此處理時間事件通常比預設的會晚一點.

而執行檔案事件rfileProcwfileProc也是呼叫了回撥函數,Redis將檔案事件的處理分為了好幾種,用於處理不同的網路通訊需求:

  • acceptTcpHandler:用於accept client的connect.
  • acceptUnixHandler:用於accept client的本地connect.
  • sendReplyToClient:用於向client傳送命令回覆.
  • readQueryFromClient:用於讀入client傳送的請求.

然後我們來看一下獲取最快達到時間事件的函數aeSearchNearestTimer實現:

// 尋找第一個快到時的時間事件
// 這個操作是有用的知道有多少時間可以選擇該事件設定為不用推遲任何事件的睡眠中。
// 如果事件連結串列沒有時間將返回NULL。
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    // 時間事件頭節點地址
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;
    // 遍歷所有的時間事件
    while(te) {
        // 尋找第一個快到時的時間事件,儲存到nearest中
        if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}

該函數就是遍歷時間事件連結串列,然後找到最小值.

我們重點看執行時間事件的函數processTimeEvents函數的實現:

// 執行時間事件
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te, *prev;
    long long maxId;
    time_t now = time(NULL);
    // 這裡嘗試發現時間混亂的情況,上一次處理事件的時間比當前時間還要大
    // 重置最近一次處理事件的時間
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    // 設定上一次時間事件處理的時間為當前時間
    eventLoop->lastTime = now;
    prev = NULL;
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;   //當前時間事件表中的最大ID
    // 遍歷時間事件連結串列
    while(te) {
        long now_sec, now_ms;
        long long id;
        /* Remove events scheduled for deletion. */
        // 如果時間事件已被刪除了
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            // 從事件連結串列中刪除事件的節點
            if (prev == NULL)
                eventLoop->timeEventHead = te->next;
            else
                prev->next = te->next;
            // 呼叫時間事件終結方法清除該事件
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }
        // 確保我們不處理在此迭代中由時間事件建立的時間事件. 請注意,此檢查目前無效:我們總是在頭節點新增新的計時器,但是如果我們更改實施細節,則該檢查可能會再次有用:我們將其保留在未來的防禦
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        // 獲取當前時間
        aeGetTime(&now_sec, &now_ms);
        // 找到已經到時的時間事件
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;
            id = te->id;
            // 呼叫時間事件處理方法
            retval = te->timeProc(eventLoop, id, te->clientData);
            // 時間事件次數加1
            processed++;
            // 如果不是定時事件,則繼續設定它的到時時間
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            // 如果是定時時間,則retval為-1,則將其時間事件刪除,惰性刪除
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        // 更新前驅節點指標和後繼節點指標
        prev = te;
        te = te->next;
    }
    return processed;   //返回執行事件的次數
}

如果時間事件不存在,則就呼叫finalizerProc指向的回撥函數,刪除當前的時間事件. 如果存在,就呼叫timeProc指向的回撥函數處理時間事件. Redis的時間事件分為兩類:

  • 定時事件:讓一段程式在指定的時間後執行一次.
  • 週期性事件:讓一段程式每隔指定的時間後執行一次.

如果當前的時間事件是週期性,那麼就會在將時間週期新增到週期事件的到時時間中. 如果是定時事件,則將該時間事件刪除.

參考資料:

《Redis設計與實現》

以上就是Redis原始碼設計剖析之事件處理範例詳解的詳細內容,更多關於Redis事件處理的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com