137 lines
4.8 KiB
C#
137 lines
4.8 KiB
C#
|
|
using SHH.Contracts;
|
|||
|
|
using SHH.Contracts.Grpc;
|
|||
|
|
using System.Diagnostics;
|
|||
|
|
|
|||
|
|
namespace SHH.MjpegPlayer
|
|||
|
|
{
|
|||
|
|
/// <summary>
|
|||
|
|
/// 消息总线中心 (纯 gRpc 架构)
|
|||
|
|
/// 职责:解耦 gRpc 接收端与业务处理层,提供基于主题(Topic)的事件发布与统一的指令下发路由。
|
|||
|
|
/// </summary>
|
|||
|
|
public class MessageBus : IDisposable
|
|||
|
|
{
|
|||
|
|
#region 单例模式
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 消息总线全局唯一实例
|
|||
|
|
/// </summary>
|
|||
|
|
public static MessageBus Instance { get; } = new MessageBus();
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 私有构造函数
|
|||
|
|
/// </summary>
|
|||
|
|
private MessageBus() { }
|
|||
|
|
|
|||
|
|
#endregion
|
|||
|
|
|
|||
|
|
#region 业务事件订阅主题 (Topics)
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 1. 注册主题:当远程分析节点成功建立逻辑连接时触发。
|
|||
|
|
/// 订阅者通常为 DeviceConfigHandler,用于启动初始化配置同步。
|
|||
|
|
/// </summary>
|
|||
|
|
public event Action<RegisterPayload>? OnServerRegistered;
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 2. 状态主题:当收到远程节点批量上报的设备在线/离线状态时触发。
|
|||
|
|
/// 订阅者通常为 DeviceStatusHandler,用于更新 UI 状态。
|
|||
|
|
/// </summary>
|
|||
|
|
public event Action<List<StatusEventPayload>>? OnDeviceStatusReport;
|
|||
|
|
|
|||
|
|
#endregion
|
|||
|
|
|
|||
|
|
#region 事件发布接口 (供 GatewayService 接收端调用)
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 发布节点注册事件:将 gRpc 接收到的原始注册请求推送到业务层
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="p">注册载荷信息</param>
|
|||
|
|
public void RaiseServerRegistered(RegisterPayload p)
|
|||
|
|
{
|
|||
|
|
if (p == null) return;
|
|||
|
|
|
|||
|
|
// 调试日志:跟踪节点上线流程
|
|||
|
|
Debug.WriteLine($"[Bus] 发布注册事件: 节点ID = {p.InstanceId}");
|
|||
|
|
|
|||
|
|
// 执行所有已订阅该主题的业务逻辑
|
|||
|
|
OnServerRegistered?.Invoke(p);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 发布状态报告事件:将 gRpc 接收到的设备状态批量推送到业务层
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="items">设备状态变更列表</param>
|
|||
|
|
public void RaiseDeviceStatusReport(List<StatusEventPayload> items)
|
|||
|
|
{
|
|||
|
|
if (items == null || items.Count == 0) return;
|
|||
|
|
|
|||
|
|
// 执行所有已订阅状态同步的业务逻辑
|
|||
|
|
OnDeviceStatusReport?.Invoke(items);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
#endregion
|
|||
|
|
|
|||
|
|
#region 指令下发接口 (供各 Handler 业务层调用)
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 统一指令下发路由:自动定位目标节点的物理 gRpc 流并推送指令载荷
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="instanceId">目标分析节点的唯一识别码</param>
|
|||
|
|
/// <param name="payload">要发送的业务指令负载</param>
|
|||
|
|
/// <returns>异步任务</returns>
|
|||
|
|
public async Task SendInternalAsync(string instanceId, CommandPayload payload)
|
|||
|
|
{
|
|||
|
|
// 1. 获取由 GrpcSessionManager 维护的物理长连接流
|
|||
|
|
var stream = GrpcSessionManager.Instance.GetSession(instanceId);
|
|||
|
|
|
|||
|
|
// 2. 健壮性检查:若连接不存在则终止下发
|
|||
|
|
if (stream == null)
|
|||
|
|
{
|
|||
|
|
Debug.WriteLine($"[Bus Warning] 指令下发终止:节点 {instanceId} 尚未建立物理连接。");
|
|||
|
|
return;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
// 3. 契约转换:将业务层 CommandPayload 转换为 gRpc 生成的 Protobuf 契约对象
|
|||
|
|
var protoMsg = new CommandPayloadProto
|
|||
|
|
{
|
|||
|
|
Protocol = payload.Protocol,
|
|||
|
|
CmdCode = payload.CmdCode,
|
|||
|
|
JsonParams = payload.JsonParams,
|
|||
|
|
RequestId = payload.RequestId,
|
|||
|
|
TimestampTicks = payload.Timestamp.Ticks
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
// 4. 执行异步推送
|
|||
|
|
await stream.WriteAsync(protoMsg);
|
|||
|
|
|
|||
|
|
Debug.WriteLine($"[Bus] 指令推送成功 -> 目标: {instanceId}, 指令码: {payload.CmdCode}");
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
// 5. 异常处理:若推送失败,通常意味着网络链路已断开
|
|||
|
|
Debug.WriteLine($"[Bus Error] 推送异常: {ex.Message},正在执行物理连接清理...");
|
|||
|
|
|
|||
|
|
// 立即移除失效会话,防止后续指令继续掉入“黑洞”
|
|||
|
|
GrpcSessionManager.Instance.RemoveSession(instanceId);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
#endregion
|
|||
|
|
|
|||
|
|
#region 资源释放
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 释放总线资源
|
|||
|
|
/// </summary>
|
|||
|
|
public void Dispose()
|
|||
|
|
{
|
|||
|
|
// 清理所有事件订阅,防止内存泄漏
|
|||
|
|
OnServerRegistered = null;
|
|||
|
|
OnDeviceStatusReport = null;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
#endregion
|
|||
|
|
}
|
|||
|
|
}
|