style: 将连接抽象为一个单独的模块,符合单一职责原则,增加了系统的可扩展性

- 好的设计模式也将减少代码
This commit is contained in:
zerlei 2024-09-05 17:48:08 +08:00
parent 62366f0d97
commit 8d1e08a370
8 changed files with 202 additions and 196 deletions

View file

@ -0,0 +1,103 @@
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
namespace Common;
public abstract class AbsPipeLine
{
public abstract IAsyncEnumerable<int> Work(Func<byte[], bool> receiveCb, string addr = "");
protected Func<byte[], bool> ReceiveMsg = (byte[] a) =>
{
return true;
};
protected abstract Task Listen(Func<byte[], bool> receiveCb);
/// <summary>
/// 关闭连接
/// </summary>
/// <param name="CloseReason"></param>
/// <returns></returns>
public abstract Task Close(string? CloseReason);
/// <summary>
/// 发送消息
/// </summary>
/// <param name="msg"></param>
/// <returns></returns>
public abstract Task SendMsg(SyncMsg msg);
}
public class WebSocPipeLine<TSocket>(TSocket socket) : AbsPipeLine
where TSocket : WebSocket
{
public readonly TSocket Socket = socket;
public override async IAsyncEnumerable<int> Work(
Func<byte[], bool> receiveCb,
string addr = ""
)
{
if (Socket is ClientWebSocket CSocket)
{
//连接失败会抛出异常
await CSocket.ConnectAsync(new Uri(addr), CancellationToken.None);
yield return 0;
}
// 从controller 来,这个已经连接上了
else if (Socket is WebSocket)
{
yield return 0;
}
await Listen(receiveCb);
yield return 1;
}
protected override async Task Listen(Func<byte[], bool> receiveCb)
{
//最大1MB!=
var buffer = new byte[1024 * 1024];
while (Socket.State == WebSocketState.Open)
{
var receiveResult = await Socket.ReceiveAsync(
new ArraySegment<byte>(buffer),
CancellationToken.None
);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
throw new Exception(receiveResult.CloseStatusDescription);
}
else
{
var nbuffer = new byte[receiveResult.Count];
System.Buffer.BlockCopy(buffer, 0, nbuffer, 0, receiveResult.Count);
receiveCb(nbuffer);
}
}
}
public override async Task Close(string? CloseReason)
{
if (Socket.State == WebSocketState.Open)
{
await Socket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
CloseReason,
CancellationToken.None
);
}
}
public override async Task SendMsg(SyncMsg msg)
{
string msgStr = JsonSerializer.Serialize(msg);
await Socket.SendAsync(
new ArraySegment<byte>(Encoding.UTF8.GetBytes(msgStr)),
WebSocketMessageType.Text,
true,
CancellationToken.None
);
}
}

View file

@ -1,7 +1,8 @@
using System.Net.NetworkInformation; using System.Net.NetworkInformation;
using System.Text; using System.Text;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Common;
using System.Net.WebSockets;
namespace LocalServer.Controllers namespace LocalServer.Controllers
{ {
public class LocalServerController(LocalSyncServerFactory factory) : ControllerBase public class LocalServerController(LocalSyncServerFactory factory) : ControllerBase
@ -16,7 +17,8 @@ namespace LocalServer.Controllers
try try
{ {
var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync(); var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
Factory.CreateLocalSyncServer(webSocket, Name); var pipeLine = new WebSocPipeLine<WebSocket>(webSocket);
Factory.CreateLocalSyncServer(pipeLine, Name);
} }
catch (Exception e) catch (Exception e)
{ {

View file

@ -14,34 +14,26 @@ public class LocalSyncServer
public Config? SyncConfig; public Config? SyncConfig;
public Config NotNullSyncConfig {get { public Config NotNullSyncConfig
if (SyncConfig == null) {
get
{ {
throw new ArgumentNullException("SyncConfig"); if (SyncConfig == null)
{
throw new ArgumentNullException("SyncConfig");
}
return SyncConfig;
} }
return SyncConfig; }
}}
/// <summary> /// <summary>
/// 发布源连接 /// 发布源连接
/// </summary> /// </summary>
public readonly WebSocket LocalSocket; public readonly AbsPipeLine LocalPipe;
/// <summary>
/// 发布源-缓冲区,存储数据 最大1MB
/// </summary>
// public byte[] Buffer = new byte[1024 * 1024];
/// <summary>
/// 发布目标-连接
/// </summary>
public readonly ClientWebSocket RemoteSocket = new();
/// <summary>
/// 发布开始时间
/// </summary>
private readonly DateTime StartTime = DateTime.Now;
public readonly AbsPipeLine RemotePipe = new WebSocPipeLine<ClientWebSocket>(
new ClientWebSocket()
);
/// <summary> /// <summary>
/// 发布名称 /// 发布名称
/// </summary> /// </summary>
@ -52,148 +44,39 @@ public class LocalSyncServer
/// </summary> /// </summary>
public readonly LocalSyncServerFactory Factory; public readonly LocalSyncServerFactory Factory;
public LocalSyncServer(WebSocket socket, string name, LocalSyncServerFactory factory) public LocalSyncServer(AbsPipeLine pipe, string name, LocalSyncServerFactory factory)
{ {
LocalSocket = socket; LocalPipe = pipe;
Name = name; Name = name;
Factory = factory; Factory = factory;
StateHelper = new ConnectAuthorityHelper(this); StateHelper = new ConnectAuthorityHelper(this);
}
public async Task RemoteSocketConnect() Task.Run(async () =>
{
if (SyncConfig != null)
{ {
await RemoteSocket.ConnectAsync( var rs = LocalPipe.Work(
new Uri(SyncConfig.RemoteUrl + "/websoc"), (byte[] b) =>
CancellationToken.None {
return StateHelper.ReceiveLocalMsg(b);
}
); );
} try
else
{
throw new ArgumentException("SyncConfig is null!");
}
}
public async Task RemoteSocketLiten()
{
string CloseMsg = "任务结束关闭";
try
{
while (RemoteSocket.State == WebSocketState.Open)
{ {
var buffer = new byte[1024 * 1024]; await foreach (var r in rs) { }
var receiveResult = await RemoteSocket.ReceiveAsync(
new ArraySegment<byte>(buffer),
CancellationToken.None
);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
Close(receiveResult.CloseStatusDescription);
}
else
{
var nbuffer = new byte[receiveResult.Count];
System.Buffer.BlockCopy(buffer, 0, nbuffer, 0, receiveResult.Count);
StateHelper.ReceiveRemoteMsg(nbuffer);
}
} }
} catch (Exception e)
catch (Exception e)
{
CloseMsg = e.Message;
}
finally
{
Close(CloseMsg);
}
}
public async Task LocalSocketListen()
{
string CloseMsg = "任务结束关闭";
try
{
//最大1MB!=
var buffer = new byte[1024 * 1024];
while (LocalSocket.State == WebSocketState.Open)
{ {
var receiveResult = await LocalSocket.ReceiveAsync( Close(e.Message);
new ArraySegment<byte>(buffer),
CancellationToken.None
);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
Close(receiveResult.CloseStatusDescription);
}
else
{
StateHelper.ReceiveLocalMsg(
Encoding.UTF8.GetString(buffer, 0, receiveResult.Count)
);
}
} }
} });
catch (Exception e)
{
CloseMsg = e.Message;
}
finally
{
Close(CloseMsg);
}
} }
public async Task LocalSocketSendMsg(object msgOb)
{
string msg = JsonSerializer.Serialize(msgOb);
await LocalSocket.SendAsync(
new ArraySegment<byte>(Encoding.UTF8.GetBytes(msg)),
WebSocketMessageType.Text,
true,
CancellationToken.None
);
}
public async Task RemoteSocketSendMsg(object msgOb)
{
string msg = JsonSerializer.Serialize(msgOb);
var buffer = AESHelper.EncryptStringToBytes_Aes(msg);
await RemoteSocket.SendAsync(
buffer,
WebSocketMessageType.Text,
true,
CancellationToken.None
);
}
public void Close(string? CloseReason) public void Close(string? CloseReason)
{ {
try try
{ {
if (LocalSocket.State == WebSocketState.Open) LocalPipe.Close(CloseReason);
{ RemotePipe.Close(CloseReason);
LocalSocket
.CloseAsync(
WebSocketCloseStatus.NormalClosure,
CloseReason,
CancellationToken.None
)
.Wait(60 * 1000);
}
if (RemoteSocket.State == WebSocketState.Open)
{
RemoteSocket
.CloseAsync(
WebSocketCloseStatus.NormalClosure,
CloseReason,
CancellationToken.None
)
.Wait(60 * 1000);
}
} }
catch (Exception e) catch (Exception e)
{ {

View file

@ -1,26 +1,22 @@
using System.Net.WebSockets; using System.Net.WebSockets;
using Common;
namespace LocalServer; namespace LocalServer;
public class LocalSyncServerFactory public class LocalSyncServerFactory
{ {
private readonly object Lock = new(); private readonly object Lock = new();
public void CreateLocalSyncServer(WebSocket socket, string name) public void CreateLocalSyncServer(AbsPipeLine pipeLine, string name)
{ {
if (Servers.Select(x => x.Name == name).Any()) if (Servers.Select(x => x.Name == name).Any())
{ {
throw new Exception("LocalServer:存在同名发布源!"); throw new Exception("LocalServer:存在同名发布源!");
} }
var server = new LocalSyncServer(socket, name, this); var server = new LocalSyncServer(pipeLine, name, this);
lock (Lock) lock (Lock)
{ {
Servers.Add(server); Servers.Add(server);
} }
//脱离当前函数栈
Task.Run(async ()=>{
await server.LocalSocketListen();
});
} }
private readonly List<LocalSyncServer> Servers = []; private readonly List<LocalSyncServer> Servers = [];

View file

@ -32,7 +32,7 @@ public abstract class StateHelpBase(
return new SyncMsg(type, Step, body); return new SyncMsg(type, Step, body);
} }
public void ReceiveLocalMsg(string msg) public bool ReceiveLocalMsg(byte[] msg)
{ {
var syncMsg = var syncMsg =
JsonSerializer.Deserialize<SyncMsg>(msg) JsonSerializer.Deserialize<SyncMsg>(msg)
@ -42,9 +42,10 @@ public abstract class StateHelpBase(
throw new Exception("Sync step error!"); throw new Exception("Sync step error!");
} }
HandleLocalMsg(syncMsg); HandleLocalMsg(syncMsg);
return true;
} }
public void ReceiveRemoteMsg(byte[] bytes) public bool ReceiveRemoteMsg(byte[] bytes)
{ {
var msg = AESHelper.DecryptStringFromBytes_Aes(bytes); var msg = AESHelper.DecryptStringFromBytes_Aes(bytes);
@ -56,6 +57,7 @@ public abstract class StateHelpBase(
throw new Exception("Sync step error!"); throw new Exception("Sync step error!");
} }
HandleLocalMsg(syncMsg); HandleLocalMsg(syncMsg);
return true;
} }
protected abstract void HandleRemoteMsg(SyncMsg msg); protected abstract void HandleRemoteMsg(SyncMsg msg);
@ -74,7 +76,7 @@ public class ConnectAuthorityHelper(LocalSyncServer context)
protected override void HandleRemoteMsg(SyncMsg msg) protected override void HandleRemoteMsg(SyncMsg msg)
{ {
//将remote的消息传递到前端界面 //将remote的消息传递到前端界面
Context.LocalSocketSendMsg(msg).Wait(); Context.LocalPipe.SendMsg(msg);
//下一步 //下一步
var deployHelper = new DeployHelper(Context); var deployHelper = new DeployHelper(Context);
Context.StateHelper = deployHelper; Context.StateHelper = deployHelper;
@ -86,18 +88,24 @@ public class ConnectAuthorityHelper(LocalSyncServer context)
//收到配置文件 //收到配置文件
var config = JsonSerializer.Deserialize<Config>(msg.Body); var config = JsonSerializer.Deserialize<Config>(msg.Body);
Context.SyncConfig = config; Context.SyncConfig = config;
Context.RemoteSocketConnect().Wait(60 * 1000);
Task.Run(async () => Task.Run(async () =>
{ {
if (Context.SyncConfig != null) var rs = Context.RemotePipe.Work(
(byte[] b) =>
{
return Context.StateHelper.ReceiveRemoteMsg(b);
},
Context.NotNullSyncConfig.RemoteUrl + "/websoc"
);
await foreach (var r in rs)
{ {
await Context.RemoteSocketSendMsg(CreateMsg(Context.SyncConfig.RemotePwd)); if (r == 0)
{
await Context.RemotePipe.SendMsg(
CreateMsg(Context.NotNullSyncConfig.RemotePwd)
);
}
} }
else
{
throw new NullReferenceException("Config is null!");
}
await Context.RemoteSocketLiten();
}); });
} }
} }
@ -113,7 +121,7 @@ public class DeployHelper(LocalSyncServer context)
{ {
if (Context.NotNullSyncConfig.IsDeployProject == false) if (Context.NotNullSyncConfig.IsDeployProject == false)
{ {
Context.LocalSocketSendMsg(CreateMsg("配置为不发布跳过此步骤")).Wait(); Context.LocalPipe.SendMsg(CreateMsg("配置为不发布跳过此步骤")).Wait();
var h = new DiffFileAndPackHelper(Context); var h = new DiffFileAndPackHelper(Context);
Context.StateHelper = h; Context.StateHelper = h;
h.DiffProcess(); h.DiffProcess();
@ -145,14 +153,14 @@ public class DeployHelper(LocalSyncServer context)
if (process.ExitCode == 0) if (process.ExitCode == 0)
{ {
Context.LocalSocketSendMsg(CreateMsg("发布成功!")).Wait(); Context.LocalPipe.SendMsg(CreateMsg("发布成功!")).Wait();
var h = new DiffFileAndPackHelper(Context); var h = new DiffFileAndPackHelper(Context);
Context.StateHelper = h; Context.StateHelper = h;
h.DiffProcess(); h.DiffProcess();
} }
else else
{ {
Context.LocalSocketSendMsg(CreateErrMsg(output)).Wait(); Context.LocalPipe.SendMsg(CreateErrMsg(output)).Wait();
throw new Exception("执行发布错误,错误信息参考上一条消息!"); throw new Exception("执行发布错误,错误信息参考上一条消息!");
} }
} }
@ -181,17 +189,13 @@ public class DiffFileAndPackHelper(LocalSyncServer context)
}); });
//将配置信息发送到remoteServer //将配置信息发送到remoteServer
Context Context
.RemoteSocketSendMsg(CreateMsg(JsonSerializer.Serialize(Context.NotNullSyncConfig))) .RemotePipe.SendMsg(CreateMsg(JsonSerializer.Serialize(Context.NotNullSyncConfig)))
.Wait(); .Wait();
} }
protected override void HandleLocalMsg(SyncMsg msg) { } protected override void HandleLocalMsg(SyncMsg msg) { }
protected override void HandleRemoteMsg(SyncMsg msg) { protected override void HandleRemoteMsg(SyncMsg msg) { }
}
} }
// /// <summary> // /// <summary>

View file

@ -1,10 +1,34 @@
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using RemoteServer.Models; using RemoteServer.Models;
using System.Text;
namespace RemoteServer.Controllers; namespace RemoteServer.Controllers;
public class SyncFilesController(SqliteDbContext db) : ControllerBase public class SyncFilesController(RemoteSyncServerFactory factory, SqliteDbContext db) : ControllerBase
{ {
private readonly SqliteDbContext _db = db; private readonly SqliteDbContext _db = db;
private readonly RemoteSyncServerFactory Factory = factory;
[Route("/websoc")]
public async Task WebsocketConnection(string Name)
{
if (HttpContext.WebSockets.IsWebSocketRequest)
{
try
{
var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
Factory.CreateLocalSyncServer(webSocket, Name);
}
catch (Exception e)
{
HttpContext.Response.Body = new MemoryStream(Encoding.UTF8.GetBytes(e.Message));
HttpContext.Response.StatusCode = StatusCodes.Status406NotAcceptable;
}
}
else
{
HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
}
}
[HttpGet("/GetSyncFilesLogs")] [HttpGet("/GetSyncFilesLogs")]
public IActionResult GetSyncFilesLogs( public IActionResult GetSyncFilesLogs(

View file

@ -1,12 +1,25 @@
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using RemoteServer;
using RemoteServer.Models; using RemoteServer.Models;
var builder = WebApplication.CreateBuilder(args); var builder = WebApplication.CreateBuilder(args);
ConfigurationBuilder configurationBuilder = new();
// Add services to the container. // Add services to the container.
//添加配置文件路径
configurationBuilder
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json");
//加载文件
IConfiguration _configuration = configurationBuilder.Build();
RemoteSyncServer.TempRootFile = _configuration["TempDir"] ?? "C:/TempPack";
;
builder.Services.AddControllers(); builder.Services.AddControllers();
builder.Services.AddDbContext<SqliteDbContext>(opions=>{ builder.Services.AddDbContext<SqliteDbContext>(opions =>
{
opions.UseSqlite(builder.Configuration.GetConnectionString("DbPath")); opions.UseSqlite(builder.Configuration.GetConnectionString("DbPath"));
}); });
@ -23,7 +36,7 @@ if (app.Environment.IsDevelopment())
} }
app.UseWebSockets(); app.UseWebSockets();
app.Urls.Clear(); app.Urls.Clear();
app.Urls.Add("http://0.0.0.0:6828"); app.Urls.Add("http://0.0.0.0:6818");
app.MapControllers(); app.MapControllers();
app.Run(); app.Run();

View file

@ -1,6 +1,4 @@
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Text;
using System.Text.Json; using System.Text.Json;
using Common; using Common;
@ -28,12 +26,6 @@ public class RemoteSyncServer
/// </summary> /// </summary>
public readonly WebSocket RemoteSocket; public readonly WebSocket RemoteSocket;
/// <summary>
/// 发布源-缓冲区,存储数据 最大1MB
/// </summary>
public byte[] Buffer = new byte[1024 * 1024];
/// <summary> /// <summary>
/// 发布开始时间 /// 发布开始时间
/// </summary> /// </summary>
@ -95,17 +87,6 @@ public class RemoteSyncServer
} }
} }
// public async Task LocalSocketSendMsg(object msgOb)
// {
// string msg = JsonSerializer.Serialize(msgOb);
// await RemoteSocket.SendAsync(
// new ArraySegment<byte>(Encoding.UTF8.GetBytes(msg)),
// WebSocketMessageType.Text,
// true,
// CancellationToken.None
// );
// }
public async Task RemoteSocketSendMsg(object msgOb) public async Task RemoteSocketSendMsg(object msgOb)
{ {
string msg = JsonSerializer.Serialize(msgOb); string msg = JsonSerializer.Serialize(msgOb);