<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Redis網路連線庫對應的檔案是networking.c
,這個檔案主要負責:
Redis伺服器是一個同時與多個使用者端建立連線的程式. 當用戶端連線上伺服器時,伺服器會建立一個server.h/client
結構來儲存使用者端的狀態資訊. server.h/client
結構如下所示:
typedef struct client { // client獨一無二的ID uint64_t id; /* Client incremental unique ID. */ // client的通訊端 int fd; /* Client socket. */ // 指向當前的資料庫 redisDb *db; /* Pointer to currently SELECTed DB. */ // 儲存指向資料庫的ID int dictid; /* ID of the currently SELECTed DB. */ // client的名字 robj *name; /* As set by CLIENT SETNAME. */ // 輸入緩衝區 sds querybuf; /* Buffer we use to accumulate client queries. */ // 輸入快取的峰值 size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */ // client輸入命令時,引數的數量 int argc; /* Num of arguments of current command. */ // client輸入命令的參數列 robj **argv; /* Arguments of current command. */ // 儲存使用者端執行命令的歷史記錄 struct redisCommand *cmd, *lastcmd; /* Last command executed. */ // 請求協定型別,內聯或者多條命令 int reqtype; /* Request protocol type: PROTO_REQ_* */ // 參數列中未讀取命令引數的數量,讀取一個,該值減1 int multibulklen; /* Number of multi bulk arguments left to read. */ // 命令內容的長度 long bulklen; /* Length of bulk argument in multi bulk request. */ // 回覆快取列表,用於傳送大於固定回覆緩衝區的回覆 list *reply; /* List of reply objects to send to the client. */ // 回覆快取列表物件的總位元組數 unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */ // 已傳送的位元組數或物件的位元組數 size_t sentlen; /* Amount of bytes already sent in the current buffer or object being sent. */ // client建立所需時間 time_t ctime; /* Client creation time. */ // 最後一次和伺服器互動的時間 time_t lastinteraction; /* Time of the last interaction, used for timeout */ // 使用者端的輸出緩衝區超過軟性限制的時間,記錄輸出緩衝區第一次到達軟性限制的時間 time_t obuf_soft_limit_reached_time; // client狀態的標誌 int flags; /* Client flags: CLIENT_* macros. */ // 認證標誌,0表示未認證,1表示已認證 int authenticated; /* When requirepass is non-NULL. */ // 從節點的複製狀態 int replstate; /* Replication state if this is a slave. */ // 在ack上設定從節點的寫處理器,是否在slave向master傳送ack, int repl_put_online_on_ack; /* Install slave write handler on ACK. */ // 儲存主伺服器傳來的RDB檔案的檔案描述符 int repldbfd; /* Replication DB file descriptor. */ // 讀取主伺服器傳來的RDB檔案的偏移量 off_t repldboff; /* Replication DB file offset. */ // 主伺服器傳來的RDB檔案的大小 off_t repldbsize; /* Replication DB file size. */ // 主伺服器傳來的RDB檔案的大小,符合協定的字串形式 sds replpreamble; /* Replication DB preamble. */ // replication複製的偏移量 long long reploff; /* Replication offset if this is our master. */ // 通過ack命令接收到的偏移量 long long repl_ack_off; /* Replication ack offset, if this is a slave. */ // 通過ack命令接收到的偏移量所用的時間 long long repl_ack_time;/* Replication ack time, if this is a slave. */ // FULLRESYNC回覆給從節點的offset long long psync_initial_offset; /* FULLRESYNC reply offset other slaves copying this slave output buffer should use. */ char replrunid[CONFIG_RUN_ID_SIZE+1]; /* Master run id if is a master. */ // 從節點的埠號 int slave_listening_port; /* As configured with: REPLCONF listening-port */ // 從節點IP地址 char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */ // 從節點的功能 int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */ // 事物狀態 multiState mstate; /* MULTI/EXEC state */ // 阻塞型別 int btype; /* Type of blocking op if CLIENT_BLOCKED. */ // 阻塞的狀態 blockingState bpop; /* blocking state */ // 最近一個寫全域性的複製偏移量 long long woff; /* Last write global replication offset. */ // 監控列表 list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ // 訂閱頻道 dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ // 訂閱的模式 list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ // 被快取的ID sds peerid; /* Cached peer ID. */ /* Response buffer */ // 回覆固定緩衝區的偏移量 int bufpos; // 回覆固定緩衝區 char buf[PROTO_REPLY_CHUNK_BYTES]; } client;
建立使用者端的原始碼:
// 建立一個新的client client *createClient(int fd) { client *c = zmalloc(sizeof(client)); //分配空間 // 如果fd為-1,表示建立的是一個無網路連線的偽使用者端,用於執行lua指令碼的時候 // 如果fd不等於-1,表示建立一個有網路連線的使用者端 if (fd != -1) { // 設定fd為非阻塞模式 anetNonBlock(NULL,fd); // 禁止使用 Nagle 演演算法,client向核心遞交的每個封包都會立即傳送給server出去,TCP_NODELAY anetEnableTcpNoDelay(NULL,fd); // 如果開啟了tcpkeepalive,則設定 SO_KEEPALIVE if (server.tcpkeepalive) // 設定tcp連線的keep alive選項 anetKeepAlive(NULL,fd,server.tcpkeepalive); // 建立一個檔案事件狀態el,且監聽讀事件,開始接受命令的輸入 if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } // 預設選0號資料庫 selectDb(c,0); // 設定client的ID c->id = server.next_client_id++; // client的通訊端 c->fd = fd; // client的名字 c->name = NULL; // 回覆固定(靜態)緩衝區的偏移量 c->bufpos = 0; // 輸入快取區 c->querybuf = sdsempty(); // 輸入快取區的峰值 c->querybuf_peak = 0; // 請求協定型別,內聯或者多條命令,初始化為0 c->reqtype = 0; // 引數個數 c->argc = 0; // 參數列 c->argv = NULL; // 當前執行的命令和最近一次執行的命令 c->cmd = c->lastcmd = NULL; // 查詢緩衝區剩餘未讀取命令的數量 c->multibulklen = 0; // 讀入引數的長度 c->bulklen = -1; // 已發的位元組數 c->sentlen = 0; // client的狀態 c->flags = 0; // 設定建立client的時間和最後一次互動的時間 c->ctime = c->lastinteraction = server.unixtime; // 認證狀態 c->authenticated = 0; // replication複製的狀態,初始為無 c->replstate = REPL_STATE_NONE; // 設定從節點的寫處理器為ack,是否在slave向master傳送ack c->repl_put_online_on_ack = 0; // replication複製的偏移量 c->reploff = 0; // 通過ack命令接收到的偏移量 c->repl_ack_off = 0; // 通過ack命令接收到的偏移量所用的時間 c->repl_ack_time = 0; // 從節點的埠號 c->slave_listening_port = 0; // 從節點IP地址 c->slave_ip[0] = ' '; // 從節點的功能 c->slave_capa = SLAVE_CAPA_NONE; // 回覆連結串列 c->reply = listCreate(); // 回覆連結串列的位元組數 c->reply_bytes = 0; // 回覆緩衝區的記憶體大小軟限制 c->obuf_soft_limit_reached_time = 0; // 回覆連結串列的釋放和複製方法 listSetFreeMethod(c->reply,decrRefCountVoid); listSetDupMethod(c->reply,dupClientReplyValue); // 阻塞型別 c->btype = BLOCKED_NONE; // 阻塞超過時間 c->bpop.timeout = 0; // 造成阻塞的鍵字典 c->bpop.keys = dictCreate(&setDictType,NULL); // 儲存解除阻塞的鍵,用於儲存PUSH入元素的鍵,也就是dstkey c->bpop.target = NULL; // 阻塞狀態 c->bpop.numreplicas = 0; // 要達到的複製偏移量 c->bpop.reploffset = 0; // 全域性的複製偏移量 c->woff = 0; // 監控的鍵 c->watched_keys = listCreate(); // 訂閱頻道 c->pubsub_channels = dictCreate(&setDictType,NULL); // 訂閱模式 c->pubsub_patterns = listCreate(); // 被快取的peerid,peerid就是 ip:port c->peerid = NULL; // 訂閱釋出模式的釋放和比較方法 listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); // 將真正的client放在伺服器的使用者端連結串列中 if (fd != -1) listAddNodeTail(server.clients,c); // 初始化client的事物狀態 initClientMultiState(c); return c; }
根據建立的檔案描述符fd
,可以建立用於不同場景下的client
. 這個fd
就是伺服器接收使用者端connect
後所返回的檔案描述符.
fd == -1
,表示建立一個無網路連線的使用者端。主要用於執行 lua 指令碼時.fd != -1
,表示接收到一個正常的使用者端連線,則會建立一個有網路連線的使用者端,也就是建立一個檔案事件,來監聽這個fd是否可讀,當用戶端傳送資料,則事件被觸發.建立使用者端的過程,會將server.h/client
結構的所有成員初始化,接下里會介紹部分重點的成員.
int id
:伺服器對於每一個連線進來的都會建立一個ID,使用者端的ID從1開始。每次重啟伺服器會重新整理. int fd
:當前使用者端狀態描述符。分為無網路連線的使用者端和有網路連線的使用者端. int flags
:使用者端狀態的標誌. robj *name
:預設建立的使用者端是沒有名字的,可以通過CLIENT SETNAME
命令設定名字. 後面會介紹該命令的實現. int reqtype
:請求協定的型別. 因為Redis伺服器支援Telnet
的連線,因此Telnet命令請求協定型別是PROTO_REQ_INLINE
,而redis-cli
命令請求的協定型別是PROTO_REQ_MULTIBULK
.
用於儲存伺服器接受使用者端命令的成員:
sds querybuf
:儲存使用者端發來命令請求的輸入緩衝區. 以Redis通訊協定的方式儲存. size_t querybuf_peak
:儲存輸入緩衝區的峰值. int argc
:命令引數個數. robj *argv
:命令參數列.
用於儲存伺服器給使用者端回覆的成員:
char buf[16*1024]
:儲存執行完命令所得命令回覆資訊的靜態緩衝區,它的大小是固定的,所以主要儲存的是一些比較短的回覆. 分配client
結構空間時,就會分配一個16K
的大小. int bufpos
:記錄靜態緩衝區的偏移量,也就是buf陣列已經使用的位元組數. list *reply
:儲存命令回覆的連結串列. 因為靜態緩衝區大小固定,主要儲存固定長度的命令回覆,當處理一些返回大量回復的命令,則會將命令回覆以連結串列的形式連線起來. unsigned long long reply_bytes
:儲存回覆連結串列的位元組數. size_t sentlen
:已傳送回復的位元組數.
使用者端釋放的函數是freeClient()
,主要就是釋放各種資料結構和清空一些緩衝區等操作,這裡就不再列出原始碼.
我們可以重點關注一下非同步釋放使用者端,原始碼如下:
// 非同步釋放client void freeClientAsync(client *c) { // 如果是已經即將關閉或者是lua指令碼的偽client,則直接返回 if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; c->flags |= CLIENT_CLOSE_ASAP; // 將client加入到即將關閉的client連結串列中 // server.clients_to_close 中儲存著伺服器中所有待關閉的連結串列 listAddNodeTail(server.clients_to_close,c); }
設定非同步釋放使用者端的目的主要是:防止底層函數正在向用戶端的輸出緩衝區寫資料的時候,關閉使用者端,這樣是不安全的. Redis會安排使用者端在serverCron()
函數的安全時間釋放它.
當然也可以取消非同步釋放,那麼就會呼叫freeClient()
函數立即釋放,原始碼如下:
// 取消設定非同步釋放的client void freeClientsInAsyncFreeQueue(void) { // 遍歷所有即將關閉的client while (listLength(server.clients_to_close)) { listNode *ln = listFirst(server.clients_to_close); client *c = listNodeValue(ln); // 取消立即關閉的標誌 c->flags &= ~CLIENT_CLOSE_ASAP; freeClient(c); // 從即將關閉的client連結串列中刪除 listDelNode(server.clients_to_close,ln); } }
當用戶端連線上Redis伺服器後,伺服器會得到一個檔案描述符fd
,而且伺服器會監聽該檔案描述符的讀事件,這些在createClient()
函數中. 那麼當用戶端傳送了命令,觸發了AE_READABLE
事件,那麼就會呼叫回撥函數readQueryFromClient()
來從檔案描述符fd中讀發來的命令,並儲存在輸入緩衝區querybuf
中. 而這個回撥函數就是我們在Redis事件處理一文中所提到的指向回撥函數的指標rfileProc
和wfileProc
. 那麼,我們先來分析readQueryFromClient
函數.
// 讀取client的輸入緩衝區的內容 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*) privdata; int nread, readlen; size_t qblen; UNUSED(el); UNUSED(mask); // 讀入的長度,預設16MB readlen = PROTO_IOBUF_LEN; // 如果是多條請求,根據請求的大小,設定讀入的長度readlen if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf); if (remaining < readlen) readlen = remaining; } // 輸入緩衝區的長度 qblen = sdslen(c->querybuf); // 更新緩衝區的峰值 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; // 擴充套件緩衝區的大小 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // 將client發來的命令,讀入到輸入緩衝區中 nread = read(fd, c->querybuf+qblen, readlen); // 讀操作出錯 if (nread == -1) { if (errno == EAGAIN) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); freeClient(c); return; } // 讀操作完成 } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); freeClient(c); return; } // 更新輸入緩衝區的已用大小和未用大小。 sdsIncrLen(c->querybuf,nread); // 設定最後一次伺服器和client互動的時間 c->lastinteraction = server.unixtime; // 如果是主節點,則更新複製操作的偏移量 if (c->flags & CLIENT_MASTER) c->reploff += nread; // 更新從網路輸入的位元組數 server.stat_net_input_bytes += nread; // 如果輸入緩衝區長度超過伺服器設定的最大緩衝區長度 if (sdslen(c->querybuf) > server.client_max_querybuf_len) { // 將client資訊轉換為sds sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); // 輸入緩衝區儲存在bytes中 bytes = sdscatrepr(bytes,c->querybuf,64); // 列印到紀錄檔 serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); // 釋放空間 sdsfree(ci); sdsfree(bytes); freeClient(c); return; } // 處理client輸入的命令內容 processInputBuffer(c); }
實際上,這個readQueryFromClient()
函數是read函數的封裝,從檔案描述符fd
中讀出資料到輸入緩衝區querybuf
中,並更新輸入緩衝區的峰值querybuf_peak
,而且會檢查讀的長度,如果大於了server.client_max_querybuf_len
則會退出,而這個閥值在伺服器初始化為PROTO_MAX_QUERYBUF_LEN (1024*1024*1024)
也就是1G
大小.
回憶之前的各種命令實現,都是通過client的argv和argc這兩個成員來處理的. 因此,伺服器還需要將輸入緩衝區querybuf
中的資料,處理成參數列的物件,也就是上面的processInputBuffer()
函數. 原始碼如下:
// 處理client輸入的命令內容 void processInputBuffer(client *c) { server.current_client = c; // 一直讀輸入緩衝區的內容 while(sdslen(c->querybuf)) { // 如果處於暫停狀態,直接返回 if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; // 如果client處於被阻塞狀態,直接返回 if (c->flags & CLIENT_BLOCKED) break; // 如果client處於關閉狀態,則直接返回 if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; // 如果是未知的請求型別,則判定請求型別 if (!c->reqtype) { // 如果是"*"開頭,則是多條請求,是client發來的 if (c->querybuf[0] == '*') { c->reqtype = PROTO_REQ_MULTIBULK; // 否則就是內聯請求,是Telnet發來的 } else { c->reqtype = PROTO_REQ_INLINE; } } // 如果是內聯請求 if (c->reqtype == PROTO_REQ_INLINE) { // 處理Telnet發來的內聯命令,並建立成物件,儲存在client的參數列中 if (processInlineBuffer(c) != C_OK) break; // 如果是多條請求 } else if (c->reqtype == PROTO_REQ_MULTIBULK) { // 將client的querybuf中的協定內容轉換為client的參數列中的物件 if (processMultibulkBuffer(c) != C_OK) break; } else { serverPanic("Unknown request type"); } // 如果引數為0,則重置client if (c->argc == 0) { resetClient(c); } else { /* Only reset the client when the command was executed. */ // 執行命令成功後重置client if (processCommand(c) == C_OK) resetClient(c); if (server.current_client == NULL) break; } } // 執行成功,則將用於崩潰報告的client設定為NULL server.current_client = NULL; }
redis-cli
命令請求的協定型別是PROTO_REQ_MULTIBULK
,進而呼叫processMultibulkBuffer()
函數來處理:
// 將client的querybuf中的協定內容轉換為client的參數列中的物件 int processMultibulkBuffer(client *c) { char *newline = NULL; int pos = 0, ok; long long ll; // 參數列中命令數量為0,因此先分配空間 if (c->multibulklen == 0) { /* The client should have been reset */ serverAssertWithInfo(c,NULL,c->argc == 0); /* Multi bulk length cannot be read without a rn */ // 查詢第一個換行符 newline = strchr(c->querybuf,'r'); // 沒有找到rn,表示不符合協定,返回錯誤 if (newline == NULL) { if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { addReplyError(c,"Protocol error: too big mbulk count string"); setProtocolError(c,0); } return C_ERR; } /* Buffer should also contain n */ // 檢查格式 if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) return C_ERR; /* We know for sure there is a whole line since newline != NULL, * so go ahead and find out the multi bulk length. */ // 保證第一個字元為'*' serverAssertWithInfo(c,NULL,c->querybuf[0] == '*'); // 將'*'之後的數位轉換為整數。*3rn ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); if (!ok || ll > 1024*1024) { addReplyError(c,"Protocol error: invalid multibulk length"); setProtocolError(c,pos); return C_ERR; } // 指向"*3rn"的"rn"之後的位置 pos = (newline-c->querybuf)+2; // 空白命令,則將之前的刪除,保留未閱讀的部分 if (ll <= 0) { sdsrange(c->querybuf,pos,-1); return C_OK; } // 引數數量 c->multibulklen = ll; /* Setup argv array on client structure */ // 分配client參數列的空間 if (c->argv) zfree(c->argv); c->argv = zmalloc(sizeof(robj*)*c->multibulklen); } serverAssertWithInfo(c,NULL,c->multibulklen > 0); // 讀入multibulklen個引數,並建立物件儲存在參數列中 while(c->multibulklen) { /* Read bulk length if unknown */ // 讀入引數的長度 if (c->bulklen == -1) { // 找到換行符,確保"rn"存在 newline = strchr(c->querybuf+pos,'r'); if (newline == NULL) { if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { addReplyError(c, "Protocol error: too big bulk count string"); setProtocolError(c,0); return C_ERR; } break; } /* Buffer should also contain n */ // 檢查格式 if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) break; // $3rnSETrn...,確保是'$'字元,保證格式 if (c->querybuf[pos] != '$') { addReplyErrorFormat(c, "Protocol error: expected '$', got '%c'", c->querybuf[pos]); setProtocolError(c,pos); return C_ERR; } // 將引數長度儲存到ll。 ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); if (!ok || ll < 0 || ll > 512*1024*1024) { addReplyError(c,"Protocol error: invalid bulk length"); setProtocolError(c,pos); return C_ERR; } // 定位第一個引數的位置,也就是SET的S pos += newline-(c->querybuf+pos)+2; // 引數長度太長,進行優化 if (ll >= PROTO_MBULK_BIG_ARG) { size_t qblen; /* If we are going to read a large object from network * try to make it likely that it will start at c->querybuf * boundary so that we can optimize object creation * avoiding a large copy of data. */ // 如果我們要從網路中讀取一個大的物件,嘗試使它可能從c-> querybuf邊界開始,以便我們可以優化物件建立,避免大量的資料副本 // 儲存未讀取的部分 sdsrange(c->querybuf,pos,-1); // 重置偏移量 pos = 0; // 獲取querybuf中已使用的長度 qblen = sdslen(c->querybuf); /* Hint the sds library about the amount of bytes this string is * going to contain. */ // 擴充套件querybuf的大小 if (qblen < (size_t)ll+2) c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen); } // 儲存引數的長度 c->bulklen = ll; } /* Read bulk argument */ // 因為唯讀了multibulklen位元組的資料,讀到的資料不夠,則直接跳出迴圈,執行processInputBuffer()函數迴圈讀取 if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) { /* Not enough data (+2 == trailing rn) */ break; // 為引數建立了物件 } else { /* Optimization: if the buffer contains JUST our bulk element * instead of creating a new object by *copying* the sds we * just use the current sds string. */ // 如果讀入的長度大於32k if (pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && (signed) sdslen(c->querybuf) == c->bulklen+2) { c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); // 跳過換行 sdsIncrLen(c->querybuf,-2); /* remove CRLF */ /* Assume that if we saw a fat argument we'll see another one * likely... */ // 設定一個新長度 c->querybuf = sdsnewlen(NULL,c->bulklen+2); sdsclear(c->querybuf); pos = 0; // 建立物件儲存在client的參數列中 } else { c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen); pos += c->bulklen+2; } // 清空命令內容的長度 c->bulklen = -1; // 未讀取命令引數的數量,讀取一個,該值減1 c->multibulklen--; } } /* Trim to pos */ // 刪除已經讀取的,保留未讀取的 if (pos) sdsrange(c->querybuf,pos,-1); /* We're done when c->multibulk == 0 */ // 命令的引數全部被讀取完 if (c->multibulklen == 0) return C_OK; /* Still not read to process the command */ return C_ERR; }
我們結合一個多條批次回覆進行分析。一個多條批次回覆以 *<argc>rn
為字首,後跟多條不同的批次回覆,其中 argc
為這些批次回覆的數量. 那麼SET nmykey nmyvalue
命令轉換為Redis協定內容如下:
"*3rn$3rnSETrn$5rnmykeyrn$7rnmyvaluern"
當進入processMultibulkBuffer()
函數之後,如果是第一次執行該函數,那麼argv
中未讀取的命令數量為0,也就是說參數列為空,那麼會執行if (c->multibulklen == 0)
的程式碼,這裡的程式碼會解析*3rn
,將3
儲存到multibulklen
中,表示後面的引數個數,然後根據引數個數,為argv
分配空間.
接著,執行multibulklen
次while迴圈,每次讀一個引數,例如$3rnSETrn
,也是先讀出引數長度,儲存在bulklen
中,然後將引數SET
儲存構建成物件儲存到參數列中. 每次讀一個引數,multibulklen
就會減1,當等於0時,就表示命令的引數全部讀取到參數列完畢.
於是命令接收的整個過程完成.
命令回覆的函數,也是事件處理程式的回撥函數之一. 當伺服器的client的回覆緩衝區有資料,那麼就會呼叫aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c)
函數,將檔案描述符fd
和AE_WRITABLE
事件關聯起來,當用戶端可寫時,就會觸發事件,呼叫sendReplyToClient()
函數,執行寫事件. 我們重點看這個函數的程式碼:
// 寫事件處理程式,只是傳送回復給client void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(el); UNUSED(mask); // 傳送完資料會刪除fd的可讀事件 writeToClient(fd,privdata,1); }
這個函數直接呼叫了writeToClient()
函數:
// 將輸出緩衝區的資料寫給client,如果client被釋放則返回C_ERR,沒被釋放則返回C_OK int writeToClient(int fd, client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; size_t objlen; size_t objmem; robj *o; // 如果指定的client的回覆緩衝區中還有資料,則返回真,表示可以寫socket while(clientHasPendingReplies(c)) { // 固定緩衝區傳送未完成 if (c->bufpos > 0) { // 將緩衝區的資料寫到fd中 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); // 寫失敗跳出迴圈 if (nwritten <= 0) break; // 更新傳送的資料計數器 c->sentlen += nwritten; totwritten += nwritten; // 如果傳送的資料等於buf的偏移量,表示傳送完成 if ((int)c->sentlen == c->bufpos) { // 則將其重置 c->bufpos = 0; c->sentlen = 0; } // 固定緩衝區傳送完成,傳送回復連結串列的內容 } else { // 回覆連結串列的第一條回覆物件,和物件值的長度和所佔的記憶體 o = listNodeValue(listFirst(c->reply)); objlen = sdslen(o->ptr); objmem = getStringObjectSdsUsedMemory(o); // 跳過空物件,並刪除這個物件 if (objlen == 0) { listDelNode(c->reply,listFirst(c->reply)); c->reply_bytes -= objmem; continue; } // 將當前節點的值寫到fd中 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen); // 寫失敗跳出迴圈 if (nwritten <= 0) break; // 更新傳送的資料計數器 c->sentlen += nwritten; totwritten += nwritten; // 傳送完成,則刪除該節點,重置傳送的資料長度,更新回覆連結串列的總位元組數 if (c->sentlen == objlen) { listDelNode(c->reply,listFirst(c->reply)); c->sentlen = 0; c->reply_bytes -= objmem; } } // 更新寫到網路的位元組數 server.stat_net_output_bytes += totwritten; // 如果這次寫的總量大於NET_MAX_WRITES_PER_EVENT的限制,則會中斷本次的寫操作,將處理時間讓給其他的client,以免一個非常的回覆獨佔伺服器,剩餘的資料下次繼續在寫 // 但是,如果當伺服器的記憶體數已經超過maxmemory,即使超過最大寫NET_MAX_WRITES_PER_EVENT的限制,也會繼續執行寫入操作,是為了儘快寫入給使用者端 if (totwritten > NET_MAX_WRITES_PER_EVENT && (server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory)) break; } // 處理寫入失敗 if (nwritten == -1) { if (errno == EAGAIN) { nwritten = 0; } else { serverLog(LL_VERBOSE, "Error writing to client: %s", strerror(errno)); freeClient(c); return C_ERR; } } // 寫入成功 if (totwritten > 0) { // 如果不是主節點伺服器,則更新最近和伺服器互動的時間 if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime; } // 如果指定的client的回覆緩衝區中已經沒有資料,傳送完成 if (!clientHasPendingReplies(c)) { c->sentlen = 0; // 刪除當前client的可讀事件的監聽 if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ // 如果指定了寫入按成之後立即關閉的標誌,則釋放client if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { freeClient(c); return C_ERR; } } return C_OK; }
這個函數實際上是對write()
函數的封裝,將靜態回覆緩衝區buf
或回覆連結串列reply
中的資料迴圈寫到檔案描述符fd
中. 如果寫完了,則將當前使用者端的AE_WRITABLE
事件刪除.
CLIENT
相關的命令大致有6條:
CLIENT KILL [ip:port] [ID client-id] [TYPE normal|master|slave|pubsub] [ADDR ip:port] [SKIPME yes/no] CLIENT GETNAME CLIENT LIST CLIENT PAUSE timeout CLIENT REPLY ON|OFF|SKIP CLIENT SETNAME connection-name
下面是client
命令的實現:
// client 命令的實現 void clientCommand(client *c) { listNode *ln; listIter li; client *client; // CLIENT LIST 的實現 if (!strcasecmp(c->argv[1]->ptr,"list") && c->argc == 2) { /* CLIENT LIST */ // 獲取所有的client資訊 sds o = getAllClientsInfoString(); // 新增到到輸入緩衝區中 addReplyBulkCBuffer(c,o,sdslen(o)); sdsfree(o); // CLIENT REPLY ON|OFF|SKIP 命令實現 } else if (!strcasecmp(c->argv[1]->ptr,"reply") && c->argc == 3) { /* CLIENT REPLY ON|OFF|SKIP */ // 如果是 ON if (!strcasecmp(c->argv[2]->ptr,"on")) { // 取消 off 和 skip 的標誌 c->flags &= ~(CLIENT_REPLY_SKIP|CLIENT_REPLY_OFF); // 回覆 +OK addReply(c,shared.ok); // 如果是 OFF } else if (!strcasecmp(c->argv[2]->ptr,"off")) { // 開啟 OFF標誌 c->flags |= CLIENT_REPLY_OFF; // 如果是 SKIP } else if (!strcasecmp(c->argv[2]->ptr,"skip")) { // 沒有設定 OFF 則設定 SKIP 標誌 if (!(c->flags & CLIENT_REPLY_OFF)) c->flags |= CLIENT_REPLY_SKIP_NEXT; } else { addReply(c,shared.syntaxerr); return; } // CLIENT KILL [ip:port] [ID client-id] [TYPE normal | master | slave | pubsub] [ADDR ip:port] [SKIPME yes / no] } else if (!strcasecmp(c->argv[1]->ptr,"kill")) { /* CLIENT KILL <ip:port> * CLIENT KILL <option> [value] ... <option> [value] */ char *addr = NULL; int type = -1; uint64_t id = 0; int skipme = 1; int killed = 0, close_this_client = 0; // CLIENT KILL addr:port只能通過地址殺死client,舊版本相容 if (c->argc == 3) { /* Old style syntax: CLIENT KILL <addr> */ addr = c->argv[2]->ptr; skipme = 0; /* With the old form, you can kill yourself. */ // 新版本可以根據[ID client-id] [master|normal|slave|pubsub] [ADDR ip:port] [SKIPME yes/no]殺死client } else if (c->argc > 3) { int i = 2; /* Next option index. */ /* New style syntax: parse options. */ // 解析語法 while(i < c->argc) { int moreargs = c->argc > i+1; // CLIENT KILL [ID client-id] if (!strcasecmp(c->argv[i]->ptr,"id") && moreargs) { long long tmp; // 獲取client的ID if (getLongLongFromObjectOrReply(c,c->argv[i+1],&tmp,NULL) != C_OK) return; id = tmp; // CLIENT KILL TYPE type, 這裡的 type 可以是 [master|normal|slave|pubsub] } else if (!strcasecmp(c->argv[i]->ptr,"type") && moreargs) { // 獲取client的型別,[master|normal|slave|pubsub]四種之一 type = getClientTypeByName(c->argv[i+1]->ptr); if (type == -1) { addReplyErrorFormat(c,"Unknown client type '%s'", (char*) c->argv[i+1]->ptr); return; } // CLIENT KILL [ADDR ip:port] } else if (!strcasecmp(c->argv[i]->ptr,"addr") && moreargs) { // 獲取ip:port addr = c->argv[i+1]->ptr; // CLIENT KILL [SKIPME yes/no] } else if (!strcasecmp(c->argv[i]->ptr,"skipme") && moreargs) { // 如果是yes,設定設定skipme,呼叫該命令的使用者端將不會被殺死 if (!strcasecmp(c->argv[i+1]->ptr,"yes")) { skipme = 1; // 設定為no會影響到還會殺死呼叫該命令的使用者端。 } else if (!strcasecmp(c->argv[i+1]->ptr,"no")) { skipme = 0; } else { addReply(c,shared.syntaxerr); return; } } else { addReply(c,shared.syntaxerr); return; } i += 2; } } else { addReply(c,shared.syntaxerr); return; } /* Iterate clients killing all the matching clients. */ listRewind(server.clients,&li); // 迭代所有的client節點 while ((ln = listNext(&li)) != NULL) { client = listNodeValue(ln); // 比較當前client和這四類資訊,如果有一個不符合就跳過本層迴圈,否則就比較下一個資訊 if (addr && strcmp(getClientPeerId(client),addr) != 0) continue; if (type != -1 && getClientType(client) != type) continue; if (id != 0 && client->id != id) continue; if (c == client && skipme) continue; /* Kill it. */ // 殺死當前的client if (c == client) { close_this_client = 1; } else { freeClient(client); } // 計算殺死client的個數 killed++; } /* Reply according to old/new format. */ // 回覆client資訊 if (c->argc == 3) { // 沒找到符合資訊的 if (killed == 0) addReplyError(c,"No such client"); else addReply(c,shared.ok); } else { // 傳送殺死的個數 addReplyLongLong(c,killed); } /* If this client has to be closed, flag it as CLOSE_AFTER_REPLY * only after we queued the reply to its output buffers. */ if (close_this_client) c->flags |= CLIENT_CLOSE_AFTER_REPLY; // CLIENT SETNAME connection-name } else if (!strcasecmp(c->argv[1]->ptr,"setname") && c->argc == 3) { int j, len = sdslen(c->argv[2]->ptr); char *p = c->argv[2]->ptr; /* Setting the client name to an empty string actually removes * the current name. */ // 設定名字為空 if (len == 0) { // 先釋放掉原來的名字 if (c->name) decrRefCount(c->name); c->name = NULL; addReply(c,shared.ok); return; } /* Otherwise check if the charset is ok. We need to do this otherwise * CLIENT LIST format will break. You should always be able to * split by space to get the different fields. */ // 檢查名字格式是否正確 for (j = 0; j < len; j++) { if (p[j] < '!' || p[j] > '~') { /* ASCII is assumed. */ addReplyError(c, "Client names cannot contain spaces, " "newlines or special characters."); return; } } // 釋放原來的名字 if (c->name) decrRefCount(c->name); // 設定新名字 c->name = c->argv[2]; incrRefCount(c->name); addReply(c,shared.ok); // CLIENT GETNAME } else if (!strcasecmp(c->argv[1]->ptr,"getname") && c->argc == 2) { // 回覆名字 if (c->name) addReplyBulk(c,c->name); else addReply(c,shared.nullbulk); // CLIENT PAUSE timeout } else if (!strcasecmp(c->argv[1]->ptr,"pause") && c->argc == 3) { long long duration; // 以毫秒為單位將等待時間儲存在duration中 if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,UNIT_MILLISECONDS) != C_OK) return; // 暫停client pauseClients(duration); addReply(c,shared.ok); } else { addReplyError(c, "Syntax error, try CLIENT (LIST | KILL | GETNAME | SETNAME | PAUSE | REPLY)"); } }
以上就是Redis原始碼與設計剖析之網路連線庫的詳細內容,更多關於Redis 網路連線庫的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45