首頁 > 軟體

ASP.NET Core WebSocket叢集實現思路詳解

2022-11-11 14:01:04

前言

提到WebSocket相信大家都聽說過,它的初衷是為了解決使用者端瀏覽器與伺服器端進行雙向通訊,是在單個TCP連線上進行全雙工通訊的協定。在沒有WebSocket之前只能通過瀏覽器到伺服器端的請求應答模式比如輪詢,來實現伺服器端的變更響應到使用者端,現在伺服器端也可以主動傳送資料到使用者端瀏覽器。WebSocket協定和Http協定平行,都屬於TCP/IP四層模型中的第四層應用層。由於WebSocket握手階段採用HTTP協定,所以也需要進行跨域處理。它的協定標識是wswss對應了常規標識和安全通訊協定標識。本文重點並不是介紹WebSocket協定相關,而是提供一種基於ASP.NET Core原生WebSocket的方式實現叢集的實現思路。關於這套思路其實很早之前我就構思過了,只是之前一直沒有系統的整理出來,本篇文章就來和大家分享一下,由於主要是提供一種思路,所以涉及到具體細節或者業務相關的可能沒有體現出來,還望大家理解。

實現

咱們的重點關鍵字就是兩個WebSocket叢集,實現的框架便是基於ASP.NET Core,我也基於golang實現了一套,本文涉及到的相關原始碼和golang版本的實現都已上傳至我的github,具體倉庫地址可以轉到文末自行跳轉到#範例原始碼中檢視。既然涉及到叢集,這裡咱們就用nginx作為反向代理,來搭建一個叢集範例。大致的範例結構如下圖所示

redis在這裡扮演的角色呢,是用來處理Server端的訊息相互傳遞用的,主要是使用的redis的pub/sub功能來實現的,這裡便涉及到幾個核心問題

  • 首先,叢集狀態每個使用者被分發到具體的哪臺伺服器上是不得而知的
  • 其次,處在不同Server端的不同使用者間的相互通訊是需要一個傳遞媒介
  • 最後,針對不同的場景比如單發訊息、分組訊息、全部通知等要有不同的處理策略

這裡需要考慮的是,如果需要搭建實時通訊伺服器的話,需要注意叢集的隔離性,主要是和核心業務進行隔離,畢竟WebSocket需要保持長連結、且訊息的大小需要評估。

上面提到了redis的主要功能就是用來傳遞訊息用的,畢竟每個server伺服器是無狀態的。這當然不是必須的,任何可以進行訊息分發的中介軟體都可以,比如訊息佇列rabbitmq、kafka、rocketmq、mqtt等,甚至只要能把要處理的訊息儲存起來都可以比如快取甚至是關係型資料庫等等。這壓力使用redis主要是因為操作起來簡單、輕量級、靈活,讓大家關注點在思路上,而不是使用中案件的程式碼上。

nginx設定

通過上面的圖我們可以看到,我們這裡構建叢集範例使用的nginx,如果讓nginx支援WebSocket的話,需要額外的設定,這個在網上有很多相關的文章介紹,這裡就來列一下咱們範例的nginx設定,在組態檔nginx.conf

//上游伺服器地址也就是websocket服務的真實地址
upstream wsbackend {
    server 127.0.0.1:5001;
    server 127.0.0.1:5678;
}
server {
    listen       5000;
    server_name  localhost;
    location ~/chat/{
        //upstream地址
        proxy_pass http://wsbackend;
        proxy_connect_timeout 60s; 
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
        //記得轉發避免踩坑
        proxy_set_header Host $host;
        proxy_http_version 1.1; 
        //http升級成websocket協定的頭標識
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
    }
}

這套設定呢,在搜尋引擎上能收到很多,不過不妨礙我把使用的貼上出來。這一套親測有效,也是我使用的設定,請放心使用。個人認為如果是線上環境採用的負載均衡策略可以選擇ip_hash的方式,保證同一個ip的使用者端使用者可以分發到一臺WebSocket範例中去,這樣的話能儘量避免使用redis的使用者頻道做訊息傳遞。好了,接下來準備開始展示具體實現的程式碼了。

一對一傳送

首先介紹的就是一對一傳送的情況,也就是我把訊息發給你,聊天的時候私聊的情況。這裡呢涉及到兩種情況

  • 如果你需要通訊的使用者端和你連線在一個Server端裡,這樣的話可以直接在連結裡找到這個端的通訊範例直接傳送。
  • 如果你需要通訊的使用者端和你不在一個Server端裡,這個時候咱們就需要藉助redis的pub/sub的功能,把訊息傳遞給另一個Server端。

咱們通過一張圖大致的展示一下它的工作方式

解釋一下,每個使用者端註冊到WebSocket服務裡的時候會在redis裡訂閱一個user:使用者唯一標識的頻道,這個頻道用於接收和當前WebSocket連線不在一個伺服器端的其他WebSocket傳送過來的訊息。

每次傳送訊息的時候你會知道你要傳送給誰,不在當前伺服器的話則傳送到redis的user:使用者唯一標識頻道,這樣的話目標WebSocket就能收到訊息了。

首先是注入相關的依賴項,這裡我使用的redis使用者端是freeredis,主要是因為操作起來簡單,具體實現程式碼如下

var builder = WebApplication.CreateBuilder(args);
//註冊freeredis
builder.Services.AddSingleton(provider => {
    var logger = provider.GetService<ILogger<WebSocketChannelHandler>>();
    RedisClient cli = new RedisClient("127.0.0.1:6379");
    cli.Notice += (s, e) => logger?.LogInformation(e.Log);
    return cli;
});
//註冊WebSocket具體操作的類
builder.Services.AddSingleton<WebSocketHandler>();
builder.Services.AddControllers();
var app = builder.Build();
var webSocketOptions = new WebSocketOptions
{
    KeepAliveInterval = TimeSpan.FromMinutes(2)
};
//註冊WebSocket中介軟體
app.UseWebSockets(webSocketOptions);
app.MapGet("/", () => "Hello World!");
app.MapControllers();
app.Run();

接下來我們定義一個Controller用來處理WebSocket請求

public class WebSocketController : ControllerBase
{
    private readonly ILogger<WebSocketController> _logger;
    private readonly WebSocketHandler _socketHandler;
    public WebSocketController(ILogger<WebSocketController> logger, WebSocketHandler socketHandler, WebSocketChannelHandler webSocketChannelHandler)
    {
        _logger = logger;
        _socketHandler = socketHandler;
    }
    //這裡的id代表當前連線的使用者端唯一標識比如使用者唯一標識
    [HttpGet("/chat/user/{id}")]
    public async Task ChatUser(string id)
    {
        //判斷是否是WebSocket請求
        if (HttpContext.WebSockets.IsWebSocketRequest)
        {
            _logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");
            var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
            //處理請求相關
            await _socketHandler.Handle(id, webSocket);
        }
        else
        {
            HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
        }
    }
}

這裡的WebSocketHandler是用來處理具體邏輯用的,咱們看一下相關程式碼

public class WebSocketHandler:IDisposable
{
    //儲存當前服務使用者的集合
    private readonly UserConnection UserConnection = new();
    //redis頻道字首
    private readonly string userPrefix = "user:";
    //使用者對應的redis頻道
    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();
    private readonly ILogger<WebSocketHandler> _logger;
    //redis使用者端
    private readonly RedisClient _redisClient;
    public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
    {
        _logger = logger;
        _redisClient = redisClient;
    }
    public async Task Handle(string id, WebSocket webSocket)
    {
        //把當前使用者連線儲存起來
        _ = UserConnection.GetOrAdd(id, webSocket);
        //訂閱一個當前使用者的頻道
        await SubMsg($"{userPrefix}{id}");
        var buffer = new byte[1024 * 4];
        //接收傳送過來的訊息,這個方法是阻塞的,如果沒收到訊息則一直阻塞
        var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
        //迴圈接收訊息
        while (webSocket.State == WebSocketState.Open)
        {
            try
            {
                //因為緩衝區長度是固定的所以要獲取實際長度
                string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('');
                //接收的到訊息轉換成實體
                MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(msg);
                //傳送到其他使用者端的資料
                byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");
                _logger.LogInformation($"user {id} send:{msgBody.Msg}");
                //判斷目標使用者端是否在當前當前服務,如果在當前服務直接扎到目標連線直接傳送
                if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket))
                {
                    if (targetSocket.State == WebSocketState.Open)
                    {
                        await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);
                    }
                }
                else
                {
                    //如果要傳送的目標端不在當前服務,則傳送給目標redis端的頻道
                    ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };
                    //目標的redis頻道
                    _redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));
                }
                //繼續阻塞回圈接收訊息
                receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
                break;
            }
        }
        //迴圈結束意味著當前端已經退出
        //從當前使用者的集合移除當前使用者
        _ = UserConnection.TryRemove(id, out _);
        //關閉當前WebSocket連線
        await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);
        //在當前訂閱集合移除當前使用者
        _disposables.TryRemove($"{userPrefix}{id}", out var disposable);
        //關閉當前使用者的通道
        disposable.Dispose();
    }
    private async Task SubMsg(string channel)
    {
        //訂閱當前使用者頻道
        var sub = _redisClient.Subscribe(channel,  async (channel, data) => {
            //接收過來當前頻道資料,說明傳送端不在當前服務
            ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
            byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");
            //在當前服務找到目標的WebSocket連線並行送訊息
            if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket))
            {
                if (targetSocket.State == WebSocketState.Open)
                {
                    await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
                }
            }
        });
        //把redis訂閱頻道新增到集合中
        _disposables.TryAdd(channel, sub);
    }
    //程式退出的時候取消當前服務訂閱的redis頻道
    public void Dispose()
    {
        foreach (var disposable in _disposables)
        {
            disposable.Value.Dispose();
        }
        _disposables.Clear();
    }
}

這裡涉及到幾個輔助相關的類,其中UserConnection類是儲存註冊到當前服務的連線,MsgBody類用來接受使用者端傳送過來的訊息,ChannelMsgBody是用來傳送redis頻道的相關訊息,因為要把相關訊息通過redis釋出出去,咱們列一下這幾個類的相關程式碼

//註冊到當前服務的連線
public class UserConnection : IEnumerable<KeyValuePair<string, WebSocket>>
{
    //儲存使用者唯一標識和WebSocket的對應關係
    private ConcurrentDictionary<string, WebSocket> _users = new ConcurrentDictionary<string, WebSocket>();
    //當前服務的使用者數量
    public int Count => _users.Count;
    public WebSocket GetOrAdd(string userId, WebSocket webSocket)
    {
        return _users.GetOrAdd(userId, webSocket);
    }
    public bool TryGetValue(string userId, out WebSocket webSocket)
    {
        return _users.TryGetValue(userId, out webSocket);
    }
    public bool TryRemove(string userId, out WebSocket webSocket)
    {
        return _users.TryRemove(userId, out webSocket);
    }
    public void Clear()
    {
        _users.Clear();
    }
    public IEnumerator<KeyValuePair<string, WebSocket>> GetEnumerator()
    {
        return _users.GetEnumerator();
    }
    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }
}
//使用者端訊息
public class MsgBody
{
    //目標使用者標識
    public string Id { get; set; }
    //要傳送的訊息
    public string Msg { get; set; }
}
//頻道訂閱訊息
public class ChannelMsgBody
{
    //使用者標識
    public string FromId { get; set; }
    //目標使用者標識,也就是要傳送給誰
    public string ToId { get; set; }
    //要傳送的訊息
    public string Msg { get; set; }
}

這樣的話關於一對一傳送訊息的相關邏輯就實現完成了,啟動兩個Server端,由於nginx預設的負載均衡策略是輪詢,所以註冊兩個使用者的話會被分發到不同的服務裡去

Postman連線三個連線唯一標識分別是1、2、3,模擬一下訊息傳送,效果如下,傳送效果

接收效果

群組傳送

上面我們展示了一對一傳送的情況,接下來我們來看一下,群組傳送的情況。群組傳送的話就是隻要大家都加入一個群組,只要使用者端在群組裡傳送一條訊息,則註冊到當前群組內的所有使用者端都可以收到訊息。相對於一對一的情況就是如果當前WebSocket伺服器端如果存在使用者加入某個群組,則當前當前WebSocket伺服器端則可以訂閱一個group:群組唯一標識的redis頻道,叢集中的其他WebSocket伺服器通過這個redis頻道接收群組訊息,通過一張圖描述一下

群組的實現方式相對於一對一要簡單一點

  • 傳送端可以不用考慮當前服務中的使用者端連線,一股腦的交給redis把訊息釋出出去
  • 如果有WebSocket服務中的使用者訂閱了當前分組則可以接受訊息,獲取組內的使用者迴圈傳送訊息

展示一下程式碼實現的方式,首先是定義一個action用於表示群組的相關場景

//包含兩個標識一個是組別標識一個是註冊到組別的使用者
[HttpGet("/chat/group/{groupId}/{userId}")]
public async Task ChatGroup(string groupId, string userId)
{
    if (HttpContext.WebSockets.IsWebSocketRequest)
    {
        _logger.LogInformation($"group:{groupId} user:{userId}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");
        var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
        //呼叫HandleGroup處理群組相關的訊息
        await _socketHandler.HandleGroup(groupId, userId, webSocket);
    }
    else
    {
        HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
    }
}

接下來看一下HandleGroup的相關邏輯,還是在WebSocketHandler類中,看一下程式碼實現

public class WebSocketHandler:IDisposable
{
    private readonly UserConnection UserConnection = new();
    private readonly GroupUser GroupUser = new();
    private readonly SemaphoreSlim _lock = new(1, 1);
    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();
    private readonly string groupPrefix = "group:";
    private readonly ILogger<WebSocketHandler> _logger;
    private readonly RedisClient _redisClient;
    public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
    {
        _logger = logger;
        _redisClient = redisClient;
    }
    public async Task HandleGroup(string groupId, string userId, WebSocket webSocket)
    {
        //因為群組的集合可能會存在很多使用者一起存取所以限制存取數量
        await _lock.WaitAsync();
        //初始化群組容器 群唯一標識為key 群員容器為value
        var currentGroup = GroupUser.Groups.GetOrAdd(groupId, new UserConnection { });
        //當前使用者加入當前群組
        _ = currentGroup.GetOrAdd(userId, webSocket);
        //只有有當前WebSocket服務的第一個加入當前組的時候才去訂閱群組頻道
        //如果不限制的話則會出現如果當前WebSocket服務有多個使用者在一個組內則會重複收到redis訊息
        if (currentGroup.Count == 1)
        {
            //訂閱redis頻道
            await SubGroupMsg($"{groupPrefix}{groupId}");
        }
        _lock.Release();
        var buffer = new byte[1024 * 4];
        //阻塞接收WebSocket訊息
        var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
        //服務不退出的話則一直等待接收
        while (webSocket.State == WebSocketState.Open)
        {
            try
            {
                string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('');
                _logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msg}");
                //組裝redis頻道釋出的訊息,目標為群組標識
                ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msg };
                //通過redis釋出訊息
                _redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));
                receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
                break;
            }
        }
        //如果使用者端退出則在當前群組集合刪除當前使用者
        _ = currentGroup.TryRemove(userId, out _);
        await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);
    }
    private async Task SubGroupMsg(string channel)
    {
        var sub = _redisClient.Subscribe(channel, async (channel, data) => {
            ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
            byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");
            //在當前WebSocket伺服器找到當前群組裡的使用者
            GroupUser.Groups.TryGetValue(msgBody.ToId, out var currentGroup);
            //迴圈當前WebSocket伺服器裡的使用者傳送訊息
            foreach (var user in currentGroup)
            {
                //不用給自己傳送了
                if (user.Key == msgBody.FromId)
                {
                    continue;
                }
                if (user.Value.State == WebSocketState.Open)
                {
                    await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
                }
            }
        });
        //把當前頻道加入訂閱集合
        _disposables.TryAdd(channel, sub);
    }
}

這裡涉及到了GroupUser類,是來儲存群組和群組使用者的對應關係的,定義如下

public class GroupUser
{
    //key為群組的唯一標識
    public ConcurrentDictionary<string, UserConnection> Groups = new ConcurrentDictionary<string, UserConnection>();
}

演示一下把兩個使用者新增到一個群組內,然後傳送接收訊息的場景,使用者u1傳送

使用者u2接收

傳送所有人

傳送給所有使用者的邏輯比較簡單,不用考慮到使用者限制,只要使用者連線到了WebSocket叢集則都可以接收到這個訊息,大致工作方式如下圖所示

這個比較簡單,咱們直接看實現程式碼,首先是定義一個地址,用於釋出訊息

//把使用者註冊進去
[HttpGet("/chat/all/{id}")]
public async Task ChatAll(string id)
{
    if (HttpContext.WebSockets.IsWebSocketRequest)
    {
        _logger.LogInformation($"all user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");
        var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
        await _socketHandler.HandleAll(id, webSocket);
    }
    else
    {
        HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
    }
}

具體的實現邏輯還是在HandleGroup類裡,是HandleAll方法,看一下具體實現

public class WebSocketHandler:IDisposable
{
    private readonly UserConnection AllConnection = new();
    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();
    private readonly string all = "all";
    private readonly ILogger<WebSocketHandler> _logger;
    private readonly RedisClient _redisClient;
    public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
    {
        _logger = logger;
        _redisClient = redisClient;
    }
    public async Task HandleAll(string id, WebSocket webSocket)
    {
        await _lock.WaitAsync();
        //把使用者加入使用者集合
        _ = AllConnection.GetOrAdd(id, webSocket);
        //WebSocket叢集中的每個服務只定義一次
        if (AllConnection.Count == 1)
        {
            await SubAllMsg(all);
        }
        _lock.Release();
        var buffer = new byte[1024 * 4];
        //阻塞接收資訊
        var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
        while (webSocket.State == WebSocketState.Open)
        {
            try
            {
                string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('');
                _logger.LogInformation($"user {id} send:{msg}");
                //獲取接收資訊
                ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msg };
                //把訊息通過redis釋出到叢集中的其他服務
                _redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));
                receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
                break;
            }
        }
        //使用者退出則刪除集合中的當前使用者資訊
        _ = AllConnection.TryRemove(id, out _);
        await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);
    }
    private async Task SubAllMsg(string channel)
    {
        var sub = _redisClient.Subscribe(channel, async (channel, data) => {
            ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
            byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");
            //接收到訊息後遍歷使用者集合把訊息傳送給所有使用者
            foreach (var user in AllConnection)
            {   
                //如果包含當前使用者跳過
                if (user.Key == msgBody.FromId)
                {
                    continue;
                }
                if (user.Value.State == WebSocketState.Open)
                {
                    await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
                }
            }
        });
        _disposables.TryAdd(channel, sub);
    }
}

效果在這裡就不展示了,和群組的效果是類似的,只是一個是部分使用者,一個是全部的使用者。

整合到一起

上面我們分別展示了一對一、群組、所有人的場景,但是實際使用的時候,每個使用者只需要註冊到WebSocket叢集一次也就是保持一個連線即可,而不是一對一一個連線、註冊群組一個連線、所有訊息的時候一個連線。所以我們需要把上面的演示整合一下,一個使用者只需要連線到WebSocket叢集一次即可,至於傳送給誰,加入什麼群組,接收全部訊息等都是連線後通過一些標識區分的,而不必每個型別的操作都註冊一次,就和微信和QQ一樣我只要登入了即可,至於其他操作都是靠資料標識區分的。接下來咱們就整合一下程式碼達到這個效果,大致的思路是

  • 使用者連線到WebSocket叢集,把使用者和連線儲存到當前WebSocket伺服器的使用者集合中去。
  • 一對一傳送的時候,只需要在具體的伺服器中找到具體的使用者端傳送訊息
  • 群組的時候,先把當前使用者標識加入群組集合即可,接收訊息的時候根據群組集合裡的使用者標識去使用者集合裡去拿具體的WebSocket連線傳送訊息
  • 全員訊息的時候,直接遍歷叢集中的每個WebSocket服務裡的使用者集合裡的WebSocket連線訓話傳送訊息

這樣的話就保證了每個使用者端使用者在叢集中只會繫結一個連線,首先還是單獨定義一個action,用於讓使用者端使用者連線上來,具體實現程式碼如下所示

public class WebSocketChannelController : ControllerBase
{
    private readonly ILogger<WebSocketController> _logger;
    private readonly WebSocketChannelHandler _webSocketChannelHandler;
    public WebSocketChannelController(ILogger<WebSocketController> logger, WebSocketChannelHandler webSocketChannelHandler)
    {
        _logger = logger;
        _webSocketChannelHandler = webSocketChannelHandler;
    }
    //只需要把當前使用者連線到服務即可
    [HttpGet("/chat/channel/{id}")]
    public async Task Channel(string id)
    {
        if (HttpContext.WebSockets.IsWebSocketRequest)
        {
            _logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");
            var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
            await _webSocketChannelHandler.HandleChannel(id, webSocket);
        }
        else
        {
            HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
        }
    }
}

接下來看一下WebSocketChannelHandler類的HandleChannel方法實現,用於處理不同的訊息,比如一對一、群組、全員訊息等不同型別的訊息

public class WebSocketChannelHandler : IDisposable
{
    //用於儲存當前WebSocket伺服器連結上來的所有使用者對應關係
    private readonly UserConnection UserConnection = new();
    //用於儲存群組和使用者關係,使用者集合採用HashSet保證每個使用者只加入一個群組一次
    private readonly ConcurrentDictionary<string, HashSet<string>> GroupUser = new ConcurrentDictionary<string, HashSet<string>>();
    private readonly SemaphoreSlim _lock = new(1, 1);
    //存放redis訂閱範例
    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();
    //一對一redis頻道字首
    private readonly string userPrefix = "user:";
    //群組redis頻道字首
    private readonly string groupPrefix = "group:";
    //全員redis頻道
    private readonly string all = "all";
    private readonly ILogger<WebSocketHandler> _logger;
    private readonly RedisClient _redisClient;
    public WebSocketChannelHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)
    {
        _logger = logger;
        _redisClient = redisClient;
    }
    public async Task HandleChannel(string id, WebSocket webSocket)
    {
        await _lock.WaitAsync();
        //每次連線進來就新增到使用者集合
        _ = UserConnection.GetOrAdd(id, webSocket);
        //每個WebSocket服務範例只需要訂閱一次全員訊息頻道
        await SubMsg($"{userPrefix}{id}");
        if (UserConnection.Count == 1)
        {
            await SubAllMsg(all);
        }
        _lock.Release();
        var buffer = new byte[1024 * 4];
        //接收使用者端訊息
        var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
        while (webSocket.State == WebSocketState.Open)
        {
            try
            {
                string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('');
                //讀取使用者端訊息
                ChannelData channelData = JsonConvert.DeserializeObject<ChannelData>(msg);
                //判斷訊息型別
                switch (channelData.Method)
                {
                    //一對一
                    case "One":
                        await HandleOne(id, channelData.MsgBody, receiveResult);
                        break;
                    //把使用者加入群組
                    case "UserGroup":
                        await AddUserGroup(id, channelData.Group, webSocket);
                        break;
                    //處理群組訊息
                    case "Group":
                        await HandleGroup(channelData.Group, id, webSocket, channelData.MsgBody);
                        break;
                    //處理全員訊息
                    default:
                        await HandleAll(id, channelData.MsgBody);
                        break;
                }
                receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
                break;
            }
        }
        await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);
        //在群組中移除當前使用者
        foreach (var users in GroupUser.Values)
        {
            lock (users)
            {
                users.Remove(id);
            }
        }
        //當前使用者端使用者退出則移除連線
        _ = UserConnection.TryRemove(id, out _);
        //取消使用者頻道訂閱
        _disposables.Remove($"{userPrefix}{id}", out var sub);
        sub?.Dispose();
    }
    public void Dispose()
    {
        foreach (var disposable in _disposables)
        {
            disposable.Value.Dispose();
        }
        _disposables.Clear();
    }
}

這裡涉及到了ChannelData類是用於接收使用者端訊息的類別範本,具體定義如下

public class ChannelData
{
    //訊息型別 比如一對一 群組 全員
    public string Method { get; set; }
    //群組標識
    public string Group { get; set; }
    //訊息體
    public object MsgBody { get; set; }
}

類中並不會包含當前使用者資訊,因為連線到當前服務的時候已經提供了使用者端唯一標識。結合上面的處理程式碼我們可以看出,使用者端使用者連線到WebSocket範例之後,先註冊當前使用者的redis訂閱頻道並且當前範例僅註冊一次全員訊息的redis頻道,用於處理非當前範例註冊使用者端的一對一訊息處理和全員訊息處理,然後等待接收使用者端訊息,根據使用者端訊息的訊息型別來判斷是進行一對一、群組、或者全員的訊息型別處理,它的工作流程入下圖所示

由程式碼和上面的流程圖可知,它根據不同的標識去處理不同型別的訊息,接下來我們可以看下每種訊息型別的處理方式。

一對一處理

首先是一對一的訊息處理情況,看一下具體的處理邏輯,首先是一對一發布訊息

 private async Task HandleOne(string id, object msg, WebSocketReceiveResult receiveResult)
 {
    MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(JsonConvert.SerializeObject(msg));
    byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");
    _logger.LogInformation($"user {id} send:{msgBody.Msg}");
    //判斷目標使用者是否在當前WebSocket伺服器
    if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket))
    {
        if (targetSocket.State == WebSocketState.Open)
        {
            await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);
        }
    }
    else
    {
        //如果不在當前伺服器,則直接把訊息釋出到具體的使用者頻道去,由具體使用者去訂閱
        ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };
        _redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));
    }
}

接下來是用於處理訂閱其他使用者傳送過來訊息的邏輯,這個和整合之前的邏輯是一致的,在當前伺服器中找到使用者對應的連線,傳送訊息

private async Task SubMsg(string channel)
{
    var sub = _redisClient.Subscribe(channel, async (channel, data) =>
    {
        ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
        byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");
        if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket))
        {
            if (targetSocket.State == WebSocketState.Open)
            {
                await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
            }
            else
            {
                _ = UserConnection.TryRemove(msgBody.FromId, out _);
            }
        }
    });
    //把訂閱範例加入集合
    _disposables.TryAdd(channel, sub);
}

如果給某個使用者傳送訊息則可以使用如下的訊息格式

{"Method":"One", "MsgBody":{"Id":"2","Msg":"Hello"}}

Method為One代表著是私聊一對一的情況,訊息體內Id為要傳送給的具體使用者標識和訊息體。

群組處理

接下來看群組處理方式,這個和之前的邏輯是有出入的,首先是使用者要先加入到某個群組然後才能接收群組訊息或者在群組中傳送訊息,之前是一個使用者對應多個連線,整合了之後叢集中每個使用者只關聯唯一的一個WebSocket連線,首先看使用者加入群組的邏輯

private async Task AddUserGroup(string user, string group, WebSocket webSocket)
{
    //獲取群組資訊
    var currentGroup = GroupUser.GetOrAdd(group, new HashSet<string>());
    lock (currentGroup)
    {
       //把使用者標識加入當前組
        _ = currentGroup.Add(user);
    }
    //每個組的redis頻道,在每臺WebSocket伺服器範例只註冊一次訂閱
    if (currentGroup.Count == 1)
    {
        //訂閱當前組訊息
        await SubGroupMsg($"{groupPrefix}{group}");
    }
    string addMsg = $"user 【{user}】 add  to group 【{group}】";
    byte[] sendByte = Encoding.UTF8.GetBytes(addMsg);
    await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
    //如果有使用者加入群組,則通知其他群成員
    ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = user, ToId = group, Msg = addMsg };
    _redisClient.Publish($"{groupPrefix}{group}", JsonConvert.SerializeObject(channelMsgBody));
}

使用者想要在群組內發訊息,則必須先加入到一個具體的群組內,具體的加入群組的格式如下

{"Method":"UserGroup", "Group":"g1"}

Method為UserGroup代表著使用者加入群組的業務型別,Group代表著你要加入的群組唯一標識。接下來就看下,使用者傳送群組訊息的邏輯了

private async Task HandleGroup(string groupId, string userId, WebSocket webSocket, object msgBody)
{
    //判斷群組是否存在
    var hasValue = GroupUser.TryGetValue(groupId, out var users);
    if (!hasValue)
    {
        byte[] sendByte = Encoding.UTF8.GetBytes($"group【{groupId}】 not exists");
        await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
        return;
    }
    //只有加入到當前群組,才能在群組內傳送訊息
    if (!users.Contains(userId))
    {
        byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{userId}】 not in 【{groupId}】");
        await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
        return;
    }
    _logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msgBody}");
    //傳送群組訊息
    ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msgBody.ToString() };
    _redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));
}

加入群組之後則可以傳送和接收群組內的訊息了,給群組傳送訊息的格式如下

{"Method":"Group", "Group":"g1", "MsgBody":"Hi All"}

Method為Group代表著使用者加入群組的業務型別,Group則代表你要傳送到具體的群組的唯一標識,MsgBody則是傳送到群組內的訊息。最後再來看下訂閱群組內訊息的情況,也就是處理群組訊息的邏輯

private async Task SubGroupMsg(string channel)
{
    var sub = _redisClient.Subscribe(channel, async (channel, data) =>
    {
        //接收群組訂閱訊息
        ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
        byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");
        //獲取當前伺服器範例中當前群組的所有使用者連線
        GroupUser.TryGetValue(msgBody.ToId, out var currentGroup);
        foreach (var user in currentGroup)
        {
            if (user == msgBody.FromId)
            {
                continue;
            }
            //通過群組內的使用者標識去使用者集合獲取使用者集合裡的使用者唯一連線傳送訊息
            if (UserConnection.TryGetValue(user, out var targetSocket) && targetSocket.State == WebSocketState.Open)
            {
                await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
            }
            else
            {
                currentGroup.Remove(user);
            }
        }
    });
    _disposables.TryAdd(channel, sub);
}

全員訊息處理

全員訊息處理相對來說思路比較簡單,因為當服務啟動的時候就會監聽redis的全員訊息頻道,這樣的話具體的實現也就只包含傳送和接收全員訊息了,首先看一下全員訊息傳送的邏輯

private async Task HandleAll(string id, object msgBody)
{
    _logger.LogInformation($"user {id} send:{msgBody}");
    //直接給redis的全員頻道傳送訊息
    ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msgBody.ToString() };
    _redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));
}

全員訊息的傳送資料格式如下所示

{"Method":"All", "MsgBody":"Hello All"}

Method為All代表著全員訊息型別,MsgBody則代表著具體訊息。接收訊息出裡同樣很簡單,訂閱redis全員訊息頻道,然後遍歷當前WebSocket伺服器範例內的所有使用者獲取連線傳送訊息,具體邏輯如下

private async Task SubAllMsg(string channel)
{
    var sub = _redisClient.Subscribe(channel, async (channel, data) =>
    {
        ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());
        byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");
        //獲取當前伺服器範例內所有使用者的連線
        foreach (var user in UserConnection)
        {
            //不給自己傳送訊息,因為傳送的時候可以通過具體的業務程式碼處理
            if (user.Key == msgBody.FromId)
            {
                continue;
            }
            //給每個使用者傳送訊息
            if (user.Value.State == WebSocketState.Open)
            {
                await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);
            }
            else
            {
                _ = UserConnection.TryRemove(user.Key, out _);
            }
        }
    });
    _disposables.TryAdd(channel, sub);
}

範例原始碼

由於篇幅有限,沒辦法設計到全部的相關原始碼,因此在這裡貼出來github相關的地址,方便大家檢視和執行原始碼。相關的原始碼我這裡實現了兩個版本,一個是基於asp.net core的版本,一個是基於golang的版本。兩份原始碼的實現思路是一致的,所以這兩份程式碼可以執行在一套叢集範例裡,設定在一套nginx裡,並且連線到同一個redis範例裡即可

倉庫裡還涉及到本人閒暇之餘開源的其他倉庫,由於本人能力有限難登大雅之堂,就不做廣告了,有興趣的同學可以自行瀏覽一下。

總結

本文基於ASP.NET Core框架提供了一個基於WebSocket做叢集的範例,由於思想是通用的,所以基於這個思路樓主也實現了golang版本。其實在之前就想自己動手搞一搞關於WebSocket叢集方面的設計,本篇文章算是對之前想法的一個落地操作。其核心思路文章已經做了相關介紹,由於這些只是博主關於構思的實現,可能有很多細節尚未體現到,還希望大家多多理解。其核心思路總結一下

  • 首先是,利用可以構建WebSocket服務的框架,在當前服務範例中儲存當前使用者端使用者和WebSocket的連線關係
  • 如果訊息的目標使用者端不在當前伺服器,可以利用redis頻道、訊息佇列相關、甚至是資料庫類的共用回話傳送的訊息,由目標伺服器獲取目標是否屬於自己的ws對談
  • 本文設計的思路使用的是無狀態的方式,即WebSocket服務範例之間不存在直接的訊息通訊和相互的服務地址儲存,當然也可以利用redis等儲存線上使用者資訊等,這個可以參考具體業務自行設計

讀萬卷書,行萬里路。在這個時刻都在變化點的環境裡,唯有不斷的進化自己,多接觸多嘗試不用的事物,多擴充套件自己的認知思維,方能構建自己的底層邏輯。畢竟越底層越抽象,越通用越抽象。面對未知的挑戰,自身作為自己堅強的後盾,可能才會讓自己更踏實。

以上就是ASP.NET Core WebSocket叢集實現思路詳解的詳細內容,更多關於ASP.NET Core WebSocket的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com