From 8ef8139382a1027dd3aa373e851adbedacf6d8cc Mon Sep 17 00:00:00 2001 From: wilson Date: Thu, 15 Jan 2026 11:04:38 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E9=80=9A=E8=AE=AF=E5=9B=BE=E5=83=8F?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E5=AF=B9=E6=8E=A5=E6=88=90=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../{NetSenders => }/CameraEngineWorker.cs | 0 .../Core/CmdClients/CommandClientWorker.cs | 194 ------------------ .../Core/CmdClients/CommandDispatcher.cs | 91 -------- .../Core/CmdClients/ConnectedClient.cs | 30 --- SHH.CameraService/Core/CommandDispatcher.cs | 66 ++++++ .../NetSenders/NetMQProtocolExtensions.cs | 87 -------- .../Core/{Configs => }/PushTargetConfig.cs | 0 .../Grpc/CommandReceiverWorker.cs | 123 ----------- .../Handlers/DeviceConfigHandler.cs} | 4 +- .../Handlers/DeviceStatusHandler.cs} | 10 +- .../GrpcImpls/Handlers/GatewayService.cs | 102 +++++++++ .../Handlers}/ICommandHandler.cs | 0 .../Handlers}/RemoveCameraHandler.cs | 0 .../ImageFactory/ImageMonitorController.cs} | 4 +- .../ImageFactory}/PipelineConfigurator.cs | 0 .../ImageFactory}/VideoDataChannel.cs | 0 .../ImageProcs/GrpcSenderWorker.cs} | 11 +- .../ImageProcs}/StreamTarget.cs | 0 SHH.CameraService/Program.cs | 10 +- .../Protos/gateway_service.proto | 98 +++++---- 20 files changed, 237 insertions(+), 593 deletions(-) rename SHH.CameraService/Core/{NetSenders => }/CameraEngineWorker.cs (100%) delete mode 100644 SHH.CameraService/Core/CmdClients/CommandClientWorker.cs delete mode 100644 SHH.CameraService/Core/CmdClients/CommandDispatcher.cs delete mode 100644 SHH.CameraService/Core/CmdClients/ConnectedClient.cs create mode 100644 SHH.CameraService/Core/CommandDispatcher.cs delete mode 100644 SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs rename SHH.CameraService/Core/{Configs => }/PushTargetConfig.cs (100%) delete mode 100644 SHH.CameraService/Grpc/CommandReceiverWorker.cs rename SHH.CameraService/{Core/CmdClients/SyncCameraHandler.cs => GrpcImpls/Handlers/DeviceConfigHandler.cs} (98%) rename SHH.CameraService/{Core/NetSenders/DeviceStateMonitorWorker.cs => GrpcImpls/Handlers/DeviceStatusHandler.cs} (95%) create mode 100644 SHH.CameraService/GrpcImpls/Handlers/GatewayService.cs rename SHH.CameraService/{Core/CmdClients => GrpcImpls/Handlers}/ICommandHandler.cs (100%) rename SHH.CameraService/{Core/CmdClients => GrpcImpls/Handlers}/RemoveCameraHandler.cs (100%) rename SHH.CameraService/{Core/NetSenders/NetworkStreamingWorker.cs => GrpcImpls/ImageFactory/ImageMonitorController.cs} (97%) rename SHH.CameraService/{Core/NetSenders => GrpcImpls/ImageFactory}/PipelineConfigurator.cs (100%) rename SHH.CameraService/{Core/NetSenders => GrpcImpls/ImageFactory}/VideoDataChannel.cs (100%) rename SHH.CameraService/{Core/NetSenders/NetMqSenderWorker.cs => GrpcImpls/ImageProcs/GrpcSenderWorker.cs} (89%) rename SHH.CameraService/{Core/Configs => GrpcImpls/ImageProcs}/StreamTarget.cs (100%) diff --git a/SHH.CameraService/Core/NetSenders/CameraEngineWorker.cs b/SHH.CameraService/Core/CameraEngineWorker.cs similarity index 100% rename from SHH.CameraService/Core/NetSenders/CameraEngineWorker.cs rename to SHH.CameraService/Core/CameraEngineWorker.cs diff --git a/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs b/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs deleted file mode 100644 index 25c034f..0000000 --- a/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs +++ /dev/null @@ -1,194 +0,0 @@ -//using System.Text; -//using MessagePack; -//using Microsoft.Extensions.Hosting; -//using NetMQ; -//using NetMQ.Monitoring; // ★ 1. 必须引用 Monitoring 命名空间 -//using NetMQ.Sockets; -//using SHH.CameraSdk; -//using SHH.Contracts; - -//namespace SHH.CameraService; - -//public class CommandClientWorker : BackgroundService -//{ -// private readonly ServiceConfig _config; -// private readonly CommandDispatcher _dispatcher; -// private readonly InterceptorPipeline _pipeline; - -// // 管理多个 Socket -// private readonly List _sockets = new(); - -// // ★ 2. 新增:保存 Monitor 列表,防止被 GC 回收 -// private readonly List _monitors = new(); - -// private NetMQPoller? _poller; - -// public CommandClientWorker( -// ServiceConfig config, -// CommandDispatcher dispatcher, -// InterceptorPipeline pipeline) -// { -// _config = config; -// _dispatcher = dispatcher; -// _pipeline = pipeline; -// } - -// protected override async Task ExecuteAsync(CancellationToken stoppingToken) -// { -// await Task.Yield(); - -// if (!_config.ShouldConnect || _config.CommandEndpoints.Count == 0) return; - -// _poller = new NetMQPoller(); - -// // ------------------------------------------------------------- -// // 核心修改区:建立连接并挂载监控器 -// // ------------------------------------------------------------- -// foreach (var ep in _config.CommandEndpoints) -// { -// try -// { -// var socket = new DealerSocket(); -// socket.Options.Identity = Encoding.UTF8.GetBytes(_config.AppId); - -// var monitorUrl = $"inproc://monitor_{Guid.NewGuid():N}"; -// var monitor = new NetMQMonitor(socket, monitorUrl, SocketEvents.Connected); - -// monitor.Connected += async (s, args) => -// { -// Console.WriteLine($"[指令] 网络连接建立: {ep.Uri} -> 正在补发注册包..."); -// await SendRegisterAsync(socket); -// }; - -// // ★★★ 修正点:使用 AttachToPoller 代替 Add ★★★ -// // 错误写法: _poller.Add(monitor); -// monitor.AttachToPoller(_poller); - -// // 依然需要保存引用,防止被 GC 回收 -// _monitors.Add(monitor); - -// socket.Connect(ep.Uri); -// socket.ReceiveReady += OnSocketReceiveReady; - -// _sockets.Add(socket); -// _poller.Add(socket); - -// Console.WriteLine($"[指令] 通道初始化完成: {ep.Uri} (带自动重连监控)"); -// } -// catch (Exception ex) -// { -// Console.WriteLine($"[指令] 连接初始化异常: {ex.Message}"); -// } -// } - -// if (_sockets.Count == 0) return; - -// // ================================================================= -// // 6. 绑定 ACK 逻辑 (保持不变) -// // ================================================================= -// _dispatcher.OnResponseReady += async (result) => -// { -// try -// { -// byte[] resultBytes = MessagePackSerializer.Serialize(result); -// var ctx = await _pipeline.ExecuteSendAsync(ProtocolHeaders.CommandResult, resultBytes); - -// if (ctx != null) -// { -// foreach (var socket in _sockets) -// { -// socket.SendMoreFrame(ctx.Protocol).SendFrame(ctx.Data); -// } -// Console.WriteLine($"[指令] ACK 已广播 (ID: {result.RequestId})"); -// } -// } -// catch (Exception ex) -// { -// Console.WriteLine($"[ACK] 发送失败: {ex.Message}"); -// } -// }; - -// // ================================================================= -// // 7. 启动 Poller -// // ================================================================= -// // 注意:我们不需要手动发第一次注册包了, -// // 因为 Poller 启动后,底层 TCP 会建立连接,从而触发 monitor.Connected 事件, -// // 事件里会自动发送注册包。这就是“自动档”的好处。 -// _poller.RunAsync(); - -// // 阻塞直到取消 -// while (!stoppingToken.IsCancellationRequested) -// { -// await Task.Delay(1000, stoppingToken); -// } - -// // 清理 -// _poller.Stop(); -// _poller.Dispose(); -// foreach (var m in _monitors) m.Dispose(); // 释放监控器 -// foreach (var s in _sockets) s.Dispose(); -// } - -// // ================================================================= -// // ★ 8. 抽离出的注册包发送逻辑 (供 Monitor 调用) -// // ================================================================= -// private async Task SendRegisterAsync(DealerSocket targetSocket) -// { -// try -// { -// var registerPayload = new RegisterPayload -// { -// Protocol = ProtocolHeaders.ServerRegister, -// InstanceId = _config.AppId, -// ProcessId = Environment.ProcessId, -// Version = "1.0.0", -// ServerIp = "127.0.0.1", // 建议优化:获取本机真实IP -// WebApiPort = _config.BasePort, -// StartTime = DateTime.Now -// }; - -// byte[] regData = MessagePackSerializer.Serialize(registerPayload); - -// // 执行拦截器 -// var ctx = await _pipeline.ExecuteSendAsync(ProtocolHeaders.ServerRegister, regData); - -// if (ctx != null) -// { -// // 直接向触发事件的那个 Socket 发送 -// // DealerSocket 允许在连接未完全就绪时 Send,它会缓存直到网络通畅 -// targetSocket.SendMoreFrame(ctx.Protocol).SendFrame(ctx.Data); -// // Console.WriteLine($"[指令] 身份注册包已推入队列: {targetSocket.Options.Identity}"); -// } -// } -// catch (Exception ex) -// { -// Console.WriteLine($"[指令] 注册包发送失败: {ex.Message}"); -// } -// } - -// private async void OnSocketReceiveReady(object? sender, NetMQSocketEventArgs e) -// { -// NetMQMessage incomingMsg = new NetMQMessage(); -// if (e.Socket.TryReceiveMultipartMessage(ref incomingMsg)) -// { -// if (incomingMsg.FrameCount >= 2) -// { -// try -// { -// string rawProtocol = incomingMsg[0].ConvertToString(); -// byte[] rawData = incomingMsg[1].ToByteArray(); - -// var ctx = await _pipeline.ExecuteReceiveAsync(rawProtocol, rawData); -// if (ctx != null) -// { -// await _dispatcher.DispatchAsync(ctx.Protocol, ctx.Data); -// } -// } -// catch (Exception ex) -// { -// Console.WriteLine($"[指令] 处理异常: {ex.Message}"); -// } -// } -// } -// } -//} \ No newline at end of file diff --git a/SHH.CameraService/Core/CmdClients/CommandDispatcher.cs b/SHH.CameraService/Core/CmdClients/CommandDispatcher.cs deleted file mode 100644 index 58e6c55..0000000 --- a/SHH.CameraService/Core/CmdClients/CommandDispatcher.cs +++ /dev/null @@ -1,91 +0,0 @@ -// 文件: Core\CmdClients\CommandDispatcher.cs - -using MessagePack; -using Newtonsoft.Json.Linq; -using SHH.Contracts; -using System.Text; - -namespace SHH.CameraService; - -public class CommandDispatcher -{ - // 1. 注入路由表 - private readonly Dictionary _handlers; - - // 2. 定义回执事件 (ACK闭环的核心) - public event Action? OnResponseReady; - - // 3. 构造函数:注入所有 Handler - public CommandDispatcher(IEnumerable handlers) - { - // 将注入的 Handler 转换为字典,Key = ActionName (e.g. "SyncCamera") - _handlers = handlers.ToDictionary(h => h.ActionName, h => h, StringComparer.OrdinalIgnoreCase); - } - - public async Task DispatchAsync(string protocol, byte[] data) - { - try - { - // 只处理 COMMAND 协议 - if (protocol != ProtocolHeaders.Command) return; - - // 反序列化信封 - var envelope = MessagePackSerializer.Deserialize(data); - if (envelope == null) return; - - string cmdCode = envelope.CmdCode; // e.g. "SyncCamera" - Console.WriteLine($"[分发] 收到指令: {cmdCode} (ID: {envelope.RequestId})"); - - bool isSuccess = true; - string message = "OK"; - - // --- 路由匹配逻辑 --- - if (_handlers.TryGetValue(cmdCode, out var handler)) - { - try - { - // 数据适配:你的 Handler 需要 JToken - // 如果 envelope.JsonParams 是空的,传个空对象防止报错 - var jsonStr = string.IsNullOrEmpty(envelope.JsonParams) ? "{}" : envelope.JsonParams; - var token = JToken.Parse(jsonStr); - - // ★★★ 核心:调用 SyncCameraHandler.ExecuteAsync ★★★ - await handler.ExecuteAsync(token); - - message = $"Executed {cmdCode}"; - } - catch (Exception ex) - { - isSuccess = false; - message = $"Handler Error: {ex.Message}"; - Console.WriteLine($"[业务异常] {message}"); - } - } - else - { - isSuccess = false; - message = $"No handler found for {cmdCode}"; - Console.WriteLine($"[警告] {message}"); - } - - // --- ACK 闭环逻辑 --- - if (envelope.RequireAck) - { - var result = new CommandResult - { - Protocol = ProtocolHeaders.CommandResult, - RequestId = envelope.RequestId, // 必须带回 ID - Success = isSuccess, - Message = message, - Timestamp = DateTime.Now.Ticks - }; - // 触发事件 - OnResponseReady?.Invoke(result); - } - } - catch (Exception ex) - { - Console.WriteLine($"[Dispatcher] 致命错误: {ex.Message}"); - } - } -} \ No newline at end of file diff --git a/SHH.CameraService/Core/CmdClients/ConnectedClient.cs b/SHH.CameraService/Core/CmdClients/ConnectedClient.cs deleted file mode 100644 index 54e5c62..0000000 --- a/SHH.CameraService/Core/CmdClients/ConnectedClient.cs +++ /dev/null @@ -1,30 +0,0 @@ -namespace SHH.CameraService; - -/// -/// 在线客户端信息模型 (已更新) -/// -public class ConnectedClient -{ - /// 唯一标识 (AppId) - public string ServiceId { get; set; } = string.Empty; - - /// 版本号 - public string Version { get; set; } = "1.0.0"; - - /// 远程进程 ID - public int Pid { get; set; } - - /// 客户端 IP - public string Ip { get; set; } = string.Empty; - - /// WebAPI 端口 (Dashboard 调用 REST 接口用) - public int WebPort { get; set; } - - /// 该客户端正在推流的目标地址 - public List TargetVideoNodes { get; set; } = new List(); - - public DateTime LastHeartbeat { get; set; } - - // 辅助属性:拼接出完整的 API BaseUrl - public string WebApiUrl => $"http://{Ip}:{WebPort}"; -} \ No newline at end of file diff --git a/SHH.CameraService/Core/CommandDispatcher.cs b/SHH.CameraService/Core/CommandDispatcher.cs new file mode 100644 index 0000000..72a7c2c --- /dev/null +++ b/SHH.CameraService/Core/CommandDispatcher.cs @@ -0,0 +1,66 @@ +using Newtonsoft.Json.Linq; +using SHH.Contracts.Grpc; + +namespace SHH.CameraService; + +/// +/// gRPC 指令分发器 +/// 职责:接收从 GrpcCommandReceiverWorker 传入的 Proto 消息,解析参数并路由至具体的 Handler。 +/// +public class CommandDispatcher +{ + private readonly Dictionary _handlers; + + /// + /// 构造函数:通过 DI 注入所有已注册的处理器 (SyncCameraHandler, RemoveCameraHandler 等) + /// + public CommandDispatcher(IEnumerable handlers) + { + // 将处理器列表转换为字典,方便 O(1) 查询 + _handlers = handlers.ToDictionary( + h => h.ActionName, + h => h, + StringComparer.OrdinalIgnoreCase); + } + + /// + /// 执行指令分发 + /// + /// 从 gRPC Server Streaming 接收到的原始 Proto 指令对象 + public async Task DispatchAsync(CommandPayloadProto protoMsg) + { + if (protoMsg == null) return; + + string cmdCode = protoMsg.CmdCode; // 例如 "Sync_Camera" + Console.WriteLine($"[Dispatcher] 收到远程指令: {cmdCode}, 请求ID: {protoMsg.RequestId}"); + + try + { + // 1. 查找对应的处理器 + if (_handlers.TryGetValue(cmdCode, out var handler)) + { + // 2. 参数转换:将 Proto 里的 JSON 字符串转换为原有 Handler 需要的 JToken + // 这样你之前的 SyncCameraHandler 代码不需要做任何逻辑改动即可直接复用 + var jsonStr = string.IsNullOrWhiteSpace(protoMsg.JsonParams) ? "{}" : protoMsg.JsonParams; + var token = JToken.Parse(jsonStr); + + // 3. 调用具体业务执行 + await handler.ExecuteAsync(token); + + Console.WriteLine($"[Dispatcher] 指令 {cmdCode} 执行成功。"); + } + else + { + Console.WriteLine($"[Dispatcher Warning] 未找到指令处理器: {cmdCode}"); + } + } + catch (Exception ex) + { + Console.WriteLine($"[Dispatcher Error] 执行指令 {cmdCode} 异常: {ex.Message}"); + } + + // 注意:关于 ACK (require_ack) + // 在 NetMQ 时代需要手动回发结果,在 gRPC Server Streaming 模式下, + // 建议通过 Unary RPC (例如另设一个 ReportCommandResult 方法) 异步上报执行结果。 + } +} \ No newline at end of file diff --git a/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs b/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs deleted file mode 100644 index 0e76f8f..0000000 --- a/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs +++ /dev/null @@ -1,87 +0,0 @@ -//using MessagePack; -//using NetMQ; -//using SHH.Contracts; - -//namespace SHH.CameraService -//{ -// /// -// /// 负责将业务契约转换为 ZeroMQ 传输协议 -// /// -// public static class NetMQProtocolExtensions -// { -// private const string PROTOCOL_HEADER = "SHH_V1"; - -// /// -// /// 扩展方法:将 Payload 转为 NetMQMessage -// /// 使用方法:var msg = payload.ToNetMqMessage(); -// /// -// public static NetMQMessage ToNetMqMessage(this VideoPayload payload) -// { -// var msg = new NetMQMessage(); - -// // Frame 0: 协议魔数 -// msg.Append(PROTOCOL_HEADER); - -// ////// Frame 1: 元数据 JSON -// ////msg.Append(payload.GetMetadataJson()); - -// // ★★★ 修复点:在序列化之前,手动更新 Payload 的标志位 ★★★ -// payload.HasOriginalImage = (payload.OriginalImageBytes != null && payload.OriginalImageBytes.Length > 0); -// payload.HasTargetImage = (payload.TargetImageBytes != null && payload.TargetImageBytes.Length > 0); - -// // Frame 1: Metadata (MessagePack) -// byte[] metaBytes = MessagePackSerializer.Serialize(payload); -// msg.Append(metaBytes); - -// // Frame 2: 原始图 (保持帧位对齐,无数据则发空帧) -// if (payload.HasOriginalImage && payload.OriginalImageBytes != null) -// msg.Append(payload.OriginalImageBytes); -// else -// msg.Append(Array.Empty()); - -// // Frame 3: 处理图 -// if (payload.HasTargetImage && payload.TargetImageBytes != null) -// msg.Append(payload.TargetImageBytes); -// else -// msg.Append(Array.Empty()); - -// return msg; -// } - -// /// -// /// 扩展方法:从 NetMQMessage 还原 Payload -// /// -// public static VideoPayload ToVideoPayload(this NetMQMessage msg) -// { -// if (msg == null || msg.FrameCount < 2) return null; - -// // Frame 0 Check -// if (msg[0].ConvertToString() != PROTOCOL_HEADER) return null; - -// //// Frame 1: Metadata -// //string json = msg[1].ConvertToString(); -// //var payload = VideoPayload.FromMetadataJson(json); - -// // [新代码] 直接从二进制还原 -// // ToByteArray() 虽然会产生一次拷贝,但对于 Metadata 这种小数据影响微乎其微 -// // 相比 JSON 解析 String 的开销,这已经非常快了 -// var payload = MessagePackSerializer.Deserialize(msg[1].ToByteArray()); -// if (payload == null) return null; - -// // Frame 2: Raw Image -// // 利用 BufferSize 避免不必要的内存拷贝,如果长度为0则跳过 -// if (payload.HasOriginalImage && msg[2].BufferSize > 0) -// { -// payload.OriginalImageBytes = msg[2].ToByteArray(); -// } - -// // Frame 3: Processed Image -// if (payload.HasTargetImage && msg[3].BufferSize > 0) -// { -// payload.TargetImageBytes = msg[3].ToByteArray(); -// } - -// return payload; -// } -// } -//} \ No newline at end of file diff --git a/SHH.CameraService/Core/Configs/PushTargetConfig.cs b/SHH.CameraService/Core/PushTargetConfig.cs similarity index 100% rename from SHH.CameraService/Core/Configs/PushTargetConfig.cs rename to SHH.CameraService/Core/PushTargetConfig.cs diff --git a/SHH.CameraService/Grpc/CommandReceiverWorker.cs b/SHH.CameraService/Grpc/CommandReceiverWorker.cs deleted file mode 100644 index 576707c..0000000 --- a/SHH.CameraService/Grpc/CommandReceiverWorker.cs +++ /dev/null @@ -1,123 +0,0 @@ -using Grpc.Core; -using Grpc.Net.Client; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Newtonsoft.Json.Linq; -using SHH.CameraSdk; -using SHH.Contracts; -using SHH.Contracts.Grpc; // 引用 Proto 生成的命名空间 -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; - -namespace SHH.CameraService -{ - /// - /// gRPC 指令接收后台服务 - /// 负责:1. 逻辑注册 2. 维持指令长连接 3. 指令分发 - /// - public class GrpcCommandReceiverWorker : BackgroundService - { - private readonly ILogger _logger; - private readonly ServiceConfig _config; - private readonly IEnumerable _handlers; // 自动注入所有指令处理器 - - public GrpcCommandReceiverWorker( - ILogger logger, - ServiceConfig config, - IEnumerable handlers) - { - _logger = logger; - _config = config; - _handlers = handlers; - } - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - // 给 SDK 和数据库留出几秒钟的加载时间 - _logger.LogInformation("[gRPC Bus] 后台 Worker 准备就绪,3秒后发起连接..."); - await Task.Delay(3000, stoppingToken); - - while (!stoppingToken.IsCancellationRequested) - { - try - { - // 1. 地址预处理 (将 127.0.0.1 强制转换为 localhost 解决 Unimplemented 问题) - var ep = _config.CommandEndpoints.First(); - string targetUrl = ep.Uri.Replace("tcp://", "http://").Replace("127.0.0.1", "localhost"); - - using var channel = GrpcChannel.ForAddress(targetUrl); - var client = new GatewayProvider.GatewayProviderClient(channel); - - // --- 第一步:发起逻辑注册 (Unary) --- - _logger.LogInformation("[gRPC Bus] 正在发起逻辑注册: {Url}", targetUrl); - var regResp = await client.RegisterInstanceAsync(new RegisterRequest - { - InstanceId = _config.AppId, - Version = "2.0.0-grpc", - ServerIp = "127.0.0.1", - StartTimeTicks = DateTime.Now.Ticks - }, cancellationToken: stoppingToken); - - if (regResp.Success) - { - _logger.LogInformation("[gRPC Bus] 逻辑注册成功。正在开启长连接指令通道..."); - - // --- 第二步:开启物理指令流 (Server Streaming) --- - using var call = client.OpenCommandChannel(new CommandStreamRequest - { - InstanceId = _config.AppId - }, cancellationToken: stoppingToken); - - // --- 第三步:阻塞式监听服务端推送 --- - // 只要服务端通过 responseStream.WriteAsync 发消息,这里就会命中 - while (await call.ResponseStream.MoveNext(stoppingToken)) - { - var protoMsg = call.ResponseStream.Current; - _logger.LogInformation("[gRPC Bus] 收到远程指令: {CmdCode}", protoMsg.CmdCode); - - // 异步分发,不阻塞接收循环 - _ = DispatchCommandAsync(protoMsg); - } - } - } - catch (OperationCanceledException) { break; } - catch (Exception ex) - { - _logger.LogError("[gRPC Bus] 链路异常,5秒后重试: {Msg}", ex.Message); - await Task.Delay(5000, stoppingToken); - } - } - } - - /// - /// 指令分发逻辑 - /// - private async Task DispatchCommandAsync(CommandPayloadProto msg) - { - try - { - // 1. 寻找匹配的处理器 (SyncCameraHandler / RemoveCameraHandler) - var handler = _handlers.FirstOrDefault(h => h.ActionName == msg.CmdCode); - - if (handler != null) - { - // 2. 将 Proto 的参数转为 JToken,保持与原有处理器兼容 - var jsonParams = JToken.Parse(msg.JsonParams); - await handler.ExecuteAsync(jsonParams); - _logger.LogInformation("[gRPC Bus] 指令 {CmdCode} 执行完成", msg.CmdCode); - } - else - { - _logger.LogWarning("[gRPC Bus] 未找到处理 {CmdCode} 的处理器", msg.CmdCode); - } - } - catch (Exception ex) - { - _logger.LogError(ex, "[gRPC Bus] 指令执行失败: {CmdCode}", msg.CmdCode); - } - } - } -} \ No newline at end of file diff --git a/SHH.CameraService/Core/CmdClients/SyncCameraHandler.cs b/SHH.CameraService/GrpcImpls/Handlers/DeviceConfigHandler.cs similarity index 98% rename from SHH.CameraService/Core/CmdClients/SyncCameraHandler.cs rename to SHH.CameraService/GrpcImpls/Handlers/DeviceConfigHandler.cs index 217e681..61622ac 100644 --- a/SHH.CameraService/Core/CmdClients/SyncCameraHandler.cs +++ b/SHH.CameraService/GrpcImpls/Handlers/DeviceConfigHandler.cs @@ -7,7 +7,7 @@ namespace SHH.CameraService; /// /// 同步设备配置处理器 /// -public class SyncCameraHandler : ICommandHandler +public class DeviceConfigHandler : ICommandHandler { private readonly CameraManager _cameraManager; @@ -20,7 +20,7 @@ public class SyncCameraHandler : ICommandHandler /// 构造函数 /// /// - public SyncCameraHandler(CameraManager cameraManager) + public DeviceConfigHandler(CameraManager cameraManager) { _cameraManager = cameraManager; } diff --git a/SHH.CameraService/Core/NetSenders/DeviceStateMonitorWorker.cs b/SHH.CameraService/GrpcImpls/Handlers/DeviceStatusHandler.cs similarity index 95% rename from SHH.CameraService/Core/NetSenders/DeviceStateMonitorWorker.cs rename to SHH.CameraService/GrpcImpls/Handlers/DeviceStatusHandler.cs index 325190a..5653510 100644 --- a/SHH.CameraService/Core/NetSenders/DeviceStateMonitorWorker.cs +++ b/SHH.CameraService/GrpcImpls/Handlers/DeviceStatusHandler.cs @@ -13,11 +13,11 @@ namespace SHH.CameraService; /// 设备状态监控工作者 (gRPC 版) /// 职责:监控相机状态并在状态变更或心跳周期内,通过 gRPC 批量上报至所有配置的端点 /// -public class DeviceStateMonitorWorker : BackgroundService +public class DeviceStatusHandler : BackgroundService { private readonly CameraManager _manager; private readonly ServiceConfig _config; - private readonly ILogger _logger; + private readonly ILogger _logger; // 状态存储:CameraId -> 状态载荷 private readonly ConcurrentDictionary _stateStore = new(); @@ -25,10 +25,10 @@ public class DeviceStateMonitorWorker : BackgroundService private volatile bool _isDirty = false; private long _lastSendTick = 0; - public DeviceStateMonitorWorker( + public DeviceStatusHandler( CameraManager manager, ServiceConfig config, - ILogger logger) + ILogger logger) { _manager = manager; _config = config; @@ -104,7 +104,6 @@ public class DeviceStateMonitorWorker : BackgroundService // 1. 构建 gRPC 请求包 var request = new StatusBatchRequest { - Protocol = "GRPC", Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }; @@ -116,7 +115,6 @@ public class DeviceStateMonitorWorker : BackgroundService CameraId = item.CameraId, IsOnline = item.IsOnline, Reason = item.Reason, - Timestamp = item.Timestamp }); } diff --git a/SHH.CameraService/GrpcImpls/Handlers/GatewayService.cs b/SHH.CameraService/GrpcImpls/Handlers/GatewayService.cs new file mode 100644 index 0000000..0a54238 --- /dev/null +++ b/SHH.CameraService/GrpcImpls/Handlers/GatewayService.cs @@ -0,0 +1,102 @@ +using Grpc.Core; +using Grpc.Net.Client; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json.Linq; +using SHH.CameraSdk; +using SHH.Contracts.Grpc; // 引用 Proto 生成的命名空间 + +namespace SHH.CameraService +{ + /// + /// gRPC 指令接收后台服务 + /// 职责: + /// 1. 维护与 AiVideo 的 gRPC 长连接。 + /// 2. 完成节点逻辑注册。 + /// 3. 监听 Server Streaming 指令流并移交给 Dispatcher。 + /// + public class GatewayService : BackgroundService + { + private readonly ILogger _logger; + private readonly ServiceConfig _config; + private readonly CommandDispatcher _dispatcher; + + public GatewayService( + ILogger logger, + ServiceConfig config, + CommandDispatcher dispatcher) + { + _logger = logger; + _config = config; + _dispatcher = dispatcher; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + // 预留系统启动缓冲时间,确保数据库和 SDK 已就绪 + _logger.LogInformation("[gRPC Bus] 指令接收服务启动,等待环境预热..."); + await Task.Delay(3000, stoppingToken); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + // 1. 地址适配:将 tcp 转换为 http,并将 127.0.0.1 修正为 localhost 解决 Unimplemented 异常 + var ep = _config.CommandEndpoints.First(); + string targetUrl = ep.Uri.Replace("tcp://", "http://").Replace("127.0.0.1", "localhost"); + + using var channel = GrpcChannel.ForAddress(targetUrl); + var client = new GatewayProvider.GatewayProviderClient(channel); + + // --- 第一步:发起节点逻辑注册 (Unary) --- + _logger.LogInformation("[gRPC Bus] 正在发起逻辑注册: {Url}", targetUrl); + var regResp = await client.RegisterInstanceAsync(new RegisterRequest + { + InstanceId = _config.AppId, + Version = "2.0.0-grpc", + ServerIp = "127.0.0.1", + StartTimeTicks = DateTime.Now.Ticks + }, cancellationToken: stoppingToken); + + if (regResp.Success) + { + _logger.LogInformation("[gRPC Bus] 注册成功。正在建立双向指令通道..."); + + // --- 第二步:开启 Server Streaming 指令流 --- + using var call = client.OpenCommandChannel(new CommandStreamRequest + { + InstanceId = _config.AppId + }, cancellationToken: stoppingToken); + + // --- 第三步:循环读取服务端推送的指令 --- + // 只要服务端流未断开,此处会一直阻塞等待新消息 + while (await call.ResponseStream.MoveNext(stoppingToken)) + { + var protoMsg = call.ResponseStream.Current; + + // 核心变更:不再直接处理业务,而是通过分发器进行路由 + // 使用 _ = 异步处理,避免某个 Handler 执行过慢导致指令流阻塞 + _ = _dispatcher.DispatchAsync(protoMsg); + } + } + } + catch (OperationCanceledException) + { + // 响应系统正常退出信号 + break; + } + catch (RpcException ex) + { + _logger.LogError("[gRPC Bus] RPC 异常 (Status: {Code}): {Msg}", ex.StatusCode, ex.Message); + // 链路异常,进入重连等待阶段 + await Task.Delay(5000, stoppingToken); + } + catch (Exception ex) + { + _logger.LogError("[gRPC Bus] 非预期链路异常: {Msg},5秒后尝试重连", ex.Message); + await Task.Delay(5000, stoppingToken); + } + } + } + } +} \ No newline at end of file diff --git a/SHH.CameraService/Core/CmdClients/ICommandHandler.cs b/SHH.CameraService/GrpcImpls/Handlers/ICommandHandler.cs similarity index 100% rename from SHH.CameraService/Core/CmdClients/ICommandHandler.cs rename to SHH.CameraService/GrpcImpls/Handlers/ICommandHandler.cs diff --git a/SHH.CameraService/Core/CmdClients/RemoveCameraHandler.cs b/SHH.CameraService/GrpcImpls/Handlers/RemoveCameraHandler.cs similarity index 100% rename from SHH.CameraService/Core/CmdClients/RemoveCameraHandler.cs rename to SHH.CameraService/GrpcImpls/Handlers/RemoveCameraHandler.cs diff --git a/SHH.CameraService/Core/NetSenders/NetworkStreamingWorker.cs b/SHH.CameraService/GrpcImpls/ImageFactory/ImageMonitorController.cs similarity index 97% rename from SHH.CameraService/Core/NetSenders/NetworkStreamingWorker.cs rename to SHH.CameraService/GrpcImpls/ImageFactory/ImageMonitorController.cs index 4547b17..3558cd7 100644 --- a/SHH.CameraService/Core/NetSenders/NetworkStreamingWorker.cs +++ b/SHH.CameraService/GrpcImpls/ImageFactory/ImageMonitorController.cs @@ -6,7 +6,7 @@ using System.Diagnostics; namespace SHH.CameraService; -public class NetworkStreamingWorker : BackgroundService +public class ImageMonitorController : BackgroundService { // 注入所有注册的目标(云端、大屏等),实现动态分发 private readonly IEnumerable _targets; @@ -16,7 +16,7 @@ public class NetworkStreamingWorker : BackgroundService // 如果您确实需要 100,请注意带宽压力。此处我保留您要求的 100,但建议未来调优。 private readonly int[] _encodeParams = { (int)ImwriteFlags.JpegQuality, 100 }; - public NetworkStreamingWorker(IEnumerable targets) + public ImageMonitorController(IEnumerable targets) { _targets = targets; } diff --git a/SHH.CameraService/Core/NetSenders/PipelineConfigurator.cs b/SHH.CameraService/GrpcImpls/ImageFactory/PipelineConfigurator.cs similarity index 100% rename from SHH.CameraService/Core/NetSenders/PipelineConfigurator.cs rename to SHH.CameraService/GrpcImpls/ImageFactory/PipelineConfigurator.cs diff --git a/SHH.CameraService/Core/NetSenders/VideoDataChannel.cs b/SHH.CameraService/GrpcImpls/ImageFactory/VideoDataChannel.cs similarity index 100% rename from SHH.CameraService/Core/NetSenders/VideoDataChannel.cs rename to SHH.CameraService/GrpcImpls/ImageFactory/VideoDataChannel.cs diff --git a/SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs b/SHH.CameraService/GrpcImpls/ImageProcs/GrpcSenderWorker.cs similarity index 89% rename from SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs rename to SHH.CameraService/GrpcImpls/ImageProcs/GrpcSenderWorker.cs index 69289b1..f677483 100644 --- a/SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs +++ b/SHH.CameraService/GrpcImpls/ImageProcs/GrpcSenderWorker.cs @@ -46,10 +46,17 @@ public class GrpcSenderWorker : BackgroundService // 3. 核心搬运循环:从内存队列 (Channel) 读取数据 await foreach (var payload in _target.Channel.Reader.ReadAllAsync(stoppingToken)) { + // 【畅通保障】检查数据时效性:丢弃超过 1 秒的积压帧 + var delay = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - payload.CaptureTimestamp; + if (delay > 1000) + { + continue; + } + // 将业务 DTO 转换为 gRPC 原生 Request var request = new VideoFrameRequest { - CameraId = payload.CameraId ?? "Unknown", + CameraId = payload.CameraId ?? "0", CaptureTimestamp = payload.CaptureTimestamp, OriginalWidth = payload.OriginalWidth, OriginalHeight = payload.OriginalHeight, @@ -66,6 +73,8 @@ public class GrpcSenderWorker : BackgroundService : ByteString.Empty }; + request.SubscriberIds.AddRange(payload.SubscriberIds); + // 处理诊断信息 map if (payload.Diagnostics != null) { diff --git a/SHH.CameraService/Core/Configs/StreamTarget.cs b/SHH.CameraService/GrpcImpls/ImageProcs/StreamTarget.cs similarity index 100% rename from SHH.CameraService/Core/Configs/StreamTarget.cs rename to SHH.CameraService/GrpcImpls/ImageProcs/StreamTarget.cs diff --git a/SHH.CameraService/Program.cs b/SHH.CameraService/Program.cs index 97e02bd..3817478 100644 --- a/SHH.CameraService/Program.cs +++ b/SHH.CameraService/Program.cs @@ -49,7 +49,7 @@ public class Program InstanceId = config.AppId, Version = "2.0.0-grpc", ServerIp = "127.0.0.1", - WebApiPort = config.BasePort, + WebapiPort = config.BasePort, StartTimeTicks = DateTime.Now.Ticks, ProcessId = Environment.ProcessId, Description = "Camera Service" @@ -79,9 +79,9 @@ public class Program builder.Services.AddHostedService(); // ★ 注册 gRPC 版本的状态监控工作者 (不讲道理,直接注册) - builder.Services.AddHostedService(); + builder.Services.AddHostedService(); builder.Services.AddHostedService(); - builder.Services.AddHostedService(); + builder.Services.AddHostedService(); // ============================================================= // 5. 视频流 Target 注册 (gRPC 模式) @@ -100,7 +100,7 @@ public class Program } } builder.Services.AddSingleton>(netTargets); - builder.Services.AddHostedService(); + builder.Services.AddHostedService(); // 为每个 Target 绑定一个 gRPC 流发送者 foreach (var target in netTargets) @@ -112,7 +112,7 @@ public class Program // 注册指令分发 (不再使用 NetMQ 的 CommandClientWorker) builder.Services.AddSingleton(); builder.Services.AddSingleton(); - builder.Services.AddSingleton(); + builder.Services.AddSingleton(); builder.Services.AddSingleton(); ConfigureWebServices(builder, config); diff --git a/SHH.Contracts.Grpc/Protos/gateway_service.proto b/SHH.Contracts.Grpc/Protos/gateway_service.proto index 54ebebd..f15524b 100644 --- a/SHH.Contracts.Grpc/Protos/gateway_service.proto +++ b/SHH.Contracts.Grpc/Protos/gateway_service.proto @@ -18,74 +18,68 @@ service GatewayProvider { rpc OpenCommandChannel (CommandStreamRequest) returns (stream CommandPayloadProto); } +// --- 通用指令推送通道 --- +message CommandPayloadProto { + string protocol = 1; // 协议类型,默认 "COMMAND" + string cmd_code = 2; // 指令代码,如 "Sync_Camera" + string target_id = 3; // 目标对象 ID + string json_params = 4; // 业务参数 JSON + string request_id = 5; // 请求追踪 ID + int64 timestamp_ticks = 6; // 发送时间戳 (Ticks) + bool require_ack = 7; // 是否需要回执 + int32 retry_count = 8; // 重试计数 + int64 expire_time = 9; // 过期时间戳 +} + // --- 1. 注册相关 --- message RegisterRequest { - // 进程 ID (用于区分同一台机器上的多个实例) - int32 process_id = 1; - // 调用进程句柄 - int32 invoke_process_id = 2; - // 实例唯一标识符 (例如 "Stream_1") - string instance_id = 3; - // 软件版本号 - string version = 4; - // 软件所在的局域网 IP - string server_ip = 5; - // WebAPI 监听端口 - int32 webapi_port = 6; - // Grpc通讯端口 - int32 grpc_port = 7; - // 启动时间 - int64 start_time_ticks = 9; - // 描述信息 - string description = 10; + int32 process_id = 1; // 进程 ID (用于区分同一台机器上的多个实例) + int32 invoke_process_id = 2; // 调用进程句柄 + string instance_id = 3; // 实例唯一标识符 (例如 "Stream_1") + string version = 4; // 软件版本号 + string server_ip = 5; // 软件所在的局域网 IP + int32 webapi_port = 6; // WebAPI 监听端口 + int32 grpc_port = 7; // Grpc通讯端口 + int64 start_time_ticks = 9; // 启动时间 + string description = 10; // 描述信息 } // --- 2. 状态上报相关 --- message StatusBatchRequest { - string protocol = 1; - int64 timestamp = 2; - repeated StatusEventItem items = 3; + int64 timestamp = 1; // 上报时间戳 + repeated StatusEventItem items = 2; // 状态事件列表 } +// 设备状态变更通知包 message StatusEventItem { - string camera_id = 1; - bool is_online = 2; - string reason = 3; - int64 timestamp = 4; + string camera_id = 1; // 摄像头ID + bool is_online = 2; // 是否在线 + string reason = 3; // 状态变更原因描述 } -// --- 3. 视频流相关 --- +// --- 3. 视频流传输协议 --- +// 职责:承载高频传输的实时视频帧、算法处理图及相关的 AI 诊断元数据 message VideoFrameRequest { - string camera_id = 1; - int64 capture_timestamp = 2; - int64 dispatch_timestamp = 3; - int32 original_width = 4; - int32 original_height = 5; - int32 target_width = 6; - int32 target_height = 7; - repeated string subscriber_ids = 8; - map diagnostics = 9; - bool has_original_image = 10; - bool has_target_image = 11; - bytes original_image_bytes = 12; - bytes target_image_bytes = 13; + + string camera_id = 1; // 摄像头唯一物理标识符 + int64 capture_timestamp = 2; // 图像在传感器端的原始采集时间戳 (Ticks/Unixms) + int64 dispatch_timestamp = 3; // 图像在分析节点端的分发/外传时间戳 (用于测量网络传输耗时) + int32 original_width = 4; // 原始采集图像的宽度 + int32 original_height = 5; // 原始采集图像的高度 + int32 target_width = 6; // 算法处理(如缩放或裁剪)后的目标图像宽度 + int32 target_height = 7; // 算法处理后的目标图像高度 + repeated string subscriber_ids = 8; // 订阅此帧的应用标识列表 (例如: "UI", "AI", "Record") + map diagnostics = 9; // 诊断与扩展元数据 键值对存储:例如 {"fps": "25", "bitrate": "4Mbps", "algo_latency": "12ms"} + bool has_original_image = 10; // 状态标志:包内是否包含原始图像二进制数据 + bool has_target_image = 11; // 状态标志:包内是否包含算法处理图(或带 OSD 渲染的图) + bytes original_image_bytes = 12; // 原始图像二进制数据 (通常为 JPG/NV12 格式) + bytes target_image_bytes = 13; // 算法处理图/标注图二进制数据 } // --- 4. 指令下发相关 (对应 C# CommandPayload) --- message CommandStreamRequest { - string instance_id = 1; // 告知服务端我是哪个节点 -} - -message CommandPayloadProto { - string protocol = 1; // 协议类型,默认 "COMMAND" - string cmd_code = 2; // 指令代码,如 "Sync_Camera" - string target_id = 3; // 目标对象 ID - string json_params = 4; // 业务参数 JSON - string request_id = 5; // 请求追踪 ID - int64 timestamp_ticks = 6; // 发送时间戳 (Ticks) - bool require_ack = 7; // 是否需要回执 - int32 retry_count = 8; // 重试计数 - int64 expire_time = 9; // 过期时间戳 + string instance_id = 1; // 告知服务端我是哪个节点 + int32 process_id = 2; // 告知服务端我是哪个进程 } message GenericResponse {