<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
websocket的訂閱就是在前後端建立ws連線之後,前端通過傳送一定格式的訊息,後端解析出來去訂閱或者取消訂閱redis頻道。
訂閱頻道訊息格式:
{ "cmd":"subscribe", "topic":[ "topic_name" ] }
模糊訂閱格式
{ "cmd":"psubscribe", "topic":[ "topic_name" ] }
取消訂閱格式
{ "cmd":"unsubscribe", "topic":[ "topic_name" ] }
兩個核心類,一個是redis的訂閱監聽類,一個是websocket的釋出訂閱類。
package com.curtain.core; import com.curtain.config.GetBeanUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; import java.util.Arrays; /** * @Author Curtain * @Date 2021/6/7 14:27 * @Description */ @Component @Slf4j public class RedisPubSub extends JedisPubSub { private JedisPool jedisPool = GetBeanUtil.getBean(JedisPool.class); private Jedis jedis; //訂閱 public void subscribe(String... channels) { jedis = jedisPool.getResource(); try { jedis.subscribe(this, channels); } catch (Exception e) { log.error(e.getMessage()); if (jedis != null) jedis.close(); //遇到異常後關閉連線重新訂閱 log.info("監聽遇到異常,四秒後重新訂閱頻道:"); Arrays.asList(channels).forEach(s -> {log.info(s);}); try { Thread.sleep(4000); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } subscribe(channels); } } //模糊訂閱 public void psubscribe(String... channels) { Jedis jedis = jedisPool.getResource(); try { jedis.psubscribe(this, channels); } catch (ArithmeticException e) {//取消訂閱故意造成的異常 if (jedis != null) jedis.close(); } catch (Exception e) { log.error(e.getMessage()); if (jedis != null) jedis.close(); //遇到異常後關閉連線重新訂閱 log.info("監聽遇到異常,四秒後重新訂閱頻道:"); Arrays.asList(channels).forEach(s -> {log.info(s);}); try { Thread.sleep(4000); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } psubscribe(channels); } } public void unsubscribeAndClose(String... channels){ unsubscribe(channels); if (jedis != null && !isSubscribed()) jedis.close(); } public void punsubscribeAndClose(String... channels){ punsubscribe(channels); if (jedis != null && !isSubscribed()) jedis.close(); } @Override public void onSubscribe(String channel, int subscribedChannels) { log.info("subscribe redis channel:" + channel + ", 執行緒id:" + Thread.currentThread().getId()); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { log.info("psubscribe redis channel:" + pattern + ", 執行緒id:" + Thread.currentThread().getId()); } @Override public void onPMessage(String pattern, String channel, String message) { log.info("receive from redis channal: " + channel + ",pattern: " + pattern + ",message:" + message + ", 執行緒id:" + Thread.currentThread().getId()); WebSocketServer.publish(message, pattern); WebSocketServer.publish(message, channel); } @Override public void onMessage(String channel, String message) { log.info("receive from redis channal: " + channel + ",message:" + message + ", 執行緒id:" + Thread.currentThread().getId()); WebSocketServer.publish(message, channel); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { log.info("unsubscribe redis channel:" + channel); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { log.info("punsubscribe redis channel:" + pattern); } }
1.jedis監聽redis頻道的時候如果遇見異常會關閉連線導致後續沒有監聽該頻道,所以這裡在subscribe捕獲到異常的時候會重新建立一個jedis連線訂閱該redis頻道。
這個類會有兩個ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>>型別類變數,分別儲存訂閱和模糊訂閱的資訊。
外面一層的String對應的值是topic_name,裡面一層的String對應的值是sessionId。前端傳送過來的訊息裡面對應的這三類操作其實就是對這兩個map裡面的。
還有個ConcurrentHashMap<String, RedisPubSub>型別的變數,儲存的是事件-RedisPubSub,便於取消訂閱的時候找到監聽該頻道(事件)的RedisPubSub物件。
資訊進行增加或者刪除;後端往前端推播資料也會根據不同的topic_name推播到不同的訂閱者這邊。
package com.curtain.core; import com.alibaba.fastjson.JSON; import com.curtain.config.WebsocketProperties; import com.curtain.service.Cancelable; import com.curtain.service.impl.TaskExecuteService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * @Author Curtain * @Date 2021/5/14 16:49 * @Description */ @ServerEndpoint("/ws") @Component @Slf4j public class WebSocketServer { /** * concurrent包的執行緒安全Set,用來存放每個使用者端對應的MyWebSocket物件。 */ private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>(); /** * 存放psub的事件 **/ private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> pWebSocketMap = new ConcurrentHashMap<>(); /** * 存放topic(pattern)-對應的RedisPubsub */ private static volatile ConcurrentHashMap<String, RedisPubSub> redisPubSubMap = new ConcurrentHashMap<>(); /** * 與某個使用者端的連線對談,需要通過它來給使用者端傳送資料 */ private Session session; private String sessionId = ""; //要注入的物件 private static TaskExecuteService executeService; private static WebsocketProperties properties; private Cancelable cancelable; @Autowired public void setTaskExecuteService(TaskExecuteService taskExecuteService) { WebSocketServer.executeService = taskExecuteService; } @Autowired public void setWebsocketProperties(WebsocketProperties properties) { WebSocketServer.properties = properties; } /** * 連線建立成功呼叫的方法 */ @OnOpen public void onOpen(Session session) { this.session = session; this.sessionId = session.getId(); //構造推播資料 Map pubHeader = new HashMap(); pubHeader.put("name", "connect_status"); pubHeader.put("type", "create"); pubHeader.put("from", "pubsub"); pubHeader.put("time", new Date().getTime() / 1000); Map pubPayload = new HashMap(); pubPayload.put("status", "success"); Map pubMap = new HashMap(); pubMap.put("header", pubHeader); pubMap.put("payload", pubPayload); sendMessage(JSON.toJSONString(pubMap)); cancelable = executeService.runPeriodly(() -> { try { if (cancelable != null && !session.isOpen()) { log.info("斷開連線,停止傳送ping"); cancelable.cancel(); } else { String data = "ping"; ByteBuffer payload = ByteBuffer.wrap(data.getBytes()); session.getBasicRemote().sendPing(payload); } } catch (IOException e) { e.printStackTrace(); } }, properties.getPeriod()); } @OnMessage public void onMessage(String message) { synchronized (session) { Map msgMap = (Map) JSON.parse(message); String cmd = (String) msgMap.get("cmd"); //訂閱訊息 if ("subscribe".equals(cmd)) { List<String> topics = (List<String>) msgMap.get("topic"); //本地記錄訂閱資訊 for (int i = 0; i < topics.size(); i++) { String topic = topics.get(i); log.info("============================subscribe-start============================"); log.info("sessionId:" + this.sessionId + ",開始訂閱:" + topic); if (webSocketMap.containsKey(topic)) {//有人訂閱過了 webSocketMap.get(topic).put(this.sessionId, this); } else {//之前還沒人訂閱過,所以需要訂閱redis頻道 ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>(); map.put(this.sessionId, this); webSocketMap.put(topic, map); new Thread(() -> { RedisPubSub redisPubSub = new RedisPubSub(); //存入map redisPubSubMap.put(topic, redisPubSub); redisPubSub.subscribe(topic); }).start(); } log.info("sessionId:" + this.sessionId + ",完成訂閱:" + topic); log(); log.info("============================subscribe-end============================"); } } //psubscribe if ("psubscribe".equals(cmd)) { List<String> topics = (List<String>) msgMap.get("topic"); //本地記錄訂閱資訊 for (int i = 0; i < topics.size(); i++) { String topic = topics.get(i); log.info("============================psubscribe-start============================"); log.info("sessionId:" + this.sessionId + ",開始模糊訂閱:" + topic); if (pWebSocketMap.containsKey(topic)) {//有人訂閱過了 pWebSocketMap.get(topic).put(this.sessionId, this); } else {//之前還沒人訂閱過,所以需要訂閱redis頻道 ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>(); map.put(this.sessionId, this); pWebSocketMap.put(topic, map); new Thread(() -> { RedisPubSub redisPubSub = new RedisPubSub(); //存入map redisPubSubMap.put(topic, redisPubSub); redisPubSub.psubscribe(topic); }).start(); } log.info("sessionId:" + this.sessionId + ",完成模糊訂閱:" + topic); log(); log.info("============================psubscribe-end============================"); } } //取消訂閱 if ("unsubscribe".equals(cmd)) { List<String> topics = (List<String>) msgMap.get("topic"); //刪除本地對應的訂閱資訊 for (String topic : topics) { log.info("============================unsubscribe-start============================"); log.info("sessionId:" + this.sessionId + ",開始刪除訂閱:" + topic); if (webSocketMap.containsKey(topic)) { ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic); map.remove(this.sessionId); if (map.size() == 0) {//如果這個頻道沒有使用者訂閱了,則取消訂閱該redis頻道 webSocketMap.remove(topic); redisPubSubMap.get(topic).unsubscribeAndClose(topic); redisPubSubMap.remove(topic); } } if (pWebSocketMap.containsKey(topic)) { ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic); map.remove(this.sessionId); if (map.size() == 0) {//如果這個頻道沒有使用者訂閱了,則取消訂閱該redis頻道 pWebSocketMap.remove(topic); redisPubSubMap.get(topic).punsubscribeAndClose(topic); redisPubSubMap.remove(topic); } } log.info("sessionId:" + this.sessionId + ",完成刪除訂閱:" + topic); log(); log.info("============================unsubscribe-end============================"); } } } } @OnMessage public void onPong(PongMessage pongMessage) { try { log.debug(new String(pongMessage.getApplicationData().array(), "utf-8") + "接收到pong"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 連線關閉呼叫的方法 */ @OnClose public void onClose() { synchronized (session) { log.info("============================onclose-start============================"); //刪除訂閱 Iterator iterator = webSocketMap.keySet().iterator(); while (iterator.hasNext()) { String topic = (String) iterator.next(); ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic); map.remove(this.sessionId); if (map.size() == 0) {//如果這個頻道沒有使用者訂閱了,則取消訂閱該redis頻道 webSocketMap.remove(topic); redisPubSubMap.get(topic).unsubscribeAndClose(topic); redisPubSubMap.remove(topic); } } //刪除模糊訂閱 Iterator iteratorP = pWebSocketMap.keySet().iterator(); while (iteratorP.hasNext()) { String topic = (String) iteratorP.next(); ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic); map.remove(this.sessionId); if (map.size() == 0) {//如果這個頻道沒有使用者訂閱了,則取消訂閱該redis頻道 pWebSocketMap.remove(topic); redisPubSubMap.get(topic).punsubscribeAndClose(topic); redisPubSubMap.remove(topic); } } log.info("sessionId:" + this.sessionId + ",斷開連線:"); //debug log(); log.info("============================onclose-end============================"); } } /** * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { synchronized (session) { log.info("============================onError-start============================"); log.error("使用者錯誤,sessionId:" + session.getId() + ",原因:" + error.getMessage()); error.printStackTrace(); log.info("關閉錯誤使用者對應的連線"); //刪除訂閱 Iterator iterator = webSocketMap.keySet().iterator(); while (iterator.hasNext()) { String topic = (String) iterator.next(); ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic); map.remove(this.sessionId); if (map.size() == 0) {//如果這個頻道沒有使用者訂閱了,則取消訂閱該redis頻道 webSocketMap.remove(topic); redisPubSubMap.get(topic).unsubscribeAndClose(topic); redisPubSubMap.remove(topic); } } //刪除模糊訂閱 Iterator iteratorP = pWebSocketMap.keySet().iterator(); while (iteratorP.hasNext()) { String topic = (String) iteratorP.next(); ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic); map.remove(this.sessionId); if (map.size() == 0) {//如果這個頻道沒有使用者訂閱了,則取消訂閱該redis頻道 pWebSocketMap.remove(topic); redisPubSubMap.get(topic).punsubscribeAndClose(topic); redisPubSubMap.remove(topic); } } log.info("完成錯誤使用者對應的連線關閉"); //debug log(); log.info("============================onError-end============================"); } } /** * 實現伺服器主動推播 */ public void sendMessage(String message) { synchronized (session) { try { this.session.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } } public static void publish(String msg, String topic) { ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic); if (map != null && map.values() != null) { for (WebSocketServer webSocketServer : map.values()) webSocketServer.sendMessage(msg); } map = pWebSocketMap.get(topic); if (map != null && map.values() != null) { for (WebSocketServer webSocketServer : map.values()) webSocketServer.sendMessage(msg); } } private void log() { log.info("<<<<<<<<<<<完成操作後,列印訂閱資訊開始>>>>>>>>>>"); Iterator iterator1 = webSocketMap.keySet().iterator(); while (iterator1.hasNext()) { String topic = (String) iterator1.next(); log.info("topic:" + topic); Iterator iterator2 = webSocketMap.get(topic).keySet().iterator(); while (iterator2.hasNext()) { String session = (String) iterator2.next(); log.info("訂閱" + topic + "的sessionId:" + session); } } log.info("<<<<<<<<<<<完成操作後,列印訂閱資訊結束>>>>>>>>>>"); } }
上面介紹了核心程式碼,下面是完整程式碼地址
https://github.com/Curtain-Wang/websocket-redis-subscribe.git
參考評論區老哥的建議,將redis訂閱監聽類裡面的subscribe和psubscribe方法調整如下:
//訂閱 @Override public void subscribe(String... channels) { boolean done = true; while (done){ Jedis jedis = jedisPool.getResource(); try { jedis.subscribe(this, channels); done = false; } catch (Exception e) { log.error(e.getMessage()); if (jedis != null) jedis.close(); //遇到異常後關閉連線重新訂閱 log.info("監聽遇到異常,四秒後重新訂閱頻道:"); Arrays.asList(channels).forEach(s -> {log.info(s);}); try { Thread.sleep(4000); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } } } } //模糊訂閱 @Override public void psubscribe(String... channels) { boolean done = true; while (done){ Jedis jedis = jedisPool.getResource(); try { jedis.psubscribe(this, channels); done = false; } catch (Exception e) { log.error(e.getMessage()); if (jedis != null) jedis.close(); //遇到異常後關閉連線重新訂閱 log.info("監聽遇到異常,四秒後重新訂閱頻道:"); Arrays.asList(channels).forEach(s -> {log.info(s);}); try { Thread.sleep(4000); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } } } }
到此這篇關於websocket+redis動態訂閱和動態取消訂閱的實現範例的文章就介紹到這了,更多相關websocket redis動態訂閱 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45