<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
https://gitee.com/hlovez/redismq.git.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.3</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>vip.huhailong</groupId> <artifactId>redismq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>redismq</name> <description>base redis stream mq</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
這裡是一個簡單的Demo,所以關於redis的一些序列化設定就省略了。
設定監聽訊息類,這裡類需要實現StreamListener介面,該介面下只有一個要實現的方法——onMessage方法,程式碼:
package vip.huhailong.redismq.redistool; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.stream.StreamListener; import org.springframework.stereotype.Component; /** * @author Huhailong * @Description 監聽訊息 * @Date 2021/3/10. */ @Slf4j @Component public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> { @Override public void onMessage(MapRecord<String, String, String> entries) { log.info("接受到來自redis的訊息"); System.out.println("message id "+entries.getId()); System.out.println("stream "+entries.getStream()); System.out.println("body "+entries.getValue()); } }
設定完該類後我們再建立一個類將該監聽器注入進去,程式碼:
package vip.huhailong.redismq.config; import lombok.var; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.Subscription; import vip.huhailong.redismq.redistool.ListenerMessage; import java.time.Duration; /** * @author Huhailong * @Description * @Date 2021/3/12. */ @Configuration public class RedisStreamConfig { @Autowired private ListenerMessage streamListener; @Bean public Subscription subscription(RedisConnectionFactory factory){ var options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .build(); var listenerContainer = StreamMessageListenerContainer.create(factory,options); var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"), StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener); listenerContainer.start(); return subscription; } }
程式碼分析:
首先將我們實現了StreamListener的監聽器類注入。
subscription方法返回的是一個Subscription型別,它是與當前正在執行任務的連結,可以理解為訂閱的連結,它有倆個方法
程式碼中的var是使用了Lombok的可變區域性變數。主要是為了方便
StreamMessageListenerContainer: 訊息偵聽容器,不能在外部實現。建立後,StreamMessageListenerContainer可以訂閱Redis流並使用傳入的訊息。 StreamMessageListenerContainer允許多個流讀取請求,併為每個讀取請求返回一個Subscription控制程式碼。取消訂閱最終將終止後臺輪詢。使用鍵和值序列化器轉換訊息以支援各種序列化策略。具體檔案
__StreamMessageListenerContainerOptions: __ 它是上面4的選項,程式碼中pollTimeout表示輪詢超時時間
create方法使用給定的RedisConnectionFactory和上面的設定選項建立偵聽器。
接下來使用receiveAutoAck建立一個新訂閱,注意這裡接受到訊息後會被自動的確認,如果不想自動確認請使用其他的建立訂閱方式。該方法共有三個引數
程式碼中表示消費者來自名稱為mygroup的組,消費者名稱為huhailong,這裡偏移量設定為了lastConsumed,它表示讀取ID大於消費者組使用的最後一個元素的所有新到達的元素。
現在就可以執行專案來驗證了,將專案執行起來後通過終端給對應key的stream新增一條訊息
> XADD mystream * message springboot
這時可以看到spring boot的控制檯列印除了一下訊息:
message id 1615532778588-0
stream mystream
body {message=springboot}
說明偵聽成功,它會一直處於監聽狀態,只要對應key的stream新增了新的訊息都會被偵聽到,到此也就簡單的實現了訊息佇列功能。
有網友想實現倆個或倆個以上的stream監聽,可以這樣實現(有其他更好的方法請留言評論,謝謝)
在上面監聽一個stream的基礎上在RedisStreamConfig類裡增加監聽方法,如下:
@Configuration public class RedisStreamConfig { @Autowired private ListenerMessage streamListener; @Bean public Subscription subscription(RedisConnectionFactory factory){ var options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .build(); initStream("mystream","mygroup"); //詳細描述請看下方的問題補充——初始化key和group var listenerContainer = StreamMessageListenerContainer.create(factory,options); var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"), StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener); listenerContainer.start(); return subscription; } @Bean public Subscription subscription2(RedisConnectionFactory factory){ var options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .build(); initStream("mystream","mygroup"); //詳細描述請看下方的問題補充——初始化key和group var listenerContainer = StreamMessageListenerContainer.create(factory,options); var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"), StreamOffset.create("mystream2", ReadOffset.lastConsumed()),streamListener); listenerContainer.start(); return subscription; } }
記得建立該stream和對應的組,讓後啟動程式,在控制檯模擬新增資料,如圖:
然後idea控制檯的列印可以看到分別接收到了來自不同的stream的訊息
其他類和設定與監聽一個的時候相同,不需要改變。
訊息確認完後訊息實際上沒有在redis中消失,這也是redis stream中的一個特性,如果要想真正的刪除該訊息,需要我們進行指定欄位ID刪除,如下:
在RedisUtil中新增刪除方法:
public void delField(String key, String fieldId){ redisTemplate.opsForStream().delete(key,fieldId); }
然後再ListenerMessage中的onMessage方法裡處理完訊息後新增刪除該訊息的方法:
@Component public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> { @Autowired RedisUtil redisUtil; @Override public void onMessage(MapRecord<String, String, String> entries) { log.info("接受到來自redis的訊息"); System.out.println("message id "+entries.getId().getValue()); System.out.println("stream "+entries.getStream()); System.out.println("body "+entries.getValue()); //下面是刪除該訊息的方法 redisUtil.delField("mystream",entries.getId().getValue()); } }
最近看到評論大部分有報找不到key和group的錯資訊,這個是因為沒有初始化導致的,當時在寫這個demo的時候為了簡單測試直接寫死了我已經存在的key和group,部分朋友不知道這個是需要初始化的,所以我更新了一下程式碼,增加了初始化key和group的方法,這樣大家直接執行程式碼就可以了,不需要在做多餘的操作。
RedisUtil中增加增加的程式碼:
public RecordId addStream(String key, Map<String,Object> message){ RecordId add = redisTemplate.opsForStream().add(key, message); return add; //返回增加後的id } public void addGroup(String key, String groupName){ redisTemplate.opsForStream().createGroup(key,groupName); } /** * 用來判斷key是否存在 */ public boolean hasKey(String key){ if(key==null){ return false; }else{ return redisTemplate.hasKey(key); } }
然後再RedisStreamConfig中增加初始化方法,這裡注意要判斷key是否存在:
private void initStream(String key, String group){ //判斷key是否存在,如果不存在則建立 boolean hasKey = redisUtil.hasKey(key); if(!hasKey){ Map<String,Object> map = new HashMap<>(); map.put("field","value"); RecordId recordId = redisUtil.addStream(key, map); redisUtil.addGroup(key,group); //將初始化的值刪除掉 redisUtil.delField(key,recordId.getValue()); log.info("stream:{}-group:{} initialize success",key,group); } }
然後再上面設定監聽的方法裡呼叫這個初始化方法就可以了
@Bean public Subscription subscription(RedisConnectionFactory factory){ var options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .build(); initStream("mystream","mygroup"); //呼叫初始化 var listenerContainer = StreamMessageListenerContainer.create(factory,options); var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"), StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener); listenerContainer.start(); return subscription; }
https://gitee.com/hlovez/redismq.git已更新,程式碼已優化整理上傳
到此這篇關於SpringBoot中使用Redis Stream實現訊息監聽範例的文章就介紹到這了,更多相關SpringBoot 訊息監聽內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45