首頁 > 軟體

Redis命令處理過程原始碼解析

2022-02-10 19:00:57

本文基於社群版Redis 4.0.8

1、命令解析

Redis伺服器接收到的命令請求首先儲存在使用者端物件的querybuf輸入緩衝區,然後解析命令請求的各個引數,並儲存在使用者端物件的argv和argc欄位。

使用者端解析命令請求的入口函數為readQueryFromClient,會讀取socket資料儲存到使用者端物件的輸入緩衝區,並呼叫函數processInputBuffer解析命令請求。

注:內聯命令:使用telnet對談輸入命令的方式

void processInputBuffer(client *c) {
    ......
    //迴圈遍歷輸入緩衝區,獲取命令引數,呼叫processMultibulkBuffer解析命令引數和長度
    while(sdslen(c->querybuf)) {
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;//處理telnet方式的內聯命令
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break; //解析命令引數和長度暫存到使用者端結構體中
        } else {
            serverPanic("Unknown request type");
        }
    }    
}

//解析命令引數和長度暫存到使用者端結構體中
int processMultibulkBuffer(client *c) {
    //定位到行尾
    newline = strchr(c->querybuf,'r');
    //解析命令請求引數數目,並儲存在使用者端物件的c->multibulklen欄位
    serverAssertWithInfo(c,NULL,c->querybuf[0] == '*');
    ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
    c->multibulklen = ll;
    pos = (newline-c->querybuf)+2;//記錄已解析命令的請求長度resp的長度
    /* Setup argv array on client structure */
    //分配請求引數儲存空間
    c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
    
    // 開始迴圈解析每個請求引數
    while(c->multibulklen) {
        ......
        newline = strchr(c->querybuf+pos,'r');
        if (c->querybuf[pos] != '$') {
            return C_ERR;
        ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
        pos += newline-(c->querybuf+pos)+2;
        c->bulklen = ll;//字串引數長度暫存在使用者端物件的bulklen欄位
        
        //讀取該長度的引數內容,並建立字串物件,同時更新待解析引數multibulklen
        c->argv[c->argc++] =createStringObject(c->querybuf+pos,c->bulklen);
        pos += c->bulklen+2;
        c->multibulklen--;
    }

2、命令呼叫

當multibulklen的值更新為0時,表示引數解析完成,開始呼叫processCommand來處理命令,處理命令前有很多校驗邏輯,如下:

void processInputBuffer(client *c) {
    
    ......
     //呼叫processCommand來處理命令
     if (processCommand(c) == C_OK) {
         ......
     }
}

//處理命令函數
int processCommand(client *c) {
    //校驗是否是quit命令
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }
    //呼叫lookupCommand,檢視該命令是否存在
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return C_OK;
    //檢查使用者許可權
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        addReply(c,shared.noautherr);
    //還有很多檢查,不一一列舉,比如叢集/持久化/複製等
    /* 真正執行命令 */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        queueMultiCommand(c);
        //將結果寫入outbuffer
        addReply(c,shared.queued);
    } 
// 呼叫execCommand執行命令
void execCommand(client *c) {
    call(c,CMD_CALL_FULL);//呼叫call執行命令
//呼叫execCommand呼叫call執行命令
void call(client *c, int flags) {
    start = ustime();
    c->cmd->proc(c);//執行命令
    duration = ustime()-start;
    //如果是慢查詢,記錄慢查詢
    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        //記錄到慢紀錄檔中
        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
    //更新統計資訊:當前命令執行時間和呼叫次數
    if (flags & CMD_CALL_STATS) {
        c->lastcmd->microseconds += duration;
        c->lastcmd->calls++;

3、返回結果

Redis返回結果並不是直接返回給使用者端,而是先寫入到輸出緩衝區(buf欄位)或者輸出連結串列(reply欄位)

int processCommand(client *c) {
    ......
    //將結果寫入outbuffer
    addReply(c,shared.queued);
    ......
    
}
//將結果寫入outbuffer
void addReply(client *c, robj *obj) {
    //呼叫listAddNodeHead將使用者端新增到伺服器端結構體的client_pending_write連結串列,以便後續能快速查詢出哪些使用者端有資料需要傳送
    if (prepareClientToWrite(c) != C_OK) return;
    
    //然後新增字串到輸出緩衝區
    if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
        //如果新增失敗,則新增到輸出連結串列中
        _addReplyObjectToList(c,obj); 
}

addReply函數只是將待傳送給使用者端的資料暫存在輸出連結串列或者輸出緩衝區,那麼什麼時候將這些資料傳送給使用者端呢?答案是開啟事件迴圈時,呼叫的beforesleep函數,該函數專門執行一些不是很費時的操作,如過期鍵刪除,向用戶端返回命令回覆等

void beforeSleep(struct aeEventLoop *eventLoop) {
    ......
     /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
}

//回覆使用者端命令函數
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);
        /* 傳送使用者端資料 */
        if (writeToClient(c->fd,c,0) == C_ERR) continue;
        /* If there is nothing left, do nothing. Otherwise install
         * the write handler. */
         //如果資料量很大,一次性沒有傳送完成,則進行新增檔案事件,監聽當前使用者端socket檔案描述符的可寫事件即可
        if (clientHasPendingReplies(c) &&
            aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                sendReplyToClient, c) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    return processed;

到這裡,命令請求才算真正處理完成了。

到此這篇關於Redis命令處理過程原始碼解析的文章就介紹到這了,更多相關Redis命令處理內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com