Files

137 lines
4.8 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using SHH.Contracts;
using SHH.Contracts.Grpc;
using System.Diagnostics;
namespace SHH.MjpegPlayer
{
/// <summary>
/// 消息总线中心 (纯 gRpc 架构)
/// 职责:解耦 gRpc 接收端与业务处理层,提供基于主题(Topic)的事件发布与统一的指令下发路由。
/// </summary>
public class MessageBus : IDisposable
{
#region
/// <summary>
/// 消息总线全局唯一实例
/// </summary>
public static MessageBus Instance { get; } = new MessageBus();
/// <summary>
/// 私有构造函数
/// </summary>
private MessageBus() { }
#endregion
#region (Topics)
/// <summary>
/// 1. 注册主题:当远程分析节点成功建立逻辑连接时触发。
/// 订阅者通常为 DeviceConfigHandler用于启动初始化配置同步。
/// </summary>
public event Action<RegisterPayload>? OnServerRegistered;
/// <summary>
/// 2. 状态主题:当收到远程节点批量上报的设备在线/离线状态时触发。
/// 订阅者通常为 DeviceStatusHandler用于更新 UI 状态。
/// </summary>
public event Action<List<StatusEventPayload>>? OnDeviceStatusReport;
#endregion
#region ( GatewayService )
/// <summary>
/// 发布节点注册事件:将 gRpc 接收到的原始注册请求推送到业务层
/// </summary>
/// <param name="p">注册载荷信息</param>
public void RaiseServerRegistered(RegisterPayload p)
{
if (p == null) return;
// 调试日志:跟踪节点上线流程
Debug.WriteLine($"[Bus] 发布注册事件: 节点ID = {p.InstanceId}");
// 执行所有已订阅该主题的业务逻辑
OnServerRegistered?.Invoke(p);
}
/// <summary>
/// 发布状态报告事件:将 gRpc 接收到的设备状态批量推送到业务层
/// </summary>
/// <param name="items">设备状态变更列表</param>
public void RaiseDeviceStatusReport(List<StatusEventPayload> items)
{
if (items == null || items.Count == 0) return;
// 执行所有已订阅状态同步的业务逻辑
OnDeviceStatusReport?.Invoke(items);
}
#endregion
#region ( Handler )
/// <summary>
/// 统一指令下发路由:自动定位目标节点的物理 gRpc 流并推送指令载荷
/// </summary>
/// <param name="instanceId">目标分析节点的唯一识别码</param>
/// <param name="payload">要发送的业务指令负载</param>
/// <returns>异步任务</returns>
public async Task SendInternalAsync(string instanceId, CommandPayload payload)
{
// 1. 获取由 GrpcSessionManager 维护的物理长连接流
var stream = GrpcSessionManager.Instance.GetSession(instanceId);
// 2. 健壮性检查:若连接不存在则终止下发
if (stream == null)
{
Debug.WriteLine($"[Bus Warning] 指令下发终止:节点 {instanceId} 尚未建立物理连接。");
return;
}
try
{
// 3. 契约转换:将业务层 CommandPayload 转换为 gRpc 生成的 Protobuf 契约对象
var protoMsg = new CommandPayloadProto
{
Protocol = payload.Protocol,
CmdCode = payload.CmdCode,
JsonParams = payload.JsonParams,
RequestId = payload.RequestId,
TimestampTicks = payload.Timestamp.Ticks
};
// 4. 执行异步推送
await stream.WriteAsync(protoMsg);
Debug.WriteLine($"[Bus] 指令推送成功 -> 目标: {instanceId}, 指令码: {payload.CmdCode}");
}
catch (Exception ex)
{
// 5. 异常处理:若推送失败,通常意味着网络链路已断开
Debug.WriteLine($"[Bus Error] 推送异常: {ex.Message},正在执行物理连接清理...");
// 立即移除失效会话,防止后续指令继续掉入“黑洞”
GrpcSessionManager.Instance.RemoveSession(instanceId);
}
}
#endregion
#region
/// <summary>
/// 释放总线资源
/// </summary>
public void Dispose()
{
// 清理所有事件订阅,防止内存泄漏
OnServerRegistered = null;
OnDeviceStatusReport = null;
}
#endregion
}
}