首頁 > 軟體

詳解Redis Stream做訊息佇列

2022-09-26 14:06:23

List

眾所周知redis資料結構中的list的lpush與rpop可以用於常規訊息佇列,從集合的最左端寫入,最右端彈出消費。並且支援多個生產者與多個消費者並行拿資料,資料只能由一個消費者拿到。

但這個方案並不能保證消費者消費訊息後是否成功處理的問題(服務掛掉或處理異常等),機制屬於對等模式不能做廣播模式(釋出/訂閱模式)

Pub/sub

於是redis提供了相應的釋出訂閱功能,為了解除對等的強繫結模式引入了Channel管道

當生產者向管道中釋出訊息,訂閱了該管道的消費者能夠同時接收到該訊息,而且為了簡化訂閱多個管道需要顯式關注多個名稱提供了pattern能力。

通過名稱匹配如果接收訊息的頻道wmyskxz.chat,consumer3也會收到訊息。

但這個方案也有很大的詬病就是不會持久化,如果服務掛掉重啟資料就全丟棄了,也沒有提供ack機制,不保證資料可靠性,不管有沒有消費成功發後既忘。

Stream

stream的話結構很像kafka的設計思想,提供了consumer group和offset機制,結構上感覺跟kafka的topic差不多,只是沒有對應partation副本機制,而是一個追加訊息的連結串列結構。使用者端呼叫XADD時候自動建立stream。每個訊息都會持久化並存在唯一的id標識

Consumer Group

消費者組的概念跟kafka的消費者概念如出一轍,消費者既可以用XREAD命令進行獨立消費,也可以多個消費者同時加入一個消費者組。一條訊息只能由一個消費者組中的一個消費者消費。這樣可以在分散式系統中保證訊息的唯一性。

其實這個特性我後來仔細琢磨了一下當時自認為無懈可擊的流式圖表為了保證分散式系統訊息唯一做了redis分散式鎖。有點雞肋,明明消費者組已經保證了資料的唯一性。只能說加鎖可以壓縮資源成本

last_delivered_id

用於標識消費者組消費在stream上消費位置的遊標,每個消費者組都有一個stream內唯一的名稱,消費者組不會自動建立,需要用XGROUP CREATE顯式建立。

pending_ids

每個消費者內部都有一個狀態變數。用來表示已經被使用者端消費但沒有ack的消費。目的是為了保證使用者端至少消費了訊息一次(atleastonce)。如果消費者收到了訊息處理完了但是沒有回覆ack,就會導致列表不斷增長,如果有很多消費組的話,那麼這個列表佔用的記憶體就會放大

curd

  • xadd 追加訊息
  • xdel 刪除訊息,這裡的刪除僅僅是設定了標誌位,不影響訊息總長度
  • xrange 獲取訊息列表,會自動過濾已經刪除的訊息
  • xlen 訊息長度
  • del 刪除Stream

pending_ids如何避免訊息丟失

在使用者端消費者讀取Stream訊息時,Redis伺服器將訊息回覆給使用者端的過程中,使用者端突然斷開了連線,訊息就丟失了。

但是pending_ids裡已經儲存了發出去的訊息ID。待使用者端重新連上之後,可以再次收到pending_ids中的訊息ID列表。

不過此時xreadgroup的起始訊息必須是任意有效的訊息ID,一般將引數設為0-0,表示讀取所有的pending_ids訊息以及自last_delivered_id之後的新訊息。

嵌入SpringBoot

redis stream雖然還是有一些弊端,但是相比較而言用kafka之類的訊息元件太重,redis用作訊息佇列已經很合適了。

這裡簡單提一下思路,本質上是提供一個管理訊息的一個小功能,定義一個註解用於建立stream管道

建立一個註解類,標註該註解的類必須繼承StreamListener<String, ObjectRecord<String, Object>>類且重寫onMessage方法。方法上也加這個註解

建立一個config類實現BeanPostProcessor介面,重寫bean宣告週期postProcessAfterInitializationpostProcessBeforeInitialization方法。該方法會在spring啟動流程裡的refresh方法載入bean的宣告週期中掃描到所有加了註解的bean。

通過執行緒池挨個建立stream的group組與stream的consumer監聽連線,config類記得繼承DisposableBean類在destroy方法裡把連線關掉免得oom。

註冊redis stream api提供的consumer容器

這裡一定注意pollTimeout引數,看名字就知道預設拉取資料時間間隔,這個引數如果寫的值很小或者寫0,你就看你cpu高不高就完了。

@Bean("listenerContainer")
@DependsOn(value = "redisConnectionFactory")
public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> init() {
   StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>>
   options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
         .batchSize(10)
         .serializer(new StringRedisSerializer())
         .executor(new ForkJoinPool())
         .pollTimeout(Duration.ofSeconds(3))
         .targetType(Object.class)
         .build();
   return StreamMessageListenerContainer.create(redisConnectionFactory, options);
}

建立消費者

private Subscription createSubscription(RedisConnectionFactory factory, StreamListener streamListener, String streamKey, String group, String consumerName) {
   StreamOperations<String, String, Object> streamOperations = this.stringRedisTemplate.opsForStream();

   if (stringRedisTemplate.hasKey(streamKey)) {
      StreamInfo.XInfoGroups groups = streamOperations.groups(streamKey);

      AtomicReference<Boolean> groupHasKey = new AtomicReference<>(false);

      groups.forEach(groupInfo -> {
         if (Objects.equals(group, groupInfo.getRaw().get("name"))) {
            groupHasKey.set(true);
         }
      });

      if (groups.isEmpty() || !groupHasKey.get()) {
         creatGroup(streamKey, group);
      } else {
         groups.stream().forEach(g -> {
            log.info("XInfoGroups:{}", g);
            StreamInfo.XInfoConsumers consumers = streamOperations.consumers(streamKey, g.groupName());
            log.info("XInfoConsumers:{}", consumers);
         });
      }
   } else {
      creatGroup(streamKey, group);
   }
   StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
   Consumer consumer = Consumer.from(group, consumerName);

   Subscription subscription = listenerContainer.receive(consumer, streamOffset, streamListener);
   listenerContainer.start();
   this.containerList.add(listenerContainer);
   return subscription;
}

到此這篇關於詳解Redis Stream做訊息佇列的文章就介紹到這了,更多相關Redis Stream內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com