From 127b07343e025dc2b30b31beeff763e0186fa086 Mon Sep 17 00:00:00 2001 From: twice109 <3518499@qq.com> Date: Sat, 27 Dec 2025 07:25:32 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9E=B6=E6=9E=84=E5=A2=9E=E5=8A=A0=E5=AF=B9?= =?UTF-8?q?=E5=9B=BE=E5=83=8F=E5=A2=9E=E5=BC=BA=E7=9A=84=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Core/Services/BaseFrameProcessor.cs | 302 ++++++++++++++++++ .../Core/Services/ImageEnhanceCluster.cs | 40 +++ .../Core/Services/ImageScaleCluster.cs | 130 ++------ SHH.CameraSdk/Program.cs | 7 +- 4 files changed, 377 insertions(+), 102 deletions(-) create mode 100644 SHH.CameraSdk/Core/Services/BaseFrameProcessor.cs create mode 100644 SHH.CameraSdk/Core/Services/ImageEnhanceCluster.cs diff --git a/SHH.CameraSdk/Core/Services/BaseFrameProcessor.cs b/SHH.CameraSdk/Core/Services/BaseFrameProcessor.cs new file mode 100644 index 0000000..1fc15e5 --- /dev/null +++ b/SHH.CameraSdk/Core/Services/BaseFrameProcessor.cs @@ -0,0 +1,302 @@ +namespace SHH.CameraSdk +{ + #region --- 架构基类:帧处理集群 (Frame Processor Cluster) --- + + /// + /// [架构基类] 帧处理集群基类 + /// 核心职责: + /// 1. 管理 Worker 线程池,实现多线程并行处理 + /// 2. 基于 DeviceId 哈希分片路由,**保证单设备帧序绝对一致** + /// 3. 维护流水线责任链,支持帧数据的链式传递 + /// 4. 管控帧引用计数,防止内存泄漏 + /// + /// 具体的工作者线程类型 + public abstract class BaseFrameProcessor : IFrameProcessor + where TWorker : BaseWorker + { + #region --- 受保护成员 --- + /// Worker 线程池,负责具体的帧处理任务 + protected readonly List _workers = new List(); + + /// 线程池并行度(Worker 数量) + protected readonly int _workerCount; + + /// 流水线的下一个处理环节 + private IFrameProcessor? _nextStep; + + #endregion + + #region --- 构造函数 --- + + /// + /// 初始化帧处理集群 + /// + /// Worker 线程数量(并行度) + /// 服务名称(用于日志标识) + protected BaseFrameProcessor(int workerCount, string serviceName) + { + // 校验并行度参数,避免无效配置 + if (workerCount < 1) + throw new ArgumentOutOfRangeException(nameof(workerCount), "Worker数量必须大于0"); + + _workerCount = workerCount; + + // 通过抽象工厂模式创建 Worker 实例 + for (int i = 0; i < workerCount; i++) + { + _workers.Add(CreateWorker(i)); + } + + Console.WriteLine($"[{serviceName}] 服务已初始化 (并行度: {workerCount})"); + } + + #endregion + + #region --- 流水线责任链实现 --- + + /// + public void SetNext(IFrameProcessor next) + { + _nextStep = next; + } + + /// + /// [中转核心] 将处理完成的帧传递到下一个流水线环节 + /// + /// 设备ID + /// 处理完成的智能帧 + /// 帧处理决策指令 + internal void PassToNext(long deviceId, SmartFrame frame, FrameDecision decision) + { + if (_nextStep != null) + { + // 转发给下一个处理器,延续流水线 + _nextStep.Enqueue(deviceId, frame, decision); + } + else + { + // 流水线终点:提交给全局处理中心进行分发 + GlobalProcessingCenter.Submit(deviceId, frame, decision); + } + } + + #endregion + + #region --- 帧入队与路由 --- + + /// + public void Enqueue(long deviceId, SmartFrame frame, FrameDecision decision) + { + // 关键操作1:增加帧引用计数 + // 因为帧进入异步处理环节,必须保证不会被外部提前释放 + frame.AddRef(); + + // 关键操作2:哈希分片路由 + // 基于 DeviceId 取模,确保同一设备的帧始终分配给同一个 Worker + // 核心价值:保证单设备的帧处理顺序与原始流顺序一致 + int workerIndex = (int)(Math.Abs(deviceId) % _workerCount); + var targetWorker = _workers[workerIndex]; + + // 将帧投递到目标 Worker 的任务队列 + targetWorker.Post(deviceId, frame, decision); + } + + #endregion + + #region --- 抽象工厂方法 --- + + /// + /// 抽象工厂:创建具体的 Worker 实例 + /// 由子类实现,按需创建不同功能的 Worker + /// + /// Worker 唯一标识 + /// 具体的 Worker 实例 + protected abstract TWorker CreateWorker(int workerId); + + #endregion + + #region --- 资源释放 --- + + /// + /// 资源释放 + /// + public virtual void Dispose() + { + // 释放所有 Worker 资源 + foreach (var worker in _workers) + { + worker.Dispose(); + } + _workers.Clear(); + + // 释放下一个流水线节点(责任链递归释放) + _nextStep?.Dispose(); + _nextStep = null; + } + + #endregion + } + + #endregion + + #region --- 架构基类:处理线程 (Worker Thread) --- + + /// + /// [架构基类] 帧处理工作者线程基类 + /// 核心职责: + /// 1. 维护线程内任务队列,实现背压控制 + /// 2. 管理长驻线程生命周期,避免线程频繁创建销毁 + /// 3. 保证帧引用计数的安全闭环,杜绝内存泄漏 + /// 4. 提供统一的异常处理机制,确保流水线不中断 + /// + public abstract class BaseWorker : IDisposable + { + #region --- 私有成员 --- + + /// 线程内任务队列,容量限制100,防止内存溢出 + private readonly BlockingCollection<(long DeviceId, SmartFrame Frame, FrameDecision Decision)> _taskQueue = new BlockingCollection<(long, SmartFrame, FrameDecision)>(100); + + /// 长驻处理线程 + private readonly Task _processingThread; + + /// 线程取消令牌源,用于优雅终止 + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); + + #endregion + + #region --- 构造函数 --- + + protected BaseWorker() + { + // 启动长驻后台线程,设置 LongRunning 提升调度优先级 + _processingThread = Task.Factory.StartNew( + action: ProcessLoop, + creationOptions: TaskCreationOptions.LongRunning); + } + + #endregion + + #region --- 任务入队 (背压处理) --- + + /// + /// 将帧处理任务投递到当前 Worker 的队列 + /// 实现背压控制:队列满时丢弃帧并释放引用,避免阻塞上游 + /// + /// 设备ID + /// 待处理的智能帧 + /// 帧处理决策指令 + public void Post(long deviceId, SmartFrame frame, FrameDecision decision) + { + // TryAdd 非阻塞:队列满时直接返回 false + if (!_taskQueue.TryAdd((deviceId, frame, decision))) + { + // 背压处理:丢弃当前帧,释放引用计数 + frame.Dispose(); + Console.WriteLine($"[Worker] 任务队列已满,丢弃设备 {deviceId} 的帧 (引用计数已释放)"); + } + } + + #endregion + + #region --- 核心处理循环 --- + + /// + /// Worker 线程的核心处理循环 + /// 持续消费队列任务,直到收到取消信号 + /// + private void ProcessLoop() + { + try + { + // GetConsumingEnumerable:阻塞式消费队列,支持取消令牌 + foreach (var taskItem in _taskQueue.GetConsumingEnumerable(_cts.Token)) + { + // 关键操作:使用 using 语句自动释放帧引用 + // 无论处理成功/失败,都会保证 Dispose 被调用,形成引用计数闭环 + using (var frame = taskItem.Frame) + { + try + { + // 调用子类实现的具体图像处理算法 + PerformAction(frame, taskItem.Decision); + + // 通知父集群:当前帧处理完成,准备传递到下一个环节 + NotifyFinished(taskItem.DeviceId, frame, taskItem.Decision); + } + catch (Exception ex) + { + Console.WriteLine($"[Worker] 帧处理异常: {ex.Message}"); + + // 异常保底策略:即使处理失败,也透传帧到下一个环节,保证流水线不中断 + NotifyFinished(taskItem.DeviceId, frame, taskItem.Decision); + } + } + } + } + catch (OperationCanceledException) + { + // 正常取消:线程退出,无需报错 + Console.WriteLine("[Worker] 处理循环已正常终止"); + } + catch (Exception ex) + { + Console.WriteLine($"[Worker] 处理循环异常终止: {ex.Message}"); + } + } + + #endregion + + #region --- 抽象方法 (子类实现具体逻辑) --- + + /// + /// 子类实现:具体的图像处理算法逻辑 + /// 如:缩放、灰度转换、AI推理等 + /// + /// 待处理的智能帧 + /// 帧处理决策指令 + protected abstract void PerformAction(SmartFrame frame, FrameDecision decision); + + /// + /// 子类实现:通知父集群处理完成 + /// 通常调用父类的 PassToNext 方法传递帧 + /// + /// 设备ID + /// 处理完成的智能帧 + /// 帧处理决策指令 + protected abstract void NotifyFinished(long deviceId, SmartFrame frame, FrameDecision decision); + + #endregion + + #region --- 资源释放 --- + + /// + /// 资源释放 + /// + public virtual void Dispose() + { + // 1. 发送取消信号,终止处理循环 + _cts.Cancel(); + + // 2. 标记队列完成添加,唤醒阻塞的消费线程 + _taskQueue.CompleteAdding(); + + // 3. 等待处理线程退出(最多等待1秒) + try { _processingThread.Wait(1000); } + catch { /* 忽略等待异常 */ } + + // 4. 清理队列中未处理的任务,释放帧引用 + while (_taskQueue.TryTake(out var remainingItem)) + { + remainingItem.Frame.Dispose(); + } + + // 5. 释放核心组件资源 + _taskQueue.Dispose(); + _cts.Dispose(); + } + + #endregion + } + + #endregion +} \ No newline at end of file diff --git a/SHH.CameraSdk/Core/Services/ImageEnhanceCluster.cs b/SHH.CameraSdk/Core/Services/ImageEnhanceCluster.cs new file mode 100644 index 0000000..f7abbd2 --- /dev/null +++ b/SHH.CameraSdk/Core/Services/ImageEnhanceCluster.cs @@ -0,0 +1,40 @@ +using OpenCvSharp; + +namespace SHH.CameraSdk.Core.Services +{ + /// + /// [图像增亮服务] + /// 实现:对流水线中的 TargetMat 执行像素级亮度提升 + /// + public class ImageEnhanceCluster : BaseFrameProcessor + { + public ImageEnhanceCluster(int count) : base(count, "EnhanceCluster") { } + + protected override EnhanceWorker CreateWorker(int id) => new EnhanceWorker(this); + } + + public class EnhanceWorker : BaseWorker + { + private readonly ImageEnhanceCluster _parent; + public EnhanceWorker(ImageEnhanceCluster parent) => _parent = parent; + + protected override void PerformAction(SmartFrame frame, FrameDecision decision) + { + // 业务逻辑:只处理已经过缩放的 TargetMat + if (frame.TargetMat != null && !frame.TargetMat.IsDisposed) + { + Mat brightMat = new Mat(); + // 亮度线性提升:原像素 * 1.0 + 30 偏移量 + frame.TargetMat.ConvertTo(brightMat, -1, 1.0, 30); + + // 替换掉原来的 TargetMat(旧的会在 AttachTarget 内部被自动 Dispose) + frame.AttachTarget(brightMat, frame.ScaleType); + } + } + + protected override void NotifyFinished(long did, SmartFrame frame, FrameDecision dec) + { + _parent.PassToNext(did, frame, dec); + } + } +} \ No newline at end of file diff --git a/SHH.CameraSdk/Core/Services/ImageScaleCluster.cs b/SHH.CameraSdk/Core/Services/ImageScaleCluster.cs index a9897bc..3cfc36e 100644 --- a/SHH.CameraSdk/Core/Services/ImageScaleCluster.cs +++ b/SHH.CameraSdk/Core/Services/ImageScaleCluster.cs @@ -1,113 +1,41 @@ using OpenCvSharp; - -namespace SHH.CameraSdk; - -/// -/// [标准动作环境] 图像预处理集群 -/// 职责:透明拦截,计算缩放图挂载到 TargetMat 注入 SmartFrame,然后提交给全局中心 -/// 特性: -/// 1. 设备分片:基于 DeviceId 哈希路由,保证单设备帧顺序严格一致 -/// 2. 零内存拷贝:直接操作 SmartFrame 引用,仅在生成新 TargetMat 时申请内存 -/// 3. 闭环流转:处理完成后自动投递到 GlobalProcessingCenter -/// -/// -/// 职责:透明拦截,计算缩放图挂载到 TargetMat,然后提交给全局中心 -/// -public class ImageScaleCluster : IFrameProcessor +namespace SHH.CameraSdk { - private readonly List _workers = new(); - private readonly int _workerCount; - - public ImageScaleCluster(int workerCount = 4) + /// + /// [图像缩放服务] + /// 实现:基于基类,专注于将原图缩放并挂载到 TargetMat + /// + public class ImageScaleCluster : BaseFrameProcessor { - _workerCount = workerCount; - for (int i = 0; i < workerCount; i++) + public ImageScaleCluster(int count) : base(count, "ScaleCluster") { } + + protected override ScaleWorker CreateWorker(int id) => new ScaleWorker(this); + } + + public class ScaleWorker : BaseWorker + { + private readonly ImageScaleCluster _parent; + public ScaleWorker(ImageScaleCluster parent) => _parent = parent; + + protected override void PerformAction(SmartFrame frame, FrameDecision decision) { - _workers.Add(new ProcessingWorker(i)); - } - Console.WriteLine($"[ScaleCluster] 缩放服务已就绪 (Worker: {_workerCount})"); - } + int targetW = 704; + int targetH = 576; - public void Enqueue(long deviceId, SmartFrame frame, FrameDecision decision) - { - // 1. 增加引用计数:跨线程持有 - frame.AddRef(); - - // 2. 哈希分片路由:保证保序 - int index = (int)(Math.Abs(deviceId) % _workerCount); - _workers[index].Post(deviceId, frame, decision); - } - - public void Dispose() => _workers.ForEach(w => w.Dispose()); -} - -internal class ProcessingWorker : IDisposable -{ - private readonly BlockingCollection<(long DeviceId, SmartFrame Frame, FrameDecision Decision)> _queue = new(100); - private readonly Task _thread; - private readonly CancellationTokenSource _cts = new(); - - public ProcessingWorker(int id) - { - _thread = Task.Factory.StartNew(ProcessLoop, TaskCreationOptions.LongRunning); - } - - public void Post(long did, SmartFrame frame, FrameDecision decision) - { - if (!_queue.TryAdd((did, frame, decision))) - { - // 背压丢弃 - frame.Dispose(); - } - } - - private void ProcessLoop() - { - foreach (var item in _queue.GetConsumingEnumerable(_cts.Token)) - { - using (var frame = item.Frame) + // 算法逻辑:若尺寸符合要求则执行 Resize + if (frame.InternalMat.Width > targetW) { - try - { - // ------------------------------------------------- - // 核心动作:缩放逻辑 - // ------------------------------------------------- - int targetW = 704; - int targetH = 576; + Mat targetMat = new Mat(); + Cv2.Resize(frame.InternalMat, targetMat, new Size(targetW, targetH), 0, 0, InterpolationFlags.Linear); - // 仅当原图大于目标时才缩放 - if (frame.InternalMat.Width > targetW) - { - Mat targetMat = new Mat(); - Cv2.Resize(frame.InternalMat, targetMat, new Size(targetW, targetH), 0, 0, InterpolationFlags.Linear); - - // [关键] 挂载到 SmartFrame 的衍生属性中 - // 标记为 Shrink (缩小) - frame.AttachTarget(targetMat, FrameScaleType.Shrink); - } - - // ------------------------------------------------- - // 交付下一站:GlobalProcessingCenter - // 消费端对此无感知,它收到的是同一个 frame 对象 - // ------------------------------------------------- - GlobalProcessingCenter.Submit(item.DeviceId, frame, item.Decision); - } - catch (Exception ex) - { - Console.WriteLine($"[ScaleWorker] 异常: {ex.Message}"); - // 即使处理失败,也要尝试把原图发出去,保证画面不断 - GlobalProcessingCenter.Submit(item.DeviceId, frame, item.Decision); - } + // 挂载到衍生属性 + frame.AttachTarget(targetMat, FrameScaleType.Shrink); } } - } - public void Dispose() - { - _cts.Cancel(); - _queue.CompleteAdding(); - while (_queue.TryTake(out var item)) item.Frame.Dispose(); - _queue.Dispose(); - _cts.Dispose(); + protected override void NotifyFinished(long did, SmartFrame frame, FrameDecision dec) + { + _parent.PassToNext(did, frame, dec); + } } } \ No newline at end of file diff --git a/SHH.CameraSdk/Program.cs b/SHH.CameraSdk/Program.cs index 2d87ef5..e8145c7 100644 --- a/SHH.CameraSdk/Program.cs +++ b/SHH.CameraSdk/Program.cs @@ -108,7 +108,11 @@ public class Program var builder = WebApplication.CreateBuilder(); // 注册缩放集群服务 (建议 Worker 数 = CPU 核心数,这里设为 4) - var scaleService = new ImageScaleCluster(4); + var scaleService = new ImageScaleCluster(4); // 环节一:缩放 + var enhanceService = new ImageEnhanceCluster(4); // 环节二:增亮 + + // 逻辑:缩放 -> 增亮 -> (自动到终点) + scaleService.SetNext(enhanceService); // 2. [核心] 将缩放服务“挂载”到全局路由上 // 从此刻起,所有驱动层的帧都会先流经 scaleService @@ -116,6 +120,7 @@ public class Program // 3. 注册到 DI 容器 (以便 Controller 或其他服务可以管理它,例如动态调整并行度) builder.Services.AddSingleton(scaleService); + builder.Services.AddSingleton(enhanceService); // 1. 配置 CORS builder.Services.AddCors(options =>