diff --git a/Ayay.Solution.sln b/Ayay.Solution.sln index 904f91e..d2bab2a 100644 --- a/Ayay.Solution.sln +++ b/Ayay.Solution.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 18 -VisualStudioVersion = 18.1.11312.151 d18.0 +VisualStudioVersion = 18.1.11312.151 MinimumVisualStudioVersion = 10.0.40219.1 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SHH.CameraSdk", "SHH.CameraSdk\SHH.CameraSdk.csproj", "{21B70A94-43FC-4D17-AB83-9E4B5178397E}" EndProject @@ -9,12 +9,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SHH.CameraService", "SHH.Ca EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SHH.Contracts", "SHH.Contracts\SHH.Contracts.csproj", "{E7A63644-7A55-4267-99D2-7D0A7D54B43C}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SHH.NetMQ", "SHH.NetMQ\SHH.NetMQ.csproj", "{FAC8E0CD-4BB3-4752-A406-CD3D2CE5FBB4}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SHH.CameraDashboard", "SHH.CameraDashboard\SHH.CameraDashboard.csproj", "{03C249D7-BCF1-404D-AD09-7AB39BA263AD}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SHH.ProcessLaunchers", "SHH.ProcessLaunchers\SHH.ProcessLaunchers.csproj", "{E12F2D41-B7BB-4303-AD01-5DCD02D7FF3C}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SHH.Contracts.Grpc", "SHH.Contracts.Grpc\SHH.Contracts.Grpc.csproj", "{5CBDD688-1CD0-4E63-81C5-8E18750D891A}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -33,10 +33,6 @@ Global {E7A63644-7A55-4267-99D2-7D0A7D54B43C}.Debug|Any CPU.Build.0 = Debug|Any CPU {E7A63644-7A55-4267-99D2-7D0A7D54B43C}.Release|Any CPU.ActiveCfg = Release|Any CPU {E7A63644-7A55-4267-99D2-7D0A7D54B43C}.Release|Any CPU.Build.0 = Release|Any CPU - {FAC8E0CD-4BB3-4752-A406-CD3D2CE5FBB4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {FAC8E0CD-4BB3-4752-A406-CD3D2CE5FBB4}.Debug|Any CPU.Build.0 = Debug|Any CPU - {FAC8E0CD-4BB3-4752-A406-CD3D2CE5FBB4}.Release|Any CPU.ActiveCfg = Release|Any CPU - {FAC8E0CD-4BB3-4752-A406-CD3D2CE5FBB4}.Release|Any CPU.Build.0 = Release|Any CPU {03C249D7-BCF1-404D-AD09-7AB39BA263AD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {03C249D7-BCF1-404D-AD09-7AB39BA263AD}.Debug|Any CPU.Build.0 = Debug|Any CPU {03C249D7-BCF1-404D-AD09-7AB39BA263AD}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -45,6 +41,10 @@ Global {E12F2D41-B7BB-4303-AD01-5DCD02D7FF3C}.Debug|Any CPU.Build.0 = Debug|Any CPU {E12F2D41-B7BB-4303-AD01-5DCD02D7FF3C}.Release|Any CPU.ActiveCfg = Release|Any CPU {E12F2D41-B7BB-4303-AD01-5DCD02D7FF3C}.Release|Any CPU.Build.0 = Release|Any CPU + {5CBDD688-1CD0-4E63-81C5-8E18750D891A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5CBDD688-1CD0-4E63-81C5-8E18750D891A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5CBDD688-1CD0-4E63-81C5-8E18750D891A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5CBDD688-1CD0-4E63-81C5-8E18750D891A}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/SHH.CameraDashboard/App.xaml.cs b/SHH.CameraDashboard/App.xaml.cs index d9b571a..a01ecf6 100644 --- a/SHH.CameraDashboard/App.xaml.cs +++ b/SHH.CameraDashboard/App.xaml.cs @@ -99,11 +99,12 @@ namespace SHH.CameraDashboard Id = "CameraService", // 内部标识 DisplayName = "视频接入服务", // UI显示名称 // 请确保路径正确,建议用相对路径 AppDomain.CurrentDomain.BaseDirectory + "SHH.CameraService.exe" - ExePath = @"E:\Codes2026\Ayay\SHH.CameraService\bin\Debug\net8.0\SHH.CameraService.exe", + ExePath = @"D:\Codes\Codes2026\Ayay\SHH.CameraService\bin\Debug\net8.0\SHH.CameraService.exe", Arguments = serviceArgs, // ★★★ 核心:注入参数 ★★★ StartupOrder = 1, // 优先级 RestartDelayMs = 2000, // 崩溃后2秒重启 - Visible = false // 不显示黑框 + Visible = true, // 不显示黑框 + EnableLogRedirect = false, }); // ========================================================= diff --git a/SHH.CameraDashboard/Invokes/CameraHub.cs b/SHH.CameraDashboard/Invokes/CameraHub.cs new file mode 100644 index 0000000..84c076b --- /dev/null +++ b/SHH.CameraDashboard/Invokes/CameraHub.cs @@ -0,0 +1,74 @@ +using Microsoft.AspNetCore.SignalR; +using SHH.Contracts; +using System.Collections.Concurrent; + +namespace SHH.CameraDashboard +{ + /// + /// [SignalR 中心] + /// 职责:负责物理链路的维护,将网络消息路由到业务层 (CommandServer) + /// + public class CameraHub : Hub + { + // 静态映射表:InstanceId -> ConnectionId (用于定向发指令) + private static readonly ConcurrentDictionary _instanceMapping = new(); + + // ================================================================= + // 1. 注册逻辑 (替代原 CommandServer.HandleRegistration) + // ================================================================= + + /// + /// 供 CameraService 调用 + /// + public async Task Register(RegisterPayload payload) + { + string connectionId = Context.ConnectionId; + + // 1. 记录映射关系 + _instanceMapping[payload.InstanceId] = connectionId; + + // 2. 将连接加入组 (按 InstanceId 组队,方便 CommandServer 发指令) + await Groups.AddToGroupAsync(connectionId, payload.InstanceId); + + // 3. 回调业务单例处理 UI 逻辑 + CommandServer.Instance.HandleClientRegister(payload); + + Console.WriteLine($"[Hub] 收到注册: {payload.InstanceId}, 连接ID: {connectionId}"); + } + + // ================================================================= + // 2. 指令回执 + // ================================================================= + + /// + /// 供 CameraService 执行完指令后调用 + /// + public void ReportCommandResult(CommandResult result) + { + CommandServer.Instance.HandleCommandResult(result); + } + + // ================================================================= + // 3. 视频流中转 + // ================================================================= + + /// + /// 供 CameraService 上传图片 + /// + public async Task UploadFrame(VideoPayload payload) + { + // 转发给订阅了该摄像头的组 (例如 Dashboard UI 订阅了该组) + await Clients.Group($"Watch_{payload.CameraId}").SendAsync("OnFrameRecv", payload); + } + + // ================================================================= + // 生命周期管理 + // ================================================================= + + public override async Task OnDisconnectedAsync(Exception? exception) + { + // 清理映射逻辑可以在这里扩展 + await base.OnDisconnectedAsync(exception); + } + } +} \ No newline at end of file diff --git a/SHH.CameraDashboard/SHH.CameraDashboard.csproj b/SHH.CameraDashboard/SHH.CameraDashboard.csproj index f40e757..3a82a9e 100644 --- a/SHH.CameraDashboard/SHH.CameraDashboard.csproj +++ b/SHH.CameraDashboard/SHH.CameraDashboard.csproj @@ -17,6 +17,7 @@ + @@ -33,7 +34,6 @@ - diff --git a/SHH.CameraSdk/Configs/ServiceConfig.cs b/SHH.CameraSdk/Configs/ServiceConfig.cs index c012f1f..5e20476 100644 --- a/SHH.CameraSdk/Configs/ServiceConfig.cs +++ b/SHH.CameraSdk/Configs/ServiceConfig.cs @@ -17,7 +17,8 @@ public class ServiceConfig // 2. 目标地址列表 (类型变了!) // ========================================== - // ★★★ 修改点:从 List 变为 List ★★★ + // ★★★ 修改点:从 List 变为 List< + // > ★★★ public List VideoEndpoints { get; private set; } = new List(); public List CommandEndpoints { get; private set; } = new List(); @@ -77,6 +78,9 @@ public class ServiceConfig // ========================================== private static void ParseSingleUriConfig(ServiceConfig config, string rawValue) { + // 【新增】清理可能存在的双引号,防止地址解析失败 + rawValue = rawValue.Replace("\"", ""); + var segments = rawValue.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries); foreach (var segment in segments) diff --git a/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs b/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs index 123138c..25c034f 100644 --- a/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs +++ b/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs @@ -1,194 +1,194 @@ -using System.Text; -using MessagePack; -using Microsoft.Extensions.Hosting; -using NetMQ; -using NetMQ.Monitoring; // ★ 1. 必须引用 Monitoring 命名空间 -using NetMQ.Sockets; -using SHH.CameraSdk; -using SHH.Contracts; +//using System.Text; +//using MessagePack; +//using Microsoft.Extensions.Hosting; +//using NetMQ; +//using NetMQ.Monitoring; // ★ 1. 必须引用 Monitoring 命名空间 +//using NetMQ.Sockets; +//using SHH.CameraSdk; +//using SHH.Contracts; -namespace SHH.CameraService; +//namespace SHH.CameraService; -public class CommandClientWorker : BackgroundService -{ - private readonly ServiceConfig _config; - private readonly CommandDispatcher _dispatcher; - private readonly InterceptorPipeline _pipeline; +//public class CommandClientWorker : BackgroundService +//{ +// private readonly ServiceConfig _config; +// private readonly CommandDispatcher _dispatcher; +// private readonly InterceptorPipeline _pipeline; - // 管理多个 Socket - private readonly List _sockets = new(); +// // 管理多个 Socket +// private readonly List _sockets = new(); - // ★ 2. 新增:保存 Monitor 列表,防止被 GC 回收 - private readonly List _monitors = new(); +// // ★ 2. 新增:保存 Monitor 列表,防止被 GC 回收 +// private readonly List _monitors = new(); - private NetMQPoller? _poller; +// private NetMQPoller? _poller; - public CommandClientWorker( - ServiceConfig config, - CommandDispatcher dispatcher, - InterceptorPipeline pipeline) - { - _config = config; - _dispatcher = dispatcher; - _pipeline = pipeline; - } +// public CommandClientWorker( +// ServiceConfig config, +// CommandDispatcher dispatcher, +// InterceptorPipeline pipeline) +// { +// _config = config; +// _dispatcher = dispatcher; +// _pipeline = pipeline; +// } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - await Task.Yield(); +// protected override async Task ExecuteAsync(CancellationToken stoppingToken) +// { +// await Task.Yield(); - if (!_config.ShouldConnect || _config.CommandEndpoints.Count == 0) return; +// if (!_config.ShouldConnect || _config.CommandEndpoints.Count == 0) return; - _poller = new NetMQPoller(); +// _poller = new NetMQPoller(); - // ------------------------------------------------------------- - // 核心修改区:建立连接并挂载监控器 - // ------------------------------------------------------------- - foreach (var ep in _config.CommandEndpoints) - { - try - { - var socket = new DealerSocket(); - socket.Options.Identity = Encoding.UTF8.GetBytes(_config.AppId); +// // ------------------------------------------------------------- +// // 核心修改区:建立连接并挂载监控器 +// // ------------------------------------------------------------- +// foreach (var ep in _config.CommandEndpoints) +// { +// try +// { +// var socket = new DealerSocket(); +// socket.Options.Identity = Encoding.UTF8.GetBytes(_config.AppId); - var monitorUrl = $"inproc://monitor_{Guid.NewGuid():N}"; - var monitor = new NetMQMonitor(socket, monitorUrl, SocketEvents.Connected); +// var monitorUrl = $"inproc://monitor_{Guid.NewGuid():N}"; +// var monitor = new NetMQMonitor(socket, monitorUrl, SocketEvents.Connected); - monitor.Connected += async (s, args) => - { - Console.WriteLine($"[指令] 网络连接建立: {ep.Uri} -> 正在补发注册包..."); - await SendRegisterAsync(socket); - }; +// monitor.Connected += async (s, args) => +// { +// Console.WriteLine($"[指令] 网络连接建立: {ep.Uri} -> 正在补发注册包..."); +// await SendRegisterAsync(socket); +// }; - // ★★★ 修正点:使用 AttachToPoller 代替 Add ★★★ - // 错误写法: _poller.Add(monitor); - monitor.AttachToPoller(_poller); +// // ★★★ 修正点:使用 AttachToPoller 代替 Add ★★★ +// // 错误写法: _poller.Add(monitor); +// monitor.AttachToPoller(_poller); - // 依然需要保存引用,防止被 GC 回收 - _monitors.Add(monitor); +// // 依然需要保存引用,防止被 GC 回收 +// _monitors.Add(monitor); - socket.Connect(ep.Uri); - socket.ReceiveReady += OnSocketReceiveReady; +// socket.Connect(ep.Uri); +// socket.ReceiveReady += OnSocketReceiveReady; - _sockets.Add(socket); - _poller.Add(socket); +// _sockets.Add(socket); +// _poller.Add(socket); - Console.WriteLine($"[指令] 通道初始化完成: {ep.Uri} (带自动重连监控)"); - } - catch (Exception ex) - { - Console.WriteLine($"[指令] 连接初始化异常: {ex.Message}"); - } - } +// Console.WriteLine($"[指令] 通道初始化完成: {ep.Uri} (带自动重连监控)"); +// } +// catch (Exception ex) +// { +// Console.WriteLine($"[指令] 连接初始化异常: {ex.Message}"); +// } +// } - if (_sockets.Count == 0) return; +// if (_sockets.Count == 0) return; - // ================================================================= - // 6. 绑定 ACK 逻辑 (保持不变) - // ================================================================= - _dispatcher.OnResponseReady += async (result) => - { - try - { - byte[] resultBytes = MessagePackSerializer.Serialize(result); - var ctx = await _pipeline.ExecuteSendAsync(ProtocolHeaders.CommandResult, resultBytes); +// // ================================================================= +// // 6. 绑定 ACK 逻辑 (保持不变) +// // ================================================================= +// _dispatcher.OnResponseReady += async (result) => +// { +// try +// { +// byte[] resultBytes = MessagePackSerializer.Serialize(result); +// var ctx = await _pipeline.ExecuteSendAsync(ProtocolHeaders.CommandResult, resultBytes); - if (ctx != null) - { - foreach (var socket in _sockets) - { - socket.SendMoreFrame(ctx.Protocol).SendFrame(ctx.Data); - } - Console.WriteLine($"[指令] ACK 已广播 (ID: {result.RequestId})"); - } - } - catch (Exception ex) - { - Console.WriteLine($"[ACK] 发送失败: {ex.Message}"); - } - }; +// if (ctx != null) +// { +// foreach (var socket in _sockets) +// { +// socket.SendMoreFrame(ctx.Protocol).SendFrame(ctx.Data); +// } +// Console.WriteLine($"[指令] ACK 已广播 (ID: {result.RequestId})"); +// } +// } +// catch (Exception ex) +// { +// Console.WriteLine($"[ACK] 发送失败: {ex.Message}"); +// } +// }; - // ================================================================= - // 7. 启动 Poller - // ================================================================= - // 注意:我们不需要手动发第一次注册包了, - // 因为 Poller 启动后,底层 TCP 会建立连接,从而触发 monitor.Connected 事件, - // 事件里会自动发送注册包。这就是“自动档”的好处。 - _poller.RunAsync(); +// // ================================================================= +// // 7. 启动 Poller +// // ================================================================= +// // 注意:我们不需要手动发第一次注册包了, +// // 因为 Poller 启动后,底层 TCP 会建立连接,从而触发 monitor.Connected 事件, +// // 事件里会自动发送注册包。这就是“自动档”的好处。 +// _poller.RunAsync(); - // 阻塞直到取消 - while (!stoppingToken.IsCancellationRequested) - { - await Task.Delay(1000, stoppingToken); - } +// // 阻塞直到取消 +// while (!stoppingToken.IsCancellationRequested) +// { +// await Task.Delay(1000, stoppingToken); +// } - // 清理 - _poller.Stop(); - _poller.Dispose(); - foreach (var m in _monitors) m.Dispose(); // 释放监控器 - foreach (var s in _sockets) s.Dispose(); - } +// // 清理 +// _poller.Stop(); +// _poller.Dispose(); +// foreach (var m in _monitors) m.Dispose(); // 释放监控器 +// foreach (var s in _sockets) s.Dispose(); +// } - // ================================================================= - // ★ 8. 抽离出的注册包发送逻辑 (供 Monitor 调用) - // ================================================================= - private async Task SendRegisterAsync(DealerSocket targetSocket) - { - try - { - var registerPayload = new RegisterPayload - { - Protocol = ProtocolHeaders.ServerRegister, - InstanceId = _config.AppId, - ProcessId = Environment.ProcessId, - Version = "1.0.0", - ServerIp = "127.0.0.1", // 建议优化:获取本机真实IP - WebApiPort = _config.BasePort, - StartTime = DateTime.Now - }; +// // ================================================================= +// // ★ 8. 抽离出的注册包发送逻辑 (供 Monitor 调用) +// // ================================================================= +// private async Task SendRegisterAsync(DealerSocket targetSocket) +// { +// try +// { +// var registerPayload = new RegisterPayload +// { +// Protocol = ProtocolHeaders.ServerRegister, +// InstanceId = _config.AppId, +// ProcessId = Environment.ProcessId, +// Version = "1.0.0", +// ServerIp = "127.0.0.1", // 建议优化:获取本机真实IP +// WebApiPort = _config.BasePort, +// StartTime = DateTime.Now +// }; - byte[] regData = MessagePackSerializer.Serialize(registerPayload); +// byte[] regData = MessagePackSerializer.Serialize(registerPayload); - // 执行拦截器 - var ctx = await _pipeline.ExecuteSendAsync(ProtocolHeaders.ServerRegister, regData); +// // 执行拦截器 +// var ctx = await _pipeline.ExecuteSendAsync(ProtocolHeaders.ServerRegister, regData); - if (ctx != null) - { - // 直接向触发事件的那个 Socket 发送 - // DealerSocket 允许在连接未完全就绪时 Send,它会缓存直到网络通畅 - targetSocket.SendMoreFrame(ctx.Protocol).SendFrame(ctx.Data); - // Console.WriteLine($"[指令] 身份注册包已推入队列: {targetSocket.Options.Identity}"); - } - } - catch (Exception ex) - { - Console.WriteLine($"[指令] 注册包发送失败: {ex.Message}"); - } - } +// if (ctx != null) +// { +// // 直接向触发事件的那个 Socket 发送 +// // DealerSocket 允许在连接未完全就绪时 Send,它会缓存直到网络通畅 +// targetSocket.SendMoreFrame(ctx.Protocol).SendFrame(ctx.Data); +// // Console.WriteLine($"[指令] 身份注册包已推入队列: {targetSocket.Options.Identity}"); +// } +// } +// catch (Exception ex) +// { +// Console.WriteLine($"[指令] 注册包发送失败: {ex.Message}"); +// } +// } - private async void OnSocketReceiveReady(object? sender, NetMQSocketEventArgs e) - { - NetMQMessage incomingMsg = new NetMQMessage(); - if (e.Socket.TryReceiveMultipartMessage(ref incomingMsg)) - { - if (incomingMsg.FrameCount >= 2) - { - try - { - string rawProtocol = incomingMsg[0].ConvertToString(); - byte[] rawData = incomingMsg[1].ToByteArray(); +// private async void OnSocketReceiveReady(object? sender, NetMQSocketEventArgs e) +// { +// NetMQMessage incomingMsg = new NetMQMessage(); +// if (e.Socket.TryReceiveMultipartMessage(ref incomingMsg)) +// { +// if (incomingMsg.FrameCount >= 2) +// { +// try +// { +// string rawProtocol = incomingMsg[0].ConvertToString(); +// byte[] rawData = incomingMsg[1].ToByteArray(); - var ctx = await _pipeline.ExecuteReceiveAsync(rawProtocol, rawData); - if (ctx != null) - { - await _dispatcher.DispatchAsync(ctx.Protocol, ctx.Data); - } - } - catch (Exception ex) - { - Console.WriteLine($"[指令] 处理异常: {ex.Message}"); - } - } - } - } -} \ No newline at end of file +// var ctx = await _pipeline.ExecuteReceiveAsync(rawProtocol, rawData); +// if (ctx != null) +// { +// await _dispatcher.DispatchAsync(ctx.Protocol, ctx.Data); +// } +// } +// catch (Exception ex) +// { +// Console.WriteLine($"[指令] 处理异常: {ex.Message}"); +// } +// } +// } +// } +//} \ No newline at end of file diff --git a/SHH.CameraService/Core/NetSenders/DeviceStateMonitorWorker.cs b/SHH.CameraService/Core/NetSenders/DeviceStateMonitorWorker.cs index 76350f1..325190a 100644 --- a/SHH.CameraService/Core/NetSenders/DeviceStateMonitorWorker.cs +++ b/SHH.CameraService/Core/NetSenders/DeviceStateMonitorWorker.cs @@ -1,142 +1,169 @@ -using System.Collections.Concurrent; +using Grpc.Core; +using Grpc.Net.Client; using Microsoft.Extensions.Hosting; -using NetMQ; -using NetMQ.Sockets; -using MessagePack; +using Microsoft.Extensions.Logging; using SHH.CameraSdk; using SHH.Contracts; -using System.Text; +using SHH.Contracts.Grpc; +using System.Collections.Concurrent; -namespace SHH.CameraService +namespace SHH.CameraService; + +/// +/// 设备状态监控工作者 (gRPC 版) +/// 职责:监控相机状态并在状态变更或心跳周期内,通过 gRPC 批量上报至所有配置的端点 +/// +public class DeviceStateMonitorWorker : BackgroundService { - public class DeviceStateMonitorWorker : BackgroundService + private readonly CameraManager _manager; + private readonly ServiceConfig _config; + private readonly ILogger _logger; + + // 状态存储:CameraId -> 状态载荷 + private readonly ConcurrentDictionary _stateStore = new(); + + private volatile bool _isDirty = false; + private long _lastSendTick = 0; + + public DeviceStateMonitorWorker( + CameraManager manager, + ServiceConfig config, + ILogger logger) { - private readonly CameraManager _manager; - private readonly ServiceConfig _config; - private readonly InterceptorPipeline _pipeline; + _manager = manager; + _config = config; + _logger = logger; + } - // 修改点1: 改为 Socket 列表 - private readonly List _sockets = new(); - private readonly ConcurrentDictionary _stateStore = new(); - - private volatile bool _isDirty = false; - private long _lastSendTick = 0; - - public DeviceStateMonitorWorker( - CameraManager manager, - ServiceConfig config, - InterceptorPipeline pipeline) + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + // 1. 初始化本地状态缓存 + foreach (var dev in _manager.GetAllDevices()) { - _manager = manager; - _config = config; - _pipeline = pipeline; + UpdateLocalState(dev.Id, false, "Service Init"); } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) + // 2. 订阅 SDK 状态变更事件 + _manager.OnDeviceStatusChanged += OnSdkStatusChanged; + + _logger.LogInformation("[StatusWorker] gRPC 状态上报已启动,配置节点数: {Count}", _config.CommandEndpoints.Count); + + // 3. 定时循环 (1秒1次检查) + var timer = new PeriodicTimer(TimeSpan.FromSeconds(1)); + try { - // 1. 初始化 - foreach (var dev in _manager.GetAllDevices()) + while (await timer.WaitForNextTickAsync(stoppingToken)) { - UpdateLocalState(dev.Id, false, "Init"); - } - - _manager.OnDeviceStatusChanged += OnSdkStatusChanged; - - // 修改点2: 遍历所有端点建立连接 - if (_config.CommandEndpoints.Count == 0) return; - - Console.WriteLine($"[StatusWorker] 启动状态上报,目标节点数: {_config.CommandEndpoints.Count}"); - - foreach (var ep in _config.CommandEndpoints) - { - try - { - var socket = new DealerSocket(); - // 状态通道也建议设置 Identity,方便服务端追踪 - socket.Options.Identity = Encoding.UTF8.GetBytes(_config.AppId + "_status"); - socket.Options.SendHighWatermark = 1000; - socket.Connect(ep.Uri); - _sockets.Add(socket); - } - catch (Exception ex) - { - Console.WriteLine($"[StatusWorker] 连接失败 {ep.Uri}: {ex.Message}"); - } - } - - // 定时循环 (1秒1次) - var timer = new PeriodicTimer(TimeSpan.FromSeconds(1)); - try - { - while (await timer.WaitForNextTickAsync(stoppingToken)) - { - await CheckAndBroadcastAsync(); - } - } - finally - { - _manager.OnDeviceStatusChanged -= OnSdkStatusChanged; - // 清理所有 socket - foreach (var s in _sockets) s.Dispose(); + await CheckAndBroadcastAsync(stoppingToken); } } - - private void OnSdkStatusChanged(long deviceId, bool isOnline, string reason) + catch (OperationCanceledException) { /* 正常退出 */ } + catch (Exception ex) { - UpdateLocalState(deviceId, isOnline, reason); - _isDirty = true; + _logger.LogError(ex, "[StatusWorker] 运行异常"); } - - private void UpdateLocalState(long deviceId, bool isOnline, string reason) + finally { - var evt = new StatusEventPayload + _manager.OnDeviceStatusChanged -= OnSdkStatusChanged; + } + } + + /// + /// SDK 状态变更回调 + /// + private void OnSdkStatusChanged(long deviceId, bool isOnline, string reason) + { + UpdateLocalState(deviceId, isOnline, reason); + _isDirty = true; + } + + private void UpdateLocalState(long deviceId, bool isOnline, string reason) + { + var evt = new StatusEventPayload + { + CameraId = deviceId.ToString(), + IsOnline = isOnline, + Reason = reason, + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + }; + _stateStore[deviceId.ToString()] = evt; + } + + /// + /// 执行广播逻辑 + /// + private async Task CheckAndBroadcastAsync(CancellationToken ct) + { + long now = Environment.TickCount64; + + // 策略: 有变更(Dirty) 或 超过5秒(强制心跳) + bool shouldSend = _isDirty || (now - _lastSendTick > 5000); + + if (shouldSend && _config.CommandEndpoints.Any()) + { + // 1. 构建 gRPC 请求包 + var request = new StatusBatchRequest { - CameraId = deviceId.ToString(), - IsOnline = isOnline, - Reason = reason, + Protocol = "GRPC", Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }; - _stateStore[deviceId.ToString()] = evt; - } - // 修改点3: 广播发送逻辑 - private async Task CheckAndBroadcastAsync() - { - long now = Environment.TickCount64; - // 策略: 有变更 或 超过5秒(心跳) - bool shouldSend = _isDirty || (now - _lastSendTick > 5000); + // 转换内存中的状态快照为 Protobuf 列表 + foreach (var item in _stateStore.Values) + { + request.Items.Add(new StatusEventItem + { + CameraId = item.CameraId, + IsOnline = item.IsOnline, + Reason = item.Reason, + Timestamp = item.Timestamp + }); + } - if (shouldSend && _sockets.Count > 0) + // 2. 遍历所有端点进行发送 + foreach (var endpoint in _config.CommandEndpoints) { try { - var snapshot = _stateStore.Values.ToList(); - var batch = new StatusBatchPayload + string grpcUrl = endpoint.Uri.Replace("tcp://", "http://").Trim(); + + // --- 增加以下诊断代码 --- + using var channel = GrpcChannel.ForAddress(grpcUrl); + var client = new GatewayProvider.GatewayProviderClient(channel); + + // 获取 gRPC 内部生成的服务全称 + // 这就是客户端尝试调用的真实路径:/包名.服务名/方法名 + var methodName = "ReportStatusBatch"; + var serviceName = client.GetType().DeclaringType?.Name ?? "Unknown"; + + _logger.LogInformation("[gRPC Debug] 准备调用端点: {Url}", grpcUrl); + _logger.LogInformation("[gRPC Debug] 客户端契约服务名: {Service}", serviceName); + + // 执行调用 + var response = await client.ReportStatusBatchAsync(request, + deadline: DateTime.UtcNow.AddSeconds(2), cancellationToken: ct); + + if (response.Success) { - Items = snapshot, - Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - }; - - byte[] data = MessagePackSerializer.Serialize(batch); - - // 拦截器处理 - var ctx = await _pipeline.ExecuteSendAsync("STATUS_BATCH", data); - if (ctx != null) - { - // ★★★ 核心修复:循环广播给所有 Socket ★★★ - foreach (var socket in _sockets) - { - // TrySend 避免阻塞,如果某个服务端卡死不影响其他端 - socket.SendMoreFrame(ctx.Protocol).TrySendFrame(ctx.Data); - } - + _logger.LogInformation("[gRPC Success] 上报成功"); _isDirty = false; - _lastSendTick = now; + _lastSendTick = Environment.TickCount64; + } + } + catch (RpcException ex) + { + // 这里是关键:打印 RpcException 的详细状态 + _logger.LogError("[gRPC Error] StatusCode: {Code}, Detail: {Detail}", ex.StatusCode, ex.Status.Detail); + + // 如果是 Unimplemented,通常意味着路径不对 + if (ex.StatusCode == StatusCode.Unimplemented) + { + _logger.LogError("[gRPC Fix] 请检查服务端是否注册了名为 'GatewayProvider' 的服务,且其 package 声明与客户端一致。"); } } catch (Exception ex) { - Console.WriteLine($"[StatusWorker] 发送失败: {ex.Message}"); + _logger.LogError("[gRPC Fatal] 非 RPC 异常: {Msg}", ex.Message); } } } diff --git a/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs b/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs index 709ad8d..0e76f8f 100644 --- a/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs +++ b/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs @@ -1,87 +1,87 @@ -using MessagePack; -using NetMQ; -using SHH.Contracts; +//using MessagePack; +//using NetMQ; +//using SHH.Contracts; -namespace SHH.CameraService -{ - /// - /// 负责将业务契约转换为 ZeroMQ 传输协议 - /// - public static class NetMQProtocolExtensions - { - private const string PROTOCOL_HEADER = "SHH_V1"; +//namespace SHH.CameraService +//{ +// /// +// /// 负责将业务契约转换为 ZeroMQ 传输协议 +// /// +// public static class NetMQProtocolExtensions +// { +// private const string PROTOCOL_HEADER = "SHH_V1"; - /// - /// 扩展方法:将 Payload 转为 NetMQMessage - /// 使用方法:var msg = payload.ToNetMqMessage(); - /// - public static NetMQMessage ToNetMqMessage(this VideoPayload payload) - { - var msg = new NetMQMessage(); +// /// +// /// 扩展方法:将 Payload 转为 NetMQMessage +// /// 使用方法:var msg = payload.ToNetMqMessage(); +// /// +// public static NetMQMessage ToNetMqMessage(this VideoPayload payload) +// { +// var msg = new NetMQMessage(); - // Frame 0: 协议魔数 - msg.Append(PROTOCOL_HEADER); +// // Frame 0: 协议魔数 +// msg.Append(PROTOCOL_HEADER); - ////// Frame 1: 元数据 JSON - ////msg.Append(payload.GetMetadataJson()); +// ////// Frame 1: 元数据 JSON +// ////msg.Append(payload.GetMetadataJson()); - // ★★★ 修复点:在序列化之前,手动更新 Payload 的标志位 ★★★ - payload.HasOriginalImage = (payload.OriginalImageBytes != null && payload.OriginalImageBytes.Length > 0); - payload.HasTargetImage = (payload.TargetImageBytes != null && payload.TargetImageBytes.Length > 0); +// // ★★★ 修复点:在序列化之前,手动更新 Payload 的标志位 ★★★ +// payload.HasOriginalImage = (payload.OriginalImageBytes != null && payload.OriginalImageBytes.Length > 0); +// payload.HasTargetImage = (payload.TargetImageBytes != null && payload.TargetImageBytes.Length > 0); - // Frame 1: Metadata (MessagePack) - byte[] metaBytes = MessagePackSerializer.Serialize(payload); - msg.Append(metaBytes); +// // Frame 1: Metadata (MessagePack) +// byte[] metaBytes = MessagePackSerializer.Serialize(payload); +// msg.Append(metaBytes); - // Frame 2: 原始图 (保持帧位对齐,无数据则发空帧) - if (payload.HasOriginalImage && payload.OriginalImageBytes != null) - msg.Append(payload.OriginalImageBytes); - else - msg.Append(Array.Empty()); +// // Frame 2: 原始图 (保持帧位对齐,无数据则发空帧) +// if (payload.HasOriginalImage && payload.OriginalImageBytes != null) +// msg.Append(payload.OriginalImageBytes); +// else +// msg.Append(Array.Empty()); - // Frame 3: 处理图 - if (payload.HasTargetImage && payload.TargetImageBytes != null) - msg.Append(payload.TargetImageBytes); - else - msg.Append(Array.Empty()); +// // Frame 3: 处理图 +// if (payload.HasTargetImage && payload.TargetImageBytes != null) +// msg.Append(payload.TargetImageBytes); +// else +// msg.Append(Array.Empty()); - return msg; - } +// return msg; +// } - /// - /// 扩展方法:从 NetMQMessage 还原 Payload - /// - public static VideoPayload ToVideoPayload(this NetMQMessage msg) - { - if (msg == null || msg.FrameCount < 2) return null; +// /// +// /// 扩展方法:从 NetMQMessage 还原 Payload +// /// +// public static VideoPayload ToVideoPayload(this NetMQMessage msg) +// { +// if (msg == null || msg.FrameCount < 2) return null; - // Frame 0 Check - if (msg[0].ConvertToString() != PROTOCOL_HEADER) return null; +// // Frame 0 Check +// if (msg[0].ConvertToString() != PROTOCOL_HEADER) return null; - //// Frame 1: Metadata - //string json = msg[1].ConvertToString(); - //var payload = VideoPayload.FromMetadataJson(json); +// //// Frame 1: Metadata +// //string json = msg[1].ConvertToString(); +// //var payload = VideoPayload.FromMetadataJson(json); - // [新代码] 直接从二进制还原 - // ToByteArray() 虽然会产生一次拷贝,但对于 Metadata 这种小数据影响微乎其微 - // 相比 JSON 解析 String 的开销,这已经非常快了 - var payload = MessagePackSerializer.Deserialize(msg[1].ToByteArray()); - if (payload == null) return null; +// // [新代码] 直接从二进制还原 +// // ToByteArray() 虽然会产生一次拷贝,但对于 Metadata 这种小数据影响微乎其微 +// // 相比 JSON 解析 String 的开销,这已经非常快了 +// var payload = MessagePackSerializer.Deserialize(msg[1].ToByteArray()); +// if (payload == null) return null; - // Frame 2: Raw Image - // 利用 BufferSize 避免不必要的内存拷贝,如果长度为0则跳过 - if (payload.HasOriginalImage && msg[2].BufferSize > 0) - { - payload.OriginalImageBytes = msg[2].ToByteArray(); - } +// // Frame 2: Raw Image +// // 利用 BufferSize 避免不必要的内存拷贝,如果长度为0则跳过 +// if (payload.HasOriginalImage && msg[2].BufferSize > 0) +// { +// payload.OriginalImageBytes = msg[2].ToByteArray(); +// } - // Frame 3: Processed Image - if (payload.HasTargetImage && msg[3].BufferSize > 0) - { - payload.TargetImageBytes = msg[3].ToByteArray(); - } +// // Frame 3: Processed Image +// if (payload.HasTargetImage && msg[3].BufferSize > 0) +// { +// payload.TargetImageBytes = msg[3].ToByteArray(); +// } - return payload; - } - } -} \ No newline at end of file +// return payload; +// } +// } +//} \ No newline at end of file diff --git a/SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs b/SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs index 3f29db3..69289b1 100644 --- a/SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs +++ b/SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs @@ -1,84 +1,93 @@ -using Microsoft.Extensions.Hosting; -using NetMQ; -using NetMQ.Sockets; +using Google.Protobuf; +using Grpc.Net.Client; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using SHH.Contracts.Grpc; namespace SHH.CameraService; /// -/// NetMQ 发送工作者 -/// 职责:从指定目标的 VideoDataChannel 读取 Payload,通过 ZeroMQ 发送出去 +/// gRPC 视频流发送工作者 +/// 职责:监听特定的 StreamTarget 队列,建立 gRPC 客户端流并持续推送图片 /// -public class NetMqSenderWorker : BackgroundService +public class GrpcSenderWorker : BackgroundService { private readonly StreamTarget _target; + private readonly ILogger _logger; + private readonly string _grpcUrl; - // 构造函数注入特定的目标对象 (由 Program.cs 的工厂方法提供) - public NetMqSenderWorker(StreamTarget target) + public GrpcSenderWorker(StreamTarget target, ILogger logger) { _target = target; + _logger = logger; + + // 自动适配地址:将配置的 tcp://localhost:9001 转换为 http://localhost:9001 + // 并且严格使用你验证成功的 localhost + _grpcUrl = _target.Config.Endpoint.Replace("tcp://", "http://"); } + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - // 增加重启保护 + _logger.LogInformation($"[gRPC Worker] 启动。目标: {_target.Config.Name}, 地址: {_grpcUrl}"); + while (!stoppingToken.IsCancellationRequested) { try { - Console.WriteLine($"[NetMqSender] 连接至: {_target.Config.Endpoint}"); + // 1. 建立通道 + using var channel = GrpcChannel.ForAddress(_grpcUrl); + var client = new GatewayProvider.GatewayProviderClient(channel); - using var clientSocket = new PublisherSocket(); - clientSocket.Options.SendHighWatermark = 1000; - // 关键:增加 TCP 保活,防止防火墙静默断开长连接 - clientSocket.Options.TcpKeepalive = true; - clientSocket.Options.TcpKeepaliveIdle = TimeSpan.FromSeconds(5); + // 2. 开启客户端流 (UploadVideoStream 是在 proto 中定义的) + using var call = client.UploadVideoStream(cancellationToken: stoppingToken); - clientSocket.Connect(_target.Config.Endpoint); + _logger.LogInformation($"[gRPC Worker] 已开启视频推送流: {_target.Config.Name}"); - int frameCount = 0; - - // 使用更稳健的读取方式 + // 3. 核心搬运循环:从内存队列 (Channel) 读取数据 await foreach (var payload in _target.Channel.Reader.ReadAllAsync(stoppingToken)) { - try + // 将业务 DTO 转换为 gRPC 原生 Request + var request = new VideoFrameRequest { - // 1. 构造消息 (内部执行了 MessagePack 序列化) - var msg = payload.ToNetMqMessage(); + CameraId = payload.CameraId ?? "Unknown", + CaptureTimestamp = payload.CaptureTimestamp, + OriginalWidth = payload.OriginalWidth, + OriginalHeight = payload.OriginalHeight, + HasOriginalImage = payload.HasOriginalImage, + HasTargetImage = payload.HasTargetImage, - // 2. 发送 - bool sent = clientSocket.TrySendMultipartMessage(msg); + // ★ 核心:将 byte[] 转换为 gRPC 的 ByteString (高性能) + OriginalImageBytes = payload.OriginalImageBytes != null + ? ByteString.CopyFrom(payload.OriginalImageBytes) + : ByteString.Empty, - if (!sent) + TargetImageBytes = payload.TargetImageBytes != null + ? ByteString.CopyFrom(payload.TargetImageBytes) + : ByteString.Empty + }; + + // 处理诊断信息 map + if (payload.Diagnostics != null) + { + foreach (var kv in payload.Diagnostics) { - Console.WriteLine($"[NetMqSender] 发送缓冲区满,丢弃帧: {payload.CameraId}"); - // ★ 如果没有发送成功,建议显式清理消息帧,防止内存滞留 - msg.Clear(); - } - else - { - frameCount++; - if (frameCount % 100 == 0) - Console.WriteLine($"[NetMqSender] 已搬运 100 帧至缓冲区."); + request.Diagnostics.Add(kv.Key, kv.Value?.ToString() ?? ""); } } - catch (Exception ex) - { - Console.WriteLine($"[NetMqSender] 内部循环异常: {ex.Message}"); - } + + // 4. 发送至 AiVideo + await call.RequestStream.WriteAsync(request); } + + // 正常结束流 + await call.RequestStream.CompleteAsync(); } catch (OperationCanceledException) { break; } catch (Exception ex) { - // ★★★ 核心改进:捕获异常并等待重试 ★★★ - // 防止因为一次内存溢出或网络波动导致整个 BackgroundService 永久停止 - Console.WriteLine($"[NetMqSender] 发生致命异常,5秒后尝试重建连接: {ex.Message}"); + _logger.LogError($"[gRPC Worker] 推送链路异常,5秒后重连: {ex.Message}"); await Task.Delay(5000, stoppingToken); } - finally - { - // 确保每次循环退出(无论是异常还是正常)都清理环境 - NetMQConfig.Cleanup(false); - } } } } \ No newline at end of file diff --git a/SHH.CameraService/Grpc/CommandReceiverWorker.cs b/SHH.CameraService/Grpc/CommandReceiverWorker.cs new file mode 100644 index 0000000..576707c --- /dev/null +++ b/SHH.CameraService/Grpc/CommandReceiverWorker.cs @@ -0,0 +1,123 @@ +using Grpc.Core; +using Grpc.Net.Client; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json.Linq; +using SHH.CameraSdk; +using SHH.Contracts; +using SHH.Contracts.Grpc; // 引用 Proto 生成的命名空间 +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace SHH.CameraService +{ + /// + /// gRPC 指令接收后台服务 + /// 负责:1. 逻辑注册 2. 维持指令长连接 3. 指令分发 + /// + public class GrpcCommandReceiverWorker : BackgroundService + { + private readonly ILogger _logger; + private readonly ServiceConfig _config; + private readonly IEnumerable _handlers; // 自动注入所有指令处理器 + + public GrpcCommandReceiverWorker( + ILogger logger, + ServiceConfig config, + IEnumerable handlers) + { + _logger = logger; + _config = config; + _handlers = handlers; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + // 给 SDK 和数据库留出几秒钟的加载时间 + _logger.LogInformation("[gRPC Bus] 后台 Worker 准备就绪,3秒后发起连接..."); + await Task.Delay(3000, stoppingToken); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + // 1. 地址预处理 (将 127.0.0.1 强制转换为 localhost 解决 Unimplemented 问题) + var ep = _config.CommandEndpoints.First(); + string targetUrl = ep.Uri.Replace("tcp://", "http://").Replace("127.0.0.1", "localhost"); + + using var channel = GrpcChannel.ForAddress(targetUrl); + var client = new GatewayProvider.GatewayProviderClient(channel); + + // --- 第一步:发起逻辑注册 (Unary) --- + _logger.LogInformation("[gRPC Bus] 正在发起逻辑注册: {Url}", targetUrl); + var regResp = await client.RegisterInstanceAsync(new RegisterRequest + { + InstanceId = _config.AppId, + Version = "2.0.0-grpc", + ServerIp = "127.0.0.1", + StartTimeTicks = DateTime.Now.Ticks + }, cancellationToken: stoppingToken); + + if (regResp.Success) + { + _logger.LogInformation("[gRPC Bus] 逻辑注册成功。正在开启长连接指令通道..."); + + // --- 第二步:开启物理指令流 (Server Streaming) --- + using var call = client.OpenCommandChannel(new CommandStreamRequest + { + InstanceId = _config.AppId + }, cancellationToken: stoppingToken); + + // --- 第三步:阻塞式监听服务端推送 --- + // 只要服务端通过 responseStream.WriteAsync 发消息,这里就会命中 + while (await call.ResponseStream.MoveNext(stoppingToken)) + { + var protoMsg = call.ResponseStream.Current; + _logger.LogInformation("[gRPC Bus] 收到远程指令: {CmdCode}", protoMsg.CmdCode); + + // 异步分发,不阻塞接收循环 + _ = DispatchCommandAsync(protoMsg); + } + } + } + catch (OperationCanceledException) { break; } + catch (Exception ex) + { + _logger.LogError("[gRPC Bus] 链路异常,5秒后重试: {Msg}", ex.Message); + await Task.Delay(5000, stoppingToken); + } + } + } + + /// + /// 指令分发逻辑 + /// + private async Task DispatchCommandAsync(CommandPayloadProto msg) + { + try + { + // 1. 寻找匹配的处理器 (SyncCameraHandler / RemoveCameraHandler) + var handler = _handlers.FirstOrDefault(h => h.ActionName == msg.CmdCode); + + if (handler != null) + { + // 2. 将 Proto 的参数转为 JToken,保持与原有处理器兼容 + var jsonParams = JToken.Parse(msg.JsonParams); + await handler.ExecuteAsync(jsonParams); + _logger.LogInformation("[gRPC Bus] 指令 {CmdCode} 执行完成", msg.CmdCode); + } + else + { + _logger.LogWarning("[gRPC Bus] 未找到处理 {CmdCode} 的处理器", msg.CmdCode); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "[gRPC Bus] 指令执行失败: {CmdCode}", msg.CmdCode); + } + } + } +} \ No newline at end of file diff --git a/SHH.CameraService/Program.cs b/SHH.CameraService/Program.cs index 3ba6013..97e02bd 100644 --- a/SHH.CameraService/Program.cs +++ b/SHH.CameraService/Program.cs @@ -1,7 +1,10 @@ -using Microsoft.AspNetCore.Builder; +using Grpc.Net.Client; +using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; using Microsoft.OpenApi.Models; using SHH.CameraSdk; +using SHH.Contracts.Grpc; +using Microsoft.Extensions.Logging; namespace SHH.CameraService; @@ -9,131 +12,138 @@ public class Program { public static async Task Main(string[] args) { - // 1. 理由:缓冲时间 10 秒, 供附加调试工具使用 - for (var i = 1; i < 10; i++) - Thread.Sleep(1000); - - // ============================================================= - // 2. 基础环境与配置 (理由:明确身份 ID 和 监听端口) - // ============================================================= - var config = ServiceConfig.BuildFromArgs(args); - - // 硬件预热 (理由:确保底层驱动库在 Web 容器启动前完全就绪) + // 2. 硬件预热 (静态方法保留) HikNativeMethods.NET_DVR_Init(); HikSdkManager.ForceWarmUp(); + // 1. [核心环境] 必须在所有网络操作前开启 + AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); + + // 2. 模拟/解析配置 + if (args.Length == 0) + { + string serviceArgs = "--appid CameraApp_01 " + + "--uris localhost,9001,video,调试PC; " + + "--uris localhost,9001,command,调试PC; " + + "--mode 1 --ports 5000,100"; + args = serviceArgs.Split(' ', StringSplitOptions.RemoveEmptyEntries); + } + var config = ServiceConfig.BuildFromArgs(args); + + // ============================================================= + // 3. 【强行复刻成功逻辑】 在 Web 容器启动前直接执行注册 + // ============================================================= + if (config.CommandEndpoints.Any()) + { + try + { + // 将 tcp:// 转换为 http:// 以适配 gRPC + string targetUrl = config.CommandEndpoints.First().Uri.Replace("tcp://", "http://"); + + using var channel = GrpcChannel.ForAddress(targetUrl); + var client = new GatewayProvider.GatewayProviderClient(channel); + + Console.WriteLine($"[gRPC] 正在执行预注册 (环境: 纯净): {targetUrl}"); + var resp = await client.RegisterInstanceAsync(new RegisterRequest + { + InstanceId = config.AppId, + Version = "2.0.0-grpc", + ServerIp = "127.0.0.1", + WebApiPort = config.BasePort, + StartTimeTicks = DateTime.Now.Ticks, + ProcessId = Environment.ProcessId, + Description = "Camera Service" + }); + Console.WriteLine($"[gRPC] 预注册成功: {resp.Message}"); + } + catch (Exception ex) + { + Console.WriteLine($"[gRPC] 预注册尝试失败 (不影响启动): {ex.Message}"); + } + } + + // ============================================================= + // 4. 构建 Web 主机环境 + // ============================================================= var builder = WebApplication.CreateBuilder(args); - // ============================================================= - // 3. 依赖注入注册 (DI) - // ============================================================= + // 基础业务单例注册 builder.Services.AddSingleton(config); - - // 注册缩放与增亮业务(不注册则不实现) builder.Services.AddSingleton(); - builder.Services.AddSingleton(sp => new ImageScaleCluster(4, sp.GetRequiredService())); builder.Services.AddSingleton(sp => new ImageEnhanceCluster(4, sp.GetRequiredService())); builder.Services.AddHostedService(); // 接入 SDK 核心逻辑 builder.Services.AddCameraSdk(config.NumericId); - - // 注册后台引擎 (理由:托管长周期的硬件状态监控) builder.Services.AddHostedService(); + + // ★ 注册 gRPC 版本的状态监控工作者 (不讲道理,直接注册) builder.Services.AddHostedService(); - - // 配置 Web 相关的服务 - ConfigureWebServices(builder, config); - - // 配置进程守护 builder.Services.AddHostedService(); + builder.Services.AddHostedService(); // ============================================================= - // 4. 接受启动传参, 并支持将视频进行网络广播 + // 5. 视频流 Target 注册 (gRPC 模式) // ============================================================= - - // 1. 读取配置创建 targets (可以是 1 个,也可以是 10 个) var netTargets = new List(); if (config.VideoEndpoints != null) { - foreach(var cfgVideo in config.VideoEndpoints) + foreach (var cfgVideo in config.VideoEndpoints) { netTargets.Add(new StreamTarget(new PushTargetConfig { - Name = cfgVideo.Description, Endpoint = cfgVideo.Uri, QueueCapacity = 10, + Name = cfgVideo.Description, + Endpoint = cfgVideo.Uri, + QueueCapacity = 10, })); } } - // 2. 注册 Targets (供采集者用) builder.Services.AddSingleton>(netTargets); - - // 3. 注册采集者 (它会注入上面的 targets,进行编码和分发) builder.Services.AddHostedService(); - // 5. 为每个 Target 注册一个独立的发送者 + // 为每个 Target 绑定一个 gRPC 流发送者 foreach (var target in netTargets) { - builder.Services.AddHostedService(sp => new NetMqSenderWorker(target)); + builder.Services.AddHostedService(sp => + new GrpcSenderWorker(target, sp.GetRequiredService>())); } - // ============================================================= - // 5. 命令管道配置 - // ============================================================= - - // 2. 注册管道管理器 + // 注册指令分发 (不再使用 NetMQ 的 CommandClientWorker) builder.Services.AddSingleton(); - - // 负责连接 Dashboard,注册身份,接收重启/控制指令 - builder.Services.AddHostedService(); - - // 1. 注册分发器 builder.Services.AddSingleton(); - - // 2. 注册具体的指令处理器 (每写一个新的 Handler,就在这里注册一下,或者用反射批量注册) builder.Services.AddSingleton(); builder.Services.AddSingleton(); + ConfigureWebServices(builder, config); + // ============================================================= - // 6. 构建与管道配置 + // 6. 启动服务 // ============================================================= var app = builder.Build(); - // 核心修复:同步点火逻辑 (理由:在 Web 开启前完成设备池的初步构建) + // 激活 SDK 管理器并启动业务点火 await StartBusinessLogic(app); app.UseSwagger(); - app.UseSwaggerUI(c => - { - c.SwaggerEndpoint("/swagger/v1/swagger.json", $"SHH Gateway #{config.AppId}"); - }); - - app.MapGet("/", () => $"SHH Gateway {config.AppId} is running."); + app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", $"SHH Gateway #{config.AppId}")); + app.MapGet("/", () => $"SHH Gateway {config.AppId} is running (gRPC Mode)."); app.UseCors("AllowAll"); - - // 理由:正式映射控制器路由 app.MapControllers(); - // ============================================================= - // 5. 正式启动 - // ============================================================= + Console.WriteLine($"[System] 正在启动 Web 服务,监听端口: {config.BasePort}"); await app.RunAsync($"http://0.0.0.0:{config.BasePort}"); } /// - /// 对齐业务启动:激活单例并启动相机管理器 + /// 激活单例并启动相机管理器 /// static async Task StartBusinessLogic(WebApplication app) { var manager = app.Services.GetRequiredService(); - - // 激活哨兵逻辑 (理由:显式 Get 触发单例构造,否则不工作) _ = app.Services.GetRequiredService(); - - // 启动相机任务加载 await manager.StartAsync(); - Console.WriteLine("[System] 核心业务逻辑已激活。"); } @@ -147,14 +157,12 @@ public class Program options.AddPolicy("AllowAll", p => p.AllowAnyOrigin().AllowAnyHeader().AllowAnyMethod()); }); - // ★★★★★ 补全点:跨项目控制器加载 ★★★★★ - // 理由:Controller 定义在 SDK 项目中,必须通过 AddApplicationPart 显式挂载 builder.Services.AddControllers(options => { options.Filters.Add(); }) - .AddApplicationPart(typeof(CamerasController).Assembly) // 必备:加载相机控制接口 - .AddApplicationPart(typeof(MonitorController).Assembly); // 必备:加载监控接口 + .AddApplicationPart(typeof(CamerasController).Assembly) + .AddApplicationPart(typeof(MonitorController).Assembly); builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(c => diff --git a/SHH.CameraService/SHH.CameraService.csproj b/SHH.CameraService/SHH.CameraService.csproj index 1eda48a..22199f8 100644 --- a/SHH.CameraService/SHH.CameraService.csproj +++ b/SHH.CameraService/SHH.CameraService.csproj @@ -14,13 +14,17 @@ - + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + - diff --git a/SHH.Contracts.Grpc/Protos/gateway_service.proto b/SHH.Contracts.Grpc/Protos/gateway_service.proto new file mode 100644 index 0000000..54ebebd --- /dev/null +++ b/SHH.Contracts.Grpc/Protos/gateway_service.proto @@ -0,0 +1,94 @@ +syntax = "proto3"; + +// 自动生成代码时的命名空间 +option csharp_namespace = "SHH.Contracts.Grpc"; + +service GatewayProvider { + // 1. 身份注册 (CameraService -> AiVideo) + rpc RegisterInstance (RegisterRequest) returns (GenericResponse); + + // 2. 状态批量上报 (CameraService -> AiVideo) + rpc ReportStatusBatch (StatusBatchRequest) returns (GenericResponse); + + // 3. 视频流传输 (双向或客户端流) + rpc UploadVideoStream (stream VideoFrameRequest) returns (GenericResponse); + + // ★ 4. 指令推送通道 (Server Streaming) + // 客户端启动后调用此方法并保持连接,服务端通过此流下发 Sync_Camera 等指令 + rpc OpenCommandChannel (CommandStreamRequest) returns (stream CommandPayloadProto); +} + +// --- 1. 注册相关 --- +message RegisterRequest { + // 进程 ID (用于区分同一台机器上的多个实例) + int32 process_id = 1; + // 调用进程句柄 + int32 invoke_process_id = 2; + // 实例唯一标识符 (例如 "Stream_1") + string instance_id = 3; + // 软件版本号 + string version = 4; + // 软件所在的局域网 IP + string server_ip = 5; + // WebAPI 监听端口 + int32 webapi_port = 6; + // Grpc通讯端口 + int32 grpc_port = 7; + // 启动时间 + int64 start_time_ticks = 9; + // 描述信息 + string description = 10; +} + +// --- 2. 状态上报相关 --- +message StatusBatchRequest { + string protocol = 1; + int64 timestamp = 2; + repeated StatusEventItem items = 3; +} + +message StatusEventItem { + string camera_id = 1; + bool is_online = 2; + string reason = 3; + int64 timestamp = 4; +} + +// --- 3. 视频流相关 --- +message VideoFrameRequest { + string camera_id = 1; + int64 capture_timestamp = 2; + int64 dispatch_timestamp = 3; + int32 original_width = 4; + int32 original_height = 5; + int32 target_width = 6; + int32 target_height = 7; + repeated string subscriber_ids = 8; + map diagnostics = 9; + bool has_original_image = 10; + bool has_target_image = 11; + bytes original_image_bytes = 12; + bytes target_image_bytes = 13; +} + +// --- 4. 指令下发相关 (对应 C# CommandPayload) --- +message CommandStreamRequest { + string instance_id = 1; // 告知服务端我是哪个节点 +} + +message CommandPayloadProto { + string protocol = 1; // 协议类型,默认 "COMMAND" + string cmd_code = 2; // 指令代码,如 "Sync_Camera" + string target_id = 3; // 目标对象 ID + string json_params = 4; // 业务参数 JSON + string request_id = 5; // 请求追踪 ID + int64 timestamp_ticks = 6; // 发送时间戳 (Ticks) + bool require_ack = 7; // 是否需要回执 + int32 retry_count = 8; // 重试计数 + int64 expire_time = 9; // 过期时间戳 +} + +message GenericResponse { + bool success = 1; + string message = 2; +} \ No newline at end of file diff --git a/SHH.Contracts.Grpc/SHH.Contracts.Grpc.csproj b/SHH.Contracts.Grpc/SHH.Contracts.Grpc.csproj new file mode 100644 index 0000000..503e95e --- /dev/null +++ b/SHH.Contracts.Grpc/SHH.Contracts.Grpc.csproj @@ -0,0 +1,28 @@ + + + + Library + net8.0 + enable + enable + + + + + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + +