diff --git a/Server/Common/ConnectPipeline.cs b/Server/Common/ConnectPipeline.cs new file mode 100644 index 0000000..26f3b63 --- /dev/null +++ b/Server/Common/ConnectPipeline.cs @@ -0,0 +1,103 @@ +using System.Net.WebSockets; +using System.Text; +using System.Text.Json; + +namespace Common; + +public abstract class AbsPipeLine +{ + public abstract IAsyncEnumerable Work(Func receiveCb, string addr = ""); + protected Func ReceiveMsg = (byte[] a) => + { + return true; + }; + protected abstract Task Listen(Func receiveCb); + + /// + /// 关闭连接 + /// + /// + /// + public abstract Task Close(string? CloseReason); + + /// + /// 发送消息 + /// + /// + /// + public abstract Task SendMsg(SyncMsg msg); +} + +public class WebSocPipeLine(TSocket socket) : AbsPipeLine + where TSocket : WebSocket +{ + public readonly TSocket Socket = socket; + + public override async IAsyncEnumerable Work( + Func receiveCb, + string addr = "" + ) + { + if (Socket is ClientWebSocket CSocket) + { + //连接失败会抛出异常 + await CSocket.ConnectAsync(new Uri(addr), CancellationToken.None); + yield return 0; + } + // 从controller 来,这个已经连接上了 + else if (Socket is WebSocket) + { + yield return 0; + } + await Listen(receiveCb); + yield return 1; + } + + protected override async Task Listen(Func receiveCb) + { + //最大1MB!= + var buffer = new byte[1024 * 1024]; + + while (Socket.State == WebSocketState.Open) + { + var receiveResult = await Socket.ReceiveAsync( + new ArraySegment(buffer), + CancellationToken.None + ); + + if (receiveResult.MessageType == WebSocketMessageType.Close) + { + throw new Exception(receiveResult.CloseStatusDescription); + } + else + { + var nbuffer = new byte[receiveResult.Count]; + System.Buffer.BlockCopy(buffer, 0, nbuffer, 0, receiveResult.Count); + receiveCb(nbuffer); + } + } + } + + public override async Task Close(string? CloseReason) + { + if (Socket.State == WebSocketState.Open) + { + await Socket.CloseAsync( + WebSocketCloseStatus.NormalClosure, + CloseReason, + CancellationToken.None + ); + } + } + + public override async Task SendMsg(SyncMsg msg) + { + string msgStr = JsonSerializer.Serialize(msg); + await Socket.SendAsync( + new ArraySegment(Encoding.UTF8.GetBytes(msgStr)), + WebSocketMessageType.Text, + true, + CancellationToken.None + ); + } +} diff --git a/Server/LocalServer/Controllers/LocalServerController.cs b/Server/LocalServer/Controllers/LocalServerController.cs index 297dff3..99d15b7 100644 --- a/Server/LocalServer/Controllers/LocalServerController.cs +++ b/Server/LocalServer/Controllers/LocalServerController.cs @@ -1,7 +1,8 @@ using System.Net.NetworkInformation; using System.Text; using Microsoft.AspNetCore.Mvc; - +using Common; +using System.Net.WebSockets; namespace LocalServer.Controllers { public class LocalServerController(LocalSyncServerFactory factory) : ControllerBase @@ -16,7 +17,8 @@ namespace LocalServer.Controllers try { var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync(); - Factory.CreateLocalSyncServer(webSocket, Name); + var pipeLine = new WebSocPipeLine(webSocket); + Factory.CreateLocalSyncServer(pipeLine, Name); } catch (Exception e) { diff --git a/Server/LocalServer/LocalSyncServer.cs b/Server/LocalServer/LocalSyncServer.cs index 428a674..1bcf3ad 100644 --- a/Server/LocalServer/LocalSyncServer.cs +++ b/Server/LocalServer/LocalSyncServer.cs @@ -14,34 +14,26 @@ public class LocalSyncServer public Config? SyncConfig; - public Config NotNullSyncConfig {get { - if (SyncConfig == null) + public Config NotNullSyncConfig + { + get { - throw new ArgumentNullException("SyncConfig"); + if (SyncConfig == null) + { + throw new ArgumentNullException("SyncConfig"); + } + return SyncConfig; } - return SyncConfig; - }} + } /// /// 发布源连接 /// - public readonly WebSocket LocalSocket; - - /// - /// 发布源-缓冲区,存储数据 最大1MB - /// - // public byte[] Buffer = new byte[1024 * 1024]; - - /// - /// 发布目标-连接 - /// - public readonly ClientWebSocket RemoteSocket = new(); - - /// - /// 发布开始时间 - /// - private readonly DateTime StartTime = DateTime.Now; + public readonly AbsPipeLine LocalPipe; + public readonly AbsPipeLine RemotePipe = new WebSocPipeLine( + new ClientWebSocket() + ); /// /// 发布名称 /// @@ -52,148 +44,39 @@ public class LocalSyncServer /// public readonly LocalSyncServerFactory Factory; - public LocalSyncServer(WebSocket socket, string name, LocalSyncServerFactory factory) + public LocalSyncServer(AbsPipeLine pipe, string name, LocalSyncServerFactory factory) { - LocalSocket = socket; + LocalPipe = pipe; Name = name; Factory = factory; StateHelper = new ConnectAuthorityHelper(this); - } - public async Task RemoteSocketConnect() - { - if (SyncConfig != null) + Task.Run(async () => { - await RemoteSocket.ConnectAsync( - new Uri(SyncConfig.RemoteUrl + "/websoc"), - CancellationToken.None + var rs = LocalPipe.Work( + (byte[] b) => + { + return StateHelper.ReceiveLocalMsg(b); + } ); - } - else - { - throw new ArgumentException("SyncConfig is null!"); - } - } - - public async Task RemoteSocketLiten() - { - string CloseMsg = "任务结束关闭"; - try - { - while (RemoteSocket.State == WebSocketState.Open) + try { - var buffer = new byte[1024 * 1024]; - var receiveResult = await RemoteSocket.ReceiveAsync( - new ArraySegment(buffer), - CancellationToken.None - ); - if (receiveResult.MessageType == WebSocketMessageType.Close) - { - Close(receiveResult.CloseStatusDescription); - } - else - { - var nbuffer = new byte[receiveResult.Count]; - System.Buffer.BlockCopy(buffer, 0, nbuffer, 0, receiveResult.Count); - StateHelper.ReceiveRemoteMsg(nbuffer); - } + await foreach (var r in rs) { } } - } - catch (Exception e) - { - CloseMsg = e.Message; - } - finally - { - Close(CloseMsg); - } - } - - public async Task LocalSocketListen() - { - string CloseMsg = "任务结束关闭"; - try - { - //最大1MB!= - var buffer = new byte[1024 * 1024]; - - while (LocalSocket.State == WebSocketState.Open) + catch (Exception e) { - var receiveResult = await LocalSocket.ReceiveAsync( - new ArraySegment(buffer), - CancellationToken.None - ); - - if (receiveResult.MessageType == WebSocketMessageType.Close) - { - Close(receiveResult.CloseStatusDescription); - } - else - { - StateHelper.ReceiveLocalMsg( - Encoding.UTF8.GetString(buffer, 0, receiveResult.Count) - ); - } + Close(e.Message); } - } - catch (Exception e) - { - CloseMsg = e.Message; - } - finally - { - Close(CloseMsg); - } + }); } - public async Task LocalSocketSendMsg(object msgOb) - { - string msg = JsonSerializer.Serialize(msgOb); - await LocalSocket.SendAsync( - new ArraySegment(Encoding.UTF8.GetBytes(msg)), - WebSocketMessageType.Text, - true, - CancellationToken.None - ); - } - - public async Task RemoteSocketSendMsg(object msgOb) - { - string msg = JsonSerializer.Serialize(msgOb); - var buffer = AESHelper.EncryptStringToBytes_Aes(msg); - await RemoteSocket.SendAsync( - buffer, - WebSocketMessageType.Text, - true, - CancellationToken.None - ); - } public void Close(string? CloseReason) { try { - if (LocalSocket.State == WebSocketState.Open) - { - LocalSocket - .CloseAsync( - WebSocketCloseStatus.NormalClosure, - CloseReason, - CancellationToken.None - ) - .Wait(60 * 1000); - } - - if (RemoteSocket.State == WebSocketState.Open) - { - RemoteSocket - .CloseAsync( - WebSocketCloseStatus.NormalClosure, - CloseReason, - CancellationToken.None - ) - .Wait(60 * 1000); - } + LocalPipe.Close(CloseReason); + RemotePipe.Close(CloseReason); } catch (Exception e) { diff --git a/Server/LocalServer/LocalSyncServerFactory.cs b/Server/LocalServer/LocalSyncServerFactory.cs index 19aada5..1d5435d 100644 --- a/Server/LocalServer/LocalSyncServerFactory.cs +++ b/Server/LocalServer/LocalSyncServerFactory.cs @@ -1,26 +1,22 @@ using System.Net.WebSockets; - +using Common; namespace LocalServer; public class LocalSyncServerFactory { private readonly object Lock = new(); - public void CreateLocalSyncServer(WebSocket socket, string name) + public void CreateLocalSyncServer(AbsPipeLine pipeLine, string name) { if (Servers.Select(x => x.Name == name).Any()) { throw new Exception("LocalServer:存在同名发布源!"); } - var server = new LocalSyncServer(socket, name, this); + var server = new LocalSyncServer(pipeLine, name, this); lock (Lock) { Servers.Add(server); } - //脱离当前函数栈 - Task.Run(async ()=>{ - await server.LocalSocketListen(); - }); } private readonly List Servers = []; diff --git a/Server/LocalServer/StateHelper.cs b/Server/LocalServer/StateHelper.cs index 4dfc78f..758e84b 100644 --- a/Server/LocalServer/StateHelper.cs +++ b/Server/LocalServer/StateHelper.cs @@ -32,7 +32,7 @@ public abstract class StateHelpBase( return new SyncMsg(type, Step, body); } - public void ReceiveLocalMsg(string msg) + public bool ReceiveLocalMsg(byte[] msg) { var syncMsg = JsonSerializer.Deserialize(msg) @@ -42,9 +42,10 @@ public abstract class StateHelpBase( throw new Exception("Sync step error!"); } HandleLocalMsg(syncMsg); + return true; } - public void ReceiveRemoteMsg(byte[] bytes) + public bool ReceiveRemoteMsg(byte[] bytes) { var msg = AESHelper.DecryptStringFromBytes_Aes(bytes); @@ -56,6 +57,7 @@ public abstract class StateHelpBase( throw new Exception("Sync step error!"); } HandleLocalMsg(syncMsg); + return true; } protected abstract void HandleRemoteMsg(SyncMsg msg); @@ -74,7 +76,7 @@ public class ConnectAuthorityHelper(LocalSyncServer context) protected override void HandleRemoteMsg(SyncMsg msg) { //将remote的消息传递到前端界面 - Context.LocalSocketSendMsg(msg).Wait(); + Context.LocalPipe.SendMsg(msg); //下一步 var deployHelper = new DeployHelper(Context); Context.StateHelper = deployHelper; @@ -86,18 +88,24 @@ public class ConnectAuthorityHelper(LocalSyncServer context) //收到配置文件 var config = JsonSerializer.Deserialize(msg.Body); Context.SyncConfig = config; - Context.RemoteSocketConnect().Wait(60 * 1000); Task.Run(async () => { - if (Context.SyncConfig != null) + var rs = Context.RemotePipe.Work( + (byte[] b) => + { + return Context.StateHelper.ReceiveRemoteMsg(b); + }, + Context.NotNullSyncConfig.RemoteUrl + "/websoc" + ); + await foreach (var r in rs) { - await Context.RemoteSocketSendMsg(CreateMsg(Context.SyncConfig.RemotePwd)); + if (r == 0) + { + await Context.RemotePipe.SendMsg( + CreateMsg(Context.NotNullSyncConfig.RemotePwd) + ); + } } - else - { - throw new NullReferenceException("Config is null!"); - } - await Context.RemoteSocketLiten(); }); } } @@ -113,7 +121,7 @@ public class DeployHelper(LocalSyncServer context) { if (Context.NotNullSyncConfig.IsDeployProject == false) { - Context.LocalSocketSendMsg(CreateMsg("配置为不发布跳过此步骤")).Wait(); + Context.LocalPipe.SendMsg(CreateMsg("配置为不发布跳过此步骤")).Wait(); var h = new DiffFileAndPackHelper(Context); Context.StateHelper = h; h.DiffProcess(); @@ -145,14 +153,14 @@ public class DeployHelper(LocalSyncServer context) if (process.ExitCode == 0) { - Context.LocalSocketSendMsg(CreateMsg("发布成功!")).Wait(); + Context.LocalPipe.SendMsg(CreateMsg("发布成功!")).Wait(); var h = new DiffFileAndPackHelper(Context); Context.StateHelper = h; h.DiffProcess(); } else { - Context.LocalSocketSendMsg(CreateErrMsg(output)).Wait(); + Context.LocalPipe.SendMsg(CreateErrMsg(output)).Wait(); throw new Exception("执行发布错误,错误信息参考上一条消息!"); } } @@ -181,17 +189,13 @@ public class DiffFileAndPackHelper(LocalSyncServer context) }); //将配置信息发送到remoteServer Context - .RemoteSocketSendMsg(CreateMsg(JsonSerializer.Serialize(Context.NotNullSyncConfig))) + .RemotePipe.SendMsg(CreateMsg(JsonSerializer.Serialize(Context.NotNullSyncConfig))) .Wait(); } protected override void HandleLocalMsg(SyncMsg msg) { } - protected override void HandleRemoteMsg(SyncMsg msg) { - - - - } + protected override void HandleRemoteMsg(SyncMsg msg) { } } // /// diff --git a/Server/RemoteServer/Controllers/SyncFilesController.cs b/Server/RemoteServer/Controllers/RemoteServerController.cs similarity index 79% rename from Server/RemoteServer/Controllers/SyncFilesController.cs rename to Server/RemoteServer/Controllers/RemoteServerController.cs index 0a68cef..ec4bf61 100644 --- a/Server/RemoteServer/Controllers/SyncFilesController.cs +++ b/Server/RemoteServer/Controllers/RemoteServerController.cs @@ -1,10 +1,34 @@ using Microsoft.AspNetCore.Mvc; using RemoteServer.Models; +using System.Text; namespace RemoteServer.Controllers; -public class SyncFilesController(SqliteDbContext db) : ControllerBase +public class SyncFilesController(RemoteSyncServerFactory factory, SqliteDbContext db) : ControllerBase { private readonly SqliteDbContext _db = db; + private readonly RemoteSyncServerFactory Factory = factory; + + [Route("/websoc")] + public async Task WebsocketConnection(string Name) + { + if (HttpContext.WebSockets.IsWebSocketRequest) + { + try + { + var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync(); + Factory.CreateLocalSyncServer(webSocket, Name); + } + catch (Exception e) + { + HttpContext.Response.Body = new MemoryStream(Encoding.UTF8.GetBytes(e.Message)); + HttpContext.Response.StatusCode = StatusCodes.Status406NotAcceptable; + } + } + else + { + HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest; + } + } [HttpGet("/GetSyncFilesLogs")] public IActionResult GetSyncFilesLogs( diff --git a/Server/RemoteServer/Program.cs b/Server/RemoteServer/Program.cs index 6e0b33b..4848d0f 100644 --- a/Server/RemoteServer/Program.cs +++ b/Server/RemoteServer/Program.cs @@ -1,12 +1,25 @@ using Microsoft.EntityFrameworkCore; +using RemoteServer; using RemoteServer.Models; var builder = WebApplication.CreateBuilder(args); +ConfigurationBuilder configurationBuilder = new(); + // Add services to the container. +//添加配置文件路径 +configurationBuilder + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json"); + +//加载文件 +IConfiguration _configuration = configurationBuilder.Build(); +RemoteSyncServer.TempRootFile = _configuration["TempDir"] ?? "C:/TempPack"; +; builder.Services.AddControllers(); -builder.Services.AddDbContext(opions=>{ +builder.Services.AddDbContext(opions => +{ opions.UseSqlite(builder.Configuration.GetConnectionString("DbPath")); }); @@ -23,7 +36,7 @@ if (app.Environment.IsDevelopment()) } app.UseWebSockets(); app.Urls.Clear(); -app.Urls.Add("http://0.0.0.0:6828"); +app.Urls.Add("http://0.0.0.0:6818"); app.MapControllers(); app.Run(); diff --git a/Server/RemoteServer/RemoteSyncServer.cs b/Server/RemoteServer/RemoteSyncServer.cs index 40d33a3..efb8e1e 100644 --- a/Server/RemoteServer/RemoteSyncServer.cs +++ b/Server/RemoteServer/RemoteSyncServer.cs @@ -1,6 +1,4 @@ - using System.Net.WebSockets; -using System.Text; using System.Text.Json; using Common; @@ -28,12 +26,6 @@ public class RemoteSyncServer /// public readonly WebSocket RemoteSocket; - /// - /// 发布源-缓冲区,存储数据 最大1MB - /// - public byte[] Buffer = new byte[1024 * 1024]; - - /// /// 发布开始时间 /// @@ -95,17 +87,6 @@ public class RemoteSyncServer } } - // public async Task LocalSocketSendMsg(object msgOb) - // { - // string msg = JsonSerializer.Serialize(msgOb); - // await RemoteSocket.SendAsync( - // new ArraySegment(Encoding.UTF8.GetBytes(msg)), - // WebSocketMessageType.Text, - // true, - // CancellationToken.None - // ); - // } - public async Task RemoteSocketSendMsg(object msgOb) { string msg = JsonSerializer.Serialize(msgOb);