148 lines
5.6 KiB
C#
148 lines
5.6 KiB
C#
namespace SHH.CameraSdk;
|
||
|
||
/// <summary>
|
||
/// 帧处理管道(后台处理核心)
|
||
/// 功能:接收帧处理任务,在后台单线程执行二次处理(如打水印、裁剪),并分发至目标订阅者
|
||
/// 核心特性:
|
||
/// <para>1. 有界通道+DropWrite模式:生产端永不阻塞,管道满时丢弃新任务,避免内存积压</para>
|
||
/// <para>2. 单线程处理:CPU占用恒定,避免多线程竞争导致的性能抖动</para>
|
||
/// <para>3. 引用计数管理:确保帧数据安全转移与释放,防止内存泄漏</para>
|
||
/// </summary>
|
||
public class ProcessingPipeline
|
||
{
|
||
#region --- 私有资源与状态 (Private Resources & States) ---
|
||
|
||
/// <summary> 任务队列(有界通道):存储待处理的帧任务 </summary>
|
||
private readonly Channel<ProcessingTask> _queue;
|
||
|
||
/// <summary> 取消令牌源:用于终止后台处理循环 </summary>
|
||
private readonly CancellationTokenSource _cts = new();
|
||
|
||
#endregion
|
||
|
||
#region --- 构造与初始化 (Constructor & Initialization) ---
|
||
|
||
/// <summary>
|
||
/// 初始化帧处理管道
|
||
/// </summary>
|
||
/// <param name="capacity">管道最大容量:超过该值时,新任务将被丢弃(DropWrite模式)</param>
|
||
public ProcessingPipeline(int capacity)
|
||
{
|
||
// 创建有界通道,配置核心特性
|
||
_queue = Channel.CreateBounded<ProcessingTask>(new BoundedChannelOptions(capacity)
|
||
{
|
||
FullMode = BoundedChannelFullMode.DropWrite, // 管道满时丢弃新写入的任务
|
||
SingleReader = true, // 单线程读取,保证处理顺序与CPU稳定性
|
||
SingleWriter = false // 支持多线程写入(如多相机同时提交任务)
|
||
});
|
||
|
||
// 启动后台处理循环(长期运行任务,标记为 LongRunning 提升调度优先级)
|
||
Task.Factory.StartNew(ProcessLoopAsync, TaskCreationOptions.LongRunning);
|
||
}
|
||
|
||
#endregion
|
||
|
||
#region --- 任务提交 (Task Submission) ---
|
||
|
||
/// <summary>
|
||
/// 尝试提交帧处理任务到管道
|
||
/// 核心逻辑:非阻塞提交,失败时回滚帧引用计数,避免内存泄漏
|
||
/// </summary>
|
||
/// <param name="task">待处理的帧任务(包含帧数据、决策、追踪上下文)</param>
|
||
/// <returns>提交成功返回 true,管道满导致提交失败返回 false</returns>
|
||
public bool TrySubmit(ProcessingTask task)
|
||
{
|
||
// 1. 帧引用计数+1:将帧所有权从生产端转移到管道后台线程
|
||
task.Frame.AddRef();
|
||
|
||
try
|
||
{
|
||
// 2. 非阻塞写入管道:成功则任务进入队列等待处理
|
||
if (_queue.Writer.TryWrite(task))
|
||
{
|
||
return true;
|
||
}
|
||
|
||
// 3. 写入失败(管道满):回滚引用计数,释放帧内存
|
||
task.Frame.Dispose();
|
||
return false;
|
||
}
|
||
catch
|
||
{
|
||
// 异常场景下同样回滚引用计数,确保资源释放
|
||
task.Frame.Dispose();
|
||
return false;
|
||
}
|
||
}
|
||
|
||
#endregion
|
||
|
||
#region --- 后台处理循环 (Background Processing Loop) ---
|
||
|
||
/// <summary>
|
||
/// 后台处理循环:持续读取队列任务,执行二次处理与分发
|
||
/// </summary>
|
||
private async Task ProcessLoopAsync()
|
||
{
|
||
try
|
||
{
|
||
// 异步遍历队列:收到取消信号时退出循环
|
||
await foreach (var task in _queue.Reader.ReadAllAsync(_cts.Token))
|
||
{
|
||
// 使用 using 语句:处理完成后自动调用 Frame.Dispose(),引用计数-1
|
||
using (task.Frame)
|
||
{
|
||
// 执行具体的帧处理逻辑
|
||
ExecuteProcessing(task);
|
||
}
|
||
}
|
||
}
|
||
catch (OperationCanceledException)
|
||
{
|
||
// 收到取消信号,正常退出循环,无需处理
|
||
}
|
||
}
|
||
|
||
#endregion
|
||
|
||
#region --- 帧处理执行 (Frame Processing Execution) ---
|
||
|
||
/// <summary>
|
||
/// 执行帧二次处理与分发
|
||
/// 功能:对帧进行自定义加工(如打水印、格式转换),并通过分发器发送至目标订阅者
|
||
/// </summary>
|
||
/// <param name="task">待处理的帧任务</param>
|
||
private void ExecuteProcessing(ProcessingTask task)
|
||
{
|
||
try
|
||
{
|
||
// --- 二次处理车间:可添加自定义加工逻辑(10ms-50ms 耗时操作安全) ---
|
||
// 示例:给帧添加序列号水印(按需启用)
|
||
// string watermarkText = $"SEQ:{task.Decision.Sequence}";
|
||
// Cv2.PutText(
|
||
// img: task.Frame.InternalMat,
|
||
// text: watermarkText,
|
||
// org: new Point(10, 50),
|
||
// fontFace: HersheyFonts.HersheySimplex,
|
||
// fontScale: 1,
|
||
// color: Scalar.Red,
|
||
// thickness: 2
|
||
// );
|
||
|
||
// --- 帧分发:将处理后的帧交给全局分发器,按决策分发至目标订阅者 ---
|
||
GlobalStreamDispatcher.Dispatch(task);
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
// 捕获处理过程中的异常,避免影响后续任务执行
|
||
Console.WriteLine($"[PipelineError] 帧处理失败 (DeviceId: {task.DeviceId}, Seq: {task.Decision.Sequence}): {ex.Message}");
|
||
}
|
||
finally
|
||
{
|
||
// 归档追踪日志:将帧处理上下文存入全局遥测,支持后续排查与分析
|
||
GlobalTelemetry.RecordLog(task.Decision.Sequence, task.Context);
|
||
}
|
||
}
|
||
|
||
#endregion
|
||
} |