diff --git a/Ayay.Solution.sln b/Ayay.Solution.sln index 1583076..0bedb48 100644 --- a/Ayay.Solution.sln +++ b/Ayay.Solution.sln @@ -5,6 +5,12 @@ VisualStudioVersion = 18.1.11304.174 MinimumVisualStudioVersion = 10.0.40219.1 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SHH.CameraSdk", "SHH.CameraSdk\SHH.CameraSdk.csproj", "{21B70A94-43FC-4D17-AB83-9E4B5178397E}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SHH.CameraService", "SHH.CameraService\SHH.CameraService.csproj", "{033B348B-4588-4C81-8D6C-D953E8E7967B}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SHH.Contracts", "SHH.Contracts\SHH.Contracts.csproj", "{E7A63644-7A55-4267-99D2-7D0A7D54B43C}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SHH.NetMQ", "SHH.NetMQ\SHH.NetMQ.csproj", "{FAC8E0CD-4BB3-4752-A406-CD3D2CE5FBB4}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -15,6 +21,18 @@ Global {21B70A94-43FC-4D17-AB83-9E4B5178397E}.Debug|Any CPU.Build.0 = Debug|Any CPU {21B70A94-43FC-4D17-AB83-9E4B5178397E}.Release|Any CPU.ActiveCfg = Release|Any CPU {21B70A94-43FC-4D17-AB83-9E4B5178397E}.Release|Any CPU.Build.0 = Release|Any CPU + {033B348B-4588-4C81-8D6C-D953E8E7967B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {033B348B-4588-4C81-8D6C-D953E8E7967B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {033B348B-4588-4C81-8D6C-D953E8E7967B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {033B348B-4588-4C81-8D6C-D953E8E7967B}.Release|Any CPU.Build.0 = Release|Any CPU + {E7A63644-7A55-4267-99D2-7D0A7D54B43C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E7A63644-7A55-4267-99D2-7D0A7D54B43C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E7A63644-7A55-4267-99D2-7D0A7D54B43C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E7A63644-7A55-4267-99D2-7D0A7D54B43C}.Release|Any CPU.Build.0 = Release|Any CPU + {FAC8E0CD-4BB3-4752-A406-CD3D2CE5FBB4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FAC8E0CD-4BB3-4752-A406-CD3D2CE5FBB4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FAC8E0CD-4BB3-4752-A406-CD3D2CE5FBB4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {FAC8E0CD-4BB3-4752-A406-CD3D2CE5FBB4}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/SHH.CameraSdk/Core/Memory/SmartFrame.cs b/SHH.CameraSdk/Core/Memory/SmartFrame.cs index c0ca447..b0c52ca 100644 --- a/SHH.CameraSdk/Core/Memory/SmartFrame.cs +++ b/SHH.CameraSdk/Core/Memory/SmartFrame.cs @@ -9,6 +9,8 @@ namespace SHH.CameraSdk; /// public class SmartFrame : IDisposable { + public List SubscriberIds { get; } = new List(16); + #region --- 私有资源与状态 (Private Resources & States) --- /// 所属帧池:用于引用归零后自动回收复用 @@ -114,6 +116,10 @@ public class SmartFrame : IDisposable TargetMat = null; } ScaleType = FrameScaleType.None; + + // 2. [核心逻辑] 清空订阅者列表 + // 注意:Clear() 只是把 Count 设为 0,底层数组容量不变,不会触发 GC + SubscriberIds.Clear(); } #endregion diff --git a/SHH.CameraSdk/Core/Pipeline/GlobalStreamDispatcher.cs b/SHH.CameraSdk/Core/Pipeline/GlobalStreamDispatcher.cs index 4417280..521ae79 100644 --- a/SHH.CameraSdk/Core/Pipeline/GlobalStreamDispatcher.cs +++ b/SHH.CameraSdk/Core/Pipeline/GlobalStreamDispatcher.cs @@ -30,6 +30,38 @@ public static class GlobalStreamDispatcher #endregion + // ================================================================= + // 1. 新增:真正的全局广播总线 (上帝模式) + // 任何订阅了这个事件的人,都能收到【所有设备】的每一帧 + // ================================================================= + public static event Action OnGlobalFrame; + + // ================================================================= + // 2. 原有:定向分发逻辑 (保留不动,给图像处理集群用) + // ================================================================= + // private static ConcurrentDictionary _subscribers ... + + /// + /// 统一入口:驱动层调用此方法分发图像 + /// + public static void Dispatch(long deviceId, SmartFrame frame) + { + // A. 优先触发全局广播 (给 ZeroMQ 用) + try + { + // ?.Invoke 是线程安全的,如果设备被删除了,驱动层不调用 Dispatch,这里自然就不会触发 + // 如果新设备增加了,驱动层开始调用 Dispatch,这里自动就会触发 + OnGlobalFrame?.Invoke(deviceId, frame); + } + catch (Exception ex) + { + Console.WriteLine($"[GlobalBus Error] 广播异常: {ex.Message}"); + } + + // B. 执行你原有的定向分发逻辑 (给处理链用) + // DispatchToTargets(deviceId, frame); + } + #region --- 2. 动态路由表 (Dynamic Routing Table) --- /// diff --git a/SHH.CameraSdk/Drivers/HikVision/HikVideoSource.cs b/SHH.CameraSdk/Drivers/HikVision/HikVideoSource.cs index 9b30d2e..58fd251 100644 --- a/SHH.CameraSdk/Drivers/HikVision/HikVideoSource.cs +++ b/SHH.CameraSdk/Drivers/HikVision/HikVideoSource.cs @@ -377,6 +377,13 @@ public class HikVideoSource : BaseVideoSource, Cv2.CvtColor(rawYuvWrapper, smartFrame.InternalMat, ColorConversionCodes.YUV2BGR_YV12); } + // ========================================================================= + // 【新增】插入这一行! + // 此时 smartFrame.InternalMat 已经有了图像数据 + // 我们把它交给全局分发器,触发 ZeroMQ 广播 + // ========================================================================= + GlobalStreamDispatcher.Dispatch(Id, smartFrame); + // 4. [分发] 将决策结果传递给处理中心 // decision.TargetAppIds 包含了 "谁需要这一帧" 的信息 //GlobalProcessingCenter.Submit(this.Id, smartFrame, decision); diff --git a/SHH.CameraSdk/SHH.CameraSdk.csproj b/SHH.CameraSdk/SHH.CameraSdk.csproj index acbfbe2..650444d 100644 --- a/SHH.CameraSdk/SHH.CameraSdk.csproj +++ b/SHH.CameraSdk/SHH.CameraSdk.csproj @@ -6,6 +6,7 @@ enable enable AnyCPU + D:\Codes\Ayay\SHH.CameraService\bin diff --git a/SHH.CameraService/HikCameraWorker.cs b/SHH.CameraService/HikCameraWorker.cs new file mode 100644 index 0000000..81bad92 --- /dev/null +++ b/SHH.CameraService/HikCameraWorker.cs @@ -0,0 +1,90 @@ +using SHH.Contracts; // 引用契约 + +namespace SHH.CameraService +{ + /// + /// 摄像头工作者 + /// 职责:管理海康SDK生命周期,产出 VideoPayload 数据流 + /// + public class HikCameraWorker : IDisposable + { + // 定义一个事件:当产生新图片时触发 + // 参数是我们的标准快递盒 VideoPayload + public event Action OnNewFrame; + + private bool _isRunning = false; + private string _ip; + + public HikCameraWorker(string ip) + { + _ip = ip; + } + + /// + /// 启动取流 + /// + public void Start() + { + if (_isRunning) return; + + // TODO: 【在此处填入海康 SDK 初始化代码】 + // CHCNetSDK.NET_DVR_Init(); + // CHCNetSDK.NET_DVR_Login_V40(...); + // CHCNetSDK.NET_DVR_RealPlay_V40(...); + + Console.WriteLine($"[HikWorker] 摄像头 {_ip} 已启动,开始取流..."); + _isRunning = true; + + // 模拟一个后台线程不断产出视频帧 (仅用于演示架构) + // 实际中,这里应该是海康的 RealDataCallBack 函数 + Task.Run(() => MockCaptureLoop()); + } + + /// + /// 停止取流 + /// + public void Stop() + { + _isRunning = false; + // TODO: 【在此处填入海康 SDK 释放代码】 + // CHCNetSDK.NET_DVR_StopRealPlay(...); + // CHCNetSDK.NET_DVR_Logout(...); + Console.WriteLine($"[HikWorker] 摄像头 {_ip} 已停止。"); + } + + /// + /// 模拟抓图循环 (实际开发中请替换为 SDK 回调函数) + /// + private void MockCaptureLoop() + { + while (_isRunning) + { + // 1. 模拟拿到了一张 JPG 图片 (假设 100KB) + byte[] mockJpg = new byte[1024 * 100]; + + // 2. 立即封装成标准包 + var payload = new VideoPayload + { + CameraId = _ip, // 使用 IP 或 ID 作为标记 + CaptureTime = DateTime.Now, + OriginalWidth = 1920, + OriginalHeight = 1080, + OriginalImageBytes = mockJpg, // 填入原始数据 + TargetImageBytes = null // SDK 只产出原图,还没有处理图 + }; + + // 3. 【核心】触发事件,把包扔给上层 (主程序) + // ?.Invoke 确保如果没有人订阅,不会报错 + OnNewFrame?.Invoke(payload); + + // 模拟 25fps (每40ms一帧) + Thread.Sleep(40); + } + } + + public void Dispose() + { + Stop(); + } + } +} \ No newline at end of file diff --git a/SHH.CameraService/Program.cs b/SHH.CameraService/Program.cs new file mode 100644 index 0000000..8f13c69 --- /dev/null +++ b/SHH.CameraService/Program.cs @@ -0,0 +1,250 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.OpenApi.Models; +using SHH.CameraSdk; // 引用你的业务核心 +using SHH.NetMQ; + +namespace SHH.CameraService; + +public class Program +{ + public static async Task Main(string[] args) + { + // ============================================================= + // 1. 端口与身份计算 + // ============================================================= + int processId = 1; + if (args.Length > 0 && int.TryParse(args[0], out int pid)) processId = pid; + int port = 5000 + (processId - 1); + + Console.Title = $"SHH Gateway - Instance #{processId} (Port: {port})"; + + // ============================================================= + // 2. 硬件环境预热 (【重要】必须在一切开始前调用) + // ============================================================= + InitHardwareEnv(); + + // ============================================================= + // 3. 构建 WebHost + // ============================================================= + var builder = WebApplication.CreateBuilder(args); + + // --- A. 注册 ZeroMQ 组件 (传输层) --- + string zmqBind = $"tcp://*:{5555 + (processId - 1)}"; + string zmqTarget = "tcp://127.0.0.1:6000"; + + builder.Services.AddSingleton(new DistributorServer(zmqBind)); + builder.Services.AddSingleton(new ForwarderClient(zmqTarget)); + + // --- B. 注册核心业务服务 --- + builder.Services.AddSingleton(new FileStorageService(processId)); + // CameraManager 注册为单例,生命周期由 CameraEngineWorker 管理 + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + + // --- C. 注册图像处理集群 (修复版) --- + // 我们需要确保 ImageScaleCluster 和 ImageEnhanceCluster 都能被独立注入, + // 同时它们之间又要建立链式关系。我们使用一个专门的 HostedService 来做连接。 + + // 1. 注册 Scale 实例 + builder.Services.AddSingleton(sp => + { + var config = sp.GetRequiredService(); + return new ImageScaleCluster(4, config); + }); + + // 2. 注册 Enhance 实例 + builder.Services.AddSingleton(sp => + { + var config = sp.GetRequiredService(); + return new ImageEnhanceCluster(4, config); + }); + + // 3. 注册一个启动服务来连接这两个集群 (Chain of Responsibility) + builder.Services.AddHostedService(); + + + // --- D. 注册 Web 基础服务 --- + builder.Services.AddControllers() + .AddApplicationPart(typeof(CamerasController).Assembly) // 加载 SDK 中的控制器 + .AddApplicationPart(typeof(MonitorController).Assembly) + .AddControllersAsServices(); + + // 注册全局操作日志过滤器 (防止 500 错误) + builder.Services.AddScoped(); + + builder.Services.AddEndpointsApiExplorer(); + builder.Services.AddSwaggerGen(c => + { + c.SwaggerDoc("v1", new OpenApiInfo { Title = $"Gateway #{processId}", Version = "v1" }); + }); + + // --- E. 注册后台服务 (Worker) --- + + // 1. 核心引擎工作者 (负责 StartAsync 和 ConfigureBusinessLogic) + builder.Services.AddHostedService(); + + // 2. 网络哨兵 (负责断线重连) + // 假设 ConnectivitySentinel 实现了 IHostedService 或者它是一个简单的类 + // 如果它实现了 IHostedService: + // builder.Services.AddHostedService(); + // 如果它只是一个普通类,需要在 CameraEngineWorker 里启动它,或者注册为单例并手动启动 + // 这里假设我们需要显式注册它以便让它工作: + builder.Services.AddSingleton(); // 注册单例 + // 注意:ConnectivitySentinel 的启动逻辑我们放到 CameraEngineWorker 里去调用 + + // 3. ZeroMQ 桥梁 + builder.Services.AddHostedService(); + + // 4. 配置 CORS + builder.Services.AddCors(options => + { + options.AddPolicy("AllowAll", policy => + { + policy.AllowAnyOrigin().AllowAnyHeader().AllowAnyMethod(); + }); + }); + + // ============================================================= + // 4. 启动应用 + // ============================================================= + var app = builder.Build(); + + app.UseSwagger(); + app.UseSwaggerUI(); + app.UseCors("AllowAll"); // 启用 CORS + app.MapControllers(); + + Console.WriteLine($"[System] 绑定 Web 端口: {port}"); + Console.WriteLine($"[System] 绑定 ZMQ 端口: {zmqBind}"); + + await app.RunAsync($"http://0.0.0.0:{port}"); + } + + static void InitHardwareEnv() + { + Console.WriteLine("=== 工业级视频 SDK 架构测试 (V3.5 框架版) ==="); + Console.WriteLine("[硬件] 海康驱动预热中..."); + try + { + HikNativeMethods.NET_DVR_Init(); + HikSdkManager.ForceWarmUp(); + Console.WriteLine("[硬件] 预热完成。"); + } + catch (Exception ex) + { + Console.WriteLine($"[硬件] 预热失败: {ex.Message}"); + // 不抛出异常,允许程序继续尝试启动(可能是在无 DLL 环境调试) + } + } +} + +/// +/// 负责图像处理管道的组装 (Scale -> Enhance -> Global) +/// +public class PipelineConfigurator : IHostedService +{ + private readonly ImageScaleCluster _scale; + private readonly ImageEnhanceCluster _enhance; + + public PipelineConfigurator(ImageScaleCluster scale, ImageEnhanceCluster enhance) + { + _scale = scale; + _enhance = enhance; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + // 建立责任链: Scale -> Enhance + _scale.SetNext(_enhance); + + // 挂载到全局路由 (驱动层回调会把流推给 Scale) + GlobalPipelineRouter.SetProcessor(_scale); + + Console.WriteLine("[Pipeline] 图像处理链组装完成: Scale -> Enhance"); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; +} + +/// +/// 负责 CameraManager 的生命周期管理和业务初始化 +/// +public class CameraEngineWorker : BackgroundService +{ + private readonly CameraManager _manager; + private readonly ConnectivitySentinel _sentinel; // 注入哨兵 + + public CameraEngineWorker(CameraManager manager, ConnectivitySentinel sentinel) + { + _manager = manager; + _sentinel = sentinel; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + Console.WriteLine("[Engine] 正在启动摄像头管理器..."); + + // 1. 启动管理器 (加载文件配置) + await _manager.StartAsync(); + + // 2. 启动哨兵 (开始监控断线) + // 假设 ConnectivitySentinel 有一个 Start 或类似的方法,如果没有,说明它在构造函数里就启动了 timers + // _sentinel.Start(); + + // 3. 加载默认业务逻辑 (添加测试设备) + await ConfigureBusinessLogic(_manager); + + Console.WriteLine("[Engine] 业务逻辑加载完成。"); + } + + public override async Task StopAsync(CancellationToken cancellationToken) + { + Console.WriteLine("[Engine] 正在停止..."); + await _manager.DisposeAsync(); + await base.StopAsync(cancellationToken); + } + + // 以前 Program 类里的静态方法,现在移到这里 + private async Task ConfigureBusinessLogic(CameraManager manager) + { + try + { + //// 检查是否已经有设备了,如果没有才添加默认的 + //if (manager.GetAllCameras().Any()) return; + + Console.WriteLine("[Engine] 检测到空配置,正在添加默认测试设备..."); + + var config = new VideoSourceConfig + { + Id = 101, + Brand = DeviceBrand.HikVision, + IpAddress = "192.168.5.9", + Port = 8000, + Username = "admin", + Password = "RRYFOA", + StreamType = 0 + }; + manager.AddDevice(config); + + var config2 = new VideoSourceConfig + { + Id = 102, + Brand = DeviceBrand.HikVision, + IpAddress = "172.16.41.20", + Port = 8000, + Username = "admin", + Password = "abcd1234", + StreamType = 0 + }; + manager.AddDevice(config2); + } + catch (Exception ex) + { + Console.WriteLine($"[Engine] 添加默认设备失败: {ex.Message}"); + } + } +} \ No newline at end of file diff --git a/SHH.CameraService/SHH.CameraService.csproj b/SHH.CameraService/SHH.CameraService.csproj new file mode 100644 index 0000000..ef527a6 --- /dev/null +++ b/SHH.CameraService/SHH.CameraService.csproj @@ -0,0 +1,20 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + + + + + diff --git a/SHH.CameraService/ZeroMqBridgeService.cs b/SHH.CameraService/ZeroMqBridgeService.cs new file mode 100644 index 0000000..cfcd903 --- /dev/null +++ b/SHH.CameraService/ZeroMqBridgeService.cs @@ -0,0 +1,75 @@ +using Microsoft.Extensions.Hosting; +using OpenCvSharp; +using SHH.Contracts; +using SHH.NetMQ; + +namespace SHH.CameraSdk +{ + public class ZeroMqBridgeService : BackgroundService + { + private readonly DistributorServer _distributor; + private readonly ForwarderClient _forwarder; + + public ZeroMqBridgeService(DistributorServer distributor, ForwarderClient forwarder) + { + _distributor = distributor; + _forwarder = forwarder; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + Console.WriteLine("[Bridge] 正在连接全局广播总线..."); + + // 【关键修改】直接订阅静态的全局事件 + // 不需要传入 APP_ID,因为这是 C# 原生事件,不是字典查找 + GlobalStreamDispatcher.OnGlobalFrame += BridgeHandler; + + Console.WriteLine("[Bridge] 全局总线连接成功!任何动态增删的设备都会自动转发。"); + return Task.CompletedTask; + } + + // 真正的事件处理函数 + private void BridgeHandler(long deviceId, SmartFrame frame) + { + try + { + // 1. 安全检查 + var sourceMat = frame.TargetMat ?? frame.InternalMat; + if (sourceMat == null || sourceMat.Empty()) return; + + // 2. 内存克隆 (Deep Copy) - 这一步不能省 + using var safeMat = sourceMat.Clone(); + + // 3. 编码 & 封装 + // 建议:可以在这里判断一下 deviceId,如果某些设备不想发,可以在这里 return + var jpgParams = new int[] { (int)ImwriteFlags.JpegQuality, 70 }; + byte[] jpgBytes = safeMat.ImEncode(".jpg", jpgParams); + + var payload = new VideoPayload + { + CameraId = deviceId.ToString(), + CaptureTime = DateTime.Now, + DispatchTime = DateTime.Now, + OriginalWidth = safeMat.Width, + OriginalHeight = safeMat.Height, + OriginalImageBytes = jpgBytes + }; + + // 4. 发射 + _distributor.Broadcast(payload); + _forwarder.Push(payload); + } + catch (Exception ex) + { + // Console.WriteLine(ex.Message); // 生产环境建议注释掉,防止日志刷屏 + } + } + + public override Task StopAsync(CancellationToken cancellationToken) + { + // 优雅退订,防止内存泄漏 + GlobalStreamDispatcher.OnGlobalFrame -= BridgeHandler; + return base.StopAsync(cancellationToken); + } + } +} \ No newline at end of file diff --git a/SHH.Contracts/SHH.Contracts.csproj b/SHH.Contracts/SHH.Contracts.csproj new file mode 100644 index 0000000..eeda231 --- /dev/null +++ b/SHH.Contracts/SHH.Contracts.csproj @@ -0,0 +1,11 @@ + + + + netstandard2.0 + + + + + + + diff --git a/SHH.Contracts/VideoPayload.cs b/SHH.Contracts/VideoPayload.cs new file mode 100644 index 0000000..3c80b17 --- /dev/null +++ b/SHH.Contracts/VideoPayload.cs @@ -0,0 +1,82 @@ +using System; +using Newtonsoft.Json; + +namespace SHH.Contracts +{ + /// + /// 视频数据传输契约(增强版) + /// + public class VideoPayload + { + // ========================================== + // 1. 基础元数据 (将被序列化到 JSON) + // ========================================== + + public string CameraId { get; set; } // 摄像头唯一标记 + + // 时间信息 (建议使用 DateTime,调试看日志更直观) + public DateTime CaptureTime { get; set; } // 采集时间 (SDK产生图的时间) + public DateTime DispatchTime { get; set; } // 分发时间 (Server发出图的时间) + + // ========================================== + // 2. 图像规格信息 + // ========================================== + + public int OriginalWidth { get; set; } // 原始宽度 + public int OriginalHeight { get; set; } // 原始高度 + + public int TargetWidth { get; set; } // 目标/处理后宽度 + public int TargetHeight { get; set; } // 目标/处理后高度 + + // ========================================== + // 3. 核心二进制数据 (严禁序列化到 JSON) + // ========================================== + + /// + /// 原始图像数据 (例如海康SDK出来的原始 JPG) + /// JsonIgnore 防止误操作导致序列化性能崩塌 + /// + [JsonIgnore] + public byte[] OriginalImageBytes { get; set; } + + /// + /// 处理后的目标图像 (例如 Yolo 画框后的图,或者缩放后的图) + /// 可为空 + /// + [JsonIgnore] + public byte[] TargetImageBytes { get; set; } + + // ========================================== + // 4. 辅助方法 + // ========================================== + + /// + /// 仅获取元数据的 JSON 字符串 + /// + public string GetMetadataJson() + { + // 创建一个纯净的匿名对象用于序列化 + var meta = new + { + CameraId, + CaptureTime, + DispatchTime, + OriginalWidth, + OriginalHeight, + TargetWidth, + TargetHeight, + // 标记一下是否有目标图,方便接收端判断要不要读第3帧 + HasTargetImage = (TargetImageBytes != null && TargetImageBytes.Length > 0) + }; + return JsonConvert.SerializeObject(meta); + } + + /// + /// 从 JSON 还原元数据 (还原出来的对象 ImageBytes 默认为空,需后续填充) + /// + public static VideoPayload FromMetadataJson(string json) + { + return JsonConvert.DeserializeObject(json); + } + } +} \ No newline at end of file diff --git a/SHH.NetMQ/DistributorServer.cs b/SHH.NetMQ/DistributorServer.cs new file mode 100644 index 0000000..00b70b7 --- /dev/null +++ b/SHH.NetMQ/DistributorServer.cs @@ -0,0 +1,71 @@ +using System; +using NetMQ; +using NetMQ.Sockets; +using SHH.Contracts; + +namespace SHH.NetMQ +{ + /// + /// 视频分发服务端 (Publisher) + /// 特性:非阻塞、防内存溢出 + /// + public class DistributorServer : IDisposable + { + private PublisherSocket _pubSocket; + private readonly object _lock = new object(); + + // 配置:高水位限制 (HWM) + // 假设 25fps,设置 50 意味着内存只缓存 2 秒的视频。 + // 如果断网超过 2 秒,新来的视频帧直接丢弃,优先保证恢复后的实时性。 + private const int HWM_LIMIT = 50; + + public DistributorServer(string connectionString) + { + _pubSocket = new PublisherSocket(); + + // 1. 设置发送缓冲区大小 (防爆内存关键) + _pubSocket.Options.SendHighWatermark = HWM_LIMIT; + + // 2. 绑定地址 (如 tcp://*:5555) + _pubSocket.Bind(connectionString); + } + + public void Broadcast(VideoPayload payload) + { + if (payload == null) return; + + // 补充发送时间 + payload.DispatchTime = DateTime.Now; + + // 准备数据帧 + string jsonMeta = payload.GetMetadataJson(); + byte[] rawBytes = payload.OriginalImageBytes ?? new byte[0]; + byte[] targetBytes = payload.TargetImageBytes ?? new byte[0]; + + // 使用 NetMQMessage 封装多帧消息 + // 这样比手动调三次 Send 更容易管理原子性 + var msg = new NetMQMessage(); + msg.Append(jsonMeta); // 第1帧 + msg.Append(rawBytes); // 第2帧 + msg.Append(targetBytes); // 第3帧 + + lock (_lock) + { + // 3. 非阻塞发送 (核心防卡死代码) + // TimeSpan.Zero 表示:如果缓冲区满了或者发不出去,立即放弃,不等待,返回 false + // 这样你的主线程(海康SDK回调)永远不会被卡住 + bool sent = _pubSocket.TrySendMultipartMessage(TimeSpan.Zero, msg); + + if (!sent) + { + // 这里可以打个日志:Console.WriteLine("警告:网络拥堵,丢帧中..."); + } + } + } + + public void Dispose() + { + _pubSocket?.Dispose(); + } + } +} \ No newline at end of file diff --git a/SHH.NetMQ/ForwarderClient.cs b/SHH.NetMQ/ForwarderClient.cs new file mode 100644 index 0000000..9155623 --- /dev/null +++ b/SHH.NetMQ/ForwarderClient.cs @@ -0,0 +1,69 @@ +using System; +using NetMQ; +using NetMQ.Sockets; +using SHH.Contracts; + +namespace SHH.NetMQ +{ + /// + /// 视频转发客户端 (Pusher) + /// 特性:主动推送、断线重连、非阻塞 + /// + public class ForwarderClient : IDisposable + { + private PushSocket _pushSocket; + private readonly object _lock = new object(); + private bool _isInitialized = false; + + // 同样设置 50 帧的缓存限制 + private const int HWM_LIMIT = 50; + + public ForwarderClient(string remoteAddress) + { + if (string.IsNullOrEmpty(remoteAddress)) return; + + _pushSocket = new PushSocket(); + + // 1. 防堆积设置 + _pushSocket.Options.SendHighWatermark = HWM_LIMIT; + + try + { + // NetMQ 会自动在后台处理重连,无需人工干预 + _pushSocket.Connect(remoteAddress); + _isInitialized = true; + } + catch (Exception ex) + { + Console.WriteLine($"[Client Error] 连接失败: {ex.Message}"); + _isInitialized = false; + } + } + + public void Push(VideoPayload payload) + { + if (!_isInitialized || payload == null) return; + + if (payload.DispatchTime == DateTime.MinValue) + payload.DispatchTime = DateTime.Now; + + var msg = new NetMQMessage(); + msg.Append(payload.GetMetadataJson()); + msg.Append(payload.OriginalImageBytes ?? new byte[0]); + msg.Append(payload.TargetImageBytes ?? new byte[0]); + + lock (_lock) + { + // 2. 非阻塞推送 + // 如果对方挂了,或者网络断了,缓冲区满后这里的 TrySend 会立即返回 false + // 保证 SDK 采集不受影响 + bool sent = _pushSocket.TrySendMultipartMessage(TimeSpan.Zero, msg); + } + } + + public void Dispose() + { + _pushSocket?.Dispose(); + } + } +} \ No newline at end of file diff --git a/SHH.NetMQ/SHH.NetMQ.csproj b/SHH.NetMQ/SHH.NetMQ.csproj new file mode 100644 index 0000000..ebb7747 --- /dev/null +++ b/SHH.NetMQ/SHH.NetMQ.csproj @@ -0,0 +1,16 @@ + + + + netstandard2.0 + $(NoWarn);NU1701 + + + + + + + + + + +