using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
namespace Common;
public abstract class AbsPipeLine(bool isAES)
{
///
/// pipeLine工作函数,生效期间永久阻塞
///
/// 回调函数,从pipeline接收到的消息
/// 连接的远程地址
/// 第一次返回连接信息,第二次当pipeline 被断开时返回,此时可能是正常完成断开,或异常发生断开
public abstract IAsyncEnumerable Work(Func receiveCb, string addr = "");
protected Func ReceiveMsg = (byte[] a) =>
{
return true;
};
///
/// 监听pipeline 消息,由Work 函数调用
///
/// 消息回调
///
protected abstract Task Listen(Func receiveCb);
///
/// 关闭连接
///
/// 关闭原因
///
public abstract Task Close(string? CloseReason);
///
/// 向管道中发送消息
///
///
///
public abstract Task SendMsg(SyncMsg msg);
///
/// 打包文件上传
///
/// 上传地址
/// 上传的文件路径
/// 上传进度回调(现在没有回调)
/// 上传完成时返回/
public abstract Task UploadFile(string url, string filePath, Func progressCb);
///
/// 管道消息是否使用AES加密
///
protected readonly bool IsAES = isAES;
}
public class WebSocPipeLine(TSocket socket, bool isAES) : AbsPipeLine(isAES)
where TSocket : WebSocket
{
public readonly TSocket Socket = socket;
public override async Task UploadFile(
string url,
string filePath,
Func progressCb
)
{
using var client = new HttpClient();
using var content = new MultipartFormDataContent();
using var fileStream = new FileStream(filePath, FileMode.Open);
// TODO 上传进度回调
var progress = new Progress(
(current) =>
{
progressCb(current);
}
);
var fileContent = new ProgressStreamContent(fileStream, progress);
content.Add(fileContent, "file", Path.GetFileName(filePath));
var it = await client.PostAsync("http://" + url + "/UploadFile", content);
if (it.StatusCode != System.Net.HttpStatusCode.OK)
{
throw new Exception(it.Content.ReadAsStringAsync().Result);
}
}
public override async IAsyncEnumerable Work(Func receiveCb, string addr = "")
{
if (Socket is ClientWebSocket CSocket)
{
//连接失败会抛出异常
await CSocket.ConnectAsync(new Uri("ws://" + addr), CancellationToken.None);
yield return 0;
}
// 从controller 来,这个已经连接上了
else if (Socket is WebSocket)
{
yield return 0;
}
await Listen(receiveCb);
yield return 1;
}
protected override async Task Listen(Func receiveCb)
{
//warning 最大支持10MB,这由需要同步的文件数量大小决定 UTF-8 每个字符,汉字视为4个字节,数字1个 ,英文字母2个。1MB=256KB*4,25万个字符能描述就行
var buffer = new byte[10 * 1024 * 1024];
while (Socket.State == WebSocketState.Open)
{
var receiveResult = await Socket.ReceiveAsync(
new ArraySegment(buffer),
CancellationToken.None
);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
throw new Exception(receiveResult.CloseStatusDescription);
}
else
{
var nbuffer = new byte[receiveResult.Count];
System.Buffer.BlockCopy(buffer, 0, nbuffer, 0, receiveResult.Count);
if (IsAES)
{
var nnbuffer = AESHelper.DecryptStringFromBytes_Aes(nbuffer);
receiveCb(Encoding.UTF8.GetBytes(nnbuffer));
}
else
{
receiveCb(nbuffer);
}
}
}
}
public override async Task Close(string? CloseReason)
{
if (Socket.State == WebSocketState.Open)
{
//CloseReason 最大不能超过123bytes.https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close
//若传递超过这个限制,此处表现WebSocket将会卡住,无法关闭。
if (Encoding.UTF8.GetBytes(CloseReason ?? "").Length > 120)
{
await SendMsg(
new SyncMsg
{
Type = SyncMsgType.Error,
Step = SyncProcessStep.Close,
Body = CloseReason ?? ""
}
);
await Socket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
"查看上一条错误信息!",
CancellationToken.None
);
}
else
{
await Socket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
CloseReason,
CancellationToken.None
);
}
}
}
public override async Task SendMsg(SyncMsg msg)
{
string msgStr = JsonSerializer.Serialize(msg);
var it = AESHelper.EncryptStringToBytes_Aes(msgStr);
if (IsAES)
{
// 加密后的字符,使用Binary 发送,加密通常不会发送到最前端,通常是 js 写的websocket
await Socket.SendAsync(
new ArraySegment(it),
WebSocketMessageType.Binary,
true,
CancellationToken.None
);
}
else
{
await Socket.SendAsync(
new ArraySegment(Encoding.UTF8.GetBytes(msgStr)),
WebSocketMessageType.Text,
true,
CancellationToken.None
);
}
}
}