From f93cc03bc00aefe2913d34fa86fd17e541a57648 Mon Sep 17 00:00:00 2001
From: zerlei <1445089819@qq.com>
Date: Mon, 23 Sep 2024 17:46:46 +0800
Subject: [PATCH] =?UTF-8?q?fix:=20=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95?=
=?UTF-8?q?=E4=BF=AE=E4=BA=86=E4=B8=80=E5=A4=A7=E5=A0=86bug=E3=80=82?=
=?UTF-8?q?=E5=BD=93=E5=89=8D=E9=97=AE=E9=A2=98=EF=BC=8CDir=E5=BA=8F?=
=?UTF-8?q?=E5=88=97=E5=8C=96=E9=94=99=E8=AF=AF=EF=BC=8C=E9=87=8D=E6=9E=84?=
=?UTF-8?q?dir?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
Server/Common/AES.cs | 70 +++++++++++++++++++------
Server/Common/Config.cs | 10 ++--
Server/Common/ConnectPipeline.cs | 12 ++++-
Server/Common/Message.cs | 14 +++--
Server/LocalServer/LocalSyncServer.cs | 21 +++++---
Server/LocalServer/StateHelper.cs | 60 +++++++++++++--------
Server/RemoteServer/RemoteSyncServer.cs | 24 +++++----
Server/RemoteServer/StateHelper.cs | 25 +++++++--
Server/ServerTest/PipeSeed.cs | 11 ++--
Server/ServerTest/PipeTest.cs | 50 +++++++++++++-----
Server/ServerTest/TestPipe.cs | 57 ++++++++++++++------
11 files changed, 243 insertions(+), 111 deletions(-)
diff --git a/Server/Common/AES.cs b/Server/Common/AES.cs
index f6bd9cc..ac3cc13 100644
--- a/Server/Common/AES.cs
+++ b/Server/Common/AES.cs
@@ -33,6 +33,14 @@ public class AESHelper
0x44,
0xB5,
0xF6,
+ 0x23,
+ 0x44,
+ 0xB5,
+ 0xF6,
+ 0x44,
+ 0xB5,
+ 0xF6,
+ 0x23,
];
static readonly byte[] IV =
[
@@ -54,11 +62,15 @@ public class AESHelper
0xB6,
];
- public static byte[] EncryptStringToBytes_Aes(byte[] plainText)
+ public static byte[] EncryptStringToBytes_Aes(string plainText)
{
// Check arguments.
if (plainText == null || plainText.Length <= 0)
- throw new ArgumentNullException(nameof(plainText), "can't be null");
+ throw new ArgumentNullException("plainText");
+ if (Key == null || Key.Length <= 0)
+ throw new ArgumentNullException("Key");
+ if (IV == null || IV.Length <= 0)
+ throw new ArgumentNullException("IV");
byte[] encrypted;
// Create an Aes object
@@ -72,14 +84,24 @@ public class AESHelper
ICryptoTransform encryptor = aesAlg.CreateEncryptor(aesAlg.Key, aesAlg.IV);
// Create the streams used for encryption.
- using MemoryStream msEncrypt = new();
- using CryptoStream csEncrypt = new(msEncrypt, encryptor, CryptoStreamMode.Write);
- using (StreamWriter swEncrypt = new(csEncrypt))
+ using (MemoryStream msEncrypt = new MemoryStream())
{
- //Write all data to the stream.
- swEncrypt.Write(plainText);
+ using (
+ CryptoStream csEncrypt = new CryptoStream(
+ msEncrypt,
+ encryptor,
+ CryptoStreamMode.Write
+ )
+ )
+ {
+ using (StreamWriter swEncrypt = new StreamWriter(csEncrypt))
+ {
+ //Write all data to the stream.
+ swEncrypt.Write(plainText);
+ }
+ encrypted = msEncrypt.ToArray();
+ }
}
- encrypted = msEncrypt.ToArray();
}
// Return the encrypted bytes from the memory stream.
@@ -90,11 +112,15 @@ public class AESHelper
{
// Check arguments.
if (cipherText == null || cipherText.Length <= 0)
- throw new ArgumentNullException(nameof(cipherText), "can't be null");
+ throw new ArgumentNullException("cipherText");
+ if (Key == null || Key.Length <= 0)
+ throw new ArgumentNullException("Key");
+ if (IV == null || IV.Length <= 0)
+ throw new ArgumentNullException("IV");
// Declare the string used to hold
// the decrypted text.
- string plaintext = string.Empty;
+ string plaintext = null;
// Create an Aes object
// with the specified key and IV.
@@ -107,12 +133,24 @@ public class AESHelper
ICryptoTransform decryptor = aesAlg.CreateDecryptor(aesAlg.Key, aesAlg.IV);
// Create the streams used for decryption.
- using MemoryStream msDecrypt = new(cipherText);
- using CryptoStream csDecrypt = new(msDecrypt, decryptor, CryptoStreamMode.Read);
- using StreamReader srDecrypt = new(csDecrypt);
- // Read the decrypted bytes from the decrypting stream
- // and place them in a string.
- plaintext = srDecrypt.ReadToEnd();
+ using (MemoryStream msDecrypt = new MemoryStream(cipherText))
+ {
+ using (
+ CryptoStream csDecrypt = new CryptoStream(
+ msDecrypt,
+ decryptor,
+ CryptoStreamMode.Read
+ )
+ )
+ {
+ using (StreamReader srDecrypt = new StreamReader(csDecrypt))
+ {
+ // Read the decrypted bytes from the decrypting stream
+ // and place them in a string.
+ plaintext = srDecrypt.ReadToEnd();
+ }
+ }
+ }
}
return plaintext;
diff --git a/Server/Common/Config.cs b/Server/Common/Config.cs
index 294c67a..a206c47 100644
--- a/Server/Common/Config.cs
+++ b/Server/Common/Config.cs
@@ -19,8 +19,8 @@ public class DirFileConfig
///
public Dir? LocalDirInfo { get; set; }
- public Dir? DiffDirInfo{ get; set; }
- public Dir? RemoteDirInfo{ get; set; }
+ public Dir? DiffDirInfo { get; set; }
+ public Dir? RemoteDirInfo { get; set; }
}
public class MSSqlConfig
@@ -29,18 +29,22 @@ public class MSSqlConfig
/// 数据库地址
///
public required string ServerName { get; set; }
+
///
/// db名称
///
public required string DatebaseName { get; set; }
+
///
/// 用户
///
public required string User { get; set; }
+
///
/// 密码
///
public required string Password { get; set; }
+
///
/// 通常是:True
///
@@ -49,7 +53,7 @@ public class MSSqlConfig
///
/// 同步数据的表格 !!! 通常是 dbo.TableName !!! 注意dbo.
///
- public List? SyncTablesData{get;set;}
+ public List? SyncTablesData { get; set; }
}
public class Config
diff --git a/Server/Common/ConnectPipeline.cs b/Server/Common/ConnectPipeline.cs
index 09bd98c..5aba076 100644
--- a/Server/Common/ConnectPipeline.cs
+++ b/Server/Common/ConnectPipeline.cs
@@ -104,7 +104,15 @@ public class WebSocPipeLine(TSocket socket, bool isAES) : AbsPipeLine(i
{
var nbuffer = new byte[receiveResult.Count];
System.Buffer.BlockCopy(buffer, 0, nbuffer, 0, receiveResult.Count);
- receiveCb(nbuffer);
+ if (IsAES)
+ {
+ var nnbuffer = AESHelper.DecryptStringFromBytes_Aes(buffer);
+ receiveCb(Encoding.UTF8.GetBytes(nnbuffer));
+ }
+ else
+ {
+ receiveCb(nbuffer);
+ }
}
}
}
@@ -126,7 +134,7 @@ public class WebSocPipeLine(TSocket socket, bool isAES) : AbsPipeLine(i
string msgStr = JsonSerializer.Serialize(msg);
await Socket.SendAsync(
IsAES
- ? AESHelper.EncryptStringToBytes_Aes(Encoding.UTF8.GetBytes(msgStr))
+ ? AESHelper.EncryptStringToBytes_Aes(msgStr)
: new ArraySegment(Encoding.UTF8.GetBytes(msgStr)),
WebSocketMessageType.Text,
true,
diff --git a/Server/Common/Message.cs b/Server/Common/Message.cs
index 8827efe..23a67cf 100644
--- a/Server/Common/Message.cs
+++ b/Server/Common/Message.cs
@@ -7,6 +7,7 @@ public enum SyncMsgType
Process = 2,
// DirFilePack = 3
}
+
public enum SyncProcessStep
{
Connect = 1,
@@ -16,15 +17,12 @@ public enum SyncProcessStep
UploadAndUnpack = 5,
Publish = 6
}
-public class SyncMsg(SyncMsgType msgType, SyncProcessStep step, string body)
+
+public class SyncMsg
{
- public SyncMsgType? Type { get; set; } = msgType;
+ public SyncMsgType Type { get; set; }
- public SyncProcessStep Step {get;set;} = step;
+ public SyncProcessStep Step { get; set; }
- public bool IsSuccess
- {
- get { return Type != SyncMsgType.Error; }
- }
- public string Body { get; set; } = body;
+ public required string Body { get; set; }
}
diff --git a/Server/LocalServer/LocalSyncServer.cs b/Server/LocalServer/LocalSyncServer.cs
index b0a4892..3ddbb4b 100644
--- a/Server/LocalServer/LocalSyncServer.cs
+++ b/Server/LocalServer/LocalSyncServer.cs
@@ -31,12 +31,18 @@ public class LocalSyncServer
public readonly AbsPipeLine LocalPipe;
public readonly AbsPipeLine RemotePipe;
+
///
/// 父工程,用于释放资源
///
public readonly LocalSyncServerFactory Factory;
- public LocalSyncServer(AbsPipeLine pipe, LocalSyncServerFactory factory,string name,AbsPipeLine remotePipe )
+ public LocalSyncServer(
+ AbsPipeLine pipe,
+ LocalSyncServerFactory factory,
+ string name,
+ AbsPipeLine remotePipe
+ )
{
LocalPipe = pipe;
Factory = factory;
@@ -46,14 +52,14 @@ public class LocalSyncServer
Task.Run(async () =>
{
- var rs = LocalPipe.Work(
- (byte[] b) =>
- {
- return StateHelper.ReceiveLocalMsg(b);
- }
- );
try
{
+ var rs = LocalPipe.Work(
+ (byte[] b) =>
+ {
+ return StateHelper.ReceiveLocalMsg(b);
+ }
+ );
await foreach (var r in rs) { }
}
catch (Exception e)
@@ -63,7 +69,6 @@ public class LocalSyncServer
});
}
-
public void Close(string? CloseReason)
{
try
diff --git a/Server/LocalServer/StateHelper.cs b/Server/LocalServer/StateHelper.cs
index ccc6d14..5c105c8 100644
--- a/Server/LocalServer/StateHelper.cs
+++ b/Server/LocalServer/StateHelper.cs
@@ -1,5 +1,6 @@
using System.Diagnostics;
using System.Runtime.InteropServices;
+using System.Text;
using System.Text.Json;
using Common;
@@ -24,12 +25,22 @@ public abstract class StateHelpBase(
public SyncMsg CreateErrMsg(string Body)
{
- return new SyncMsg(SyncMsgType.Error, Step, Body);
+ return new SyncMsg
+ {
+ Body = Body,
+ Step = Step,
+ Type = SyncMsgType.Error
+ };
}
public SyncMsg CreateMsg(string body, SyncMsgType type = SyncMsgType.General)
{
- return new SyncMsg(type, Step, body);
+ return new SyncMsg
+ {
+ Body = body,
+ Step = Step,
+ Type = type
+ };
}
public bool ReceiveLocalMsg(byte[] msg)
@@ -47,7 +58,7 @@ public abstract class StateHelpBase(
public bool ReceiveRemoteMsg(byte[] bytes)
{
- var msg = AESHelper.DecryptStringFromBytes_Aes(bytes);
+ var msg = Encoding.UTF8.GetString(bytes);
var syncMsg =
JsonSerializer.Deserialize(msg)
@@ -90,22 +101,29 @@ public class ConnectAuthorityHelper(LocalSyncServer context)
Context.SyncConfig = config;
Task.Run(async () =>
{
- var rs = Context.RemotePipe.Work(
- (byte[] b) =>
- {
- return Context.StateHelper.ReceiveRemoteMsg(b);
- },
- Context.NotNullSyncConfig.RemoteUrl + "/websoc?Name=" + Context.Name
- );
- await foreach (var r in rs)
+ try
{
- if (r == 0)
+ var rs = Context.RemotePipe.Work(
+ (byte[] b) =>
+ {
+ return Context.StateHelper.ReceiveRemoteMsg(b);
+ },
+ Context.NotNullSyncConfig.RemoteUrl + "/websoc?Name=" + Context.Name
+ );
+ await foreach (var r in rs)
{
- await Context.RemotePipe.SendMsg(
- CreateMsg(Context.NotNullSyncConfig.RemotePwd)
- );
+ if (r == 0)
+ {
+ await Context.RemotePipe.SendMsg(
+ CreateMsg(Context.NotNullSyncConfig.RemotePwd)
+ );
+ }
}
}
+ catch (Exception e)
+ {
+ await Context.LocalPipe.Close(e.Message);
+ }
});
}
}
@@ -133,9 +151,9 @@ public class DeployHelper(LocalSyncServer context)
ProcessStartInfo startInfo =
new()
{
- FileName = "cmd.exe", // The command to execute (can be any command line tool)
+ FileName = "msdeploy", // The command to execute (can be any command line tool)
Arguments =
- $" msdeploy.exe -verb:sync -source:contentPath={Context.NotNullSyncConfig.LocalProjectAbsolutePath} -dest:contentPath={Context.NotNullSyncConfig.LocalRootPath} -disablerule:BackupRule",
+ $" -verb:sync -source:contentPath={Context.NotNullSyncConfig.LocalProjectAbsolutePath} -dest:contentPath={Context.NotNullSyncConfig.LocalRootPath} -disablerule:BackupRule",
// The arguments to pass to the command (e.g., list directory contents)
RedirectStandardOutput = true, // Redirect the standard output to a string
UseShellExecute = false, // Do not use the shell to execute the command
@@ -212,7 +230,7 @@ public class DiffFileAndPackHelper(LocalSyncServer context)
);
Context.NotNullSyncConfig.DirFileConfigs.ForEach(e =>
{
- if (e.DiffDirInfo!= null)
+ if (e.DiffDirInfo != null)
{
e.DiffDirInfo.ResetRootPath(
Context.NotNullSyncConfig.RemoteRootPath,
@@ -253,7 +271,7 @@ public class DeployMSSqlHelper(LocalSyncServer context)
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
var arguments =
- $"SqlPackage /Action:Extract /TargetFile:{LocalSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id.ToString()}/{Context.NotNullSyncConfig.Id.ToString()}.dacpac "
+ $" /Action:Extract /TargetFile:{LocalSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id.ToString()}/{Context.NotNullSyncConfig.Id.ToString()}.dacpac "
+ $"/DiagnosticsFile:{LocalSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id.ToString()}/{Context.NotNullSyncConfig.Id.ToString()}.log "
+ $"/p:ExtractAllTableData=false /p:VerifyExtraction=true /SourceServerName:{Context.NotNullSyncConfig.SrcDb.ServerName}"
+ $"/SourceDatabaseName:{Context.NotNullSyncConfig.SrcDb.DatebaseName} /SourceUser:{Context.NotNullSyncConfig.SrcDb.User} "
@@ -270,7 +288,7 @@ public class DeployMSSqlHelper(LocalSyncServer context)
ProcessStartInfo startInfo =
new()
{
- FileName = "cmd.exe", // The command to execute (can be any command line tool)
+ FileName = "SqlPackage", // The command to execute (can be any command line tool)
Arguments = arguments,
// The arguments to pass to the command (e.g., list directory contents)
RedirectStandardOutput = true, // Redirect the standard output to a string
@@ -320,7 +338,7 @@ public class UploadPackedHelper(LocalSyncServer context)
{
Context
.LocalPipe.UploadFile(
- Context.NotNullSyncConfig.RemoteUrl ,
+ Context.NotNullSyncConfig.RemoteUrl,
$"{LocalSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id}/{Context.NotNullSyncConfig.Id}.zip",
(double current) =>
{
diff --git a/Server/RemoteServer/RemoteSyncServer.cs b/Server/RemoteServer/RemoteSyncServer.cs
index 4beb2f6..79cce13 100644
--- a/Server/RemoteServer/RemoteSyncServer.cs
+++ b/Server/RemoteServer/RemoteSyncServer.cs
@@ -1,4 +1,3 @@
-
using System.Net.WebSockets;
using Common;
@@ -29,16 +28,22 @@ public class RemoteSyncServer
/// 发布源连接
///
public readonly AbsPipeLine Pipe;
+
///
/// 父工程,用于释放资源
///
public readonly RemoteSyncServerFactory Factory;
-
+
public string Name;
public string Pwd;
- public RemoteSyncServer(AbsPipeLine pipe, RemoteSyncServerFactory factory,string name,string pwd)
+ public RemoteSyncServer(
+ AbsPipeLine pipe,
+ RemoteSyncServerFactory factory,
+ string name,
+ string pwd
+ )
{
Pipe = pipe;
Factory = factory;
@@ -48,14 +53,14 @@ public class RemoteSyncServer
Task.Run(async () =>
{
- var rs = Pipe.Work(
- (byte[] b) =>
- {
- return StateHelper.ReceiveMsg(b);
- }
- );
try
{
+ var rs = Pipe.Work(
+ (byte[] b) =>
+ {
+ return StateHelper.ReceiveMsg(b);
+ }
+ );
await foreach (var r in rs) { }
}
catch (Exception e)
@@ -65,7 +70,6 @@ public class RemoteSyncServer
});
}
-
public void Close(string? CloseReason)
{
try
diff --git a/Server/RemoteServer/StateHelper.cs b/Server/RemoteServer/StateHelper.cs
index 5d3fe2b..43f33ce 100644
--- a/Server/RemoteServer/StateHelper.cs
+++ b/Server/RemoteServer/StateHelper.cs
@@ -1,5 +1,6 @@
using System.Diagnostics;
using System.Runtime.InteropServices;
+using System.Text;
using System.Text.Json;
using Common;
@@ -24,17 +25,27 @@ public abstract class StateHelpBase(
public SyncMsg CreateErrMsg(string Body)
{
- return new SyncMsg(SyncMsgType.Error, Step, Body);
+ return new SyncMsg
+ {
+ Body = Body,
+ Type = SyncMsgType.Error,
+ Step = Step
+ };
}
public SyncMsg CreateMsg(string body, SyncMsgType type = SyncMsgType.General)
{
- return new SyncMsg(type, Step, body);
+ return new SyncMsg
+ {
+ Body = body,
+ Type = type,
+ Step = Step
+ };
}
public bool ReceiveMsg(byte[] bytes)
{
- var msg = AESHelper.DecryptStringFromBytes_Aes(bytes);
+ var msg = Encoding.UTF8.GetString(bytes);
var syncMsg =
JsonSerializer.Deserialize(msg)
@@ -61,6 +72,8 @@ public class ConnectAuthorityHelper(RemoteSyncServer context)
{
if (msg.Body == Context.Pwd)
{
+ var h = new DiffFileHelper(Context);
+ Context.StateHelper = h;
Context.Pipe.SendMsg(CreateMsg("RemoteServer: 密码验证成功!"));
}
else
@@ -100,6 +113,8 @@ public class DiffFileHelper(RemoteSyncServer context)
);
}
});
+ var h = new UnPackAndReleaseHelper(Context);
+ Context.StateHelper = h;
//将对比结果发送到Local
Context.Pipe.SendMsg(CreateMsg(JsonSerializer.Serialize(diffConfigs)));
}
@@ -134,14 +149,14 @@ public class FinallyPublishHelper(RemoteSyncServer context)
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
var arguments =
- $"SqlPackage /Action:Publish /SourceFile: {RemoteSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id}/{Context.NotNullSyncConfig.Id}.dacpac "
+ $"/Action:Publish /SourceFile: {RemoteSyncServer.TempRootFile}/{Context.NotNullSyncConfig.Id}/{Context.NotNullSyncConfig.Id}.dacpac "
+ $"/TargetServerName:{Context.NotNullSyncConfig.DstDb.ServerName} /TargetDatabaseName:{Context.NotNullSyncConfig.DstDb.DatebaseName}"
+ $" /TargetUser:{Context.NotNullSyncConfig.DstDb.User} /TargetPassword:{Context.NotNullSyncConfig.DstDb.Password} /TargetTrustServerCertificate:True ";
ProcessStartInfo startInfo =
new()
{
- FileName = "cmd.exe", // The command to execute (can be any command line tool)
+ FileName = "SqlPackage", // The command to execute (can be any command line tool)
Arguments = arguments,
// The arguments to pass to the command (e.g., list directory contents)
RedirectStandardOutput = true, // Redirect the standard output to a string
diff --git a/Server/ServerTest/PipeSeed.cs b/Server/ServerTest/PipeSeed.cs
index 8752f17..bdabf7a 100644
--- a/Server/ServerTest/PipeSeed.cs
+++ b/Server/ServerTest/PipeSeed.cs
@@ -12,7 +12,7 @@ public class PipeSeed : IDisposable
{
Name = "Test",
RemoteUrl = "D:/FileSyncTest",
- RemotePwd = "",
+ RemotePwd = "t123",
IsDeployDb = true,
IsDeployProject = true,
LocalProjectAbsolutePath = "D:/git/HMES-H7-HNFY/HMES-H7-HNFYMF/HMES-H7-HNFYMF.WEB",
@@ -42,12 +42,9 @@ public class PipeSeed : IDisposable
Password = "0",
TrustServerCertificate = "True"
},
- DirFileConfigs = new List{
- new DirFileConfig{
- DirPath = "/bin",
- Excludes = ["/roslyn","/Views"]
-
- }
+ DirFileConfigs = new List
+ {
+ new DirFileConfig { DirPath = "/bin", Excludes = ["/roslyn", "/Views"] }
}
};
}
diff --git a/Server/ServerTest/PipeTest.cs b/Server/ServerTest/PipeTest.cs
index ab3e62b..5fbbc82 100644
--- a/Server/ServerTest/PipeTest.cs
+++ b/Server/ServerTest/PipeTest.cs
@@ -3,6 +3,7 @@ using Common;
using LocalServer;
using RemoteServer;
using XUnit.Project.Attributes;
+
/*using Newtonsoft.Json;*/
namespace ServerTest;
@@ -12,25 +13,46 @@ public class PipeTest
[Fact]
public async void TestCase()
{
- var p1 = new TestPipe(false);
- var p2 = new TestPipe(false);
+ var p1 = new TestPipe(false, "1");
+ var x = Task.Run(async () =>
+ {
+ var rs = p1.Work(
+ (byte[] b) =>
+ {
+ Console.WriteLine(b);
+ return true;
+ }
+ );
+ await foreach (var r in rs)
+ {
+ Console.WriteLine(r);
+ }
+ });
+ //await p1.Close("sdf");
+ //await x;
+ var p2 = new TestPipe(false, "2");
p1.Other = p2;
p2.Other = p1;
- var p3 = new TestPipe(true);
- var p4 = new TestPipe(true);
+ var p3 = new TestPipe(true, "3");
+ var p4 = new TestPipe(true, "4");
p3.Other = p4;
p4.Other = p3;
-
+ RemoteSyncServerFactory.NamePwd = [new Tuple("Test", "t123")];
var lf = new LocalSyncServerFactory();
- lf.CreateLocalSyncServer(p2,"Test",p3);
+ lf.CreateLocalSyncServer(p2, "Test", p3);
var rf = new RemoteSyncServerFactory();
- rf.CreateRemoteSyncServer(p4,"Test");
- var starter = new SyncMsg(
- SyncMsgType.General,
- SyncProcessStep.Connect,
- JsonSerializer.Serialize(new PipeSeed().TestConfig)
- );
+ rf.CreateRemoteSyncServer(p4, "Test");
+ var starter = new SyncMsg
+ {
+ Body = JsonSerializer.Serialize(new PipeSeed().TestConfig),
+ Type = SyncMsgType.General,
+ Step = SyncProcessStep.Connect
+ };
await p1.SendMsg(starter);
-
+ await x;
+ if (p1.ErrResult != null)
+ {
+ Assert.Fail(p1.ErrResult);
+ }
}
-}
\ No newline at end of file
+}
diff --git a/Server/ServerTest/TestPipe.cs b/Server/ServerTest/TestPipe.cs
index 9e1521c..b2a1bb8 100644
--- a/Server/ServerTest/TestPipe.cs
+++ b/Server/ServerTest/TestPipe.cs
@@ -1,15 +1,18 @@
using System.Collections.Concurrent;
+using System.Text;
using System.Text.Json;
using Common;
namespace ServerTest
{
- public class TestPipe(bool isAES) : AbsPipeLine(isAES)
+ public class TestPipe(bool isAES, string id) : AbsPipeLine(isAES)
{
private readonly BlockingCollection> EventQueue =
new BlockingCollection>();
private readonly CancellationTokenSource Cts = new CancellationTokenSource();
public TestPipe? Other;
+ public string? ErrResult;
+ public string Id = id;
public override async IAsyncEnumerable Work(
Func receiveCb,
@@ -36,10 +39,16 @@ namespace ServerTest
public override async Task Close(string? CloseReason)
{
- await Task.Run(() =>
+ ErrResult = CloseReason;
+ var Id = this.Id;
+ Cts.Cancel();
+ if (Other != null)
{
- Cts.Cancel();
- });
+ if (Other.ErrResult == null)
+ {
+ await Other.Close(CloseReason);
+ }
+ }
}
public new async Task ReceiveMsg(SyncMsg msg)
@@ -48,15 +57,7 @@ namespace ServerTest
{
EventQueue.Add(() =>
{
- var r = JsonSerializer.SerializeToUtf8Bytes(msg);
- if (IsAES)
- {
- return AESHelper.EncryptStringToBytes_Aes(r);
- }
- else
- {
- return r;
- }
+ return JsonSerializer.SerializeToUtf8Bytes(msg);
});
});
}
@@ -67,15 +68,30 @@ namespace ServerTest
{
throw new Exception("can't be null");
}
- await Other.ReceiveMsg(msg);
+ var r = JsonSerializer.SerializeToUtf8Bytes(msg);
+
+ if (IsAES)
+ {
+ var str = Encoding.UTF8.GetString(r);
+ var t = AESHelper.EncryptStringToBytes_Aes(str);
+ var f = AESHelper.DecryptStringFromBytes_Aes(t);
+#pragma warning disable CS8604 // 引用类型参数可能为 null。
+ await Other.ReceiveMsg(JsonSerializer.Deserialize(f));
+#pragma warning restore CS8604 // 引用类型参数可能为 null。
+ }
+ else
+ {
+#pragma warning disable CS8604 // 引用类型参数可能为 null。
+ await Other.ReceiveMsg(JsonSerializer.Deserialize(r));
+#pragma warning restore CS8604 // 引用类型参数可能为 null。
+ }
}
protected override async Task Listen(Func receiveCb)
{
- while (!Cts.Token.IsCancellationRequested)
+ try
{
- Func eventTask = EventQueue.Take(Cts.Token);
- if (eventTask != null)
+ foreach (var eventTask in EventQueue.GetConsumingEnumerable(Cts.Token))
{
await Task.Run(() =>
{
@@ -84,6 +100,13 @@ namespace ServerTest
});
}
}
+ catch (OperationCanceledException)
+ {
+ //var x = 1;
+ var id = Id;
+ //抛出异常 从 p3 传递到 p2
+ throw new Exception(ErrResult);
+ }
}
}
}