// Source listing metadata (from the repository web viewer): C#, 185 lines, 7.4 KiB.

using MessagePack;
using NetMQ;
using NetMQ.Sockets;
using SHH.CameraDashboard.Services.Processors;
using SHH.Contracts;
using System.Collections.Concurrent;
using System.Diagnostics;
namespace SHH.CameraDashboard
{
/// <summary>
/// Server-side command bus built on a NetMQ <see cref="RouterSocket"/>.
/// Incoming multipart messages ([identity] [protocol] [data]) are passed through the
/// inbound interceptor pipeline and dispatched to the <see cref="IProtocolProcessor"/>
/// registered for the protocol; outbound commands are passed through the outbound
/// pipeline and sent to the peer whose identity was recorded via <see cref="UpdateSession"/>.
/// </summary>
/// <remarks>
/// NetMQ sockets are not thread-safe. All socket I/O (receive and send) is therefore
/// performed on the poller thread; senders on other threads hand their messages over
/// through a <see cref="NetMQQueue{T}"/> that is registered with the same poller.
/// </remarks>
public class CommandBusClient : IDisposable
{
    private RouterSocket? _routerSocket;
    private NetMQPoller? _poller;

    // Outbound hand-off queue; drained on the poller thread by OnSendQueueReady.
    private volatile NetMQQueue<NetMQMessage>? _sendQueue;

    private volatile bool _isRunning;
    private readonly object _disposeLock = new object();

    /// <summary>Process-wide singleton instance.</summary>
    public static CommandBusClient Instance { get; } = new CommandBusClient();

    // Protocol name -> processor. Concurrent because RegisterProcessor is public and may be
    // called from any thread while the poller thread is looking processors up.
    private readonly ConcurrentDictionary<string, IProtocolProcessor> _processors = new();

    private readonly ConcurrentDictionary<string, TaskCompletionSource<CommandResult>> _pendingRequests
        = new ConcurrentDictionary<string, TaskCompletionSource<CommandResult>>();

    // Instance id -> router identity frame of the connected peer.
    private readonly ConcurrentDictionary<string, byte[]> _sessions
        = new ConcurrentDictionary<string, byte[]>();

    /// <summary>Interceptor pipeline applied to both inbound and outbound messages.</summary>
    public InterceptorPipeline Pipeline { get; } = new InterceptorPipeline();

    public event Action<RegisterPayload>? OnServerRegistered;
    public event Action<List<StatusEventPayload>>? OnDeviceStatusReport;
    public event Action<CommandPayload>? OnCommandReceived;

    /// <summary>
    /// Registers (or replaces) the processor responsible for <c>processor.ProtocolType</c>.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="processor"/> is null.</exception>
    public void RegisterProcessor(IProtocolProcessor processor)
    {
        if (processor is null)
        {
            throw new ArgumentNullException(nameof(processor));
        }

        _processors[processor.ProtocolType] = processor;
    }

    /// <summary>
    /// Binds the router socket to <c>tcp://*:{port}</c>, registers the built-in processors
    /// and starts the poller on a background thread. Does nothing if already running.
    /// </summary>
    /// <param name="port">TCP port to listen on.</param>
    public void Start(int port)
    {
        lock (_disposeLock)
        {
            // Checked inside the lock so that two concurrent Start calls cannot both bind.
            if (_isRunning)
            {
                return;
            }

            var socket = new RouterSocket();
            NetMQQueue<NetMQMessage>? queue = null;
            NetMQPoller? poller = null;

            try
            {
                socket.Bind($"tcp://*:{port}");
                socket.ReceiveReady += OnReceiveReady;

                queue = new NetMQQueue<NetMQMessage>();
                queue.ReceiveReady += OnSendQueueReady;

                // --- Built-in processors ---
                RegisterProcessor(new RegisterProcessor(this));
                RegisterProcessor(new StatusBatchProcessor(this));
                RegisterProcessor(new CommandResultProcessor(this));
                RegisterProcessor(new CommandProcessor(this));

                poller = new NetMQPoller { socket, queue };

                // Publish the fields before the poller thread starts so its handlers
                // never observe a half-initialised bus.
                _routerSocket = socket;
                _sendQueue = queue;
                _poller = poller;

                poller.RunAsync();
                _isRunning = true;
            }
            catch
            {
                // Start failed (e.g. port already in use): release everything we created
                // so nothing is leaked and a later Start can retry.
                _poller = null;
                _sendQueue = null;
                _routerSocket = null;

                poller?.Dispose();
                queue?.Dispose();
                socket.Dispose();
                throw;
            }
        }
    }

    // NetMQ event handlers are synchronous (void). To await the asynchronous interceptor
    // pipeline we use async void here, which is acceptable only for top-level event handlers;
    // every exception is caught inside so none can escape.
    private async void OnReceiveReady(object? sender, NetMQSocketEventArgs e)
    {
        try
        {
            NetMQMessage msg = new NetMQMessage();

            // 1. Try to receive a complete multipart message.
            if (!e.Socket.TryReceiveMultipartMessage(ref msg))
            {
                return;
            }

            // 2. Frame validation. A Router receiving from a Dealer sees
            //    [Identity] [Protocol] [Data], so at least 3 frames are expected.
            if (msg.FrameCount < 3)
            {
                return;
            }

            // ToByteArray() yields exactly the frame's bytes (Buffer exposes the backing array).
            byte[] identity = msg[0].ToByteArray();      // Frame 0: routing identity
            string protocol = msg[1].ConvertToString();  // Frame 1: protocol tag
            byte[] rawData = msg[2].ToByteArray();       // Frame 2: raw payload

            // Inbound interception: a null context means an interceptor swallowed the message.
            var ctx = await Pipeline.ExecuteReceiveAsync(protocol, rawData);
            if (ctx is null)
            {
                return;
            }

            // Dispatch using the (possibly rewritten) protocol and data.
            if (_processors.TryGetValue(ctx.Protocol, out var processor))
            {
                processor.Process(identity, ctx.Data);
            }
            else
            {
                Debug.WriteLine($"[Bus] 未知协议: {ctx.Protocol}");
            }
        }
        catch (Exception ex)
        {
            Debug.WriteLine($"[Bus-Err] {ex.Message}");
        }
    }

    // Runs on the poller thread whenever outbound messages have been enqueued.
    // This is the only place that writes to the router socket.
    private void OnSendQueueReady(object? sender, NetMQQueueEventArgs<NetMQMessage> e)
    {
        while (e.Queue.TryDequeue(out var outbound, TimeSpan.Zero))
        {
            var socket = _routerSocket;
            if (socket is null || outbound is null)
            {
                continue;
            }

            try
            {
                socket.SendMultipartMessage(outbound);
            }
            catch (Exception ex)
            {
                // Never let a failed send take down the poller thread.
                Debug.WriteLine($"[Bus-Err] send failed: {ex.Message}");
            }
        }
    }

    // --- Internal methods called by the processors ---

    internal void UpdateSession(string instanceId, byte[] identity) => _sessions[instanceId] = identity;

    internal void RaiseServerRegistered(RegisterPayload p) => OnServerRegistered?.Invoke(p);

    internal void RaiseDeviceStatusReport(List<StatusEventPayload> i) => OnDeviceStatusReport?.Invoke(i);

    internal void RaiseCommandReceived(CommandPayload payload) => OnCommandReceived?.Invoke(payload);

    /// <summary>
    /// Completes the pending request that matches <c>result.RequestId</c>, if any.
    /// Results without a request id, or with an unknown one, are ignored.
    /// </summary>
    internal void HandleResponse(CommandResult result)
    {
        // A null key would make ConcurrentDictionary throw; treat it as "no matching request".
        if (result.RequestId is not { } requestId)
        {
            return;
        }

        if (_pendingRequests.TryRemove(requestId, out var tcs))
        {
            tcs.TrySetResult(result);
        }
    }

    /// <summary>
    /// Serializes <paramref name="payload"/>, runs it through the outbound interceptor
    /// pipeline and queues it for delivery to the peer registered as
    /// <paramref name="instanceId"/>.
    /// </summary>
    /// <remarks>
    /// The message is silently dropped when the instance has no known session, when an
    /// interceptor blocks it, or when the bus is not running. The returned task completes
    /// once the message has been handed to the poller thread; the actual socket write
    /// happens there, and send failures are logged rather than thrown to the caller.
    /// </remarks>
    public async Task SendInternalAsync(string instanceId, CommandPayload payload)
    {
        if (!_sessions.TryGetValue(instanceId, out byte[]? identity))
        {
            return;
        }

        // 1. Serialize.
        byte[] rawData = MessagePackSerializer.Serialize(payload);

        // 2. Outbound interception: null means the message was blocked.
        var ctx = await Pipeline.ExecuteSendAsync(payload.Protocol, rawData);
        if (ctx is null)
        {
            return;
        }

        // 3. Snapshot the queue once so a concurrent Stop() cannot null it between the
        //    check and the use.
        var queue = _sendQueue;
        if (queue is null)
        {
            return;
        }

        var msg = new NetMQMessage();
        msg.Append(identity);
        msg.Append(ctx.Protocol); // protocol after interceptor processing
        msg.Append(ctx.Data);     // data after interceptor processing

        try
        {
            // NetMQ sockets are not thread-safe, so the message is handed to the poller
            // thread instead of being written to the router socket from this thread.
            queue.Enqueue(msg);
        }
        catch (ObjectDisposedException)
        {
            // Stop() disposed the queue after we took the snapshot: the bus is shutting down.
            Debug.WriteLine("[Bus] send dropped: bus is stopping.");
        }
    }

    /// <summary>
    /// Stops the poller, releases the sockets and tears down the NetMQ static environment.
    /// Safe to call repeatedly; does nothing when the bus is not running.
    /// </summary>
    public void Stop()
    {
        // Locked to prevent a double release.
        lock (_disposeLock)
        {
            if (!_isRunning)
            {
                return;
            }

            _isRunning = false;
            Console.WriteLine("[Bus] 正在释放 NetMQ 资源...");

            // 1. Stop the poller first so no handler is running while sockets are disposed.
            if (_poller != null)
            {
                _poller.Stop();
                _poller.Dispose();
                _poller = null;
            }

            // 2. Release the outbound queue.
            var queue = _sendQueue;
            if (queue != null)
            {
                _sendQueue = null;
                queue.ReceiveReady -= OnSendQueueReady;
                queue.Dispose();
            }

            // 3. Release the router socket.
            if (_routerSocket != null)
            {
                _routerSocket.ReceiveReady -= OnReceiveReady;
                _routerSocket.Close();
                _routerSocket.Dispose();
                _routerSocket = null;
            }

            // 4. Force cleanup of NetMQ's static environment so no background resources linger.
            //    Passing true waits for the background I/O threads to finish (may hang);
            //    passing false abandons unfinished I/O and shuts down immediately.
            NetMQConfig.Cleanup(false);
            Console.WriteLine("[Bus] NetMQ 资源已安全释放。");
        }
    }

    /// <summary>Releases all resources by stopping the bus.</summary>
    public void Dispose()
    {
        Stop();
        GC.SuppressFinalize(this);
    }
}
}