Files
Ayay/SHH.CameraService/CommandBusService.cs

346 lines
14 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Configuration;
using NetMQ;
using NetMQ.Sockets;
using Newtonsoft.Json;
using SHH.CameraSdk;
using SHH.Contracts;
namespace SHH.CameraService
{
/// <summary>
/// 双模指令总线服务 (Enterprise V2)
/// <para>核心职责:建立 TCP 指令通道,接收客户端指令并分发给 CameraManager</para>
/// <para>增强特性:</para>
/// <para>1. 支持双模:被动监听 (Bind) 与 主动投递 (Connect)</para>
/// <para>2. 幂等性控制:利用 MemoryCache 防止客户端重试导致的重复执行</para>
/// <para>3. 顺序一致性:利用时间戳防止指令乱序</para>
/// </summary>
public class CommandBusService : BackgroundService
{
#region --- 1. ---
private readonly CameraManager _cameraManager;
private readonly IConfiguration _config;
private readonly IMemoryCache _cache; // 核心:用于请求去重
private readonly int _processId;
// 运行状态标志
private volatile bool _isRunning = false;
// 两种模式的 Socket (互斥存在)
private ResponseSocket? _repSocket; // 模式A: 被动监听 (Server-Listening)
private DealerSocket? _dealerSocket; // 模式B: 主动投递 (Server-Dialing)
// 顺序一致性锁:记录每个设备最后处理的指令时间戳
// Key: TargetId (设备ID), Value: Timestamp (最后执行时间)
private readonly Dictionary<string, DateTime> _deviceLastCmdTime = new();
#endregion
#region --- 2. ---
/// <summary>
/// 构造函数 (注意:必须在 Program.cs 注册 AddMemoryCache)
/// </summary>
public CommandBusService(CameraManager manager, IConfiguration config, IMemoryCache cache)
{
_cameraManager = manager;
_config = config;
_cache = cache;
// 获取当前进程 ID (默认为 1)
_processId = _config.GetValue<int>("ProcessId", 1);
}
#endregion
#region --- 3. ---
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
// 在后台线程启动,避免阻塞 Web 主线程
return Task.Run(() =>
{
_isRunning = true;
// 1. 读取网络策略
// 优先读取配置中的主动目标,如果没有则回退到被动监听
string? activeTargetIp = _config["Network:ActiveTargets:0:Ip"];
bool isActiveMode = !string.IsNullOrEmpty(activeTargetIp);
try
{
if (isActiveMode)
{
// === 模式 B: 主动投递 (Server Connects Client) ===
// 场景:服务器在内网,主动连接公网/固定IP的客户端
int cmdPort = _config.GetValue<int>("Network:ActiveTargets:0:CmdPort", 7000);
string addr = $"tcp://{activeTargetIp}:{cmdPort}";
RunActiveMode(addr, stoppingToken);
}
else
{
// === 模式 A: 被动监听 (Server Binds Port) ===
// 场景服务器有固定IP等待客户端连接
int basePort = _config.GetValue<int>("Network:Passive:CmdPortBase", 7000);
int listenPort = basePort + (_processId - 1);
string addr = $"tcp://*:{listenPort}";
RunPassiveMode(addr, stoppingToken);
}
}
catch (Exception ex)
{
Console.WriteLine($"[CmdBus] 致命错误停止: {ex.Message}");
}
finally
{
_isRunning = false;
CleanupSockets();
}
}, stoppingToken);
}
private void CleanupSockets()
{
try
{
_repSocket?.Dispose();
_dealerSocket?.Dispose();
}
catch { /* 忽略销毁时的异常 */ }
}
#endregion
#region --- 4. (Passive) ---
private void RunPassiveMode(string address, CancellationToken token)
{
using (_repSocket = new ResponseSocket())
{
_repSocket.Bind(address);
Console.WriteLine($"[CmdBus] [被动模式] 指令监听已启动: {address}");
while (!token.IsCancellationRequested)
{
try
{
// 1. 阻塞等待请求 (超时1秒以便响应 Cancel 信号)
if (!_repSocket.TryReceiveFrameString(TimeSpan.FromSeconds(1), out string reqJson))
continue;
// 2. 处理业务 (带 去重 + ID回填 逻辑)
CommandResult result = this.ProcessRequest(reqJson);
// 3. 发送回执
// 注意REP 模式必须发送应答,即使 result 为 null (Fire-and-Forget) 也建议发一个空 ACK 防止 Socket 状态错乱
// 但为了协议统一,建议 Passive 模式下总是返回结果
string respJson = result != null ? JsonConvert.SerializeObject(result) : "{}";
_repSocket.SendFrame(respJson);
}
catch (Exception ex)
{
Console.WriteLine($"[CmdBus-Passive] 异常: {ex.Message}");
}
}
}
}
#endregion
#region --- 5. (Active) ---
private void RunActiveMode(string address, CancellationToken token)
{
// 外层循环:断线重连机制
while (!token.IsCancellationRequested)
{
try
{
using (_dealerSocket = new DealerSocket())
{
Console.WriteLine($"[CmdBus] [主动模式] 正在连接指令中心: {address}");
_dealerSocket.Connect(address);
// ★★★ 关键步骤:连接成功后,立即发送【身份注册包】 ★★★
// 客户端收到这个包后,才能在界面上显示"设备在线"
SendRegistration(_dealerSocket);
// 内层循环:消息收发
while (!token.IsCancellationRequested)
{
// 1. 接收指令
// DealerSocket 是异步全双工的,这里即使没收到消息也不会阻塞发送
if (!_dealerSocket.TryReceiveFrameString(TimeSpan.FromSeconds(1), out string reqJson))
{
// 空闲周期,可在此处添加心跳发送逻辑 (Ping)
continue;
}
// 2. 处理业务 (带 去重 + ID回填 逻辑)
CommandResult result = this.ProcessRequest(reqJson);
// 3. 发送结果 (QoS控制)
// 如果结果为 null说明指令是 Fire-and-Forget (无需回执),则不发送网络包节省带宽
if (result != null)
{
_dealerSocket.SendFrame(JsonConvert.SerializeObject(result));
}
}
}
}
catch (Exception ex)
{
Console.WriteLine($"[CmdBus-Active] 连接中断或异常: {ex.Message}");
// 避免死循环狂刷 CPU等待 3 秒再重连
Thread.Sleep(3000);
}
}
}
/// <summary>
/// 发送身份注册包 (Active 模式专用)
/// </summary>
private void SendRegistration(DealerSocket socket)
{
try
{
// 计算实际端口信息
int portOffset = _processId - 1;
var regInfo = new ServerRegistrationDto
{
ProcessId = _processId,
InstanceId = $"Gateway_{_processId}",
ServerIp = GetLocalIpAddress(),
WebApiPort = 5000 + portOffset,
VideoPort = 5555 + portOffset,
CmdPort = 7000 + portOffset,
StartTime = DateTime.Now,
Description = "Active Mode Connection (V2)"
};
// 封装信封 (系统级指令)
var payload = new CommandPayload
{
CmdCode = "SERVER_REGISTER",
TargetId = "SYSTEM",
JsonParams = JsonConvert.SerializeObject(regInfo),
RequestId = Guid.NewGuid().ToString("N"),
RequireAck = false // 注册包通常不需要回执,只要连上就行
};
socket.SendFrame(JsonConvert.SerializeObject(payload));
Console.WriteLine($"[CmdBus] 身份注册包已发送 -> {regInfo.ServerIp}:{regInfo.WebApiPort}");
}
catch (Exception ex)
{
Console.WriteLine($"[CmdBus] 注册包发送失败: {ex.Message}");
}
}
private string GetLocalIpAddress()
{
try
{
var host = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName());
foreach (var ip in host.AddressList)
{
if (ip.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork)
return ip.ToString();
}
}
catch { }
return "127.0.0.1";
}
#endregion
#region --- 6. ( V2 ) ---
/// <summary>
/// 统一处理请求协议:去重 -> 排序 -> 执行 -> 回填 ID
/// </summary>
private CommandResult ProcessRequest(string json)
{
if (string.IsNullOrWhiteSpace(json)) return CommandResult.Fail("Empty Request");
CommandPayload? payload;
try { payload = JsonConvert.DeserializeObject<CommandPayload>(json); }
catch { return CommandResult.Fail("Invalid JSON Protocol"); }
if (payload == null) return CommandResult.Fail("Null Payload");
// =========================================================
// A. 【幂等性检查】(Idempotency Check)
// =========================================================
// 查缓存:如果这个 RequestId 10秒内处理过直接返回上次的结果
// 这样即使客户端重试发了 10 次,业务逻辑也只跑 1 次
if (_cache.TryGetValue(payload.RequestId, out CommandResult cachedResult))
{
Console.WriteLine($"[Dedup] 拦截重复请求: {payload.RequestId} (Retry: {payload.RetryCount})");
return cachedResult;
}
// =========================================================
// B. 【顺序一致性检查】(Order Guarantee)
// =========================================================
// 防止乱序:比如先发的“停止”因为网络卡顿,比后发的“开始”晚到
if (payload.TargetId != "SYSTEM")
{
lock (_deviceLastCmdTime)
{
if (_deviceLastCmdTime.TryGetValue(payload.TargetId, out DateTime lastTime))
{
if (payload.Timestamp < lastTime)
{
Console.WriteLine($"[Order] 丢弃乱序指令: {payload.CmdCode}");
return CommandResult.Fail("Order Violation: Stale Command Dropped");
}
}
_deviceLastCmdTime[payload.TargetId] = payload.Timestamp;
}
}
// =========================================================
// C. 【业务执行】
// =========================================================
CommandResult result;
try
{
// 调用纯逻辑层
result = CommandBusProcessor.ProcessBusinessLogic(_cameraManager, payload);
}
catch (Exception ex)
{
result = CommandResult.Fail($"Internal Logic Error: {ex.Message}");
}
// =========================================================
// D. 【闭环回填】
// =========================================================
// 必须把身份证号贴回去,不然客户端不知道这是谁的回执
result.RequestId = payload.RequestId;
// =========================================================
// E. 【存入缓存】
// =========================================================
// 缓存 10 秒,覆盖客户端的重试窗口
_cache.Set(payload.RequestId, result, TimeSpan.FromSeconds(10));
// =========================================================
// F. 【QoS 过滤】
// =========================================================
// 如果客户端说不需要回信,返回 null
if (!payload.RequireAck)
{
return null;
}
return result;
}
#endregion
}
}