首頁 > 軟體

SpringBoot中使用Redis Stream實現訊息監聽範例

2022-06-17 18:03:26

Demo環境

  • JDK8
  • Maven3.6.3
  • springboot2.4.3
  • redis_version:6.2.1

倉庫地址

https://gitee.com/hlovez/redismq.git.

POM依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.3</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>vip.huhailong</groupId>
	<artifactId>redismq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>redismq</name>
	<description>base redis stream mq</description>
	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

這裡是一個簡單的Demo,所以關於redis的一些序列化設定就省略了。

設定監聽訊息類

設定監聽訊息類,這裡類需要實現StreamListener介面,該介面下只有一個要實現的方法——onMessage方法,程式碼:

package vip.huhailong.redismq.redistool;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

/**
 * @author Huhailong
 * @Description 監聽訊息
 * @Date 2021/3/10.
 */
@Slf4j
@Component
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {

    @Override
    public void onMessage(MapRecord<String, String, String> entries) {
        log.info("接受到來自redis的訊息");
        System.out.println("message id "+entries.getId());
        System.out.println("stream "+entries.getStream());
        System.out.println("body "+entries.getValue());
    }
    
}

設定完該類後我們再建立一個類將該監聽器注入進去,程式碼:

package vip.huhailong.redismq.config;

import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import vip.huhailong.redismq.redistool.ListenerMessage;

import java.time.Duration;

/**
 * @author Huhailong
 * @Description
 * @Date 2021/3/12.
 */
@Configuration
public class RedisStreamConfig {

    @Autowired
    private ListenerMessage streamListener;

    @Bean
    public Subscription subscription(RedisConnectionFactory factory){
        var options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();
        var listenerContainer = StreamMessageListenerContainer.create(factory,options);
        var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
                StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener);
        listenerContainer.start();
        return subscription;
    }
}

程式碼分析:

首先將我們實現了StreamListener的監聽器類注入。

subscription方法返回的是一個Subscription型別,它是與當前正在執行任務的連結,可以理解為訂閱的連結,它有倆個方法

  • boolean await(Duration timeout): 當訂閱變為活動或超時時,同步阻塞呼叫將返回
  • boolean isActive(): 如果當前正在訂閱則為true

程式碼中的var是使用了Lombok的可變區域性變數。主要是為了方便

StreamMessageListenerContainer: 訊息偵聽容器,不能在外部實現。建立後,StreamMessageListenerContainer可以訂閱Redis流並使用傳入的訊息。 StreamMessageListenerContainer允許多個流讀取請求,併為每個讀取請求返回一個Subscription控制程式碼。取消訂閱最終將終止後臺輪詢。使用鍵和值序列化器轉換訊息以支援各種序列化策略。具體檔案

__StreamMessageListenerContainerOptions: __ 它是上面4的選項,程式碼中pollTimeout表示輪詢超時時間

create方法使用給定的RedisConnectionFactory和上面的設定選項建立偵聽器。

接下來使用receiveAutoAck建立一個新訂閱,注意這裡接受到訊息後會被自動的確認,如果不想自動確認請使用其他的建立訂閱方式。該方法共有三個引數

  • 消費組 consumer group ,它不能為null (Consumer型別)
  • stream offset ,stream的偏移量(StreamOffset 型別)
  • listener 不能為null (StreamListener<K,V> 型別)

程式碼中表示消費者來自名稱為mygroup的組,消費者名稱為huhailong,這裡偏移量設定為了lastConsumed,它表示讀取ID大於消費者組使用的最後一個元素的所有新到達的元素。

現在就可以執行專案來驗證了,將專案執行起來後通過終端給對應key的stream新增一條訊息

> XADD mystream * message springboot

這時可以看到spring boot的控制檯列印除了一下訊息:

message id 1615532778588-0
stream mystream
body {message=springboot}

說明偵聽成功,它會一直處於監聽狀態,只要對應key的stream新增了新的訊息都會被偵聽到,到此也就簡單的實現了訊息佇列功能。

監聽倆個stream的實現

有網友想實現倆個或倆個以上的stream監聽,可以這樣實現(有其他更好的方法請留言評論,謝謝)
在上面監聽一個stream的基礎上在RedisStreamConfig類裡增加監聽方法,如下:

@Configuration
public class RedisStreamConfig {

    @Autowired
    private ListenerMessage streamListener;

    @Bean
    public Subscription subscription(RedisConnectionFactory factory){
        var options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();
		initStream("mystream","mygroup");	//詳細描述請看下方的問題補充——初始化key和group
        var listenerContainer = StreamMessageListenerContainer.create(factory,options);
        var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
                StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener);
        listenerContainer.start();
        return subscription;
    }

    @Bean
    public Subscription subscription2(RedisConnectionFactory factory){
        var options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();
		initStream("mystream","mygroup");	//詳細描述請看下方的問題補充——初始化key和group
        var listenerContainer = StreamMessageListenerContainer.create(factory,options);
        var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
                StreamOffset.create("mystream2", ReadOffset.lastConsumed()),streamListener);
        listenerContainer.start();
        return subscription;
    }
}

記得建立該stream和對應的組,讓後啟動程式,在控制檯模擬新增資料,如圖:

然後idea控制檯的列印可以看到分別接收到了來自不同的stream的訊息

其他類和設定與監聽一個的時候相同,不需要改變。

【問題補充】確認完訊息刪除訊息

訊息確認完後訊息實際上沒有在redis中消失,這也是redis stream中的一個特性,如果要想真正的刪除該訊息,需要我們進行指定欄位ID刪除,如下:

在RedisUtil中新增刪除方法:

public void delField(String key, String fieldId){
        redisTemplate.opsForStream().delete(key,fieldId);
}

然後再ListenerMessage中的onMessage方法裡處理完訊息後新增刪除該訊息的方法:

@Component
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {

    @Autowired
    RedisUtil redisUtil;

    @Override
    public void onMessage(MapRecord<String, String, String> entries) {
        log.info("接受到來自redis的訊息");
        System.out.println("message id "+entries.getId().getValue());
        System.out.println("stream "+entries.getStream());
        System.out.println("body "+entries.getValue());
        //下面是刪除該訊息的方法
        redisUtil.delField("mystream",entries.getId().getValue());
    }

}

【問題補充】自動初始化stream的key和group問題-最新更新-2021年12月4日

最近看到評論大部分有報找不到key和group的錯資訊,這個是因為沒有初始化導致的,當時在寫這個demo的時候為了簡單測試直接寫死了我已經存在的key和group,部分朋友不知道這個是需要初始化的,所以我更新了一下程式碼,增加了初始化key和group的方法,這樣大家直接執行程式碼就可以了,不需要在做多餘的操作。
RedisUtil中增加增加的程式碼:

public RecordId addStream(String key, Map<String,Object> message){
        RecordId add = redisTemplate.opsForStream().add(key, message);
        return add;	//返回增加後的id
    }

public void addGroup(String key, String groupName){
     redisTemplate.opsForStream().createGroup(key,groupName);
 }

/**
* 用來判斷key是否存在
*/
public boolean hasKey(String key){
     if(key==null){
         return false;
     }else{
         return redisTemplate.hasKey(key);
     }

 }

然後再RedisStreamConfig中增加初始化方法,這裡注意要判斷key是否存在:

private void initStream(String key, String group){
	//判斷key是否存在,如果不存在則建立
    boolean hasKey = redisUtil.hasKey(key);
    if(!hasKey){
        Map<String,Object> map = new HashMap<>();
        map.put("field","value");
        RecordId recordId = redisUtil.addStream(key, map);
        redisUtil.addGroup(key,group);
        //將初始化的值刪除掉
        redisUtil.delField(key,recordId.getValue());
        log.info("stream:{}-group:{} initialize success",key,group);
    }
}

然後再上面設定監聽的方法裡呼叫這個初始化方法就可以了

@Bean
    public Subscription subscription(RedisConnectionFactory factory){
        var options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();
        initStream("mystream","mygroup");	//呼叫初始化
        var listenerContainer = StreamMessageListenerContainer.create(factory,options);
        var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
                StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener);
        listenerContainer.start();
        return subscription;
    }

https://gitee.com/hlovez/redismq.git已更新,程式碼已優化整理上傳

到此這篇關於SpringBoot中使用Redis Stream實現訊息監聽範例的文章就介紹到這了,更多相關SpringBoot 訊息監聽內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com