fix: 单元测试修了一大堆bug。当前问题,Dir序列化错误,重构dir

This commit is contained in:
zerlei 2024-09-23 17:46:46 +08:00
parent 352e13fd29
commit f93cc03bc0
11 changed files with 243 additions and 111 deletions

View file

@ -33,6 +33,14 @@ public class AESHelper
0x44, 0x44,
0xB5, 0xB5,
0xF6, 0xF6,
0x23,
0x44,
0xB5,
0xF6,
0x44,
0xB5,
0xF6,
0x23,
]; ];
static readonly byte[] IV = static readonly byte[] IV =
[ [
@ -54,11 +62,15 @@ public class AESHelper
0xB6, 0xB6,
]; ];
public static byte[] EncryptStringToBytes_Aes(byte[] plainText) public static byte[] EncryptStringToBytes_Aes(string plainText)
{ {
// Check arguments. // Check arguments.
if (plainText == null || plainText.Length <= 0) if (plainText == null || plainText.Length <= 0)
throw new ArgumentNullException(nameof(plainText), "can't be null"); throw new ArgumentNullException("plainText");
if (Key == null || Key.Length <= 0)
throw new ArgumentNullException("Key");
if (IV == null || IV.Length <= 0)
throw new ArgumentNullException("IV");
byte[] encrypted; byte[] encrypted;
// Create an Aes object // Create an Aes object
@ -72,14 +84,24 @@ public class AESHelper
ICryptoTransform encryptor = aesAlg.CreateEncryptor(aesAlg.Key, aesAlg.IV); ICryptoTransform encryptor = aesAlg.CreateEncryptor(aesAlg.Key, aesAlg.IV);
// Create the streams used for encryption. // Create the streams used for encryption.
using MemoryStream msEncrypt = new(); using (MemoryStream msEncrypt = new MemoryStream())
using CryptoStream csEncrypt = new(msEncrypt, encryptor, CryptoStreamMode.Write);
using (StreamWriter swEncrypt = new(csEncrypt))
{ {
//Write all data to the stream. using (
swEncrypt.Write(plainText); CryptoStream csEncrypt = new CryptoStream(
msEncrypt,
encryptor,
CryptoStreamMode.Write
)
)
{
using (StreamWriter swEncrypt = new StreamWriter(csEncrypt))
{
//Write all data to the stream.
swEncrypt.Write(plainText);
}
encrypted = msEncrypt.ToArray();
}
} }
encrypted = msEncrypt.ToArray();
} }
// Return the encrypted bytes from the memory stream. // Return the encrypted bytes from the memory stream.
@ -90,11 +112,15 @@ public class AESHelper
{ {
// Check arguments. // Check arguments.
if (cipherText == null || cipherText.Length <= 0) if (cipherText == null || cipherText.Length <= 0)
throw new ArgumentNullException(nameof(cipherText), "can't be null"); throw new ArgumentNullException("cipherText");
if (Key == null || Key.Length <= 0)
throw new ArgumentNullException("Key");
if (IV == null || IV.Length <= 0)
throw new ArgumentNullException("IV");
// Declare the string used to hold // Declare the string used to hold
// the decrypted text. // the decrypted text.
string plaintext = string.Empty; string plaintext = null;
// Create an Aes object // Create an Aes object
// with the specified key and IV. // with the specified key and IV.
@ -107,12 +133,24 @@ public class AESHelper
ICryptoTransform decryptor = aesAlg.CreateDecryptor(aesAlg.Key, aesAlg.IV); ICryptoTransform decryptor = aesAlg.CreateDecryptor(aesAlg.Key, aesAlg.IV);
// Create the streams used for decryption. // Create the streams used for decryption.
using MemoryStream msDecrypt = new(cipherText); using (MemoryStream msDecrypt = new MemoryStream(cipherText))
using CryptoStream csDecrypt = new(msDecrypt, decryptor, CryptoStreamMode.Read); {
using StreamReader srDecrypt = new(csDecrypt); using (
// Read the decrypted bytes from the decrypting stream CryptoStream csDecrypt = new CryptoStream(
// and place them in a string. msDecrypt,
plaintext = srDecrypt.ReadToEnd(); decryptor,
CryptoStreamMode.Read
)
)
{
using (StreamReader srDecrypt = new StreamReader(csDecrypt))
{
// Read the decrypted bytes from the decrypting stream
// and place them in a string.
plaintext = srDecrypt.ReadToEnd();
}
}
}
} }
return plaintext; return plaintext;

View file

@ -19,8 +19,8 @@ public class DirFileConfig
/// ///
public Dir? LocalDirInfo { get; set; } public Dir? LocalDirInfo { get; set; }
public Dir? DiffDirInfo{ get; set; } public Dir? DiffDirInfo { get; set; }
public Dir? RemoteDirInfo{ get; set; } public Dir? RemoteDirInfo { get; set; }
} }
public class MSSqlConfig public class MSSqlConfig
@ -29,18 +29,22 @@ public class MSSqlConfig
/// 数据库地址 /// 数据库地址
/// </summary> /// </summary>
public required string ServerName { get; set; } public required string ServerName { get; set; }
/// <summary> /// <summary>
/// db名称 /// db名称
/// </summary> /// </summary>
public required string DatebaseName { get; set; } public required string DatebaseName { get; set; }
/// <summary> /// <summary>
/// 用户 /// 用户
/// </summary> /// </summary>
public required string User { get; set; } public required string User { get; set; }
/// <summary> /// <summary>
/// 密码 /// 密码
/// </summary> /// </summary>
public required string Password { get; set; } public required string Password { get; set; }
/// <summary> /// <summary>
/// 通常是True /// 通常是True
/// </summary> /// </summary>
@ -49,7 +53,7 @@ public class MSSqlConfig
/// <summary> /// <summary>
/// 同步数据的表格 通常是 dbo.TableName !!! 注意dbo. /// 同步数据的表格 通常是 dbo.TableName !!! 注意dbo.
/// </summary> /// </summary>
public List<string>? SyncTablesData{get;set;} public List<string>? SyncTablesData { get; set; }
} }
public class Config public class Config

View file

@ -104,7 +104,15 @@ public class WebSocPipeLine<TSocket>(TSocket socket, bool isAES) : AbsPipeLine(i
{ {
var nbuffer = new byte[receiveResult.Count]; var nbuffer = new byte[receiveResult.Count];
System.Buffer.BlockCopy(buffer, 0, nbuffer, 0, receiveResult.Count); System.Buffer.BlockCopy(buffer, 0, nbuffer, 0, receiveResult.Count);
receiveCb(nbuffer); if (IsAES)
{
var nnbuffer = AESHelper.DecryptStringFromBytes_Aes(buffer);
receiveCb(Encoding.UTF8.GetBytes(nnbuffer));
}
else
{
receiveCb(nbuffer);
}
} }
} }
} }
@ -126,7 +134,7 @@ public class WebSocPipeLine<TSocket>(TSocket socket, bool isAES) : AbsPipeLine(i
string msgStr = JsonSerializer.Serialize(msg); string msgStr = JsonSerializer.Serialize(msg);
await Socket.SendAsync( await Socket.SendAsync(
IsAES IsAES
? AESHelper.EncryptStringToBytes_Aes(Encoding.UTF8.GetBytes(msgStr)) ? AESHelper.EncryptStringToBytes_Aes(msgStr)
: new ArraySegment<byte>(Encoding.UTF8.GetBytes(msgStr)), : new ArraySegment<byte>(Encoding.UTF8.GetBytes(msgStr)),
WebSocketMessageType.Text, WebSocketMessageType.Text,
true, true,

View file

@ -7,6 +7,7 @@ public enum SyncMsgType
Process = 2, Process = 2,
// DirFilePack = 3 // DirFilePack = 3
} }
public enum SyncProcessStep public enum SyncProcessStep
{ {
Connect = 1, Connect = 1,
@ -16,15 +17,12 @@ public enum SyncProcessStep
UploadAndUnpack = 5, UploadAndUnpack = 5,
Publish = 6 Publish = 6
} }
public class SyncMsg(SyncMsgType msgType, SyncProcessStep step, string body)
public class SyncMsg
{ {
public SyncMsgType? Type { get; set; } = msgType; public SyncMsgType Type { get; set; }
public SyncProcessStep Step {get;set;} = step; public SyncProcessStep Step { get; set; }
public bool IsSuccess public required string Body { get; set; }
{
get { return Type != SyncMsgType.Error; }
}
public string Body { get; set; } = body;
} }

View file

@ -31,12 +31,18 @@ public class LocalSyncServer
public readonly AbsPipeLine LocalPipe; public readonly AbsPipeLine LocalPipe;
public readonly AbsPipeLine RemotePipe; public readonly AbsPipeLine RemotePipe;
/// <summary> /// <summary>
/// 父工程,用于释放资源 /// 父工程,用于释放资源
/// </summary> /// </summary>
public readonly LocalSyncServerFactory Factory; public readonly LocalSyncServerFactory Factory;
public LocalSyncServer(AbsPipeLine pipe, LocalSyncServerFactory factory,string name,AbsPipeLine remotePipe ) public LocalSyncServer(
AbsPipeLine pipe,
LocalSyncServerFactory factory,
string name,
AbsPipeLine remotePipe
)
{ {
LocalPipe = pipe; LocalPipe = pipe;
Factory = factory; Factory = factory;
@ -46,14 +52,14 @@ public class LocalSyncServer
Task.Run(async () => Task.Run(async () =>
{ {
var rs = LocalPipe.Work(
(byte[] b) =>
{
return StateHelper.ReceiveLocalMsg(b);
}
);
try try
{ {
var rs = LocalPipe.Work(
(byte[] b) =>
{
return StateHelper.ReceiveLocalMsg(b);
}
);
await foreach (var r in rs) { } await foreach (var r in rs) { }
} }
catch (Exception e) catch (Exception e)
@ -63,7 +69,6 @@ public class LocalSyncServer
}); });
} }
public void Close(string? CloseReason) public void Close(string? CloseReason)
{ {
try try

View file

@ -1,5 +1,6 @@
using System.Diagnostics; using System.Diagnostics;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Text;
using System.Text.Json; using System.Text.Json;
using Common; using Common;
@ -24,12 +25,22 @@ public abstract class StateHelpBase(
public SyncMsg CreateErrMsg(string Body) public SyncMsg CreateErrMsg(string Body)
{ {
return new SyncMsg(SyncMsgType.Error, Step, Body); return new SyncMsg
{
Body = Body,
Step = Step,
Type = SyncMsgType.Error
};
} }
public SyncMsg CreateMsg(string body, SyncMsgType type = SyncMsgType.General) public SyncMsg CreateMsg(string body, SyncMsgType type = SyncMsgType.General)
{ {
return new SyncMsg(type, Step, body); return new SyncMsg
{
Body = body,
Step = Step,
Type = type
};
} }
public bool ReceiveLocalMsg(byte[] msg) public bool ReceiveLocalMsg(byte[] msg)
@ -47,7 +58,7 @@ public abstract class StateHelpBase(
public bool ReceiveRemoteMsg(byte[] bytes) public bool ReceiveRemoteMsg(byte[] bytes)
{ {
var msg = AESHelper.DecryptStringFromBytes_Aes(bytes); var msg = Encoding.UTF8.GetString(bytes);
var syncMsg = var syncMsg =
JsonSerializer.Deserialize<SyncMsg>(msg) JsonSerializer.Deserialize<SyncMsg>(msg)
@ -90,22 +101,29 @@ public class ConnectAuthorityHelper(LocalSyncServer context)
Context.SyncConfig = config; Context.SyncConfig = config;
Task.Run(async () => Task.Run(async () =>
{ {
var rs = Context.RemotePipe.Work( try
(byte[] b) =>
{
return Context.StateHelper.ReceiveRemoteMsg(b);
},
Context.NotNullSyncConfig.RemoteUrl + "/websoc?Name=" + Context.Name
);
await foreach (var r in rs)
{ {
if (r == 0) var rs = Context.RemotePipe.Work(
(byte[] b) =>
{
return Context.StateHelper.ReceiveRemoteMsg(b);
},
Context.NotNullSyncConfig.RemoteUrl + "/websoc?Name=" + Context.Name
);
await foreach (var r in rs)
{ {
await Context.RemotePipe.SendMsg( if (r == 0)
CreateMsg(Context.NotNullSyncConfig.RemotePwd) {
); await Context.RemotePipe.SendMsg(
CreateMsg(Context.NotNullSyncConfig.RemotePwd)
);
}
} }
} }
catch (Exception e)
{
await Context.LocalPipe.Close(e.Message);
}
}); });
} }
} }
@ -133,9 +151,9 @@ public class DeployHelper(LocalSyncServer context)
ProcessStartInfo startInfo = ProcessStartInfo startInfo =
new() new()
{ {
FileName = "cmd.exe", // The command to execute (can be any command line tool) FileName = "msdeploy", // The command to execute (can be any command line tool)
Arguments = Arguments =
$" msdeploy.exe -verb:sync -source:contentPath={Context.NotNullSyncConfig.LocalProjectAbsolutePath} -dest:contentPath={Context.NotNullSyncConfig.LocalRootPath} -disablerule:BackupRule", $" -verb:sync -source:contentPath={Context.NotNullSyncConfig.LocalProjectAbsolutePath} -dest:contentPath={Context.NotNullSyncConfig.LocalRootPath} -disablerule:BackupRule",
// The arguments to pass to the command (e.g., list directory contents) // The arguments to pass to the command (e.g., list directory contents)
RedirectStandardOutput = true, // Redirect the standard output to a string RedirectStandardOutput = true, // Redirect the standard output to a string
UseShellExecute = false, // Do not use the shell to execute the command UseShellExecute = false, // Do not use the shell to execute the command
@ -212,7 +230,7 @@ public class DiffFileAndPackHelper(LocalSyncServer context)
); );
Context.NotNullSyncConfig.DirFileConfigs.ForEach(e => Context.NotNullSyncConfig.DirFileConfigs.ForEach(e =>
{ {
if (e.DiffDirInfo!= null) if (e.DiffDirInfo != null)
{ {
e.DiffDirInfo.ResetRootPath( e.DiffDirInfo.ResetRootPath(
Context.NotNullSyncConfig.RemoteRootPath, Context.NotNullSyncConfig.RemoteRootPath,
@ -253,7 +271,7 @@ public class DeployMSSqlHelper(LocalSyncServer context)
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{ {
var arguments = var arguments =
$"SqlPackage /Action:Extract /TargetFile:{LocalSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id.ToString()}/{Context.NotNullSyncConfig.Id.ToString()}.dacpac " $" /Action:Extract /TargetFile:{LocalSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id.ToString()}/{Context.NotNullSyncConfig.Id.ToString()}.dacpac "
+ $"/DiagnosticsFile:{LocalSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id.ToString()}/{Context.NotNullSyncConfig.Id.ToString()}.log " + $"/DiagnosticsFile:{LocalSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id.ToString()}/{Context.NotNullSyncConfig.Id.ToString()}.log "
+ $"/p:ExtractAllTableData=false /p:VerifyExtraction=true /SourceServerName:{Context.NotNullSyncConfig.SrcDb.ServerName}" + $"/p:ExtractAllTableData=false /p:VerifyExtraction=true /SourceServerName:{Context.NotNullSyncConfig.SrcDb.ServerName}"
+ $"/SourceDatabaseName:{Context.NotNullSyncConfig.SrcDb.DatebaseName} /SourceUser:{Context.NotNullSyncConfig.SrcDb.User} " + $"/SourceDatabaseName:{Context.NotNullSyncConfig.SrcDb.DatebaseName} /SourceUser:{Context.NotNullSyncConfig.SrcDb.User} "
@ -270,7 +288,7 @@ public class DeployMSSqlHelper(LocalSyncServer context)
ProcessStartInfo startInfo = ProcessStartInfo startInfo =
new() new()
{ {
FileName = "cmd.exe", // The command to execute (can be any command line tool) FileName = "SqlPackage", // The command to execute (can be any command line tool)
Arguments = arguments, Arguments = arguments,
// The arguments to pass to the command (e.g., list directory contents) // The arguments to pass to the command (e.g., list directory contents)
RedirectStandardOutput = true, // Redirect the standard output to a string RedirectStandardOutput = true, // Redirect the standard output to a string
@ -320,7 +338,7 @@ public class UploadPackedHelper(LocalSyncServer context)
{ {
Context Context
.LocalPipe.UploadFile( .LocalPipe.UploadFile(
Context.NotNullSyncConfig.RemoteUrl , Context.NotNullSyncConfig.RemoteUrl,
$"{LocalSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id}/{Context.NotNullSyncConfig.Id}.zip", $"{LocalSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id}/{Context.NotNullSyncConfig.Id}.zip",
(double current) => (double current) =>
{ {

View file

@ -1,4 +1,3 @@
using System.Net.WebSockets; using System.Net.WebSockets;
using Common; using Common;
@ -29,16 +28,22 @@ public class RemoteSyncServer
/// 发布源连接 /// 发布源连接
/// </summary> /// </summary>
public readonly AbsPipeLine Pipe; public readonly AbsPipeLine Pipe;
/// <summary> /// <summary>
/// 父工程,用于释放资源 /// 父工程,用于释放资源
/// </summary> /// </summary>
public readonly RemoteSyncServerFactory Factory; public readonly RemoteSyncServerFactory Factory;
public string Name; public string Name;
public string Pwd; public string Pwd;
public RemoteSyncServer(AbsPipeLine pipe, RemoteSyncServerFactory factory,string name,string pwd) public RemoteSyncServer(
AbsPipeLine pipe,
RemoteSyncServerFactory factory,
string name,
string pwd
)
{ {
Pipe = pipe; Pipe = pipe;
Factory = factory; Factory = factory;
@ -48,14 +53,14 @@ public class RemoteSyncServer
Task.Run(async () => Task.Run(async () =>
{ {
var rs = Pipe.Work(
(byte[] b) =>
{
return StateHelper.ReceiveMsg(b);
}
);
try try
{ {
var rs = Pipe.Work(
(byte[] b) =>
{
return StateHelper.ReceiveMsg(b);
}
);
await foreach (var r in rs) { } await foreach (var r in rs) { }
} }
catch (Exception e) catch (Exception e)
@ -65,7 +70,6 @@ public class RemoteSyncServer
}); });
} }
public void Close(string? CloseReason) public void Close(string? CloseReason)
{ {
try try

View file

@ -1,5 +1,6 @@
using System.Diagnostics; using System.Diagnostics;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Text;
using System.Text.Json; using System.Text.Json;
using Common; using Common;
@ -24,17 +25,27 @@ public abstract class StateHelpBase(
public SyncMsg CreateErrMsg(string Body) public SyncMsg CreateErrMsg(string Body)
{ {
return new SyncMsg(SyncMsgType.Error, Step, Body); return new SyncMsg
{
Body = Body,
Type = SyncMsgType.Error,
Step = Step
};
} }
public SyncMsg CreateMsg(string body, SyncMsgType type = SyncMsgType.General) public SyncMsg CreateMsg(string body, SyncMsgType type = SyncMsgType.General)
{ {
return new SyncMsg(type, Step, body); return new SyncMsg
{
Body = body,
Type = type,
Step = Step
};
} }
public bool ReceiveMsg(byte[] bytes) public bool ReceiveMsg(byte[] bytes)
{ {
var msg = AESHelper.DecryptStringFromBytes_Aes(bytes); var msg = Encoding.UTF8.GetString(bytes);
var syncMsg = var syncMsg =
JsonSerializer.Deserialize<SyncMsg>(msg) JsonSerializer.Deserialize<SyncMsg>(msg)
@ -61,6 +72,8 @@ public class ConnectAuthorityHelper(RemoteSyncServer context)
{ {
if (msg.Body == Context.Pwd) if (msg.Body == Context.Pwd)
{ {
var h = new DiffFileHelper(Context);
Context.StateHelper = h;
Context.Pipe.SendMsg(CreateMsg("RemoteServer: 密码验证成功!")); Context.Pipe.SendMsg(CreateMsg("RemoteServer: 密码验证成功!"));
} }
else else
@ -100,6 +113,8 @@ public class DiffFileHelper(RemoteSyncServer context)
); );
} }
}); });
var h = new UnPackAndReleaseHelper(Context);
Context.StateHelper = h;
//将对比结果发送到Local //将对比结果发送到Local
Context.Pipe.SendMsg(CreateMsg(JsonSerializer.Serialize(diffConfigs))); Context.Pipe.SendMsg(CreateMsg(JsonSerializer.Serialize(diffConfigs)));
} }
@ -134,14 +149,14 @@ public class FinallyPublishHelper(RemoteSyncServer context)
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{ {
var arguments = var arguments =
$"SqlPackage /Action:Publish /SourceFile: {RemoteSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id}/{Context.NotNullSyncConfig.Id}.dacpac " $"/Action:Publish /SourceFile: {RemoteSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id}/{Context.NotNullSyncConfig.Id}.dacpac "
+ $"/TargetServerName:{Context.NotNullSyncConfig.DstDb.ServerName} /TargetDatabaseName:{Context.NotNullSyncConfig.DstDb.DatebaseName}" + $"/TargetServerName:{Context.NotNullSyncConfig.DstDb.ServerName} /TargetDatabaseName:{Context.NotNullSyncConfig.DstDb.DatebaseName}"
+ $" /TargetUser:{Context.NotNullSyncConfig.DstDb.User} /TargetPassword:{Context.NotNullSyncConfig.DstDb.Password} /TargetTrustServerCertificate:True "; + $" /TargetUser:{Context.NotNullSyncConfig.DstDb.User} /TargetPassword:{Context.NotNullSyncConfig.DstDb.Password} /TargetTrustServerCertificate:True ";
ProcessStartInfo startInfo = ProcessStartInfo startInfo =
new() new()
{ {
FileName = "cmd.exe", // The command to execute (can be any command line tool) FileName = "SqlPackage", // The command to execute (can be any command line tool)
Arguments = arguments, Arguments = arguments,
// The arguments to pass to the command (e.g., list directory contents) // The arguments to pass to the command (e.g., list directory contents)
RedirectStandardOutput = true, // Redirect the standard output to a string RedirectStandardOutput = true, // Redirect the standard output to a string

View file

@ -12,7 +12,7 @@ public class PipeSeed : IDisposable
{ {
Name = "Test", Name = "Test",
RemoteUrl = "D:/FileSyncTest", RemoteUrl = "D:/FileSyncTest",
RemotePwd = "", RemotePwd = "t123",
IsDeployDb = true, IsDeployDb = true,
IsDeployProject = true, IsDeployProject = true,
LocalProjectAbsolutePath = "D:/git/HMES-H7-HNFY/HMES-H7-HNFYMF/HMES-H7-HNFYMF.WEB", LocalProjectAbsolutePath = "D:/git/HMES-H7-HNFY/HMES-H7-HNFYMF/HMES-H7-HNFYMF.WEB",
@ -42,12 +42,9 @@ public class PipeSeed : IDisposable
Password = "0", Password = "0",
TrustServerCertificate = "True" TrustServerCertificate = "True"
}, },
DirFileConfigs = new List<DirFileConfig>{ DirFileConfigs = new List<DirFileConfig>
new DirFileConfig{ {
DirPath = "/bin", new DirFileConfig { DirPath = "/bin", Excludes = ["/roslyn", "/Views"] }
Excludes = ["/roslyn","/Views"]
}
} }
}; };
} }

View file

@ -3,6 +3,7 @@ using Common;
using LocalServer; using LocalServer;
using RemoteServer; using RemoteServer;
using XUnit.Project.Attributes; using XUnit.Project.Attributes;
/*using Newtonsoft.Json;*/ /*using Newtonsoft.Json;*/
namespace ServerTest; namespace ServerTest;
@ -12,25 +13,46 @@ public class PipeTest
[Fact] [Fact]
public async void TestCase() public async void TestCase()
{ {
var p1 = new TestPipe(false); var p1 = new TestPipe(false, "1");
var p2 = new TestPipe(false); var x = Task.Run(async () =>
{
var rs = p1.Work(
(byte[] b) =>
{
Console.WriteLine(b);
return true;
}
);
await foreach (var r in rs)
{
Console.WriteLine(r);
}
});
//await p1.Close("sdf");
//await x;
var p2 = new TestPipe(false, "2");
p1.Other = p2; p1.Other = p2;
p2.Other = p1; p2.Other = p1;
var p3 = new TestPipe(true); var p3 = new TestPipe(true, "3");
var p4 = new TestPipe(true); var p4 = new TestPipe(true, "4");
p3.Other = p4; p3.Other = p4;
p4.Other = p3; p4.Other = p3;
RemoteSyncServerFactory.NamePwd = [new Tuple<string, string>("Test", "t123")];
var lf = new LocalSyncServerFactory(); var lf = new LocalSyncServerFactory();
lf.CreateLocalSyncServer(p2,"Test",p3); lf.CreateLocalSyncServer(p2, "Test", p3);
var rf = new RemoteSyncServerFactory(); var rf = new RemoteSyncServerFactory();
rf.CreateRemoteSyncServer(p4,"Test"); rf.CreateRemoteSyncServer(p4, "Test");
var starter = new SyncMsg( var starter = new SyncMsg
SyncMsgType.General, {
SyncProcessStep.Connect, Body = JsonSerializer.Serialize(new PipeSeed().TestConfig),
JsonSerializer.Serialize(new PipeSeed().TestConfig) Type = SyncMsgType.General,
); Step = SyncProcessStep.Connect
};
await p1.SendMsg(starter); await p1.SendMsg(starter);
await x;
if (p1.ErrResult != null)
{
Assert.Fail(p1.ErrResult);
}
} }
} }

View file

@ -1,15 +1,18 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Text;
using System.Text.Json; using System.Text.Json;
using Common; using Common;
namespace ServerTest namespace ServerTest
{ {
public class TestPipe(bool isAES) : AbsPipeLine(isAES) public class TestPipe(bool isAES, string id) : AbsPipeLine(isAES)
{ {
private readonly BlockingCollection<Func<byte[]>> EventQueue = private readonly BlockingCollection<Func<byte[]>> EventQueue =
new BlockingCollection<Func<byte[]>>(); new BlockingCollection<Func<byte[]>>();
private readonly CancellationTokenSource Cts = new CancellationTokenSource(); private readonly CancellationTokenSource Cts = new CancellationTokenSource();
public TestPipe? Other; public TestPipe? Other;
public string? ErrResult;
public string Id = id;
public override async IAsyncEnumerable<int> Work( public override async IAsyncEnumerable<int> Work(
Func<byte[], bool> receiveCb, Func<byte[], bool> receiveCb,
@ -36,10 +39,16 @@ namespace ServerTest
public override async Task Close(string? CloseReason) public override async Task Close(string? CloseReason)
{ {
await Task.Run(() => ErrResult = CloseReason;
var Id = this.Id;
Cts.Cancel();
if (Other != null)
{ {
Cts.Cancel(); if (Other.ErrResult == null)
}); {
await Other.Close(CloseReason);
}
}
} }
public new async Task ReceiveMsg(SyncMsg msg) public new async Task ReceiveMsg(SyncMsg msg)
@ -48,15 +57,7 @@ namespace ServerTest
{ {
EventQueue.Add(() => EventQueue.Add(() =>
{ {
var r = JsonSerializer.SerializeToUtf8Bytes(msg); return JsonSerializer.SerializeToUtf8Bytes(msg);
if (IsAES)
{
return AESHelper.EncryptStringToBytes_Aes(r);
}
else
{
return r;
}
}); });
}); });
} }
@ -67,15 +68,30 @@ namespace ServerTest
{ {
throw new Exception("can't be null"); throw new Exception("can't be null");
} }
await Other.ReceiveMsg(msg); var r = JsonSerializer.SerializeToUtf8Bytes(msg);
if (IsAES)
{
var str = Encoding.UTF8.GetString(r);
var t = AESHelper.EncryptStringToBytes_Aes(str);
var f = AESHelper.DecryptStringFromBytes_Aes(t);
#pragma warning disable CS8604 // 引用类型参数可能为 null。
await Other.ReceiveMsg(JsonSerializer.Deserialize<SyncMsg>(f));
#pragma warning restore CS8604 // 引用类型参数可能为 null。
}
else
{
#pragma warning disable CS8604 // 引用类型参数可能为 null。
await Other.ReceiveMsg(JsonSerializer.Deserialize<SyncMsg>(r));
#pragma warning restore CS8604 // 引用类型参数可能为 null。
}
} }
protected override async Task Listen(Func<byte[], bool> receiveCb) protected override async Task Listen(Func<byte[], bool> receiveCb)
{ {
while (!Cts.Token.IsCancellationRequested) try
{ {
Func<byte[]> eventTask = EventQueue.Take(Cts.Token); foreach (var eventTask in EventQueue.GetConsumingEnumerable(Cts.Token))
if (eventTask != null)
{ {
await Task.Run(() => await Task.Run(() =>
{ {
@ -84,6 +100,13 @@ namespace ServerTest
}); });
} }
} }
catch (OperationCanceledException)
{
//var x = 1;
var id = Id;
//抛出异常 从 p3 传递到 p2
throw new Exception(ErrResult);
}
} }
} }
} }