Files
Ayay/SHH.CameraSdk/Core/Pipeline/GlobalStreamDispatcher.cs
twice109 e9f5975a79 1、解决使用 Cv2.ImShow 播放画面,闪烁一下窗口不见的问题
2、之前注册播放、分析帧,回调时必须判定是否当前注册源,现增加新方法可以不用判定
3、将之前程序一运行就播放,调整为手动指定 IsRunning 值来控制
2025-12-26 06:14:55 +08:00

180 lines
7.4 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
namespace SHH.CameraSdk;
/// <summary>
/// 全局流分发器(静态类 | 线程安全)
/// 核心职责:
/// <para>1. 接收处理完成的帧任务,基于 AppId 路由策略实现帧的精准定向分发</para>
/// <para>2. 隔离 UI 预览、AI 分析等不同消费场景,支撑多模块并行消费</para>
/// 设计特性:
/// <para>✅ 线程安全:基于 ConcurrentDictionary 实现并发订阅/取消订阅</para>
/// <para>✅ 精准路由:按 TargetAppIds 点对点投递,避免广播风暴</para>
/// <para>✅ 异常隔离:单个订阅者异常不影响其他模块消费</para>
/// </summary>
public static class GlobalStreamDispatcher
{
#region --- 1. (Predefined Subscription Channels) ---
/// <summary>
/// UI 预览订阅通道:供 UI 模块订阅帧数据,用于实时画面显示
/// 回调参数:(设备唯一标识, 处理后的智能帧数据)
/// 特性:低延迟优先,支持画面渲染相关的轻量级处理
/// </summary>
public static event Action<long, SmartFrame>? OnPreviewFrame;
/// <summary>
/// AI 分析订阅通道:供 AI 模块订阅帧数据,用于行为识别/人脸检测/车牌识别等
/// 回调参数:(设备唯一标识, 处理后的智能帧数据)
/// 特性:高吞吐优先,支持复杂算法处理,延迟容忍度较高
/// </summary>
public static event Action<long, SmartFrame>? OnAnalysisFrame;
#endregion
#region --- 2. (Dynamic Routing Table) ---
/// <summary>
/// 动态订阅路由表Key = 业务 AppIdValue = 帧处理多播委托
/// 实现ConcurrentDictionary 保证高并发场景下的读写安全
/// 用途:支持自定义业务模块的精准订阅,扩展帧消费能力
/// </summary>
private static readonly ConcurrentDictionary<string, Action<long, SmartFrame>> _routingTable = new();
#endregion
#region --- 3. (Subscription Management API) ---
/// <summary>
/// 精准订阅:为指定 AppId 注册帧处理回调
/// 线程安全:支持多线程并发调用,委托自动合并(多播)
/// </summary>
/// <param name="appId">业务唯一标识(需与 FrameController.Register 中的 AppId 一致)</param>
/// <param name="handler">帧处理回调函数</param>
/// <exception cref="ArgumentNullException">appId 或 handler 为空时抛出</exception>
public static void Subscribe(string appId, Action<long, SmartFrame> handler)
{
// 入参合法性校验
if (string.IsNullOrWhiteSpace(appId))
throw new ArgumentNullException(nameof(appId), "AppId 不能为空");
if (handler == null)
throw new ArgumentNullException(nameof(handler), "帧处理回调不能为空");
// 线程安全添加/更新委托:新订阅追加,重复订阅合并
_routingTable.AddOrUpdate(
key: appId,
addValue: handler,
updateValueFactory: (_, existingHandler) => existingHandler + handler
);
}
/// <summary>
/// [新增] 精准订阅:仅监听指定设备的特定 AppId 帧
/// 优势:内部自动过滤 DeviceId回调函数无需再写 if 判断
/// </summary>
/// <param name="appId">需求标识</param>
/// <param name="specificDeviceId">只接收此设备的帧</param>
/// <param name="handler">处理回调(注意:此处签名不含 deviceId因为已隐式确定</param>
public static void Subscribe(string appId, long specificDeviceId, Action<SmartFrame> handler)
{
// 创建一个“过滤器”闭包
Action<long, SmartFrame> wrapper = (id, frame) =>
{
// 只有当来源 ID 与订阅 ID 一致时,才触发用户的业务回调
if (id == specificDeviceId)
{
handler(frame);
}
};
// 将过滤器注册到基础路由表中
Subscribe(appId, wrapper);
}
/// <summary>
/// 取消订阅:移除指定 AppId 的帧处理回调
/// 线程安全:支持多线程并发调用,无订阅时静默处理
/// </summary>
/// <param name="appId">业务唯一标识</param>
/// <param name="handler">需要移除的帧处理回调</param>
public static void Unsubscribe(string appId, Action<long, SmartFrame> handler)
{
if (string.IsNullOrWhiteSpace(appId) || handler == null)
return;
// 尝试获取当前委托并移除目标回调
if (_routingTable.TryGetValue(appId, out var currentHandler))
{
var updatedHandler = currentHandler - handler;
if (updatedHandler == null)
{
// 委托为空时移除路由项,避免内存泄漏
_routingTable.TryRemove(appId, out _);
}
else
{
// 委托非空时更新路由表
_routingTable.TryUpdate(appId, updatedHandler, currentHandler);
}
}
}
#endregion
#region --- 4. (Core Dispatch Logic) ---
/// <summary>
/// 帧任务分发入口:基于任务的 TargetAppIds 实现精准点对点投递
/// 核心优化:摒弃广播模式,仅投递到指定订阅者,降低系统资源消耗
/// </summary>
/// <param name="task">处理完成的帧任务(包含目标 AppId 列表、帧数据、上下文)</param>
/// <exception cref="ArgumentNullException">task 为空时抛出</exception>
public static void Dispatch(ProcessingTask task)
{
// 入参合法性校验
if (task == null)
throw new ArgumentNullException(nameof(task), "帧任务不能为空");
var deviceId = task.DeviceId;
var frame = task.Frame;
var targetAppIds = task.Decision.TargetAppIds;
var sequence = task.Decision.Sequence;
// 记录分发日志
task.Context.AddLog($"开始分发帧任务 [Seq:{sequence}],目标 AppId 列表:[{string.Join(", ", targetAppIds)}]");
// 遍历目标 AppId 列表,执行精准投递
foreach (var appId in targetAppIds)
{
// 1. 优先匹配动态路由表中的自定义订阅者
if (_routingTable.TryGetValue(appId, out var customHandler))
{
try
{
customHandler.Invoke(deviceId, frame);
task.Context.AddLog($"帧任务 [Seq:{sequence}] 成功投递到自定义 AppId: {appId}");
}
catch (Exception ex)
{
// 单个订阅者异常隔离,不影响其他分发流程
task.Context.AddLog($"帧任务 [Seq:{sequence}] 投递到 AppId:{appId} 失败:{ex.Message}");
Console.WriteLine($"[DispatchError] AppId={appId}, DeviceId={deviceId}, Error={ex.Message}");
}
}
// 2. 匹配预设的全局通道(兼容旧版订阅逻辑)
switch (appId.ToUpperInvariant())
{
case "UI_PREVIEW":
OnPreviewFrame?.Invoke(deviceId, frame);
break;
case "AI_ANALYSIS":
OnAnalysisFrame?.Invoke(deviceId, frame);
break;
}
}
// 分发完成后记录遥测数据
GlobalTelemetry.RecordLog(sequence, task.Context);
}
#endregion
}