157 lines
6.9 KiB
C#
157 lines
6.9 KiB
C#
|
|
using Grpc.Core;
|
|||
|
|
using SHH.Contracts;
|
|||
|
|
using SHH.Contracts.Grpc;
|
|||
|
|
|
|||
|
|
namespace SHH.MjpegPlayer
|
|||
|
|
{
|
|||
|
|
/// <summary>
|
|||
|
|
/// gRpc 网关服务
|
|||
|
|
/// 职责:作为服务端通讯入口,负责接收客户端(分析节点)的所有 gRpc 请求,将其转译为内部业务载荷,
|
|||
|
|
/// 并通过消息总线 MessageBus 分发至对应的业务处理器。
|
|||
|
|
/// </summary>
|
|||
|
|
public class GatewayService : GatewayProvider.GatewayProviderBase
|
|||
|
|
{
|
|||
|
|
#region 1. 逻辑身份注册 (Unary 调用)
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 处理分析节点的注册请求
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="request">包含节点实例 ID 和服务器 IP 的请求对象</param>
|
|||
|
|
/// <param name="context">gRpc 调用上下文</param>
|
|||
|
|
/// <returns>操作成功响应</returns>
|
|||
|
|
public override Task<GenericResponse> RegisterInstance(RegisterRequest request, ServerCallContext context)
|
|||
|
|
{
|
|||
|
|
// 1. 将 Protobuf 契约对象转换为业务层的 RegisterPayload (DTO)
|
|||
|
|
// 职责:将外部传输格式映射为内部业务模型,实现协议与业务逻辑的解耦
|
|||
|
|
var payload = new RegisterPayload
|
|||
|
|
{
|
|||
|
|
// 身份标识映射
|
|||
|
|
ProcessId = request.ProcessId,
|
|||
|
|
InvokeProcId = request.InvokeProcessId,
|
|||
|
|
InstanceId = request.InstanceId,
|
|||
|
|
Version = request.Version,
|
|||
|
|
|
|||
|
|
// 网络诊断信息映射
|
|||
|
|
ServerIp = request.ServerIp,
|
|||
|
|
WebApiPort = request.WebapiPort,
|
|||
|
|
GrpcPort = request.GrpcPort,
|
|||
|
|
|
|||
|
|
// 运行时状态映射
|
|||
|
|
// 注意:将 int64 类型的 Ticks 转换为 C# 的 DateTime 对象
|
|||
|
|
StartTime = new DateTime(request.StartTimeTicks),
|
|||
|
|
Description = request.Description
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
// 2. 将注册载荷抛给总线,触发如 DeviceConfigHandler 的配置初始化逻辑
|
|||
|
|
// 职责:通过中介者模式分发事件,网关层不需要知道谁在处理这些数据
|
|||
|
|
MessageBus.Instance.RaiseServerRegistered(payload);
|
|||
|
|
|
|||
|
|
return Task.FromResult(new GenericResponse { Success = true });
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
#endregion
|
|||
|
|
|
|||
|
|
#region 2. 指令下发长连接 (Server Streaming)
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 建立并维持一个从服务器向客户端单向推送指令的长连接通道
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="request">连接请求(包含 InstanceId)</param>
|
|||
|
|
/// <param name="responseStream">响应流,用于后续异步推送指令</param>
|
|||
|
|
/// <param name="context">gRpc 调用上下文</param>
|
|||
|
|
/// <returns>异步任务</returns>
|
|||
|
|
public override async Task OpenCommandChannel(CommandStreamRequest request, IServerStreamWriter<CommandPayloadProto> responseStream, ServerCallContext context)
|
|||
|
|
{
|
|||
|
|
// 1. 物理流登记:将此响应流句柄存入 GrpcSessionManager,以便 MessageBus 随时调用
|
|||
|
|
GrpcSessionManager.Instance.RegisterSession(request.InstanceId, responseStream);
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
// 2. 挂起连接:利用 Task.Delay(-1) 配合取消令牌无限期挂起连接,直到客户端断开
|
|||
|
|
await Task.Delay(-1, context.CancellationToken);
|
|||
|
|
}
|
|||
|
|
catch (OperationCanceledException)
|
|||
|
|
{
|
|||
|
|
// 客户端主动取消连接属于正常预期,无需抛出异常
|
|||
|
|
}
|
|||
|
|
finally
|
|||
|
|
{
|
|||
|
|
// 3. 物理流清理:当连接断开时,必须从会话管理器中移除,防止下发指令时产生死连接
|
|||
|
|
GrpcSessionManager.Instance.RemoveSession(request.InstanceId);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
#endregion
|
|||
|
|
|
|||
|
|
#region 3. 设备状态批量上报 (Unary 调用)
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 接收来自分析节点的相机在线/离线状态批量上报
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="request">包含多个设备状态项的请求对象</param>
|
|||
|
|
/// <param name="context">gRpc 调用上下文</param>
|
|||
|
|
/// <returns>操作成功响应</returns>
|
|||
|
|
public override Task<GenericResponse> ReportStatusBatch(StatusBatchRequest request, ServerCallContext context)
|
|||
|
|
{
|
|||
|
|
if (request.Items == null || !request.Items.Any())
|
|||
|
|
return Task.FromResult(new GenericResponse { Success = true });
|
|||
|
|
|
|||
|
|
// 1. 数据映射:将 Proto 集合转换为业务层的 StatusEventPayload 列表
|
|||
|
|
var payloads = request.Items.Select(item => new StatusEventPayload
|
|||
|
|
{
|
|||
|
|
CameraId = item.CameraId,
|
|||
|
|
IsOnline = item.IsOnline,
|
|||
|
|
Reason = item.Reason,
|
|||
|
|
Timestamp = request.Timestamp
|
|||
|
|
}).ToList();
|
|||
|
|
|
|||
|
|
// 2. 路由分发:通过总线发布状态主题,驱动 DeviceStatusHandler 执行同步
|
|||
|
|
MessageBus.Instance.RaiseDeviceStatusReport(payloads);
|
|||
|
|
|
|||
|
|
return Task.FromResult(new GenericResponse { Success = true });
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
#endregion
|
|||
|
|
|
|||
|
|
#region 4. 视频流传输接收 (Client Streaming)
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 接收分析节点持续推送的视频帧数据流
|
|||
|
|
/// </summary>
|
|||
|
|
/// <param name="requestStream">客户端异步流读取器</param>
|
|||
|
|
/// <param name="context">gRpc 调用上下文</param>
|
|||
|
|
/// <returns>流关闭后的最终响应</returns>
|
|||
|
|
public override async Task<GenericResponse> UploadVideoStream(IAsyncStreamReader<VideoFrameRequest> requestStream, ServerCallContext context)
|
|||
|
|
{
|
|||
|
|
// 1. 持续读取客户端推送的每一帧数据,直到流关闭或被取消
|
|||
|
|
while (await requestStream.MoveNext(context.CancellationToken))
|
|||
|
|
{
|
|||
|
|
var frame = requestStream.Current;
|
|||
|
|
|
|||
|
|
// 2. 将 Protobuf 帧数据转换为业务视频载荷 VideoPayload
|
|||
|
|
// 注意:ByteString 需要显式调用 ToByteArray 转换
|
|||
|
|
var videoPayload = new VideoPayload
|
|||
|
|
{
|
|||
|
|
CameraId = frame.CameraId,
|
|||
|
|
CaptureTimestamp = frame.CaptureTimestamp,
|
|||
|
|
OriginalWidth = frame.OriginalWidth,
|
|||
|
|
OriginalHeight = frame.OriginalHeight,
|
|||
|
|
OriginalImageBytes = frame.OriginalImageBytes.ToByteArray(),
|
|||
|
|
TargetImageBytes = frame.TargetImageBytes.ToByteArray(),
|
|||
|
|
TargetWidth = frame.TargetWidth,
|
|||
|
|
TargetHeight = frame.TargetHeight,
|
|||
|
|
SubscriberIds = frame.SubscriberIds.ToList(),
|
|||
|
|
HasOriginalImage = true
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
// 3. 导流:将图像数据直接投递给图像分发控制器进行 UI 渲染或二次处理
|
|||
|
|
ImageMonitorController.Instance.ReceivePayload(videoPayload);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return new GenericResponse { Success = true, Message = "Video stream ended" };
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
#endregion
|
|||
|
|
}
|
|||
|
|
}
|