首頁 > 軟體

websocket+redis動態訂閱和動態取消訂閱的實現範例

2022-05-17 16:00:07

原理

websocket的訂閱就是在前後端建立ws連線之後,前端通過傳送一定格式的訊息,後端解析出來去訂閱或者取消訂閱redis頻道。

訂閱頻道訊息格式:

{
    "cmd":"subscribe",
    "topic":[
        "topic_name"
    ]
}

模糊訂閱格式

{
    "cmd":"psubscribe",
    "topic":[
        "topic_name"
    ]
}

取消訂閱格式

{
    "cmd":"unsubscribe",
    "topic":[
        "topic_name"
    ]
}

兩個核心類,一個是redis的訂閱監聽類,一個是websocket的釋出訂閱類。

redis訂閱監聽類

package com.curtain.core;

import com.curtain.config.GetBeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

import java.util.Arrays;

/**
 * @Author Curtain
 * @Date 2021/6/7 14:27
 * @Description
 */
@Component
@Slf4j
public class RedisPubSub extends JedisPubSub {
    private JedisPool jedisPool = GetBeanUtil.getBean(JedisPool.class);
    private Jedis jedis;

    //訂閱
    public void subscribe(String... channels) {
        jedis = jedisPool.getResource();
        try {
            jedis.subscribe(this, channels);
        } catch (Exception e) {
            log.error(e.getMessage());
            if (jedis != null)
                jedis.close();
            //遇到異常後關閉連線重新訂閱
            log.info("監聽遇到異常,四秒後重新訂閱頻道:");
            Arrays.asList(channels).forEach(s -> {log.info(s);});
            try {
                Thread.sleep(4000);
            } catch (InterruptedException interruptedException) {
                interruptedException.printStackTrace();
            }
            subscribe(channels);
        }
    }

    //模糊訂閱
    public void psubscribe(String... channels) {
        Jedis jedis = jedisPool.getResource();
        try {
            jedis.psubscribe(this, channels);
        } catch (ArithmeticException e) {//取消訂閱故意造成的異常
            if (jedis != null)
                jedis.close();
        } catch (Exception e) {
            log.error(e.getMessage());
            if (jedis != null)
                jedis.close();
            //遇到異常後關閉連線重新訂閱
            log.info("監聽遇到異常,四秒後重新訂閱頻道:");
            Arrays.asList(channels).forEach(s -> {log.info(s);});
            try {
                Thread.sleep(4000);
            } catch (InterruptedException interruptedException) {
                interruptedException.printStackTrace();
            }
            psubscribe(channels);
        }
    }

    public void unsubscribeAndClose(String... channels){
        unsubscribe(channels);
        if (jedis != null && !isSubscribed())
            jedis.close();
    }

    public void punsubscribeAndClose(String... channels){
        punsubscribe(channels);
        if (jedis != null && !isSubscribed())
            jedis.close();
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        log.info("subscribe redis channel:" + channel + ", 執行緒id:" + Thread.currentThread().getId());
    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        log.info("psubscribe redis channel:" + pattern + ", 執行緒id:" + Thread.currentThread().getId());
    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        log.info("receive from redis channal: " + channel + ",pattern: " + pattern + ",message:" + message + ", 執行緒id:" + Thread.currentThread().getId());
        WebSocketServer.publish(message, pattern);
        WebSocketServer.publish(message, channel);

    }

    @Override
    public void onMessage(String channel, String message) {
        log.info("receive from redis channal: " + channel + ",message:" + message + ", 執行緒id:" + Thread.currentThread().getId());
        WebSocketServer.publish(message, channel);
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        log.info("unsubscribe redis channel:" + channel);
    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        log.info("punsubscribe redis channel:" + pattern);
    }
}

1.jedis監聽redis頻道的時候如果遇見異常會關閉連線導致後續沒有監聽該頻道,所以這裡在subscribe捕獲到異常的時候會重新建立一個jedis連線訂閱該redis頻道。

webSocket訂閱推播類

這個類會有兩個ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>>型別類變數,分別儲存訂閱和模糊訂閱的資訊。

外面一層的String對應的值是topic_name,裡面一層的String對應的值是sessionId。前端傳送過來的訊息裡面對應的這三類操作其實就是對這兩個map裡面的。

還有個ConcurrentHashMap<String, RedisPubSub>型別的變數,儲存的是事件-RedisPubSub,便於取消訂閱的時候找到監聽該頻道(事件)的RedisPubSub物件。

資訊進行增加或者刪除;後端往前端推播資料也會根據不同的topic_name推播到不同的訂閱者這邊。

package com.curtain.core;

import com.alibaba.fastjson.JSON;
import com.curtain.config.WebsocketProperties;
import com.curtain.service.Cancelable;
import com.curtain.service.impl.TaskExecuteService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


/**
 * @Author Curtain
 * @Date 2021/5/14 16:49
 * @Description
 */
@ServerEndpoint("/ws")
@Component
@Slf4j
public class WebSocketServer {
    /**
     * concurrent包的執行緒安全Set,用來存放每個使用者端對應的MyWebSocket物件。
     */
    private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 存放psub的事件
     **/
    private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> pWebSocketMap = new ConcurrentHashMap<>();
    /**
     * 存放topic(pattern)-對應的RedisPubsub
     */
    private static volatile ConcurrentHashMap<String, RedisPubSub> redisPubSubMap = new ConcurrentHashMap<>();
    /**
     * 與某個使用者端的連線對談,需要通過它來給使用者端傳送資料
     */
    private Session session;
    private String sessionId = "";
    //要注入的物件
    private static TaskExecuteService executeService;
    private static WebsocketProperties properties;

    private Cancelable cancelable;

    @Autowired
    public void setTaskExecuteService(TaskExecuteService taskExecuteService) {
        WebSocketServer.executeService = taskExecuteService;
    }

    @Autowired
    public void setWebsocketProperties(WebsocketProperties properties) {
        WebSocketServer.properties = properties;
    }

    /**
     * 連線建立成功呼叫的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        this.sessionId = session.getId();
        //構造推播資料
        Map pubHeader = new HashMap();
        pubHeader.put("name", "connect_status");
        pubHeader.put("type", "create");
        pubHeader.put("from", "pubsub");
        pubHeader.put("time", new Date().getTime() / 1000);
        Map pubPayload = new HashMap();
        pubPayload.put("status", "success");
        Map pubMap = new HashMap();
        pubMap.put("header", pubHeader);
        pubMap.put("payload", pubPayload);
        sendMessage(JSON.toJSONString(pubMap));
        cancelable = executeService.runPeriodly(() -> {
            try {
                if (cancelable != null && !session.isOpen()) {
                    log.info("斷開連線,停止傳送ping");
                    cancelable.cancel();
                } else {
                    String data = "ping";
                    ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
                    session.getBasicRemote().sendPing(payload);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, properties.getPeriod());

    }

    @OnMessage
    public void onMessage(String message) {
        synchronized (session) {
            Map msgMap = (Map) JSON.parse(message);
            String cmd = (String) msgMap.get("cmd");
            //訂閱訊息
            if ("subscribe".equals(cmd)) {
                List<String> topics = (List<String>) msgMap.get("topic");
                //本地記錄訂閱資訊
                for (int i = 0; i < topics.size(); i++) {
                    String topic = topics.get(i);
                    log.info("============================subscribe-start============================");
                    log.info("sessionId:" + this.sessionId + ",開始訂閱:" + topic);
                    if (webSocketMap.containsKey(topic)) {//有人訂閱過了
                        webSocketMap.get(topic).put(this.sessionId, this);
                    } else {//之前還沒人訂閱過,所以需要訂閱redis頻道
                        ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
                        map.put(this.sessionId, this);
                        webSocketMap.put(topic, map);
                        new Thread(() -> {
                            RedisPubSub redisPubSub = new RedisPubSub();
                            //存入map
                            redisPubSubMap.put(topic, redisPubSub);
                            redisPubSub.subscribe(topic);
                        }).start();
                    }
                    log.info("sessionId:" + this.sessionId + ",完成訂閱:" + topic);
                    log();
                    log.info("============================subscribe-end============================");
                }
            }
            //psubscribe
            if ("psubscribe".equals(cmd)) {
                List<String> topics = (List<String>) msgMap.get("topic");
                //本地記錄訂閱資訊
                for (int i = 0; i < topics.size(); i++) {
                    String topic = topics.get(i);
                    log.info("============================psubscribe-start============================");
                    log.info("sessionId:" + this.sessionId + ",開始模糊訂閱:" + topic);
                    if (pWebSocketMap.containsKey(topic)) {//有人訂閱過了
                        pWebSocketMap.get(topic).put(this.sessionId, this);
                    } else {//之前還沒人訂閱過,所以需要訂閱redis頻道
                        ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
                        map.put(this.sessionId, this);
                        pWebSocketMap.put(topic, map);
                        new Thread(() -> {
                            RedisPubSub redisPubSub = new RedisPubSub();
                            //存入map
                            redisPubSubMap.put(topic, redisPubSub);
                            redisPubSub.psubscribe(topic);
                        }).start();
                    }
                    log.info("sessionId:" + this.sessionId + ",完成模糊訂閱:" + topic);
                    log();
                    log.info("============================psubscribe-end============================");
                }
            }
            //取消訂閱
            if ("unsubscribe".equals(cmd)) {
                List<String> topics = (List<String>) msgMap.get("topic");
                //刪除本地對應的訂閱資訊
                for (String topic : topics) {
                    log.info("============================unsubscribe-start============================");
                    log.info("sessionId:" + this.sessionId + ",開始刪除訂閱:" + topic);
                    if (webSocketMap.containsKey(topic)) {
                        ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
                        map.remove(this.sessionId);
                        if (map.size() == 0) {//如果這個頻道沒有使用者訂閱了,則取消訂閱該redis頻道
                            webSocketMap.remove(topic);
                            redisPubSubMap.get(topic).unsubscribeAndClose(topic);
                            redisPubSubMap.remove(topic);
                        }
                    }
                    if (pWebSocketMap.containsKey(topic)) {
                        ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
                        map.remove(this.sessionId);
                        if (map.size() == 0) {//如果這個頻道沒有使用者訂閱了,則取消訂閱該redis頻道
                            pWebSocketMap.remove(topic);
                            redisPubSubMap.get(topic).punsubscribeAndClose(topic);
                            redisPubSubMap.remove(topic);
                        }
                    }
                    log.info("sessionId:" + this.sessionId + ",完成刪除訂閱:" + topic);
                    log();
                    log.info("============================unsubscribe-end============================");
                }
            }
        }
    }

    @OnMessage
    public void onPong(PongMessage pongMessage) {
        try {
            log.debug(new String(pongMessage.getApplicationData().array(), "utf-8") + "接收到pong");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 連線關閉呼叫的方法
     */
    @OnClose
    public void onClose() {
        synchronized (session) {
            log.info("============================onclose-start============================");
            //刪除訂閱
            Iterator iterator = webSocketMap.keySet().iterator();
            while (iterator.hasNext()) {
                String topic = (String) iterator.next();
                ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
                map.remove(this.sessionId);
                if (map.size() == 0) {//如果這個頻道沒有使用者訂閱了,則取消訂閱該redis頻道
                    webSocketMap.remove(topic);
                    redisPubSubMap.get(topic).unsubscribeAndClose(topic);
                    redisPubSubMap.remove(topic);
                }
            }
            //刪除模糊訂閱
            Iterator iteratorP = pWebSocketMap.keySet().iterator();
            while (iteratorP.hasNext()) {
                String topic = (String) iteratorP.next();
                ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
                map.remove(this.sessionId);
                if (map.size() == 0) {//如果這個頻道沒有使用者訂閱了,則取消訂閱該redis頻道
                    pWebSocketMap.remove(topic);
                    redisPubSubMap.get(topic).punsubscribeAndClose(topic);
                    redisPubSubMap.remove(topic);
                }
            }
            log.info("sessionId:" + this.sessionId + ",斷開連線:");
            //debug
            log();
            log.info("============================onclose-end============================");
        }
    }


    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        synchronized (session) {
            log.info("============================onError-start============================");
            log.error("使用者錯誤,sessionId:" + session.getId() + ",原因:" + error.getMessage());
            error.printStackTrace();
            log.info("關閉錯誤使用者對應的連線");
            //刪除訂閱
            Iterator iterator = webSocketMap.keySet().iterator();
            while (iterator.hasNext()) {
                String topic = (String) iterator.next();
                ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
                map.remove(this.sessionId);
                if (map.size() == 0) {//如果這個頻道沒有使用者訂閱了,則取消訂閱該redis頻道
                    webSocketMap.remove(topic);
                    redisPubSubMap.get(topic).unsubscribeAndClose(topic);
                    redisPubSubMap.remove(topic);
                }
            }
            //刪除模糊訂閱
            Iterator iteratorP = pWebSocketMap.keySet().iterator();
            while (iteratorP.hasNext()) {
                String topic = (String) iteratorP.next();
                ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
                map.remove(this.sessionId);
                if (map.size() == 0) {//如果這個頻道沒有使用者訂閱了,則取消訂閱該redis頻道
                    pWebSocketMap.remove(topic);
                    redisPubSubMap.get(topic).punsubscribeAndClose(topic);
                    redisPubSubMap.remove(topic);
                }
            }
            log.info("完成錯誤使用者對應的連線關閉");
            //debug
            log();
            log.info("============================onError-end============================");
        }
    }

    /**
     * 實現伺服器主動推播
     */
    public void sendMessage(String message) {
        synchronized (session) {
            try {
                this.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void publish(String msg, String topic) {
        ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
        if (map != null && map.values() != null) {
            for (WebSocketServer webSocketServer : map.values())
                webSocketServer.sendMessage(msg);
        }
        map = pWebSocketMap.get(topic);
        if (map != null && map.values() != null) {
            for (WebSocketServer webSocketServer : map.values())
                webSocketServer.sendMessage(msg);
        }
    }

    private void log() {
        log.info("<<<<<<<<<<<完成操作後,列印訂閱資訊開始>>>>>>>>>>");
        Iterator iterator1 = webSocketMap.keySet().iterator();
        while (iterator1.hasNext()) {
            String topic = (String) iterator1.next();
            log.info("topic:" + topic);
            Iterator iterator2 = webSocketMap.get(topic).keySet().iterator();
            while (iterator2.hasNext()) {
                String session = (String) iterator2.next();
                log.info("訂閱" + topic + "的sessionId:" + session);
            }
        }
        log.info("<<<<<<<<<<<完成操作後,列印訂閱資訊結束>>>>>>>>>>");
    }
}

專案地址

上面介紹了核心程式碼,下面是完整程式碼地址

https://github.com/Curtain-Wang/websocket-redis-subscribe.git

Update20220415

參考評論區老哥的建議,將redis訂閱監聽類裡面的subscribe和psubscribe方法調整如下:

    //訂閱
    @Override
    public void subscribe(String... channels) {
        boolean done = true;
        while (done){
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.subscribe(this, channels);
                done = false;
            } catch (Exception e) {
                log.error(e.getMessage());
                if (jedis != null)
                    jedis.close();
                //遇到異常後關閉連線重新訂閱
                log.info("監聽遇到異常,四秒後重新訂閱頻道:");
                Arrays.asList(channels).forEach(s -> {log.info(s);});
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
            }
        }
    }
    //模糊訂閱
    @Override
    public void psubscribe(String... channels) {
        boolean done = true;
        while (done){
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.psubscribe(this, channels);
                done = false;
            } catch (Exception e) {
                log.error(e.getMessage());
                if (jedis != null)
                    jedis.close();
                //遇到異常後關閉連線重新訂閱
                log.info("監聽遇到異常,四秒後重新訂閱頻道:");
                Arrays.asList(channels).forEach(s -> {log.info(s);});
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
            }
        }
    }

到此這篇關於websocket+redis動態訂閱和動態取消訂閱的實現範例的文章就介紹到這了,更多相關websocket redis動態訂閱 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com