新增 Mjpegplayer 用来播放 Web 流

This commit is contained in:
2026-01-21 19:03:59 +08:00
parent f79cb6e74d
commit c438edfa0d
71 changed files with 4538 additions and 452 deletions

View File

@@ -0,0 +1,22 @@
using System.Collections.Concurrent;
namespace SHH.MjpegPlayer
{
/// <summary>
/// 辅助类:线程安全集合
/// </summary>
public class ConcurrentHashSet<T> : IEnumerable<T>
{
private readonly ConcurrentDictionary<T, byte> _dict = new ConcurrentDictionary<T, byte>();
public void Add(T item) => _dict.TryAdd(item, 0);
public void Remove(T item) => _dict.TryRemove(item, out _);
public bool IsEmpty => _dict.IsEmpty;
public IEnumerator<T> GetEnumerator() => _dict.Keys.GetEnumerator();
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
}
}

View File

@@ -0,0 +1,106 @@
using System.Collections.Concurrent;
using System.Diagnostics;
namespace SHH.MjpegPlayer
{
/// <summary>
/// 设备配置同步处理器 (原 ConfigSyncManager 瘦身版)
/// 职责仅负责确保远程分析节点Instance的摄像头配置与本地数据库一致。
/// 逻辑:通过 5 秒初始化冷却期避开抖动,并利用配置快照对比实现增量同步。
/// </summary>
public class DeviceConfigHandler
{
#region
/// <summary>
/// 获取配置处理器的全局单例实例
/// </summary>
public static DeviceConfigHandler Instance { get; } = new DeviceConfigHandler();
/// <summary>
/// 活跃服务实例 ID 集合 (InstanceId)
/// 用于记录当前所有已建立 gRpc 长连接的远程节点
/// </summary>
private readonly ConcurrentHashSet<string> _activeServiceIds = new ConcurrentHashSet<string>();
/// <summary>
/// 配置快照缓存:用于防止重复下发相同的配置
/// Key 格式: "InstanceId_CameraId"
/// Value: 该摄像头配置的 JSON 字符串快照
/// </summary>
private readonly ConcurrentDictionary<string, string> _lastSentConfigCache = new ConcurrentDictionary<string, string>();
/// <summary>
/// 后台监控任务的任务取消令牌源
/// </summary>
private CancellationTokenSource _cts;
/// <summary>
/// 初始化完成时间戳:用于 5 秒冷却期判定
/// 防止在服务刚启动或节点刚连接时,由于数据库加载延迟导致误判设备被移除
/// </summary>
private DateTime _initCompleteTime = DateTime.MaxValue;
#endregion
#region
/// <summary>
/// 私有构造函数:订阅消息总线并启动监控任务
/// </summary>
private DeviceConfigHandler()
{
// 订阅总线:仅关注节点注册事件,以此作为触发初始化全量同步的开关
MessageBus.Instance.OnServerRegistered += async (payload) =>
{
await HandleServiceOnlineAsync(payload.InstanceId);
};
// 启动后台轮询监控任务 (检测 Add/Update/Remove)
StartMonitorTask();
}
#endregion
#region (线)
/// <summary>
/// 处理新节点上线:执行全量同步
/// </summary>
/// <param name="instanceId">远程服务实例唯一标识</param>
private async Task HandleServiceOnlineAsync(string instanceId)
{
// 1. 将新实例记录到活跃列表
_activeServiceIds.Add(instanceId);
// 2. 预留 1 秒等待期,确保 gRpc 双向通道完全稳定
await Task.Delay(1000);
//// 3. 从数据库拍摄当前所有摄像头的快照
//var snapshot = CSdkStatics.DbCameras.ToList();
//// 4. 对新节点执行全量下发
//foreach (var cam in snapshot)
//{
// await SendSyncCommandAsync(instanceId, cam);
//}
// 5. 更新冷却期起始点
_initCompleteTime = DateTime.Now;
Debug.WriteLine($"[ConfigHandler] 节点 {instanceId} 初始化全量同步已完成。");
}
#endregion
#region ()
/// <summary>
/// 启动后台增量监控任务
/// </summary>
private void StartMonitorTask()
{
}
#endregion
}
}

View File

@@ -0,0 +1,48 @@
using SHH.Contracts;
namespace SHH.MjpegPlayer
{
/// <summary>
/// 设备状态处理器
/// 职责:监听消息总线发出的状态主题事件,负责将远程节点上报的相机在线/离线状态实时同步至本地管理中心。
/// 架构说明:此类实现了业务逻辑的彻底解耦,不涉及 gRpc 通讯细节,也不涉及复杂的配置下发逻辑。
/// </summary>
public class DeviceStatusHandler
{
#region
/// <summary>
/// 获取设备状态处理器的全局单例实例。
/// 由 GrpcServerManager 在系统启动时显式调用以完成初始化。
/// </summary>
public static DeviceStatusHandler Instance { get; } = new DeviceStatusHandler();
/// <summary>
/// 私有构造函数:在此处完成对消息总线事件的订阅。
/// </summary>
private DeviceStatusHandler()
{
// 订阅 MessageBus 的状态报告主题,当总线收到状态更新包时自动触发 SyncToLocal
MessageBus.Instance.OnDeviceStatusReport += SyncToLocal;
}
#endregion
#region
/// <summary>
/// 执行状态同步:将收到的 Payload 数据精确映射回本地 SDK 管理的摄像头集合中。
/// </summary>
/// <param name="items">包含 CameraId 和在线状态的业务载荷列表</param>
private void SyncToLocal(List<StatusEventPayload> items)
{
// 1. 基础校验:若无数据则不执行后续逻辑
if (items == null || items.Count == 0) return;
// 2. 性能优化:将上报列表转换为字典,利用哈希查找提升大数据量下的匹配效率 (Key: CameraId 字符串)
var stateMap = items.ToDictionary(k => k.CameraId, v => v);
}
#endregion
}
}

View File

@@ -0,0 +1,156 @@
using Grpc.Core;
using SHH.Contracts;
using SHH.Contracts.Grpc;
namespace SHH.MjpegPlayer
{
/// <summary>
/// gRpc 网关服务
/// 职责:作为服务端通讯入口,负责接收客户端(分析节点)的所有 gRpc 请求,将其转译为内部业务载荷,
/// 并通过消息总线 MessageBus 分发至对应的业务处理器。
/// </summary>
public class GatewayService : GatewayProvider.GatewayProviderBase
{
#region 1. (Unary )
/// <summary>
/// 处理分析节点的注册请求
/// </summary>
/// <param name="request">包含节点实例 ID 和服务器 IP 的请求对象</param>
/// <param name="context">gRpc 调用上下文</param>
/// <returns>操作成功响应</returns>
public override Task<GenericResponse> RegisterInstance(RegisterRequest request, ServerCallContext context)
{
// 1. 将 Protobuf 契约对象转换为业务层的 RegisterPayload (DTO)
// 职责:将外部传输格式映射为内部业务模型,实现协议与业务逻辑的解耦
var payload = new RegisterPayload
{
// 身份标识映射
ProcessId = request.ProcessId,
InvokeProcId = request.InvokeProcessId,
InstanceId = request.InstanceId,
Version = request.Version,
// 网络诊断信息映射
ServerIp = request.ServerIp,
WebApiPort = request.WebapiPort,
GrpcPort = request.GrpcPort,
// 运行时状态映射
// 注意:将 int64 类型的 Ticks 转换为 C# 的 DateTime 对象
StartTime = new DateTime(request.StartTimeTicks),
Description = request.Description
};
// 2. 将注册载荷抛给总线,触发如 DeviceConfigHandler 的配置初始化逻辑
// 职责:通过中介者模式分发事件,网关层不需要知道谁在处理这些数据
MessageBus.Instance.RaiseServerRegistered(payload);
return Task.FromResult(new GenericResponse { Success = true });
}
#endregion
#region 2. (Server Streaming)
/// <summary>
/// 建立并维持一个从服务器向客户端单向推送指令的长连接通道
/// </summary>
/// <param name="request">连接请求(包含 InstanceId</param>
/// <param name="responseStream">响应流,用于后续异步推送指令</param>
/// <param name="context">gRpc 调用上下文</param>
/// <returns>异步任务</returns>
public override async Task OpenCommandChannel(CommandStreamRequest request, IServerStreamWriter<CommandPayloadProto> responseStream, ServerCallContext context)
{
// 1. 物理流登记:将此响应流句柄存入 GrpcSessionManager以便 MessageBus 随时调用
GrpcSessionManager.Instance.RegisterSession(request.InstanceId, responseStream);
try
{
// 2. 挂起连接:利用 Task.Delay(-1) 配合取消令牌无限期挂起连接,直到客户端断开
await Task.Delay(-1, context.CancellationToken);
}
catch (OperationCanceledException)
{
// 客户端主动取消连接属于正常预期,无需抛出异常
}
finally
{
// 3. 物理流清理:当连接断开时,必须从会话管理器中移除,防止下发指令时产生死连接
GrpcSessionManager.Instance.RemoveSession(request.InstanceId);
}
}
#endregion
#region 3. (Unary )
/// <summary>
/// 接收来自分析节点的相机在线/离线状态批量上报
/// </summary>
/// <param name="request">包含多个设备状态项的请求对象</param>
/// <param name="context">gRpc 调用上下文</param>
/// <returns>操作成功响应</returns>
public override Task<GenericResponse> ReportStatusBatch(StatusBatchRequest request, ServerCallContext context)
{
if (request.Items == null || !request.Items.Any())
return Task.FromResult(new GenericResponse { Success = true });
// 1. 数据映射:将 Proto 集合转换为业务层的 StatusEventPayload 列表
var payloads = request.Items.Select(item => new StatusEventPayload
{
CameraId = item.CameraId,
IsOnline = item.IsOnline,
Reason = item.Reason,
Timestamp = request.Timestamp
}).ToList();
// 2. 路由分发:通过总线发布状态主题,驱动 DeviceStatusHandler 执行同步
MessageBus.Instance.RaiseDeviceStatusReport(payloads);
return Task.FromResult(new GenericResponse { Success = true });
}
#endregion
#region 4. (Client Streaming)
/// <summary>
/// 接收分析节点持续推送的视频帧数据流
/// </summary>
/// <param name="requestStream">客户端异步流读取器</param>
/// <param name="context">gRpc 调用上下文</param>
/// <returns>流关闭后的最终响应</returns>
public override async Task<GenericResponse> UploadVideoStream(IAsyncStreamReader<VideoFrameRequest> requestStream, ServerCallContext context)
{
// 1. 持续读取客户端推送的每一帧数据,直到流关闭或被取消
while (await requestStream.MoveNext(context.CancellationToken))
{
var frame = requestStream.Current;
// 2. 将 Protobuf 帧数据转换为业务视频载荷 VideoPayload
// 注意ByteString 需要显式调用 ToByteArray 转换
var videoPayload = new VideoPayload
{
CameraId = frame.CameraId,
CaptureTimestamp = frame.CaptureTimestamp,
OriginalWidth = frame.OriginalWidth,
OriginalHeight = frame.OriginalHeight,
OriginalImageBytes = frame.OriginalImageBytes.ToByteArray(),
TargetImageBytes = frame.TargetImageBytes.ToByteArray(),
TargetWidth = frame.TargetWidth,
TargetHeight = frame.TargetHeight,
SubscriberIds = frame.SubscriberIds.ToList(),
HasOriginalImage = true
};
// 3. 导流:将图像数据直接投递给图像分发控制器进行 UI 渲染或二次处理
ImageMonitorController.Instance.ReceivePayload(videoPayload);
}
return new GenericResponse { Success = true, Message = "Video stream ended" };
}
#endregion
}
}

View File

@@ -0,0 +1,108 @@
using Grpc.Core;
using SHH.Contracts.Grpc;
using System.Collections.Concurrent;
namespace SHH.MjpegPlayer
{
/// <summary>
/// gRpc 会话管理器
/// 职责:专门负责维护、检索和清理所有远程客户端(分析节点)的 gRpc 指令下发物理通道 (Stream)。
/// 它是连接“业务逻辑”与“物理传输”的桥梁,确保指令能准确投递到对应的连接流中。
/// </summary>
public class GrpcSessionManager
{
#region
/// <summary>
/// 获取会话管理器的全局单例实例。
/// </summary>
public static GrpcSessionManager Instance { get; } = new GrpcSessionManager();
/// <summary>
/// 私有构造函数,防止外部实例化。
/// </summary>
private GrpcSessionManager() { }
#endregion
#region
/// <summary>
/// 物理流存储字典
/// Key: 远程服务实例唯一 ID (InstanceId)
/// Value: gRpc 双向流或服务端推送流的写入器句柄 (IServerStreamWriter)
/// 使用 ConcurrentDictionary 确保在多客户端并发连接/断开时的线程安全性。
/// </summary>
private readonly ConcurrentDictionary<string, IServerStreamWriter<CommandPayloadProto>> _sessionStreams
= new ConcurrentDictionary<string, IServerStreamWriter<CommandPayloadProto>>();
#endregion
#region
/// <summary>
/// 注册/更新物理物理通道。
/// 当客户端调用 OpenCommandChannel 并成功建立 Server Streaming 连接时,由 GatewayService 调用此方法。
/// </summary>
/// <param name="instanceId">客户端实例唯一标识</param>
/// <param name="responseStream">该客户端对应的 gRpc 响应流句柄</param>
public void RegisterSession(string instanceId, IServerStreamWriter<CommandPayloadProto> responseStream)
{
// 1. 参数校验:无效 ID 不予处理
if (string.IsNullOrEmpty(instanceId)) return;
// 2. 登记或覆盖物理流:
// 如果客户端异常断开后迅速重连,此处会覆盖旧的流句柄,确保指令始终通过最新的管道下发。
_sessionStreams[instanceId] = responseStream;
// 3. 记录日志:便于运维监控连接状态
Console.WriteLine($"[Session] 物理通道就绪通知 -> 节点 ID: {instanceId}, 当前在线总数: {_sessionStreams.Count}");
}
/// <summary>
/// 移除物理通道。
/// 当 gRpc 连接由于网络波动、客户端崩溃或主动关闭而断开时,由 GatewayService 的 finally 块调用。
/// </summary>
/// <param name="instanceId">要注销的客户端实例 ID</param>
public void RemoveSession(string instanceId)
{
// 1. 参数校验
if (string.IsNullOrEmpty(instanceId)) return;
// 2. 安全移除:若 ID 存在则移除并释放相关内部引用
if (_sessionStreams.TryRemove(instanceId, out _))
{
Console.WriteLine($"[Session] 物理通道移除通知 -> 节点 ID: {instanceId}, 剩余在线总数: {_sessionStreams.Count}");
}
}
/// <summary>
/// 检索目标节点的物理流句柄。
/// 供 MessageBus 使用,它是指令下发前定位物理路径的关键步骤。
/// </summary>
/// <param name="instanceId">目标节点的唯一 ID</param>
/// <returns>返回对应的 IServerStreamWriter 实例;若节点不在线则返回 null</returns>
public IServerStreamWriter<CommandPayloadProto> GetSession(string instanceId)
{
// 1. 参数校验
if (string.IsNullOrEmpty(instanceId)) return null;
// 2. 尝试从缓存字典中获取流句柄
_sessionStreams.TryGetValue(instanceId, out var stream);
return stream;
}
/// <summary>
/// 检查指定节点是否处于物理连接状态。
/// </summary>
/// <param name="instanceId">实例 ID</param>
/// <returns>True 表示物理通道已建立</returns>
public bool IsSessionActive(string instanceId)
{
return !string.IsNullOrEmpty(instanceId) && _sessionStreams.ContainsKey(instanceId);
}
#endregion
}
}

View File

@@ -0,0 +1,137 @@
using SHH.Contracts;
using SHH.Contracts.Grpc;
using System.Diagnostics;
namespace SHH.MjpegPlayer
{
/// <summary>
/// 消息总线中心 (纯 gRpc 架构)
/// 职责:解耦 gRpc 接收端与业务处理层,提供基于主题(Topic)的事件发布与统一的指令下发路由。
/// </summary>
public class MessageBus : IDisposable
{
#region
/// <summary>
/// 消息总线全局唯一实例
/// </summary>
public static MessageBus Instance { get; } = new MessageBus();
/// <summary>
/// 私有构造函数
/// </summary>
private MessageBus() { }
#endregion
#region (Topics)
/// <summary>
/// 1. 注册主题:当远程分析节点成功建立逻辑连接时触发。
/// 订阅者通常为 DeviceConfigHandler用于启动初始化配置同步。
/// </summary>
public event Action<RegisterPayload>? OnServerRegistered;
/// <summary>
/// 2. 状态主题:当收到远程节点批量上报的设备在线/离线状态时触发。
/// 订阅者通常为 DeviceStatusHandler用于更新 UI 状态。
/// </summary>
public event Action<List<StatusEventPayload>>? OnDeviceStatusReport;
#endregion
#region ( GatewayService )
/// <summary>
/// 发布节点注册事件:将 gRpc 接收到的原始注册请求推送到业务层
/// </summary>
/// <param name="p">注册载荷信息</param>
public void RaiseServerRegistered(RegisterPayload p)
{
if (p == null) return;
// 调试日志:跟踪节点上线流程
Debug.WriteLine($"[Bus] 发布注册事件: 节点ID = {p.InstanceId}");
// 执行所有已订阅该主题的业务逻辑
OnServerRegistered?.Invoke(p);
}
/// <summary>
/// 发布状态报告事件:将 gRpc 接收到的设备状态批量推送到业务层
/// </summary>
/// <param name="items">设备状态变更列表</param>
public void RaiseDeviceStatusReport(List<StatusEventPayload> items)
{
if (items == null || items.Count == 0) return;
// 执行所有已订阅状态同步的业务逻辑
OnDeviceStatusReport?.Invoke(items);
}
#endregion
#region ( Handler )
/// <summary>
/// 统一指令下发路由:自动定位目标节点的物理 gRpc 流并推送指令载荷
/// </summary>
/// <param name="instanceId">目标分析节点的唯一识别码</param>
/// <param name="payload">要发送的业务指令负载</param>
/// <returns>异步任务</returns>
public async Task SendInternalAsync(string instanceId, CommandPayload payload)
{
// 1. 获取由 GrpcSessionManager 维护的物理长连接流
var stream = GrpcSessionManager.Instance.GetSession(instanceId);
// 2. 健壮性检查:若连接不存在则终止下发
if (stream == null)
{
Debug.WriteLine($"[Bus Warning] 指令下发终止:节点 {instanceId} 尚未建立物理连接。");
return;
}
try
{
// 3. 契约转换:将业务层 CommandPayload 转换为 gRpc 生成的 Protobuf 契约对象
var protoMsg = new CommandPayloadProto
{
Protocol = payload.Protocol,
CmdCode = payload.CmdCode,
JsonParams = payload.JsonParams,
RequestId = payload.RequestId,
TimestampTicks = payload.Timestamp.Ticks
};
// 4. 执行异步推送
await stream.WriteAsync(protoMsg);
Debug.WriteLine($"[Bus] 指令推送成功 -> 目标: {instanceId}, 指令码: {payload.CmdCode}");
}
catch (Exception ex)
{
// 5. 异常处理:若推送失败,通常意味着网络链路已断开
Debug.WriteLine($"[Bus Error] 推送异常: {ex.Message},正在执行物理连接清理...");
// 立即移除失效会话,防止后续指令继续掉入“黑洞”
GrpcSessionManager.Instance.RemoveSession(instanceId);
}
}
#endregion
#region
/// <summary>
/// 释放总线资源
/// </summary>
public void Dispose()
{
// 清理所有事件订阅,防止内存泄漏
OnServerRegistered = null;
OnDeviceStatusReport = null;
}
#endregion
}
}

View File

@@ -0,0 +1,37 @@
using SHH.Contracts;
using System.Diagnostics;
namespace SHH.MjpegPlayer
{
/// <summary>
/// AI 视频流监控控制器
/// 职责:接收 gRpc 转换后的 Payload -> 业务转换 -> 分发 UI/AI
/// </summary>
public class ImageMonitorController
{
public static ImageMonitorController Instance { get; } = new ImageMonitorController();
private ImageMonitorController() { }
/// <summary>
/// 统一接收入口:由 GatewayProviderImpl.UploadVideoStream 调用
/// </summary>
public void ReceivePayload(VideoPayload payload)
{
if (payload == null) return;
// 1. 过滤 2 秒外的过期数据
if ((DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - payload.CaptureTimestamp) > 2000)
return;
try
{
ImagePayloadConverter.ToXWcfMsg(payload);
}
catch (Exception ex)
{
Debug.WriteLine($"[Controller Error] {ex.Message}");
}
}
}
}

View File

@@ -0,0 +1,78 @@
using Core.WcfProtocol;
using SHH.Contracts;
namespace SHH.MjpegPlayer
{
/// <summary>
/// 图像载荷转换器 (原 PayloadConverter)
/// 职责:抹平传输契约与业务契约之间的差异。
/// </summary>
public static class ImagePayloadConverter
{
/// <summary>
/// 将视频负载转换为 XWcf 协议并分发至会话池
/// </summary>
/// <param name="payload">VideoPayload 纯净版契约对象</param>
public static void ToXWcfMsg(VideoPayload payload)
{
if (payload == null) return;
try
{
// 1. 自动选择图像源逻辑
// Optimized: 优先使用 TargetImage若为空则退而求其次使用 OriginalImage
bool isOriginal = false;
byte[]? activeBytes;
activeBytes = payload.TargetImageBytes;
if (payload.TargetImageBytes == null || payload.TargetImageBytes.Length == 0)
{
isOriginal = true;
activeBytes = payload.OriginalImageBytes;
}
// 如果两者都为空,则不进行分发
if (activeBytes == null || activeBytes.Length == 0) return;
// 同理处理宽高Target 为 0 则使用 Original
int activeWidth = !isOriginal ? payload.TargetWidth : payload.OriginalWidth;
int activeHeight = !isOriginal ? payload.TargetHeight : payload.OriginalHeight;
// 2. 构造分发所需的 UploadImageRequest
// Modified: [原因] 适配最新的 VideoPayload 契约字段
var req = new UploadImageRequest
{
// 解析 CameraId。由于旧 req 是 Int64 Id若 CameraId 是数字字符串则解析,否则处理 Hash
Id = long.TryParse(payload.CameraId, out long id) ? id : 0,
Name = payload.CameraId, // 将原始 CameraId 存入 Name 字段保留引用
// 默认类型处理 (可根据 Diagnostics 中的信息动态调整)
Type = "0",
Order = (ulong)payload.CaptureTimestamp, // 使用采集时间戳作为序号
Time = UnixMillisecondsToDateTime(payload.CaptureTimestamp),
ImageBytes = activeBytes, // 零拷贝引用
ImageWidth = activeWidth,
ImageHeight = activeHeight
};
// 3. 执行核心分发逻辑
// 此处调用你之前提供的 O(1) 检索分发方法,确保画面最终流向 DoImageProc
MjpegStatics.Sessions.ProcUploadImageRequest(req);
}
catch (Exception ex)
{
// 统一使用项目规范的 _sysLog
//_sysLog.Error(ex, "VideoPayload 转换分发失败. CameraId: {CameraId}", payload.CameraId);
}
}
/// <summary>
/// 辅助方法Unix 毫秒时间戳转 DateTime
/// </summary>
private static DateTime UnixMillisecondsToDateTime(long timestamp)
{
return DateTimeOffset.FromUnixTimeMilliseconds(timestamp).LocalDateTime;
}
}
}