fix: websocket pipeline 测试修复了大量的问题,一大部分是对websocket和c# 不熟悉所致。

- 上传文件bug修复
- 继续测试修改bug
- 代码审查处理,添加必要注释,风格规范性等
This commit is contained in:
zerlei 2024-10-11 09:04:58 +08:00
parent 5517ea9df8
commit 35a0710ab5
11 changed files with 148 additions and 69 deletions

View file

@ -120,7 +120,7 @@ public class AESHelper
// Declare the string used to hold
// the decrypted text.
string plaintext = null;
string plaintext = string.Empty;
// Create an Aes object
// with the specified key and IV.

View file

@ -27,7 +27,7 @@ public abstract class AbsPipeLine(bool isAES)
/// <returns></returns>
public abstract Task SendMsg(SyncMsg msg);
public abstract Task UploadFile(string filePath, string url, Func<double, bool> progressCb);
public abstract Task UploadFile(string url, string filePath, Func<double, bool> progressCb);
protected readonly bool IsAES = isAES;
}
@ -37,13 +37,12 @@ public class WebSocPipeLine<TSocket>(TSocket socket, bool isAES) : AbsPipeLine(i
public readonly TSocket Socket = socket;
public override async Task UploadFile(
string filePath,
string url,
string filePath,
Func<double, bool> progressCb
)
{
if (Socket is HttpClient)
{
//throw new Exception("sdfsdf");
using var client = new HttpClient();
using var content = new MultipartFormDataContent();
using var fileStream = new FileStream(filePath, FileMode.Open);
@ -53,19 +52,14 @@ public class WebSocPipeLine<TSocket>(TSocket socket, bool isAES) : AbsPipeLine(i
progressCb(current);
}
);
var fileContent = new ProgressStreamContent(fileStream, progress);
content.Add(fileContent, "file", Path.GetFileName(filePath));
var it = await client.PostAsync("http://" + url + "/UploadPacked", content);
//var fileContent = new ProgressStreamContent(fileStream, progress);
content.Add(new StreamContent(fileStream), "file", Path.GetFileName(filePath));
var it = await client.PostAsync("http://" + url + "/UploadFile", content);
if (it.StatusCode != System.Net.HttpStatusCode.OK)
{
throw new Exception(it.Content.ReadAsStringAsync().Result);
}
}
else
{
throw new NotSupportedException("只支持HttpClient!");
}
}
public override async IAsyncEnumerable<int> Work(Func<byte[], bool> receiveCb, string addr = "")
{
@ -106,7 +100,9 @@ public class WebSocPipeLine<TSocket>(TSocket socket, bool isAES) : AbsPipeLine(i
System.Buffer.BlockCopy(buffer, 0, nbuffer, 0, receiveResult.Count);
if (IsAES)
{
var nnbuffer = AESHelper.DecryptStringFromBytes_Aes(buffer);
//var msg1 = Encoding.UTF8.GetString(nbuffer);
//var n1Buffler = Encoding.UTF8.GetBytes(msg1);
var nnbuffer = AESHelper.DecryptStringFromBytes_Aes(nbuffer);
receiveCb(Encoding.UTF8.GetBytes(nnbuffer));
}
else
@ -129,6 +125,24 @@ public class WebSocPipeLine<TSocket>(TSocket socket, bool isAES) : AbsPipeLine(i
// Body = CloseReason ?? ""
// }
//);
if (Encoding.UTF8.GetBytes(CloseReason ?? "").Length > 120)
{
await SendMsg(
new SyncMsg
{
Type = SyncMsgType.Error,
Step = SyncProcessStep.CloseError,
Body = CloseReason ?? ""
}
);
await Socket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
"查看上一条错误信息!",
CancellationToken.None
);
}
else
{
await Socket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
CloseReason,
@ -136,17 +150,30 @@ public class WebSocPipeLine<TSocket>(TSocket socket, bool isAES) : AbsPipeLine(i
);
}
}
}
public override async Task SendMsg(SyncMsg msg)
{
string msgStr = JsonSerializer.Serialize(msg);
var it = AESHelper.EncryptStringToBytes_Aes(msgStr);
//var xx = new ArraySegment<byte>(it);
if (IsAES)
{
await Socket.SendAsync(
IsAES
? AESHelper.EncryptStringToBytes_Aes(msgStr)
: new ArraySegment<byte>(Encoding.UTF8.GetBytes(msgStr)),
new ArraySegment<byte>(it),
WebSocketMessageType.Binary,
true,
CancellationToken.None
);
}
else
{
await Socket.SendAsync(
new ArraySegment<byte>(Encoding.UTF8.GetBytes(msgStr)),
WebSocketMessageType.Text,
true,
CancellationToken.None
);
}
}
}

View file

@ -15,7 +15,8 @@ public enum SyncProcessStep
DiffFileAndPack = 3,
PackSqlServer = 4,
UploadAndUnpack = 5,
Publish = 6
Publish = 6,
CloseError = 7
}
public class SyncMsg

View file

@ -55,7 +55,7 @@ namespace LocalServer.Controllers
await Factory.CreateLocalSyncServer(
pipeLine,
Name,
new WebSocPipeLine<ClientWebSocket>(new ClientWebSocket(), false)
new WebSocPipeLine<ClientWebSocket>(new ClientWebSocket(), true)
);
}
else

View file

@ -124,7 +124,7 @@ public class ConnectAuthorityHelper(LocalSyncServer context)
}
catch (Exception e)
{
await Context.LocalPipe.Close(e.Message);
Context.Close(e.Message);
}
});
}
@ -301,6 +301,7 @@ public class DiffFileAndPackHelper(LocalSyncServer context)
e.DiffDirInfo.WriteByThisInfo(PackOp);
}
});
Context.LocalPipe.SendMsg(CreateMsg("文件差异比较成功!")).Wait();
var n = new DeployMSSqlHelper(Context);
Context.SetStateHelper(n);
n.PackSqlServerProcess();
@ -414,8 +415,6 @@ public class UploadPackedHelper(LocalSyncServer context)
)
.Wait();
Context.LocalPipe.SendMsg(CreateMsg("上传完成!")).Wait();
var x = Context.GetStateHelper().Step;
}
protected override void HandleLocalMsg(SyncMsg msg)

View file

@ -6,7 +6,7 @@
}
},
"AllowedHosts": "*",
"TempDir": "D:/TempPack",
"TempDir": "D:\\FileSyncTest\\stemp",
"SqlPackageAbPath": "C:\\Users\\ZHAOLEI\\.dotnet\\tools\\sqlpackage.exe",
//"MsdeployAbPath": "C:\\Program Files\\IIS\\Microsoft Web Deploy V3\\msdeploy.exe",
"MSBuildAbPath": "C:\\Program Files\\Microsoft Visual Studio\\2022\\Community\\MSBuild\\Current\\Bin\\amd64\\MSBuild.exe"

View file

@ -24,6 +24,8 @@ public class SyncFilesController(RemoteSyncServerFactory factory, SqliteDbContex
var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
var pipeLine = new WebSocPipeLine<WebSocket>(webSocket, true);
await Factory.CreateRemoteSyncServer(pipeLine, Name);
var x = 11;
}
else
{
@ -195,10 +197,7 @@ public class SyncFilesController(RemoteSyncServerFactory factory, SqliteDbContex
}
catch (Exception ex)
{
return StatusCode(
500,
new { IsSuccess = false, Message = $"Internal server error: {ex.Message}" }
);
return StatusCode(500, new { IsSuccess = false, Message = $"上传文件失败: {ex.Message}" });
}
}
}

View file

@ -14,6 +14,13 @@ RemoteSyncServerFactory.NamePwd =
[
.. (builder.Configuration.GetSection("NamePwds").Get<Tuple<string, string>[]>() ?? [])
];
foreach (var x in builder.Configuration.GetSection("NamePwds").GetChildren())
{
var it = x.GetChildren();
RemoteSyncServerFactory.NamePwd.Add(
new Tuple<string, string>(it.ElementAt(0).Value ?? "", it.ElementAt(1).Value ?? "")
);
}
RemoteSyncServer.SqlPackageAbPath =
builder.Configuration["SqlPackageAbPath"]
?? "C:\\Users\\ZHAOLEI\\.dotnet\\tools\\sqlpackage.exe";

View file

@ -9,9 +9,9 @@
}
},
"AllowedHosts": "*",
"TempDir":"D:/TempPack2",
"NamePwds":[
["test","testpwd"]
"TempDir": "D:\\FileSyncTest\\dtemp",
"NamePwds": [
[ "Test", "t123" ]
],
"SqlPackageAbPath": "C:\\Users\\ZHAOLEI\\.dotnet\\tools\\sqlpackage.exe",
"SqlPackageAbPath": "C:\\Users\\ZHAOLEI\\.dotnet\\tools\\sqlpackage.exe"
}

View file

@ -11,6 +11,9 @@ const options = ref({
lineHeight: 24,
tabSize: 2,
})
let Pipe = null
const Msgs = ref([])
const code = ref(`
config = {
Name: "Test",
@ -51,7 +54,31 @@ config = {
],
};
`)
var CStatus = ref('None')
function publishCB(MsgIt) {
if (MsgIt.Type == 2) {
Msgs.value[Msgs.value.length - 1] = MsgIt
} else {
Msgs.value.push(MsgIt)
}
if (MsgIt.Step == 6) {
if (MsgIt.Body == "发布完成!") {
CStatus.value = 'Success'
Pipe.ClosePipe()
window.alert("正确:发布完成!")
}
}
if (MsgIt.Step == 8) {
if (CStatus.value != "Success") {
window.alert("失败:请查看错误信息!")
}
CStatus.value = "None"
}
}
function submit() {
Msgs.value = []
var config = {}
try {
eval(code.value)
@ -60,8 +87,9 @@ function submit() {
}
cacheConfig.value[config.Name] = config
updateStorage()
var p = new ConnectPipe()
p.OpenPipe(config,()=>{})
Pipe = new ConnectPipe()
Pipe.OpenPipe(config, publishCB)
CStatus.value = "Process"
}
catch (e) {
@ -89,9 +117,6 @@ function updateStorage() {
}
function publishCB(MsgIt) {
}
onMounted(() => {
var cacheConfigStr = localStorage.getItem('config')
if (cacheConfigStr) {
@ -111,11 +136,17 @@ onMounted(() => {
v-model:value="code"></MonacoEditor>
<div style="width: 800px;height: 700px;background-color: #1e1e1e;">
发布日志
<p style="text-align: left;border: 1px solid gray;margin: 5px;" v-for="msg in Msgs">
<span :style="{ width: '100px', color: msg.Step > 6 ? 'red' : 'green' }">[{{ msg.Step
> 6 ? msg.Step > 7 ? "关闭" : "错误" : `${msg.Step}/${6}`}}]</span>
<span style="margin-left: 5px ;">{{ msg.Body }}</span>
</p>
</div>
</div>
<button style="margin-top: 20px;" @click="submit">发布</button>
<button :disabled="CStatus != 'None'" style="margin-top: 20px;" @click="submit">发布</button>
</template>
<style scoped></style>

View file

@ -6,30 +6,45 @@ class ConnectPipe {
// this.#websocket = new WebSocket(`ws://${window.location.host}`)
}
OpenPipe(config, MsgCb) {
this.config = config;
// var webSocUrl = `ws://${window.location.host}:${window.location.port}/websoc?Name=${config.Name}`
var webSocUrl = "ws://127.0.0.1:6818/websoc?Name=Test"
var webSocUrl = "ws://127.0.0.1:6818/websoc?Name=Test";
this.#websocket = new WebSocket(webSocUrl);
this.#websocket.onopen = (event) => {
var starter = {
Body: JSON.stringify(config),
Type: 1,
Step: 1,
};
// console.warn("websocket connected!");
this.#websocket.send(JSON.stringify(this.config));
this.#websocket.send(JSON.stringify(starter));
};
this.#websocket.onmessage = (event) => {
console.log(event.data)
// console.log(event.data);
MsgCb(JSON.parse(event.data))
};
this.#websocket.onclose = (event) => {
console.warn(event.reason)
console.warn(event)
MsgCb({
Type: 0,
Step: 8,
Body:event.reason
})
};
this.#websocket.onerror = (e) => {
console.error(e)
if (this.#websocket.readyState) {
//bla bla
}
console.error(e);
MsgCb({
Type: 0,
Body: "异常错误,查看 Console",
Step: 7,
});
};
}
ClosePipe() {
this.#websocket.close();
}
}
export default ConnectPipe;