首頁 > 軟體

一篇文章帶你弄清楚Redis的精髓

2023-02-08 22:02:42

一、Redis的特性

1.1 Redis為什麼快?

  • 基於記憶體操作,操作不需要跟磁碟互動,單次執行很快
  • 命令執行是單執行緒,因為是基於記憶體操作,單次執行的時間快於執行緒切換時間,同時通訊採用多路複用
  • Redis本身就是一個k-v結構,類似於hashMap,所以查詢效能接近於O(1)
  • 同時redis自己底層資料結構支援,比如跳錶、SDS
  • lO多路複用,單個執行緒中通過記錄跟蹤每一個sock(I/O流)的狀態來管理多個I/O流

1.2 Redis其他特性

  • 更豐富的資料型別,雖然都是k、v結構,value可以儲存很多的資料型別
  • 完善的記憶體管理機制、保證資料一致性:持久化機制、過期策略
  • 支援多種程式語言
  • 高可用,叢集、保證高可用

1.3 Redis高可用

  • 很完善的記憶體管理機制,過期、淘汰、持久化
  • 叢集模式,主從、哨兵、cluster叢集

二、Redis資料型別以及使用場景

Redis的資料型別有String、Hash、Set、List、Zset、bitMap(基於String型別)、 Hyperloglog(基於String型別)、Geo(地理位置)、Streams流。

2.1 String

2.1.1 基本指令

// 批次設定
> mset key1 value1 key2 value2
// 批次獲取
> mget key1 key2
// 獲取長度
> strlen key  
//  字串追加內容
> append key xxx
// 獲取指定區間的字元
> getrange key 0 5
// 整數值遞增 (遞增指定的值)
> incr intkey (incrby intkey 10)
// 當key 存在時將覆蓋
> SETEX key seconds value
// 將 key 的值設為 value ,當且僅當 key 不存在。
> SETNX key value

2.1.2 應用場景

  1. 快取相關場景 快取、 token(跟過期屬性完美契合)
  2. 執行緒安全的計數場景 (軟限流、分散式ID等)

2.2 Hash

2.2.1 基本指令

// 將雜湊表 key 中的域 field 的值設為 value 。
> HSET key field value
// 返回雜湊表 key 中給定域 field 的值。
>  HGET key field
// 返回雜湊表 key 中的所有域。
> HKEYS key
// 返回雜湊表 key 中所有域的值。
>  HVALS key
// 為雜湊表 key 中的域 field 的值加上增量 increment 。
> HINCRBY key field increment
// 檢視雜湊表 key 中,給定域 field 是否存在。
> HEXISTS key field

2.2.2 應用場景

  1. 儲存物件類的資料(官網說的)比如一個物件下有多個欄位
  2. 統計類的資料,可以對單個統計資料進行單獨操作

2.3 List

儲存有序的字串列表,元素可以重複。列表的最大長度為 2^32 - 1 個元素(4294967295,每個列表超過 40 億個元素)。

2.3.1 基本指令

// 將一個或多個值 value 插入到列表 key 的表頭
> LPUSH key value [value ...]
// 將一個或多個值 value 插入到列表 key 的表尾(最右邊)。
> RPUSH key value [value ...]
// 移除並返回列表 key 的頭元素。
> LPOP key
// 移除並返回列表 key 的尾元素。
> RPOP key
// BLPOP 是列表的阻塞式(blocking)彈出原語。
> BLPOP key [key ...] timeout
// BRPOP 是列表的阻塞式(blocking)彈出原語。
> BRPOP key [key ...] timeout
// 獲取指點位置元素
> LINDEX key index

2.3.2 應用場景

  1. 使用者訊息時間線

因為list是有序的,所以我們可以先進先出,先進後出的列表都可以做

2.4 Set

2.4.1 基本指令

// 將一個或多個 member 元素加入到集合 key 當中,已經存在於集合的 member 元素將被忽略。
> SADD key member [member ...]
// 返回集合 key 中的所有成員。
> SMEMBERS key
// 返回集合 key 的基數(集合中元素的數量)。
> SCARD key
// 如果命令執行時,只提供了 key 引數,那麼返回集合中的一個隨機元素。
> SRANDMEMBER key [count]
// 移除並返回集合中的一個隨機元素。
> SPOP key
// 移除集合 key 中的一個或多個 member 元素,不存在的 member 元素會被忽略。
> SREM key member [member ...]
// 判斷 member 元素是否集合 key 的成員。
> SISMEMBER key member
// 獲取前一個集合有而後面1個集合沒有的
> sdiff huihuiset huihuiset1
// 獲取交集
> sinter huihuiset huihuiset1
// 獲取並集
> sunion huihuiset huihuiset1

2.4.2 應用場景

  1. 抽獎 spop跟srandmember隨機彈出或者獲取元素
  2. 點贊、簽到等,sadd集合儲存
  3. 交集並集 關注等場景

2.5 ZSet(SortedSet)

sorted set,有序的set,每個元素有個score。

score相同時,按照key的ASCII碼排序。

2.5.1 基本指令

//將一個或多個 member 元素及其 score 值加入到有序集 key 當中。
> ZADD key score member [[score member] [score member] ...]
// 返回有序集 key 中,指定區間內的成員。其中成員的位置按 score 值遞增(從小到大)來排序
> ZRANGE key start stop [WITHSCORES]
// 返回有序集 key 中,指定區間內的成員。其中成員的位置按 score 值遞減(從大到小)來排列。
> ZREVRANGE key start stop [WITHSCORES]
// 返回有序集 key 中,所有 score 值介於 min 和 max 之間(包括等於 min 或 max )的成員。有序整合員按 score 值遞增(從小到大)次序排列。
>  ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
// 移除有序集 key 中的一個或多個成員,不存在的成員將被忽略。
> ZREM key member [member ...]
// 返回有序集 key 的基數。
> ZCARD key
// 為有序集 key 的成員 member 的 score 值加上增量 increment 。
> ZINCRBY key increment member
// 返回有序集 key 中, score 值在 min 和 max 之間(預設包括 score 值等於 min 或 max )的成員的數量。
> ZCOUNT key min max
// 返回有序集 key 中成員 member 的排名。其中有序整合員按 score 值遞增(從小到大)順序排列。
> ZRANK key member

2.5.2 應用場景

  1. 排行榜

三、Redis的事務

3.1 事務基本操作

// 1. multi命令開啟事務,exec命令提交事務
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) OK
// 2. 組隊的過程中可以通過discard來放棄組隊。
 127.0.0.1:6379> multi
OK
 127.0.0.1:6379(TX)> set k3 v3
QUEUED
 127.0.0.1:6379(TX)> set k4 v4
QUEUED
 127.0.0.1:6379(TX)> discard
OK

3.2 Redis事務特性

  1. 單獨的隔離操作

事務中的所有命令都會序列化、按順序地執行。

事務在執行過程中,不會被其他使用者端傳送來的命令請求所打斷。

  1. 沒有隔離級別的概念

佇列中的命令沒有提交之前,都不會被實際地執行,因為事務提交之前任何指令都不會被實際執行,也就不存在“事務內的查詢要看到事務裡的更新,在事務外查詢不能看到”這個讓人萬分頭疼的問題。

  1. 不保證原子性

Redis同一個事務中如果有一條命令執行失敗,其後的命令仍然會被執行,沒有回滾。

四、Redis 分散式鎖

4.1 Lua 實現分散式鎖

local lockSet = redis.call('exists', KEYS[1])
if lockSet == 0 
  then
    redis.call('set', KEYS[1], ARG[1])
    redis.call('expire', KEYS[1], ARG[2])
  end
return lockSet

5.1 Redis dict字典

5.1.1 RedisDb資料介面(server.h檔案)

資料最外層的結構為RedisDb

typedef struct redisDb {
    dict *dict; /* The keyspace for this DB */ //資料庫的鍵
    dict *expires; /* Timeout of keys with a timeout set */ //設定了超時時間的鍵
    dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ //使用者端等待的keys
    dict *ready_keys; /* Blocked keys that received a PUSH */
    dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
    int id; /* Database ID */ //所在數 據庫ID
    long long avg_ttl; /* Average TTL, just for tats */ //平均TTL,僅用於統計
    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
    list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

5.1.2 dict資料結構(dict.h檔案)

我們發現資料儲存在dict中

typedef struct dict {
    dictType *type; //理解為物件導向思想,為支援不同的資料型別對應dictType抽象方法,不同的資料型別可以不同實現
    void *privdata; //也可不同的資料型別相關,不同型別特定函數的可選引數。
    dictht ht[2]; //2個hash表,用來資料儲存 2個dictht
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */ // rehash標記 -1代表不再rehash
    unsigned long iterators; /* number of iterators currently running */
} dict;

5.1.3 dictht結構(dict.h檔案)

typedef struct dictht {
    dictEntry **table; //dictEntry 陣列
    unsigned long size; //陣列大小 預設為4 #define DICT_HT_INITIAL_SIZE 4
    unsigned long sizemask; //size-1 用來取模得到資料的下標值
    unsigned long used; //改hash表中已有的節點資料
} dictht;

5.1.4 dictEntry節點結構(dict.h檔案)

typedef struct dictEntry {
    void *key; //key
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v; //value
    struct dictEntry *next; //指向下一個節點
} dictEntry;

/* Objects encoding. Some kind of objects like Strings and
Hashes can be
* internally represented in multiple ways. The 'encoding'
field of the object
* is set to one of this fields for this object. */
#define OBJ_ENCODING_RAW 0 /* Raw representation */
#define OBJ_ENCODING_INT 1 /* Encoded as integer */
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old
list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string
encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list
of ziplists */
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree
of listpacks */
#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of
obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution
in ms */
#define OBJ_SHARED_REFCOUNT INT_MAX /* Global object
never destroyed. */
#define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object
allocated in the stack. */
#define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT
typedef struct redisObject {
unsigned type:4; //資料型別 string hash list
unsigned encoding:4; //底層的資料結構 跳錶
unsigned lru:LRU_BITS; /* LRU time (relative to global
lru_clock) or
* LFU data (least significant
8 bits frequency
* and most significant 16 bits
access time). */
int refcount; //用於回收,參照計數法
void *ptr; //指向具體的資料結構的記憶體地址 比如 跳錶、壓縮列表
} robj;

5.2 Redis資料結構圖

5.3 Redis擴容機制

因為我們的dictEntry陣列預設大小是4,如果不進行擴容,那麼我們所有的資料都會以連結串列的形式新增至陣列下標 隨著資料量越來越大,之前只需要hash取模就能得到下標位置,現在得去迴圈我下標的連結串列,所以效能會越來越慢,當我們的資料量達到一定程度後,我們就得去觸發擴容操作。

5.3.1 什麼時候擴容

5.3.1.1 擴容機制
  1. 當沒有fork子程序在進行RDB或AOF持久化時,ht[0]的used大於size時,觸發擴容
  2. 如果有子程序在進行RDB或者AOF時,ht[0]的used大於等於size的5倍的時候,會觸發擴容
5.3.1.2 原始碼驗證

擴容,肯定是在新增資料的時候才會擴容,所以我們找一個新增資料的入口。

  1. 入口,新增或替換dictReplace (dict.c檔案)
int dictReplace(dict *d, void *key, void *val) {
    dictEntry *entry, *existing, auxentry;
    /* Try to add the element. If the key
    * does not exists dictAdd will succeed. */
    entry = dictAddRaw(d,key,&existing);
    if (entry) {
        dictSetVal(d, entry, val);
        return 1;
    }
    /* Set the new value and free the old one. Note that
    it is important
    * to do that in this order, as the value may just be
    exactly the same
    * as the previous one. In this context, think to
    reference counting,
    * you want to increment (set), and then decrement
    (free), and not the
    * reverse. */
    auxentry = *existing;
    dictSetVal(d, existing, val);
    dictFreeVal(d, &auxentry);
    return 0;
}
  1. 進入dictAddRaw方法 (dict.c檔案)
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing){
    long index;
    dictEntry *entry;
    dictht *ht;
    
    if (dictIsRehashing(d)) _dictRehashStep(d); //如果正在Rehash 進行漸進式hash 按步rehash
    
    /* Get the index of the new element, or -1 if
    * the element already exists. */
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key),existing)) == -1)
        return NULL;
    
    /* Allocate the memory and store the new entry.
    * Insert the element in top, with the assumption that
    in a database
    * system it is more likely that recently added
    entries are accessed
    * more frequently. */
    //如果在hash 放在ht[1] 否則放在ht[0]
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    //採用頭插法插入hash桶
    entry->next = ht->table[index];
    ht->table[index] = entry;
    //已使用++
    ht->used++;
    
    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}
static long _dictKeyIndex(dict *d, const void *key,uint64_t hash, dictEntry **existing){
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;
    
    /* Expand the hash table if needed */
    //擴容如果需要
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    //比較是否存在這個值
    for (table = 0; table <= 1; table++) {
        idx = hash & d->ht[table].sizemask;
        /* Search if this slot does not already contain
        the given key */
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key,he->key)) {
                if (existing) *existing = he;
                    return -1;
                }
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    return idx;
}
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d){
    /* Incremental rehashing already in progress. Return.*/
    if (dictIsRehashing(d)) return DICT_OK; //正在rehash,返回
    
    /* If the hash table is empty expand it to the initialsize. */
    //如果ht[0]的長度為0,設定預設長度4 dictExpand為擴容的關鍵方法
    if (d->ht[0].size == 0) 
            return dictExpand(d,DICT_HT_INITIAL_SIZE);

    //擴容條件 hash表中已使用的數量大於等於 hash表的大小
    //並且dict_can_resize為1 或者 已使用的大小大於hash表大小的 5倍,大於等於1倍的時候,下面2個滿足一個條件即可
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize || d->ht[0].used/d->ht[0].size >dict_force_resize_ratio)){
        //擴容成原來的2倍
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}
5.3.2.1 擴容步驟
  1. 當滿足我擴容條件,觸發擴容時,判斷是否在擴容,如果在擴容,或者 擴容的大小跟我現在的ht[0].size一樣,這次擴容不做
  2. new一個新的dictht,大小為ht[0].used * 2(但是必須向上2的冪,比 如6 ,那麼大小為8) ,並且ht[1]=新建立的dictht
  3. 我們有個更大的table了,但是需要把資料遷移到ht[1].table ,所以將 dict的rehashidx(資料遷移的偏移量)賦值為0 ,代表可以進行資料遷 移了,也就是可以rehash了
  4. 等待資料遷移完成,資料不會馬上遷移,而是採用漸進式rehash,慢慢的把資料遷移到ht[1]
  5. 當資料遷移完成,ht[0].table=ht[1] ,ht[1] .table = NULL、ht[1] .size = 0、ht[1] .sizemask = 0、 ht[1] .used = 0
  6. 把dict的rehashidex=-1
5.3.2.2 原始碼驗證
  1. dictExpand方法在_dictExpandIfNeeded 方法中呼叫 (dict.c檔案),為擴容的關鍵方法
int dictExpand(dict *d, unsigned long size){
    /* the size is invalid if it is smaller than the
    number of
    * elements already inside the hash table */
    //正在擴容,不允許第二次擴容,或者ht[0]的資料量大於擴容後的數量,沒有意義,這次不擴容
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;
    
    dictht n; /* the new hash table */
    //得到最接近2的冪 跟hashMap思想一樣
    unsigned long realsize = _dictNextPower(size);
    
    /* Rehashing to the same table size is not useful. */
    //如果跟ht[0]的大小一致 直接返回錯誤
    if (realsize == d->ht[0].size) return DICT_ERR;
    
    /* Allocate the new hash table and initialize all
    pointers to NULL */
    //為新的dictht賦值
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;
    
    /* Is this the first initialization? If so it's not
    really a rehashing
    * we just set the first hash table so that it can
    accept keys. */
    //ht[0].table為空,代表是初始化
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }
    
    /* Prepare a second hash table for incremental rehashing */
    //將擴容後的dictht賦值給ht[1]
    d->ht[1] = n;
    //標記為0,代表可以開始rehash
    d->rehashidx = 0;
    return DICT_OK;
}
5.3.3.1 遷移方式

假如一次性把資料全部遷移會很耗時間,會讓單條指令等待很久,會形成阻塞。

所以,redis採用漸進式Rehash,所謂漸進式,就是慢慢的,不會一次性把所有資料遷移。

那什麼時候會進行漸進式資料遷移?

  1. 每次進行key的crud操作都會進行一個hash桶的資料遷移
  2. 定時任務,進行部分資料遷移
5.3.3.2 原始碼驗證
CRUD觸發都會觸發_dictRehashStep(漸進式hash)
  1. 每次增刪改的時候都會呼叫_dictRehashStep方法
if (dictIsRehashing(d)) _dictRehashStep(d); //這個程式碼在增刪改查的方法中都會呼叫
  1. _dictRehashStep方法
static void _dictRehashStep(dict *d) {
  if (d->iterators == 0) dictRehash(d,1);
}
int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty
    buckets to visit. */ //要存取的最大的空桶數 防止此次耗時過長
    
    if (!dictIsRehashing(d)) return 0; //如果沒有在rehash返回0
    
    //迴圈,最多1次,並且ht[0]的資料沒有遷移完 進入迴圈
    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;
        /* Note that rehashidx can't overflow as we are
        sure there are more
        * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        //rehashidx rehash的索引,預設0 如果hash桶為空 進入自旋 並且判斷是否到了最大的存取空桶數
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx]; //得到ht[0]的下標為rehashidx桶
        /* Move all the keys in this bucket from the old
        to the new hash HT */
        while(de) { //迴圈hash桶的連結串列 並且轉移到ht[1]
            uint64_t h;
            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d-
            >ht[1].sizemask;
            de->next = d->ht[1].table[h]; //頭插
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        //轉移完後 將ht[0]相對的hash桶設定為null
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }
    /* Check if we already rehashed the whole table... */
    //ht[0].used=0 代表rehash全部完成
    if (d->ht[0].used == 0) {
        //清空ht[0]table
        zfree(d->ht[0].table);
        //將ht[0] 重新指向已經完成rehash的ht[1]
        d->ht[0] = d->ht[1];
        //將ht[1]所有資料重新設定
        _dictReset(&d->ht[1]);
        //設定-1,代表rehash完成
        d->rehashidx = -1;
        return 0;
    }
    /* More to rehash... */
    return 1;
}

//將ht[1]的屬性復位
static void _dictReset(dictht *ht)
{
    ht->table = NULL;
    ht->size = 0;
    ht->sizemask = 0;
    ht->used = 0;
}

再講定時任務之前,我們要知道Redis中有個時間事件驅動,在 server.c檔案下有個serverCron 方法。

serverCron 每隔多久會執行一次,執行頻率根據redis.conf中的hz設定,serverCorn中有個方法databasesCron()

  1. 定時方法serverCron
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    //.......
    /* We need to do a few operations on clients
    asynchronously. */
    clientsCron();
    /* Handle background operations on Redis databases. */
    databasesCron(); //處理Redis資料庫上的後臺操作。
    //.......
}
  1. databasesCron方法
void databasesCron(void) {
    /* Expire keys by random sampling. Not required for
    slaves
    * as master will synthesize DELs for us. */
    if (server.active_expire_enabled) {
        if (iAmMaster()) {
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
        } else {
            expireSlaveKeys();
        }
    }
    /* Defrag keys gradually. */
    activeDefragCycle();
    /* Perform hash tables rehashing if needed, but only
    if there are no
    * other processes saving the DB on disk. Otherwise
    rehashing is bad
    * as will cause a lot of copy-on-write of memory
    pages. */
    if (!hasActiveChildProcess()) {
        /* We use global counters so if we stop the
        computation at a given
        * DB we'll be able to start from the successive
        in the next
        * cron loop iteration. */
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
        int dbs_per_call = CRON_DBS_PER_CALL;
        int j;
        /* Don't test more DBs than we have. */
        if (dbs_per_call > server.dbnum) 
            dbs_per_call = server.dbnum;
        /* Resize */
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }
        /* Rehash */ //Rehash邏輯
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                //進入incrementallyRehash
                int work_done = incrementallyRehash(rehash_db);
                if (work_done) {
                    /* If the function did some work, stop
                        here, we'll do
                        * more at the next cron loop. */
                    break;
                } else {
                    /* If this db didn't need rehash,
                        we'll try the next one. */
                    rehash_db++;
                    rehash_db %= server.dbnum;
                }
            }
        }
    }
}

int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; 
        /* already used our millisecond for this
        loop... */
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this
        loop... */
    }
    return 0;
}
  1. 進入dictRehashMilliseconds(dict.c)
int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;
    //進入rehash方法 dictRehash 去完成漸進時HASH
    while(dictRehash(d,100)) {
        rehashes += 100;
        //判斷是否超時
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}

5.4 String型別資料結構

Redis中String的底層沒有用c的char來實現,而是採用了SDS(simple Dynamic String)的資料結構來實現。並且提供了5種不同的型別

5.4.1 SDS資料結構定義(sds.h檔案)

typedef char *sds;
/* Note: sdshdr5 is never used, we just access the flags
    byte directly.
    * However is here to document the layout of type 5 SDS
strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
    unsigned char flags; /* 3 lsb of type, and 5 msb of
        string length */
    char buf[];
}

struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len;  /* used */ //已使用的長度
    uint8_t alloc; /* excluding the header and null terminator */ //分配的總容量 不包含頭和空終止符
    unsigned char flags; /* 3 lsb of type, 5 unused bits */ //低三位型別 高5bit未使用
    char buf[]; //資料buf 儲存字串陣列
};

struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len; /* used */
    uint16_t alloc; /* excluding the header and null
    terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits
    */
    char buf[];
};

struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null
    terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits
    */
    char buf[];
};

struct __attribute__ ((__packed__)) sdshdr64 {
    uint64_t len; /* used */
    uint64_t alloc; /* excluding the header and null
    terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits
    */
    char buf[];
};
  1. char字串不記錄自身長度,所以獲取一個字串長度的複雜度是O(n),但是SDS記錄分配的長度alloc,已使用的長度len,獲取長度為 O(1)
  2. 減少修改字串帶來的記憶體重分配次數
  3. 空間預分配,SDS長度如果小於1MB,預分配跟長度一樣的,大於1M,每次跟len的大小多1M
  4. 惰性空間釋放,擷取的時候不馬上釋放空間,供下次使用!同時提供相應的釋放SDS未使用空間的API
  5. 二進位制安全

5.5 Hash型別資料結構

Redis的字典相當於Java語言中的HashMap,他是無序字典。內部結構上同樣是陣列 + 連結串列二維結構。第一維hash的陣列位置碰撞時,就會將碰撞的元素使用連結串列串起來。

5.5.1 Hash資料結構圖

5.5.2 Hash的壓縮列表

壓縮列表會根據存入的資料的不同型別以及不同大小,分配不同大小的空間。所以是為了節省記憶體而採用的。

因為是一塊完整的記憶體空間,當裡面的元素髮生變更時,會產生連鎖更新,嚴重影響我們的存取效能。所以,只適用於資料量比較小的場景。

所以,Redis會有相關設定,Hashes只有小資料量的時候才會用到ziplist,當hash物件同時滿足以下兩個條件的時候,使用ziplist編碼:

  1. 雜湊物件儲存的鍵值對數量<512個
  2. 所有的鍵值對的鍵和值的字串長度都<64byte(一個英文字母一個位元組)
hash-max-ziplist-value 64 // ziplist中最大能存放的值長度
hash-max-ziplist-entries 512 // ziplist中最多能存放的entry節點數量

5.6 List型別資料結構

5.6.1 quickList快速列表

兼顧了ziplist的節省記憶體,並且一定程度上解決了連鎖更新的問題,我們的 quicklistNode節點裡面是個ziplist,每個節點是分開的。那麼就算髮生了連鎖更新,也只會發生在一個quicklistNode節點。

quicklist.h檔案

typedef struct{
    struct quicklistNode *prev; //前指標
    struct quicklistNode *next; //後指標
    unsigned char *zl; //資料指標 指向ziplist結果
    unsigned int sz; //ziplist大小 /* ziplist
    size in bytes */
    unsigned int count : 16; /* count of items in ziplist */ //ziplist的元素
    unsigned int encoding : 2; /* RAW==1 or LZF==2 */ //
    是否壓縮, 1沒有壓縮 2 lzf壓縮
    unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ //預留容器欄位
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int extra : 10; /* more bits to steal for  future usage */ //預留欄位
} quicklistNode;

5.6.2 list資料結構圖

5.7 Set型別資料結構

Redis用intset或hashtable儲存set。滿足下面條件,就用inset儲存。

  1. 如果不是整數型別,就用dictht hash表(陣列+連結串列)
  2. 如果元素個數超過512個,也會用hashtable儲存。跟一個設定有關:
set-max-intset-entries 512

不滿足上述條件的,都使用hash表儲存,value存null。

5.8 ZSet資料結構

5.8.1 ZipList

預設使用ziplist編碼。

在ziplist的內部,按照score排序遞增來儲存。插入的時候要移動之後的資料。

如果元素數量大於等於128個,或者任一member長度大於等於64位元組使用 skiplist+dict儲存。

zset-max-ziplist-entries 128
zset-max-ziplist-value 64

如果不滿足條件,採用跳錶。

5.8.2 跳錶(skiplist)

結構定義(servier.h)

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
 sds ele; //sds資料
 double score; //score
 struct zskiplistNode *backward; //後退指標
 //層級陣列
 struct zskiplistLevel {
     struct zskiplistNode *forward; //前進指標
     unsigned long span; //跨度
 } level[];
} zskiplistNode;

//跳錶列表
typedef struct zskiplist {
 struct zskiplistNode *header, *tail; //頭尾節點
 unsigned long length; //節點數量
 int level; //最大的節點層級
} zskiplist;

zslInsert 新增節點方法 (t_zset.c)

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds
ele) {
 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
 unsigned int rank[ZSKIPLIST_MAXLEVEL];
 int i, level;
 serverAssert(!isnan(score));
 x = zsl->header;
 for (i = zsl->level-1; i >= 0; i--) {
     /* store rank that is crossed to reach the insert
     position */
     rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
     while (x->level[i].forward &&
         (x->level[i].forward->score < score ||
         咕泡出品 必屬精品精品屬精品 咕泡出品 必屬精品 咕泡出品 必屬精品 咕泡咕泡
         (x->level[i].forward->score == score
         &&
         sdscmp(x->level[i].forward->ele,ele) <
         0)))
         {
         rank[i] += x->level[i].span;
         x = x->level[i].forward;
     }
     update[i] = x;
 }
 /* we assume the element is not already inside, since
 we allow duplicated
    * scores, reinserting the same element should never
    happen since the
    * caller of zslInsert() should test in the hash table
    if the element is
    * already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    //不同層級建立連結串列聯絡
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
        /* update span covered by update[i] as x is
        inserted here */
        x->level[i].span = update[i]->level[i].span -
        (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) +
        1;
    }
    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
    x->backward = (update[0] == zsl->header) ? NULL :
    update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
        zsl->length++;
    return x;
}
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64
elements */

六、Redis過期策略

6.1 惰性過期

所謂惰性過期,就是在每次存取操作key的時候,判斷這個key是不是過期了,過期了就刪除。

6.1.1 原始碼驗證

  1. expireIfNeeded方法(db.c檔案)
int expireIfNeeded(redisDb *db, robj *key) {
    if (!keyIsExpired(db,key)) return 0;
    /* If we are running in the context of a slave,
    instead of
    * evicting the expired key from the database, we
    return ASAP:
    * the slave key expiration is controlled by the
    master that will
    * send us synthesized DEL operations for expired
    keys.
    *
    * Still we try to return the right information to the
    caller,
    * that is, 0 if we think the key should be still
    valid, 1 if
    * we think the key is expired at this time. */
    // 如果設定有masterhost,說明是從節點,那麼不操作刪除
    if (server.masterhost != NULL) return 1;
    /* Delete the key */
    server.stat_expiredkeys++;
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
    "expired",key,db->id);
    //是否是非同步刪除 防止單個Key的資料量很大 阻塞主執行緒 是4.0之後新增的新功能,預設關閉
    int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key);
    if (retval) signalModifiedKey(NULL,db,key);
    return retval;
}

該策略可以最大化的節省CPU資源。

但是卻對記憶體非常不友好。因為如果沒有再次存取,就可能一直堆積再記憶體中。佔用記憶體

所以就需要另一種策略來配合使用,就是定期過期

6.2 定期過期

那麼多久去清除一次,我們在講Rehash的時候,有個方法是serverCron,執行頻率根據redis.conf中的hz設定。

這方法除了做Rehash以外,還會做很多其他的事情,比如:

  1. 清理資料庫中的過期鍵值對
  2. 更新伺服器的各類統計資訊,比如時間、記憶體佔用、資料庫佔用情況等
  3. 關閉和清理連線失效的使用者端
  4. 嘗試進行持久化操作

6.2.1 實現流程

  1. 定時serverCron方法去執行清理,執行頻率根據redis.cong中的hz設定的值(預設是10,也就是1s執行10次,100ms執行一次)
  2. 執行清理的時候,不是去掃描所有的key,而是去掃描所有設定了過期時間的key redisDB.expires
  3. 如果每次去把所有的key都拿出來,那麼假如過期的key很多,就會很慢,所以也不是一次性拿出所有的key
  4. 根據hash桶的維度去掃描key,掃到20(可設定)個key為止。假如第一個桶是15個key沒有滿足20,繼續掃描第二個桶,第二個桶20個key,由於是以hash桶的維度掃描的,所以第二個掃到了就會全掃,總共掃描35個key
  5. 找到掃描的key裡面過期的key,並進行刪除
  6. 如果取了400個空桶(最多拿400個桶),或者掃描的刪除比例跟掃描的總數超過10%,繼續執行4、5步
  7. 也不能無限的迴圈,迴圈16次後回去檢測時間,超過指定時間會跳出

6.2.2 原始碼驗證

  1. 入口serverCrom
// serverCron方法呼叫databasesCron方法(server.c)
/* Handle background operations on Redis databases. */
databasesCron();

void databasesCron(void) {
    /* Expire keys by random sampling. Not required for
    slaves
    * as master will synthesize DELs for us. */
    if (server.active_expire_enabled) {
        if (iAmMaster()) {
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
        //過期刪除方法
        } else {
            expireSlaveKeys();
        }
    }
}

由於Redis記憶體是有大小的,並且我可能裡面的資料都沒有過期,當快滿的時候,我又沒有過期的資料進行淘汰,那麼這個時候記憶體也會滿。記憶體滿了,Redis也會不能放入新的資料。所以,我們不得已需要一些策略來解決這個問題來保證可用性。

7.1 淘汰策略

7.1.1 noeviction

New values aren’t saved when memory limit is reached. When a database uses replication, this applies to the primary database.

預設,不淘汰 能讀不能寫

7.1.2 allkeys-lru

Keeps most recently used keys; removes least recently used (LRU) keys

基於偽LRU演演算法,在所有的key中去淘汰

7.1.3 allkeys-lfu

Keeps frequently used keys; removes least frequently used (LFU) keys.

基於偽LFU演演算法,在所有的key中去淘汰

7.1.4 volatile-lru

Removes least recently used keys with the expire field set to true.

基於偽LRU演演算法,在設定了過期時間的key中去淘汰

7.1.5 volatile-lfu

Removes least frequently used keys with the expire field set to true.

基於偽LFU演演算法 在設定了過期時間的key中去淘汰

7.1.6 allkeys-random

Randomly removes keys to make space for the new data added.

基於隨機演演算法,在所有的key中去淘汰

7.1.7 volatile-random

Randomly removes keys with expire field set to true.

基於隨機演演算法 在設定了過期時間的key中去淘汰

7.1.8 volatile-ttl

Removes least frequently used keys with expire field set to true and the shortest remaining time-to-live (TTL) value.

根據過期時間來,淘汰即將過期的

7.2 淘汰流程

  1. 首先,我們會有個淘汰池,預設大小是16,並且裡面的資料是末尾淘汰制
  2. 每次指令操作的時候,自旋會判斷當前記憶體是否滿足指令所需要的記憶體
  3. 如果當前不滿足,會從淘汰池的尾部拿一個最適合淘汰的資料
  • 會取樣(設定 maxmemory-samples),從Redis中獲取隨機獲取到取樣的資料,解決一次性讀取所有的資料慢的問題
  • 在取樣的資料中,根據淘汰演演算法找到最適合淘汰的資料
  • 將最合適的那個資料跟淘汰池中的資料進行比較,是否比淘汰池的資料更適合淘汰,如果更適合,放入淘汰池
  • 按照適合的程度進行排序,最適合淘汰的放入尾部
  1. 將需要淘汰的資料從Redis刪除,並且從淘汰池移除

7.3 LRU演演算法

Lru,Least Recently Used 翻譯過來是最久未使用,根據時間軸來走,仍很久沒用的資料。只要最近有用過,我就預設是有效的。

那麼它的一個衡量標準是啥?時間!根據使用時間,從近到遠,越遠的越容易淘汰。

7.3.1 實現原理

需求:得到物件隔多久沒存取,隔的時間越久,越容易被淘汰

  1. 首先,LRU是根據這個物件的存取操作時間來進行淘汰的,那麼我們需要知道這個物件最後操作的存取時間
  2. 知道了物件的最後操作存取時間後,我們只需要跟當前系統時間來進行對比,就能算出物件多久沒存取了

7.3.2 原始碼驗證

redis中,物件都會被redisObject物件包裝,其中有個欄位叫lru。

redisObject物件 (server.h檔案)

typedef struct redisObject {
 unsigned type:4;
 unsigned encoding:4;
 unsigned lru:LRU_BITS; /* LRU time (relative to global
 lru_clock) or
 * LFU data (least significant 8 bits frequency
 * and most significant 16 bits access time). */
 int refcount;
 void *ptr;
} robj;

看LRU的欄位的說明,那麼我們大概能猜出來,redis去實現lru淘汰演演算法肯定跟我們這個物件的lru相關

並且這個欄位的大小為24bit,那麼這個欄位記錄的是物件操作存取時候的秒單位時間的後24bit

相當於:

long timeMillis=System.currentTimeMillis();
System.out.println(timeMillis/1000); //獲取當前秒
System.out.println(timeMillis/1000 & ((1<<24)-1)); //獲取秒的後24位元

我們知道了這個物件的最後操作存取的時間。如果我們要得到這個物件多久沒存取了,我們是不是就很簡單,用我當前的時間-這個物件的存取時間就可以了!但是這個存取時間是秒單位時間的後24bit 所以,也是用當前的時間的秒單位的後24bit去減。

estimateObjectIdleTime方法(evict.c)

unsigned long long estimateObjectIdleTime(robj *o) {
 //獲取秒單位時間的最後24位元
 unsigned long long lruclock = LRU_CLOCK();
 //因為只有24位元,所有最大的值為2的24次方-1
 //超過最大值從0開始,所以需要判斷lruclock(當前系統時間)跟快取物件的lru欄位的大小
 if (lruclock >= o->lru) {
     //如果lruclock>=robj.lru,返回lruclock-o->lru,再轉換單位
     return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
 } else {
     //否則採用lruclock + (LRU_CLOCK_MAX - o->lru),得到對
     象的值越小,返回的值越大,越大越容易被淘汰
     return (lruclock + (LRU_CLOCK_MAX - o->lru)) *
     LRU_CLOCK_RESOLUTION;
 }
}

7.3.3 LRU總結

用lruclock與 redisObject.lru進行比較,因為lruclock只獲取了當前秒單位時間的後24位元,所以肯定有個輪詢。

所以,我們會用lruclock跟 redisObject.lru進行比較,如果lruclock>redisObject.lru,那麼我們用lruclock- redisObject.lru,否則lruclock+(24bit的最大值-redisObject.lru),得到lru越小,那麼返回的資料越大,相差越大的越優先會被淘汰!

<​​https://www.processon.com/view/link/5fbe0541e401fd2d6ed8fc38​​>

7.4 LFU演演算法

LFU,英文 Least Frequently Used,翻譯成中文就是最不常用的優先淘汰。

不常用,它的衡量標準就是次數,次數越少的越容易被淘汰。

這個實現起來應該也很簡單,物件被操作存取的時候,去記錄次數,每次操作存取一次,就+1;淘汰的時候,直接去比較這個次數,次數越少的越容易淘汰!

7.4.1 LFU的時效性問題

何為時效性問題?就是隻會去考慮數量,但是不會去考慮時間。

ps:去年的某個新聞很火,點選量3000W,今年有個新聞剛出來點選量100次,本來我們是想保留今年的新的新聞的,但是如果根據LFU來做的話,我們發現淘汰的是今年的新聞,明顯是不合理的

導致的問題:新的資料進不去,舊的資料出不來。

那麼如何解決呢,且往下看

7.4.2 原始碼分析

我們還是來看redisObject(server.h)

typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* LRU time (relative to global
    lru_clock) or
    * LFU data (least significant 8 bits frequency
    * and most significant 16 bits access time). */
    int refcount;
    void *ptr;
} robj;

我們看它的lru,它如果在LFU演演算法的時候!它前面16位元代表的是時間,後8位元代表的是一個數值,frequency是頻率,應該就是代表這個物件的存取次數,我們先給它叫做counter。

前16bits時間有啥用?

跟時間相關,我猜想應該是跟解決時效性相關的,那麼怎麼解決的?從生活中找例子

大家應該充過一些會員,比如我這個年紀的,小時候喜歡充騰訊的黃鑽、綠鑽、藍鑽等等。但是有一個點,假如哪天我沒充錢了的話,或者沒有續VIP的時候,我這個鑽石等級會隨著時間的流失而降低。比如我本來是黃鑽V6,但是一年不充錢的話,可能就變成了V4。

那麼回到Redis,大膽的去猜測,那麼這個時間是不是去記錄這個物件有多久沒存取了,如果多久沒存取,我就去減少對應的次數。

LFUDecrAndReturn方法(evict.c)

unsigned long LFUDecrAndReturn(robj *o) {
    //lru欄位右移8位元,得到前面16位元的時間
    unsigned long ldt = o->lru >> 8;
    //lru欄位與255進行&運算(255代表8位元的最大值),
    //得到8位元counter值
    unsigned long counter = o->lru & 255;
    //如果設定了lfu_decay_time,用LFUTimeElapsed(ldt) 除以設定的值
    //LFUTimeElapsed(ldt)原始碼見下
    //總的沒存取的分鐘時間/設定值,得到每分鐘沒存取衰減多少
    unsigned long num_periods = server.lfu_decay_time ?LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
    if (num_periods)
    //不能減少為負數,非負數用couter值減去衰減值
    counter = (num_periods > counter) ? 0 : counter - num_periods;
    return counter;
}

衰減因子由設定

lfu-decay-time 1 //多少分鐘沒操作存取就去衰減一次

後8bits的次數,最大值是255,夠不夠?

肯定不夠,一個物件的存取操作次數肯定不止255次。

但是我們可以讓資料達到255的很難。那麼怎麼做的?這個比較簡單,我們先看原始碼,然後再總結

LFULogIncr方法 (evict.c 檔案)

uint8_t LFULogIncr(uint8_t counter) {
    //如果已經到最大值255,返回255 ,8位元的最大值
    if (counter == 255) return 255;
    //得到亂數(0-1)
    double r = (double)rand()/RAND_MAX;
    //LFU_INIT_VAL表示基數值(在server.h設定)
    double baseval = counter - LFU_INIT_VAL;
    //如果達不到基數值,表示快不行了,baseval =0
    if (baseval < 0) baseval = 0;
    //如果快不行了,肯定給他加counter
    //不然,按照機率是否加counter,同時跟baseval與lfu_log_factor相關
    //都是在分子,所以2個值越大,加counter機率越小
    double p = 1.0/(baseval*server.lfu_log_factor+1);
    if (r < p) counter++;
    return counter;
}

八、Redis 持久化

8.1 RBD

8.1.1 何時觸發RBD

自動觸發

  1. 設定觸發
save 900 1 900s檢查一次,至少有1個key被修改就觸發
save 300 10 300s檢查一次,至少有10個key被修改就觸發
save 60 10000 60s檢查一次,至少有10000個key被修改
  1. shutdown正常關閉

任何元件在正常關閉的時候,都會去完成應該完成的事。比如Mysql 中的Redolog持久化,正常關閉的時候也會去持久化。

  1. flushall指令觸發

資料清空指令會觸發RDB操作,並且是觸發一個空的RDB檔案,所以, 如果在沒有開啟其他的持久化的時候,flushall是可以刪庫跑路的,在生產環境慎用。

8.1.2 RDB的優劣

優勢

  1. 是個非常緊湊型的檔案,非常適合備份與災難恢復
  2. 最大限度的提升了效能,會fork一個子程序,父程序永遠不會產於磁碟 IO或者類似操作
  3. 更快的重啟

不足

  1. 資料安全性不是很高,因為是根據設定的時間來備份,假如每5分鐘備份 一次,也會有5分鐘資料的丟失
  2. 經常fork子程序,所以比較耗CPU,對CPU不是很友好

8.2 AOF

由於RDB的資料可靠性非常低,所以Redis又提供了另外一種持久化方案: Append Only File 簡稱:AOF

AOF預設是關閉的,你可以在組態檔中進行開啟:

appendonly no //預設關閉,可以進行開啟
# The name of the append only file (default:"appendonly.aof")
appendfilename "appendonly.aof" //AOF檔名

追加檔案,即每次更改的命令都會附加到我的AOF檔案中。

8.2.1 同步機制

AOF會記錄每個寫操作,那麼問題來了。我難道每次操作命令都要和磁碟互動?

當然不行,所以redis支援幾種策略,由使用者來決定要不要每次都和磁碟互動

# appendfsync always 表示每次寫入都執行fsync(重新整理)函數 效能會非常非常慢 但是非常安全
appendfsync everysec 每秒執行一次fsync函數 可能丟失1s的資料
# appendfsync no 由作業系統保證資料同步到磁碟,速度最快 你的資料只需要交給作業系統就行

預設1s一次,最多有1s丟失

8.2.2 重寫機制

由於AOF是追加的形式,所以檔案會越來越大,越大的話,資料載入越慢。 所以我們需要對AOF檔案進行重寫。

何為重寫

比如 我們的incr指令,假如我們incr了100次,現在資料是100,但是我們的aof檔案中會有100條incr指令,但是我們發現這個100條指令用處不大, 假如我們能把最新的記憶體裡的資料儲存下來的話。所以,重寫就是做了這麼一件事情,把當前記憶體的資料重寫下來,然後把之前的追加的檔案刪除。

重寫流程

  • 在Redis7之前
  1. Redis fork一個子程序,在一個臨時檔案中寫入新的AOF(當前記憶體的資料生成的新的AOF)
  2. 那麼在寫入新的AOF的時候,主程序還會有指令進入,那麼主程序會在記憶體快取區中累計新的指令 (但是同時也會寫在舊的AOF檔案中,就算 重寫失敗,也不會導致AOF損壞或者資料丟失)
  3. 如果子程序重寫完成,父程序會收到完成訊號,並且把記憶體快取中的指令追加到新的AOF檔案中
  4. 替換舊的AOF檔案 ,並且將新的指令附加到重寫好的AOF檔案中
  • 在Redis7之後,AOF檔案不再是一個,所以會有臨時清單的概念
  1. 子程序開始在一個臨時檔案中寫入新的基礎AOF
  2. 父級開啟一個新的增量 AOF 檔案以繼續寫入更新。如果重寫失敗,舊的基礎和增量檔案(如果有的話)加上這個新開啟的增量檔案就代表了完整的更新資料集,所以我們是安全的
  3. 當子程序完成基礎檔案的重寫後,父程序收到一個訊號,並使用新開啟 的增量檔案和子程序生成的基礎檔案來構建臨時清單,並將其持久化
  4. 現在 Redis 對清單檔案進行原子交換,以便此 AOF 重寫的結果生效。 Redis 還會清理舊的基礎檔案和任何未使用的增量檔案

但是重寫是把當前記憶體的資料,寫入一個新的AOF檔案,如果當前資料比較大,然後以指令的形式寫入的話,會很慢很慢

所以在4.0之後,在重寫的時候是採用RDB的方式去生成新的AOF檔案,然 後後續追加的指令,還是以指令追加的形式追加的檔案末尾

aof-use-rdb-preamble yes //是否開啟RDB與AOF混合模式

什麼時候重寫

組態檔redis.conf

# 重寫觸發機制
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb 就算達到了第一個百分比的大小,也必須大於 64M

在 aof 檔案小於64mb的時候不進行重寫,當到達64mb的時候,就重寫一
次。重寫後的 aof 檔案可能是40mb。上面設定了auto-aof-rewritepercentag為100,即 aof 檔案到了80mb的時候,進行重寫。

8.2.3 AOF的優勢與不足

優勢

  1. 安全性高,就算是預設的持久化同步機制,也最多隻會丟失1s資料
  2. AOF由於某些原因,比如磁碟滿了等導致追加失敗,也能通過redischeck-aof 工具來修復
  3. 格式都是追加的紀錄檔,所以可讀性更高

不足

  1. 資料集一般比RDB大
  2. 持久化跟資料載入比RDB更慢
  3. 在7.0之前,重寫的時候,因為重寫的時候,新的指令會快取在記憶體區,所以會導致大量的記憶體使用
  4. 並且重寫期間,會跟磁碟進行2次IO,一個是寫入老的AOF檔案,一個是寫入新的AOF檔案

九、Redis常見問題總結

9.1 Redis資料丟失場景

9.1.1 持久化丟失

採用RDB或者不持久化, 會有資料丟失,因為是手動或者設定以快照的形式來進行備份。

解決:啟用AOF,以命令追加的形式進行備份,但是預設也會有1s丟失, 這是在效能與資料安全性中尋求的一個最適合的方案,如果為了保證資料 一致性,可以將設定更改為always,但是效能很慢,一般不用。

9.1.2 主從切換

因為Redis的資料是主非同步同步給從的,提升了效能,但是由於是非同步同步到從。所以存在資料丟失的可能

  1. master寫入資料k1 ,由於是非同步同步到slave,當master沒有同步給slave的時候,master掛了
  2. slave會成為新的master,並且沒有同步k1
  3. master重啟,會成為新master的slave,同步資料會清空自己的資料,從新的master載入
  4. k1丟失

9.2 Redis快取雪崩、穿透、擊穿問題分析

9.2.1 快取雪崩

快取雪崩就是Redis的大量熱點資料同時過期(失效),因為設定了相同的過期時間,剛好這個時候Redis請求的並行量又很大,就會導致所有的請求落到資料庫。

  1. 保證Redis的高可用,防止由於Redis不可用導致全部打到DB
  2. 加互斥鎖或者使用佇列,針對同一個key只允許一個執行緒到資料庫查詢
  3. 快取定時預先更新,避免同時失效
  4. 通過加亂數,使key在不同的時間過期

9.2.2 快取穿透

快取穿透是指快取和資料庫中都沒有的資料,但是使用者一直請求不存在的資料!這時的使用者很可能就是攻擊者,惡意搞你們公司的,攻擊會導致資料庫壓力過大。

解決方案:布隆過濾器

布隆過濾器的思想:既然是因為你Redis跟DB都沒有,然後使用者惡意一直存取不存在的key,然後全部打到Redis跟DB。那麼我們能不能單獨把DB的資料單獨存到另外一個地方,但是這個地方只存一個簡單的標記,標記這個key存不存在,如果存在,就去存取Redis跟DB,否則直接返回。並且這個記憶體最好很小。

9.2.3 快取擊穿

單個key過期的時候有大量並行,使用互斥鎖,回寫redis,並且採用雙重檢查鎖來提升效能!減少對DB的存取。

到此這篇關於一篇文章帶你弄清楚Redis的精髓的文章就介紹到這了,更多相關一文搞懂Redis內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com