架构增加对图像增强的支持

This commit is contained in:
2025-12-27 07:25:32 +08:00
parent d4a8b63031
commit 127b07343e
4 changed files with 377 additions and 102 deletions

View File

@@ -0,0 +1,302 @@
namespace SHH.CameraSdk
{
#region --- (Frame Processor Cluster) ---
/// <summary>
/// [架构基类] 帧处理集群基类
/// 核心职责:
/// 1. 管理 Worker 线程池,实现多线程并行处理
/// 2. 基于 DeviceId 哈希分片路由,**保证单设备帧序绝对一致**
/// 3. 维护流水线责任链,支持帧数据的链式传递
/// 4. 管控帧引用计数,防止内存泄漏
/// </summary>
/// <typeparam name="TWorker">具体的工作者线程类型</typeparam>
public abstract class BaseFrameProcessor<TWorker> : IFrameProcessor
where TWorker : BaseWorker
{
#region --- ---
/// <summary> Worker 线程池,负责具体的帧处理任务 </summary>
protected readonly List<TWorker> _workers = new List<TWorker>();
/// <summary> 线程池并行度Worker 数量) </summary>
protected readonly int _workerCount;
/// <summary> 流水线的下一个处理环节 </summary>
private IFrameProcessor? _nextStep;
#endregion
#region --- ---
/// <summary>
/// 初始化帧处理集群
/// </summary>
/// <param name="workerCount">Worker 线程数量(并行度)</param>
/// <param name="serviceName">服务名称(用于日志标识)</param>
protected BaseFrameProcessor(int workerCount, string serviceName)
{
// 校验并行度参数,避免无效配置
if (workerCount < 1)
throw new ArgumentOutOfRangeException(nameof(workerCount), "Worker数量必须大于0");
_workerCount = workerCount;
// 通过抽象工厂模式创建 Worker 实例
for (int i = 0; i < workerCount; i++)
{
_workers.Add(CreateWorker(i));
}
Console.WriteLine($"[{serviceName}] 服务已初始化 (并行度: {workerCount})");
}
#endregion
#region --- 线 ---
/// <inheritdoc />
public void SetNext(IFrameProcessor next)
{
_nextStep = next;
}
/// <summary>
/// [中转核心] 将处理完成的帧传递到下一个流水线环节
/// </summary>
/// <param name="deviceId">设备ID</param>
/// <param name="frame">处理完成的智能帧</param>
/// <param name="decision">帧处理决策指令</param>
internal void PassToNext(long deviceId, SmartFrame frame, FrameDecision decision)
{
if (_nextStep != null)
{
// 转发给下一个处理器,延续流水线
_nextStep.Enqueue(deviceId, frame, decision);
}
else
{
// 流水线终点:提交给全局处理中心进行分发
GlobalProcessingCenter.Submit(deviceId, frame, decision);
}
}
#endregion
#region --- ---
/// <inheritdoc />
public void Enqueue(long deviceId, SmartFrame frame, FrameDecision decision)
{
// 关键操作1增加帧引用计数
// 因为帧进入异步处理环节,必须保证不会被外部提前释放
frame.AddRef();
// 关键操作2哈希分片路由
// 基于 DeviceId 取模,确保同一设备的帧始终分配给同一个 Worker
// 核心价值:保证单设备的帧处理顺序与原始流顺序一致
int workerIndex = (int)(Math.Abs(deviceId) % _workerCount);
var targetWorker = _workers[workerIndex];
// 将帧投递到目标 Worker 的任务队列
targetWorker.Post(deviceId, frame, decision);
}
#endregion
#region --- ---
/// <summary>
/// 抽象工厂:创建具体的 Worker 实例
/// 由子类实现,按需创建不同功能的 Worker
/// </summary>
/// <param name="workerId">Worker 唯一标识</param>
/// <returns>具体的 Worker 实例</returns>
protected abstract TWorker CreateWorker(int workerId);
#endregion
#region --- ---
/// <summary>
/// 资源释放
/// </summary>
public virtual void Dispose()
{
// 释放所有 Worker 资源
foreach (var worker in _workers)
{
worker.Dispose();
}
_workers.Clear();
// 释放下一个流水线节点(责任链递归释放)
_nextStep?.Dispose();
_nextStep = null;
}
#endregion
}
#endregion
#region --- 线 (Worker Thread) ---
/// <summary>
/// [架构基类] 帧处理工作者线程基类
/// 核心职责:
/// 1. 维护线程内任务队列,实现背压控制
/// 2. 管理长驻线程生命周期,避免线程频繁创建销毁
/// 3. 保证帧引用计数的安全闭环,杜绝内存泄漏
/// 4. 提供统一的异常处理机制,确保流水线不中断
/// </summary>
public abstract class BaseWorker : IDisposable
{
#region --- ---
/// <summary> 线程内任务队列容量限制100防止内存溢出 </summary>
private readonly BlockingCollection<(long DeviceId, SmartFrame Frame, FrameDecision Decision)> _taskQueue = new BlockingCollection<(long, SmartFrame, FrameDecision)>(100);
/// <summary> 长驻处理线程 </summary>
private readonly Task _processingThread;
/// <summary> 线程取消令牌源,用于优雅终止 </summary>
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
#endregion
#region --- ---
protected BaseWorker()
{
// 启动长驻后台线程,设置 LongRunning 提升调度优先级
_processingThread = Task.Factory.StartNew(
action: ProcessLoop,
creationOptions: TaskCreationOptions.LongRunning);
}
#endregion
#region --- () ---
/// <summary>
/// 将帧处理任务投递到当前 Worker 的队列
/// 实现背压控制:队列满时丢弃帧并释放引用,避免阻塞上游
/// </summary>
/// <param name="deviceId">设备ID</param>
/// <param name="frame">待处理的智能帧</param>
/// <param name="decision">帧处理决策指令</param>
public void Post(long deviceId, SmartFrame frame, FrameDecision decision)
{
// TryAdd 非阻塞:队列满时直接返回 false
if (!_taskQueue.TryAdd((deviceId, frame, decision)))
{
// 背压处理:丢弃当前帧,释放引用计数
frame.Dispose();
Console.WriteLine($"[Worker] 任务队列已满,丢弃设备 {deviceId} 的帧 (引用计数已释放)");
}
}
#endregion
#region --- ---
/// <summary>
/// Worker 线程的核心处理循环
/// 持续消费队列任务,直到收到取消信号
/// </summary>
private void ProcessLoop()
{
try
{
// GetConsumingEnumerable阻塞式消费队列支持取消令牌
foreach (var taskItem in _taskQueue.GetConsumingEnumerable(_cts.Token))
{
// 关键操作:使用 using 语句自动释放帧引用
// 无论处理成功/失败,都会保证 Dispose 被调用,形成引用计数闭环
using (var frame = taskItem.Frame)
{
try
{
// 调用子类实现的具体图像处理算法
PerformAction(frame, taskItem.Decision);
// 通知父集群:当前帧处理完成,准备传递到下一个环节
NotifyFinished(taskItem.DeviceId, frame, taskItem.Decision);
}
catch (Exception ex)
{
Console.WriteLine($"[Worker] 帧处理异常: {ex.Message}");
// 异常保底策略:即使处理失败,也透传帧到下一个环节,保证流水线不中断
NotifyFinished(taskItem.DeviceId, frame, taskItem.Decision);
}
}
}
}
catch (OperationCanceledException)
{
// 正常取消:线程退出,无需报错
Console.WriteLine("[Worker] 处理循环已正常终止");
}
catch (Exception ex)
{
Console.WriteLine($"[Worker] 处理循环异常终止: {ex.Message}");
}
}
#endregion
#region --- () ---
/// <summary>
/// 子类实现:具体的图像处理算法逻辑
/// 如缩放、灰度转换、AI推理等
/// </summary>
/// <param name="frame">待处理的智能帧</param>
/// <param name="decision">帧处理决策指令</param>
protected abstract void PerformAction(SmartFrame frame, FrameDecision decision);
/// <summary>
/// 子类实现:通知父集群处理完成
/// 通常调用父类的 PassToNext 方法传递帧
/// </summary>
/// <param name="deviceId">设备ID</param>
/// <param name="frame">处理完成的智能帧</param>
/// <param name="decision">帧处理决策指令</param>
protected abstract void NotifyFinished(long deviceId, SmartFrame frame, FrameDecision decision);
#endregion
#region --- ---
/// <summary>
/// 资源释放
/// </summary>
public virtual void Dispose()
{
// 1. 发送取消信号,终止处理循环
_cts.Cancel();
// 2. 标记队列完成添加,唤醒阻塞的消费线程
_taskQueue.CompleteAdding();
// 3. 等待处理线程退出最多等待1秒
try { _processingThread.Wait(1000); }
catch { /* 忽略等待异常 */ }
// 4. 清理队列中未处理的任务,释放帧引用
while (_taskQueue.TryTake(out var remainingItem))
{
remainingItem.Frame.Dispose();
}
// 5. 释放核心组件资源
_taskQueue.Dispose();
_cts.Dispose();
}
#endregion
}
#endregion
}

View File

@@ -0,0 +1,40 @@
using OpenCvSharp;
namespace SHH.CameraSdk.Core.Services
{
/// <summary>
/// [图像增亮服务]
/// 实现:对流水线中的 TargetMat 执行像素级亮度提升
/// </summary>
public class ImageEnhanceCluster : BaseFrameProcessor<EnhanceWorker>
{
public ImageEnhanceCluster(int count) : base(count, "EnhanceCluster") { }
protected override EnhanceWorker CreateWorker(int id) => new EnhanceWorker(this);
}
public class EnhanceWorker : BaseWorker
{
private readonly ImageEnhanceCluster _parent;
public EnhanceWorker(ImageEnhanceCluster parent) => _parent = parent;
protected override void PerformAction(SmartFrame frame, FrameDecision decision)
{
// 业务逻辑:只处理已经过缩放的 TargetMat
if (frame.TargetMat != null && !frame.TargetMat.IsDisposed)
{
Mat brightMat = new Mat();
// 亮度线性提升:原像素 * 1.0 + 30 偏移量
frame.TargetMat.ConvertTo(brightMat, -1, 1.0, 30);
// 替换掉原来的 TargetMat旧的会在 AttachTarget 内部被自动 Dispose
frame.AttachTarget(brightMat, frame.ScaleType);
}
}
protected override void NotifyFinished(long did, SmartFrame frame, FrameDecision dec)
{
_parent.PassToNext(did, frame, dec);
}
}
}

View File

@@ -1,113 +1,41 @@
using OpenCvSharp;
namespace SHH.CameraSdk;
/// <summary>
/// [标准动作环境] 图像预处理集群
/// 职责:透明拦截,计算缩放图挂载到 TargetMat 注入 SmartFrame然后提交给全局中心
/// 特性:
/// 1. 设备分片:基于 DeviceId 哈希路由,保证单设备帧顺序严格一致
/// 2. 零内存拷贝:直接操作 SmartFrame 引用,仅在生成新 TargetMat 时申请内存
/// 3. 闭环流转:处理完成后自动投递到 GlobalProcessingCenter
/// </summary>
/// <summary>
/// 职责:透明拦截,计算缩放图挂载到 TargetMat然后提交给全局中心
/// </summary>
public class ImageScaleCluster : IFrameProcessor
namespace SHH.CameraSdk
{
private readonly List<ProcessingWorker> _workers = new();
private readonly int _workerCount;
public ImageScaleCluster(int workerCount = 4)
/// <summary>
/// [图像缩放服务]
/// 实现:基于基类,专注于将原图缩放并挂载到 TargetMat
/// </summary>
public class ImageScaleCluster : BaseFrameProcessor<ScaleWorker>
{
_workerCount = workerCount;
for (int i = 0; i < workerCount; i++)
public ImageScaleCluster(int count) : base(count, "ScaleCluster") { }
protected override ScaleWorker CreateWorker(int id) => new ScaleWorker(this);
}
public class ScaleWorker : BaseWorker
{
private readonly ImageScaleCluster _parent;
public ScaleWorker(ImageScaleCluster parent) => _parent = parent;
protected override void PerformAction(SmartFrame frame, FrameDecision decision)
{
_workers.Add(new ProcessingWorker(i));
}
Console.WriteLine($"[ScaleCluster] 缩放服务已就绪 (Worker: {_workerCount})");
}
int targetW = 704;
int targetH = 576;
public void Enqueue(long deviceId, SmartFrame frame, FrameDecision decision)
{
// 1. 增加引用计数:跨线程持有
frame.AddRef();
// 2. 哈希分片路由:保证保序
int index = (int)(Math.Abs(deviceId) % _workerCount);
_workers[index].Post(deviceId, frame, decision);
}
public void Dispose() => _workers.ForEach(w => w.Dispose());
}
internal class ProcessingWorker : IDisposable
{
private readonly BlockingCollection<(long DeviceId, SmartFrame Frame, FrameDecision Decision)> _queue = new(100);
private readonly Task _thread;
private readonly CancellationTokenSource _cts = new();
public ProcessingWorker(int id)
{
_thread = Task.Factory.StartNew(ProcessLoop, TaskCreationOptions.LongRunning);
}
public void Post(long did, SmartFrame frame, FrameDecision decision)
{
if (!_queue.TryAdd((did, frame, decision)))
{
// 背压丢弃
frame.Dispose();
}
}
private void ProcessLoop()
{
foreach (var item in _queue.GetConsumingEnumerable(_cts.Token))
{
using (var frame = item.Frame)
// 算法逻辑:若尺寸符合要求则执行 Resize
if (frame.InternalMat.Width > targetW)
{
try
{
// -------------------------------------------------
// 核心动作:缩放逻辑
// -------------------------------------------------
int targetW = 704;
int targetH = 576;
Mat targetMat = new Mat();
Cv2.Resize(frame.InternalMat, targetMat, new Size(targetW, targetH), 0, 0, InterpolationFlags.Linear);
// 仅当原图大于目标时才缩放
if (frame.InternalMat.Width > targetW)
{
Mat targetMat = new Mat();
Cv2.Resize(frame.InternalMat, targetMat, new Size(targetW, targetH), 0, 0, InterpolationFlags.Linear);
// [关键] 挂载到 SmartFrame 的衍生属性中
// 标记为 Shrink (缩小)
frame.AttachTarget(targetMat, FrameScaleType.Shrink);
}
// -------------------------------------------------
// 交付下一站GlobalProcessingCenter
// 消费端对此无感知,它收到的是同一个 frame 对象
// -------------------------------------------------
GlobalProcessingCenter.Submit(item.DeviceId, frame, item.Decision);
}
catch (Exception ex)
{
Console.WriteLine($"[ScaleWorker] 异常: {ex.Message}");
// 即使处理失败,也要尝试把原图发出去,保证画面不断
GlobalProcessingCenter.Submit(item.DeviceId, frame, item.Decision);
}
// 挂载到衍生属性
frame.AttachTarget(targetMat, FrameScaleType.Shrink);
}
}
}
public void Dispose()
{
_cts.Cancel();
_queue.CompleteAdding();
while (_queue.TryTake(out var item)) item.Frame.Dispose();
_queue.Dispose();
_cts.Dispose();
protected override void NotifyFinished(long did, SmartFrame frame, FrameDecision dec)
{
_parent.PassToNext(did, frame, dec);
}
}
}