fix: c# websocket 不能脱离http上下文,也就是请求接口的函数作用域,否则就会异常退出(这是什么?)。 因此改了一些websocket的结构。

This commit is contained in:
zerlei 2024-10-10 13:15:14 +08:00
parent f7e911943c
commit 5517ea9df8
11 changed files with 115 additions and 59 deletions

View file

@ -55,7 +55,7 @@ public class WebSocPipeLine<TSocket>(TSocket socket, bool isAES) : AbsPipeLine(i
); );
var fileContent = new ProgressStreamContent(fileStream, progress); var fileContent = new ProgressStreamContent(fileStream, progress);
content.Add(fileContent, "file", Path.GetFileName(filePath)); content.Add(fileContent, "file", Path.GetFileName(filePath));
var it = await client.PostAsync(url + "/UploadPacked", content); var it = await client.PostAsync("http://" + url + "/UploadPacked", content);
if (it.StatusCode != System.Net.HttpStatusCode.OK) if (it.StatusCode != System.Net.HttpStatusCode.OK)
{ {
throw new Exception(it.Content.ReadAsStringAsync().Result); throw new Exception(it.Content.ReadAsStringAsync().Result);
@ -72,7 +72,7 @@ public class WebSocPipeLine<TSocket>(TSocket socket, bool isAES) : AbsPipeLine(i
if (Socket is ClientWebSocket CSocket) if (Socket is ClientWebSocket CSocket)
{ {
//连接失败会抛出异常 //连接失败会抛出异常
await CSocket.ConnectAsync(new Uri(addr), CancellationToken.None); await CSocket.ConnectAsync(new Uri("ws://" + addr), CancellationToken.None);
yield return 0; yield return 0;
} }
// 从controller 来,这个已经连接上了 // 从controller 来,这个已经连接上了
@ -121,6 +121,14 @@ public class WebSocPipeLine<TSocket>(TSocket socket, bool isAES) : AbsPipeLine(i
{ {
if (Socket.State == WebSocketState.Open) if (Socket.State == WebSocketState.Open)
{ {
//await SendMsg(
// new SyncMsg
// {
// Type = SyncMsgType.Error,
// Step = SyncProcessStep.Finally,
// Body = CloseReason ?? ""
// }
//);
await Socket.CloseAsync( await Socket.CloseAsync(
WebSocketCloseStatus.NormalClosure, WebSocketCloseStatus.NormalClosure,
CloseReason, CloseReason,

View file

@ -10,6 +10,36 @@ namespace LocalServer.Controllers
{ {
private readonly LocalSyncServerFactory Factory = factory; private readonly LocalSyncServerFactory Factory = factory;
private static async Task Echo(WebSocket webSocket)
{
var buffer = new byte[1024 * 4];
var receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer),
CancellationToken.None
);
while (!receiveResult.CloseStatus.HasValue)
{
await webSocket.SendAsync(
new ArraySegment<byte>(buffer, 0, receiveResult.Count),
receiveResult.MessageType,
receiveResult.EndOfMessage,
CancellationToken.None
);
receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer),
CancellationToken.None
);
}
await webSocket.CloseAsync(
receiveResult.CloseStatus.Value,
receiveResult.CloseStatusDescription,
CancellationToken.None
);
}
[Route("/websoc")] [Route("/websoc")]
public async Task WebsocketConnection(string Name) public async Task WebsocketConnection(string Name)
{ {
@ -20,8 +50,9 @@ namespace LocalServer.Controllers
if (Factory.GetServerByName(Name) == null) if (Factory.GetServerByName(Name) == null)
{ {
var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync(); var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
//await Echo(webSocket);
var pipeLine = new WebSocPipeLine<WebSocket>(webSocket, false); var pipeLine = new WebSocPipeLine<WebSocket>(webSocket, false);
Factory.CreateLocalSyncServer( await Factory.CreateLocalSyncServer(
pipeLine, pipeLine,
Name, Name,
new WebSocPipeLine<ClientWebSocket>(new ClientWebSocket(), false) new WebSocPipeLine<ClientWebSocket>(new ClientWebSocket(), false)

View file

@ -105,24 +105,24 @@ public class LocalSyncServer
StateHelper = new ConnectAuthorityHelper(this); StateHelper = new ConnectAuthorityHelper(this);
Name = name; Name = name;
RemotePipe = remotePipe; RemotePipe = remotePipe;
}
Task.Run(async () => public async Task Connect()
{
try
{ {
try var rs = LocalPipe.Work(
{ (byte[] b) =>
var rs = LocalPipe.Work( {
(byte[] b) => return StateHelper.ReceiveLocalMsg(b);
{ }
return StateHelper.ReceiveLocalMsg(b); );
} await foreach (var r in rs) { }
); }
await foreach (var r in rs) { } catch (Exception e)
} {
catch (Exception e) Close(e.Message);
{ }
Close(e.Message);
}
});
} }
public void Close(string? CloseReason) public void Close(string? CloseReason)

View file

@ -1,5 +1,5 @@
using Common;
using System.Net.WebSockets; using System.Net.WebSockets;
using Common;
namespace LocalServer; namespace LocalServer;
@ -7,18 +7,18 @@ public class LocalSyncServerFactory
{ {
private readonly object Lock = new(); private readonly object Lock = new();
public void CreateLocalSyncServer(AbsPipeLine pipeLine, string name,AbsPipeLine absPipeLine) public async Task CreateLocalSyncServer(
AbsPipeLine pipeLine,
string name,
AbsPipeLine absPipeLine
)
{ {
var server = new LocalSyncServer( var server = new LocalSyncServer(pipeLine, this, name, absPipeLine);
pipeLine,
this,
name,
absPipeLine
);
lock (Lock) lock (Lock)
{ {
Servers.Add(server); Servers.Add(server);
} }
await server.Connect();
} }
private readonly List<LocalSyncServer> Servers = []; private readonly List<LocalSyncServer> Servers = [];

View file

@ -23,7 +23,7 @@ public class SyncFilesController(RemoteSyncServerFactory factory, SqliteDbContex
{ {
var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync(); var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
var pipeLine = new WebSocPipeLine<WebSocket>(webSocket, true); var pipeLine = new WebSocPipeLine<WebSocket>(webSocket, true);
Factory.CreateRemoteSyncServer(pipeLine, Name); await Factory.CreateRemoteSyncServer(pipeLine, Name);
} }
else else
{ {

View file

@ -38,6 +38,6 @@ if (app.Environment.IsDevelopment())
} }
app.UseWebSockets(); app.UseWebSockets();
app.Urls.Clear(); app.Urls.Clear();
app.Urls.Add("http://0.0.0.0:6818"); app.Urls.Add("http://0.0.0.0:6819");
app.MapControllers(); app.MapControllers();
app.Run(); app.Run();

View file

@ -61,24 +61,24 @@ public class RemoteSyncServer
Name = name; Name = name;
Pwd = pwd; Pwd = pwd;
StateHelper = new ConnectAuthorityHelper(this); StateHelper = new ConnectAuthorityHelper(this);
}
Task.Run(async () => public async Task Connect()
{
try
{ {
try var rs = Pipe.Work(
{ (byte[] b) =>
var rs = Pipe.Work( {
(byte[] b) => return StateHelper.ReceiveMsg(b);
{ }
return StateHelper.ReceiveMsg(b); );
} await foreach (var r in rs) { }
); }
await foreach (var r in rs) { } catch (Exception e)
} {
catch (Exception e) Close(e.Message);
{ }
Close(e.Message);
}
});
} }
public void Close(string? CloseReason) public void Close(string? CloseReason)

View file

@ -10,7 +10,7 @@ public class RemoteSyncServerFactory
public static List<Tuple<string, string>> NamePwd = []; public static List<Tuple<string, string>> NamePwd = [];
#pragma warning restore CA2211 // Non-constant fields should not be visible #pragma warning restore CA2211 // Non-constant fields should not be visible
public void CreateRemoteSyncServer(AbsPipeLine pipeLine, string Name) public async Task CreateRemoteSyncServer(AbsPipeLine pipeLine, string Name)
{ {
var pwd = var pwd =
NamePwd.Where(x => x.Item1 == Name).FirstOrDefault() NamePwd.Where(x => x.Item1 == Name).FirstOrDefault()
@ -20,6 +20,7 @@ public class RemoteSyncServerFactory
{ {
Servers.Add(server); Servers.Add(server);
} }
await server.Connect();
} }
private readonly List<RemoteSyncServer> Servers = []; private readonly List<RemoteSyncServer> Servers = [];

View file

@ -52,9 +52,17 @@ public class PipeTest
RemoteSyncServer.TempRootFile = "D:/FileSyncTest/dtemp"; RemoteSyncServer.TempRootFile = "D:/FileSyncTest/dtemp";
RemoteSyncServerFactory.NamePwd = [new Tuple<string, string>("Test", "t123")]; RemoteSyncServerFactory.NamePwd = [new Tuple<string, string>("Test", "t123")];
var lf = new LocalSyncServerFactory(); var lf = new LocalSyncServerFactory();
lf.CreateLocalSyncServer(p2, "Test", p3); var task1 = Task.Run(() =>
{
lf.CreateLocalSyncServer(p2, "Test", p3).RunSynchronously();
});
var rf = new RemoteSyncServerFactory(); var rf = new RemoteSyncServerFactory();
rf.CreateRemoteSyncServer(p4, "Test");
var task2 = Task.Run(async () =>
{
await rf.CreateRemoteSyncServer(p4, "Test");
});
TestPipe.syncServerFactory = rf; TestPipe.syncServerFactory = rf;
var starter = new SyncMsg var starter = new SyncMsg
{ {

View file

@ -3,6 +3,7 @@ import MonacoEditor from 'monaco-editor-vue3';
import HistoryBtn from "./HistoryBtn.vue" import HistoryBtn from "./HistoryBtn.vue"
import { ref, onMounted, computed } from 'vue'; import { ref, onMounted, computed } from 'vue';
import stringifyObject from 'stringify-object'; import stringifyObject from 'stringify-object';
import ConnectPipe from './connect.js'
const cacheConfig = ref({}) const cacheConfig = ref({})
const options = ref({ const options = ref({
@ -13,7 +14,7 @@ const options = ref({
const code = ref(` const code = ref(`
config = { config = {
Name: "Test", Name: "Test",
RemoteUrl: "D:/FileSyncTest/dtemp", RemoteUrl: "127.0.0.1:6819",
RemotePwd: "t123", RemotePwd: "t123",
IsDeployDb: false, IsDeployDb: false,
IsDeployProject: false, IsDeployProject: false,
@ -59,6 +60,9 @@ function submit() {
} }
cacheConfig.value[config.Name] = config cacheConfig.value[config.Name] = config
updateStorage() updateStorage()
var p = new ConnectPipe()
p.OpenPipe(config,()=>{})
} }
catch (e) { catch (e) {
window.alert(e) window.alert(e)
@ -76,9 +80,6 @@ const history = computed(() => {
return Object.keys(cacheConfig.value) return Object.keys(cacheConfig.value)
}) })
function publishCB(MsgIt) {
}
function onDel(name) { function onDel(name) {
delete cacheConfig.value[name] delete cacheConfig.value[name]
updateStorage() updateStorage()
@ -86,6 +87,11 @@ function onDel(name) {
function updateStorage() { function updateStorage() {
localStorage.setItem('config', JSON.stringify(cacheConfig.value)) localStorage.setItem('config', JSON.stringify(cacheConfig.value))
} }
function publishCB(MsgIt) {
}
onMounted(() => { onMounted(() => {
var cacheConfigStr = localStorage.getItem('config') var cacheConfigStr = localStorage.getItem('config')
if (cacheConfigStr) { if (cacheConfigStr) {

View file

@ -5,22 +5,25 @@ class ConnectPipe {
//Id,Msgtype,callback //Id,Msgtype,callback
// this.#websocket = new WebSocket(`ws://${window.location.host}`) // this.#websocket = new WebSocket(`ws://${window.location.host}`)
} }
#OpenPipe(config, MsgCb) { OpenPipe(config, MsgCb) {
this.config = config; this.config = config;
var webSocUrl = `ws://${window.location.host}:${window.location.port}/websoc?Name=${config.Name}` // var webSocUrl = `ws://${window.location.host}:${window.location.port}/websoc?Name=${config.Name}`
var webSocUrl = "ws://127.0.0.1:6818/websoc?Name=Test"
this.#websocket = new WebSocket(webSocUrl); this.#websocket = new WebSocket(webSocUrl);
this.#websocket.onopen = (event) => { this.#websocket.onopen = (event) => {
// console.warn("websocket connected!"); // console.warn("websocket connected!");
this.#websocket.send(JSON.stringify(this.config)); this.#websocket.send(JSON.stringify(this.config));
}; };
this.#websocket.onmessage = (event) => { this.#websocket.onmessage = (event) => {
console.log(event.data)
}; };
this.#websocket.onclose = (event) => {}; this.#websocket.onclose = (event) => {
console.warn(event.reason)
};
this.#websocket.onerror = (e) => { this.#websocket.onerror = (e) => {
console.error(e)
if (this.#websocket.readyState) { if (this.#websocket.readyState) {
//bla bla //bla bla
} }
@ -28,6 +31,5 @@ class ConnectPipe {
} }
} }
let cPipe = new ConnectPipe();
export default cPipe; export default ConnectPipe;