阶段性批量提交

This commit is contained in:
2026-01-05 14:54:06 +08:00
parent 917d76a87f
commit a697aab3e0
21 changed files with 1479 additions and 379 deletions

View File

@@ -0,0 +1,97 @@
using Microsoft.Extensions.Hosting;
using NetMQ;
using NetMQ.Sockets;
using Newtonsoft.Json;
using SHH.CameraSdk;
using System.Text;
namespace SHH.CameraService;
public class CommandClientWorker : BackgroundService
{
private readonly ServiceConfig _config;
public CommandClientWorker(ServiceConfig config)
{
_config = config;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 1. 如果不是主动/混合模式,不需要连接
if (!_config.ShouldConnect) return;
// ★★★ 核心修正:直接读取解析好的指令地址列表 ★★★
// 这些地址来自参数 --uris "IP,VideoPort&CommandPort" 中的 CommandPort 部分
var cmdUris = _config.CommandEndpoints;
if (cmdUris.Count == 0)
{
Console.WriteLine("[指令] 未在参数中找到指令通道地址(位于&符号右侧),跳过连接。");
return;
}
// 2. 初始化 Dealer Socket
using var dealer = new DealerSocket();
// 设置身份 (Identity),让 Dashboard 知道我是 "CameraApp_01"
string myIdentity = _config.AppId;
dealer.Options.Identity = Encoding.UTF8.GetBytes(myIdentity);
// 3. 连接所有目标 (支持多点控制)
foreach (var uri in cmdUris)
{
Console.WriteLine($"[指令] 连接控制端: {uri}");
dealer.Connect(uri);
}
// 4. 发送登录包 (握手)
// 构造心跳包
var heartbeat = new
{
Type = "Login",
Id = myIdentity,
Time = DateTime.Now
};
string loginJson = JsonConvert.SerializeObject(heartbeat);
// 注意Dealer Socket 发送是负载均衡的 (Round-Robin)。
// 如果连接了多个 DashboardSendFrame 一次只会发给其中一个。
// 为了确保所有 Dashboard 都能收到上线通知,我们根据连接数循环发送几次。
// (注:这只是初始化时的权宜之计,心跳包后续可以定时发送)
for (int i = 0; i < cmdUris.Count; i++)
{
dealer.SendFrame(loginJson);
await Task.Delay(10); // 稍微间隔,给 ZMQ 内部调度一点时间
}
Console.WriteLine($"[指令] 已发送登录包 (ID: {myIdentity}),进入监听循环...");
// 5. 监听循环
while (!stoppingToken.IsCancellationRequested)
{
try
{
// 非阻塞接收 (500ms 超时),避免卡死线程
if (dealer.TryReceiveFrameString(TimeSpan.FromMilliseconds(500), out string msg))
{
Console.WriteLine($"[指令] 收到: {msg}");
// TODO: 在这里解析 JSON 并调用 CameraSDK 执行业务
// var cmd = JsonConvert.DeserializeObject<CommandModel>(msg);
// if (cmd.Action == "Reboot") ...
// 回复 ACK (确认收到)
// Dealer 会自动根据 Router 发来的 RoutingID 路由回去
dealer.SendFrame($"ACK: {msg} (From {myIdentity})");
}
}
catch (Exception ex)
{
Console.WriteLine($"[指令] 通信异常: {ex.Message}");
// 防止异常死循环刷屏
await Task.Delay(1000, stoppingToken);
}
}
}
}

View File

@@ -0,0 +1,91 @@
using SHH.CameraSdk;
using System.Collections.Concurrent;
namespace SHH.CameraService;
/// <summary>
/// 网络推流管理器
/// 职责:管理 ZeroMQ 推流任务的生命周期
/// 类似于 DisplayWindowManager它负责订阅数据并将其桥接到传输层
/// </summary>
public class NetworkStreamManager
{
private readonly VideoDataChannel _channel;
// 记录当前活跃的推流任务,防止重复订阅
private readonly ConcurrentDictionary<string, bool> _activeStreams = new();
public NetworkStreamManager(VideoDataChannel channel)
{
_channel = channel;
}
/// <summary>
/// 启动推流任务
/// </summary>
public void StartStream(string appId, long deviceId)
{
// 1. 防止重复启动
if (_activeStreams.ContainsKey(appId)) return;
// 2. 向全局分发器订阅精准数据
// 这里实现了业务逻辑的闭环:只有被 Manager 管理的任务才会消耗 CPU 去转码
GlobalStreamDispatcher.Subscribe(appId, deviceId, (frame) =>
{
// --- 这里的代码运行在分发线程中 ---
// A. 转码 (耗时操作封装在这里,不污染 Controller)
byte[] jpgBytes = EncodeFrameToJpg(frame);
if (jpgBytes != null && jpgBytes.Length > 0)
{
var payload = new VideoPayload
{
CameraId = appId, // 使用 AppId 作为 Topic (给 Dashboard 订阅用)
OriginalImageBytes = jpgBytes,
CaptureTime = DateTime.Now,
OriginalWidth = frame.TargetWidth,
OriginalHeight = frame.TargetHeight
};
// B. 写入传输通道
_ = _channel.WriteAsync(payload);
}
});
_activeStreams.TryAdd(appId, true);
Console.WriteLine($"[Network] 推流任务已启动: {appId} -> Device {deviceId}");
}
/// <summary>
/// 停止推流任务
/// </summary>
public void StopStream(string appId)
{
if (_activeStreams.TryRemove(appId, out _))
{
// 1. 从全局分发器注销
GlobalStreamDispatcher.Unsubscribe(appId);
Console.WriteLine($"[Network] 推流任务已停止: {appId}");
}
}
// --- 辅助方法 ---
private byte[] EncodeFrameToJpg(SmartFrame frame)
{
try
{
// 优先使用处理后的 TargetMat如果没有则用原始的 InternalMat
var mat = frame.TargetMat ?? frame.InternalMat;
if (mat != null && !mat.Empty())
{
// 80 质量平衡体积与画质
return mat.ImEncode(".jpg", new int[] { 1, 80 });
}
}
catch (Exception ex)
{
Console.WriteLine($"[Network] 转码失败: {ex.Message}");
}
return Array.Empty<byte>();
}
}

View File

@@ -0,0 +1,79 @@
using System.Diagnostics;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using SHH.CameraSdk;
namespace SHH.CameraService;
public class ParentProcessSentinel : BackgroundService
{
private readonly ServiceConfig _config;
private readonly IHostApplicationLifetime _lifetime;
private readonly ILogger<ParentProcessSentinel> _logger;
public ParentProcessSentinel(
ServiceConfig config,
IHostApplicationLifetime lifetime,
ILogger<ParentProcessSentinel> logger)
{
_config = config;
_lifetime = lifetime;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
int pid = _config.ParentPid;
// 如果 PID 为 0 或负数,说明不需要守护(可能是手动启动调试)
if (pid <= 0)
{
_logger.LogInformation("未指定有效的父进程 PID守护模式已禁用。");
return;
}
_logger.LogInformation($"父进程守护已启动,正在监控 PID: {pid}");
while (!stoppingToken.IsCancellationRequested)
{
if (!IsParentRunning(pid))
{
_logger.LogWarning($"[ALERT] 检测到父进程 (PID:{pid}) 已退出!正在终止当前服务...");
// 触发程序优雅退出
_lifetime.StopApplication();
// 强制跳出循环
break;
}
// 每 2 秒检查一次,避免 CPU 浪费
await Task.Delay(2000, stoppingToken);
}
}
private bool IsParentRunning(int pid)
{
try
{
// 尝试获取进程对象
var process = Process.GetProcessById(pid);
// 检查是否已退出
if (process.HasExited) return false;
return true;
}
catch (ArgumentException)
{
// GetProcessById 在找不到 PID 时会抛出 ArgumentException
// 说明进程已经不存在了
return false;
}
catch (Exception ex)
{
_logger.LogError(ex, "检查父进程状态时发生未知错误,默认为存活");
return true; // 发生未知错误时,保守起见认为它还活着
}
}
}

View File

@@ -2,7 +2,6 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.OpenApi.Models;
using SHH.CameraSdk; // 引用你的业务核心
using SHH.NetMQ;
namespace SHH.CameraService;
@@ -10,22 +9,26 @@ public class Program
{
public static async Task Main(string[] args)
{
#region --- 1. ---
int processId = 1;
// 从命令行参数解析进程ID默认1
if (args.Length > 0 && int.TryParse(args[0], out int pid))
processId = pid;
// 计算 Web 服务端口基础5000 + 进程ID偏移
int port = 5000 + (processId - 1);
// 缓冲时间 (您之前写了20000ms即20秒可能是为了附加调试器。如果觉得太慢可以改回 2000)
for(var i=1; i<10; i++)
Thread.Sleep(1000);
Console.Title = $"SHH Gateway - Instance #{processId} (Port: {port})";
// 1. 解析配置
var config = ServiceConfig.BuildFromArgs(args);
#endregion
#region --- 2. () ---
// ---【补全变量定义】---
// A. 补全 webPort (统一使用 config.BasePort)
int webPort = config.BasePort;
// B. 补全 processIdInt (用于 FileStorage 和 CameraSdk)
// 逻辑:尝试将 AppId 解析为数字;如果 AppId 是字符串(如"CameraApp_01"),则默认给 1或者根据 BasePort 推算
int processIdInt = config.NumericId;
Console.Title = $"SHH Gateway - {config.AppId} (Web: {webPort})";
#region --- 2. ---
InitHardwareEnv();
#endregion
@@ -34,161 +37,135 @@ public class Program
var builder = WebApplication.CreateBuilder(args);
#region --- A. ZeroMQ () ---
// ★★★ 核心:注入全局配置 ★★★
builder.Services.AddSingleton(config);
// 注册转发客户端(定向推送)
string zmqBind = $"tcp://*:{5555 + (processId - 1)}";
// -------------------------------------------------------------
// A. 注册新架构组件
// -------------------------------------------------------------
builder.Services.AddSingleton<VideoDataChannel>();
// ★★★ 新增:注册指令总线服务 ★★★
string zmqTarget = "tcp://127.0.0.1:6000";
// 推流服务 (连接 config.TargetClients 里的 :6002)
builder.Services.AddHostedService<ZeroMQBridgeWorker>();
// 注册转发客户端(定向推送)
builder.Services.AddSingleton(new ForwarderClient(zmqTarget));
// 指令客户端 (连接 config.TargetClients 里的 :6001)
builder.Services.AddHostedService<CommandClientWorker>();
// ★★★ 新增:注册指令总线服务 ★★★
builder.Services.AddHostedService<CommandBusService>();
// 进程守护
builder.Services.AddHostedService<ParentProcessSentinel>();
// 注册分发服务器(广播)
builder.Services.AddSingleton(new DistributorServer(zmqBind));
// -------------------------------------------------------------
// B. 注册 SDK 业务服务
// -------------------------------------------------------------
// 使用刚刚补全的 processIdInt
builder.Services.AddSingleton<IStorageService>(new FileStorageService(processIdInt));
#endregion
#region --- B. ---
// 注册文件存储服务(进程隔离)
builder.Services.AddSingleton<IStorageService>(new FileStorageService(processId));
// CameraManager 注册为单例,生命周期由 CameraEngineWorker 管理
builder.Services.AddSingleton<CameraManager>();
// 图像处理配置管理器(单例)
builder.Services.AddSingleton<ProcessingConfigManager>();
// 显示窗口管理器(单例)
builder.Services.AddSingleton<DisplayWindowManager>();
builder.Services.AddSingleton<NetworkStreamManager>();
#endregion
builder.Services.AddSingleton<ImageScaleCluster>(sp => new ImageScaleCluster(4, sp.GetRequiredService<ProcessingConfigManager>()));
builder.Services.AddSingleton<ImageEnhanceCluster>(sp => new ImageEnhanceCluster(4, sp.GetRequiredService<ProcessingConfigManager>()));
#region --- C. () ---
// 说明:通过责任链模式组装 Scale → Enhance 处理流程,确保顺序执行
// 1. 注册图像缩容集群并行度4
builder.Services.AddSingleton<ImageScaleCluster>(sp =>
{
var configManager = sp.GetRequiredService<ProcessingConfigManager>();
return new ImageScaleCluster(4, configManager);
});
// 2. 注册图像增强集群并行度4
builder.Services.AddSingleton<ImageEnhanceCluster>(sp =>
{
var configManager = sp.GetRequiredService<ProcessingConfigManager>();
return new ImageEnhanceCluster(4, configManager);
});
// 3. 注册管道配置服务(组装责任链)
builder.Services.AddHostedService<PipelineConfigurator>();
#endregion
// 使用补全的 processIdInt
builder.Services.AddCameraSdk(processIdInt);
#region --- D. Web ---
builder.Services.AddHostedService<CameraEngineWorker>();
builder.Services.AddSingleton<ConnectivitySentinel>();
// 注册控制器(加载 SDK 中的 CamerasController、MonitorController
builder.Services.AddControllers()
.AddApplicationPart(typeof(CamerasController).Assembly) // 加载 SDK 中的控制器
.AddApplicationPart(typeof(MonitorController).Assembly)
.AddControllersAsServices();
builder.Services.AddControllers().AddApplicationPart(typeof(CamerasController).Assembly);
builder.Services.AddControllers().AddApplicationPart(typeof(MonitorController).Assembly);
// 注册全局操作日志过滤器(捕获 API 操作日志)
builder.Services.AddScoped<UserActionFilter>();
// 注册 Swagger 文档区分实例ID
// -------------------------------------------------------------
// C. Web API 基础
// -------------------------------------------------------------
builder.Services.AddControllers().AddControllersAsServices();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo
{
Title = $"Gateway #{processId}",
Version = "v1"
});
// 【修正】使用 config.AppId
c.SwaggerDoc("v1", new OpenApiInfo { Title = $"Gateway {config.AppId}", Version = "v1" });
});
#endregion
#region --- E. (Worker) ---
// 1. 核心引擎工作者 (负责 StartAsync 和 ConfigureBusinessLogic)
builder.Services.AddHostedService<CameraEngineWorker>();
// 2.网络哨兵(负责断线重连)(监控设备断线重连,注册为单例)
builder.Services.AddSingleton<ConnectivitySentinel>();
// 3. ZeroMQ 桥梁服务(转发帧数据到外部系统)
builder.Services.AddHostedService<ZeroMqBridgeService>();
#endregion
#region --- F. CORS ---
builder.Services.AddCors(options =>
{
options.AddPolicy("AllowAll", policy =>
{
policy.AllowAnyOrigin().AllowAnyHeader().AllowAnyMethod();
});
});
#endregion
#endregion
#region --- 4. ---
builder.Services.AddCors(o => o.AddPolicy("AllowAll", p => p.AllowAnyOrigin().AllowAnyHeader().AllowAnyMethod()));
var app = builder.Build();
// 启用 Swagger 文档
//// =======================================================================
//// ★★★ 核心接入点:连接 [现有分发器] 与 [新推流通道] ★★★
//// =======================================================================
//// 1. 获取刚刚注册的数据通道
//var videoChannel = app.Services.GetRequiredService<VideoDataChannel>();
////var config = app.Services.GetRequiredService<ServiceConfig>();
//// 2. 订阅你现有的全局事件 (这里就是“取货”的地方)
//// 每当 HikVideoSource 采集到一帧并调用 Dispatch 时,这里就会触发
//GlobalStreamDispatcher.OnGlobalFrame += (deviceId, smartFrame) =>
//{
// // 3. 数据处理:将 OpenCvSharp Mat 转为 JPG 字节流 (网络传输必须压缩)
// byte[] jpgData = EncodeToJpg(smartFrame);
// if (jpgData != null && jpgData.Length > 0)
// {
// // 4. 封装载荷
// var payload = new VideoPayload
// {
// // 使用 AppId 或 DeviceId 作为标识
// CameraId = config.AppId,
// OriginalImageBytes = jpgData,
// CaptureTime = DateTime.Now,
// OriginalWidth = smartFrame.TargetWidth,
// OriginalHeight = smartFrame.TargetHeight
// };
// // 5. 扔进通道 (Fire-and-Forget不阻塞你原来的显示逻辑)
// // WriteAsync 是 ValueTask这里忽略等待追求最高吞吐
// _ = videoChannel.WriteAsync(payload);
// }
//};
//Console.WriteLine("[System] 全局流已桥接到 ZeroMQ 推流通道");
app.UseSwagger();
app.UseSwaggerUI();
// 启用 CORS 策略
app.UseCors("AllowAll");
// 映射控制器路由
app.MapControllers();
// 输出启动信息
Console.WriteLine($"[System] 绑定 Web 端口: {port}");
Console.WriteLine($"[System] 绑定 ZMQ 端口: {zmqBind}");
// 【修正】使用 webPort
Console.WriteLine($"[System] Web API 已启动: http://0.0.0.0:{webPort}");
// 启动 Web 应用
await app.RunAsync($"http://0.0.0.0:{port}");
await app.RunAsync($"http://0.0.0.0:{webPort}");
#endregion
}
#region --- ---
/// <summary>
/// 初始化硬件环境(海康 SDK 预热)
/// </summary>
static void InitHardwareEnv()
{
Console.WriteLine("=== 工业级视频 SDK 架构测试 (V3.5 框架版) ===");
Console.WriteLine("[硬件] 海康驱动预热中...");
Console.WriteLine("=== 工业级视频接入服务启动 ===");
}
/// <summary>
/// 内存转码Mat -> Jpg Bytes
/// </summary>
static byte[] EncodeToJpg(SmartFrame frame)
{
try
{
// 初始化海康 SDK
HikNativeMethods.NET_DVR_Init();
// 强制预热播放库(避免首次取流延迟)
HikSdkManager.ForceWarmUp();
Console.WriteLine("[硬件] 预热完成。");
// 假设 SmartFrame 内部持有 OpenCvSharp.Mat 类型的 InternalMat
if (frame != null && frame.InternalMat != null && !frame.InternalMat.Empty())
{
// 80 是 JPG 质量参数,平衡画质与带宽
return frame.InternalMat.ImEncode(".jpg", new int[] { 1, 80 });
}
}
catch (Exception ex)
catch
{
Console.WriteLine($"[硬件] 预热失败: {ex.Message}");
// 不抛出异常,允许程序在无 DLL 环境下调试
// 容错处理,防止一帧损坏导致程序崩溃
}
return Array.Empty<byte>();
}
#endregion
}

View File

@@ -0,0 +1,63 @@
using System.Threading.Channels;
namespace SHH.CameraService;
/// <summary>
/// 视频数据高速通道
/// <para>作用:解耦 采集线程(Producer) 和 发送线程(Consumer)</para>
/// <para>特性:使用 BoundedChannel当网络发送慢时自动丢弃旧帧(DropOldest),防止内存溢出。</para>
/// </summary>
public class VideoDataChannel
{
// 创建一个有限容量的通道 (容量 5)
// 如果发送端太慢这就满了DropOldest 会丢弃最旧的帧,保证实时性
private readonly Channel<VideoPayload> _channel = Channel.CreateBounded<VideoPayload>(
new BoundedChannelOptions(5)
{
FullMode = BoundedChannelFullMode.DropOldest, // 核心策略:丢弃旧帧
SingleReader = true, // 只有一个 ZeroMQWorker 在读
SingleWriter = false //可能有多个相机线程在写
});
// ★★★ 新增:公开 Reader 属性,让外部可以直接调用 ReadAsync ★★★
public ChannelReader<VideoPayload> Reader => _channel.Reader;
/// <summary>
/// 写入数据 (生产者调用)
/// </summary>
public ValueTask WriteAsync(VideoPayload payload)
{
return _channel.Writer.WriteAsync(payload);
}
/// <summary>
/// 读取数据流 (消费者调用)
/// </summary>
public IAsyncEnumerable<VideoPayload> ReadAllAsync(CancellationToken ct)
{
return _channel.Reader.ReadAllAsync(ct);
}
}
// 附带:如果您的项目中还没有定义 VideoPayload这里是一个最小实现
// 如果 SHH.Contracts 中已有,请忽略此类
public class VideoPayload
{
/// <summary> 相机唯一标识 </summary>
public string CameraId { get; set; } = string.Empty;
/// <summary> 采集时间 </summary>
public DateTime CaptureTime { get; set; }
/// <summary> 发送时间 </summary>
public DateTime DispatchTime { get; set; }
/// <summary> 原始宽 </summary>
public int OriginalWidth { get; set; }
/// <summary> 原始高 </summary>
public int OriginalHeight { get; set; }
/// <summary> 已编码的图片数据 (JPG) </summary>
public byte[] OriginalImageBytes { get; set; } = Array.Empty<byte>();
}

View File

@@ -0,0 +1,87 @@
using Microsoft.Extensions.Hosting;
using NetMQ;
using NetMQ.Sockets;
using SHH.CameraSdk;
namespace SHH.CameraService;
public class ZeroMQBridgeWorker : BackgroundService
{
private readonly ServiceConfig _config;
private readonly VideoDataChannel _channel; // 数据源
public ZeroMQBridgeWorker(ServiceConfig config, VideoDataChannel channel)
{
_config = config;
_channel = channel;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 1. 如果不是主动/混合模式,不需要连接
if (!_config.ShouldConnect) return;
// ★★★ 核心修正:直接读取解析好的视频地址列表 ★★★
// 这些地址来自参数 --uris "IP,VideoPort&CommandPort" 中的 VideoPort 部分 (符号左边)
var streamUris = _config.VideoEndpoints;
if (streamUris.Count == 0)
{
Console.WriteLine("[推流] 未在参数中找到视频通道地址(位于&符号左侧),跳过连接。");
return;
}
// 2. 初始化 Publisher Socket
// 特点:只需 Send 一次,底层会自动分发给所有 Connect 的 Dashboard
using var pubSocket = new PublisherSocket();
// 设置发送高水位 (HWM)
// 防止网络拥塞或接收端处理慢时内存无限增长。超过50帧积压就开始丢弃旧帧。
pubSocket.Options.SendHighWatermark = 50;
// 3. 连接所有视频目标
foreach (var uri in streamUris)
{
Console.WriteLine($"[推流] 连接视频接收端: {uri}");
pubSocket.Connect(uri);
}
Console.WriteLine($"[推流] 服务就绪 (AppId: {_config.AppId}),等待视频帧...");
// 4. 推流循环
while (!stoppingToken.IsCancellationRequested)
{
try
{
// 从通道读取最新帧 (支持异步等待)
// 注意:这里使用了之前 VideoDataChannel 暴露出来的 Reader 属性
var payload = await _channel.Reader.ReadAsync(stoppingToken);
// 简单校验
if (payload == null || payload.OriginalImageBytes == null || payload.OriginalImageBytes.Length == 0)
continue;
// 构造 Topic (通常用 AppId 作为 Topic这样 Dashboard 可以按需订阅)
string topic = _config.AppId;
// 发送两帧:[Topic] [ImageBytes]
// 这样 Dashboard 的 Subscriber 可以通过 Subscribe(topic) 来过滤
pubSocket.SendMoreFrame(topic)
.SendFrame(payload.OriginalImageBytes);
// 调试日志 (生产环境建议注释掉,否则刷屏)
// Console.WriteLine($"[推流] Sent {payload.OriginalImageBytes.Length} bytes");
}
catch (OperationCanceledException)
{
break; // 正常退出
}
catch (Exception ex)
{
Console.WriteLine($"[推流] 发送异常: {ex.Message}");
// 发生错误稍微停顿,防止死循环占用 CPU
await Task.Delay(1000, stoppingToken);
}
}
}
}