using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Threading.Tasks; using NetMQ; using NetMQ.Sockets; using Newtonsoft.Json; using SHH.Contracts; namespace SHH.CameraDashboard.Services { /// /// 客户端指令总线 (企业增强版) /// 核心职责:作为指挥中心监听 7000 端口,管理所有网关连接。 /// 通讯模式:Router (Bind) <--- Dealer (Connect) /// 高级特性: /// 1. 智能路由:根据 InstanceId 自动查找 NetMQ Identity。 /// 2. QoS 分级:支持 "强一致性等待" 和 "射后不理" 两种模式。 /// 3. 自动重试:网络超时自动重发,失败多次自动熔断。 /// 4. 性能监控:精确统计全链路耗时 (RTT)。 /// public class CommandBusClient : IDisposable { #region --- 1. 字段与配置 --- private RouterSocket? _routerSocket; private NetMQPoller? _poller; private volatile bool _isRunning; private readonly object _disposeLock = new object(); // 默认超时设置 private const int DEFAULT_TIMEOUT_MS = 2000; private const int DEFAULT_MAX_RETRIES = 2; // ★★★ 核心:线程安全的任务字典 ★★★ // Key: 请求ID (身份证号) // Value: 异步任务凭证 (用于 await 唤醒) private readonly ConcurrentDictionary> _pendingRequests = new ConcurrentDictionary>(); // ★★★ 核心:路由表 ★★★ // Key: 实例ID (例如 "Gateway_01") // Value: NetMQ 路由 Identity (二进制地址,这是 Router 发消息必须的“信封地址”) private readonly ConcurrentDictionary _sessions = new ConcurrentDictionary(); /// /// 当有服务端连上来并完成注册时触发 /// public event Action? OnServerRegistered; #endregion #region --- 2. 启动与停止 --- /// /// 启动指令中心监听 /// /// 监听端口 (建议 7000) public void Start(int port) { if (_isRunning) return; try { lock (_disposeLock) { _routerSocket = new RouterSocket(); // 绑定端口,等待服务端(Active Mode)主动来连接 // 使用 tcp://*:{port} 绑定本机所有网卡 _routerSocket.Bind($"tcp://*:{port}"); // 注册接收事件 (基于 NetMQPoller 的异步非阻塞模式) _routerSocket.ReceiveReady += OnReceiveReady; _poller = new NetMQPoller { _routerSocket }; _poller.RunAsync(); // 在后台线程启动轮询 _isRunning = true; Debug.WriteLine($"[ClientBus] 指令中心已启动,监听端口: {port}"); } } catch (Exception ex) { // 启动失败属于致命错误,记录日志 Debug.WriteLine($"[ClientBus-Error] 启动失败: {ex.Message}"); throw; // 向上抛出,让 UI 层感知并报错 } } public void Stop() { if (!_isRunning) return; lock (_disposeLock) { _isRunning = false; try { _poller?.Stop(); _poller?.Dispose(); _routerSocket?.Dispose(); } catch (Exception ex) { Debug.WriteLine($"[ClientBus-Error] 停止时异常: {ex.Message}"); } finally { // 彻底清理状态 CleanupPendingTasks(); _sessions.Clear(); } } } public void Dispose() { Stop(); } private void CleanupPendingTasks() { // 取消所有挂起的请求,避免 SendAsync 里的 await 永久卡死 foreach (var kvp in _pendingRequests) { kvp.Value.TrySetCanceled(); } _pendingRequests.Clear(); } #endregion #region --- 3. 核心发送逻辑 (策略层) --- /// /// 发送指令(包含 QoS判断 + 重试循环 + 熔断 + RTT统计) /// /// 目标网关ID (如 "Gateway_01") /// 指令包 /// 单次超时时间 (毫秒) /// 最大重试次数 (0表示不重试) /// 执行结果 public async Task SendAsync(string instanceId, CommandPayload payload, int timeoutMs = DEFAULT_TIMEOUT_MS, int maxRetries = DEFAULT_MAX_RETRIES) { if (!_isRunning) return CommandResult.Fail("服务未启动"); // 1. 检查目标是否在线 (快速失败) if (!_sessions.ContainsKey(instanceId)) { return CommandResult.Fail($"服务端 {instanceId} 离线或未连接"); } // 2. 确保有 RequestId if (string.IsNullOrEmpty(payload.RequestId)) payload.RequestId = Guid.NewGuid().ToString("N"); // ========================================================= // 策略 A: 射后不理 (Fire-and-Forget) - QoS 0 // ========================================================= // 适用于:心跳包、非关键日志、高频状态查询 // 优势:不占用 await 线程资源,不产生网络拥堵 if (!payload.RequireAck) { try { SendInternal(instanceId, payload); return CommandResult.Ok("已投递 (NoAck Mode)"); } catch (Exception ex) { return CommandResult.Fail($"投递失败: {ex.Message}"); } } // ========================================================= // 策略 B: 强一致性重试 (Reliable Retry) - QoS 1 // ========================================================= // 适用于:PTZ控制、录像启停、参数设置 int currentRetry = 0; // 启动高精度计时器 (统计包含重试在内的总耗时) Stopwatch totalStopwatch = Stopwatch.StartNew(); // 重试循环 (Retry Loop) while (currentRetry <= maxRetries) { // 更新重试计数,服务端可据此判断是否需要打印 "Retry Warning" payload.RetryCount = currentRetry; try { // ★ 核心原子操作:发送并等待单次结果 ★ var result = await SendRequestCore(instanceId, payload, timeoutMs); // --- 成功路径 --- totalStopwatch.Stop(); result.ElapsedMilliseconds = totalStopwatch.Elapsed.TotalMilliseconds; // 如果重试过,打印一条恢复日志 if (currentRetry > 0) Debug.WriteLine($"[ClientBus] {payload.CmdCode} 在第 {currentRetry} 次重试后成功恢复。"); return result; } catch (TimeoutException) { // --- 超时路径 --- Debug.WriteLine($"[ClientBus-Warn] Req {payload.RequestId} 超时 ({currentRetry + 1}/{maxRetries + 1})..."); currentRetry++; // 可选:在重试前稍微等待一下 (指数退避),避免瞬间拥塞 // await Task.Delay(50 * currentRetry); } catch (Exception ex) { // --- 致命错误路径 (如序列化失败、Socket已释放) --- // 这种错误重试也没用,直接报错 return CommandResult.Fail($"发送过程发生不可恢复错误: {ex.Message}"); } } // ========================================================= // 熔断 (Meltdown) // ========================================================= totalStopwatch.Stop(); var failRes = CommandResult.Fail($"请求熔断: 目标无响应 (已重试 {maxRetries} 次)"); failRes.ElapsedMilliseconds = totalStopwatch.Elapsed.TotalMilliseconds; return failRes; } #endregion #region --- 4. 底层发送实现 (原子操作层) --- /// /// 执行单次 "请求-响应" 周期 /// private async Task SendRequestCore(string instanceId, CommandPayload payload, int timeoutMs) { // 1. 创建异步凭证 (TCS) // RunContinuationsAsynchronously 是必须的,防止 NetMQ 接收线程直接执行 await 后的 UI 代码导致死锁 var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); // 2. 注册到字典,等待回信 // 如果 ID 冲突 (极低概率),说明上一个还没处理完,强行覆盖或报错 _pendingRequests[payload.RequestId] = tcs; try { // 3. 发送网络包 SendInternal(instanceId, payload); // 4. 异步等待 (Wait for TCS or Timeout) // Task.WhenAny 是实现超时的经典模式 var completedTask = await Task.WhenAny(tcs.Task, Task.Delay(timeoutMs)); if (completedTask == tcs.Task) { // 任务完成 (OnReceiveReady 设置了结果) return await tcs.Task; } else { // 时间到,任务还没完成 -> 抛出超时异常,触发外层重试 throw new TimeoutException(); } } finally { // 5. 清理现场 (无论成功失败,必须移除字典,防止内存泄漏) _pendingRequests.TryRemove(payload.RequestId, out _); } } /// /// 纯粹的 NetMQ 数据发送 (不处理逻辑) /// private void SendInternal(string instanceId, CommandPayload payload) { // 查路由表获取 Identity if (_sessions.TryGetValue(instanceId, out byte[]? identity)) { var msg = new NetMQMessage(); // Frame 1: 目标地址 (Identity) msg.Append(identity); // Frame 2: 数据 (JSON) msg.Append(JsonConvert.SerializeObject(payload)); // 线程安全检查 if (_routerSocket != null) { _routerSocket.SendMultipartMessage(msg); } } else { throw new InvalidOperationException($"无法找到目标 {instanceId} 的路由信息"); } } #endregion #region --- 5. 核心接收逻辑 (Router) --- /// /// 处理所有入站消息 /// private void OnReceiveReady(object? sender, NetMQSocketEventArgs e) { // 防止处理过程中崩溃导致监听停止 try { NetMQMessage msg = new NetMQMessage(); // Router 模式:至少包含 [Identity, Data] 两帧,有时中间会有空帧 if (!e.Socket.TryReceiveMultipartMessage(ref msg) || msg.FrameCount < 2) return; // 第一帧永远是发送方的 Identity byte[] identity = msg[0].Buffer; // 最后一帧通常是 JSON 数据 string json = msg.Last.ConvertToString(); // 简单的协议识别 // 优化建议:正式项目中可以用更严谨的 Header 区分,这里用 JSON 嗅探即可 if (json.Contains("\"CmdCode\"")) { // ---> 收到注册包 (CmdCode 字段存在) HandleRegistration(identity, json); } else if (json.Contains("\"Success\"")) { // ---> 收到回执包 (Success 字段存在) HandleResponse(json); } } catch (Exception ex) { Debug.WriteLine($"[ClientBus-RecvError] 接收处理异常: {ex.Message}"); } } private void HandleRegistration(byte[] identity, string json) { try { var payload = JsonConvert.DeserializeObject(json); if (payload?.CmdCode == "SERVER_REGISTER") { var regInfo = JsonConvert.DeserializeObject(payload.JsonParams); if (regInfo != null) { // 更新路由表:[实例名] -> [二进制地址] _sessions[regInfo.InstanceId] = identity; Debug.WriteLine($"[ClientBus] 网关上线: {regInfo.InstanceId} IP: {regInfo.ServerIp}"); // 通知 UI 刷新列表 OnServerRegistered?.Invoke(regInfo); } } } catch (Exception ex) { Debug.WriteLine($"[ClientBus-Warn] 注册包解析失败: {ex.Message}"); } } private void HandleResponse(string json) { try { var result = JsonConvert.DeserializeObject(json); // 闭环匹配:根据 RequestId 找到挂起的 TCS if (!string.IsNullOrEmpty(result?.RequestId) && _pendingRequests.TryGetValue(result.RequestId, out var tcs)) { // 设置结果 -> 唤醒 SendRequestCore -> 唤醒 SendAsync tcs.TrySetResult(result); } } catch (Exception ex) { Debug.WriteLine($"[ClientBus-Warn] 回执包解析失败: {ex.Message}"); } } #endregion } }