From 031d4f3416353890847fa758fa13413e093b4ce3 Mon Sep 17 00:00:00 2001 From: wilson Date: Fri, 9 Jan 2026 13:02:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=96=87=E6=9C=AC=E6=8C=87?= =?UTF-8?q?=E4=BB=A4=E5=8F=AA=E5=8F=91=E4=B8=80=E4=B8=AA=E9=80=9A=E9=81=93?= =?UTF-8?q?=EF=BC=8C=E5=A4=9A=E9=80=9A=E9=81=93=E6=97=B6=E6=B2=A1=E6=9C=89?= =?UTF-8?q?=E6=AF=8F=E4=B8=AA=E9=80=9A=E9=81=93=E7=8B=AC=E7=AB=8B=E5=8F=91?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Core/CmdClients/CommandClientWorker.cs | 147 ++++++++++-------- .../NetSenders/DeviceStateMonitorWorker.cs | 85 +++++----- 2 files changed, 120 insertions(+), 112 deletions(-) diff --git a/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs b/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs index cd13ef0..dabbf55 100644 --- a/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs +++ b/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs @@ -12,14 +12,16 @@ public class CommandClientWorker : BackgroundService { private readonly ServiceConfig _config; private readonly CommandDispatcher _dispatcher; - - // ★ 1. 注入拦截器管道管理器 private readonly InterceptorPipeline _pipeline; + // 管理多个 Socket + private readonly List _sockets = new(); + private NetMQPoller? _poller; + public CommandClientWorker( ServiceConfig config, CommandDispatcher dispatcher, - InterceptorPipeline pipeline) // <--- 注入 + InterceptorPipeline pipeline) { _config = config; _dispatcher = dispatcher; @@ -30,24 +32,35 @@ public class CommandClientWorker : BackgroundService { await Task.Yield(); - if (!_config.ShouldConnect) return; - if (_config.CommandEndpoints.Count == 0) return; + if (!_config.ShouldConnect || _config.CommandEndpoints.Count == 0) return; - using var dealer = new DealerSocket(); - string myIdentity = _config.AppId; - dealer.Options.Identity = Encoding.UTF8.GetBytes(myIdentity); + // 1. 建立连接 (但不立即启动 Poller) + _poller = new NetMQPoller(); foreach (var ep in _config.CommandEndpoints) { - try { dealer.Connect(ep.Uri); } - catch (Exception ex) { Console.WriteLine($"[指令] 连接失败 {ep.Uri}: {ex.Message}"); } + try + { + var socket = new DealerSocket(); + // 建议加上 Socket 索引或 UUID 以防服务端认为 Identity 冲突 + // 或者保持原样,取决于服务端逻辑。通常同一个 AppId 连不同 Server 是没问题的。 + socket.Options.Identity = Encoding.UTF8.GetBytes(_config.AppId); + socket.Connect(ep.Uri); + + socket.ReceiveReady += OnSocketReceiveReady; + + _sockets.Add(socket); + _poller.Add(socket); + + Console.WriteLine($"[指令] 建立通道: {ep.Uri}"); + } + catch (Exception ex) { Console.WriteLine($"[指令] 连接异常: {ex.Message}"); } } - string localIp = "127.0.0.1"; - // ... (获取 IP 代码省略,保持不变) ... + if (_sockets.Count == 0) return; // ================================================================= - // 构建注册包 + // 2. 发送注册包 (在 Poller 启动前发送,绝对线程安全) // ================================================================= var registerPayload = new RegisterPayload { @@ -55,7 +68,7 @@ public class CommandClientWorker : BackgroundService InstanceId = _config.AppId, ProcessId = Environment.ProcessId, Version = "1.0.0", - ServerIp = localIp, + ServerIp = "127.0.0.1", WebApiPort = _config.BasePort, StartTime = DateTime.Now }; @@ -63,92 +76,94 @@ public class CommandClientWorker : BackgroundService try { byte[] regData = MessagePackSerializer.Serialize(registerPayload); - - // ============================================================= - // ★ 2. 拦截点 A: 发送注册包 (Outbound) - // ============================================================= var ctx = await _pipeline.ExecuteSendAsync(ProtocolHeaders.ServerRegister, regData); - if (ctx != null) // 如果未被拦截 + if (ctx != null) { - // 注意:这里使用 ctx.Protocol 和 ctx.Data,允许拦截器修改内容 - dealer.SendMoreFrame(ctx.Protocol) - .SendFrame(ctx.Data); - - Console.WriteLine($"[指令] 注册包已发送 ({ctx.Data.Length} bytes)"); + foreach (var socket in _sockets) + { + // 此时 Poller 还没跑,主线程发送是安全的 + socket.SendMoreFrame(ctx.Protocol).SendFrame(ctx.Data); + } + Console.WriteLine($"[指令] 注册包已广播至 {_sockets.Count} 个目标"); } } catch (Exception ex) { - Console.WriteLine($"[致命错误] 注册流程异常: {ex.Message}"); - return; + Console.WriteLine($"[指令] 注册失败: {ex.Message}"); } // ================================================================= - // 定义 ACK 发送逻辑 (包含拦截器) + // 3. 绑定 ACK 逻辑 // ================================================================= - // 注意:这里需要 async,因为拦截器是异步的 - Action sendAckHandler = async (result) => + // 关键修正:直接使用 async void,不要包裹在 Task.Run 中! + // 因为 OnResponseReady 是由 Dispatcher 触发的,而 Dispatcher 是由 Poller 线程触发的。 + // 所以这里就在 Poller 线程内,可以直接操作 Socket。 + _dispatcher.OnResponseReady += async (result) => { try { byte[] resultBytes = MessagePackSerializer.Serialize(result); - - // ========================================================= - // ★ 3. 拦截点 B: 发送 ACK 回执 (Outbound) - // ========================================================= - // 协议头是 COMMAND_RESULT var ctx = await _pipeline.ExecuteSendAsync(ProtocolHeaders.CommandResult, resultBytes); if (ctx != null) { - dealer.SendMoreFrame(ctx.Protocol) - .SendFrame(ctx.Data); - - Console.WriteLine($"[指令] 已回复 ACK -> Req: {result.RequestId}"); + foreach (var socket in _sockets) + { + socket.SendMoreFrame(ctx.Protocol).SendFrame(ctx.Data); + } + Console.WriteLine($"[指令] ACK 已广播 (ID: {result.RequestId})"); } } catch (Exception ex) { - Console.WriteLine($"[ACK Error] 回执发送失败: {ex.Message}"); + Console.WriteLine($"[ACK] 发送失败: {ex.Message}"); } }; - // 订阅事件 (需要适配 async void,注意异常捕获) - _dispatcher.OnResponseReady += async (res) => await Task.Run(() => sendAckHandler(res)); + // ================================================================= + // 4. 启动 Poller (开始监听接收) + // ================================================================= + _poller.RunAsync(); - // ================================================================= - // 接收循环 - // ================================================================= - try + // 阻塞直到取消 + while (!stoppingToken.IsCancellationRequested) { - while (!stoppingToken.IsCancellationRequested) + await Task.Delay(1000, stoppingToken); + } + + // 清理 + _poller.Stop(); + _poller.Dispose(); + foreach (var s in _sockets) s.Dispose(); + } + + private async void OnSocketReceiveReady(object? sender, NetMQSocketEventArgs e) + { + // 这里的代码运行在 Poller 线程 + NetMQMessage incomingMsg = new NetMQMessage(); + if (e.Socket.TryReceiveMultipartMessage(ref incomingMsg)) + { + if (incomingMsg.FrameCount >= 2) { - NetMQMessage incomingMsg = new NetMQMessage(); - if (dealer.TryReceiveMultipartMessage(TimeSpan.FromMilliseconds(500), ref incomingMsg)) + try { - if (incomingMsg.FrameCount >= 2) + string rawProtocol = incomingMsg[0].ConvertToString(); + byte[] rawData = incomingMsg[1].ToByteArray(); + + var ctx = await _pipeline.ExecuteReceiveAsync(rawProtocol, rawData); + if (ctx != null) { - string rawProtocol = incomingMsg[0].ConvertToString(); - byte[] rawData = incomingMsg[1].ToByteArray(); - - // ================================================= - // ★ 4. 拦截点 C: 接收指令 (Inbound) - // ================================================= - var ctx = await _pipeline.ExecuteReceiveAsync(rawProtocol, rawData); - - if (ctx != null) // 如果未被拦截 - { - // 将处理后的数据交给 Dispatcher - await _dispatcher.DispatchAsync(ctx.Protocol, ctx.Data); - } + // DispatchAsync 会同步触发 OnResponseReady, + // 从而在同一个线程内完成 ACK 发送,线程安全且高效。 + await _dispatcher.DispatchAsync(ctx.Protocol, ctx.Data); } } + catch (Exception ex) + { + Console.WriteLine($"[指令] 处理异常: {ex.Message}"); + } } } - catch (Exception ex) - { - Console.WriteLine($"[指令] 接收循环异常: {ex.Message}"); - } } } \ No newline at end of file diff --git a/SHH.CameraService/Core/NetSenders/DeviceStateMonitorWorker.cs b/SHH.CameraService/Core/NetSenders/DeviceStateMonitorWorker.cs index 8b108c3..76350f1 100644 --- a/SHH.CameraService/Core/NetSenders/DeviceStateMonitorWorker.cs +++ b/SHH.CameraService/Core/NetSenders/DeviceStateMonitorWorker.cs @@ -5,32 +5,27 @@ using NetMQ.Sockets; using MessagePack; using SHH.CameraSdk; using SHH.Contracts; +using System.Text; namespace SHH.CameraService { - /// - /// [二合一] 设备状态聚合与上报服务 - /// public class DeviceStateMonitorWorker : BackgroundService { private readonly CameraManager _manager; private readonly ServiceConfig _config; - - // ★ 2. 注入拦截器管道 private readonly InterceptorPipeline _pipeline; - // 本地状态全集缓存 + // 修改点1: 改为 Socket 列表 + private readonly List _sockets = new(); private readonly ConcurrentDictionary _stateStore = new(); - // 标记是否有新变更 private volatile bool _isDirty = false; private long _lastSendTick = 0; - // ★ 3. 构造函数增加 InterceptorPipeline 参数 public DeviceStateMonitorWorker( CameraManager manager, ServiceConfig config, - InterceptorPipeline pipeline) // <--- 注入点 + InterceptorPipeline pipeline) { _manager = manager; _config = config; @@ -39,45 +34,50 @@ namespace SHH.CameraService protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - // 1. 初始化缓存 (默认离线) + // 1. 初始化 foreach (var dev in _manager.GetAllDevices()) { UpdateLocalState(dev.Id, false, "Init"); } - // 2. 挂载 SDK 事件 _manager.OnDeviceStatusChanged += OnSdkStatusChanged; - // 3. 建立连接 - var cmdEndpoint = _config.CommandEndpoints.FirstOrDefault()?.Uri; - if (string.IsNullOrEmpty(cmdEndpoint)) + // 修改点2: 遍历所有端点建立连接 + if (_config.CommandEndpoints.Count == 0) return; + + Console.WriteLine($"[StatusWorker] 启动状态上报,目标节点数: {_config.CommandEndpoints.Count}"); + + foreach (var ep in _config.CommandEndpoints) { - Console.WriteLine("[StatusWorker] 警告: 未配置 Command 端点,状态上报无法启动。"); - return; + try + { + var socket = new DealerSocket(); + // 状态通道也建议设置 Identity,方便服务端追踪 + socket.Options.Identity = Encoding.UTF8.GetBytes(_config.AppId + "_status"); + socket.Options.SendHighWatermark = 1000; + socket.Connect(ep.Uri); + _sockets.Add(socket); + } + catch (Exception ex) + { + Console.WriteLine($"[StatusWorker] 连接失败 {ep.Uri}: {ex.Message}"); + } } - Console.WriteLine($"[StatusWorker] 启动状态上报,直连服务端: {cmdEndpoint}"); - - using var socket = new DealerSocket(); - socket.Options.SendHighWatermark = 1000; - // 设置 Identity 是个好习惯,虽然这里只发不收 - // socket.Options.Identity = ... - socket.Connect(cmdEndpoint); - - // 4. 定时循环 (1秒1次) + // 定时循环 (1秒1次) var timer = new PeriodicTimer(TimeSpan.FromSeconds(1)); try { while (await timer.WaitForNextTickAsync(stoppingToken)) { - // ★ 4. 关键修正:必须使用 await 调用新的异步方法 - await CheckAndDirectSendAsync(socket); + await CheckAndBroadcastAsync(); } } finally { _manager.OnDeviceStatusChanged -= OnSdkStatusChanged; - socket.Dispose(); + // 清理所有 socket + foreach (var s in _sockets) s.Dispose(); } } @@ -99,21 +99,17 @@ namespace SHH.CameraService _stateStore[deviceId.ToString()] = evt; } - /// - /// 检查并在当前线程直接发送 (已改为异步 Task) - /// - // ★ 5. 关键修正:void -> async Task - private async Task CheckAndDirectSendAsync(NetMQSocket socket) + // 修改点3: 广播发送逻辑 + private async Task CheckAndBroadcastAsync() { long now = Environment.TickCount64; // 策略: 有变更 或 超过5秒(心跳) bool shouldSend = _isDirty || (now - _lastSendTick > 5000); - if (shouldSend) + if (shouldSend && _sockets.Count > 0) { try { - // A. 组包 (全量) var snapshot = _stateStore.Values.ToList(); var batch = new StatusBatchPayload { @@ -121,22 +117,19 @@ namespace SHH.CameraService Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }; - // B. 序列化 byte[] data = MessagePackSerializer.Serialize(batch); - // ========================================================= - // ★ 6. 拦截器调用 - // ========================================================= - // 这里的 "STATUS_BATCH" 是协议头,你可以替换为 ProtocolHeaders.StatusBatch (如果定义了的话) + // 拦截器处理 var ctx = await _pipeline.ExecuteSendAsync("STATUS_BATCH", data); - - if (ctx != null) // 如果没被拦截 + if (ctx != null) { - // C. 直接发送 - socket.SendMoreFrame(ctx.Protocol) - .SendFrame(ctx.Data); + // ★★★ 核心修复:循环广播给所有 Socket ★★★ + foreach (var socket in _sockets) + { + // TrySend 避免阻塞,如果某个服务端卡死不影响其他端 + socket.SendMoreFrame(ctx.Protocol).TrySendFrame(ctx.Data); + } - // D. 重置标记 _isDirty = false; _lastSendTick = now; }