首頁 > 軟體

redis stream 實現訊息佇列的實踐

2022-08-10 18:03:57

Redis5.0帶來了Stream型別。從字面上看是流型別,但其實從功能上看,應該是Redis對訊息佇列(MQ,Message Queue)的完善實現。

基於redis實現訊息佇列的方式有很多:

  • PUB/SUB,訂閱/釋出模式
  • 基於List的 LPUSH+BRPOP 的實現

redis 實現訊息對列4中方法

釋出訂閱

釋出訂閱優點: 典型的一對的,所有消費者都能同時消費到訊息。主動通知訂閱者而不是訂閱者輪詢去讀。

釋出訂閱缺點: 不支援多個消費者公平消費訊息,訊息沒有持久化,不管訂閱者是否收到訊息,訊息都會丟失。

使用場景:微服務間的訊息同步,如 分散式webSocker,資料同步等。

list 佇列

生產者通過lpush生成訊息,消費者通過blpop阻塞讀取訊息。

**list佇列優點:**支援多個消費者公平消費訊息,對訊息進行儲存,可以通過lrange查詢佇列內的訊息。

**list佇列缺點:**blpop仍然會阻塞當前連線,導致連線不可用。一旦blpop成功訊息就丟棄了,期間如果伺服器宕機訊息會丟失,不支援一對多消費者。

zset 佇列

生產者通過zadd 建立訊息時指定分數,可以確定訊息的順序,消費者通過zrange獲取訊息後進行消費,消費完後通zrem刪除訊息。

zset優點: 保證了訊息的順序,消費者消費失敗後重新入隊不會打亂消費順序。

zset缺點: 不支援一對多消費,多個消費者消費時可能出現讀取同一條訊息的情況,得通過加鎖或其他方式解決消費的冪等性。

zset使用場景:由於資料是有序的,常常被用於延遲佇列,如 redisson的DelayQueue

Stream 佇列

Redis5.0帶來了Stream型別。從字面上看是流型別,但其實從功能上看,應該是Redis對訊息佇列(MQ,Message Queue)的完善實現。

參考kafka的思想,通過多個消費者組和消費者支援一對多消費,公平消費,消費者內維護了pending列表防止訊息丟失。

提供訊息ack機制。

基本命令

xadd 生產訊息

往 stream 內建立訊息 語法為:

XADD key ID field string [field string …]

# * 表示自動生成id redis會根據時間戳+序列號自動生成id,不建議我們自己指定id
xadd stream1 * name zs age 23  

讀取訊息

讀取stream內的訊息,這個並不是消費,只是提供了檢視資料的功能,語法為:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

#表示從 stream1 內取出一條訊息,從第0條訊息讀取(0表示最小的id)
xread count 1 streams stream1 0
#表示從 stream1 內 id=1649143363972-0 開始讀取一條訊息,讀取的是指定id的下一條訊息
xread count 1 streams msg 1649143363972-0

#表示一直阻塞讀取最新的訊息($表示獲取下一個生成的訊息)
xread count 1 block 0 streams stream1 $ 

xrange stream - + 10

XRANGE key startID endID count

#表示從stream1內取10條訊息 起始位置為 -(最小ID) 結束位置為+(最大ID)
xrange stream1 - + 10 

xgroup 消費者組

redis stream 借鑑了kafka的設計,採用了消費者和消費者組的概念。允許多個消費者組消費stream的訊息,每個消費者組都能收到完整的訊息,例如:stream內有10條訊息,消費者組A和消費者組B同時消費時,都能獲取到這10條訊息。

每個消費者組內可以有多個消費者消費,訊息會平均分攤給各個消費者,例如:stream有10條訊息,消費者A,B,C同時在同一個組內消費,A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9

建立消費者組:

#消費訊息首先得建立消費者組
# 表示為佇列 stream1 建立一個消費者組 group1 從訊息id=0(第一條訊息)開始讀取訊息
xgroup create stream1 group1 0

#查詢stream1內的所有消費者組資訊
xinfo groups stream1

xreadgroup 消費訊息

通過xreadgroup可以在消費者組內建立消費者消費訊息

XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

#建立消費者讀取訊息
#在group1消費者組內通過consumer1消費stream1內的訊息,消費1條未分配的訊息 (> 表示未分配過消費者的訊息)
xreadgrup group group1 consumer1 count 1 streams stream1 > 

Pending 等待列表

通過 xreadgroup 讀取訊息時訊息會分配給對應的消費者,每個消費者內都維護了一個Pending列表用於儲存接收到的訊息,當訊息ack後會從pending列表內移除,也就是說pending列表內維護的是所有未ack的訊息id

每個Pending的訊息有4個屬性:

  • 訊息ID
  • 所屬消費者
  • IDLE,已讀取時長
  • delivery counter,訊息被讀取次數

XPENDING key group [start end count] [consumer]

#檢視pending列表
# 檢視group1組內的consumer1的pending列表 - 表示最小的訊息id + 表示最大的訊息ID
xpending stream1 group1 - + 10 consumer1
# 檢視group1組內的所有消費者pending類表
xpending stream1 group1 - + 10 

訊息確認

當消費者消費了訊息,需要通過 xack 命令確認訊息,xack後的訊息會從pending列表移除

XACK key gruopName ID

xack stream1 group1 xxx

訊息轉移

當消費者接收到訊息卻不能正確消費時(報錯或其他原因),可以使用 XCLAIM 將訊息轉移給其他消費者消費,需要設定組、轉移的目標消費者和訊息ID,同時需要提供IDLE(已被讀取時長),只有超過這個時長,才能被轉移。

通過xclaim轉移的訊息只是將訊息移入另一個消費者的pending列表,消費者並不能通過xreadgroup讀取到訊息,只能通過xpending讀取到。

# 表示將ID為 1553585533795-1 的訊息轉移到消費者B消費,前提是消費
XCLAIM stream1 group1 consumer1 3600000 1553585533795-1

資訊監控

redis提供了xinfo來檢視stream的資訊

#檢視sream資訊
xinfo stream steam1
#查詢消費者組資訊
xinfo groups group1 

#查詢消費者資訊
xinfo consumers consumer1

SpringBoot 整合

1 引入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2 編寫消費者

@Slf4j
@Component
public class EmailConsumer implements StreamListener<String, MapRecord<String,String,String>> {

    public final String streamName      = "emailStream";
    public final String groupName       = "emailGroup";
    public final String consumerName    = "emailConsumer";


    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void onMessage(MapRecord<String, String, String> message) {

        //log.info("stream名稱-->{}",message.getStream());
        //log.info("訊息ID-->{}",message.getId());
        log.info("訊息內容-->{}",message.getValue());

        Map<String, String> msgMap = message.getValue();

        if( msgMap.get("sID")!=null && Integer.valueOf(msgMap.get("sID")) % 3 ==0 ){
            //消費異常導致未能ack時,訊息會進入pending列表,我們可以啟動定時任務來讀取pending列表處理失敗的任務
            log.info("消費異常-->"+message);
           return;
        }

        StreamOperations<String, String, String> streamOperations = stringRedisTemplate.opsForStream();
        //訊息應答
        streamOperations.acknowledge( streamName,groupName,message.getId() );

    }
	//我們可以啟動定時任務不斷監聽pending列表,處理死信訊息
}

3 設定redis

序列化設定

@EnableCaching
@Configuration
public class RedisConfig {


    /**
     * 設定redis序列化規則
     */
    @Bean
    public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(){
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);

        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        return jackson2JsonRedisSerializer;
    }

    /**
     * RedisTemplate設定
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory,
                                                       Jackson2JsonRedisSerializer jackson2JsonRedisSerializer) {

        // 設定redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        RedisSerializer<?> stringSerializer = new StringRedisSerializer();

        // key序列化
        redisTemplate.setKeySerializer(stringSerializer);
        // value序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);

        // Hash key序列化
        redisTemplate.setHashKeySerializer(stringSerializer);
        // Hash value序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

}

消費者組和消費者設定

@Slf4j
@Configuration
public class RedisStreamConfig {

    @Autowired
    private EmailConsumer emailConsumer;

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    @Bean
    public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> emailListenerContainerOptions(){

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                //block讀取超時時間
                .pollTimeout(Duration.ofSeconds(3))
                //count 數量(一次只獲取一條訊息)
                .batchSize(1)
                //序列化規則
                .serializer( stringRedisSerializer )
                .build();
    }

    /**
     * 開啟監聽器接收訊息
     */
    @Bean
    public StreamMessageListenerContainer<String,MapRecord<String,String,String>> emailListenerContainer(RedisConnectionFactory factory,
                                                                 StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions){

        StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory,
                streamMessageListenerContainerOptions);

        //如果 流不存在 建立 stream 流
        if( !redisTemplate.hasKey(emailConsumer.streamName)){
            redisTemplate.opsForStream().add(emailConsumer.streamName, Collections.singletonMap("", ""));
            log.info("初始化stream {} success",emailConsumer.streamName);
        }

        //建立消費者組
        try {
            redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName);
        } catch (Exception e) {
            log.info("消費者組 {} 已存在",emailConsumer.groupName);
        }

        //註冊消費者 消費者名稱,從哪條訊息開始消費,消費者類
        // > 表示沒消費過的訊息
        // $ 表示最新的訊息
        listenerContainer.receive(
            Consumer.from(emailConsumer.groupName, emailConsumer.consumerName),
            StreamOffset.create(emailConsumer.streamName, ReadOffset.lastConsumed()),
            emailConsumer
        );


        listenerContainer.start();
        return listenerContainer;
    }

}

4.生產者生產訊息

@GetMapping("/redis/ps")
public String redisPublish(String content,Integer count){

    StreamOperations streamOperations = redisTemplate.opsForStream();

    for (int i = 0; i < count; i++) {
        AtomicInteger num = new AtomicInteger(i);

        Map msgMap = new HashMap();
        msgMap.put("count", i);
        msgMap.put("sID", num);
        //新增訊息
        streamOperations.add("emailStream",msgMap);
    }
    return "success";
}

參考檔案:

redis Stream 訊息佇列

SpringBoot整合redis stream 實現訊息佇列

到此這篇關於redis stream 實現訊息佇列的實踐的文章就介紹到這了,更多相關redis stream 訊息佇列內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com