<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Redis的資料型別有String、Hash、Set、List、Zset、bitMap(基於String型別)、 Hyperloglog(基於String型別)、Geo(地理位置)、Streams流。
// 批次設定 > mset key1 value1 key2 value2 // 批次獲取 > mget key1 key2 // 獲取長度 > strlen key // 字串追加內容 > append key xxx // 獲取指定區間的字元 > getrange key 0 5 // 整數值遞增 (遞增指定的值) > incr intkey (incrby intkey 10) // 當key 存在時將覆蓋 > SETEX key seconds value // 將 key 的值設為 value ,當且僅當 key 不存在。 > SETNX key value
// 將雜湊表 key 中的域 field 的值設為 value 。 > HSET key field value // 返回雜湊表 key 中給定域 field 的值。 > HGET key field // 返回雜湊表 key 中的所有域。 > HKEYS key // 返回雜湊表 key 中所有域的值。 > HVALS key // 為雜湊表 key 中的域 field 的值加上增量 increment 。 > HINCRBY key field increment // 檢視雜湊表 key 中,給定域 field 是否存在。 > HEXISTS key field
儲存有序的字串列表,元素可以重複。列表的最大長度為 2^32 - 1 個元素(4294967295,每個列表超過 40 億個元素)。
// 將一個或多個值 value 插入到列表 key 的表頭 > LPUSH key value [value ...] // 將一個或多個值 value 插入到列表 key 的表尾(最右邊)。 > RPUSH key value [value ...] // 移除並返回列表 key 的頭元素。 > LPOP key // 移除並返回列表 key 的尾元素。 > RPOP key // BLPOP 是列表的阻塞式(blocking)彈出原語。 > BLPOP key [key ...] timeout // BRPOP 是列表的阻塞式(blocking)彈出原語。 > BRPOP key [key ...] timeout // 獲取指點位置元素 > LINDEX key index
因為list是有序的,所以我們可以先進先出,先進後出的列表都可以做
// 將一個或多個 member 元素加入到集合 key 當中,已經存在於集合的 member 元素將被忽略。 > SADD key member [member ...] // 返回集合 key 中的所有成員。 > SMEMBERS key // 返回集合 key 的基數(集合中元素的數量)。 > SCARD key // 如果命令執行時,只提供了 key 引數,那麼返回集合中的一個隨機元素。 > SRANDMEMBER key [count] // 移除並返回集合中的一個隨機元素。 > SPOP key // 移除集合 key 中的一個或多個 member 元素,不存在的 member 元素會被忽略。 > SREM key member [member ...] // 判斷 member 元素是否集合 key 的成員。 > SISMEMBER key member // 獲取前一個集合有而後面1個集合沒有的 > sdiff huihuiset huihuiset1 // 獲取交集 > sinter huihuiset huihuiset1 // 獲取並集 > sunion huihuiset huihuiset1
sorted set,有序的set,每個元素有個score。
score相同時,按照key的ASCII碼排序。
//將一個或多個 member 元素及其 score 值加入到有序集 key 當中。 > ZADD key score member [[score member] [score member] ...] // 返回有序集 key 中,指定區間內的成員。其中成員的位置按 score 值遞增(從小到大)來排序 > ZRANGE key start stop [WITHSCORES] // 返回有序集 key 中,指定區間內的成員。其中成員的位置按 score 值遞減(從大到小)來排列。 > ZREVRANGE key start stop [WITHSCORES] // 返回有序集 key 中,所有 score 值介於 min 和 max 之間(包括等於 min 或 max )的成員。有序整合員按 score 值遞增(從小到大)次序排列。 > ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] // 移除有序集 key 中的一個或多個成員,不存在的成員將被忽略。 > ZREM key member [member ...] // 返回有序集 key 的基數。 > ZCARD key // 為有序集 key 的成員 member 的 score 值加上增量 increment 。 > ZINCRBY key increment member // 返回有序集 key 中, score 值在 min 和 max 之間(預設包括 score 值等於 min 或 max )的成員的數量。 > ZCOUNT key min max // 返回有序集 key 中成員 member 的排名。其中有序整合員按 score 值遞增(從小到大)順序排列。 > ZRANK key member
// 1. multi命令開啟事務,exec命令提交事務 127.0.0.1:6379> multi OK 127.0.0.1:6379(TX)> set k1 v1 QUEUED 127.0.0.1:6379(TX)> set k2 v2 QUEUED 127.0.0.1:6379(TX)> exec 1) OK 2) OK // 2. 組隊的過程中可以通過discard來放棄組隊。 127.0.0.1:6379> multi OK 127.0.0.1:6379(TX)> set k3 v3 QUEUED 127.0.0.1:6379(TX)> set k4 v4 QUEUED 127.0.0.1:6379(TX)> discard OK
事務中的所有命令都會序列化、按順序地執行。
事務在執行過程中,不會被其他使用者端傳送來的命令請求所打斷。
佇列中的命令沒有提交之前,都不會被實際地執行,因為事務提交之前任何指令都不會被實際執行,也就不存在“事務內的查詢要看到事務裡的更新,在事務外查詢不能看到”這個讓人萬分頭疼的問題。
Redis同一個事務中如果有一條命令執行失敗,其後的命令仍然會被執行,沒有回滾。
local lockSet = redis.call('exists', KEYS[1]) if lockSet == 0 then redis.call('set', KEYS[1], ARG[1]) redis.call('expire', KEYS[1], ARG[2]) end return lockSet
資料最外層的結構為RedisDb
typedef struct redisDb { dict *dict; /* The keyspace for this DB */ //資料庫的鍵 dict *expires; /* Timeout of keys with a timeout set */ //設定了超時時間的鍵 dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ //使用者端等待的keys dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; /* Database ID */ //所在數 據庫ID long long avg_ttl; /* Average TTL, just for tats */ //平均TTL,僅用於統計 unsigned long expires_cursor; /* Cursor of the active expire cycle. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ } redisDb;
我們發現資料儲存在dict中
typedef struct dict { dictType *type; //理解為物件導向思想,為支援不同的資料型別對應dictType抽象方法,不同的資料型別可以不同實現 void *privdata; //也可不同的資料型別相關,不同型別特定函數的可選引數。 dictht ht[2]; //2個hash表,用來資料儲存 2個dictht long rehashidx; /* rehashing not in progress if rehashidx == -1 */ // rehash標記 -1代表不再rehash unsigned long iterators; /* number of iterators currently running */ } dict;
typedef struct dictht { dictEntry **table; //dictEntry 陣列 unsigned long size; //陣列大小 預設為4 #define DICT_HT_INITIAL_SIZE 4 unsigned long sizemask; //size-1 用來取模得到資料的下標值 unsigned long used; //改hash表中已有的節點資料 } dictht;
typedef struct dictEntry { void *key; //key union { void *val; uint64_t u64; int64_t s64; double d; } v; //value struct dictEntry *next; //指向下一個節點 } dictEntry;
/* Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object * is set to one of this fields for this object. */ #define OBJ_ENCODING_RAW 0 /* Raw representation */ #define OBJ_ENCODING_INT 1 /* Encoded as integer */ #define OBJ_ENCODING_HT 2 /* Encoded as hash table */ #define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */ #define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */ #define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */ #define OBJ_ENCODING_INTSET 6 /* Encoded as intset */ #define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */ #define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */ #define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */ #define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */ #define LRU_BITS 24 #define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */ #define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */ #define OBJ_SHARED_REFCOUNT INT_MAX /* Global object never destroyed. */ #define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object allocated in the stack. */ #define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT typedef struct redisObject { unsigned type:4; //資料型別 string hash list unsigned encoding:4; //底層的資料結構 跳錶 unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ int refcount; //用於回收,參照計數法 void *ptr; //指向具體的資料結構的記憶體地址 比如 跳錶、壓縮列表 } robj;
因為我們的dictEntry陣列預設大小是4,如果不進行擴容,那麼我們所有的資料都會以連結串列的形式新增至陣列下標 隨著資料量越來越大,之前只需要hash取模就能得到下標位置,現在得去迴圈我下標的連結串列,所以效能會越來越慢,當我們的資料量達到一定程度後,我們就得去觸發擴容操作。
擴容,肯定是在新增資料的時候才會擴容,所以我們找一個新增資料的入口。
int dictReplace(dict *d, void *key, void *val) { dictEntry *entry, *existing, auxentry; /* Try to add the element. If the key * does not exists dictAdd will succeed. */ entry = dictAddRaw(d,key,&existing); if (entry) { dictSetVal(d, entry, val); return 1; } /* Set the new value and free the old one. Note that it is important * to do that in this order, as the value may just be exactly the same * as the previous one. In this context, think to reference counting, * you want to increment (set), and then decrement (free), and not the * reverse. */ auxentry = *existing; dictSetVal(d, existing, val); dictFreeVal(d, &auxentry); return 0; }
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing){ long index; dictEntry *entry; dictht *ht; if (dictIsRehashing(d)) _dictRehashStep(d); //如果正在Rehash 進行漸進式hash 按步rehash /* Get the index of the new element, or -1 if * the element already exists. */ if ((index = _dictKeyIndex(d, key, dictHashKey(d,key),existing)) == -1) return NULL; /* Allocate the memory and store the new entry. * Insert the element in top, with the assumption that in a database * system it is more likely that recently added entries are accessed * more frequently. */ //如果在hash 放在ht[1] 否則放在ht[0] ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0]; entry = zmalloc(sizeof(*entry)); //採用頭插法插入hash桶 entry->next = ht->table[index]; ht->table[index] = entry; //已使用++ ht->used++; /* Set the hash entry fields. */ dictSetKey(d, entry, key); return entry; }
static long _dictKeyIndex(dict *d, const void *key,uint64_t hash, dictEntry **existing){ unsigned long idx, table; dictEntry *he; if (existing) *existing = NULL; /* Expand the hash table if needed */ //擴容如果需要 if (_dictExpandIfNeeded(d) == DICT_ERR) return -1; //比較是否存在這個值 for (table = 0; table <= 1; table++) { idx = hash & d->ht[table].sizemask; /* Search if this slot does not already contain the given key */ he = d->ht[table].table[idx]; while(he) { if (key==he->key || dictCompareKeys(d, key,he->key)) { if (existing) *existing = he; return -1; } he = he->next; } if (!dictIsRehashing(d)) break; } return idx; }
/* Expand the hash table if needed */ static int _dictExpandIfNeeded(dict *d){ /* Incremental rehashing already in progress. Return.*/ if (dictIsRehashing(d)) return DICT_OK; //正在rehash,返回 /* If the hash table is empty expand it to the initialsize. */ //如果ht[0]的長度為0,設定預設長度4 dictExpand為擴容的關鍵方法 if (d->ht[0].size == 0) return dictExpand(d,DICT_HT_INITIAL_SIZE); //擴容條件 hash表中已使用的數量大於等於 hash表的大小 //並且dict_can_resize為1 或者 已使用的大小大於hash表大小的 5倍,大於等於1倍的時候,下面2個滿足一個條件即可 if (d->ht[0].used >= d->ht[0].size && (dict_can_resize || d->ht[0].used/d->ht[0].size >dict_force_resize_ratio)){ //擴容成原來的2倍 return dictExpand(d, d->ht[0].used*2); } return DICT_OK; }
int dictExpand(dict *d, unsigned long size){ /* the size is invalid if it is smaller than the number of * elements already inside the hash table */ //正在擴容,不允許第二次擴容,或者ht[0]的資料量大於擴容後的數量,沒有意義,這次不擴容 if (dictIsRehashing(d) || d->ht[0].used > size) return DICT_ERR; dictht n; /* the new hash table */ //得到最接近2的冪 跟hashMap思想一樣 unsigned long realsize = _dictNextPower(size); /* Rehashing to the same table size is not useful. */ //如果跟ht[0]的大小一致 直接返回錯誤 if (realsize == d->ht[0].size) return DICT_ERR; /* Allocate the new hash table and initialize all pointers to NULL */ //為新的dictht賦值 n.size = realsize; n.sizemask = realsize-1; n.table = zcalloc(realsize*sizeof(dictEntry*)); n.used = 0; /* Is this the first initialization? If so it's not really a rehashing * we just set the first hash table so that it can accept keys. */ //ht[0].table為空,代表是初始化 if (d->ht[0].table == NULL) { d->ht[0] = n; return DICT_OK; } /* Prepare a second hash table for incremental rehashing */ //將擴容後的dictht賦值給ht[1] d->ht[1] = n; //標記為0,代表可以開始rehash d->rehashidx = 0; return DICT_OK; }
假如一次性把資料全部遷移會很耗時間,會讓單條指令等待很久,會形成阻塞。
所以,redis採用漸進式Rehash,所謂漸進式,就是慢慢的,不會一次性把所有資料遷移。
那什麼時候會進行漸進式資料遷移?
if (dictIsRehashing(d)) _dictRehashStep(d); //這個程式碼在增刪改查的方法中都會呼叫
static void _dictRehashStep(dict *d) { if (d->iterators == 0) dictRehash(d,1); }
int dictRehash(dict *d, int n) { int empty_visits = n*10; /* Max number of empty buckets to visit. */ //要存取的最大的空桶數 防止此次耗時過長 if (!dictIsRehashing(d)) return 0; //如果沒有在rehash返回0 //迴圈,最多1次,並且ht[0]的資料沒有遷移完 進入迴圈 while(n-- && d->ht[0].used != 0) { dictEntry *de, *nextde; /* Note that rehashidx can't overflow as we are sure there are more * elements because ht[0].used != 0 */ assert(d->ht[0].size > (unsigned long)d->rehashidx); //rehashidx rehash的索引,預設0 如果hash桶為空 進入自旋 並且判斷是否到了最大的存取空桶數 while(d->ht[0].table[d->rehashidx] == NULL) { d->rehashidx++; if (--empty_visits == 0) return 1; } de = d->ht[0].table[d->rehashidx]; //得到ht[0]的下標為rehashidx桶 /* Move all the keys in this bucket from the old to the new hash HT */ while(de) { //迴圈hash桶的連結串列 並且轉移到ht[1] uint64_t h; nextde = de->next; /* Get the index in the new hash table */ h = dictHashKey(d, de->key) & d- >ht[1].sizemask; de->next = d->ht[1].table[h]; //頭插 d->ht[1].table[h] = de; d->ht[0].used--; d->ht[1].used++; de = nextde; } //轉移完後 將ht[0]相對的hash桶設定為null d->ht[0].table[d->rehashidx] = NULL; d->rehashidx++; } /* Check if we already rehashed the whole table... */ //ht[0].used=0 代表rehash全部完成 if (d->ht[0].used == 0) { //清空ht[0]table zfree(d->ht[0].table); //將ht[0] 重新指向已經完成rehash的ht[1] d->ht[0] = d->ht[1]; //將ht[1]所有資料重新設定 _dictReset(&d->ht[1]); //設定-1,代表rehash完成 d->rehashidx = -1; return 0; } /* More to rehash... */ return 1; } //將ht[1]的屬性復位 static void _dictReset(dictht *ht) { ht->table = NULL; ht->size = 0; ht->sizemask = 0; ht->used = 0; }
再講定時任務之前,我們要知道Redis中有個時間事件驅動,在 server.c檔案下有個serverCron 方法。
serverCron 每隔多久會執行一次,執行頻率根據redis.conf中的hz設定,serverCorn中有個方法databasesCron()
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { //....... /* We need to do a few operations on clients asynchronously. */ clientsCron(); /* Handle background operations on Redis databases. */ databasesCron(); //處理Redis資料庫上的後臺操作。 //....... }
void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ if (server.active_expire_enabled) { if (iAmMaster()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { expireSlaveKeys(); } } /* Defrag keys gradually. */ activeDefragCycle(); /* Perform hash tables rehashing if needed, but only if there are no * other processes saving the DB on disk. Otherwise rehashing is bad * as will cause a lot of copy-on-write of memory pages. */ if (!hasActiveChildProcess()) { /* We use global counters so if we stop the computation at a given * DB we'll be able to start from the successive in the next * cron loop iteration. */ static unsigned int resize_db = 0; static unsigned int rehash_db = 0; int dbs_per_call = CRON_DBS_PER_CALL; int j; /* Don't test more DBs than we have. */ if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; /* Resize */ for (j = 0; j < dbs_per_call; j++) { tryResizeHashTables(resize_db % server.dbnum); resize_db++; } /* Rehash */ //Rehash邏輯 if (server.activerehashing) { for (j = 0; j < dbs_per_call; j++) { //進入incrementallyRehash int work_done = incrementallyRehash(rehash_db); if (work_done) { /* If the function did some work, stop here, we'll do * more at the next cron loop. */ break; } else { /* If this db didn't need rehash, we'll try the next one. */ rehash_db++; rehash_db %= server.dbnum; } } } } }
int incrementallyRehash(int dbid) { /* Keys dictionary */ if (dictIsRehashing(server.db[dbid].dict)) { dictRehashMilliseconds(server.db[dbid].dict,1); return 1; /* already used our millisecond for this loop... */ } /* Expires */ if (dictIsRehashing(server.db[dbid].expires)) { dictRehashMilliseconds(server.db[dbid].expires,1); return 1; /* already used our millisecond for this loop... */ } return 0; }
int dictRehashMilliseconds(dict *d, int ms) { long long start = timeInMilliseconds(); int rehashes = 0; //進入rehash方法 dictRehash 去完成漸進時HASH while(dictRehash(d,100)) { rehashes += 100; //判斷是否超時 if (timeInMilliseconds()-start > ms) break; } return rehashes; }
Redis中String的底層沒有用c的char來實現,而是採用了SDS(simple Dynamic String)的資料結構來實現。並且提供了5種不同的型別
typedef char *sds; /* Note: sdshdr5 is never used, we just access the flags byte directly. * However is here to document the layout of type 5 SDS strings. */ struct __attribute__ ((__packed__)) sdshdr5 { unsigned char flags; /* 3 lsb of type, and 5 msb of string length */ char buf[]; } struct __attribute__ ((__packed__)) sdshdr8 { uint8_t len; /* used */ //已使用的長度 uint8_t alloc; /* excluding the header and null terminator */ //分配的總容量 不包含頭和空終止符 unsigned char flags; /* 3 lsb of type, 5 unused bits */ //低三位型別 高5bit未使用 char buf[]; //資料buf 儲存字串陣列 }; struct __attribute__ ((__packed__)) sdshdr16 { uint16_t len; /* used */ uint16_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[]; }; struct __attribute__ ((__packed__)) sdshdr32 { uint32_t len; /* used */ uint32_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[]; }; struct __attribute__ ((__packed__)) sdshdr64 { uint64_t len; /* used */ uint64_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[]; };
Redis的字典相當於Java語言中的HashMap,他是無序字典。內部結構上同樣是陣列 + 連結串列二維結構。第一維hash的陣列位置碰撞時,就會將碰撞的元素使用連結串列串起來。
壓縮列表會根據存入的資料的不同型別以及不同大小,分配不同大小的空間。所以是為了節省記憶體而採用的。
因為是一塊完整的記憶體空間,當裡面的元素髮生變更時,會產生連鎖更新,嚴重影響我們的存取效能。所以,只適用於資料量比較小的場景。
所以,Redis會有相關設定,Hashes只有小資料量的時候才會用到ziplist,當hash物件同時滿足以下兩個條件的時候,使用ziplist編碼:
hash-max-ziplist-value 64 // ziplist中最大能存放的值長度 hash-max-ziplist-entries 512 // ziplist中最多能存放的entry節點數量
兼顧了ziplist的節省記憶體,並且一定程度上解決了連鎖更新的問題,我們的 quicklistNode節點裡面是個ziplist,每個節點是分開的。那麼就算髮生了連鎖更新,也只會發生在一個quicklistNode節點。
quicklist.h檔案
typedef struct{ struct quicklistNode *prev; //前指標 struct quicklistNode *next; //後指標 unsigned char *zl; //資料指標 指向ziplist結果 unsigned int sz; //ziplist大小 /* ziplist size in bytes */ unsigned int count : 16; /* count of items in ziplist */ //ziplist的元素 unsigned int encoding : 2; /* RAW==1 or LZF==2 */ // 是否壓縮, 1沒有壓縮 2 lzf壓縮 unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ //預留容器欄位 unsigned int recompress : 1; /* was this node previous compressed? */ unsigned int attempted_compress : 1; /* node can't compress; too small */ unsigned int extra : 10; /* more bits to steal for future usage */ //預留欄位 } quicklistNode;
Redis用intset或hashtable儲存set。滿足下面條件,就用inset儲存。
set-max-intset-entries 512
不滿足上述條件的,都使用hash表儲存,value存null。
預設使用ziplist編碼。
在ziplist的內部,按照score排序遞增來儲存。插入的時候要移動之後的資料。
如果元素數量大於等於128個,或者任一member長度大於等於64位元組使用 skiplist+dict儲存。
zset-max-ziplist-entries 128 zset-max-ziplist-value 64
如果不滿足條件,採用跳錶。
結構定義(servier.h)
/* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { sds ele; //sds資料 double score; //score struct zskiplistNode *backward; //後退指標 //層級陣列 struct zskiplistLevel { struct zskiplistNode *forward; //前進指標 unsigned long span; //跨度 } level[]; } zskiplistNode; //跳錶列表 typedef struct zskiplist { struct zskiplistNode *header, *tail; //頭尾節點 unsigned long length; //節點數量 int level; //最大的節點層級 } zskiplist;
zslInsert 新增節點方法 (t_zset.c)
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; serverAssert(!isnan(score)); x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; while (x->level[i].forward && (x->level[i].forward->score < score || 咕泡出品 必屬精品精品屬精品 咕泡出品 必屬精品 咕泡出品 必屬精品 咕泡咕泡 (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } update[i] = x; } /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */ level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(level,score,ele); //不同層級建立連結串列聯絡 for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; return x; }
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
所謂惰性過期,就是在每次存取操作key的時候,判斷這個key是不是過期了,過期了就刪除。
int expireIfNeeded(redisDb *db, robj *key) { if (!keyIsExpired(db,key)) return 0; /* If we are running in the context of a slave, instead of * evicting the expired key from the database, we return ASAP: * the slave key expiration is controlled by the master that will * send us synthesized DEL operations for expired keys. * * Still we try to return the right information to the caller, * that is, 0 if we think the key should be still valid, 1 if * we think the key is expired at this time. */ // 如果設定有masterhost,說明是從節點,那麼不操作刪除 if (server.masterhost != NULL) return 1; /* Delete the key */ server.stat_expiredkeys++; propagateExpire(db,key,server.lazyfree_lazy_expire); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",key,db->id); //是否是非同步刪除 防止單個Key的資料量很大 阻塞主執行緒 是4.0之後新增的新功能,預設關閉 int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key); if (retval) signalModifiedKey(NULL,db,key); return retval; }
該策略可以最大化的節省CPU資源。
但是卻對記憶體非常不友好。因為如果沒有再次存取,就可能一直堆積再記憶體中。佔用記憶體
所以就需要另一種策略來配合使用,就是定期過期
那麼多久去清除一次,我們在講Rehash的時候,有個方法是serverCron,執行頻率根據redis.conf中的hz設定。
這方法除了做Rehash以外,還會做很多其他的事情,比如:
// serverCron方法呼叫databasesCron方法(server.c) /* Handle background operations on Redis databases. */ databasesCron(); void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ if (server.active_expire_enabled) { if (iAmMaster()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); //過期刪除方法 } else { expireSlaveKeys(); } } }
由於Redis記憶體是有大小的,並且我可能裡面的資料都沒有過期,當快滿的時候,我又沒有過期的資料進行淘汰,那麼這個時候記憶體也會滿。記憶體滿了,Redis也會不能放入新的資料。所以,我們不得已需要一些策略來解決這個問題來保證可用性。
New values aren’t saved when memory limit is reached. When a database uses replication, this applies to the primary database.
預設,不淘汰 能讀不能寫
Keeps most recently used keys; removes least recently used (LRU) keys
基於偽LRU演演算法,在所有的key中去淘汰
Keeps frequently used keys; removes least frequently used (LFU) keys.
基於偽LFU演演算法,在所有的key中去淘汰
Removes least recently used keys with the expire field set to true.
基於偽LRU演演算法,在設定了過期時間的key中去淘汰
Removes least frequently used keys with the expire field set to true.
基於偽LFU演演算法 在設定了過期時間的key中去淘汰
Randomly removes keys to make space for the new data added.
基於隨機演演算法,在所有的key中去淘汰
Randomly removes keys with expire field set to true.
基於隨機演演算法 在設定了過期時間的key中去淘汰
Removes least frequently used keys with expire field set to true and the shortest remaining time-to-live (TTL) value.
根據過期時間來,淘汰即將過期的
Lru,Least Recently Used 翻譯過來是最久未使用,根據時間軸來走,仍很久沒用的資料。只要最近有用過,我就預設是有效的。
那麼它的一個衡量標準是啥?時間!根據使用時間,從近到遠,越遠的越容易淘汰。
需求:得到物件隔多久沒存取,隔的時間越久,越容易被淘汰
redis中,物件都會被redisObject物件包裝,其中有個欄位叫lru。
redisObject物件 (server.h檔案)
typedef struct redisObject { unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ int refcount; void *ptr; } robj;
看LRU的欄位的說明,那麼我們大概能猜出來,redis去實現lru淘汰演演算法肯定跟我們這個物件的lru相關
並且這個欄位的大小為24bit,那麼這個欄位記錄的是物件操作存取時候的秒單位時間的後24bit
相當於:
long timeMillis=System.currentTimeMillis(); System.out.println(timeMillis/1000); //獲取當前秒 System.out.println(timeMillis/1000 & ((1<<24)-1)); //獲取秒的後24位元
我們知道了這個物件的最後操作存取的時間。如果我們要得到這個物件多久沒存取了,我們是不是就很簡單,用我當前的時間-這個物件的存取時間就可以了!但是這個存取時間是秒單位時間的後24bit 所以,也是用當前的時間的秒單位的後24bit去減。
estimateObjectIdleTime方法(evict.c)
unsigned long long estimateObjectIdleTime(robj *o) { //獲取秒單位時間的最後24位元 unsigned long long lruclock = LRU_CLOCK(); //因為只有24位元,所有最大的值為2的24次方-1 //超過最大值從0開始,所以需要判斷lruclock(當前系統時間)跟快取物件的lru欄位的大小 if (lruclock >= o->lru) { //如果lruclock>=robj.lru,返回lruclock-o->lru,再轉換單位 return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION; } else { //否則採用lruclock + (LRU_CLOCK_MAX - o->lru),得到對 象的值越小,返回的值越大,越大越容易被淘汰 return (lruclock + (LRU_CLOCK_MAX - o->lru)) * LRU_CLOCK_RESOLUTION; } }
用lruclock與 redisObject.lru進行比較,因為lruclock只獲取了當前秒單位時間的後24位元,所以肯定有個輪詢。
所以,我們會用lruclock跟 redisObject.lru進行比較,如果lruclock>redisObject.lru,那麼我們用lruclock- redisObject.lru,否則lruclock+(24bit的最大值-redisObject.lru),得到lru越小,那麼返回的資料越大,相差越大的越優先會被淘汰!
<https://www.processon.com/view/link/5fbe0541e401fd2d6ed8fc38>
LFU,英文 Least Frequently Used,翻譯成中文就是最不常用的優先淘汰。
不常用,它的衡量標準就是次數,次數越少的越容易被淘汰。
這個實現起來應該也很簡單,物件被操作存取的時候,去記錄次數,每次操作存取一次,就+1;淘汰的時候,直接去比較這個次數,次數越少的越容易淘汰!
何為時效性問題?就是隻會去考慮數量,但是不會去考慮時間。
ps:去年的某個新聞很火,點選量3000W,今年有個新聞剛出來點選量100次,本來我們是想保留今年的新的新聞的,但是如果根據LFU來做的話,我們發現淘汰的是今年的新聞,明顯是不合理的
導致的問題:新的資料進不去,舊的資料出不來。
那麼如何解決呢,且往下看
我們還是來看redisObject(server.h)
typedef struct redisObject { unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ int refcount; void *ptr; } robj;
我們看它的lru,它如果在LFU演演算法的時候!它前面16位元代表的是時間,後8位元代表的是一個數值,frequency是頻率,應該就是代表這個物件的存取次數,我們先給它叫做counter。
前16bits時間有啥用?
跟時間相關,我猜想應該是跟解決時效性相關的,那麼怎麼解決的?從生活中找例子
大家應該充過一些會員,比如我這個年紀的,小時候喜歡充騰訊的黃鑽、綠鑽、藍鑽等等。但是有一個點,假如哪天我沒充錢了的話,或者沒有續VIP的時候,我這個鑽石等級會隨著時間的流失而降低。比如我本來是黃鑽V6,但是一年不充錢的話,可能就變成了V4。
那麼回到Redis,大膽的去猜測,那麼這個時間是不是去記錄這個物件有多久沒存取了,如果多久沒存取,我就去減少對應的次數。
LFUDecrAndReturn方法(evict.c)
unsigned long LFUDecrAndReturn(robj *o) { //lru欄位右移8位元,得到前面16位元的時間 unsigned long ldt = o->lru >> 8; //lru欄位與255進行&運算(255代表8位元的最大值), //得到8位元counter值 unsigned long counter = o->lru & 255; //如果設定了lfu_decay_time,用LFUTimeElapsed(ldt) 除以設定的值 //LFUTimeElapsed(ldt)原始碼見下 //總的沒存取的分鐘時間/設定值,得到每分鐘沒存取衰減多少 unsigned long num_periods = server.lfu_decay_time ?LFUTimeElapsed(ldt) / server.lfu_decay_time : 0; if (num_periods) //不能減少為負數,非負數用couter值減去衰減值 counter = (num_periods > counter) ? 0 : counter - num_periods; return counter; }
衰減因子由設定
lfu-decay-time 1 //多少分鐘沒操作存取就去衰減一次
後8bits的次數,最大值是255,夠不夠?
肯定不夠,一個物件的存取操作次數肯定不止255次。
但是我們可以讓資料達到255的很難。那麼怎麼做的?這個比較簡單,我們先看原始碼,然後再總結
LFULogIncr方法 (evict.c 檔案)
uint8_t LFULogIncr(uint8_t counter) { //如果已經到最大值255,返回255 ,8位元的最大值 if (counter == 255) return 255; //得到亂數(0-1) double r = (double)rand()/RAND_MAX; //LFU_INIT_VAL表示基數值(在server.h設定) double baseval = counter - LFU_INIT_VAL; //如果達不到基數值,表示快不行了,baseval =0 if (baseval < 0) baseval = 0; //如果快不行了,肯定給他加counter //不然,按照機率是否加counter,同時跟baseval與lfu_log_factor相關 //都是在分子,所以2個值越大,加counter機率越小 double p = 1.0/(baseval*server.lfu_log_factor+1); if (r < p) counter++; return counter; }
自動觸發
save 900 1 900s檢查一次,至少有1個key被修改就觸發 save 300 10 300s檢查一次,至少有10個key被修改就觸發 save 60 10000 60s檢查一次,至少有10000個key被修改
任何元件在正常關閉的時候,都會去完成應該完成的事。比如Mysql 中的Redolog持久化,正常關閉的時候也會去持久化。
資料清空指令會觸發RDB操作,並且是觸發一個空的RDB檔案,所以, 如果在沒有開啟其他的持久化的時候,flushall是可以刪庫跑路的,在生產環境慎用。
優勢
不足
由於RDB的資料可靠性非常低,所以Redis又提供了另外一種持久化方案: Append Only File 簡稱:AOF
AOF預設是關閉的,你可以在組態檔中進行開啟:
appendonly no //預設關閉,可以進行開啟 # The name of the append only file (default:"appendonly.aof") appendfilename "appendonly.aof" //AOF檔名
追加檔案,即每次更改的命令都會附加到我的AOF檔案中。
AOF會記錄每個寫操作,那麼問題來了。我難道每次操作命令都要和磁碟互動?
當然不行,所以redis支援幾種策略,由使用者來決定要不要每次都和磁碟互動
# appendfsync always 表示每次寫入都執行fsync(重新整理)函數 效能會非常非常慢 但是非常安全 appendfsync everysec 每秒執行一次fsync函數 可能丟失1s的資料 # appendfsync no 由作業系統保證資料同步到磁碟,速度最快 你的資料只需要交給作業系統就行
預設1s一次,最多有1s丟失
由於AOF是追加的形式,所以檔案會越來越大,越大的話,資料載入越慢。 所以我們需要對AOF檔案進行重寫。
何為重寫
比如 我們的incr指令,假如我們incr了100次,現在資料是100,但是我們的aof檔案中會有100條incr指令,但是我們發現這個100條指令用處不大, 假如我們能把最新的記憶體裡的資料儲存下來的話。所以,重寫就是做了這麼一件事情,把當前記憶體的資料重寫下來,然後把之前的追加的檔案刪除。
重寫流程
但是重寫是把當前記憶體的資料,寫入一個新的AOF檔案,如果當前資料比較大,然後以指令的形式寫入的話,會很慢很慢
所以在4.0之後,在重寫的時候是採用RDB的方式去生成新的AOF檔案,然 後後續追加的指令,還是以指令追加的形式追加的檔案末尾
aof-use-rdb-preamble yes //是否開啟RDB與AOF混合模式
什麼時候重寫
組態檔redis.conf
# 重寫觸發機制 auto-aof-rewrite-percentage 100 auto-aof-rewrite-min-size 64mb 就算達到了第一個百分比的大小,也必須大於 64M 在 aof 檔案小於64mb的時候不進行重寫,當到達64mb的時候,就重寫一 次。重寫後的 aof 檔案可能是40mb。上面設定了auto-aof-rewritepercentag為100,即 aof 檔案到了80mb的時候,進行重寫。
優勢
不足
採用RDB或者不持久化, 會有資料丟失,因為是手動或者設定以快照的形式來進行備份。
解決:啟用AOF,以命令追加的形式進行備份,但是預設也會有1s丟失, 這是在效能與資料安全性中尋求的一個最適合的方案,如果為了保證資料 一致性,可以將設定更改為always,但是效能很慢,一般不用。
因為Redis的資料是主非同步同步給從的,提升了效能,但是由於是非同步同步到從。所以存在資料丟失的可能
快取雪崩就是Redis的大量熱點資料同時過期(失效),因為設定了相同的過期時間,剛好這個時候Redis請求的並行量又很大,就會導致所有的請求落到資料庫。
快取穿透是指快取和資料庫中都沒有的資料,但是使用者一直請求不存在的資料!這時的使用者很可能就是攻擊者,惡意搞你們公司的,攻擊會導致資料庫壓力過大。
解決方案:布隆過濾器
布隆過濾器的思想:既然是因為你Redis跟DB都沒有,然後使用者惡意一直存取不存在的key,然後全部打到Redis跟DB。那麼我們能不能單獨把DB的資料單獨存到另外一個地方,但是這個地方只存一個簡單的標記,標記這個key存不存在,如果存在,就去存取Redis跟DB,否則直接返回。並且這個記憶體最好很小。
單個key過期的時候有大量並行,使用互斥鎖,回寫redis,並且採用雙重檢查鎖來提升效能!減少對DB的存取。
到此這篇關於一篇文章帶你弄清楚Redis的精髓的文章就介紹到這了,更多相關一文搞懂Redis內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45