using Ayay.SerilogLogs; using Microsoft.Extensions.Hosting; using OpenCvSharp; using Serilog; using SHH.CameraSdk; // 引用 SDK 核心 using SHH.Contracts; using System.Diagnostics; using TurboJpegWrapper; namespace SHH.CameraService; /// /// 图像监控采集控制器 (流媒体分发引擎) /// 功能:监听全局图像采集总线,对图像进行实时 JPG 编码,并动态分发至云端、大屏等订阅目标。 /// 设计模式:发布-订阅模式 + 扇出 (Fan-out) 分发。 /// public class ImageMonitorController : BackgroundService { private ILogger _sysLog = Log.ForContext("SourceContext", LogModules.Core); // 注入所有注册的目标(云端、大屏等),实现动态分发 private readonly IEnumerable _targets; // 编码参数:JPG 质量 75 (平衡画质与带宽) // 工业经验:75 是甜点,体积只有 100 的 1/3,肉眼几无区别。 // 如果您确实需要 100,请注意带宽压力。此处我保留您要求的 100,但建议未来调优。 private readonly int[] _encodeParams = { (int)ImwriteFlags.JpegQuality, 100 }; /// /// 构造函数 /// /// public ImageMonitorController(IEnumerable targets) { _targets = targets; } /// /// 启动后台服务:挂载事件总线 /// protected override Task ExecuteAsync(CancellationToken stoppingToken) { _sysLog.Information("[Core] 启动流媒体采集引擎..."); // ========================================================= // 订阅逻辑:接入 "上帝模式" (God Mode) // ========================================================= // 理由:gRpc 需要无差别地获取所有设备的图像。 GlobalStreamDispatcher.OnGlobalFrame += ProcessFrame; _sysLog.Information($"[StreamWorker] 已挂载至全局广播总线,正在监听帧信息."); var tcs = new TaskCompletionSource(); stoppingToken.Register(() => { // 停止时反注册,防止静态事件内存泄漏 GlobalStreamDispatcher.OnGlobalFrame -= ProcessFrame; _sysLog.Information("[Core] 流媒体采集引擎已断开全局广播连接."); tcs.SetResult(); }); return tcs.Task; } /// /// [回调函数] 处理实时帧 /// 注意:此方法由 SDK 采集线程池触发,必须保持极速处理,严禁在内部执行 IO 等耗时阻塞操作。 /// /// 设备唯一标识 ID /// 包含原始图像(InternalMat)和处理后图像(TargetMat)的帧数据 private void ProcessFrame(long deviceId, SmartFrame frame) { try { // 1. 基础校验 (合法性检查) if (frame == null || frame.InternalMat.Empty()) return; long startTick = Stopwatch.GetTimestamp(); // ========================================================= // 2. 一次编码 (One Encode) - CPU 消耗点 // ========================================================= // 理由:在这里同步编码是最安全的,因为出了这个函数 frame 内存就会失效。 // 且只编一次,后续分发给 10 个目标也只用这一份数据。 byte[]? jpgBytes = null; // 如果有更小的图片, 原始图片不压缩, 除非有特殊需求 if (frame.TargetMat == null) { jpgBytes = SdkGlobal.UseTurboJpegWrapper ? TurboEncodeImage(frame.InternalMat) : EncodeImage(frame.InternalMat); } // 双流支持:如果存在处理后的 AI 图,也一并编码 byte[]? targetBytes = null; if (frame.TargetMat != null && !frame.TargetMat.Empty()) { targetBytes = SdkGlobal.UseTurboJpegWrapper ? TurboEncodeImage(frame.TargetMat) : EncodeImage(frame.TargetMat); } // ========================================================= // 3. 构建 Payload (数据载荷) // ========================================================= var payload = new VideoPayload { CameraId = deviceId.ToString(), CaptureTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), OriginalImageBytes = jpgBytes, // 引用赋值 OriginalWidth = frame.InternalWidth, OriginalHeight = frame.InnernalHeight, TargetImageBytes = targetBytes, // 引用赋值 TargetWidth = frame.TargetWidth, TargetHeight = frame.TargetHeight, DispatchTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }; // 添加订阅者 payload.SubscriberIds.AddRange(frame.SubscriberIds); // 计算转码耗时(ms) double processMs = (Stopwatch.GetTimestamp() - startTick) * 1000.0 / Stopwatch.Frequency; payload.Diagnostics["encode_ms"] = Math.Round(processMs, 2); // ========================================================= // 4. 动态扇出 (Dynamic Fan-Out) - 内存消耗极低 // ========================================================= // 遍历所有目标,往各自独立的管道里写数据。 // 实现了"物理隔离":一个管道满了(云端卡顿),不影响另一个管道(大屏流畅)。 foreach (var target in _targets) { bool ok = target.Channel.WriteLog(payload); if (!ok) { // 如果这里打印,说明管道由于某种原因被关闭了(通常是程序正在退出) _sysLog.Warning($"[ImageMonitor] 管道写入失败,目标: {target.Config.Name}"); } } } catch (Exception ex) { // 极少发生的内存错误,打印日志但不抛出,避免崩溃 SDK 线程 _sysLog.Error($"[ImageMonitor] 采集处理异常: {ex.Message}"); } } /// /// 调用 OpenCV 进行内存级图片编码 /// /// 待编码的 OpenCV Mat 矩阵 /// JPG 字节数组 private byte[]? EncodeImage(Mat mat) { if (mat == null || mat.Empty()) return null; // ImEncode 将 Mat 编码为一维字节数组 (托管内存) Cv2.ImEncode(".jpg", mat, out byte[] buf, _encodeParams); return buf; } // 建议将转换器定义为类成员,避免重复创建(内部持有句柄) private static readonly ThreadLocal _encoderPool = new(() => new TJCompressor()); /// /// TurboJPEG 快速编码 /// /// /// private byte[]? TurboEncodeImage(Mat mat) { // 1. 空引用与销毁状态防御 if (mat == null || mat.Empty() || mat.IsDisposed) return Array.Empty(); try { // 2. 线程安全防护 (如果不用 ThreadLocal,至少保留 lock) var encoder = _encoderPool.Value; if (encoder == null) { _sysLog.Error("[Perf] ThreadLocal 编码器实例初始化失败,降级使用 OpenCV."); return EncodeImage(mat); // 自动降级,保证业务不中断 } // 3. 内存连续性确保 // 保持原逻辑:不连续则 Clone,这是最稳妥的零拷贝退守方案,已通过您的严格测试 if (!mat.IsContinuous()) { using var continuousMat = mat.Clone(); return encoder.Compress(continuousMat.Data, (int)continuousMat.Step(), continuousMat.Width, continuousMat.Height, // 2026-01-31 解决黄色变蓝色问题 // 原因:经实测当前 Mat 内存排布为 RGB,原 BGR 参数导致红蓝通道反转 TJPixelFormats.TJPF_RGB, TJSubsamplingOptions.TJSAMP_420, 95, TJFlags.NONE); } // 执行并行编码 // 注意:TJPF_BGR 确保了 OpenCV 默认内存排布,防止色偏 return encoder.Compress(mat.Data, (int)mat.Step(), mat.Width, mat.Height, // 2026-01-31 解决黄色变蓝色问题 // 修正像素格式为 RGB,匹配底层数据流,确保工业视频颜色还原准确 TJPixelFormats.TJPF_RGB, TJSubsamplingOptions.TJSAMP_420, 95, TJFlags.NONE); } catch (ObjectDisposedException) { // 自动降级,保证业务不中断 SdkGlobal.DisableTurboJpegAcceleration(); return EncodeImage(mat); } catch (Exception ex) { // 4. 记录异常但不让采集线程崩掉 _sysLog.Error(ex, "[Perf] TurboJpeg 编码失败,请检查依赖或内存状态"); // 自动降级,保证业务不中断 SdkGlobal.DisableTurboJpegAcceleration(); return EncodeImage(mat); } } /// /// 释放资源 /// public override void Dispose() { GlobalStreamDispatcher.OnGlobalFrame -= ProcessFrame; if (_encoderPool.IsValueCreated) { // 严谨做法:由于 ThreadLocal 无法直接遍历销毁所有线程的实例, // 建议通过清理当前线程并由 GC 处理剩余部分,或在更高级的对象池中管理 Dispose。 _encoderPool.Dispose(); } base.Dispose(); } }