Files
Ayay/SHH.CameraDashboard/Services/CommandBusClient.cs

185 lines
7.4 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using MessagePack;
using NetMQ;
using NetMQ.Sockets;
using SHH.CameraDashboard.Services.Processors;
using SHH.Contracts;
using System.Collections.Concurrent;
using System.Diagnostics;
namespace SHH.CameraDashboard
{
/// <summary>
/// Router-side message bus: binds a NetMQ <see cref="RouterSocket"/>, dispatches inbound
/// frames to protocol processors, and sends commands to peers whose routing identity
/// has been recorded via <see cref="UpdateSession"/>.
/// </summary>
/// <remarks>
/// NetMQ sockets are not thread-safe. Every socket operation in this class is therefore
/// performed from a poller callback: receives in <see cref="OnReceiveReady"/>, sends in
/// <see cref="FlushOutbound"/>, which drains a <see cref="NetMQQueue{T}"/> that callers
/// fill from any thread.
/// </remarks>
public class CommandBusClient : IDisposable
{
    private RouterSocket? _routerSocket;
    private NetMQPoller? _poller;

    // Multi-producer / single-consumer hand-off of outbound messages to the poller thread.
    private volatile NetMQQueue<NetMQMessage>? _sendQueue;

    private volatile bool _isRunning;

    // Serializes Start/Stop so the bus cannot be started or torn down twice concurrently.
    private readonly object _disposeLock = new object();

    /// <summary>Process-wide shared instance.</summary>
    public static CommandBusClient Instance { get; } = new CommandBusClient();

    // Protocol name -> processor. Concurrent because RegisterProcessor is public and may be
    // called while the poller thread is looking processors up.
    private readonly ConcurrentDictionary<string, IProtocolProcessor> _processors = new();

    // Request id -> completion source, resolved by HandleResponse.
    private readonly ConcurrentDictionary<string, TaskCompletionSource<CommandResult>> _pendingRequests = new();

    // Instance id -> ROUTER identity frame of the peer, recorded by UpdateSession.
    private readonly ConcurrentDictionary<string, byte[]> _sessions = new();

    /// <summary>Interceptor pipeline applied to every inbound and outbound message.</summary>
    public InterceptorPipeline Pipeline { get; } = new InterceptorPipeline();

    /// <summary>Raised by <see cref="RaiseServerRegistered"/>.</summary>
    public event Action<RegisterPayload>? OnServerRegistered;

    /// <summary>Raised by <see cref="RaiseDeviceStatusReport"/>.</summary>
    public event Action<List<StatusEventPayload>>? OnDeviceStatusReport;

    /// <summary>Raised by <see cref="RaiseCommandReceived"/>.</summary>
    public event Action<CommandPayload>? OnCommandReceived;

    /// <summary>
    /// Registers <paramref name="processor"/> for its <c>ProtocolType</c>, replacing any
    /// processor previously registered for the same protocol.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="processor"/> is null.</exception>
    public void RegisterProcessor(IProtocolProcessor processor)
    {
        ArgumentNullException.ThrowIfNull(processor);
        _processors[processor.ProtocolType] = processor;
    }

    /// <summary>
    /// Binds the router socket to <c>tcp://*:{port}</c>, registers the built-in processors
    /// and starts polling. Does nothing if the bus is already running.
    /// </summary>
    /// <param name="port">TCP port to listen on.</param>
    public void Start(int port)
    {
        lock (_disposeLock)
        {
            // Checked inside the lock so two concurrent callers cannot both bind.
            if (_isRunning) return;

            // Built-in processors.
            RegisterProcessor(new RegisterProcessor(this));
            RegisterProcessor(new StatusBatchProcessor(this));
            RegisterProcessor(new CommandResultProcessor(this));
            RegisterProcessor(new CommandProcessor(this));

            var socket = new RouterSocket();
            var queue = new NetMQQueue<NetMQMessage>();
            NetMQPoller? poller = null;
            try
            {
                socket.Bind($"tcp://*:{port}");
                socket.ReceiveReady += OnReceiveReady;

                // The queue's ReceiveReady fires on the poller thread, which is the only
                // thread allowed to touch the socket.
                queue.ReceiveReady += (_, args) => FlushOutbound(socket, args.Queue);

                poller = new NetMQPoller { socket, queue };
                poller.RunAsync();

                _routerSocket = socket;
                _poller = poller;
                _sendQueue = queue;
                _isRunning = true;
            }
            catch
            {
                // Bind (e.g. port already in use) or poller start-up failed:
                // release everything created so far, then let the caller see the error.
                poller?.Dispose();
                queue.Dispose();
                socket.Dispose();
                throw;
            }
        }
    }

    // NetMQ event handlers are synchronous (void). async void is used here, as a top-level
    // event handler only, so that the asynchronous interceptor pipeline can be awaited.
    private async void OnReceiveReady(object? sender, NetMQSocketEventArgs e)
    {
        try
        {
            NetMQMessage msg = new NetMQMessage();

            // 1. Try to receive a complete multipart message.
            if (!e.Socket.TryReceiveMultipartMessage(ref msg)) return;

            // 2. Frame check. A ROUTER receiving from a DEALER is expected to see
            //    [Identity] [Protocol] [Data], i.e. at least three frames.
            if (msg.FrameCount < 3) return;

            byte[] identity = msg[0].ToByteArray();     // Frame 0: routing identity
            string protocol = msg[1].ConvertToString(); // Frame 1: protocol tag
            byte[] rawData = msg[2].ToByteArray();      // Frame 2: raw payload

            // Inbound interception: a null context means an interceptor swallowed the message.
            var ctx = await Pipeline.ExecuteReceiveAsync(protocol, rawData);
            if (ctx == null) return;

            // Dispatch using the (possibly rewritten) protocol and data.
            if (_processors.TryGetValue(ctx.Protocol, out var processor))
            {
                processor.Process(identity, ctx.Data);
            }
            else
            {
                Debug.WriteLine($"[Bus] 未知协议: {ctx.Protocol}");
            }
        }
        catch (Exception ex)
        {
            // An exception escaping an async void handler would be unobservable by callers,
            // so it is logged here instead.
            Debug.WriteLine($"[Bus-Err] {ex.Message}");
        }
    }

    // Runs on the poller thread: sends every queued outbound message through the socket.
    private static void FlushOutbound(RouterSocket socket, NetMQQueue<NetMQMessage> queue)
    {
        try
        {
            while (queue.TryDequeue(out var outbound, TimeSpan.Zero))
            {
                if (outbound is null) continue;
                socket.SendMultipartMessage(outbound);
            }
        }
        catch (Exception ex)
        {
            Debug.WriteLine($"[Bus-Err] {ex.Message}");
        }
    }

    // --- Internal entry points used by the processors ---

    /// <summary>Records the routing identity to use when sending to <paramref name="instanceId"/>.</summary>
    internal void UpdateSession(string instanceId, byte[] identity) => _sessions[instanceId] = identity;

    internal void RaiseServerRegistered(RegisterPayload p) => OnServerRegistered?.Invoke(p);

    internal void RaiseDeviceStatusReport(List<StatusEventPayload> i) => OnDeviceStatusReport?.Invoke(i);

    internal void RaiseCommandReceived(CommandPayload payload) => OnCommandReceived?.Invoke(payload);

    /// <summary>
    /// Completes the pending request whose id matches <paramref name="result"/>, if any.
    /// Results that are null, carry no request id, or match no pending request are ignored.
    /// </summary>
    internal void HandleResponse(CommandResult result)
    {
        if (result?.RequestId is { } requestId
            && _pendingRequests.TryRemove(requestId, out var tcs))
        {
            tcs.TrySetResult(result);
        }
    }

    /// <summary>
    /// Serializes <paramref name="payload"/>, runs it through the outbound interceptor
    /// pipeline and queues it for delivery to the peer registered as
    /// <paramref name="instanceId"/>.
    /// </summary>
    /// <remarks>
    /// The message is dropped when the instance has no recorded session, when an
    /// interceptor swallows it, or when the bus is not running. The returned task
    /// completes once the message has been queued; the actual socket send happens
    /// on the poller thread.
    /// </remarks>
    public async Task SendInternalAsync(string instanceId, CommandPayload payload)
    {
        if (!_sessions.TryGetValue(instanceId, out byte[]? identity)) return;

        // 1. Serialize.
        byte[] rawData = MessagePackSerializer.Serialize(payload);

        // 2. Outbound interception: a null context means the message was swallowed.
        var ctx = await Pipeline.ExecuteSendAsync(payload.Protocol, rawData);
        if (ctx == null) return;

        // 3. Capture the queue once so a concurrent Stop() cannot null it between
        //    the check and the use.
        var queue = _sendQueue;
        if (queue == null) return;

        var msg = new NetMQMessage();
        msg.Append(identity);
        msg.Append(ctx.Protocol); // protocol as rewritten by the interceptors
        msg.Append(ctx.Data);     // data as rewritten by the interceptors

        try
        {
            // Never send on the socket from this thread; hand the message to the poller.
            queue.Enqueue(msg);
        }
        catch (Exception ex) when (ex is ObjectDisposedException or NetMQException)
        {
            // The bus was stopped while this send was in flight; the message is dropped.
            Debug.WriteLine($"[Bus-Err] {ex.Message}");
        }
    }

    /// <summary>
    /// Stops polling and releases the socket, the send queue and NetMQ's global state.
    /// Does nothing if the bus is not running.
    /// </summary>
    public void Stop()
    {
        // Locked so that concurrent or repeated calls cannot release resources twice.
        lock (_disposeLock)
        {
            if (!_isRunning) return;
            _isRunning = false;

            Console.WriteLine("[Bus] 正在释放 NetMQ 资源...");

            // Unpublish the queue first so new sends are rejected up front.
            var queue = _sendQueue;
            _sendQueue = null;

            // 1. Stop the poller; after this no callback touches the socket or the queue.
            if (_poller != null)
            {
                _poller.Stop();
                _poller.Dispose();
                _poller = null;
            }

            // 2. Release the send queue.
            queue?.Dispose();

            // 3. Release the socket.
            if (_routerSocket != null)
            {
                _routerSocket.ReceiveReady -= OnReceiveReady;
                _routerSocket.Close();
                _routerSocket.Dispose();
                _routerSocket = null;
            }

            // 4. Tear down NetMQ's static environment so nothing lingers after shutdown.
            //    Passing true would wait for background I/O to finish (and may hang);
            //    false abandons unfinished I/O and closes immediately.
            NetMQConfig.Cleanup(false);

            Console.WriteLine("[Bus] NetMQ 资源已安全释放。");
        }
    }

    /// <summary>Equivalent to <see cref="Stop"/>.</summary>
    public void Dispose() => Stop();
}
}