首頁 > 軟體

SpringBoot整合Redis實現訊息釋出與訂閱的範例程式碼

2022-08-09 18:02:17

當我們在多個叢集應用中使用到本地快取時,在資料庫資料得到更新後,為保持各個副本當前被修改的資料與資料庫資料保持同步,在資料被操作後向其他叢集應用發出被更新資料的通知,使其刪除;下次當其他應用請求該被更新的資料時,應用會到資料庫去取,也就是最新的資料,從而使得被更新資料與資料庫保持同步!

能實現傳送與接收資訊的中間介有很多,比如:RocketMQ、RabbitMQ、ActiveMQ、Kafka等,本次主要簡單介紹Redis的推播與訂閱功能並整合Spring Boot的實現。

1.新增SpringBoot 整合Redis maven依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.設定Redis設定 RedisConfig.java

@Configuration
public class RedisConfig {
    @Value("${redis.server.nodes}")
    private String redisServerNodes;
 
    @Value("${redis.server.password}")
    private String redisServerPassword;
 
    //Redis叢集設定,單機的redis註釋掉該方法,在application組態檔裡面設定就可以了
    @Bean
    public RedisClusterConfiguration getRedisClusterConfiguration() {
 
        RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
 
        String[] serverArray = redisServerNodes.split(",");
        Set<RedisNode> nodes = new HashSet<RedisNode>();
        for (String ipPort : serverArray) {
            String[] ipAndPort = ipPort.split(":");
            nodes.add(new RedisNode(ipAndPort[0].trim(), Integer.parseInt(ipAndPort[1])));
        }
        redisClusterConfiguration.setClusterNodes(nodes);
        RedisPassword pwd = RedisPassword.of(redisServerPassword);
        redisClusterConfiguration.setPassword(pwd);
        return redisClusterConfiguration;
    }
 
    //指定redisTemplate型別,如下為<String, Object>
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        // 使用JSON格式序列化物件,對快取資料key和value進行轉換
        Jackson2JsonRedisSerializer<Object> jacksonSeial = new Jackson2JsonRedisSerializer<>(Object.class);
        // 解決查詢快取轉換異常的問題
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jacksonSeial.setObjectMapper(objectMapper);
 
        // 設定RedisTemplate模板API的序列化方式為JSON
        template.setDefaultSerializer(jacksonSeial);
        return template;
    }
 
     /**
     * Redis訊息監聽器容器
     * 這個容器載入了RedisConnectionFactory和訊息監聽器
     * 可新增多個不同話題的redis監聽器,需要將訊息監聽器和訊息頻道繫結,
     * 通過反射呼叫訊息訂閱處理器的相關方法進行業務處理
     *
     * @param  redisConnectionFactory       連線工廠
     * @param  listener                     Redis訊息監聽器
     * @param  MessageListenerAdapter       Redis訊息監聽介面卡
     * @return RedisMessageListenerContainer    訊息監聽容器
     */
    @Bean
    @SuppressWarnings("all")
    public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,
                                                   RedisMessageListener listener,
                                                   MessageListenerAdapter adapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        // 所有的訂閱訊息,都需要在這裡進行註冊繫結
        // new PatternTopic(TOPIC_NAME1) 表示釋出資訊的頻道
        // 可以新增多個頻道以及設定不同的頻道
        container.addMessageListener(listener, new PatternTopic(SystemConstants.TOPIC_NAME1));
        container.addMessageListener(adapter, new PatternTopic(SystemConstants.TOPIC_NAME2));
 
        /**
         * 設定序列化物件
         * 特別注意:1. 釋出的時候和訂閱方都需要設定序列化
         *         2. 設定序列化物件必須放在 {加入訊息監聽器} 這步後面,不然接收器接收不到訊息
         */
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        container.setTopicSerializer(seria);
        return container;
    }
 
    /**
     * 這個地方是給messageListenerAdapter 傳入一個訊息接受的處理器,利用反射的方法呼叫「receiveMessage」
     * 也有好幾個過載方法,這邊預設呼叫處理器的方法 叫OnMessage
     *
     * @param redisMessageReceiver 訊息接收物件
     * @return 訊息監聽介面卡
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(RedisMessageReceiver redisMessageReceiver) {
        MessageListenerAdapter receiveMessage = new MessageListenerAdapter(printMessageReceiver, "onMessage");
        Jackson2JsonRedisSerializer<Object> seria = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        receiveMessage.setSerializer(seria);
        return receiveMessage;
    }
}

3.Redis的訂閱主要有兩種實現方式

方式一:編寫Redis監聽類RedisMessageListener,實現Redis的監聽介面MessageListener,並重寫onMessage方法

方式二:編寫Redis訊息監聽介面卡類,並在RedisConfig.java中設定訊息監聽介面卡bean

方式一 與 方式二 主要是實現訂閱Redis推播的訊息後的具體操作,這兩種方式可以同時使用來訂閱多個頻道里的訊息

//方式一:
@Slf4j
@Component
public class RedisMessageListener implements MessageListener {
 
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private CacheManager cacheManager;
 
    @Override
    public void onMessage(Message message, byte[] pattern) {
    // 接收的topic
    log.info("接收訊息頻道:" + new String(pattern));
    //序列化物件(特別注意:釋出的時候需要設定序列化;訂閱方也需要設定序列化)
    MessageDto<?> messageDto = (MessageDto<?>) redisTemplate.getValueSerializer().deserialize(message.getBody());
    //MessageDto<T>為自己編寫的一個訊息物件類(如自定義有:String data,String title,T content 等屬性)
    log.info("接收訊息內容:{}", messageDto);
    //根據訊息內容做具體業務邏輯。。。。。。。。。
    //。。。。。。。。。。。。。。。。。。。。。。
    }
}
//方式二
@Slf4j
@Component
public class RedisMessageReceiver {
 
    @Autowired
    private RedisTemplate<String,Object> redisTemplate;
 
    /**
     * 方法命名規則必須為function(Object messageDto) / function(Object messageDto,String topic)
     * @param messageDto 訊息物件
     * @param topic 訊息頻道名稱
     */
    public void onMessage(Object messageDto , String topic) {
        // 接收訊息頻道
        log.info("接收訊息頻道:" + topic);
        //接收訊息內容
        log.info("接收訊息內容:{}",messageDto);
    }
}

4.編寫Redis訊息的推播工具類,在需要推播訊息的地方使用來向Redis推播訊息(如:運算元據的地方)

@Slf4j
@Component
public class RedisMessageUtils {
 
    @Autowired
    private RedisTemplate<String,Object> redisTemplate;
 
    /**
     * 向通道釋出訊息
     */
    public void sendMessage(String topic, Object message) {
        if (!StringUtils.hasText(topic)) {
            return;
        }
        try {
            redisTemplate.convertAndSend(topic, message);
            log.info("傳送訊息成功,topic:{},message:{}", topic, message);
        } catch (Exception e) {
            log.info("傳送訊息失敗,topic:{},message:{}", topic, message);
            e.printStackTrace();
        }
    }
}

5.使用

@RestController
@RequestMapping("/user")
public class UserController{
 
  @Autowired
  private UserService userService;
 
  @PostMapping("/getUsers")  
  public List<User> queryUsers(@RequestBody UserDto userDto){
   List<User> users=userService.queryUsers(userDto);
   //傳送測試訊息
   RedisMessageUtils.sendMessage(SystemConstants.TOPIC_NAME2, new MessageDto());
   return users;
  }
}

成功範例:

2099-12-31 23:59:59.999 [http-nio-8888-exec-1] INFO  com.xxx.yyy.util.RedisMessageUtils : 傳送訊息成功,topic:TOPIC2,message:MessageDto(data=null, title=null, content=null)
2099-12-31 23:59:59.999 [container-2] INFO  com.xxx.yyy.zzz.RedisMessageReceiver : 接收訊息頻道:TOPIC2
2099-12-31 23:59:59.999 [container-2] INFO  com.xxx.yyy.zzz.RedisMessageReceiver : 接收訊息內容:MessageDto(data=null, title=null, content=null)

以上就是SpringBoot整合Redis實現訊息釋出與訂閱的範例程式碼的詳細內容,更多關於SpringBoot Redis訊息釋出 訂閱的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com