首頁 > 軟體

springboot整合redis之訊息佇列

2022-06-29 22:01:23

一、專案准備

依賴

        <!-- RedisTemplate -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- Redis-Jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

application.yaml組態檔

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    timeout: 4000
    jedis:
      pool:
        max-wait: -1
        max-active: -1
        max-idle: 20
        min-idle: 10

二、設定類

public class ObjectMapperConfig {

    public static final ObjectMapper objectMapper;
    private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    static {
        JavaTimeModule javaTimeModule = new JavaTimeModule();
        javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer());
        javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer());
        objectMapper = new ObjectMapper()
                // 轉換為格式化的json(控制檯列印時,自動格式化規範)
                //.enable(SerializationFeature.INDENT_OUTPUT)
                // Include.ALWAYS  是序列化對像所有屬性(預設)
                // Include.NON_NULL 只有不為null的欄位才被序列化,屬性為NULL 不序列化
                // Include.NON_EMPTY 如果為null或者 空字串和空集合都不會被序列化
                // Include.NON_DEFAULT 屬性為預設值不序列化
                .setSerializationInclusion(JsonInclude.Include.NON_NULL)
                // 如果是空物件的時候,不拋異常
                .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
                // 反序列化的時候如果多了其他屬性,不丟擲異常
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                // 取消時間的轉化格式,預設是時間戳,可以取消,同時需要設定要表現的時間格式
                .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
                .setDateFormat(new SimpleDateFormat(PATTERN))
                // 對LocalDateTime序列化跟反序列化
                .registerModule(javaTimeModule)

                .setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY)
                // 此項必須設定,否則會報java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXX
                .enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY)
        ;
    }

    static class LocalDateTimeSerializer extends JsonSerializer<LocalDateTime> {
        @Override
        public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
            gen.writeString(value.format(DateTimeFormatter.ofPattern(PATTERN)));
        }
    }

    static class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {
        @Override
        public LocalDateTime deserialize(JsonParser p, DeserializationContext deserializationContext) throws IOException {
            return LocalDateTime.parse(p.getValueAsString(), DateTimeFormatter.ofPattern(PATTERN));
        }
    }

}
@Configuration
public class RedisConfig {

    /**
     * redisTemplate設定
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 設定連線工廠
        template.setConnectionFactory(factory);

        //使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值(預設使用JDK的序列化方式)
        Jackson2JsonRedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        // 使用StringRedisSerializer來序列化和反序列化redis的key,value採用json序列化
        template.setKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jacksonSerializer);

        // 設定hash key 和value序列化模式
        template.setHashKeySerializer(stringRedisSerializer);
        template.setHashValueSerializer(jacksonSerializer);
        template.afterPropertiesSet();

        return template;
    }
}

三、redis中list資料型別

在Redis中,List型別是按照插入順序排序的字串連結串列。和資料結構中的普通連結串列一樣,我們可以在其頭部和尾部新增新的元素

優勢:

  • 順序排序,保證先進先出
  • 佇列為空時,自動從Redis資料庫刪除
  • 在佇列的兩頭插入或刪除元素,效率極高,即使佇列中元素達到百萬級
  • List中可以包含的最大元素數量是4294967295

定時器監聽佇列

生產者

@Slf4j
@Component
public class MessageProducer {

    public static final String MESSAGE_KEY = "message:queue";

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    public void lPush() {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                Long size = redisTemplate.opsForList().leftPush(MESSAGE_KEY, Thread.currentThread().getName() + ":hello world");
                log.info(Thread.currentThread().getName() + ":put message size = " + size);
            }).start();
        }
    }
}

消費者:消費訊息,定時器以達到監聽佇列功能

@Slf4j
@Component
@EnableScheduling
public class MessageConsumer {

    public static final String MESSAGE_KEY = "message:queue";

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    @Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000)
    public void rPop() {
        String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY);
        log.info(message);
    }
}
@RestController
public class RedisController {

    @Autowired
    private MessageProducer messageProducer;

    @GetMapping("/lPush")
    public void lPush() {
        messageProducer.lPush();
    }
}

測試

http://localhost:8080/lPush

可能出現的問題:

1.通過定時器監聽List中是否有待處理訊息,每執行一次都會發起一次連線,這會造成不必要的浪費。

2.生產速度大於消費速度,佇列堆積,訊息時效性差,佔用記憶體。

執行即監控佇列

修改訊息消費者程式碼。

當佇列沒有元素時,會阻塞10秒,然後再次監聽佇列,
需要注意的是,阻塞時間必須小於連線超時時間

@Slf4j
@Component
@EnableScheduling
public class MessageConsumer {

    public static final String MESSAGE_KEY = "message:queue";

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    //@Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000)
    public void rPop() {
        String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY);
        log.info(message);
    }

    @PostConstruct
    public void brPop() {
        new Thread(() -> {
            while (true) {
                String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY, 10, TimeUnit.SECONDS);
                log.info(message);
            }
        }).start();
    }
}

阻塞時間不能為負,直接報錯超時為負
阻塞時間為零,此時阻塞時間等於超時時間,最後報錯連線超時
阻塞時間大於超時時間,報錯連線超時

測試:

訊息不可重複消費,因為訊息從佇列POP之後就被移除了,即不支援多個消費者消費同一批資料

訊息丟失,消費期間發生異常,訊息未能正常消費

四、釋出/訂閱模式

訊息可以重複消費,多個消費者訂閱同一頻道即可

一個消費者根據匹配規則訂閱多個頻道

消費者只能消費訂閱之後釋出的訊息,這意味著,消費者下線再上線這期間釋出的訊息將會丟失

資料不具有持久化。同樣Redis宕機也會資料丟失

訊息釋出後,是推播到一個緩衝區(記憶體),消費者從緩衝區拉取訊息,當訊息堆積,緩衝區溢位,消費者就會被迫下線,同時釋放對應的緩衝區

RedisConfig中新增監聽器

    /**
     * redis訊息監聽器容器
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        //訂閱頻道,萬用字元*表示任意多個預留位置
        container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));

        return container;
    }

訂閱者

package com.yzm.redis08.message;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

public class MySubscribe implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("訂閱頻道:" + new String(message.getChannel()));
        System.out.println("接收資料:" + new String(message.getBody()));
    }
}

訊息釋出

    @GetMapping("/publish")
    public void publish() {
        redisTemplate.convertAndSend("channel_first", "hello world");
    }

另一種釋出方式

    /**
     * redis訊息監聽器容器
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        //訂閱頻道,萬用字元*表示任意多個預留位置
        container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
        // 萬用字元?:表示一個預留位置
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
        listenerAdapter.afterPropertiesSet();
        container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));

        return container;
    }
public class MySubscribe2 {

    public void getMessage(Object message, String channel) {
        System.out.println("訂閱頻道2:" + channel);
        System.out.println("接收資料2:" + message);
    }
}
    @GetMapping("/publish2")
    public void publish2() {
        redisTemplate.convertAndSend("channel2", "hello world");
    }

訊息是實體物件,進行轉換

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    private static final long serialVersionUID = 5250232737975907491L;
    private Integer id;
    private String username;
}
public class MySubscribe3 implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] bytes) {
        Jackson2JsonRedisSerializer<User> jacksonSerializer = new Jackson2JsonRedisSerializer<>(User.class);
        jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);
        User user = jacksonSerializer.deserialize(message.getBody());
        
        System.out.println("訂閱頻道3:" + new String(message.getChannel()));
        System.out.println("接收資料3:" + user);
    }
}
    /**
     * redis訊息監聽器容器
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        //訂閱頻道,萬用字元*:表示任意多個預留位置
        container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
        // 萬用字元?:表示一個預留位置
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
        listenerAdapter.afterPropertiesSet();
        container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));

        container.addMessageListener(new MySubscribe3(), new PatternTopic("user"));

        return container;
    }

    @GetMapping("/publish3")
    public void publish3() {
        User user = User.builder().id(1).username("yzm").build();
        redisTemplate.convertAndSend("user", user);
    }

五、ZSet實現延遲佇列

生產訊息,score = 時間搓+60s亂數

    public static final String MESSAGE_ZKEY = "message:ZSetqueue";
    public volatile AtomicInteger count =  new AtomicInteger();
    public void zAdd() {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                int increment = count.getAndIncrement();
                log.info(Thread.currentThread().getName() + ":put message to zset = " + increment);
                double score = System.currentTimeMillis() + new Random().nextInt(60 * 1000);
                redisTemplate.opsForZSet().add(MESSAGE_ZKEY, Thread.currentThread().getName() + " hello zset:" + increment, score);
            }).start();
        }
    }

消費者:定時任務,每秒執行一次

    public static final String MESSAGE_ZKEY = "message:ZSetqueue";
    public SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
    @Scheduled(initialDelay = 5 * 1000, fixedRate = 1000)
    public void zrangebysocre() {
        log.info("延時佇列消費。。。");
        // 拉取score小於當前時間戳的訊息
        Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(MESSAGE_ZKEY, 0, System.currentTimeMillis());
        if (messages != null) {
            for (Object message : messages) {
                Double score = redisTemplate.opsForZSet().score(MESSAGE_ZKEY, message);
                log.info("消費了:" + message + "消費時間為:" + simpleDateFormat.format(score));
                redisTemplate.opsForZSet().remove(MESSAGE_ZKEY, message);
            }
        }
    }
    @GetMapping("/zadd")
    public void zadd() {
        messageProducer.zAdd();
    }

 到此這篇關於springboot整合redis之訊息佇列的文章就介紹到這了,更多相關springboot redis訊息佇列內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com