diff --git a/SHH.CameraDashboard/App.xaml.cs b/SHH.CameraDashboard/App.xaml.cs index 22b9f30..8b97667 100644 --- a/SHH.CameraDashboard/App.xaml.cs +++ b/SHH.CameraDashboard/App.xaml.cs @@ -1,4 +1,5 @@ using SHH.CameraDashboard.Services; +using SHH.Contracts; using SHH.ProcessLaunchers; using System.Collections.ObjectModel; using System.Windows; @@ -31,6 +32,7 @@ namespace SHH.CameraDashboard // 启动指令服务 (Port 6001) CommandServer.Instance.Start(6001); + CommandServer.Instance.OnClientRegistered += SetupAutomaticConfiguration; // 现在我们来配置启动 @@ -57,7 +59,9 @@ namespace SHH.CameraDashboard string serviceArgs = $"" + $"--pid {myPid} " + $"--appid \"CameraApp_01\" " + - $"--uris \"127.0.0.1,6002&6001;\" " + + $"--uris \"127.0.0.1,6002,video,调试PC;\" " + + $"--uris \"127.0.0.1,6001,command,调试PC;\" " + + $"--uris \"192.168.1.100,6002,video,大屏展示;\" " + $"--mode 1 " + $"--ports \"5000,100\""; @@ -86,6 +90,78 @@ namespace SHH.CameraDashboard mainWin.Show(); } + /// + /// 在程序启动时订阅事件 + /// + /// + private void SetupAutomaticConfiguration(ConnectedClient obj) + { + // 监听注册事件:每当有 Service (CommandClientWorker) 连上来注册成功 + CommandServer.Instance.OnClientRegistered += (client) => + { + Console.WriteLine($"[自动化] 检测到新服务上线: {client.ServiceId} ({client.Ip})"); + + // 放到线程池去执行,避免阻塞 UI 或网络接收线程 + Task.Run(async () => + { + // 1. 稍微延时一点点 (500ms),给 Service 一点喘息时间准备接收指令 + await Task.Delay(500); + + // 2. 构造您指定的“206摄像头”配置 + var cameraConfig = new CameraConfigDto + { + Id = 17798, + Name = "206摄像头", + Location = "404办公室", + IpAddress = "172.16.41.88", + Username = "admin", + Password = "abcd1234", + Port = 8000, + ChannelIndex = 1, + StreamType = 0, + Brand = DeviceBrand.HikVision.GetHashCode(), // 对应 DeviceBrand 枚举 + RenderHandle = 0, // 初始化为0 + MainboardIp = "", // 留空 + MainboardPort = 0, + RtspPath = "" + }; + + // ★ 新增:一并带上订阅要求 ★ + cameraConfig.AutoSubscriptions = new List + { + // 第一条:显示帧,要求 8 帧 + new CameraConfigSubscribeDto { + AppId = "UI_Display", + Type = 0, + TargetFps = 8, + Memo = "显示帧" + }, + // 第二条:分析帧,要求 1 帧 + new CameraConfigSubscribeDto { + AppId = "AI_Analysis", + Type = 0, + Memo = "分析帧", + TargetFps = 1 + } + }; + + // 3. 封装协议包 + var commandPacket = new + { + Action = "SyncCamera", // 告诉 Service 执行什么动作 + Payload = cameraConfig, // 数据载荷 + Time = DateTime.Now + }; + + // 4. 定向发送 + // client.ServiceId 就是那个 "CameraApp_01" + CommandServer.Instance.SendCommand(client.ServiceId, commandPacket); + + Console.WriteLine($"[自动化] 已向 {client.ServiceId} 下发配置: 206摄像头"); + }); + }; + } + /// /// 全局统一退出入口 /// diff --git a/SHH.CameraDashboard/Invokes/CommandServer.cs b/SHH.CameraDashboard/Invokes/CommandServer.cs index 52fa13a..96c85b5 100644 --- a/SHH.CameraDashboard/Invokes/CommandServer.cs +++ b/SHH.CameraDashboard/Invokes/CommandServer.cs @@ -1,40 +1,48 @@ using NetMQ; using NetMQ.Sockets; using Newtonsoft.Json; -using System; +using Newtonsoft.Json.Linq; using System.Collections.Concurrent; using System.Diagnostics; using System.Text; -using System.Threading.Tasks; -namespace SHH.CameraDashboard.Services; +namespace SHH.CameraDashboard; /// /// [Dashboard端] 指令控制服务 -/// 职责:双向通信通道。接收 Service 心跳/响应,向 Service 发送控制指令。 -/// 核心模式:ROUTER (Dashboard) <--> DEALER (Service) +/// 职责:监听 6001 端口,接收 CameraService 的注册/心跳,并下发控制指令。 /// public class CommandServer : IDisposable { // 单例模式 public static CommandServer Instance { get; } = new CommandServer(); - // 事件:收到消息时触发 (ServiceId, MessageContent) + // ================================================================= + // 事件定义 + // ================================================================= + + // 当有新设备注册成功时触发 (UI 可以订阅这个来刷新列表) + public event Action? OnClientRegistered; + + // 当收到通用业务消息时触发 public event Action? OnMessageReceived; + // ================================================================= + // 内部成员 + // ================================================================= private RouterSocket? _routerSocket; private NetMQPoller? _poller; - - // 【关键新增】发送队列:用于解决跨线程发送的安全问题 - // UI线程 -> Enqueue -> Poller线程 -> Socket.Send private NetMQQueue? _sendQueue; + // 在线设备表 (Key: Identity/AppId) + // 线程安全字典,存储客户端的详细信息(包括视频地址) + private readonly ConcurrentDictionary _clients = new(); + public int ListenPort { get; private set; } public bool IsRunning => _poller != null && _poller.IsRunning; - // 在线设备表 (可选,用于记录谁在线) - // Key: ServiceId (Identity字符串) - private readonly ConcurrentDictionary _onlineClients = new(); + // 获取当前所有在线客户端的副本 + public List GetClients() => _clients.Values.ToList(); private CommandServer() { } @@ -47,17 +55,15 @@ public class CommandServer : IDisposable { // 1. 初始化 Router Socket _routerSocket = new RouterSocket(); - _routerSocket.Bind($"tcp://*:{ListenPort}"); + _routerSocket.Bind($"tcp://*:{ListenPort}"); // 监听所有网卡 _routerSocket.ReceiveReady += OnSocketReady; - // 2. 初始化发送队列 + // 2. 初始化发送队列 (确保 UI 线程可以安全发送) _sendQueue = new NetMQQueue(); _sendQueue.ReceiveReady += OnQueueReady; - // 3. 启动 Poller (同时监听 Socket 接收 和 队列发送) + // 3. 启动 Poller _poller = new NetMQPoller { _routerSocket, _sendQueue }; - - // RunAsync 会自动开启后台线程 _poller.RunAsync(); Console.WriteLine($"[Dashboard] 指令服务启动,监听: tcp://*:{ListenPort}"); @@ -65,33 +71,37 @@ public class CommandServer : IDisposable catch (Exception ex) { Console.WriteLine($"[Dashboard] 指令端口绑定失败: {ex.Message}"); - throw; // 必须抛出,让 App 感知 + throw; } } /// - /// 处理来自 Service 的网络消息 (运行在 Poller 线程) + /// [Poller线程] 处理网络接收 /// private void OnSocketReady(object? sender, NetMQSocketEventArgs e) { try { - // 1. 读取身份帧 (Identity) - // 只要 Service 端 DealerSocket 设置了 Identity,这里收到就是那个 ID + // Router 接收逻辑: + // Frame 1: 发送者的 Identity (NetMQ 自动处理) + // Frame 2: 真实数据 + + // 1. 读取身份 (Identity) var identityBytes = e.Socket.ReceiveFrameBytes(); - string serviceId = Encoding.UTF8.GetString(identityBytes); + string serviceId = Encoding.UTF8.GetString(identityBytes); // e.g., "CameraApp_01" - // 2. 读取内容帧 (假设 Dealer 直接发内容,中间无空帧) - // 如果你使用了 REQ/REP 模式,中间可能会有空帧,需注意兼容 + // 2. 读取消息内容 + // 兼容性处理:有些 Dealer 实现可能会发空帧,这里做个简单尝试 + // 如果发现在 Identity 后紧跟的是空帧,则再读一帧 + // 但在我们目前的 Dealer 实现中,是直接发的 JSON string message = e.Socket.ReceiveFrameString(); + if (string.IsNullOrWhiteSpace(message)) + { + if (e.Socket.HasIn) message = e.Socket.ReceiveFrameString(); + } - // 3. 简单的心跳保活逻辑 - _onlineClients[serviceId] = DateTime.Now; - - // 4. 触发业务事件 - // 注意:这依然在 Poller 线程,UI 处理时需 Invoke - Console.WriteLine($"[指令] From {serviceId}: {message}"); - OnMessageReceived?.Invoke(serviceId, message); + // 3. 协议解析与业务分发 + ProcessMessage(serviceId, message, identityBytes); } catch (Exception ex) { @@ -100,37 +110,99 @@ public class CommandServer : IDisposable } /// - /// 处理发送队列 (运行在 Poller 线程) + /// 核心业务逻辑处理 /// - private void OnQueueReady(object? sender, NetMQQueueEventArgs e) + private void ProcessMessage(string serviceId, string json, byte[] identityBytes) { try { - if (_routerSocket == null) return; + // 尝试解析基础结构 + var jObj = JObject.Parse(json); + string action = jObj["Action"]?.ToString() ?? "Unknown"; - // 从队列取出一个包 - if (e.Queue.TryDequeue(out var packet, TimeSpan.Zero)) + // 更新最后心跳时间 (如果已存在) + if (_clients.TryGetValue(serviceId, out var existingClient)) { - // Router 发送标准三步走: - // 1. 发送目标 Identity (More = true) - // 2. 发送空帧 (可选,取决于协议约定,Router-Dealer 直连通常不需要空帧) - // 3. 发送数据 (More = false) + existingClient.LastHeartbeat = DateTime.Now; + } - // 这里我们采用最简协议:[Identity][Data] - _routerSocket.SendMoreFrame(packet.TargetId) - .SendFrame(packet.JsonData); - - Console.WriteLine($"[指令] To {packet.TargetId}: {packet.JsonData}"); + // ★★★ 处理注册握手 ★★★ + if (action == "Register") + { + HandleRegistration(serviceId, jObj, identityBytes); + } + else + { + // 其他业务消息,透传给上层 + Console.WriteLine($"[指令] From {serviceId}: {json}"); + OnMessageReceived?.Invoke(serviceId, json); } } - catch (Exception ex) + catch (JsonException) { - Debug.WriteLine($"[Command Send Error] {ex.Message}"); + Console.WriteLine($"[指令] 收到非 JSON 消息 From {serviceId}: {json}"); } } /// - /// 发送指令 (线程安全,可由 UI 线程调用) + /// 处理注册逻辑 + /// + private void HandleRegistration(string serviceId, JObject jObj, byte[] identityBytes) + { + var payload = jObj["Payload"]; + if (payload == null) return; + + // 1. 提取客户端信息 + var client = new ConnectedClient + { + ServiceId = serviceId, + Ip = payload["Ip"]?.ToString() ?? "Unknown", + + // ★★★ 解析新字段 ★★★ + WebPort = payload["WebPort"]?.Value() ?? 5000, + Version = payload["Version"]?.ToString() ?? "Unknown", + Pid = payload["Pid"]?.Value() ?? 0, + + TargetVideoNodes = payload["TargetVideoNodes"]?.ToObject>() ?? new List(), + LastHeartbeat = DateTime.Now + }; + + // 2. 存入内存表 (Add or Update) + _clients.AddOrUpdate(serviceId, client, (key, old) => client); + + Console.WriteLine($"[注册成功] {serviceId}"); + + // 3. 回复 ACK (握手确认) + // 告诉客户端:我收到你的注册了,连接建立成功 + var ackPacket = new { Action = "ACK", Message = $"Registered {serviceId}", Time = DateTime.Now }; + string ackJson = JsonConvert.SerializeObject(ackPacket); + + // 直接在 Poller 线程发回,不需要走 Queue (因为拥有 Socket 所有权) + _routerSocket?.SendMoreFrame(identityBytes).SendFrame(ackJson); + + // 4. 通知 UI 更新列表 + OnClientRegistered?.Invoke(client); + } + + /// + /// [Poller线程] 处理发送队列 + /// + private void OnQueueReady(object? sender, NetMQQueueEventArgs e) + { + if (_routerSocket == null) return; + + if (e.Queue.TryDequeue(out var packet, TimeSpan.Zero)) + { + // Router 发送:[Identity] [Data] + _routerSocket.SendMoreFrame(packet.TargetId) + .SendFrame(packet.JsonData); + + Console.WriteLine($"[发送] To {packet.TargetId}: {packet.JsonData}"); + } + } + + /// + /// [公共API] 向指定 Service 发送指令 /// public void SendCommand(string targetServiceId, object commandData) { @@ -138,7 +210,6 @@ public class CommandServer : IDisposable var json = JsonConvert.SerializeObject(commandData); - // ★★★ 核心修复:不直接操作 Socket,而是入队 ★★★ _sendQueue.Enqueue(new CommandPacket { TargetId = targetServiceId, @@ -152,13 +223,12 @@ public class CommandServer : IDisposable _poller?.Dispose(); _routerSocket?.Dispose(); _sendQueue?.Dispose(); - - _poller = null; - _routerSocket = null; - _sendQueue = null; } - // 内部数据包结构 + // ============================================================= + // 数据模型 + // ============================================================= + private class CommandPacket { public string TargetId { get; set; } = ""; diff --git a/SHH.CameraDashboard/Invokes/ConnectedClient.cs b/SHH.CameraDashboard/Invokes/ConnectedClient.cs new file mode 100644 index 0000000..8bed2e8 --- /dev/null +++ b/SHH.CameraDashboard/Invokes/ConnectedClient.cs @@ -0,0 +1,30 @@ +namespace SHH.CameraDashboard; + +/// +/// 在线客户端信息模型 (已更新) +/// +public class ConnectedClient +{ + /// 唯一标识 (AppId) + public string ServiceId { get; set; } = string.Empty; + + /// 版本号 + public string Version { get; set; } = "1.0.0"; + + /// 远程进程 ID + public int Pid { get; set; } + + /// 客户端 IP + public string Ip { get; set; } = string.Empty; + + /// WebAPI 端口 (Dashboard 调用 REST 接口用) + public int WebPort { get; set; } + + /// 该客户端正在推流的目标地址 + public List TargetVideoNodes { get; set; } = new List(); + + public DateTime LastHeartbeat { get; set; } + + // 辅助属性:拼接出完整的 API BaseUrl + public string WebApiUrl => $"http://{Ip}:{WebPort}"; +} \ No newline at end of file diff --git a/SHH.CameraDashboard/Invokes/StreamReceiverService.cs b/SHH.CameraDashboard/Invokes/StreamReceiverService.cs index ebbecd4..110b494 100644 --- a/SHH.CameraDashboard/Invokes/StreamReceiverService.cs +++ b/SHH.CameraDashboard/Invokes/StreamReceiverService.cs @@ -1,67 +1,54 @@ using NetMQ; using NetMQ.Sockets; -using System.Diagnostics; // 用于 Debug 输出 +using Newtonsoft.Json; +using SHH.Contracts; // ★★★ 必须引用契约库 ★★★ +using System.Diagnostics; -namespace SHH.CameraDashboard.Services; +namespace SHH.CameraDashboard; public class StreamReceiverService : IDisposable { // 单例模式 public static StreamReceiverService Instance { get; } = new StreamReceiverService(); - public event Action? OnFrameReceived; + // ★★★ 核心变更:使用强类型契约载体 ★★★ + public event Action? OnPayloadReceived; private SubscriberSocket? _subSocket; private Task? _receiveTask; - - // 【修复1】不要在这里初始化,改为在 Start 中初始化 private CancellationTokenSource? _cts; public int ListenPort { get; private set; } - // 增加运行状态标记 + // 运行状态检查 public bool IsRunning => _receiveTask != null && !_receiveTask.IsCompleted; private StreamReceiverService() { } public void Start(int port = 6000) { - // 1. 防止重复启动 if (IsRunning) return; - ListenPort = port; - - // 【修复1】每次启动时创建新的 TokenSource _cts = new CancellationTokenSource(); try { - // 2. 初始化 Socket _subSocket = new SubscriberSocket(); - - // 【优化】设置高水位限制 (HWM) - // 如果 UI 处理不过来,积压超过 1000 帧直接丢弃,防止内存爆炸 + // 设置高水位,防止 UI 卡顿时内存溢出 _subSocket.Options.ReceiveHighWatermark = 1000; + _subSocket.Bind($"tcp://*:{ListenPort}"); + _subSocket.Subscribe(""); // 订阅所有内容(这是 Dealer-Router/Pub-Sub 的基础) - string bindAddr = $"tcp://*:{ListenPort}"; - _subSocket.Bind(bindAddr); - _subSocket.Subscribe(""); - - Console.WriteLine($"[Dashboard] 视频流接收服务启动: {bindAddr}"); + Console.WriteLine($"[Dashboard] 视频流接收服务启动: tcp://*:{ListenPort}"); } catch (Exception ex) { - Console.WriteLine($"[Dashboard] 致命错误 - 端口绑定失败: {ex.Message}"); - - // 清理资源 + // 明确抛出异常,让 App.xaml.cs 知道启动失败了 _subSocket?.Dispose(); _subSocket = null; - - // 【修复4】抛出异常让上层知道启动失败了 throw new Exception($"端口 {port} 绑定失败,可能被占用。", ex); } - // 3. 启动任务 _receiveTask = Task.Run(ReceiveLoop, _cts.Token); } @@ -73,61 +60,55 @@ public class StreamReceiverService : IDisposable { try { - // 【修复2】线程安全检查 if (_subSocket == null) break; - // 接收 Topic - if (!_subSocket.TryReceiveFrameString(TimeSpan.FromMilliseconds(500), out string cameraId)) + // ========================================================= + // 核心解析逻辑:适配 Service 端的 4 帧复合协议 + // ========================================================= + NetMQMessage msg = new NetMQMessage(); + + // 1. 非阻塞接收多帧消息 + if (!_subSocket.TryReceiveMultipartMessage(TimeSpan.FromMilliseconds(500), ref msg)) continue; - // 接收 Payload - if (!_subSocket.TryReceiveFrameBytes(TimeSpan.FromMilliseconds(100), out byte[] jpgBytes)) - continue; + // 2. 协议完整性检查 + if (msg.FrameCount < 4) continue; - // 触发事件 - OnFrameReceived?.Invoke(cameraId, jpgBytes); - } - catch (ObjectDisposedException) - { - // 【修复2】这是正常的退出流程(Socket被Dispose了),优雅退出循环 - break; + // 3. 协议头校验 (Frame 0) + if (msg[0].ConvertToString() != "SHH_V1") continue; + + // 4. 反序列化元数据 (Frame 1) + string json = msg[1].ConvertToString(); + var payload = JsonConvert.DeserializeObject(json); + + if (payload == null) continue; + + // 5. 填充二进制图像数据 (Frame 2 & 3) + // 注意:NetMQ 的 msg 数据是非托管内存,转为 byte[] 实现了拷贝,安全供 UI 使用 + if (payload.HasOriginalImage) + payload.OriginalImageBytes = msg[2].ToByteArray(); + + if (payload.HasTargetImage) + payload.TargetImageBytes = msg[3].ToByteArray(); + + // 6. 触发事件 + OnPayloadReceived?.Invoke(payload); } catch (Exception ex) { - // 记录日志,但不崩溃 - Debug.WriteLine($"[ReceiverLoop Error] {ex.Message}"); + Debug.WriteLine($"[Receiver Error] {ex.Message}"); } } Console.WriteLine("[Dashboard] 接收循环已停止"); } - /// - /// 停止服务(支持停止后重新 Start) - /// public void Stop() { - // 1. 发出取消信号 - if (_cts != null && !_cts.IsCancellationRequested) - { - _cts.Cancel(); - } - - // 2. 销毁 Socket (这会触发 ReceiveLoop 中的 ObjectDisposedException 从而退出循环) - if (_subSocket != null) - { - try { _subSocket.Dispose(); } catch { } - _subSocket = null; - } - - // 3. 清理 Token - _cts?.Dispose(); - _cts = null; - + _cts?.Cancel(); + try { _subSocket?.Dispose(); } catch { } + _subSocket = null; _receiveTask = null; } - public void Dispose() - { - Stop(); - } + public void Dispose() => Stop(); } \ No newline at end of file diff --git a/SHH.CameraDashboard/Pages/CameraWall/VideoTileViewModel.cs b/SHH.CameraDashboard/Pages/CameraWall/VideoTileViewModel.cs index ae81e3c..6381b67 100644 --- a/SHH.CameraDashboard/Pages/CameraWall/VideoTileViewModel.cs +++ b/SHH.CameraDashboard/Pages/CameraWall/VideoTileViewModel.cs @@ -1,6 +1,6 @@ using System.Windows; using System.Windows.Media.Imaging; -using SHH.CameraDashboard.Services; // 引用服务命名空间 +using SHH.Contracts; // ★★★ 引用契约库 (VideoPayload) ★★★ namespace SHH.CameraDashboard; @@ -8,7 +8,7 @@ public class VideoTileViewModel : ViewModelBase { private readonly string _boundCameraId; - // --- 属性定义 --- + // --- 属性定义 (保持不变) --- private string _cameraName; public string CameraName { @@ -37,38 +37,59 @@ public class VideoTileViewModel : ViewModelBase CameraName = name; StatusInfo = "等待信号..."; - // 【修正 1】直接订阅单例服务 - // 不需要判断 null,因为 Instance 是静态初始化的,永远存在 - StreamReceiverService.Instance.OnFrameReceived += OnGlobalFrameReceived; + // ★★★ 变更 1: 订阅新的 OnPayloadReceived 事件 ★★★ + // 旧的 OnFrameReceived(string, byte[]) 已经无法满足需求 + StreamReceiverService.Instance.OnPayloadReceived += OnPayloadReceived; } // --- 事件回调 (后台线程) --- - private void OnGlobalFrameReceived(string cameraId, byte[] jpgData) + // ★★★ 变更 2: 参数变为 VideoPayload 实体对象 ★★★ + private void OnPayloadReceived(VideoPayload payload) { - // 1. 过滤:不是我的画面,直接忽略 - if (cameraId != _boundCameraId) return; + // 1. 过滤:校验 Payload 中的 CameraId + if (payload.CameraId != _boundCameraId) return; - // 2. 解码:耗时操作在后台完成 - var bitmap = BitmapHelper.ToBitmapImage(jpgData); + // 2. ★★★ 智能选图策略 ★★★ + // 优先显示 AI 处理后的图 (TargetImageBytes) + // 如果没有处理图,则降级显示原始图 (OriginalImageBytes) + byte[] dataToShow = null; + + if (payload.HasTargetImage && payload.TargetImageBytes != null) + { + dataToShow = payload.TargetImageBytes; + } + else if (payload.HasOriginalImage && payload.OriginalImageBytes != null) + { + dataToShow = payload.OriginalImageBytes; + } + + // 如果两张图都没有,直接返回 + if (dataToShow == null || dataToShow.Length == 0) return; + + // 3. 解码图片 (耗时操作在后台完成) + var bitmap = BitmapHelper.ToBitmapImage(dataToShow); if (bitmap == null) return; - // 3. 【修正 2】恢复 UI 更新逻辑 - // 必须使用 Dispatcher,因为 VideoSource 绑定在界面上,只能在主线程修改 + // 4. ★★★ 计算端到端延迟 ★★★ + // 当前时间(接收端) - 采集时间(发送端) = 真实的网络+处理延迟 + long latency = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - payload.CaptureTimestamp; + + // 5. UI 更新 Application.Current.Dispatcher.InvokeAsync(() => { VideoSource = bitmap; - // 更新状态信息 (例如显示当前时间和数据大小) - StatusInfo = $"{DateTime.Now:HH:mm:ss} | {jpgData.Length / 1024} KB"; + // 显示更丰富的信息:延迟毫秒数、数据量、当前时间 + // 工业监控中,"延迟(ms)" 是比 "当前时间" 更重要的指标 + StatusInfo = $"延迟: {latency}ms | {dataToShow.Length / 1024} KB | {DateTime.Now:HH:mm:ss}"; }); } // --- 资源清理 --- public void Unload() { - // 【修正 3】从单例服务取消订阅 - // 这一步至关重要,否则切换页面时会内存泄漏 - StreamReceiverService.Instance.OnFrameReceived -= OnGlobalFrameReceived; + // ★★★ 变更 3: 取消订阅新的事件 ★★★ + StreamReceiverService.Instance.OnPayloadReceived -= OnPayloadReceived; // 清空图片引用,帮助 GC 回收内存 VideoSource = null; diff --git a/SHH.CameraDashboard/Pages/CameraWall/VideoWallViewModel.cs b/SHH.CameraDashboard/Pages/CameraWall/VideoWallViewModel.cs index e9ff5b6..3cacc0e 100644 --- a/SHH.CameraDashboard/Pages/CameraWall/VideoWallViewModel.cs +++ b/SHH.CameraDashboard/Pages/CameraWall/VideoWallViewModel.cs @@ -1,14 +1,10 @@ -using SHH.Contracts; -using System.Collections.ObjectModel; +using System.Collections.ObjectModel; using System.Windows.Input; namespace SHH.CameraDashboard { public class VideoWallViewModel : ViewModelBase { - // 引用推流接收服务 - private readonly VideoPushServer _pushServer; - // 视频列表 public ObservableCollection VideoTiles { get; } = new ObservableCollection(); @@ -27,35 +23,11 @@ namespace SHH.CameraDashboard { SetLayoutCommand = new RelayCommand(ExecuteSetLayout); - // 1. 初始化并启动接收服务 - _pushServer = new VideoPushServer(); - _pushServer.OnFrameReceived += OnGlobalFrameReceived; - - // 2. 启动监听端口 (比如 6000) - // 之后你的采集端 ForwarderClient 需要 Connect("tcp://你的IP:6000") - _pushServer.Start(6000); - // 3. 初始化格子 (不再需要传入 IP/Port 去主动连接了) // 我们用 CameraId 或 Name 来作为匹配标识 InitVideoTiles(); } - /// - /// 全局接收回调:收到任何一路视频都会进这里 - /// - private void OnGlobalFrameReceived(VideoPayload payload) - { - // 1. 在 VideoTiles 集合中找到对应的格子 - // 假设 payload.CameraId 与我们 VideoTileViewModel 中的 ID 对应 - //var targetTile = VideoTiles.FirstOrDefault(t => t.id == payload.CameraId); - - //if (targetTile != null) - //{ - // // 2. 将数据交给格子去渲染 - // targetTile.UpdateFrame(payload); - //} - } - private void InitVideoTiles() { // 假设我们预设 4 个格子,分别对应不同的摄像头 ID diff --git a/SHH.CameraSdk/Abstractions/Enums/SubscriptionType.cs b/SHH.CameraSdk/Abstractions/Enums/SubscriptionType.cs index 2a30adc..0ca7c6f 100644 --- a/SHH.CameraSdk/Abstractions/Enums/SubscriptionType.cs +++ b/SHH.CameraSdk/Abstractions/Enums/SubscriptionType.cs @@ -8,38 +8,44 @@ namespace SHH.CameraSdk; /// public enum SubscriptionType { + /// + /// 仅提供流 + /// + [Description("仅提供流")] + Stream = 0, + /// /// 本地窗口渲染 /// 直接在服务器端显示器绘制(如 OpenCV Window、WinForm 控件) /// [Description("本地窗口显示")] - LocalWindow = 0, + LocalWindow = 1, /// /// 本地录像存储 /// 写入磁盘文件(如 MP4/AVI 格式,支持定时切割、循环覆盖) /// [Description("本地录像存储")] - LocalRecord = 1, + LocalRecord = 2, /// /// 句柄绑定显示 /// 渲染到指定 HWND 窗口句柄(如 SDK 硬件解码渲染到客户端控件) /// [Description("句柄绑定显示")] - HandleDisplay = 2, + HandleDisplay = 3, /// /// 自定义网络传输 /// 通过私有协议转发给第三方系统(如工控机、告警服务器) /// [Description("自定义网络传输")] - NetworkTrans = 3, + NetworkTrans = 4, /// /// 网页端推流 /// 转码为 Web 标准协议(如 WebRTC、HLS、RTMP)供浏览器播放 /// [Description("网页端推流")] - WebPush = 4 + WebPush = 5 } \ No newline at end of file diff --git a/SHH.CameraSdk/Configs/ServiceConfig.cs b/SHH.CameraSdk/Configs/ServiceConfig.cs index 62d3bc1..c012f1f 100644 --- a/SHH.CameraSdk/Configs/ServiceConfig.cs +++ b/SHH.CameraSdk/Configs/ServiceConfig.cs @@ -1,227 +1,141 @@ namespace SHH.CameraSdk; -/// -/// 全局服务配置模型 (V3 最终版) -/// 负责解析命令行参数,构建网络拓扑和身份标识 -/// public class ServiceConfig { // ========================================== - // 1. 身份与进程属性 + // 1. 基础属性 // ========================================== - - /// - /// 父进程 PID (用于哨兵守护,--pid) - /// public int ParentPid { get; private set; } - - /// - /// 应用完整标识 (例如 "CameraApp_01", --appid) - /// public string AppId { get; private set; } = "Unknown_01"; - - /// - /// 【核心】从 AppId 自动提取的数字编号 - /// 规则:取最后一个下划线后的数字 - /// 示例:"CameraApp_05" -> 5 - /// public int NumericId { get; private set; } = 1; - - // ========================================== - // 2. 网络连接属性 (分流) - // ========================================== - - /// - /// 视频流目标地址列表 (对应 & 符号左侧) - /// ZeroMQBridgeWorker 使用此列表 - /// - public List VideoEndpoints { get; private set; } = new List(); - - /// - /// 指令控制目标地址列表 (对应 & 符号右侧) - /// CommandClientWorker 使用此列表 - /// - public List CommandEndpoints { get; private set; } = new List(); - - /// - /// WebAPI 基础端口 (--ports 的第一个值) - /// public int BasePort { get; private set; } = 5000; - - /// - /// 端口扫描范围 (--ports 的第二个值) - /// public int MaxPortRange { get; private set; } = 100; - - /// - /// 网络模式 (--mode) - /// public NetworkMode Mode { get; private set; } = NetworkMode.Passive; - - // ========================================== - // 3. 辅助属性 - // ========================================== - - /// - /// 是否需要执行 Connect 操作 - /// public bool ShouldConnect => Mode == NetworkMode.Active || Mode == NetworkMode.Hybrid; // ========================================== - // 4. 解析入口 (Factory Method) + // 2. 目标地址列表 (类型变了!) // ========================================== + // ★★★ 修改点:从 List 变为 List ★★★ + public List VideoEndpoints { get; private set; } = new List(); + public List CommandEndpoints { get; private set; } = new List(); + + // ========================================== + // 3. 工厂方法 (保持不变) + // ========================================== public static ServiceConfig BuildFromArgs(string[] args) { var config = new ServiceConfig(); - for (int i = 0; i < args.Length; i++) { - // 1. 预处理 Key var key = args[i].ToLower().Trim(); + var value = (i + 1 < args.Length && !args[i + 1].StartsWith("--")) ? args[i + 1] : string.Empty; + bool consumed = !string.IsNullOrEmpty(value); - // 2. 预取 Value (如果存在且不是下一个 flag) - var value = (i + 1 < args.Length) ? args[i + 1] : string.Empty; - - // 简单判断:如果 value 以 -- 开头,说明当前 key 是开关,或者参数值缺失 - if (value.StartsWith("--")) value = string.Empty; - - bool consumed = false; // 标记是否消耗了下一个参数 - - // 3. 匹配参数 switch (key) { - case "--pid": - if (int.TryParse(value, out int pid)) config.ParentPid = pid; - consumed = true; - break; - + case "--pid": if (int.TryParse(value, out int pid)) config.ParentPid = pid; break; case "--appid": if (!string.IsNullOrWhiteSpace(value)) { config.AppId = value; - // ★★★ 立即解析数字编号 ★★★ config.NumericId = ParseIdFromAppId(value); } - consumed = true; break; - case "--uris": - if (!string.IsNullOrWhiteSpace(value)) - { - // ★★★ 解析复杂 URI 字符串 ★★★ - ParseUris(config, value); - } - consumed = true; + if (!string.IsNullOrWhiteSpace(value)) ParseSingleUriConfig(config, value); break; - - case "--mode": - if (int.TryParse(value, out int m) && Enum.IsDefined(typeof(NetworkMode), m)) - { - config.Mode = (NetworkMode)m; - } - consumed = true; - break; - + case "--mode": if (int.TryParse(value, out int m)) config.Mode = (NetworkMode)m; break; case "--ports": - // 格式: "BasePort,Range" -> "6003,100" if (!string.IsNullOrWhiteSpace(value) && value.Contains(",")) { var parts = value.Split(','); - if (parts.Length >= 1) - { - if (int.TryParse(parts[0], out int baseP)) config.BasePort = baseP; - } - if (parts.Length >= 2) - { - if (int.TryParse(parts[1], out int range)) config.MaxPortRange = range; - } + if (parts.Length >= 1 && int.TryParse(parts[0], out int p)) config.BasePort = p; + if (parts.Length >= 2 && int.TryParse(parts[1], out int r)) config.MaxPortRange = r; } - consumed = true; break; } - - // 4. 如果消耗了 Value,跳过下一个索引 if (consumed) i++; } - return config; } - // ========================================== - // 5. 核心解析算法实现 - // ========================================== - - /// - /// 算法:提取下划线后的数字 - /// private static int ParseIdFromAppId(string appId) { if (string.IsNullOrWhiteSpace(appId)) return 1; - - // 查找最后一个下划线 int lastIdx = appId.LastIndexOf('_'); - - // 确保下划线存在,且后面还有字符 if (lastIdx >= 0 && lastIdx < appId.Length - 1) { - string numPart = appId.Substring(lastIdx + 1); - if (int.TryParse(numPart, out int id)) - { - return id; - } + if (int.TryParse(appId.Substring(lastIdx + 1), out int id)) return id; } - - // 解析失败默认返回 1 return 1; } - /// - /// 算法:解析 URI 列表并分流 - /// 格式: IP,VideoPort&CommandPort - /// 空缺处理: "&6001" (仅指令), "6002&" (仅视频) - /// - private static void ParseUris(ServiceConfig config, string rawValue) + // ========================================== + // 4. 解析算法实现 (核心修改) + // ========================================== + private static void ParseSingleUriConfig(ServiceConfig config, string rawValue) { - // 1. 按分号拆分不同主机配置 - // "127.0.0.1,6002&6001; 192.168.1.5,&6001" - var groups = rawValue.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries); + var segments = rawValue.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries); - foreach (var group in groups) + foreach (var segment in segments) { - // 2. 按逗号拆分 IP 和 端口段 - var hostParts = group.Split(','); - if (hostParts.Length < 2) continue; // 格式非法 + var parts = segment.Split(','); + if (parts.Length < 3) continue; - string ip = hostParts[0].Trim(); - string portSection = hostParts[1].Trim(); // "6002&6001" + string ip = parts[0].Trim(); + string portStr = parts[1].Trim(); + string type = parts[2].Trim().ToLower(); - // 3. 按 & 拆分端口 (注意:不要 RemoveEmptyEntries,位置很重要) - var ports = portSection.Split('&'); + // ★★★ 提取第四个字段作为备注 ★★★ + string desc = parts.Length >= 4 ? parts[3].Trim() : "未命名终端"; - // --- 索引 0: 视频端口 --- - if (ports.Length > 0) + if (int.TryParse(portStr, out int port)) { - string p = ports[0].Trim(); - if (!string.IsNullOrWhiteSpace(p) && int.TryParse(p, out int port)) + string zmqUri = $"tcp://{ip}:{port}"; + + // 构建对象 + var endpoint = new ServiceEndpoint { - string uri = $"tcp://{ip}:{port}"; - if (!config.VideoEndpoints.Contains(uri)) - config.VideoEndpoints.Add(uri); + Uri = zmqUri, + Description = desc + }; + + // 添加前检查 Uri 是否重复 (备注不参与排重) + if (type == "video") + { + if (!config.VideoEndpoints.Any(e => e.Uri == zmqUri)) + config.VideoEndpoints.Add(endpoint); } - } - - // --- 索引 1: 指令端口 --- - if (ports.Length > 1) - { - string p = ports[1].Trim(); - if (!string.IsNullOrWhiteSpace(p) && int.TryParse(p, out int port)) + else if (type == "command" || type == "text") { - string uri = $"tcp://{ip}:{port}"; - if (!config.CommandEndpoints.Contains(uri)) - config.CommandEndpoints.Add(uri); + if (!config.CommandEndpoints.Any(e => e.Uri == zmqUri)) + config.CommandEndpoints.Add(endpoint); } } } } +} + +/// +/// [新增] 端点配置对象,包含地址和备注 +/// +public class ServiceEndpoint +{ + /// + /// ZeroMQ 连接地址 (e.g. "tcp://127.0.0.1:6001") + /// + public string Uri { get; set; } = string.Empty; + + /// + /// 备注信息 (e.g. "调试机", "大屏") + /// + public string Description { get; set; } = string.Empty; + + /// + /// ToString + /// + /// + public override string ToString() => $"{Uri} ({Description})"; } \ No newline at end of file diff --git a/SHH.CameraSdk/Controllers/CamerasController.cs b/SHH.CameraSdk/Controllers/CamerasController.cs index 62bfbd4..23644e8 100644 --- a/SHH.CameraSdk/Controllers/CamerasController.cs +++ b/SHH.CameraSdk/Controllers/CamerasController.cs @@ -1,4 +1,5 @@ using Microsoft.AspNetCore.Mvc; +using SHH.Contracts; namespace SHH.CameraSdk; diff --git a/SHH.CameraSdk/Controllers/Dto/CameraConfigDto.cs b/SHH.CameraSdk/Controllers/Dto/CameraConfigDto.cs deleted file mode 100644 index a7d3c85..0000000 --- a/SHH.CameraSdk/Controllers/Dto/CameraConfigDto.cs +++ /dev/null @@ -1,125 +0,0 @@ -using System.ComponentModel.DataAnnotations; - -namespace SHH.CameraSdk; - -// ============================================================================== -// 1. 物理与运行配置 DTO (对应 CRUD 操作) -// 用于设备新增/全量配置查询,包含基础身份、连接信息、运行参数等全量字段 -// ============================================================================== -public class CameraConfigDto -{ - // --- 基础身份 (Identity) --- - /// - /// 设备唯一标识 - /// - [Required(ErrorMessage = "设备ID不能为空")] - [Range(1, long.MaxValue, ErrorMessage = "设备ID必须为正整数")] - public long Id { get; set; } - - /// - /// 设备友好名称 - /// - [MaxLength(64, ErrorMessage = "设备名称长度不能超过64个字符")] - public string Name { get; set; } = string.Empty; - - /// - /// 摄像头品牌类型 (0:HikVision, 1:Dahua, 2:RTSP...) - /// - [Range(0, 10, ErrorMessage = "品牌类型值必须在0-10范围内")] - public int Brand { get; set; } - - /// - /// 设备安装位置描述 - /// - [MaxLength(128, ErrorMessage = "安装位置长度不能超过128个字符")] - public string Location { get; set; } = string.Empty; - - // --- 核心连接 (Connectivity) - 修改此类参数触发冷重启 --- - /// - /// 摄像头IP地址 - /// - [Required(ErrorMessage = "IP地址不能为空")] - [RegularExpression(@"^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$", - ErrorMessage = "请输入合法的IPv4地址")] - public string IpAddress { get; set; } = string.Empty; - - /// - /// SDK端口 (如海康默认8000) - /// - [Range(1, 65535, ErrorMessage = "端口号必须在1-65535范围内")] - public ushort Port { get; set; } = 8000; - - /// - /// 登录用户名 - /// - [MaxLength(32, ErrorMessage = "用户名长度不能超过32个字符")] - public string Username { get; set; } = string.Empty; - - /// - /// 登录密码 - /// - [MaxLength(64, ErrorMessage = "密码长度不能超过64个字符")] - public string Password { get; set; } = string.Empty; - - public long RenderHandle { get; set; } - - /// - /// 通道号 (通常为1) - /// - [Range(1, 32, ErrorMessage = "通道号必须在1-32范围内")] - public int ChannelIndex { get; set; } = 1; - - /// - /// RTSP流路径 (备用或非SDK模式使用) - /// - [MaxLength(256, ErrorMessage = "RTSP地址长度不能超过256个字符")] - public string RtspPath { get; set; } = string.Empty; - - // --- 主板关联信息 (Metadata) --- - /// - /// 关联主板IP地址 - /// - [RegularExpression(@"^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)?$", - ErrorMessage = "请输入合法的IPv4地址")] - public string MainboardIp { get; set; } = string.Empty; - - /// - /// 关联主板端口 - /// - [Range(1, 65535, ErrorMessage = "主板端口号必须在1-65535范围内")] - public int MainboardPort { get; set; } = 80; - - // --- 运行时参数 (Runtime Options) - 支持热更新 --- - /// - /// 码流类型 (0:主码流, 1:子码流) - /// - [Range(0, 1, ErrorMessage = "码流类型只能是0(主码流)或1(子码流)")] - public int StreamType { get; set; } = 0; - - /// - /// 是否使用灰度图 (用于AI分析场景加速) - /// - public bool UseGrayscale { get; set; } = false; - - /// - /// 是否启用图像增强 (去噪/锐化等) - /// - public bool EnhanceImage { get; set; } = true; - - // --- 画面变换 (Transform) - 支持热更新 --- - /// - /// 是否允许图像压缩 (降低带宽占用) - /// - public bool AllowCompress { get; set; } = true; - - /// - /// 是否允许图像放大 (提升渲染质量) - /// - public bool AllowExpand { get; set; } = false; - - /// - /// 目标分辨率 (格式如 1920x1080,空则保持原图) - /// - [RegularExpression(@"^\d+x\d+$", ErrorMessage = "分辨率格式必须为 宽度x高度 (如 1920x1080)")] - public string TargetResolution { get; set; } = string.Empty; -} \ No newline at end of file diff --git a/SHH.CameraSdk/Controllers/Dto/SubscriptionDto.cs b/SHH.CameraSdk/Controllers/Dto/SubscriptionDto.cs index e091a5a..0ddd5c6 100644 --- a/SHH.CameraSdk/Controllers/Dto/SubscriptionDto.cs +++ b/SHH.CameraSdk/Controllers/Dto/SubscriptionDto.cs @@ -9,7 +9,7 @@ namespace SHH.CameraSdk; public class SubscriptionDto { /// - /// 进程唯一标识 (如 "AI_Process_01"、"Main_Display_02") + /// 订阅标识 (如 "AI_Process_01"、"Main_Display_02") /// [Required(ErrorMessage = "进程标识 AppId 不能为空")] [MaxLength(50, ErrorMessage = "AppId 长度不能超过 50 个字符")] diff --git a/SHH.CameraSdk/Core/Manager/CameraManager.cs b/SHH.CameraSdk/Core/Manager/CameraManager.cs index 1047998..02dbfe7 100644 --- a/SHH.CameraSdk/Core/Manager/CameraManager.cs +++ b/SHH.CameraSdk/Core/Manager/CameraManager.cs @@ -309,7 +309,7 @@ public class CameraManager : IDisposable, IAsyncDisposable { var options = new DynamicStreamOptions { - StreamType = dto.StreamType, + StreamType = dto.StreamType ?? newConfig.StreamType, RenderHandle = (IntPtr)dto.RenderHandle }; device.ApplyOptions(options); @@ -428,4 +428,13 @@ public class CameraManager : IDisposable, IAsyncDisposable } #endregion + + /// + /// [新增] 获取当前管理的所有相机设备(兼容网络引擎接口) + /// + public IEnumerable GetAllCameras() + { + // 复用现有的 GetAllDevices 逻辑 + return GetAllDevices(); + } } \ No newline at end of file diff --git a/SHH.CameraSdk/Core/Memory/SmartFrame.cs b/SHH.CameraSdk/Core/Memory/SmartFrame.cs index b0c52ca..40c790f 100644 --- a/SHH.CameraSdk/Core/Memory/SmartFrame.cs +++ b/SHH.CameraSdk/Core/Memory/SmartFrame.cs @@ -28,6 +28,12 @@ public class SmartFrame : IDisposable /// 内存由帧池预分配,全程复用,不触发 GC public Mat InternalMat { get; private set; } + /// [快捷属性] 原始图像宽度 (若 TargetMat 为空则返回 0) + public int InternalWidth => InternalMat?.Width ?? 0; + + /// [快捷属性] 原始图像高度 (若 TargetMat 为空则返回 0) + public int InnernalHeight => InternalMat?.Height ?? 0; + /// 帧激活时间戳(记录帧被取出池的时刻) public DateTime Timestamp { get; private set; } diff --git a/SHH.CameraSdk/Core/Pipeline/GlobalStreamDispatcher.cs b/SHH.CameraSdk/Core/Pipeline/GlobalStreamDispatcher.cs index 521ae79..1a70ea5 100644 --- a/SHH.CameraSdk/Core/Pipeline/GlobalStreamDispatcher.cs +++ b/SHH.CameraSdk/Core/Pipeline/GlobalStreamDispatcher.cs @@ -36,11 +36,6 @@ public static class GlobalStreamDispatcher // ================================================================= public static event Action OnGlobalFrame; - // ================================================================= - // 2. 原有:定向分发逻辑 (保留不动,给图像处理集群用) - // ================================================================= - // private static ConcurrentDictionary _subscribers ... - /// /// 统一入口:驱动层调用此方法分发图像 /// @@ -71,6 +66,10 @@ public static class GlobalStreamDispatcher /// private static readonly ConcurrentDictionary> _routingTable = new(); + // [新增] 旁路订阅支持 + // 用于 NetworkService 这种需要针对单个设备进行订阅/取消订阅的场景 + private static readonly ConcurrentDictionary>> _deviceSpecificTable = new(); + #endregion #region --- 3. 订阅管理接口 (Subscription Management API) --- @@ -98,27 +97,63 @@ public static class GlobalStreamDispatcher ); } + ///// + ///// [新增] 精准订阅:仅监听指定设备的特定 AppId 帧 + ///// 优势:内部自动过滤 DeviceId,回调函数无需再写 if 判断 + ///// + ///// 需求标识 + ///// 只接收此设备的帧 + ///// 处理回调(注意:此处签名不含 deviceId,因为已隐式确定) + //public static void Subscribe(string appId, long specificDeviceId, Action handler) + //{ + // // 创建一个“过滤器”闭包 + // Action wrapper = (id, frame) => + // { + // // 只有当来源 ID 与订阅 ID 一致时,才触发用户的业务回调 + // if (id == specificDeviceId) + // { + // handler(frame); + // } + // }; + + // // 将过滤器注册到基础路由表中 + // Subscribe(appId, wrapper); + //} + /// - /// [新增] 精准订阅:仅监听指定设备的特定 AppId 帧 - /// 优势:内部自动过滤 DeviceId,回调函数无需再写 if 判断 + /// [重写] 精准订阅:仅监听指定设备的特定 AppId 帧 + /// 修改说明:不再使用闭包 + 多播委托,而是存入二级字典,以便能精准取消 /// - /// 需求标识 - /// 只接收此设备的帧 - /// 处理回调(注意:此处签名不含 deviceId,因为已隐式确定) public static void Subscribe(string appId, long specificDeviceId, Action handler) { - // 创建一个“过滤器”闭包 - Action wrapper = (id, frame) => - { - // 只有当来源 ID 与订阅 ID 一致时,才触发用户的业务回调 - if (id == specificDeviceId) - { - handler(frame); - } - }; + if (string.IsNullOrWhiteSpace(appId) || handler == null) return; - // 将过滤器注册到基础路由表中 - Subscribe(appId, wrapper); + // 1. 获取或创建该 AppId 的设备映射表 + var deviceMap = _deviceSpecificTable.GetOrAdd(appId, _ => new ConcurrentDictionary>()); + + // 2. 添加或更新该设备的订阅 + // 注意:这里使用多播委托 (+),支持同一个 App 同一个 Device 有多个处理逻辑(虽然很少见) + deviceMap.AddOrUpdate(specificDeviceId, handler, (_, existing) => existing + handler); + } + + /// + /// [新增] 精准取消订阅:移除指定 AppId 下指定设备的订阅 + /// NetworkService 必须调用此方法来防止内存泄漏 + /// + public static void Unsubscribe(string appId, long specificDeviceId) + { + if (string.IsNullOrWhiteSpace(appId)) return; + + // 1. 查找该 AppId 是否有记录 + if (_deviceSpecificTable.TryGetValue(appId, out var deviceMap)) + { + // 2. 移除该设备的订阅委托 + if (deviceMap.TryRemove(specificDeviceId, out _)) + { + // 可选:如果该 AppId 下没设备了,是否清理外层字典?(为了性能通常不清理,或者定期清理) + // Console.WriteLine($"[Dispatcher] {appId} 已停止订阅设备 {specificDeviceId}"); + } + } } /// @@ -192,6 +227,26 @@ public static class GlobalStreamDispatcher } } + // ========================================================= + // B. [新增逻辑] 匹配设备级 AppId 订阅 (如 NetworkService) + // ========================================================= + if (_deviceSpecificTable.TryGetValue(appId, out var deviceMap)) + { + // 查找当前设备是否有订阅者 + if (deviceMap.TryGetValue(deviceId, out var deviceHandler)) + { + try + { + deviceHandler.Invoke(frame); + task.Context.AddLog($"帧任务 设备级 [Seq:{sequence}] 投递到 AppId:{appId}"); + } + catch (Exception ex) + { + Console.WriteLine($"[DispatchError] DeviceSpecific AppId={appId}, Dev={deviceId}: {ex.Message}"); + } + } + } + // 2. 匹配预设的全局通道(兼容旧版订阅逻辑) switch (appId.ToUpperInvariant()) { @@ -204,6 +259,43 @@ public static class GlobalStreamDispatcher } } + // ========================================================================= + // 2. [旁路通道] 扫描设备级订阅表 (NetworkService, 录像服务 等) + // 这是外部服务“被动”监听的目标,不在 targetAppIds 白名单里也要发 + // ========================================================================= + if (!_deviceSpecificTable.IsEmpty) + { + // 遍历所有注册了旁路监听的 AppId (例如 "NetService") + foreach (var kvp in _deviceSpecificTable) + { + string sidecarAppId = kvp.Key; + var deviceMap = kvp.Value; + + // 优化:如果这个 AppId 已经在上面的 targetAppIds 里处理过了,就跳过,防止重复发送 + // (例如:如果设备未来真的把 NetService 加入了白名单,这里就不重复发了) + if (targetAppIds.Contains(sidecarAppId)) continue; + + // 检查这个 AppId 下,是否有人订阅了当前这台设备 + if (deviceMap.TryGetValue(deviceId, out var handler)) + { + try + { + handler.Invoke(frame); + // task.Context.AddLog($"帧任务 [Seq:{sequence}] 旁路投递到: {sidecarAppId}"); + } + catch (Exception ex) + { + Console.WriteLine($"[SidecarDispatchError] App={sidecarAppId}, Dev={deviceId}: {ex.Message}"); + } + } + } + } + + // ========================================================================= + // 3. [上帝通道] 全局广播 + // ========================================================================= + OnGlobalFrame?.Invoke(deviceId, frame); + // 分发完成后记录遥测数据 GlobalTelemetry.RecordLog(sequence, task.Context); } diff --git a/SHH.CameraSdk/Core/Services/DisplayWindowManager.cs b/SHH.CameraSdk/Core/Services/DisplayWindowManager.cs index 504d9dc..2aa4b95 100644 --- a/SHH.CameraSdk/Core/Services/DisplayWindowManager.cs +++ b/SHH.CameraSdk/Core/Services/DisplayWindowManager.cs @@ -1,9 +1,4 @@ using OpenCvSharp; -using System; -using System.Collections.Concurrent; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; namespace SHH.CameraSdk { @@ -166,6 +161,13 @@ namespace SHH.CameraSdk return; } + // 1. 先检查队列容量 (虽然 BlockingCollection 没有完美的无锁 IsFull,但可以通过 Count 判断) + // 这是一个不需要 100% 精确的优化,只要能拦截掉大部分无用功即可 + if (_uiActionQueue.Count >= 30) + { + return; // 直接丢弃,不进行克隆,节省 CPU + } + Mat frameClone = null; try { diff --git a/SHH.CameraSdk/Core/Services/FileStorageService.cs b/SHH.CameraSdk/Core/Services/FileStorageService.cs index 41dd9af..a103448 100644 --- a/SHH.CameraSdk/Core/Services/FileStorageService.cs +++ b/SHH.CameraSdk/Core/Services/FileStorageService.cs @@ -77,6 +77,7 @@ public class FileStorageService : IStorageService var list = JsonSerializer.Deserialize>(json, _jsonOptions); return list ?? new List(); + //return new List(); } catch (Exception ex) { diff --git a/SHH.CameraSdk/Drivers/HikVision/HikVideoSource.cs b/SHH.CameraSdk/Drivers/HikVision/HikVideoSource.cs index 1e527b5..eba3e99 100644 --- a/SHH.CameraSdk/Drivers/HikVision/HikVideoSource.cs +++ b/SHH.CameraSdk/Drivers/HikVision/HikVideoSource.cs @@ -1,5 +1,7 @@ using OpenCvSharp; using SHH.CameraSdk.HikFeatures; +using System.Runtime.ExceptionServices; +using System.Security; namespace SHH.CameraSdk; @@ -325,6 +327,9 @@ public class HikVideoSource : BaseVideoSource, #region --- 解码与帧分发 (Decoding) --- + // 必须同时加上 SecurityCritical + [HandleProcessCorruptedStateExceptions] + [SecurityCritical] private void SafeOnDecodingCallBack(int nPort, IntPtr pBuf, int nSize, ref HikPlayMethods.FRAME_INFO pFrameInfo, int nReserved1, int nReserved2) { // [优化] 维持心跳,防止被哨兵误杀 diff --git a/SHH.CameraSdk/SHH.CameraSdk.csproj b/SHH.CameraSdk/SHH.CameraSdk.csproj index 650444d..698573b 100644 --- a/SHH.CameraSdk/SHH.CameraSdk.csproj +++ b/SHH.CameraSdk/SHH.CameraSdk.csproj @@ -21,6 +21,10 @@ + + + + diff --git a/SHH.CameraService/CameraEngineWorker.cs b/SHH.CameraService/CameraEngineWorker.cs deleted file mode 100644 index 4c3099f..0000000 --- a/SHH.CameraService/CameraEngineWorker.cs +++ /dev/null @@ -1,113 +0,0 @@ -using Microsoft.Extensions.Hosting; -using SHH.CameraSdk; - -namespace SHH.CameraService; - -/// -/// 相机服务核心引擎工作者(后台长驻服务) -/// 核心职责: -/// 1. 管理 CameraManager 全生命周期(启动、配置、释放) -/// 2. 初始化网络哨兵(ConnectivitySentinel),监控设备网络连通性 -/// 3. 无配置时自动添加默认测试设备,降低调试门槛 -/// 设计说明: -/// - 基于 BackgroundService,运行在独立后台线程,不阻塞 Web 主线程 -/// - 与 CameraManager 强绑定,是所有相机设备的统一入口 -/// - 包含容错机制,添加设备失败不影响整体服务启动 -public class CameraEngineWorker : BackgroundService -{ - #region --- 依赖注入字段 --- - - /// - /// 相机管理器实例(核心业务对象) - /// 功能:管理所有相机设备的生命周期、状态监控、配置更新 - /// - private readonly CameraManager _manager; - - /// - /// 网络连通性哨兵实例 - /// 功能:周期性 Ping 设备、检测网络状态、触发断线重连 - /// - private readonly ConnectivitySentinel _sentinel; - - #endregion - - #region --- 构造函数 --- - /// - /// 初始化相机引擎工作者实例 - /// - /// 相机管理器(通过 DI 注入,已关联存储服务) - /// 网络哨兵(通过 DI 注入,已预设监控周期) - public CameraEngineWorker(CameraManager manager, ConnectivitySentinel sentinel) - { - _manager = manager ?? throw new ArgumentNullException( - nameof(manager), "相机管理器实例不能为空,核心引擎启动失败"); - _sentinel = sentinel ?? throw new ArgumentNullException( - nameof(sentinel), "网络哨兵实例不能为空,设备监控功能失效"); - } - - #endregion - - #region --- BackgroundService 核心实现 --- - - /// - /// 启动引擎:初始化相机管理器并加载业务配置 - /// 执行流程: - /// 1. 启动 CameraManager(加载本地配置文件中的设备信息) - /// 2. 加载默认业务逻辑(无设备时添加测试设备) - /// 注意:网络哨兵的启动逻辑已内置在其构造函数中,此处无需额外调用 - /// - /// 服务停止令牌(响应应用关闭/重启信号) - /// 异步任务(引擎启动完成后结束) - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - Console.WriteLine("[Engine] 核心引擎启动中..."); - Console.WriteLine("[Engine] 启动相机管理器(加载设备配置)"); - - try - { - // 启动相机管理器:加载 App_Data/Process_X 目录下的设备配置文件 - await _manager.StartAsync(); - Console.WriteLine("[Engine] 相机管理器启动成功,已加载配置文件中的设备"); - } - catch (Exception ex) - { - Console.WriteLine($"[Engine] 相机管理器启动失败:{ex.Message}"); - Console.WriteLine("[Engine] 警告:核心引擎将继续运行,但无法管理任何相机设备"); - return; - } - - Console.WriteLine("[Engine] 核心引擎启动完成,进入运行状态"); - Console.WriteLine("[Engine] 提示:可通过 API 接口添加/编辑/删除设备,实时生效"); - } - - /// - /// 停止引擎:优雅释放资源 - /// 执行流程: - /// 1. 调用 CameraManager.DisposeAsync(),释放所有设备连接、句柄、线程资源 - /// 2. 调用基类 StopAsync(),标记服务停止状态 - /// 注意:必须先释放 CameraManager,避免设备连接泄露 - /// - /// 取消令牌(用于强制终止释放流程) - /// 异步任务(资源释放完成后结束) - public override async Task StopAsync(CancellationToken cancellationToken) - { - Console.WriteLine("[Engine] 核心引擎正在停止..."); - - try - { - // 释放相机管理器:停止所有设备取流、注销登录、释放非托管资源 - await _manager.DisposeAsync(); - Console.WriteLine("[Engine] 相机管理器资源已释放"); - } - catch (Exception ex) - { - Console.WriteLine($"[Engine] 资源释放异常:{ex.Message}"); - } - - // 调用基类方法,完成服务停止 - await base.StopAsync(cancellationToken); - Console.WriteLine("[Engine] 核心引擎已停止"); - } - - #endregion -} \ No newline at end of file diff --git a/SHH.CameraService/CommandBusProcessor.cs b/SHH.CameraService/CommandBusProcessor.cs deleted file mode 100644 index 63633b1..0000000 --- a/SHH.CameraService/CommandBusProcessor.cs +++ /dev/null @@ -1,123 +0,0 @@ -using Newtonsoft.Json; -using SHH.CameraSdk; -using SHH.Contracts; - -namespace SHH.CameraService -{ - /// - /// 指令业务逻辑分发器 (纯逻辑层) - /// 职责:解析业务参数 -> 调用 CameraManager -> 返回执行结果 - /// 注意:本类不处理网络协议,也不负责 RequestId 的回填,只关注业务本身 - /// - public static class CommandBusProcessor - { - /// - /// 核心业务入口 - /// - /// 相机管理器实例 - /// 已解析的指令包 - /// 执行结果 (不含 RequestId,由调用方补充) - public static CommandResult ProcessBusinessLogic(CameraManager manager, CommandPayload payload) - { - string cmd = payload.CmdCode.ToUpper(); - - // 忽略客户端发回的 ACK (如果是双向确认模式) - if (cmd == "REGISTER_ACK") return CommandResult.Ok(); - - // 解析 TargetId (CameraId) - long deviceId = 0; - // 只有非 SYSTEM 指令才需要解析设备ID - if (payload.TargetId != "SYSTEM" && !long.TryParse(payload.TargetId, out deviceId)) - { - return CommandResult.Fail($"Invalid Device ID: {payload.TargetId}"); - } - - try - { - switch (cmd) - { - // ========================================== - // 1. PTZ 云台控制 - // ========================================== - case "PTZ": - { - var device = manager.GetDevice(deviceId); - if (device == null) return CommandResult.Fail("Device Not Found"); - if (!device.IsOnline) return CommandResult.Fail("Device Offline"); - - // 检查设备是否支持 PTZ 能力 (接口模式匹配) - if (device is IPtzFeature ptzFeature) - { - var ptzDto = JsonConvert.DeserializeObject(payload.JsonParams); - if (ptzDto == null) return CommandResult.Fail("Invalid PTZ Params"); - - // 异步转同步执行 (Task.Wait 在后台线程是安全的) - if (ptzDto.Duration > 0) - { - // 点动模式 (例如:向左转 500ms) - ptzFeature.PtzStepAsync(ptzDto.Action, ptzDto.Duration, ptzDto.Speed).Wait(); - } - else - { - // 持续模式 (开始转/停止转) - ptzFeature.PtzControlAsync(ptzDto.Action, ptzDto.Stop, ptzDto.Speed).Wait(); - } - return CommandResult.Ok("PTZ Executed"); - } - return CommandResult.Fail("Device does not support PTZ"); - } - - // ========================================== - // 2. 远程重启 - // ========================================== - case "REBOOT": - { - var device = manager.GetDevice(deviceId); - if (device == null) return CommandResult.Fail("Device Not Found"); - - if (device is IRebootFeature rebootFeature) - { - rebootFeature.RebootAsync().Wait(); - return CommandResult.Ok("Reboot command sent"); - } - return CommandResult.Fail("Device does not support Reboot"); - } - - // ========================================== - // 3. 时间同步 - // ========================================== - case "SYNC_TIME": - { - var device = manager.GetDevice(deviceId); - if (device == null) return CommandResult.Fail("Device Not Found"); - - if (device is ITimeSyncFeature timeFeature) - { - timeFeature.SetTimeAsync(DateTime.Now).Wait(); - return CommandResult.Ok("Time synced"); - } - return CommandResult.Fail("Device does not support TimeSync"); - } - - // ========================================== - // 4. 系统级指令 (心跳/诊断) - // ========================================== - case "PING": - return CommandResult.Ok("PONG"); - - default: - return CommandResult.Fail($"Unknown Command: {cmd}"); - } - } - catch (AggregateException ae) - { - // 捕获异步任务内部的异常 - return CommandResult.Fail($"Execution Error: {ae.InnerException?.Message}"); - } - catch (Exception ex) - { - return CommandResult.Fail($"Execution Error: {ex.Message}"); - } - } - } -} \ No newline at end of file diff --git a/SHH.CameraService/CommandBusService.cs b/SHH.CameraService/CommandBusService.cs deleted file mode 100644 index ca338c4..0000000 --- a/SHH.CameraService/CommandBusService.cs +++ /dev/null @@ -1,346 +0,0 @@ -using Microsoft.Extensions.Caching.Memory; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Configuration; -using NetMQ; -using NetMQ.Sockets; -using Newtonsoft.Json; -using SHH.CameraSdk; -using SHH.Contracts; - -namespace SHH.CameraService -{ - /// - /// 双模指令总线服务 (Enterprise V2) - /// 核心职责:建立 TCP 指令通道,接收客户端指令并分发给 CameraManager - /// 增强特性: - /// 1. 支持双模:被动监听 (Bind) 与 主动投递 (Connect) - /// 2. 幂等性控制:利用 MemoryCache 防止客户端重试导致的重复执行 - /// 3. 顺序一致性:利用时间戳防止指令乱序 - /// - public class CommandBusService : BackgroundService - { - #region --- 1. 字段与依赖 --- - - private readonly CameraManager _cameraManager; - private readonly IConfiguration _config; - private readonly IMemoryCache _cache; // 核心:用于请求去重 - private readonly int _processId; - - // 运行状态标志 - private volatile bool _isRunning = false; - - // 两种模式的 Socket (互斥存在) - private ResponseSocket? _repSocket; // 模式A: 被动监听 (Server-Listening) - private DealerSocket? _dealerSocket; // 模式B: 主动投递 (Server-Dialing) - - // 顺序一致性锁:记录每个设备最后处理的指令时间戳 - // Key: TargetId (设备ID), Value: Timestamp (最后执行时间) - private readonly Dictionary _deviceLastCmdTime = new(); - - #endregion - - #region --- 2. 构造函数 --- - - /// - /// 构造函数 (注意:必须在 Program.cs 注册 AddMemoryCache) - /// - public CommandBusService(CameraManager manager, IConfiguration config, IMemoryCache cache) - { - _cameraManager = manager; - _config = config; - _cache = cache; - // 获取当前进程 ID (默认为 1) - _processId = _config.GetValue("ProcessId", 1); - } - - #endregion - - #region --- 3. 核心生命周期 --- - - protected override Task ExecuteAsync(CancellationToken stoppingToken) - { - // 在后台线程启动,避免阻塞 Web 主线程 - return Task.Run(() => - { - _isRunning = true; - - // 1. 读取网络策略 - // 优先读取配置中的主动目标,如果没有则回退到被动监听 - string? activeTargetIp = _config["Network:ActiveTargets:0:Ip"]; - bool isActiveMode = !string.IsNullOrEmpty(activeTargetIp); - - try - { - if (isActiveMode) - { - // === 模式 B: 主动投递 (Server Connects Client) === - // 场景:服务器在内网,主动连接公网/固定IP的客户端 - int cmdPort = _config.GetValue("Network:ActiveTargets:0:CmdPort", 7000); - string addr = $"tcp://{activeTargetIp}:{cmdPort}"; - RunActiveMode(addr, stoppingToken); - } - else - { - // === 模式 A: 被动监听 (Server Binds Port) === - // 场景:服务器有固定IP,等待客户端连接 - int basePort = _config.GetValue("Network:Passive:CmdPortBase", 7000); - int listenPort = basePort + (_processId - 1); - string addr = $"tcp://*:{listenPort}"; - RunPassiveMode(addr, stoppingToken); - } - } - catch (Exception ex) - { - Console.WriteLine($"[CmdBus] 致命错误停止: {ex.Message}"); - } - finally - { - _isRunning = false; - CleanupSockets(); - } - - }, stoppingToken); - } - - private void CleanupSockets() - { - try - { - _repSocket?.Dispose(); - _dealerSocket?.Dispose(); - } - catch { /* 忽略销毁时的异常 */ } - } - - #endregion - - #region --- 4. 模式实现:被动监听 (Passive) --- - - private void RunPassiveMode(string address, CancellationToken token) - { - using (_repSocket = new ResponseSocket()) - { - _repSocket.Bind(address); - Console.WriteLine($"[CmdBus] [被动模式] 指令监听已启动: {address}"); - - while (!token.IsCancellationRequested) - { - try - { - // 1. 阻塞等待请求 (超时1秒以便响应 Cancel 信号) - if (!_repSocket.TryReceiveFrameString(TimeSpan.FromSeconds(1), out string reqJson)) - continue; - - // 2. 处理业务 (带 去重 + ID回填 逻辑) - CommandResult result = this.ProcessRequest(reqJson); - - // 3. 发送回执 - // 注意:REP 模式必须发送应答,即使 result 为 null (Fire-and-Forget) 也建议发一个空 ACK 防止 Socket 状态错乱 - // 但为了协议统一,建议 Passive 模式下总是返回结果 - string respJson = result != null ? JsonConvert.SerializeObject(result) : "{}"; - _repSocket.SendFrame(respJson); - } - catch (Exception ex) - { - Console.WriteLine($"[CmdBus-Passive] 异常: {ex.Message}"); - } - } - } - } - - #endregion - - #region --- 5. 模式实现:主动投递 (Active) --- - - private void RunActiveMode(string address, CancellationToken token) - { - // 外层循环:断线重连机制 - while (!token.IsCancellationRequested) - { - try - { - using (_dealerSocket = new DealerSocket()) - { - Console.WriteLine($"[CmdBus] [主动模式] 正在连接指令中心: {address}"); - _dealerSocket.Connect(address); - - // ★★★ 关键步骤:连接成功后,立即发送【身份注册包】 ★★★ - // 客户端收到这个包后,才能在界面上显示"设备在线" - SendRegistration(_dealerSocket); - - // 内层循环:消息收发 - while (!token.IsCancellationRequested) - { - // 1. 接收指令 - // DealerSocket 是异步全双工的,这里即使没收到消息也不会阻塞发送 - if (!_dealerSocket.TryReceiveFrameString(TimeSpan.FromSeconds(1), out string reqJson)) - { - // 空闲周期,可在此处添加心跳发送逻辑 (Ping) - continue; - } - - // 2. 处理业务 (带 去重 + ID回填 逻辑) - CommandResult result = this.ProcessRequest(reqJson); - - // 3. 发送结果 (QoS控制) - // 如果结果为 null,说明指令是 Fire-and-Forget (无需回执),则不发送网络包节省带宽 - if (result != null) - { - _dealerSocket.SendFrame(JsonConvert.SerializeObject(result)); - } - } - } - } - catch (Exception ex) - { - Console.WriteLine($"[CmdBus-Active] 连接中断或异常: {ex.Message}"); - // 避免死循环狂刷 CPU,等待 3 秒再重连 - Thread.Sleep(3000); - } - } - } - - /// - /// 发送身份注册包 (Active 模式专用) - /// - private void SendRegistration(DealerSocket socket) - { - try - { - // 计算实际端口信息 - int portOffset = _processId - 1; - var regInfo = new ServerRegistrationDto - { - ProcessId = _processId, - InstanceId = $"Gateway_{_processId}", - ServerIp = GetLocalIpAddress(), - WebApiPort = 5000 + portOffset, - VideoPort = 5555 + portOffset, - CmdPort = 7000 + portOffset, - StartTime = DateTime.Now, - Description = "Active Mode Connection (V2)" - }; - - // 封装信封 (系统级指令) - var payload = new CommandPayload - { - CmdCode = "SERVER_REGISTER", - TargetId = "SYSTEM", - JsonParams = JsonConvert.SerializeObject(regInfo), - RequestId = Guid.NewGuid().ToString("N"), - RequireAck = false // 注册包通常不需要回执,只要连上就行 - }; - - socket.SendFrame(JsonConvert.SerializeObject(payload)); - Console.WriteLine($"[CmdBus] 身份注册包已发送 -> {regInfo.ServerIp}:{regInfo.WebApiPort}"); - } - catch (Exception ex) - { - Console.WriteLine($"[CmdBus] 注册包发送失败: {ex.Message}"); - } - } - - private string GetLocalIpAddress() - { - try - { - var host = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName()); - foreach (var ip in host.AddressList) - { - if (ip.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork) - return ip.ToString(); - } - } - catch { } - return "127.0.0.1"; - } - - #endregion - - #region --- 6. 协议处理核心 (★★ V2 核心增强 ★★) --- - - /// - /// 统一处理请求协议:去重 -> 排序 -> 执行 -> 回填 ID - /// - private CommandResult ProcessRequest(string json) - { - if (string.IsNullOrWhiteSpace(json)) return CommandResult.Fail("Empty Request"); - - CommandPayload? payload; - try { payload = JsonConvert.DeserializeObject(json); } - catch { return CommandResult.Fail("Invalid JSON Protocol"); } - - if (payload == null) return CommandResult.Fail("Null Payload"); - - // ========================================================= - // A. 【幂等性检查】(Idempotency Check) - // ========================================================= - // 查缓存:如果这个 RequestId 10秒内处理过,直接返回上次的结果 - // 这样即使客户端重试发了 10 次,业务逻辑也只跑 1 次 - if (_cache.TryGetValue(payload.RequestId, out CommandResult cachedResult)) - { - Console.WriteLine($"[Dedup] 拦截重复请求: {payload.RequestId} (Retry: {payload.RetryCount})"); - return cachedResult; - } - - // ========================================================= - // B. 【顺序一致性检查】(Order Guarantee) - // ========================================================= - // 防止乱序:比如先发的“停止”因为网络卡顿,比后发的“开始”晚到 - if (payload.TargetId != "SYSTEM") - { - lock (_deviceLastCmdTime) - { - if (_deviceLastCmdTime.TryGetValue(payload.TargetId, out DateTime lastTime)) - { - if (payload.Timestamp < lastTime) - { - Console.WriteLine($"[Order] 丢弃乱序指令: {payload.CmdCode}"); - return CommandResult.Fail("Order Violation: Stale Command Dropped"); - } - } - _deviceLastCmdTime[payload.TargetId] = payload.Timestamp; - } - } - - // ========================================================= - // C. 【业务执行】 - // ========================================================= - CommandResult result; - try - { - // 调用纯逻辑层 - result = CommandBusProcessor.ProcessBusinessLogic(_cameraManager, payload); - } - catch (Exception ex) - { - result = CommandResult.Fail($"Internal Logic Error: {ex.Message}"); - } - - // ========================================================= - // D. 【闭环回填】 - // ========================================================= - // 必须把身份证号贴回去,不然客户端不知道这是谁的回执 - result.RequestId = payload.RequestId; - - // ========================================================= - // E. 【存入缓存】 - // ========================================================= - // 缓存 10 秒,覆盖客户端的重试窗口 - _cache.Set(payload.RequestId, result, TimeSpan.FromSeconds(10)); - - // ========================================================= - // F. 【QoS 过滤】 - // ========================================================= - // 如果客户端说不需要回信,返回 null - if (!payload.RequireAck) - { - return null; - } - - return result; - } - - #endregion - } -} \ No newline at end of file diff --git a/SHH.CameraService/CommandClientWorker.cs b/SHH.CameraService/CommandClientWorker.cs deleted file mode 100644 index b4fa75b..0000000 --- a/SHH.CameraService/CommandClientWorker.cs +++ /dev/null @@ -1,97 +0,0 @@ -using Microsoft.Extensions.Hosting; -using NetMQ; -using NetMQ.Sockets; -using Newtonsoft.Json; -using SHH.CameraSdk; -using System.Text; - -namespace SHH.CameraService; - -public class CommandClientWorker : BackgroundService -{ - private readonly ServiceConfig _config; - - public CommandClientWorker(ServiceConfig config) - { - _config = config; - } - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - // 1. 如果不是主动/混合模式,不需要连接 - if (!_config.ShouldConnect) return; - - // ★★★ 核心修正:直接读取解析好的指令地址列表 ★★★ - // 这些地址来自参数 --uris "IP,VideoPort&CommandPort" 中的 CommandPort 部分 - var cmdUris = _config.CommandEndpoints; - - if (cmdUris.Count == 0) - { - Console.WriteLine("[指令] 未在参数中找到指令通道地址(位于&符号右侧),跳过连接。"); - return; - } - - // 2. 初始化 Dealer Socket - using var dealer = new DealerSocket(); - - // 设置身份 (Identity),让 Dashboard 知道我是 "CameraApp_01" - string myIdentity = _config.AppId; - dealer.Options.Identity = Encoding.UTF8.GetBytes(myIdentity); - - // 3. 连接所有目标 (支持多点控制) - foreach (var uri in cmdUris) - { - Console.WriteLine($"[指令] 连接控制端: {uri}"); - dealer.Connect(uri); - } - - // 4. 发送登录包 (握手) - // 构造心跳包 - var heartbeat = new - { - Type = "Login", - Id = myIdentity, - Time = DateTime.Now - }; - string loginJson = JsonConvert.SerializeObject(heartbeat); - - // 注意:Dealer Socket 发送是负载均衡的 (Round-Robin)。 - // 如果连接了多个 Dashboard,SendFrame 一次只会发给其中一个。 - // 为了确保所有 Dashboard 都能收到上线通知,我们根据连接数循环发送几次。 - // (注:这只是初始化时的权宜之计,心跳包后续可以定时发送) - for (int i = 0; i < cmdUris.Count; i++) - { - dealer.SendFrame(loginJson); - await Task.Delay(10); // 稍微间隔,给 ZMQ 内部调度一点时间 - } - - Console.WriteLine($"[指令] 已发送登录包 (ID: {myIdentity}),进入监听循环..."); - - // 5. 监听循环 - while (!stoppingToken.IsCancellationRequested) - { - try - { - // 非阻塞接收 (500ms 超时),避免卡死线程 - if (dealer.TryReceiveFrameString(TimeSpan.FromMilliseconds(500), out string msg)) - { - Console.WriteLine($"[指令] 收到: {msg}"); - - // TODO: 在这里解析 JSON 并调用 CameraSDK 执行业务 - // var cmd = JsonConvert.DeserializeObject(msg); - // if (cmd.Action == "Reboot") ... - - // 回复 ACK (确认收到) - // Dealer 会自动根据 Router 发来的 RoutingID 路由回去 - dealer.SendFrame($"ACK: {msg} (From {myIdentity})"); - } - } - catch (Exception ex) - { - Console.WriteLine($"[指令] 通信异常: {ex.Message}"); - // 防止异常死循环刷屏 - await Task.Delay(1000, stoppingToken); - } - } - } -} \ No newline at end of file diff --git a/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs b/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs new file mode 100644 index 0000000..c1d36ba --- /dev/null +++ b/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs @@ -0,0 +1,139 @@ +using Microsoft.Extensions.Hosting; +using NetMQ; +using NetMQ.Sockets; +using Newtonsoft.Json; +using SHH.CameraSdk; +using System.Text; + +namespace SHH.CameraService; + +public class CommandClientWorker : BackgroundService +{ + private readonly ServiceConfig _config; + private readonly CommandDispatcher _dispatcher; // 注入分发器 + + public CommandClientWorker(ServiceConfig config, CommandDispatcher dispatcher) + { + _config = config; + _dispatcher = dispatcher; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + // ================================================================= + // ★★★ 核心修复:强制让出主线程 ★★★ + // 这行代码会让当前的 ExecuteAsync 立即返回一个未完成的 Task 给 Host, + // Host 就会认为 "这个服务启动好了",然后继续去启动 WebAPI。 + // 而剩下的代码会被调度到线程池里异步执行,互不干扰。 + // ================================================================= + await Task.Yield(); + + // 1. 如果不是主动/混合模式,不需要连接 + if (!_config.ShouldConnect) return; + + var cmdEndpoints = _config.CommandEndpoints; + if (cmdEndpoints.Count == 0) + { + Console.WriteLine("[指令] 未配置指令通道,跳过注册。"); + return; + } + + // 2. 初始化 Dealer Socket + using var dealer = new DealerSocket(); + + // ★★★ 关键:设置身份标识 (Identity) ★★★ + // 服务端 (Router) 收到消息时,第一帧就是这个 ID + // 如果不设,ZMQ 会随机生成一个二进制 ID,服务端就不知道你是谁了 + string myIdentity = _config.AppId; + dealer.Options.Identity = Encoding.UTF8.GetBytes(myIdentity); + + // 3. 连接所有目标 (遍历 ServiceEndpoint 对象) + foreach (var ep in cmdEndpoints) + { + Console.WriteLine($"[指令] 连接控制端: {ep.Uri} [{ep.Description}]"); + try + { + dealer.Connect(ep.Uri); + } + catch (Exception ex) + { + Console.WriteLine($"[指令] 连接失败 {ep.Uri}: {ex.Message}"); + } + } + + // 1. 获取本机 IP (简单的获取方式,用于上报给 Dashboard) + string localIp = "127.0.0.1"; + try + { + // 简单获取首个非回环 IP,生产环境建议用更严谨的帮助类 + var host = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName()); + localIp = host.AddressList.FirstOrDefault(ip => + ip.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork)?.ToString() ?? "127.0.0.1"; + } + catch { } + + // 4. 构建注册/登录包 + var registerPayload = new + { + Action = "Register", + Payload = new + { + // 1. AppId (身份) + Id = _config.AppId, + + // 2. Version (程序集版本) + Version = System.Reflection.Assembly.GetEntryAssembly()?.GetName().Version?.ToString() ?? "1.0.0", + + // 3. 进程 ID (用于远程监控) + Pid = Environment.ProcessId, + + // 4. 关键端口信息 + // 告诉 Dashboard:如果你想调我的 REST API,请访问这个端口 + WebPort = _config.BasePort, + + // 如果您有本地绑定的 ZMQ 端口也可以在这里上报 + // VideoPort = _config.BasePort + 1, + + // 基础网络信息 + Ip = localIp, + + // 附带信息:我是要把视频推给谁 (供 Dashboard 调试用) + TargetVideoNodes = _config.VideoEndpoints.Select(e => e.Uri).ToList() + }, + Time = DateTime.Now + }; + + string json = JsonConvert.SerializeObject(registerPayload); + + // 5. 发送注册包 + // Dealer 连接建立是异步的,所以这里直接发,ZMQ 会在底层连接成功后自动把消息推出去 + // 为了保险,对于多个 Endpoint,Dealer 默认是负载均衡发送的(轮询)。 + // 如果想让每个 Endpoint 都收到注册包,这在 Dealer 模式下稍微有点特殊。 + // 但通常我们只需要发一次,只要有一个 Dashboard 收到并建立会话即可。 + // 或者简单粗暴:循环发送几次,确保覆盖。 + + Console.WriteLine($"[指令] 发送注册包: {json}"); + dealer.SendFrame(json); + + // 6. 进入监听循环 (等待 ACK 或 指令) + // 进入监听循环 + while (!stoppingToken.IsCancellationRequested) + { + try + { + if (dealer.TryReceiveFrameString(TimeSpan.FromMilliseconds(500), out string msg)) + { + Console.WriteLine($"[指令] 收到消息: {msg}"); + + // ★★★ 核心变化:直接扔给分发器 ★★★ + // 无论未来加多少指令,这里都不用改代码 + await _dispatcher.DispatchAsync(msg); + } + } + catch (Exception ex) + { + Console.WriteLine($"[指令] 异常: {ex.Message}"); + } + } + } +} \ No newline at end of file diff --git a/SHH.CameraService/Core/CmdClients/CommandDispatcher.cs b/SHH.CameraService/Core/CmdClients/CommandDispatcher.cs new file mode 100644 index 0000000..bf895ba --- /dev/null +++ b/SHH.CameraService/Core/CmdClients/CommandDispatcher.cs @@ -0,0 +1,46 @@ +using Newtonsoft.Json.Linq; + +namespace SHH.CameraService; + +public class CommandDispatcher +{ + // 路由表:Key = ActionName, Value = Handler + private readonly Dictionary _handlers; + + // 通过依赖注入拿到所有实现了 ICommandHandler 的类 + public CommandDispatcher(IEnumerable handlers) + { + _handlers = handlers.ToDictionary(h => h.ActionName, h => h); + } + + public async Task DispatchAsync(string jsonMessage) + { + try + { + var jObj = JObject.Parse(jsonMessage); + string action = jObj["Action"]?.ToString(); + var payload = jObj["Payload"]; + + if (string.IsNullOrEmpty(action)) return; + + // 1. 查找是否有对应的处理器 + if (_handlers.TryGetValue(action, out var handler)) + { + await handler.ExecuteAsync(payload); + } + else if (action == "ACK") + { + // ACK 是特殊的,可以直接在这里处理或者忽略 + Console.WriteLine($"[指令] 握手成功: {jObj["Message"]}"); + } + else + { + Console.WriteLine($"[警告] 未知的指令: {action}"); + } + } + catch (Exception ex) + { + Console.WriteLine($"[分发错误] {ex.Message}"); + } + } +} \ No newline at end of file diff --git a/SHH.CameraService/Core/CmdClients/ConnectedClient.cs b/SHH.CameraService/Core/CmdClients/ConnectedClient.cs new file mode 100644 index 0000000..54e5c62 --- /dev/null +++ b/SHH.CameraService/Core/CmdClients/ConnectedClient.cs @@ -0,0 +1,30 @@ +namespace SHH.CameraService; + +/// +/// 在线客户端信息模型 (已更新) +/// +public class ConnectedClient +{ + /// 唯一标识 (AppId) + public string ServiceId { get; set; } = string.Empty; + + /// 版本号 + public string Version { get; set; } = "1.0.0"; + + /// 远程进程 ID + public int Pid { get; set; } + + /// 客户端 IP + public string Ip { get; set; } = string.Empty; + + /// WebAPI 端口 (Dashboard 调用 REST 接口用) + public int WebPort { get; set; } + + /// 该客户端正在推流的目标地址 + public List TargetVideoNodes { get; set; } = new List(); + + public DateTime LastHeartbeat { get; set; } + + // 辅助属性:拼接出完整的 API BaseUrl + public string WebApiUrl => $"http://{Ip}:{WebPort}"; +} \ No newline at end of file diff --git a/SHH.CameraService/Core/CmdClients/ICommandHandler.cs b/SHH.CameraService/Core/CmdClients/ICommandHandler.cs new file mode 100644 index 0000000..c9bfddd --- /dev/null +++ b/SHH.CameraService/Core/CmdClients/ICommandHandler.cs @@ -0,0 +1,18 @@ +namespace SHH.CameraService; + +/// +/// 抽象指令处理器接口 +/// +public interface ICommandHandler +{ + /// + /// 该处理器支持的 Action 名称 (如 "AddCamera", "Reboot") + /// + string ActionName { get; } + + /// + /// 执行指令逻辑 + /// + /// 指令携带的数据 (JSON JToken) + Task ExecuteAsync(Newtonsoft.Json.Linq.JToken payload); +} \ No newline at end of file diff --git a/SHH.CameraService/Core/CmdClients/SyncCameraHandler.cs b/SHH.CameraService/Core/CmdClients/SyncCameraHandler.cs new file mode 100644 index 0000000..c8be4b1 --- /dev/null +++ b/SHH.CameraService/Core/CmdClients/SyncCameraHandler.cs @@ -0,0 +1,103 @@ +using Newtonsoft.Json.Linq; +using SHH.CameraSdk; // 引用包含 FrameController 和 FrameRequirement 的命名空间 +using SHH.Contracts; + +namespace SHH.CameraService; + +public class SyncCameraHandler : ICommandHandler +{ + private readonly CameraManager _cameraManager; + + public string ActionName => "SyncCamera"; + + public SyncCameraHandler(CameraManager cameraManager) + { + _cameraManager = cameraManager; + } + + public async Task ExecuteAsync(JToken payload) + { + // 1. 解析配置 + var dto = payload.ToObject(); + if (dto == null) return; + + // 2. 添加设备到管理器 (这一步是必须的,不然没有 Device 就没有 Controller) + var videoConfig = new VideoSourceConfig + { + Id = dto.Id, + Name = dto.Name, + IpAddress = dto.IpAddress, + Port = dto.Port, + Username = dto.Username, + Password = dto.Password, + ChannelIndex = dto.ChannelIndex, + StreamType = dto.StreamType, + Brand = (DeviceBrand)dto.Brand, + RenderHandle = (IntPtr)dto.RenderHandle, + MainboardIp = dto.MainboardIp, + MainboardPort = dto.MainboardPort, + // 必须给个默认值,防止空引用 + VendorArguments = new Dictionary(), + }; + + // 如果设备不存在才添加,如果已存在,后续逻辑会直接获取 + if (_cameraManager.GetDevice(videoConfig.Id) == null) + { + _cameraManager.AddDevice(videoConfig); + } + + // 3. 核心:直接获取设备实例 + var device = _cameraManager.GetDevice(dto.Id); + if (device == null) + { + Console.WriteLine($"[SyncError] 设备 {dto.Id} 创建失败,无法执行自动订阅。"); + return; + } + + // 4. 拿到你的“宝贝”控制器 (FrameController) + var controller = device.Controller; + if (controller == null) + { + Console.WriteLine($"[SyncError] 设备 {dto.Id} 不支持流控调度 (Controller is null)。"); + return; + } + + // 5. 暴力注册订阅需求 (Loop AutoSubscriptions) + if (dto.AutoSubscriptions != null && dto.AutoSubscriptions.Count > 0) + { + foreach (var subItem in dto.AutoSubscriptions) + { + // 生成 AppId (照抄你给的逻辑) + string finalAppId = string.IsNullOrWhiteSpace(subItem.AppId) + ? $"SUB_{Guid.NewGuid().ToString("N").Substring(0, 8).ToUpper()}" + : subItem.AppId; + + Console.WriteLine($"[自动化] 正在注册流控: {finalAppId}, 目标: {subItem.TargetFps} FPS"); + + // 构造 FrameRequirement 对象 (完全匹配你 FrameController 的入参) + // 这里的属性赋值对应你代码里 req.Type, req.SavePath 等逻辑 + var requirement = new FrameRequirement + { + AppId = finalAppId, + TargetFps = subItem.TargetFps, // 8帧 或 1帧 + Type = (SubscriptionType)subItem.Type, // 业务类型 (LocalWindow, NetworkTrans...) + Memo = subItem.Memo ?? "Auto Sync", + + // 其它字段给默认空值,防止 Controller 内部逻辑报错 + Handle = "", + SavePath = "" + }; + + // ★★★ 见证奇迹的时刻:直接调用 Register ★★★ + controller.Register(requirement); + } + } + + //// 6. 启动设备 + //// 你的积分算法会在 device 内部的推流循环中被 MakeDecision 调用 + if (dto.ImmediateExecution) + await device.StartAsync(); + + Console.WriteLine($"[SyncSuccess] 设备 {dto.Id} 同步完成,策略已下发。"); + } +} \ No newline at end of file diff --git a/SHH.CameraService/Core/Configs/PushTargetConfig.cs b/SHH.CameraService/Core/Configs/PushTargetConfig.cs new file mode 100644 index 0000000..abc7052 --- /dev/null +++ b/SHH.CameraService/Core/Configs/PushTargetConfig.cs @@ -0,0 +1,24 @@ +namespace SHH.CameraService; + +/// +/// 定义发送的目标 +/// +public class PushTargetConfig +{ + /// + /// 目标名称 + /// + public string Name { get; set; } + = string.Empty; + + /// + /// NetMQ 地址 (如 "tcp://1.2.3.4:5555") + /// + public string Endpoint { get; set; } + = string.Empty; + + /// + /// 独立队列容量 (隔离的关键) + /// + public int QueueCapacity { get; set; } = 10; +} \ No newline at end of file diff --git a/SHH.CameraService/Core/Configs/StreamTarget.cs b/SHH.CameraService/Core/Configs/StreamTarget.cs new file mode 100644 index 0000000..72da7d9 --- /dev/null +++ b/SHH.CameraService/Core/Configs/StreamTarget.cs @@ -0,0 +1,30 @@ +namespace SHH.CameraService; + +/// +/// 代表一个独立的推送目标 +/// 包含:配置信息 + 专属于它的数据管道 +/// +public class StreamTarget +{ + /// + /// 配置 + /// + public PushTargetConfig Config { get; } + + /// + /// 管道 + /// + public VideoDataChannel Channel { get; } + + /// + /// 构造函数 + /// + /// + public StreamTarget(PushTargetConfig config) + { + Config = config; + + // 为这个目标创建独立的管道,容量由配置决定 + Channel = new VideoDataChannel(capacity: config.QueueCapacity); + } +} diff --git a/SHH.CameraService/Core/NetSenders/CameraEngineWorker.cs b/SHH.CameraService/Core/NetSenders/CameraEngineWorker.cs new file mode 100644 index 0000000..6d78bc8 --- /dev/null +++ b/SHH.CameraService/Core/NetSenders/CameraEngineWorker.cs @@ -0,0 +1,52 @@ +using Microsoft.Extensions.Hosting; +using SHH.CameraSdk; + +public class CameraEngineWorker : BackgroundService +{ + private readonly CameraManager _manager; + + public CameraEngineWorker(CameraManager manager) + { + // 理由:严谨性检查,防止因配置错误导致的空指针崩溃 + _manager = manager ?? throw new ArgumentNullException(nameof(manager)); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + Console.WriteLine("[Engine] 正在启动核心引擎..."); + + try + { + // 1. 理由:启动 SDK 内部加载流程(从本地存储恢复设备) + await _manager.StartAsync(); + Console.WriteLine("[Engine] 设备管理服务已启动。"); + } + catch (Exception ex) + { + Console.WriteLine($"[Engine] 严重启动异常: {ex.Message}"); + return; // 理由:核心组件失败,终止后续逻辑 + } + + // 2. 理由:Worker 必须保持活跃状态,以便作为宿主生命周期的一部分 + while (!stoppingToken.IsCancellationRequested) + { + // 你可以在这里定期输出一些状态统计 + // Console.WriteLine($"[Engine] 活跃设备数: {_manager.GetActiveCount()}"); + await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken); + } + } + + public override async Task StopAsync(CancellationToken cancellationToken) + { + Console.WriteLine("[Engine] 正在执行优雅停机..."); + try + { + // 理由:这是重构的核心。必须在 SDK 退出前释放所有非托管句柄 + await _manager.DisposeAsync(); + } + finally + { + await base.StopAsync(cancellationToken); + } + } +} \ No newline at end of file diff --git a/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs b/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs new file mode 100644 index 0000000..e378638 --- /dev/null +++ b/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs @@ -0,0 +1,73 @@ +using NetMQ; +using SHH.Contracts; + +namespace SHH.CameraService +{ + /// + /// 负责将业务契约转换为 ZeroMQ 传输协议 + /// + public static class NetMQProtocolExtensions + { + private const string PROTOCOL_HEADER = "SHH_V1"; + + /// + /// 扩展方法:将 Payload 转为 NetMQMessage + /// 使用方法:var msg = payload.ToNetMqMessage(); + /// + public static NetMQMessage ToNetMqMessage(this VideoPayload payload) + { + var msg = new NetMQMessage(); + + // Frame 0: 协议魔数 + msg.Append(PROTOCOL_HEADER); + + // Frame 1: 元数据 JSON + msg.Append(payload.GetMetadataJson()); + + // Frame 2: 原始图 (保持帧位对齐,无数据则发空帧) + if (payload.HasOriginalImage && payload.OriginalImageBytes != null) + msg.Append(payload.OriginalImageBytes); + else + msg.Append(Array.Empty()); + + // Frame 3: 处理图 + if (payload.HasTargetImage && payload.TargetImageBytes != null) + msg.Append(payload.TargetImageBytes); + else + msg.Append(Array.Empty()); + + return msg; + } + + /// + /// 扩展方法:从 NetMQMessage 还原 Payload + /// + public static VideoPayload ToVideoPayload(this NetMQMessage msg) + { + if (msg == null || msg.FrameCount < 4) return null; + + // Frame 0 Check + if (msg[0].ConvertToString() != PROTOCOL_HEADER) return null; + + // Frame 1: Metadata + string json = msg[1].ConvertToString(); + var payload = VideoPayload.FromMetadataJson(json); + if (payload == null) return null; + + // Frame 2: Raw Image + // 利用 BufferSize 避免不必要的内存拷贝,如果长度为0则跳过 + if (payload.HasOriginalImage && msg[2].BufferSize > 0) + { + payload.OriginalImageBytes = msg[2].ToByteArray(); + } + + // Frame 3: Processed Image + if (payload.HasTargetImage && msg[3].BufferSize > 0) + { + payload.TargetImageBytes = msg[3].ToByteArray(); + } + + return payload; + } + } +} \ No newline at end of file diff --git a/SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs b/SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs new file mode 100644 index 0000000..da5ea0f --- /dev/null +++ b/SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs @@ -0,0 +1,62 @@ +using Microsoft.Extensions.Hosting; +using NetMQ; +using NetMQ.Sockets; + +namespace SHH.CameraService; + +/// +/// NetMQ 发送工作者 +/// 职责:从指定目标的 VideoDataChannel 读取 Payload,通过 ZeroMQ 发送出去 +/// +public class NetMqSenderWorker : BackgroundService +{ + private readonly StreamTarget _target; + + // 构造函数注入特定的目标对象 (由 Program.cs 的工厂方法提供) + public NetMqSenderWorker(StreamTarget target) + { + _target = target; + } + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + Console.WriteLine($"[NetMqSender] 正在连接至服务端: {_target.Config.Endpoint} ..."); + + // ★★★ 修正点:必须使用 PublisherSocket 来配合接收端的 SubscriberSocket ★★★ + // 虽然是 Connect 模式,Publisher 依然可以 Connect + using var clientSocket = new PublisherSocket(); + + // 设置高水位 (HWM) + // 对于 Publisher,如果队列满了,默认行为就是丢弃旧数据,这非常符合视频流需求 + clientSocket.Options.SendHighWatermark = 1000; + + // 主动连接 + clientSocket.Connect(_target.Config.Endpoint); + + Console.WriteLine("[NetMqSender] 连接成功,开始从通道搬运数据..."); + + await foreach (var payload in _target.Channel.Reader.ReadAllAsync(stoppingToken)) + { + try + { + var msg = payload.ToNetMqMessage(); + + // 发送消息 + // PublisherSocket 的 TrySend 如果没人订阅或者队列满了,通常不会阻塞,而是直接丢弃或返回 + // 注意:PUB 模式下,第一帧 ("SHH_V1") 会被当作订阅的主题 (Topic)。 + // 你的接收端订阅了 "" (空字符串),所以能收到以任何字符串开头的数据。 + bool sent = clientSocket.TrySendMultipartMessage(msg); + + if (!sent) + { + // 这种情况通常意味着网络断了且 HWM 队列也满了 + Console.WriteLine($"[NetMqSender] 警告: 发送队列已满,正在丢帧..."); + msg.Clear(); // 手动清理(可选) + } + } + catch (Exception ex) + { + Console.WriteLine($"[NetMqSender] 异常: {ex.Message}"); + } + } + } +} \ No newline at end of file diff --git a/SHH.CameraService/Core/NetSenders/NetworkStreamingWorker.cs b/SHH.CameraService/Core/NetSenders/NetworkStreamingWorker.cs new file mode 100644 index 0000000..a998dac --- /dev/null +++ b/SHH.CameraService/Core/NetSenders/NetworkStreamingWorker.cs @@ -0,0 +1,126 @@ +using Microsoft.Extensions.Hosting; +using OpenCvSharp; +using SHH.CameraSdk; // 引用 SDK 核心 +using SHH.Contracts; +using System.Diagnostics; + +namespace SHH.CameraService; + +public class NetworkStreamingWorker : BackgroundService +{ + // 注入所有注册的目标(云端、大屏等),实现动态分发 + private readonly IEnumerable _targets; + + // 编码参数:JPG 质量 75 (平衡画质与带宽) + // 工业经验:75 是甜点,体积只有 100 的 1/3,肉眼几无区别。 + // 如果您确实需要 100,请注意带宽压力。此处我保留您要求的 100,但建议未来调优。 + private readonly int[] _encodeParams = { (int)ImwriteFlags.JpegQuality, 100 }; + + public NetworkStreamingWorker(IEnumerable targets) + { + _targets = targets; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + Console.WriteLine("[StreamWorker] 启动流媒体采集引擎..."); + + // ========================================================= + // 订阅逻辑:接入 "上帝模式" (God Mode) + // ========================================================= + // 理由:NetMQ 网关需要无差别地获取所有设备的图像。 + GlobalStreamDispatcher.OnGlobalFrame += ProcessFrame; + + //Console.WriteLine($"[StreamWorker] 已挂载至全局广播总线,正在监听 {GlobalStreamDispatcher.OnGlobalFrame?.GetInvocationList().Length ?? 0} 个订阅者..."); + + var tcs = new TaskCompletionSource(); + stoppingToken.Register(() => + { + // 停止时反注册,防止静态事件内存泄漏 + GlobalStreamDispatcher.OnGlobalFrame -= ProcessFrame; + Console.WriteLine("[StreamWorker] 已断开全局广播连接"); + tcs.SetResult(); + }); + + return tcs.Task; + } + + /// + /// [回调函数] 处理每一帧图像 + /// 注意:此方法运行在 SDK 的采集线程池中,必须极速处理,严禁阻塞! + /// + private void ProcessFrame(long deviceId, SmartFrame frame) + { + try + { + // 1. 基础校验 (合法性检查) + if (frame == null || frame.InternalMat.Empty()) return; + + long startTick = Stopwatch.GetTimestamp(); + + // ========================================================= + // 2. 一次编码 (One Encode) - CPU 消耗点 + // ========================================================= + // 理由:在这里同步编码是最安全的,因为出了这个函数 frame 内存就会失效。 + // 且只编一次,后续分发给 10 个目标也只用这一份数据。 + + byte[] jpgBytes = null; + // 如果有更小的图片, 原始图片不压缩, 除非有特殊需求 + if (frame.TargetMat == null) + { + jpgBytes = EncodeImage(frame.InternalMat); + } + + // 双流支持:如果存在处理后的 AI 图,也一并编码 + byte[] targetBytes = null; + if (frame.TargetMat != null && !frame.TargetMat.Empty()) + { + targetBytes = EncodeImage(frame.TargetMat); + } + + // ========================================================= + // 3. 构建 Payload (数据载荷) + // ========================================================= + var payload = new VideoPayload + { + CameraId = deviceId.ToString(), + CaptureTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + OriginalImageBytes = jpgBytes, // 引用赋值 + TargetImageBytes = targetBytes, // 引用赋值 + OriginalWidth = frame.TargetWidth, + OriginalHeight = frame.TargetHeight, + DispatchTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + }; + + // 计算转码耗时(ms) + double processMs = (Stopwatch.GetTimestamp() - startTick) * 1000.0 / Stopwatch.Frequency; + payload.Diagnostics["encode_ms"] = Math.Round(processMs, 2); + + // ========================================================= + // 4. 动态扇出 (Dynamic Fan-Out) - 内存消耗极低 + // ========================================================= + // 遍历所有目标,往各自独立的管道里写数据。 + // 实现了"物理隔离":一个管道满了(云端卡顿),不影响另一个管道(大屏流畅)。 + foreach (var target in _targets) + { + // WriteLog 是非阻塞的。满了就丢弃,返回 false。 + target.Channel.WriteLog(payload); + } + } + catch (Exception ex) + { + // 极少发生的内存错误,打印日志但不抛出,避免崩溃 SDK 线程 + Console.WriteLine($"[StreamWorker] 采集处理异常: {ex.Message}"); + } + } + + /// + /// 辅助:OpenCV 内存编码 + /// + private byte[] EncodeImage(Mat mat) + { + // ImEncode 将 Mat 编码为一维字节数组 (托管内存) + Cv2.ImEncode(".jpg", mat, out byte[] buf, _encodeParams); + return buf; + } +} \ No newline at end of file diff --git a/SHH.CameraService/PipelineConfigurator.cs b/SHH.CameraService/Core/NetSenders/PipelineConfigurator.cs similarity index 100% rename from SHH.CameraService/PipelineConfigurator.cs rename to SHH.CameraService/Core/NetSenders/PipelineConfigurator.cs diff --git a/SHH.CameraService/Core/NetSenders/VideoDataChannel.cs b/SHH.CameraService/Core/NetSenders/VideoDataChannel.cs new file mode 100644 index 0000000..ab1a910 --- /dev/null +++ b/SHH.CameraService/Core/NetSenders/VideoDataChannel.cs @@ -0,0 +1,40 @@ +using System.Threading.Channels; +using SHH.Contracts; + +namespace SHH.CameraService +{ + /// + /// 视频数据内部总线 (线程安全的生产者-消费者通道) + /// 作用:解耦 [采集编码线程] 与 [网络发送线程] + /// + public class VideoDataChannel + { + // 限制容量为 100 帧。如果积压超过 100 帧,说明发送端彻底堵死了,必须丢帧。 + private readonly Channel _channel; + + public VideoDataChannel(int capacity = 10) + { + var options = new BoundedChannelOptions(capacity) + { + FullMode = BoundedChannelFullMode.DropOldest, // 核心策略:满了就丢弃最旧的帧 + SingleReader = false, // 允许多个发送 Worker (如 CloudWorker, ScreenWorker) 同时读取 + SingleWriter = true // 只有一个采集线程在写 + }; + _channel = Channel.CreateBounded(options); + } + + /// + /// [生产者] 写入一个封装好的数据包 (非阻塞) + /// + public void WriteLog(VideoPayload payload) + { + // TryWrite 永远不会等待,满了就丢旧的写入新的,返回 true + _channel.Writer.TryWrite(payload); + } + + /// + /// [消费者] 读取器 + /// + public ChannelReader Reader => _channel.Reader; + } +} \ No newline at end of file diff --git a/SHH.CameraService/ParentProcessSentinel.cs b/SHH.CameraService/Core/ParentProcessSentinel.cs similarity index 100% rename from SHH.CameraService/ParentProcessSentinel.cs rename to SHH.CameraService/Core/ParentProcessSentinel.cs diff --git a/SHH.CameraService/NetworkStreamManager.cs b/SHH.CameraService/NetworkStreamManager.cs deleted file mode 100644 index be4b262..0000000 --- a/SHH.CameraService/NetworkStreamManager.cs +++ /dev/null @@ -1,91 +0,0 @@ -using SHH.CameraSdk; -using System.Collections.Concurrent; - -namespace SHH.CameraService; - -/// -/// 网络推流管理器 -/// 职责:管理 ZeroMQ 推流任务的生命周期 -/// 类似于 DisplayWindowManager,它负责订阅数据并将其桥接到传输层 -/// -public class NetworkStreamManager -{ - private readonly VideoDataChannel _channel; - // 记录当前活跃的推流任务,防止重复订阅 - private readonly ConcurrentDictionary _activeStreams = new(); - - public NetworkStreamManager(VideoDataChannel channel) - { - _channel = channel; - } - - /// - /// 启动推流任务 - /// - public void StartStream(string appId, long deviceId) - { - // 1. 防止重复启动 - if (_activeStreams.ContainsKey(appId)) return; - - // 2. 向全局分发器订阅精准数据 - // 这里实现了业务逻辑的闭环:只有被 Manager 管理的任务才会消耗 CPU 去转码 - GlobalStreamDispatcher.Subscribe(appId, deviceId, (frame) => - { - // --- 这里的代码运行在分发线程中 --- - - // A. 转码 (耗时操作封装在这里,不污染 Controller) - byte[] jpgBytes = EncodeFrameToJpg(frame); - - if (jpgBytes != null && jpgBytes.Length > 0) - { - var payload = new VideoPayload - { - CameraId = appId, // 使用 AppId 作为 Topic (给 Dashboard 订阅用) - OriginalImageBytes = jpgBytes, - CaptureTime = DateTime.Now, - OriginalWidth = frame.TargetWidth, - OriginalHeight = frame.TargetHeight - }; - - // B. 写入传输通道 - _ = _channel.WriteAsync(payload); - } - }); - - _activeStreams.TryAdd(appId, true); - Console.WriteLine($"[Network] 推流任务已启动: {appId} -> Device {deviceId}"); - } - - /// - /// 停止推流任务 - /// - public void StopStream(string appId) - { - if (_activeStreams.TryRemove(appId, out _)) - { - // 1. 从全局分发器注销 - GlobalStreamDispatcher.Unsubscribe(appId); - Console.WriteLine($"[Network] 推流任务已停止: {appId}"); - } - } - - // --- 辅助方法 --- - private byte[] EncodeFrameToJpg(SmartFrame frame) - { - try - { - // 优先使用处理后的 TargetMat,如果没有则用原始的 InternalMat - var mat = frame.TargetMat ?? frame.InternalMat; - if (mat != null && !mat.Empty()) - { - // 80 质量平衡体积与画质 - return mat.ImEncode(".jpg", new int[] { 1, 80 }); - } - } - catch (Exception ex) - { - Console.WriteLine($"[Network] 转码失败: {ex.Message}"); - } - return Array.Empty(); - } -} \ No newline at end of file diff --git a/SHH.CameraService/Program.cs b/SHH.CameraService/Program.cs index 5c69482..f03545a 100644 --- a/SHH.CameraService/Program.cs +++ b/SHH.CameraService/Program.cs @@ -1,7 +1,7 @@ using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; using Microsoft.OpenApi.Models; -using SHH.CameraSdk; // 引用你的业务核心 +using SHH.CameraSdk; namespace SHH.CameraService; @@ -9,163 +9,152 @@ public class Program { public static async Task Main(string[] args) { - // 缓冲时间 (您之前写了20000ms即20秒,可能是为了附加调试器。如果觉得太慢可以改回 2000) - for(var i=1; i<10; i++) + // 1. 理由:缓冲时间 10 秒, 供附加调试工具使用 + for (var i = 1; i < 10; i++) Thread.Sleep(1000); - // 1. 解析配置 + // ============================================================= + // 2. 基础环境与配置 (理由:明确身份 ID 和 监听端口) + // ============================================================= var config = ServiceConfig.BuildFromArgs(args); - // ---【补全变量定义】--- - - // A. 补全 webPort (统一使用 config.BasePort) - int webPort = config.BasePort; - - // B. 补全 processIdInt (用于 FileStorage 和 CameraSdk) - // 逻辑:尝试将 AppId 解析为数字;如果 AppId 是字符串(如"CameraApp_01"),则默认给 1,或者根据 BasePort 推算 - int processIdInt = config.NumericId; - - Console.Title = $"SHH Gateway - {config.AppId} (Web: {webPort})"; - - #region --- 2. 硬件环境预热 --- - - InitHardwareEnv(); - - #endregion - - #region --- 3. 构建 WebHost --- + // 硬件预热 (理由:确保底层驱动库在 Web 容器启动前完全就绪) + HikNativeMethods.NET_DVR_Init(); + HikSdkManager.ForceWarmUp(); var builder = WebApplication.CreateBuilder(args); - // ★★★ 核心:注入全局配置 ★★★ + // ============================================================= + // 3. 依赖注入注册 (DI) + // ============================================================= builder.Services.AddSingleton(config); - // ------------------------------------------------------------- - // A. 注册新架构组件 - // ------------------------------------------------------------- - builder.Services.AddSingleton(); - - // 推流服务 (连接 config.TargetClients 里的 :6002) - builder.Services.AddHostedService(); - - // 指令客户端 (连接 config.TargetClients 里的 :6001) - builder.Services.AddHostedService(); - - // 进程守护 - builder.Services.AddHostedService(); - - // ------------------------------------------------------------- - // B. 注册 SDK 业务服务 - // ------------------------------------------------------------- - // 使用刚刚补全的 processIdInt - builder.Services.AddSingleton(new FileStorageService(processIdInt)); - - builder.Services.AddSingleton(); + // 注册缩放与增亮业务(不注册则不实现) builder.Services.AddSingleton(); - builder.Services.AddSingleton(); - builder.Services.AddSingleton(); - - builder.Services.AddSingleton(sp => new ImageScaleCluster(4, sp.GetRequiredService())); - builder.Services.AddSingleton(sp => new ImageEnhanceCluster(4, sp.GetRequiredService())); + builder.Services.AddSingleton(sp => new ImageScaleCluster(4, sp.GetRequiredService())); + builder.Services.AddSingleton(sp => new ImageEnhanceCluster(4, sp.GetRequiredService())); builder.Services.AddHostedService(); - // 使用补全的 processIdInt - builder.Services.AddCameraSdk(processIdInt); + // 接入 SDK 核心逻辑 + builder.Services.AddCameraSdk(config.NumericId); + // 注册后台引擎 (理由:托管长周期的硬件状态监控) builder.Services.AddHostedService(); - builder.Services.AddSingleton(); - builder.Services.AddControllers().AddApplicationPart(typeof(CamerasController).Assembly); - builder.Services.AddControllers().AddApplicationPart(typeof(MonitorController).Assembly); + // 配置 Web 相关的服务 + ConfigureWebServices(builder, config); - // ------------------------------------------------------------- - // C. Web API 基础 - // ------------------------------------------------------------- - builder.Services.AddControllers().AddControllersAsServices(); - builder.Services.AddEndpointsApiExplorer(); - builder.Services.AddSwaggerGen(c => + // 配置进程守护 + builder.Services.AddHostedService(); + + // ============================================================= + // 4. 接受启动传参, 并支持将视频进行网络广播 + // ============================================================= + + // 1. 读取配置创建 targets (可以是 1 个,也可以是 10 个) + var netTargets = new List(); + if (config.VideoEndpoints != null) { - // 【修正】使用 config.AppId - c.SwaggerDoc("v1", new OpenApiInfo { Title = $"Gateway {config.AppId}", Version = "v1" }); - }); + foreach(var cfgVideo in config.VideoEndpoints) + { + netTargets.Add(new StreamTarget(new PushTargetConfig + { + Name = cfgVideo.Description, Endpoint = cfgVideo.Uri, QueueCapacity = 10, + })); + } + } + // 2. 注册 Targets (供采集者用) + builder.Services.AddSingleton>(netTargets); - builder.Services.AddCors(o => o.AddPolicy("AllowAll", p => p.AllowAnyOrigin().AllowAnyHeader().AllowAnyMethod())); + // 3. 注册采集者 (它会注入上面的 targets,进行编码和分发) + builder.Services.AddHostedService(); + // 4. 为每个 Target 注册一个独立的发送者 + foreach (var target in netTargets) + { + builder.Services.AddHostedService(sp => new NetMqSenderWorker(target)); + } + + // ============================================================= + // 5. 命令管道配置 + // ============================================================= + + // 负责连接 Dashboard,注册身份,接收重启/控制指令 + builder.Services.AddHostedService(); + + // 1. 注册分发器 + builder.Services.AddSingleton(); + + // 2. 注册具体的指令处理器 (每写一个新的 Handler,就在这里注册一下,或者用反射批量注册) + builder.Services.AddSingleton(); + + // ============================================================= + // 6. 构建与管道配置 + // ============================================================= var app = builder.Build(); - //// ======================================================================= - //// ★★★ 核心接入点:连接 [现有分发器] 与 [新推流通道] ★★★ - //// ======================================================================= - - //// 1. 获取刚刚注册的数据通道 - //var videoChannel = app.Services.GetRequiredService(); - ////var config = app.Services.GetRequiredService(); - - //// 2. 订阅你现有的全局事件 (这里就是“取货”的地方) - //// 每当 HikVideoSource 采集到一帧并调用 Dispatch 时,这里就会触发 - //GlobalStreamDispatcher.OnGlobalFrame += (deviceId, smartFrame) => - //{ - // // 3. 数据处理:将 OpenCvSharp Mat 转为 JPG 字节流 (网络传输必须压缩) - // byte[] jpgData = EncodeToJpg(smartFrame); - - // if (jpgData != null && jpgData.Length > 0) - // { - // // 4. 封装载荷 - // var payload = new VideoPayload - // { - // // 使用 AppId 或 DeviceId 作为标识 - // CameraId = config.AppId, - // OriginalImageBytes = jpgData, - // CaptureTime = DateTime.Now, - // OriginalWidth = smartFrame.TargetWidth, - // OriginalHeight = smartFrame.TargetHeight - // }; - - // // 5. 扔进通道 (Fire-and-Forget,不阻塞你原来的显示逻辑) - // // WriteAsync 是 ValueTask,这里忽略等待,追求最高吞吐 - // _ = videoChannel.WriteAsync(payload); - // } - //}; - - //Console.WriteLine("[System] 全局流已桥接到 ZeroMQ 推流通道"); + // 核心修复:同步点火逻辑 (理由:在 Web 开启前完成设备池的初步构建) + await StartBusinessLogic(app); app.UseSwagger(); - app.UseSwaggerUI(); + app.UseSwaggerUI(c => + { + c.SwaggerEndpoint("/swagger/v1/swagger.json", $"SHH Gateway #{config.AppId}"); + }); + + app.MapGet("/", () => $"SHH Gateway {config.AppId} is running."); + app.UseCors("AllowAll"); + + // 理由:正式映射控制器路由 app.MapControllers(); - // 【修正】使用 webPort - Console.WriteLine($"[System] Web API 已启动: http://0.0.0.0:{webPort}"); - - await app.RunAsync($"http://0.0.0.0:{webPort}"); - - #endregion - } - - static void InitHardwareEnv() - { - Console.WriteLine("=== 工业级视频接入服务启动 ==="); + // ============================================================= + // 5. 正式启动 + // ============================================================= + await app.RunAsync($"http://0.0.0.0:{config.BasePort}"); } /// - /// 内存转码:Mat -> Jpg Bytes + /// 对齐业务启动:激活单例并启动相机管理器 /// - static byte[] EncodeToJpg(SmartFrame frame) + static async Task StartBusinessLogic(WebApplication app) { - try + var manager = app.Services.GetRequiredService(); + + // 激活哨兵逻辑 (理由:显式 Get 触发单例构造,否则不工作) + _ = app.Services.GetRequiredService(); + + // 启动相机任务加载 + await manager.StartAsync(); + + Console.WriteLine("[System] 核心业务逻辑已激活。"); + } + + /// + /// 注册 Web API 支持 + /// + static void ConfigureWebServices(WebApplicationBuilder builder, ServiceConfig cfg) + { + builder.Services.AddCors(options => { - // 假设 SmartFrame 内部持有 OpenCvSharp.Mat 类型的 InternalMat - if (frame != null && frame.InternalMat != null && !frame.InternalMat.Empty()) - { - // 80 是 JPG 质量参数,平衡画质与带宽 - return frame.InternalMat.ImEncode(".jpg", new int[] { 1, 80 }); - } - } - catch + options.AddPolicy("AllowAll", p => p.AllowAnyOrigin().AllowAnyHeader().AllowAnyMethod()); + }); + + // ★★★★★ 补全点:跨项目控制器加载 ★★★★★ + // 理由:Controller 定义在 SDK 项目中,必须通过 AddApplicationPart 显式挂载 + builder.Services.AddControllers(options => { - // 容错处理,防止一帧损坏导致程序崩溃 - } - return Array.Empty(); + options.Filters.Add(); + }) + .AddApplicationPart(typeof(CamerasController).Assembly) // 必备:加载相机控制接口 + .AddApplicationPart(typeof(MonitorController).Assembly); // 必备:加载监控接口 + + builder.Services.AddEndpointsApiExplorer(); + builder.Services.AddSwaggerGen(c => + { + c.SwaggerDoc("v1", new OpenApiInfo { Title = $"SHH Gateway #{cfg.AppId}", Version = "v1" }); + }); } } \ No newline at end of file diff --git a/SHH.CameraService/SHH.CameraService.csproj b/SHH.CameraService/SHH.CameraService.csproj index ef527a6..1eda48a 100644 --- a/SHH.CameraService/SHH.CameraService.csproj +++ b/SHH.CameraService/SHH.CameraService.csproj @@ -7,6 +7,12 @@ enable + + + + + + diff --git a/SHH.CameraService/VideoDataChannel.cs b/SHH.CameraService/VideoDataChannel.cs deleted file mode 100644 index 422859c..0000000 --- a/SHH.CameraService/VideoDataChannel.cs +++ /dev/null @@ -1,63 +0,0 @@ -using System.Threading.Channels; - -namespace SHH.CameraService; - -/// -/// 视频数据高速通道 -/// 作用:解耦 采集线程(Producer) 和 发送线程(Consumer) -/// 特性:使用 BoundedChannel,当网络发送慢时,自动丢弃旧帧(DropOldest),防止内存溢出。 -/// -public class VideoDataChannel -{ - // 创建一个有限容量的通道 (容量 5) - // 如果发送端太慢,这就满了,DropOldest 会丢弃最旧的帧,保证实时性 - private readonly Channel _channel = Channel.CreateBounded( - new BoundedChannelOptions(5) - { - FullMode = BoundedChannelFullMode.DropOldest, // 核心策略:丢弃旧帧 - SingleReader = true, // 只有一个 ZeroMQWorker 在读 - SingleWriter = false //可能有多个相机线程在写 - }); - - // ★★★ 新增:公开 Reader 属性,让外部可以直接调用 ReadAsync ★★★ - public ChannelReader Reader => _channel.Reader; - - /// - /// 写入数据 (生产者调用) - /// - public ValueTask WriteAsync(VideoPayload payload) - { - return _channel.Writer.WriteAsync(payload); - } - - /// - /// 读取数据流 (消费者调用) - /// - public IAsyncEnumerable ReadAllAsync(CancellationToken ct) - { - return _channel.Reader.ReadAllAsync(ct); - } -} - -// 附带:如果您的项目中还没有定义 VideoPayload,这里是一个最小实现 -// 如果 SHH.Contracts 中已有,请忽略此类 -public class VideoPayload -{ - /// 相机唯一标识 - public string CameraId { get; set; } = string.Empty; - - /// 采集时间 - public DateTime CaptureTime { get; set; } - - /// 发送时间 - public DateTime DispatchTime { get; set; } - - /// 原始宽 - public int OriginalWidth { get; set; } - - /// 原始高 - public int OriginalHeight { get; set; } - - /// 已编码的图片数据 (JPG) - public byte[] OriginalImageBytes { get; set; } = Array.Empty(); -} \ No newline at end of file diff --git a/SHH.CameraService/ZeroMQBridgeWorker.cs b/SHH.CameraService/ZeroMQBridgeWorker.cs deleted file mode 100644 index 6a3e764..0000000 --- a/SHH.CameraService/ZeroMQBridgeWorker.cs +++ /dev/null @@ -1,87 +0,0 @@ -using Microsoft.Extensions.Hosting; -using NetMQ; -using NetMQ.Sockets; -using SHH.CameraSdk; - -namespace SHH.CameraService; - -public class ZeroMQBridgeWorker : BackgroundService -{ - private readonly ServiceConfig _config; - private readonly VideoDataChannel _channel; // 数据源 - - public ZeroMQBridgeWorker(ServiceConfig config, VideoDataChannel channel) - { - _config = config; - _channel = channel; - } - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - // 1. 如果不是主动/混合模式,不需要连接 - if (!_config.ShouldConnect) return; - - // ★★★ 核心修正:直接读取解析好的视频地址列表 ★★★ - // 这些地址来自参数 --uris "IP,VideoPort&CommandPort" 中的 VideoPort 部分 (符号左边) - var streamUris = _config.VideoEndpoints; - - if (streamUris.Count == 0) - { - Console.WriteLine("[推流] 未在参数中找到视频通道地址(位于&符号左侧),跳过连接。"); - return; - } - - // 2. 初始化 Publisher Socket - // 特点:只需 Send 一次,底层会自动分发给所有 Connect 的 Dashboard - using var pubSocket = new PublisherSocket(); - - // 设置发送高水位 (HWM) - // 防止网络拥塞或接收端处理慢时,内存无限增长。超过50帧积压就开始丢弃旧帧。 - pubSocket.Options.SendHighWatermark = 50; - - // 3. 连接所有视频目标 - foreach (var uri in streamUris) - { - Console.WriteLine($"[推流] 连接视频接收端: {uri}"); - pubSocket.Connect(uri); - } - - Console.WriteLine($"[推流] 服务就绪 (AppId: {_config.AppId}),等待视频帧..."); - - // 4. 推流循环 - while (!stoppingToken.IsCancellationRequested) - { - try - { - // 从通道读取最新帧 (支持异步等待) - // 注意:这里使用了之前 VideoDataChannel 暴露出来的 Reader 属性 - var payload = await _channel.Reader.ReadAsync(stoppingToken); - - // 简单校验 - if (payload == null || payload.OriginalImageBytes == null || payload.OriginalImageBytes.Length == 0) - continue; - - // 构造 Topic (通常用 AppId 作为 Topic,这样 Dashboard 可以按需订阅) - string topic = _config.AppId; - - // 发送两帧:[Topic] [ImageBytes] - // 这样 Dashboard 的 Subscriber 可以通过 Subscribe(topic) 来过滤 - pubSocket.SendMoreFrame(topic) - .SendFrame(payload.OriginalImageBytes); - - // 调试日志 (生产环境建议注释掉,否则刷屏) - // Console.WriteLine($"[推流] Sent {payload.OriginalImageBytes.Length} bytes"); - } - catch (OperationCanceledException) - { - break; // 正常退出 - } - catch (Exception ex) - { - Console.WriteLine($"[推流] 发送异常: {ex.Message}"); - // 发生错误稍微停顿,防止死循环占用 CPU - await Task.Delay(1000, stoppingToken); - } - } - } -} \ No newline at end of file diff --git a/SHH.CameraService/ZeroMqBridgeService.cs b/SHH.CameraService/ZeroMqBridgeService.cs deleted file mode 100644 index 7aa4127..0000000 --- a/SHH.CameraService/ZeroMqBridgeService.cs +++ /dev/null @@ -1,140 +0,0 @@ -using Microsoft.Extensions.Hosting; -using OpenCvSharp; -using SHH.Contracts; -using SHH.NetMQ; - -namespace SHH.CameraSdk -{ - /// - /// ZeroMQ 消息桥接服务(后台服务)。 - /// 核心职责:订阅系统全局视频帧广播,将帧数据编码为标准协议格式后,通过 ZeroMQ 分发至外部系统。 - /// 设计特性: - /// 1. 无侵入集成:通过订阅 GlobalStreamDispatcher 事件,无需修改原有帧处理流程。 - /// 2. 自动适配:支持动态增删设备,无需手动注册设备监听。 - /// 3. 安全隔离:帧数据深拷贝,避免跨线程内存访问冲突。 - /// - public class ZeroMqBridgeService : BackgroundService - { - #region --- 依赖注入字段 --- - - /// - /// ZeroMQ 分发服务器(用于广播帧数据至多个订阅端) - /// - private readonly DistributorServer _distributor; - - /// - /// ZeroMQ 转发客户端(用于定向推送帧数据至指定目标) - /// - private readonly ForwarderClient _forwarder; - - #endregion - - #region --- 构造函数 --- - - /// - /// 初始化 实例。 - /// - /// ZeroMQ 分发服务器实例(通过 DI 注入) - /// ZeroMQ 转发客户端实例(通过 DI 注入) - public ZeroMqBridgeService(DistributorServer distributor, ForwarderClient forwarder) - { - _distributor = distributor ?? throw new ArgumentNullException(nameof(distributor)); - _forwarder = forwarder ?? throw new ArgumentNullException(nameof(forwarder)); - } - - #endregion - - #region --- 后台服务核心逻辑 --- - - /// - /// 启动后台服务,订阅全局视频帧广播。 - /// - /// 服务停止令牌(用于优雅关闭) - protected override Task ExecuteAsync(CancellationToken stoppingToken) - { - Console.WriteLine("[ZeroMQ Bridge] 正在连接全局视频帧广播总线..."); - - // 订阅全局帧广播事件:所有设备的帧数据都会触发该事件 - // 无需手动绑定设备,动态增删的设备自动适配 - GlobalStreamDispatcher.OnGlobalFrame += OnGlobalFrameReceived; - - Console.WriteLine("[ZeroMQ Bridge] 全局总线连接成功!已开始监听所有设备帧数据。"); - Console.WriteLine("[ZeroMQ Bridge] 说明:动态增删的设备会自动转发,无需重启服务。"); - - // 返回空任务:服务通过事件驱动,无需阻塞主线程 - return Task.CompletedTask; - } - - /// - /// 停止后台服务,取消事件订阅以避免内存泄漏。 - /// - /// 取消令牌 - public override Task StopAsync(CancellationToken cancellationToken) - { - Console.WriteLine("[ZeroMQ Bridge] 正在停止服务,取消全局总线订阅..."); - - // 取消事件订阅:必须执行,否则会导致内存泄漏 - GlobalStreamDispatcher.OnGlobalFrame -= OnGlobalFrameReceived; - - Console.WriteLine("[ZeroMQ Bridge] 服务已安全停止。"); - return base.StopAsync(cancellationToken); - } - - #endregion - - #region --- 帧数据处理核心逻辑 --- - - /// - /// 全局帧数据接收回调(事件处理函数)。 - /// 处理流程:安全检查 → 帧数据深拷贝 → JPG 编码 → 封装为标准协议 → ZeroMQ 分发。 - /// - /// 产生该帧的设备唯一标识 - /// 智能帧对象(包含原始/处理后图像数据) - private void OnGlobalFrameReceived(long deviceId, SmartFrame frame) - { - try - { - // 1. 安全校验:跳过空帧或已释放的帧 - var sourceMat = frame.TargetMat ?? frame.InternalMat; - if (sourceMat == null || sourceMat.Empty() || sourceMat.IsDisposed) - return; - - // 2. 深拷贝图像数据:避免跨线程访问冲突(原帧可能被其他模块异步释放) - using var safeMat = sourceMat.Clone(); - - // 3. 图像编码:将 OpenCV Mat 转换为 JPG 字节数组(质量70,平衡画质与性能) - var jpgEncodeParams = new int[] { (int)ImwriteFlags.JpegQuality, 70 }; - byte[] jpgBytes = safeMat.ImEncode(".jpg", jpgEncodeParams); - - // 4. 封装为标准传输协议:使用 SHH.Contracts 中的 VideoPayload 统一格式 - var videoPayload = new VideoPayload - { - CameraId = deviceId.ToString(), // 设备ID(转为字符串,兼容协议标准) - CaptureTime = DateTime.Now, // 帧采集时间(当前时间) - DispatchTime = DateTime.Now, // 帧分发时间(当前时间) - OriginalWidth = safeMat.Width, // 图像原始宽度(编码后宽度) - OriginalHeight = safeMat.Height, // 图像原始高度(编码后高度) - OriginalImageBytes = jpgBytes // JPG 编码后的二进制数据 - }; - - // 5. 传递订阅者ID:保持与原帧的订阅者关联 - if (frame.SubscriberIds.Any()) - videoPayload.SubscriberIds.AddRange(frame.SubscriberIds); - - // 6. ZeroMQ 分发:同时执行广播和定向推送(根据业务需求选择,可按需注释) - _distributor.Broadcast(videoPayload); // 广播给所有订阅端 - _forwarder.Push(videoPayload); // 定向推送给指定目标 - - // 调试日志(生产环境建议注释,避免性能损耗) - // Console.WriteLine($"[ZeroMQ Bridge] 转发设备 {deviceId} 帧数据,大小:{jpgBytes.Length / 1024}KB"); - } - catch (Exception ex) - { - // 异常隔离:单个帧处理失败不影响整体服务运行 - Console.WriteLine($"[ZeroMQ Bridge] 帧转发失败(设备ID:{deviceId}):{ex.Message}"); - } - } - - #endregion - } -} \ No newline at end of file diff --git a/SHH.Contracts/Commands/CameraConfigDto.cs b/SHH.Contracts/Commands/CameraConfigDto.cs new file mode 100644 index 0000000..875f615 --- /dev/null +++ b/SHH.Contracts/Commands/CameraConfigDto.cs @@ -0,0 +1,174 @@ +using System.Collections.Generic; +using System.ComponentModel.DataAnnotations; + +namespace SHH.Contracts +{ + // ============================================================================== + // 1. 物理与运行配置 DTO (对应 CRUD 操作) + // 用于设备新增/全量配置查询,包含基础身份、连接信息、运行参数等全量字段 + // ============================================================================== + public class CameraConfigDto + { + // --- 基础身份 (Identity) --- + /// + /// 设备唯一标识 + /// + [Required(ErrorMessage = "设备ID不能为空")] + [Range(1, long.MaxValue, ErrorMessage = "设备ID必须为正整数")] + public long Id { get; set; } + + /// + /// 设备友好名称 + /// + [MaxLength(64, ErrorMessage = "设备名称长度不能超过64个字符")] + public string Name { get; set; } + + /// + /// 摄像头品牌类型 (0:HikVision, 1:Dahua, 2:RTSP...) + /// + [Range(0, 10, ErrorMessage = "品牌类型值必须在0-10范围内")] + public int Brand { get; set; } + + /// + /// 设备安装位置描述 + /// + [MaxLength(128, ErrorMessage = "安装位置长度不能超过128个字符")] + public string Location { get; set; } + + // --- 主板关联信息 (Metadata) --- + + /// + /// 关联主板IP地址 + /// + [RegularExpression(@"^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)?$", + ErrorMessage = "请输入合法的IPv4地址")] + public string MainboardIp { get; set; } = string.Empty; + + /// + /// 关联主板端口 + /// + [Range(0, 65535, ErrorMessage = "主板端口号必须在1-65535范围内")] + public int MainboardPort { get; set; } = 0; + + // --- 核心连接 (Connectivity) - 修改此类参数触发冷重启 --- + + /// + /// 摄像头IP地址 + /// + [Required(ErrorMessage = "IP地址不能为空")] + [RegularExpression(@"^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$", + ErrorMessage = "请输入合法的IPv4地址")] + public string IpAddress { get; set; } + + /// + /// 登录用户名 + /// + [MaxLength(32, ErrorMessage = "用户名长度不能超过32个字符")] + public string Username { get; set; } + + /// + /// 登录密码 + /// + [MaxLength(64, ErrorMessage = "密码长度不能超过64个字符")] + public string Password { get; set; } + + /// + /// SDK端口 (如海康默认8000) + /// + [Range(1, 65535, ErrorMessage = "端口号必须在1-65535范围内")] + public ushort Port { get; set; } + + /// + /// 通道号 (通常为1) + /// + [Range(0, 256, ErrorMessage = "通道号必须在0-256范围内")] + public int ChannelIndex { get; set; } + + /// + /// 码流类型 (0:主码流, 1:子码流) + /// + [Range(0, 1, ErrorMessage = "码流类型只能是0(主码流)或1(子码流)")] + public int StreamType { get; set; } + + // 渲染句柄 (通常下发时为0,由本地窗口绑定时再指定,或者此处仅作占位) + public long RenderHandle { get; set; } + + /// + /// RTSP流路径 (备用或非SDK模式使用) + /// + [MaxLength(256, ErrorMessage = "RTSP地址长度不能超过256个字符")] + public string RtspPath { get; set; } + + // --- 运行时参数 (Runtime Options) - 支持热更新 --- + + /// + /// 是否使用灰度图 (用于AI分析场景加速) + /// + public bool UseGrayscale { get; set; } = false; + + /// + /// 是否启用图像增强 (去噪/锐化等) + /// + public bool EnhanceImage { get; set; } = true; + + // --- 画面变换 (Transform) - 支持热更新 --- + /// + /// 是否允许图像压缩 (降低带宽占用) + /// + public bool AllowCompress { get; set; } = true; + + /// + /// 是否允许图像放大 (提升渲染质量) + /// + public bool AllowExpand { get; set; } = false; + + /// + /// 目标分辨率 (格式如 1920x1080,空则保持原图) + /// + [RegularExpression(@"^\d+x\d+$", ErrorMessage = "分辨率格式必须为 宽度x高度 (如 1920x1080)")] + public string TargetResolution { get; set; } = string.Empty; + + /// + /// 随配置一并下发的自动订阅请求 + /// + public List AutoSubscriptions { get; set; } + = new List(); + + /// + /// 是否立即执行 + /// + public bool ImmediateExecution { get; set; } + } + + /// + /// 订阅项 + /// + public class CameraConfigSubscribeDto + { + /// + /// 订阅标识 + /// + public string AppId { get; set; } + + /// + /// 订阅业务类型 SubscriptionType + /// + public int Type { get; set; } + + /// + /// 要求的帧率:8帧或1帧 + /// + public int TargetFps { get; set; } + + /// + /// 备注 + /// + public string Memo { get; set; } + + /// + /// 是否需要高清晰度 + /// + public bool NeedHighDefinition { get; set; } + = false; + } +} \ No newline at end of file diff --git a/SHH.Contracts/SHH.Contracts.csproj b/SHH.Contracts/SHH.Contracts.csproj index eeda231..561ca6f 100644 --- a/SHH.Contracts/SHH.Contracts.csproj +++ b/SHH.Contracts/SHH.Contracts.csproj @@ -6,6 +6,7 @@ + diff --git a/SHH.Contracts/VideoPayload.cs b/SHH.Contracts/VideoPayload.cs index 72440ca..b8d1798 100644 --- a/SHH.Contracts/VideoPayload.cs +++ b/SHH.Contracts/VideoPayload.cs @@ -1,120 +1,81 @@ using System; using System.Collections.Generic; using Newtonsoft.Json; +// 注意:如果不想依赖 Newtonsoft,也可以用 System.Text.Json,但 Newtonsoft 在 Std 2.0 中兼容性更好 namespace SHH.Contracts { /// - /// 视频数据传输契约(增强版) + /// 视频数据传输契约(纯净版 POCO) /// public class VideoPayload { - /// - /// 初始化 类的新实例。 - /// public VideoPayload() { - // 预分配一个容量为 16 的列表,以减少内存分配和垃圾回收的压力。 SubscriberIds = new List(16); + Diagnostics = new Dictionary(4); } - #region --- 元数据 (Metadata) --- + #region --- 1. 元数据 (Metadata) --- - /// - /// 获取订阅了此帧数据的客户端ID列表。 - /// - public List SubscriberIds { get; } - - /// - /// 获取或设置摄像头的唯一标识符。 - /// public string CameraId { get; set; } /// - /// 获取或设置图像的采集时间,即从SDK获取到图像数据的时间。 + /// 采集时间戳 (Unix 毫秒) /// - public DateTime CaptureTime { get; set; } + public long CaptureTimestamp { get; set; } /// - /// 获取或设置图像的分发时间,即服务器准备将此帧数据发送给客户端的时间。 + /// 分发时间戳 (Unix 毫秒) /// - public DateTime DispatchTime { get; set; } + public long DispatchTimestamp { get; set; } - /// - /// 获取或设置图像的原始宽度。 - /// public int OriginalWidth { get; set; } - - /// - /// 获取或设置图像的原始高度。 - /// public int OriginalHeight { get; set; } - - /// - /// 获取或设置经过处理后的目标图像宽度。 - /// public int TargetWidth { get; set; } + public int TargetHeight { get; set; } + + public List SubscriberIds { get; } + + public Dictionary Diagnostics { get; } /// - /// 获取或设置经过处理后的目标图像高度。 + /// 指示标志:是否存在原始图 /// - public int TargetHeight { get; set; } + public bool HasOriginalImage { get; set; } + + /// + /// 指示标志:是否存在处理图 + /// + public bool HasTargetImage { get; set; } #endregion - #region --- 核心二进制数据 --- + #region --- 2. 二进制数据 (Binary) --- - /// - /// 获取或设置原始图像的二进制数据(例如,从SDK直接获取的JPG或YUV数据)。 - /// 此属性被标记为 ,以防止在序列化元数据时将其包含在内,从而避免严重的性能问题。 - /// + // 标记 JsonIgnore,防止被错误序列化 [JsonIgnore] public byte[] OriginalImageBytes { get; set; } - /// - /// 获取或设置经过处理后的目标图像的二进制数据(例如,经过缩放、画框或其他AI处理后的图像)。 - /// 此属性可为空,表示此帧可能只包含原始图像或没有图像数据。 - /// 同样,此属性也被标记为 。 - /// [JsonIgnore] public byte[] TargetImageBytes { get; set; } #endregion - #region --- 序列化与反序列化辅助方法 --- + #region --- 3. 辅助方法 (仅保留 JSON 逻辑) --- /// - /// 将当前对象的元数据序列化为一个纯净的 JSON 字符串。 - /// 此方法会自动忽略所有二进制数据()。 + /// 获取纯元数据的 JSON 字符串 /// - /// 包含元数据的 JSON 字符串。 public string GetMetadataJson() { - // 创建一个匿名对象,该对象仅包含需要被序列化的元数据字段。 - // 这比直接序列化整个对象更安全、更高效。 - var metadata = new - { - CameraId, - CaptureTime, - DispatchTime, - OriginalWidth, - OriginalHeight, - TargetWidth, - TargetHeight, - SubscriberIds, - // 附加一个标志,指示此载荷中是否包含目标图像数据,以便接收端进行判断。 - HasTargetImage = (TargetImageBytes != null && TargetImageBytes.Length > 0) - }; - return JsonConvert.SerializeObject(metadata); + // 在序列化前自动更新标志位,防止逻辑不同步 + this.HasOriginalImage = (OriginalImageBytes != null && OriginalImageBytes.Length > 0); + this.HasTargetImage = (TargetImageBytes != null && TargetImageBytes.Length > 0); + + return JsonConvert.SerializeObject(this); } - /// - /// 从一个 JSON 字符串反序列化,创建一个新的 对象。 - /// 注意:反序列化后,对象中的二进制图像数据()将为 null, - /// 需要在后续步骤中手动填充。 - /// - /// 包含元数据的 JSON 字符串。 - /// 一个新的 对象,其元数据已填充。 public static VideoPayload FromMetadataJson(string json) { return JsonConvert.DeserializeObject(json); diff --git a/SHH.NetMQ/DistributorServer.cs b/SHH.NetMQ/DistributorServer.cs index 00b70b7..7bf6f46 100644 --- a/SHH.NetMQ/DistributorServer.cs +++ b/SHH.NetMQ/DistributorServer.cs @@ -35,7 +35,7 @@ namespace SHH.NetMQ if (payload == null) return; // 补充发送时间 - payload.DispatchTime = DateTime.Now; + payload.DispatchTimestamp = DateTime.Now.Ticks; // 准备数据帧 string jsonMeta = payload.GetMetadataJson(); diff --git a/SHH.NetMQ/ForwarderClient.cs b/SHH.NetMQ/ForwarderClient.cs index 398dd51..3ea4c73 100644 --- a/SHH.NetMQ/ForwarderClient.cs +++ b/SHH.NetMQ/ForwarderClient.cs @@ -83,8 +83,8 @@ namespace SHH.NetMQ var metaJson = JsonConvert.SerializeObject(new { payload.CameraId, - payload.CaptureTime, - payload.DispatchTime, + payload.CaptureTimestamp, + payload.DispatchTimestamp, payload.OriginalWidth, payload.OriginalHeight, // 如果有订阅者ID列表也带上