<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
眾所周知redis資料結構中的list的lpush與rpop可以用於常規訊息佇列,從集合的最左端寫入,最右端彈出消費。並且支援多個生產者與多個消費者並行拿資料,資料只能由一個消費者拿到。
但這個方案並不能保證消費者消費訊息後是否成功處理的問題(服務掛掉或處理異常等),機制屬於對等模式不能做廣播模式(釋出/訂閱模式)
於是redis提供了相應的釋出訂閱功能,為了解除對等的強繫結模式引入了Channel管道
。
當生產者向管道中釋出訊息,訂閱了該管道的消費者能夠同時接收到該訊息,而且為了簡化訂閱多個管道需要顯式關注多個名稱提供了pattern能力。
通過名稱匹配如果接收訊息的頻道wmyskxz.chat,consumer3也會收到訊息。
但這個方案也有很大的詬病就是不會持久化,如果服務掛掉重啟資料就全丟棄了,也沒有提供ack機制,不保證資料可靠性,不管有沒有消費成功發後既忘。
stream的話結構很像kafka的設計思想,提供了consumer group和offset機制,結構上感覺跟kafka的topic差不多,只是沒有對應partation副本機制,而是一個追加訊息的連結串列結構。使用者端呼叫XADD時候自動建立stream。每個訊息都會持久化並存在唯一的id標識
消費者組的概念跟kafka的消費者概念如出一轍,消費者既可以用XREAD
命令進行獨立消費,也可以多個消費者同時加入一個消費者組。一條訊息只能由一個消費者組中的一個消費者消費。這樣可以在分散式系統中保證訊息的唯一性。
其實這個特性我後來仔細琢磨了一下當時自認為無懈可擊的流式圖表為了保證分散式系統訊息唯一做了redis分散式鎖。有點雞肋,明明消費者組已經保證了資料的唯一性。只能說加鎖可以壓縮資源成本
用於標識消費者組消費在stream上消費位置的遊標,每個消費者組都有一個stream內唯一的名稱,消費者組不會自動建立,需要用XGROUP CREATE
顯式建立。
每個消費者內部都有一個狀態變數。用來表示已經
被使用者端消費但沒有ack的消費。目的是為了保證使用者端至少消費了訊息一次(atleastonce
)。如果消費者收到了訊息處理完了但是沒有回覆ack,就會導致列表不斷增長,如果有很多消費組的話,那麼這個列表佔用的記憶體就會放大
在使用者端消費者讀取Stream訊息時,Redis伺服器將訊息回覆給使用者端的過程中,使用者端突然斷開了連線,訊息就丟失了。
但是pending_ids裡已經儲存了發出去的訊息ID。待使用者端重新連上之後,可以再次收到pending_ids中的訊息ID列表。
不過此時xreadgroup的起始訊息必須是任意有效的訊息ID,一般將引數設為0-0,表示讀取所有的pending_ids訊息以及自last_delivered_id之後的新訊息。
redis stream雖然還是有一些弊端,但是相比較而言用kafka之類的訊息元件太重,redis用作訊息佇列已經很合適了。
這裡簡單提一下思路,本質上是提供一個管理訊息的一個小功能,定義一個註解用於建立stream管道
建立一個註解類,標註該註解的類必須繼承StreamListener<String, ObjectRecord<String, Object>>類且重寫onMessage方法。方法上也加這個註解
建立一個config類實現BeanPostProcessor
介面,重寫bean宣告週期postProcessAfterInitialization
和postProcessBeforeInitialization
方法。該方法會在spring啟動流程裡的refresh方法載入bean的宣告週期中掃描到所有加了註解的bean。
通過執行緒池挨個建立stream的group組與stream的consumer監聽連線,config類記得繼承DisposableBean類在destroy方法裡把連線關掉免得oom。
這裡一定注意pollTimeout引數,看名字就知道預設拉取資料時間間隔,這個引數如果寫的值很小或者寫0,你就看你cpu高不高就完了。
@Bean("listenerContainer") @DependsOn(value = "redisConnectionFactory") public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> init() { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(10) .serializer(new StringRedisSerializer()) .executor(new ForkJoinPool()) .pollTimeout(Duration.ofSeconds(3)) .targetType(Object.class) .build(); return StreamMessageListenerContainer.create(redisConnectionFactory, options); }
private Subscription createSubscription(RedisConnectionFactory factory, StreamListener streamListener, String streamKey, String group, String consumerName) { StreamOperations<String, String, Object> streamOperations = this.stringRedisTemplate.opsForStream(); if (stringRedisTemplate.hasKey(streamKey)) { StreamInfo.XInfoGroups groups = streamOperations.groups(streamKey); AtomicReference<Boolean> groupHasKey = new AtomicReference<>(false); groups.forEach(groupInfo -> { if (Objects.equals(group, groupInfo.getRaw().get("name"))) { groupHasKey.set(true); } }); if (groups.isEmpty() || !groupHasKey.get()) { creatGroup(streamKey, group); } else { groups.stream().forEach(g -> { log.info("XInfoGroups:{}", g); StreamInfo.XInfoConsumers consumers = streamOperations.consumers(streamKey, g.groupName()); log.info("XInfoConsumers:{}", consumers); }); } } else { creatGroup(streamKey, group); } StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed()); Consumer consumer = Consumer.from(group, consumerName); Subscription subscription = listenerContainer.receive(consumer, streamOffset, streamListener); listenerContainer.start(); this.containerList.add(listenerContainer); return subscription; }
到此這篇關於詳解Redis Stream做訊息佇列的文章就介紹到這了,更多相關Redis Stream內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45