using System.Collections.Concurrent; using System.Text; using System.Text.Json; using Common; using Microsoft.AspNetCore.Hosting.Server; using RemoteServer; namespace ServerTest { public class TestPipe(bool isAES, string id) : AbsPipeLine(isAES) { private readonly BlockingCollection> EventQueue = new BlockingCollection>(); private readonly CancellationTokenSource Cts = new CancellationTokenSource(); public TestPipe? Other; public string? ErrResult; public string Id = id; #pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. public static RemoteSyncServerFactory syncServerFactory; #pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable. public override async IAsyncEnumerable Work( Func receiveCb, string addr = "" ) { yield return 0; await Listen(receiveCb); yield return 1; } public override async Task UploadFile( string dst, string filePath, Func progressCb ) { dst = Path.Combine(RemoteSyncServer.TempRootFile, Path.GetFileName(filePath)); await Task.Run(() => { System.IO.File.Copy(filePath, dst, true); progressCb(100); //if (!Directory.Exists(dst)) //{ // Directory.CreateDirectory(dst); //} Task.Run(() => { var it = syncServerFactory.GetServerByName("Test")?? throw new NullReferenceException("找不到服务名称!"); var h = new UnPackAndReleaseHelper(it); it.SetStateHelpBase(h); h.UnPack(); }); }); } public override async Task Close(string? CloseReason) { ErrResult = CloseReason; var Id = this.Id; Cts.Cancel(); if (Other != null) { if (Other.ErrResult == null) { await Other.Close(CloseReason); } } } public new async Task ReceiveMsg(SyncMsg msg) { await Task.Run(() => { EventQueue.Add(() => { return JsonSerializer.SerializeToUtf8Bytes(msg); }); }); } public override async Task SendMsg(SyncMsg msg) { if (Other == null) { throw new Exception("can't be null"); } var r = JsonSerializer.SerializeToUtf8Bytes(msg); if (IsAES) { var str = Encoding.UTF8.GetString(r); var t = AESHelper.EncryptStringToBytes_Aes(str); var f = AESHelper.DecryptStringFromBytes_Aes(t); #pragma warning disable CS8604 // 引用类型参数可能为 null。 await Other.ReceiveMsg(JsonSerializer.Deserialize(f)); #pragma warning restore CS8604 // 引用类型参数可能为 null。 } else { #pragma warning disable CS8604 // 引用类型参数可能为 null。 await Other.ReceiveMsg(JsonSerializer.Deserialize(r)); #pragma warning restore CS8604 // 引用类型参数可能为 null。 } } protected override async Task Listen(Func receiveCb) { try { foreach (var eventTask in EventQueue.GetConsumingEnumerable(Cts.Token)) { await Task.Run(() => { var r = eventTask(); receiveCb(r); }); } } catch (OperationCanceledException) { //var x = 1; var id = Id; //抛出异常 从 p3 传递到 p2 if (ErrResult == "正常退出!") { return; } else { throw new Exception(ErrResult); } } } } }