新通讯图像协议对接成功

This commit is contained in:
2026-01-15 11:04:38 +08:00
parent 81580a8f55
commit 8ef8139382
20 changed files with 237 additions and 593 deletions

View File

@@ -0,0 +1,177 @@
using Newtonsoft.Json.Linq;
using SHH.CameraSdk;
using SHH.Contracts;
namespace SHH.CameraService;
/// <summary>
/// 同步设备配置处理器
/// </summary>
public class DeviceConfigHandler : ICommandHandler
{
private readonly CameraManager _cameraManager;
/// <summary>
/// 命令名称
/// </summary>
public string ActionName => ProtocolHeaders.Sync_Camera;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="cameraManager"></param>
public DeviceConfigHandler(CameraManager cameraManager)
{
_cameraManager = cameraManager;
}
/// <summary>
/// 执行处理
/// </summary>
/// <param name="payload"></param>
/// <returns></returns>
public async Task ExecuteAsync(JToken payload)
{
// 1. 反序列化配置 DTO
var dto = payload.ToObject<CameraConfigDto>();
if (dto == null) return;
// 2. 尝试获取现有设备
var device = _cameraManager.GetDevice(dto.Id);
if (device != null)
{
// =========================================================
// 场景 A: 设备已存在 -> 执行智能更新 (Smart Update)
// =========================================================
Console.WriteLine($"[Sync] 更新设备配置: {dto.Id} ({dto.Name})");
// 将全量配置映射为部分更新 DTO
var updateDto = new DeviceUpdateDto
{
// --- 冷更新参数 (变更会触发重启) ---
IpAddress = dto.IpAddress,
Port = dto.Port,
Username = dto.Username,
Password = dto.Password,
ChannelIndex = dto.ChannelIndex,
Brand = dto.Brand,
RtspPath = dto.RtspPath,
RenderHandle = dto.RenderHandle, // long 类型直接赋值
// --- 热更新参数 (变更立即生效) ---
Name = dto.Name,
Location = dto.Location,
StreamType = dto.StreamType,
MainboardIp = dto.MainboardIp,
MainboardPort = dto.MainboardPort,
// --- 图像处理参数 (热更新) ---
AllowCompress = dto.AllowCompress,
AllowExpand = dto.AllowExpand,
TargetResolution = dto.TargetResolution,
EnhanceImage = dto.EnhanceImage,
UseGrayscale = dto.UseGrayscale
};
// 调用 Manager 的核心更新逻辑 (它会自动判断是 Stop->Start 还是直接应用)
await _cameraManager.UpdateDeviceConfigAsync(dto.Id, updateDto);
}
else
{
// =========================================================
// 场景 B: 设备不存在 -> 执行新增 (Add New)
// =========================================================
Console.WriteLine($"[Sync] 新增设备: {dto.Id} ({dto.Name})");
// 构造全新的设备配置
var newConfig = new VideoSourceConfig
{
Id = dto.Id,
Name = dto.Name,
Brand = (DeviceBrand)dto.Brand, // int -> Enum 强转
IpAddress = dto.IpAddress,
Port = dto.Port,
Username = dto.Username,
Password = dto.Password,
ChannelIndex = dto.ChannelIndex,
StreamType = dto.StreamType,
RtspPath = dto.RtspPath,
MainboardIp = dto.MainboardIp,
MainboardPort = dto.MainboardPort,
RenderHandle = (IntPtr)dto.RenderHandle, // long -> IntPtr 转换
ConnectionTimeoutMs = 5000 // 默认超时
};
// 添加到管理器池
_cameraManager.AddDevice(newConfig);
// 重新获取引用以进行后续操作
device = _cameraManager.GetDevice(dto.Id);
}
// ★★★ 核心修复:统一处理“运行意图” ★★★
if (device != null)
{
// 将 DTO 的立即执行标志直接同步给设备的运行意图
device.IsRunning = dto.ImmediateExecution;
if (dto.ImmediateExecution)
{
// 情况 1: 收到“启动”指令
if (!device.IsOnline) // 只有没在线时才点火
{
Console.WriteLine($"[Sync] 指令:立即启动设备 {dto.Id}");
_ = device.StartAsync();
}
}
else
{
// 情况 2: 收到“停止”指令 (即 ImmediateExecution = false)
if (device.IsOnline) // 只有在线时才熄火
{
Console.WriteLine($"[Sync] 指令:立即停止设备 {dto.Id}");
_ = device.StopAsync();
}
}
}
// =========================================================
// 3. 处理自动订阅策略 (Auto Subscriptions)
// =========================================================
// 无论新增还是更新,都确保订阅策略是最新的
if (device != null && dto.AutoSubscriptions != null)
{
var controller = device.Controller;
if (controller != null)
{
foreach (var sub in dto.AutoSubscriptions)
{
// 如果没有 AppId生成一个临时的通常 Dashboard 会下发固定的 AppId
string appId = string.IsNullOrWhiteSpace(sub.AppId)
? $"AUTO_{Guid.NewGuid().ToString("N")[..8]}"
: sub.AppId;
// 构造流控需求
var req = new FrameRequirement
{
AppId = appId,
TargetFps = sub.TargetFps,
Type = (SubscriptionType)sub.Type, // int -> Enum
Memo = sub.Memo ?? "Sync Auto",
// 自动订阅通常不包含具体的 Handle 或 SavePath除非协议里带了
// 如果需要支持网络转发,这里可以扩展映射 sub.TargetIp 等
Handle = "",
SavePath = ""
};
// 注册到帧控制器
controller.Register(req);
}
}
}
}
}

View File

@@ -0,0 +1,169 @@
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using SHH.CameraSdk;
using SHH.Contracts;
using SHH.Contracts.Grpc;
using System.Collections.Concurrent;
namespace SHH.CameraService;
/// <summary>
/// 设备状态监控工作者 (gRPC 版)
/// 职责:监控相机状态并在状态变更或心跳周期内,通过 gRPC 批量上报至所有配置的端点
/// </summary>
public class DeviceStatusHandler : BackgroundService
{
private readonly CameraManager _manager;
private readonly ServiceConfig _config;
private readonly ILogger<DeviceStatusHandler> _logger;
// 状态存储CameraId -> 状态载荷
private readonly ConcurrentDictionary<string, StatusEventPayload> _stateStore = new();
private volatile bool _isDirty = false;
private long _lastSendTick = 0;
public DeviceStatusHandler(
CameraManager manager,
ServiceConfig config,
ILogger<DeviceStatusHandler> logger)
{
_manager = manager;
_config = config;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 1. 初始化本地状态缓存
foreach (var dev in _manager.GetAllDevices())
{
UpdateLocalState(dev.Id, false, "Service Init");
}
// 2. 订阅 SDK 状态变更事件
_manager.OnDeviceStatusChanged += OnSdkStatusChanged;
_logger.LogInformation("[StatusWorker] gRPC 状态上报已启动,配置节点数: {Count}", _config.CommandEndpoints.Count);
// 3. 定时循环 (1秒1次检查)
var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
try
{
while (await timer.WaitForNextTickAsync(stoppingToken))
{
await CheckAndBroadcastAsync(stoppingToken);
}
}
catch (OperationCanceledException) { /* 正常退出 */ }
catch (Exception ex)
{
_logger.LogError(ex, "[StatusWorker] 运行异常");
}
finally
{
_manager.OnDeviceStatusChanged -= OnSdkStatusChanged;
}
}
/// <summary>
/// SDK 状态变更回调
/// </summary>
private void OnSdkStatusChanged(long deviceId, bool isOnline, string reason)
{
UpdateLocalState(deviceId, isOnline, reason);
_isDirty = true;
}
private void UpdateLocalState(long deviceId, bool isOnline, string reason)
{
var evt = new StatusEventPayload
{
CameraId = deviceId.ToString(),
IsOnline = isOnline,
Reason = reason,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
_stateStore[deviceId.ToString()] = evt;
}
/// <summary>
/// 执行广播逻辑
/// </summary>
private async Task CheckAndBroadcastAsync(CancellationToken ct)
{
long now = Environment.TickCount64;
// 策略: 有变更(Dirty) 或 超过5秒(强制心跳)
bool shouldSend = _isDirty || (now - _lastSendTick > 5000);
if (shouldSend && _config.CommandEndpoints.Any())
{
// 1. 构建 gRPC 请求包
var request = new StatusBatchRequest
{
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
// 转换内存中的状态快照为 Protobuf 列表
foreach (var item in _stateStore.Values)
{
request.Items.Add(new StatusEventItem
{
CameraId = item.CameraId,
IsOnline = item.IsOnline,
Reason = item.Reason,
});
}
// 2. 遍历所有端点进行发送
foreach (var endpoint in _config.CommandEndpoints)
{
try
{
string grpcUrl = endpoint.Uri.Replace("tcp://", "http://").Trim();
// --- 增加以下诊断代码 ---
using var channel = GrpcChannel.ForAddress(grpcUrl);
var client = new GatewayProvider.GatewayProviderClient(channel);
// 获取 gRPC 内部生成的服务全称
// 这就是客户端尝试调用的真实路径:/包名.服务名/方法名
var methodName = "ReportStatusBatch";
var serviceName = client.GetType().DeclaringType?.Name ?? "Unknown";
_logger.LogInformation("[gRPC Debug] 准备调用端点: {Url}", grpcUrl);
_logger.LogInformation("[gRPC Debug] 客户端契约服务名: {Service}", serviceName);
// 执行调用
var response = await client.ReportStatusBatchAsync(request,
deadline: DateTime.UtcNow.AddSeconds(2), cancellationToken: ct);
if (response.Success)
{
_logger.LogInformation("[gRPC Success] 上报成功");
_isDirty = false;
_lastSendTick = Environment.TickCount64;
}
}
catch (RpcException ex)
{
// 这里是关键:打印 RpcException 的详细状态
_logger.LogError("[gRPC Error] StatusCode: {Code}, Detail: {Detail}", ex.StatusCode, ex.Status.Detail);
// 如果是 Unimplemented通常意味着路径不对
if (ex.StatusCode == StatusCode.Unimplemented)
{
_logger.LogError("[gRPC Fix] 请检查服务端是否注册了名为 'GatewayProvider' 的服务,且其 package 声明与客户端一致。");
}
}
catch (Exception ex)
{
_logger.LogError("[gRPC Fatal] 非 RPC 异常: {Msg}", ex.Message);
}
}
}
}
}

View File

@@ -0,0 +1,102 @@
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using SHH.CameraSdk;
using SHH.Contracts.Grpc; // 引用 Proto 生成的命名空间
namespace SHH.CameraService
{
/// <summary>
/// gRPC 指令接收后台服务
/// 职责:
/// 1. 维护与 AiVideo 的 gRPC 长连接。
/// 2. 完成节点逻辑注册。
/// 3. 监听 Server Streaming 指令流并移交给 Dispatcher。
/// </summary>
public class GatewayService : BackgroundService
{
private readonly ILogger<GatewayService> _logger;
private readonly ServiceConfig _config;
private readonly CommandDispatcher _dispatcher;
public GatewayService(
ILogger<GatewayService> logger,
ServiceConfig config,
CommandDispatcher dispatcher)
{
_logger = logger;
_config = config;
_dispatcher = dispatcher;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 预留系统启动缓冲时间,确保数据库和 SDK 已就绪
_logger.LogInformation("[gRPC Bus] 指令接收服务启动,等待环境预热...");
await Task.Delay(3000, stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
try
{
// 1. 地址适配:将 tcp 转换为 http并将 127.0.0.1 修正为 localhost 解决 Unimplemented 异常
var ep = _config.CommandEndpoints.First();
string targetUrl = ep.Uri.Replace("tcp://", "http://").Replace("127.0.0.1", "localhost");
using var channel = GrpcChannel.ForAddress(targetUrl);
var client = new GatewayProvider.GatewayProviderClient(channel);
// --- 第一步:发起节点逻辑注册 (Unary) ---
_logger.LogInformation("[gRPC Bus] 正在发起逻辑注册: {Url}", targetUrl);
var regResp = await client.RegisterInstanceAsync(new RegisterRequest
{
InstanceId = _config.AppId,
Version = "2.0.0-grpc",
ServerIp = "127.0.0.1",
StartTimeTicks = DateTime.Now.Ticks
}, cancellationToken: stoppingToken);
if (regResp.Success)
{
_logger.LogInformation("[gRPC Bus] 注册成功。正在建立双向指令通道...");
// --- 第二步:开启 Server Streaming 指令流 ---
using var call = client.OpenCommandChannel(new CommandStreamRequest
{
InstanceId = _config.AppId
}, cancellationToken: stoppingToken);
// --- 第三步:循环读取服务端推送的指令 ---
// 只要服务端流未断开,此处会一直阻塞等待新消息
while (await call.ResponseStream.MoveNext(stoppingToken))
{
var protoMsg = call.ResponseStream.Current;
// 核心变更:不再直接处理业务,而是通过分发器进行路由
// 使用 _ = 异步处理,避免某个 Handler 执行过慢导致指令流阻塞
_ = _dispatcher.DispatchAsync(protoMsg);
}
}
}
catch (OperationCanceledException)
{
// 响应系统正常退出信号
break;
}
catch (RpcException ex)
{
_logger.LogError("[gRPC Bus] RPC 异常 (Status: {Code}): {Msg}", ex.StatusCode, ex.Message);
// 链路异常,进入重连等待阶段
await Task.Delay(5000, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError("[gRPC Bus] 非预期链路异常: {Msg}5秒后尝试重连", ex.Message);
await Task.Delay(5000, stoppingToken);
}
}
}
}
}

View File

@@ -0,0 +1,18 @@
namespace SHH.CameraService;
/// <summary>
/// 抽象指令处理器接口
/// </summary>
public interface ICommandHandler
{
/// <summary>
/// 该处理器支持的 Action 名称 (如 "AddCamera", "Reboot")
/// </summary>
string ActionName { get; }
/// <summary>
/// 执行指令逻辑
/// </summary>
/// <param name="payload">指令携带的数据 (JSON JToken)</param>
Task ExecuteAsync(Newtonsoft.Json.Linq.JToken payload);
}

View File

@@ -0,0 +1,84 @@
using Newtonsoft.Json.Linq;
using SHH.CameraSdk;
using SHH.Contracts;
namespace SHH.CameraService
{
/// <summary>
/// 移除设备指令处理器
/// </summary>
public class RemoveCameraHandler : ICommandHandler
{
private readonly CameraManager _cameraManager;
/// <summary>
/// 指令名称
/// </summary>
public string ActionName => ProtocolHeaders.Remove_Camera;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="cameraManager"></param>
public RemoveCameraHandler(CameraManager cameraManager)
{
_cameraManager = cameraManager;
}
/// <summary>
/// 处理指令
/// </summary>
/// <param name="payload"></param>
public async Task ExecuteAsync(JToken payload)
{
long deviceId = 0;
try
{
// 1. 增强型 ID 解析
if (payload.Type == JTokenType.Object)
{
// 兼容大小写不敏感的解析
var idToken = payload["Id"] ?? payload["id"];
if (idToken != null) deviceId = idToken.Value<long>();
}
else if (payload.Type == JTokenType.Integer || payload.Type == JTokenType.String)
{
// 兼容字符串形式的 ID
long.TryParse(payload.ToString(), out deviceId);
}
if (deviceId <= 0)
{
Console.WriteLine($"[{ActionName}] 收到无效指令: ID解析失败 ({payload})");
return;
}
// 2. 预检查
var device = _cameraManager.GetDevice(deviceId);
if (device == null)
{
Console.WriteLine($"[{ActionName}] 设备 {deviceId} 已经不在管理池中,无需操作。");
return;
}
// 3. 安全移除
// 这里建议增加审计日志,记录谁触发了删除(如果协议里有用户信息的话)
device.AddAuditLog("收到远程指令:彻底移除设备");
Console.WriteLine($"[{ActionName}] 正在安全移除设备: {deviceId} ({device.Config.Name})");
// CameraManager 内部会StopAsync -> DisposeAsync -> TryRemove -> SaveChanges
await _cameraManager.RemoveDeviceAsync(deviceId);
Console.WriteLine($"[{ActionName}] 设备 {deviceId} 已彻底清理并从持久化库中移除。");
// 4. (可选) 此处可以调用 CommandDispatcher 发送 Success ACK
}
catch (Exception ex)
{
// 捕获异常,防止影响全局 Socket 轮询
Console.WriteLine($"[{ActionName}] 移除设备 {deviceId} 过程中发生致命错误: {ex.Message}");
}
}
}
}

View File

@@ -0,0 +1,133 @@
using Microsoft.Extensions.Hosting;
using OpenCvSharp;
using SHH.CameraSdk; // 引用 SDK 核心
using SHH.Contracts;
using System.Diagnostics;
namespace SHH.CameraService;
public class ImageMonitorController : BackgroundService
{
// 注入所有注册的目标(云端、大屏等),实现动态分发
private readonly IEnumerable<StreamTarget> _targets;
// 编码参数JPG 质量 75 (平衡画质与带宽)
// 工业经验75 是甜点,体积只有 100 的 1/3肉眼几无区别。
// 如果您确实需要 100请注意带宽压力。此处我保留您要求的 100但建议未来调优。
private readonly int[] _encodeParams = { (int)ImwriteFlags.JpegQuality, 100 };
public ImageMonitorController(IEnumerable<StreamTarget> targets)
{
_targets = targets;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
Console.WriteLine("[StreamWorker] 启动流媒体采集引擎...");
// =========================================================
// 订阅逻辑:接入 "上帝模式" (God Mode)
// =========================================================
// 理由NetMQ 网关需要无差别地获取所有设备的图像。
GlobalStreamDispatcher.OnGlobalFrame += ProcessFrame;
//Console.WriteLine($"[StreamWorker] 已挂载至全局广播总线,正在监听 {GlobalStreamDispatcher.OnGlobalFrame?.GetInvocationList().Length ?? 0} 个订阅者...");
var tcs = new TaskCompletionSource();
stoppingToken.Register(() =>
{
// 停止时反注册,防止静态事件内存泄漏
GlobalStreamDispatcher.OnGlobalFrame -= ProcessFrame;
Console.WriteLine("[StreamWorker] 已断开全局广播连接");
tcs.SetResult();
});
return tcs.Task;
}
/// <summary>
/// [回调函数] 处理每一帧图像
/// 注意:此方法运行在 SDK 的采集线程池中,必须极速处理,严禁阻塞!
/// </summary>
private void ProcessFrame(long deviceId, SmartFrame frame)
{
try
{
// 1. 基础校验 (合法性检查)
if (frame == null || frame.InternalMat.Empty()) return;
long startTick = Stopwatch.GetTimestamp();
// =========================================================
// 2. 一次编码 (One Encode) - CPU 消耗点
// =========================================================
// 理由:在这里同步编码是最安全的,因为出了这个函数 frame 内存就会失效。
// 且只编一次,后续分发给 10 个目标也只用这一份数据。
byte[] jpgBytes = null;
// 如果有更小的图片, 原始图片不压缩, 除非有特殊需求
if (frame.TargetMat == null)
{
jpgBytes = EncodeImage(frame.InternalMat);
}
// 双流支持:如果存在处理后的 AI 图,也一并编码
byte[] targetBytes = null;
if (frame.TargetMat != null && !frame.TargetMat.Empty())
{
targetBytes = EncodeImage(frame.TargetMat);
}
// =========================================================
// 3. 构建 Payload (数据载荷)
// =========================================================
var payload = new VideoPayload
{
CameraId = deviceId.ToString(),
CaptureTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
OriginalImageBytes = jpgBytes, // 引用赋值
TargetImageBytes = targetBytes, // 引用赋值
OriginalWidth = frame.TargetWidth,
OriginalHeight = frame.TargetHeight,
DispatchTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
// 添加订阅者
payload.SubscriberIds.AddRange(frame.SubscriberIds);
// 计算转码耗时(ms)
double processMs = (Stopwatch.GetTimestamp() - startTick) * 1000.0 / Stopwatch.Frequency;
payload.Diagnostics["encode_ms"] = Math.Round(processMs, 2);
// =========================================================
// 4. 动态扇出 (Dynamic Fan-Out) - 内存消耗极低
// =========================================================
// 遍历所有目标,往各自独立的管道里写数据。
// 实现了"物理隔离":一个管道满了(云端卡顿),不影响另一个管道(大屏流畅)。
foreach (var target in _targets)
{
bool ok = target.Channel.WriteLog(payload);
if (!ok)
{
// 如果这里打印,说明管道由于某种原因被关闭了(通常是程序正在退出)
Console.WriteLine($"[DEBUG] 管道写入失败,目标: {target.Config.Name}");
}
}
}
catch (Exception ex)
{
// 极少发生的内存错误,打印日志但不抛出,避免崩溃 SDK 线程
Console.WriteLine($"[StreamWorker] 采集处理异常: {ex.Message}");
}
}
/// <summary>
/// 辅助OpenCV 内存编码
/// </summary>
private byte[] EncodeImage(Mat mat)
{
// ImEncode 将 Mat 编码为一维字节数组 (托管内存)
Cv2.ImEncode(".jpg", mat, out byte[] buf, _encodeParams);
return buf;
}
}

View File

@@ -0,0 +1,83 @@
using Microsoft.Extensions.Hosting;
using SHH.CameraSdk;
namespace SHH.CameraService;
/// <summary>
/// 图像处理管道配置服务(基于责任链模式)
/// <para>核心职责:</para>
/// <para>1. 组装图像处理集群的执行顺序,形成 "缩放 → 增强" 的固定流程</para>
/// <para>2. 将组装好的管道挂载到全局路由,统一接收驱动层输出的帧数据</para>
/// <para>设计说明:</para>
/// <para>- 采用责任链模式,支持动态扩展处理节点(如后续新增滤镜、裁剪等功能)</para>
/// <para>- 依赖 IHostedService 生命周期,确保在应用启动时完成管道初始化</para>
/// <para>- 与 GlobalPipelineRouter 强关联,是帧数据进入处理流程的唯一入口</para>
public class PipelineConfigurator : IHostedService
{
#region --- ---
/// <summary>
/// 图像缩放集群实例(责任链第一节点)
/// 功能:根据配置缩放帧分辨率、控制图像放大/缩小开关
/// </summary>
private readonly ImageScaleCluster _scale;
/// <summary>
/// 图像增强集群实例(责任链第二节点)
/// 功能:调整图像亮度、对比度等增强效果(基于 ProcessingConfigManager 配置)
/// </summary>
private readonly ImageEnhanceCluster _enhance;
#endregion
#region --- ---
/// <summary>
/// 初始化管道配置服务实例
/// </summary>
/// <param name="scale">图像缩放集群(通过 DI 注入,已预设并行度和配置管理器)</param>
/// <param name="enhance">图像增强集群(通过 DI 注入,已预设并行度和配置管理器)</param>
public PipelineConfigurator(ImageScaleCluster scale, ImageEnhanceCluster enhance)
{
_scale = scale;
_enhance = enhance;
}
#endregion
#region --- IHostedService ---
/// <summary>
/// 启动服务:组装责任链并挂载到全局路由
/// <para>执行时机:应用启动时,在所有 Singleton 服务初始化完成后触发</para>
/// </summary>
/// <param name="cancellationToken">服务停止令牌(用于响应应用关闭信号)</param>
/// <returns>异步任务(无返回值)</returns>
public Task StartAsync(CancellationToken cancellationToken)
{
// 1. 建立责任链关系:缩放集群处理完成后,将帧数据传递给增强集群
// 设计逻辑Scale 是入口节点Enhance 是后续节点,可按需求插入更多处理节点
_scale.SetNext(_enhance);
// 2. 将责任链入口挂载到全局路由:驱动层输出的所有帧数据都会进入该管道
// 关键作用:统一帧数据处理入口,屏蔽驱动层与处理层的直接依赖
GlobalPipelineRouter.SetProcessor(_scale);
// 启动日志:打印管道组装结果,便于运维排查
Console.WriteLine("[Pipeline] 图像处理链组装完成: ImageScaleCluster -> ImageEnhanceCluster");
Console.WriteLine("[Pipeline] 提示:帧数据将按 '缩放 → 增强' 顺序处理,可通过 GlobalPipelineRouter 调整流程");
return Task.CompletedTask;
}
/// <summary>
/// 停止服务:空实现(无资源需要释放)
/// <para>说明:图像处理集群的资源释放由各自的 Dispose 方法管理,此处无需额外操作</para>
/// </summary>
/// <param name="cancellationToken">服务停止令牌</param>
/// <returns>空异步任务</returns>
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
#endregion
}

View File

@@ -0,0 +1,41 @@
using System.Threading.Channels;
using SHH.Contracts;
namespace SHH.CameraService
{
/// <summary>
/// 视频数据内部总线 (线程安全的生产者-消费者通道)
/// <para>作用:解耦 [采集编码线程] 与 [网络发送线程]</para>
/// </summary>
public class VideoDataChannel
{
// 限制容量为 100 帧。如果积压超过 100 帧,说明发送端彻底堵死了,必须丢帧。
private readonly Channel<VideoPayload> _channel;
public VideoDataChannel(int capacity = 10)
{
var options = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.DropOldest, // 核心策略:满了就丢弃最旧的帧
SingleReader = false, // 允许多个发送 Worker (如 CloudWorker, ScreenWorker) 同时读取
SingleWriter = true // 只有一个采集线程在写
};
_channel = Channel.CreateBounded<VideoPayload>(options);
}
/// <summary>
/// [生产者] 写入一个封装好的数据包 (非阻塞)
/// </summary>
public bool WriteLog(VideoPayload payload) // 改为返回 bool
{
// TryWrite 在 DropOldest 模式下虽然几乎总是返回 true
// 但如果 Channel 被 Complete (关闭) 了,它会返回 false。
return _channel.Writer.TryWrite(payload);
}
/// <summary>
/// [消费者] 读取器
/// </summary>
public ChannelReader<VideoPayload> Reader => _channel.Reader;
}
}

View File

@@ -0,0 +1,102 @@
using Google.Protobuf;
using Grpc.Net.Client;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using SHH.Contracts.Grpc;
namespace SHH.CameraService;
/// <summary>
/// gRPC 视频流发送工作者
/// 职责:监听特定的 StreamTarget 队列,建立 gRPC 客户端流并持续推送图片
/// </summary>
public class GrpcSenderWorker : BackgroundService
{
private readonly StreamTarget _target;
private readonly ILogger<GrpcSenderWorker> _logger;
private readonly string _grpcUrl;
public GrpcSenderWorker(StreamTarget target, ILogger<GrpcSenderWorker> logger)
{
_target = target;
_logger = logger;
// 自动适配地址:将配置的 tcp://localhost:9001 转换为 http://localhost:9001
// 并且严格使用你验证成功的 localhost
_grpcUrl = _target.Config.Endpoint.Replace("tcp://", "http://");
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation($"[gRPC Worker] 启动。目标: {_target.Config.Name}, 地址: {_grpcUrl}");
while (!stoppingToken.IsCancellationRequested)
{
try
{
// 1. 建立通道
using var channel = GrpcChannel.ForAddress(_grpcUrl);
var client = new GatewayProvider.GatewayProviderClient(channel);
// 2. 开启客户端流 (UploadVideoStream 是在 proto 中定义的)
using var call = client.UploadVideoStream(cancellationToken: stoppingToken);
_logger.LogInformation($"[gRPC Worker] 已开启视频推送流: {_target.Config.Name}");
// 3. 核心搬运循环:从内存队列 (Channel) 读取数据
await foreach (var payload in _target.Channel.Reader.ReadAllAsync(stoppingToken))
{
// 【畅通保障】检查数据时效性:丢弃超过 1 秒的积压帧
var delay = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - payload.CaptureTimestamp;
if (delay > 1000)
{
continue;
}
// 将业务 DTO 转换为 gRPC 原生 Request
var request = new VideoFrameRequest
{
CameraId = payload.CameraId ?? "0",
CaptureTimestamp = payload.CaptureTimestamp,
OriginalWidth = payload.OriginalWidth,
OriginalHeight = payload.OriginalHeight,
HasOriginalImage = payload.HasOriginalImage,
HasTargetImage = payload.HasTargetImage,
// ★ 核心:将 byte[] 转换为 gRPC 的 ByteString (高性能)
OriginalImageBytes = payload.OriginalImageBytes != null
? ByteString.CopyFrom(payload.OriginalImageBytes)
: ByteString.Empty,
TargetImageBytes = payload.TargetImageBytes != null
? ByteString.CopyFrom(payload.TargetImageBytes)
: ByteString.Empty
};
request.SubscriberIds.AddRange(payload.SubscriberIds);
// 处理诊断信息 map<string, string>
if (payload.Diagnostics != null)
{
foreach (var kv in payload.Diagnostics)
{
request.Diagnostics.Add(kv.Key, kv.Value?.ToString() ?? "");
}
}
// 4. 发送至 AiVideo
await call.RequestStream.WriteAsync(request);
}
// 正常结束流
await call.RequestStream.CompleteAsync();
}
catch (OperationCanceledException) { break; }
catch (Exception ex)
{
_logger.LogError($"[gRPC Worker] 推送链路异常5秒后重连: {ex.Message}");
await Task.Delay(5000, stoppingToken);
}
}
}
}

View File

@@ -0,0 +1,30 @@
namespace SHH.CameraService;
/// <summary>
/// 代表一个独立的推送目标
/// 包含:配置信息 + 专属于它的数据管道
/// </summary>
public class StreamTarget
{
/// <summary>
/// 配置
/// </summary>
public PushTargetConfig Config { get; }
/// <summary>
/// 管道
/// </summary>
public VideoDataChannel Channel { get; }
/// <summary>
/// 构造函数
/// </summary>
/// <param name="config"></param>
public StreamTarget(PushTargetConfig config)
{
Config = config;
// 为这个目标创建独立的管道,容量由配置决定
Channel = new VideoDataChannel(capacity: config.QueueCapacity);
}
}