<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
依賴
<!-- RedisTemplate --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- Redis-Jedis --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency>
application.yaml組態檔
spring: redis: host: 127.0.0.1 port: 6379 database: 0 timeout: 4000 jedis: pool: max-wait: -1 max-active: -1 max-idle: 20 min-idle: 10
public class ObjectMapperConfig { public static final ObjectMapper objectMapper; private static final String PATTERN = "yyyy-MM-dd HH:mm:ss"; static { JavaTimeModule javaTimeModule = new JavaTimeModule(); javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer()); javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer()); objectMapper = new ObjectMapper() // 轉換為格式化的json(控制檯列印時,自動格式化規範) //.enable(SerializationFeature.INDENT_OUTPUT) // Include.ALWAYS 是序列化對像所有屬性(預設) // Include.NON_NULL 只有不為null的欄位才被序列化,屬性為NULL 不序列化 // Include.NON_EMPTY 如果為null或者 空字串和空集合都不會被序列化 // Include.NON_DEFAULT 屬性為預設值不序列化 .setSerializationInclusion(JsonInclude.Include.NON_NULL) // 如果是空物件的時候,不拋異常 .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false) // 反序列化的時候如果多了其他屬性,不丟擲異常 .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) // 取消時間的轉化格式,預設是時間戳,可以取消,同時需要設定要表現的時間格式 .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) .setDateFormat(new SimpleDateFormat(PATTERN)) // 對LocalDateTime序列化跟反序列化 .registerModule(javaTimeModule) .setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY) // 此項必須設定,否則會報java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXX .enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY) ; } static class LocalDateTimeSerializer extends JsonSerializer<LocalDateTime> { @Override public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider serializers) throws IOException { gen.writeString(value.format(DateTimeFormatter.ofPattern(PATTERN))); } } static class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> { @Override public LocalDateTime deserialize(JsonParser p, DeserializationContext deserializationContext) throws IOException { return LocalDateTime.parse(p.getValueAsString(), DateTimeFormatter.ofPattern(PATTERN)); } } }
@Configuration public class RedisConfig { /** * redisTemplate設定 */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); // 設定連線工廠 template.setConnectionFactory(factory); //使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值(預設使用JDK的序列化方式) Jackson2JsonRedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class); jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // 使用StringRedisSerializer來序列化和反序列化redis的key,value採用json序列化 template.setKeySerializer(stringRedisSerializer); template.setValueSerializer(jacksonSerializer); // 設定hash key 和value序列化模式 template.setHashKeySerializer(stringRedisSerializer); template.setHashValueSerializer(jacksonSerializer); template.afterPropertiesSet(); return template; } }
在Redis中,List型別是按照插入順序排序的字串連結串列。和資料結構中的普通連結串列一樣,我們可以在其頭部和尾部新增新的元素
優勢:
生產者
@Slf4j @Component public class MessageProducer { public static final String MESSAGE_KEY = "message:queue"; @Autowired private RedisTemplate<String,Object> redisTemplate; public void lPush() { for (int i = 0; i < 10; i++) { new Thread(() -> { Long size = redisTemplate.opsForList().leftPush(MESSAGE_KEY, Thread.currentThread().getName() + ":hello world"); log.info(Thread.currentThread().getName() + ":put message size = " + size); }).start(); } } }
消費者:消費訊息,定時器以達到監聽佇列功能
@Slf4j @Component @EnableScheduling public class MessageConsumer { public static final String MESSAGE_KEY = "message:queue"; @Autowired private RedisTemplate<String,Object> redisTemplate; @Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000) public void rPop() { String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY); log.info(message); } }
@RestController public class RedisController { @Autowired private MessageProducer messageProducer; @GetMapping("/lPush") public void lPush() { messageProducer.lPush(); } }
測試
http://localhost:8080/lPush
可能出現的問題:
1.通過定時器監聽List中是否有待處理訊息,每執行一次都會發起一次連線,這會造成不必要的浪費。
2.生產速度大於消費速度,佇列堆積,訊息時效性差,佔用記憶體。
修改訊息消費者程式碼。
當佇列沒有元素時,會阻塞10秒,然後再次監聽佇列,
需要注意的是,阻塞時間必須小於連線超時時間
@Slf4j @Component @EnableScheduling public class MessageConsumer { public static final String MESSAGE_KEY = "message:queue"; @Autowired private RedisTemplate<String,Object> redisTemplate; //@Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000) public void rPop() { String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY); log.info(message); } @PostConstruct public void brPop() { new Thread(() -> { while (true) { String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY, 10, TimeUnit.SECONDS); log.info(message); } }).start(); } }
阻塞時間不能為負,直接報錯超時為負
阻塞時間為零,此時阻塞時間等於超時時間,最後報錯連線超時
阻塞時間大於超時時間,報錯連線超時
測試:
訊息不可重複消費,因為訊息從佇列POP之後就被移除了,即不支援多個消費者消費同一批資料
訊息丟失,消費期間發生異常,訊息未能正常消費
訊息可以重複消費,多個消費者訂閱同一頻道即可
一個消費者根據匹配規則訂閱多個頻道
消費者只能消費訂閱之後釋出的訊息,這意味著,消費者下線再上線這期間釋出的訊息將會丟失
資料不具有持久化。同樣Redis宕機也會資料丟失
訊息釋出後,是推播到一個緩衝區(記憶體),消費者從緩衝區拉取訊息,當訊息堆積,緩衝區溢位,消費者就會被迫下線,同時釋放對應的緩衝區
RedisConfig中新增監聽器
/** * redis訊息監聽器容器 */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //訂閱頻道,萬用字元*表示任意多個預留位置 container.addMessageListener(new MySubscribe(), new PatternTopic("channel*")); return container; }
訂閱者
package com.yzm.redis08.message; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; public class MySubscribe implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { System.out.println("訂閱頻道:" + new String(message.getChannel())); System.out.println("接收資料:" + new String(message.getBody())); } }
訊息釋出
@GetMapping("/publish") public void publish() { redisTemplate.convertAndSend("channel_first", "hello world"); }
另一種釋出方式
/** * redis訊息監聽器容器 */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //訂閱頻道,萬用字元*表示任意多個預留位置 container.addMessageListener(new MySubscribe(), new PatternTopic("channel*")); // 萬用字元?:表示一個預留位置 MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage"); listenerAdapter.afterPropertiesSet(); container.addMessageListener(listenerAdapter, new PatternTopic("channel?")); return container; }
public class MySubscribe2 { public void getMessage(Object message, String channel) { System.out.println("訂閱頻道2:" + channel); System.out.println("接收資料2:" + message); } }
@GetMapping("/publish2") public void publish2() { redisTemplate.convertAndSend("channel2", "hello world"); }
訊息是實體物件,進行轉換
@Data @Builder @NoArgsConstructor @AllArgsConstructor public class User implements Serializable { private static final long serialVersionUID = 5250232737975907491L; private Integer id; private String username; }
public class MySubscribe3 implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { Jackson2JsonRedisSerializer<User> jacksonSerializer = new Jackson2JsonRedisSerializer<>(User.class); jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper); User user = jacksonSerializer.deserialize(message.getBody()); System.out.println("訂閱頻道3:" + new String(message.getChannel())); System.out.println("接收資料3:" + user); } }
/** * redis訊息監聽器容器 */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //訂閱頻道,萬用字元*:表示任意多個預留位置 container.addMessageListener(new MySubscribe(), new PatternTopic("channel*")); // 萬用字元?:表示一個預留位置 MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage"); listenerAdapter.afterPropertiesSet(); container.addMessageListener(listenerAdapter, new PatternTopic("channel?")); container.addMessageListener(new MySubscribe3(), new PatternTopic("user")); return container; }
@GetMapping("/publish3") public void publish3() { User user = User.builder().id(1).username("yzm").build(); redisTemplate.convertAndSend("user", user); }
生產訊息,score = 時間搓+60s亂數
public static final String MESSAGE_ZKEY = "message:ZSetqueue"; public volatile AtomicInteger count = new AtomicInteger(); public void zAdd() { for (int i = 0; i < 10; i++) { new Thread(() -> { int increment = count.getAndIncrement(); log.info(Thread.currentThread().getName() + ":put message to zset = " + increment); double score = System.currentTimeMillis() + new Random().nextInt(60 * 1000); redisTemplate.opsForZSet().add(MESSAGE_ZKEY, Thread.currentThread().getName() + " hello zset:" + increment, score); }).start(); } }
消費者:定時任務,每秒執行一次
public static final String MESSAGE_ZKEY = "message:ZSetqueue"; public SimpleDateFormat simpleDateFormat = new SimpleDateFormat(); @Scheduled(initialDelay = 5 * 1000, fixedRate = 1000) public void zrangebysocre() { log.info("延時佇列消費。。。"); // 拉取score小於當前時間戳的訊息 Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(MESSAGE_ZKEY, 0, System.currentTimeMillis()); if (messages != null) { for (Object message : messages) { Double score = redisTemplate.opsForZSet().score(MESSAGE_ZKEY, message); log.info("消費了:" + message + "消費時間為:" + simpleDateFormat.format(score)); redisTemplate.opsForZSet().remove(MESSAGE_ZKEY, message); } } }
@GetMapping("/zadd") public void zadd() { messageProducer.zAdd(); }
到此這篇關於springboot整合redis之訊息佇列的文章就介紹到這了,更多相關springboot redis訊息佇列內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45