<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Redis5.0帶來了Stream型別。從字面上看是流型別,但其實從功能上看,應該是Redis對訊息佇列(MQ,Message Queue)的完善實現。
基於redis實現訊息佇列的方式有很多:
釋出訂閱優點: 典型的一對的,所有消費者都能同時消費到訊息。主動通知訂閱者而不是訂閱者輪詢去讀。
釋出訂閱缺點: 不支援多個消費者公平消費訊息,訊息沒有持久化,不管訂閱者是否收到訊息,訊息都會丟失。
使用場景:微服務間的訊息同步,如 分散式webSocker,資料同步等。
生產者通過lpush生成訊息,消費者通過blpop阻塞讀取訊息。
**list佇列優點:**支援多個消費者公平消費訊息,對訊息進行儲存,可以通過lrange查詢佇列內的訊息。
**list佇列缺點:**blpop仍然會阻塞當前連線,導致連線不可用。一旦blpop成功訊息就丟棄了,期間如果伺服器宕機訊息會丟失,不支援一對多消費者。
生產者通過zadd 建立訊息時指定分數,可以確定訊息的順序,消費者通過zrange獲取訊息後進行消費,消費完後通zrem刪除訊息。
zset優點: 保證了訊息的順序,消費者消費失敗後重新入隊不會打亂消費順序。
zset缺點: 不支援一對多消費,多個消費者消費時可能出現讀取同一條訊息的情況,得通過加鎖或其他方式解決消費的冪等性。
zset使用場景:由於資料是有序的,常常被用於延遲佇列,如 redisson的DelayQueue
Redis5.0帶來了Stream型別。從字面上看是流型別,但其實從功能上看,應該是Redis對訊息佇列(MQ,Message Queue)的完善實現。
參考kafka的思想,通過多個消費者組和消費者支援一對多消費,公平消費,消費者內維護了pending列表防止訊息丟失。
提供訊息ack機制。
往 stream 內建立訊息 語法為:
XADD key ID field string [field string …]
# * 表示自動生成id redis會根據時間戳+序列號自動生成id,不建議我們自己指定id xadd stream1 * name zs age 23
讀取stream內的訊息,這個並不是消費,只是提供了檢視資料的功能,語法為:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
#表示從 stream1 內取出一條訊息,從第0條訊息讀取(0表示最小的id) xread count 1 streams stream1 0 #表示從 stream1 內 id=1649143363972-0 開始讀取一條訊息,讀取的是指定id的下一條訊息 xread count 1 streams msg 1649143363972-0 #表示一直阻塞讀取最新的訊息($表示獲取下一個生成的訊息) xread count 1 block 0 streams stream1 $ xrange stream - + 10
XRANGE key startID endID count
#表示從stream1內取10條訊息 起始位置為 -(最小ID) 結束位置為+(最大ID) xrange stream1 - + 10
redis stream 借鑑了kafka的設計,採用了消費者和消費者組的概念。允許多個消費者組消費stream的訊息,每個消費者組都能收到完整的訊息,例如:stream內有10條訊息,消費者組A和消費者組B同時消費時,都能獲取到這10條訊息。
每個消費者組內可以有多個消費者消費,訊息會平均分攤給各個消費者,例如:stream有10條訊息,消費者A,B,C同時在同一個組內消費,A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9
建立消費者組:
#消費訊息首先得建立消費者組 # 表示為佇列 stream1 建立一個消費者組 group1 從訊息id=0(第一條訊息)開始讀取訊息 xgroup create stream1 group1 0 #查詢stream1內的所有消費者組資訊 xinfo groups stream1
通過xreadgroup可以在消費者組內建立消費者消費訊息
XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
#建立消費者讀取訊息 #在group1消費者組內通過consumer1消費stream1內的訊息,消費1條未分配的訊息 (> 表示未分配過消費者的訊息) xreadgrup group group1 consumer1 count 1 streams stream1 >
通過 xreadgroup 讀取訊息時訊息會分配給對應的消費者,每個消費者內都維護了一個Pending列表用於儲存接收到的訊息,當訊息ack後會從pending列表內移除,也就是說pending列表內維護的是所有未ack的訊息id
每個Pending的訊息有4個屬性:
XPENDING key group [start end count] [consumer]
#檢視pending列表 # 檢視group1組內的consumer1的pending列表 - 表示最小的訊息id + 表示最大的訊息ID xpending stream1 group1 - + 10 consumer1 # 檢視group1組內的所有消費者pending類表 xpending stream1 group1 - + 10
當消費者消費了訊息,需要通過 xack
命令確認訊息,xack後的訊息會從pending列表移除
XACK key gruopName ID
xack stream1 group1 xxx
當消費者接收到訊息卻不能正確消費時(報錯或其他原因),可以使用 XCLAIM
將訊息轉移給其他消費者消費,需要設定組、轉移的目標消費者和訊息ID,同時需要提供IDLE(已被讀取時長),只有超過這個時長,才能被轉移。
通過xclaim轉移的訊息只是將訊息移入另一個消費者的pending列表,消費者並不能通過xreadgroup讀取到訊息,只能通過xpending讀取到。
# 表示將ID為 1553585533795-1 的訊息轉移到消費者B消費,前提是消費 XCLAIM stream1 group1 consumer1 3600000 1553585533795-1
redis提供了xinfo來檢視stream的資訊
#檢視sream資訊 xinfo stream steam1 #查詢消費者組資訊 xinfo groups group1 #查詢消費者資訊 xinfo consumers consumer1
1 引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2 編寫消費者
@Slf4j @Component public class EmailConsumer implements StreamListener<String, MapRecord<String,String,String>> { public final String streamName = "emailStream"; public final String groupName = "emailGroup"; public final String consumerName = "emailConsumer"; @Autowired private StringRedisTemplate stringRedisTemplate; @Override public void onMessage(MapRecord<String, String, String> message) { //log.info("stream名稱-->{}",message.getStream()); //log.info("訊息ID-->{}",message.getId()); log.info("訊息內容-->{}",message.getValue()); Map<String, String> msgMap = message.getValue(); if( msgMap.get("sID")!=null && Integer.valueOf(msgMap.get("sID")) % 3 ==0 ){ //消費異常導致未能ack時,訊息會進入pending列表,我們可以啟動定時任務來讀取pending列表處理失敗的任務 log.info("消費異常-->"+message); return; } StreamOperations<String, String, String> streamOperations = stringRedisTemplate.opsForStream(); //訊息應答 streamOperations.acknowledge( streamName,groupName,message.getId() ); } //我們可以啟動定時任務不斷監聽pending列表,處理死信訊息 }
3 設定redis
序列化設定
@EnableCaching @Configuration public class RedisConfig { /** * 設定redis序列化規則 */ @Bean public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(){ Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); jackson2JsonRedisSerializer.setObjectMapper(om); return jackson2JsonRedisSerializer; } /** * RedisTemplate設定 */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory, Jackson2JsonRedisSerializer jackson2JsonRedisSerializer) { // 設定redisTemplate RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); RedisSerializer<?> stringSerializer = new StringRedisSerializer(); // key序列化 redisTemplate.setKeySerializer(stringSerializer); // value序列化 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // Hash key序列化 redisTemplate.setHashKeySerializer(stringSerializer); // Hash value序列化 redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
消費者組和消費者設定
@Slf4j @Configuration public class RedisStreamConfig { @Autowired private EmailConsumer emailConsumer; @Autowired private RedisTemplate<String,Object> redisTemplate; @Bean public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> emailListenerContainerOptions(){ StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); return StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder() //block讀取超時時間 .pollTimeout(Duration.ofSeconds(3)) //count 數量(一次只獲取一條訊息) .batchSize(1) //序列化規則 .serializer( stringRedisSerializer ) .build(); } /** * 開啟監聽器接收訊息 */ @Bean public StreamMessageListenerContainer<String,MapRecord<String,String,String>> emailListenerContainer(RedisConnectionFactory factory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions){ StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory, streamMessageListenerContainerOptions); //如果 流不存在 建立 stream 流 if( !redisTemplate.hasKey(emailConsumer.streamName)){ redisTemplate.opsForStream().add(emailConsumer.streamName, Collections.singletonMap("", "")); log.info("初始化stream {} success",emailConsumer.streamName); } //建立消費者組 try { redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName); } catch (Exception e) { log.info("消費者組 {} 已存在",emailConsumer.groupName); } //註冊消費者 消費者名稱,從哪條訊息開始消費,消費者類 // > 表示沒消費過的訊息 // $ 表示最新的訊息 listenerContainer.receive( Consumer.from(emailConsumer.groupName, emailConsumer.consumerName), StreamOffset.create(emailConsumer.streamName, ReadOffset.lastConsumed()), emailConsumer ); listenerContainer.start(); return listenerContainer; } }
4.生產者生產訊息
@GetMapping("/redis/ps") public String redisPublish(String content,Integer count){ StreamOperations streamOperations = redisTemplate.opsForStream(); for (int i = 0; i < count; i++) { AtomicInteger num = new AtomicInteger(i); Map msgMap = new HashMap(); msgMap.put("count", i); msgMap.put("sID", num); //新增訊息 streamOperations.add("emailStream",msgMap); } return "success"; }
SpringBoot整合redis stream 實現訊息佇列
到此這篇關於redis stream 實現訊息佇列的實踐的文章就介紹到這了,更多相關redis stream 訊息佇列內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45