using System.Net.WebSockets;
using System.Text;
using System.Text.Json;

namespace Common;

/// <summary>
/// Base class for a message pipeline: connect/listen via <see cref="Work"/>,
/// send via <see cref="SendMsg"/>, and shut down via <see cref="Close"/>.
/// </summary>
/// <param name="isAES">Stored in <see cref="IsAES"/>; implementations use it to decide whether outgoing payloads are AES-encrypted.</param>
public abstract class AbsPipeLine(bool isAES)
{
    /// <summary>
    /// Runs the pipeline and reports progress as an asynchronous sequence of integers.
    /// </summary>
    /// <param name="receiveCb">Callback invoked with the raw bytes of each received message.</param>
    /// <param name="addr">Address to connect to, for implementations that open the connection themselves.</param>
    /// <returns>An asynchronous sequence of progress codes defined by the implementation.</returns>
    public abstract IAsyncEnumerable<int> Work(Func<byte[], bool> receiveCb, string addr = "");

    /// <summary>
    /// Default receive callback: ignores the payload and returns <c>true</c>.
    /// </summary>
    protected Func<byte[], bool> ReceiveMsg = (byte[] a) =>
    {
        return true;
    };

    /// <summary>
    /// Receives incoming messages and forwards each one to <paramref name="receiveCb"/>.
    /// </summary>
    /// <param name="receiveCb">Callback invoked with the raw bytes of each received message.</param>
    protected abstract Task Listen(Func<byte[], bool> receiveCb);

    /// <summary>
    /// Closes the connection.
    /// </summary>
    /// <param name="CloseReason">Optional human-readable reason for closing.</param>
    public abstract Task Close(string? CloseReason);

    /// <summary>
    /// Sends a message.
    /// </summary>
    /// <param name="msg">The message to send.</param>
    public abstract Task SendMsg(SyncMsg msg);

    /// <summary>Whether outgoing payloads should be AES-encrypted.</summary>
    protected readonly bool IsAES = isAES;
}

/// <summary>
/// <see cref="AbsPipeLine"/> implementation on top of a <see cref="WebSocket"/>.
/// </summary>
/// <typeparam name="TSocket">Concrete socket type; a <see cref="ClientWebSocket"/> is connected by <see cref="Work"/>, any other socket is used as-is.</typeparam>
/// <param name="socket">The socket to read from and write to.</param>
/// <param name="isAES">Whether outgoing payloads are AES-encrypted.</param>
public class WebSocPipeLine<TSocket>(TSocket socket, bool isAES) : AbsPipeLine(isAES)
    where TSocket : WebSocket
{
    /// <summary>Size of the receive buffer handed to each <c>ReceiveAsync</c> call (1 MiB).</summary>
    private const int ReceiveBufferSize = 1024 * 1024;

    /// <summary>The underlying socket.</summary>
    public readonly TSocket Socket = socket;

    /// <summary>
    /// Connects (client sockets only), then listens until the socket leaves the open state.
    /// </summary>
    /// <param name="receiveCb">Callback invoked with the bytes of each complete received message.</param>
    /// <param name="addr">URI to connect to; only used when the socket is a <see cref="ClientWebSocket"/>.</param>
    /// <returns>
    /// Yields <c>0</c> once the socket is ready, then <c>1</c> after the listen loop
    /// ends without having received a close frame.
    /// </returns>
    public override async IAsyncEnumerable<int> Work(Func<byte[], bool> receiveCb, string addr = "")
    {
        if (Socket is ClientWebSocket CSocket)
        {
            // ConnectAsync throws when the connection attempt fails.
            await CSocket.ConnectAsync(new Uri(addr), CancellationToken.None);
            yield return 0;
        }
        // Socket handed over by a controller: it is already connected.
        else if (Socket is not null)
        {
            yield return 0;
        }

        await Listen(receiveCb);
        yield return 1;
    }

    /// <summary>
    /// Receives messages while the socket is open and passes each complete message to
    /// <paramref name="receiveCb"/>.
    /// </summary>
    /// <remarks>
    /// A message may arrive in several pieces (sender-side fragmentation, or a payload
    /// larger than the receive buffer). Pieces are accumulated until
    /// <see cref="WebSocketReceiveResult.EndOfMessage"/> is set, so the callback is
    /// invoked exactly once per message with the whole payload.
    /// </remarks>
    /// <param name="receiveCb">Callback invoked with the bytes of each complete message; its return value is ignored.</param>
    /// <exception cref="Exception">The peer sent a close frame; the message is the peer's close description.</exception>
    protected override async Task Listen(Func<byte[], bool> receiveCb)
    {
        var buffer = new byte[ReceiveBufferSize];

        // Allocated only when a message spans more than one ReceiveAsync call.
        System.IO.MemoryStream? pending = null;

        while (Socket.State == WebSocketState.Open)
        {
            var receiveResult = await Socket.ReceiveAsync(
                new ArraySegment<byte>(buffer),
                CancellationToken.None
            );

            if (receiveResult.MessageType == WebSocketMessageType.Close)
            {
                throw new Exception(receiveResult.CloseStatusDescription);
            }

            if (!receiveResult.EndOfMessage)
            {
                // More fragments follow: stash this piece and keep reading.
                pending ??= new System.IO.MemoryStream();
                pending.Write(buffer, 0, receiveResult.Count);
                continue;
            }

            byte[] message;
            if (pending is null)
            {
                // Common case: the whole message arrived in a single read.
                message = new byte[receiveResult.Count];
                System.Buffer.BlockCopy(buffer, 0, message, 0, receiveResult.Count);
            }
            else
            {
                // Final fragment: append it and hand over the reassembled payload.
                pending.Write(buffer, 0, receiveResult.Count);
                message = pending.ToArray();
                pending.Dispose();
                pending = null;
            }

            receiveCb(message);
        }
    }

    /// <summary>
    /// Closes the connection with <see cref="WebSocketCloseStatus.NormalClosure"/>.
    /// Does nothing when the socket is not in the open state.
    /// </summary>
    /// <param name="CloseReason">Optional human-readable reason sent to the peer.</param>
    public override async Task Close(string? CloseReason)
    {
        if (Socket.State == WebSocketState.Open)
        {
            await Socket.CloseAsync(
                WebSocketCloseStatus.NormalClosure,
                CloseReason,
                CancellationToken.None
            );
        }
    }

    /// <summary>
    /// Serializes <paramref name="msg"/> to JSON and sends it as one complete message.
    /// When <see cref="AbsPipeLine.IsAES"/> is set, the JSON is passed through
    /// <c>AESHelper.EncryptStringToBytes_Aes</c> first; otherwise it is sent as UTF-8.
    /// </summary>
    /// <param name="msg">The message to send.</param>
    public override async Task SendMsg(SyncMsg msg)
    {
        string msgStr = JsonSerializer.Serialize(msg);

        // NOTE(review): encrypted payloads are also sent as Text frames; if the helper
        // returns raw ciphertext rather than text, Binary may be the better frame type.
        // Left unchanged because the peer may depend on the current frame type.
        await Socket.SendAsync(
            IsAES
                ? AESHelper.EncryptStringToBytes_Aes(msgStr)
                : new ArraySegment<byte>(Encoding.UTF8.GetBytes(msgStr)),
            WebSocketMessageType.Text,
            true,
            CancellationToken.None
        );
    }
}