Files

105 lines
4.2 KiB
C#
Raw Permalink Normal View History

2026-01-16 14:30:42 +08:00
using Ayay.SerilogLogs;
using Google.Protobuf;
2026-01-15 09:31:57 +08:00
using Grpc.Net.Client;
using Microsoft.Extensions.Hosting;
2026-01-16 14:30:42 +08:00
using Serilog;
2026-01-15 09:31:57 +08:00
using SHH.Contracts.Grpc;
namespace SHH.CameraService;
/// <summary>
/// gRPC video-stream sender worker.
/// Responsibility: read frames from the in-memory queue of one <see cref="StreamTarget"/>,
/// open a gRPC request stream (UploadVideoStream) to the target endpoint and keep pushing
/// every sufficiently fresh frame to it, reconnecting after link failures.
/// </summary>
public class GrpcSenderWorker : BackgroundService
{
    /// <summary>Frames whose age exceeds this many milliseconds are dropped instead of sent.</summary>
    private const int StaleFrameThresholdMs = 1000;

    /// <summary>Pause, in milliseconds, before reconnecting after a link failure.</summary>
    private const int ReconnectDelayMs = 5000;

    private readonly ILogger _gRpcLog = Log.ForContext("SourceContext", LogModules.gRpc);
    private readonly StreamTarget _target;
    private readonly string _grpcUrl;

    /// <summary>
    /// Creates a sender bound to a single stream target.
    /// </summary>
    /// <param name="target">Target whose queue is drained and whose endpoint receives the frames.</param>
    /// <exception cref="ArgumentNullException"><paramref name="target"/> is null.</exception>
    public GrpcSenderWorker(StreamTarget target)
    {
        ArgumentNullException.ThrowIfNull(target);
        _target = target;

        // Adapt the configured address: an endpoint written as tcp://localhost:9001
        // is rewritten to http://localhost:9001 before it is handed to the gRPC channel.
        _grpcUrl = _target.Config.Endpoint.Replace("tcp://", "http://");
    }

    /// <summary>
    /// Runs the send loop until the host requests shutdown or the frame queue is completed.
    /// Each iteration opens one streaming session; on failure a warning is logged and the
    /// session is re-established after <see cref="ReconnectDelayMs"/> milliseconds.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _gRpcLog.Information($"[gRpc] 视频流发送业务启动, 目标: {_target.Config.Name}, 地址: {_grpcUrl}");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await StreamFramesAsync(stoppingToken);

                // The read loop only ends without an exception once the queue has been completed,
                // so no further frame can ever arrive. Looping again would just open and close
                // connections in a tight loop, hence the worker stops here.
                _gRpcLog.Information($"[gRpc] 视频帧队列已关闭, 推送结束, 目标: {_target.Config.Name}, 地址: {_grpcUrl}");
                return;
            }
            catch (Exception) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown in progress: whatever surfaced (OperationCanceledException or a
                // gRPC cancellation error) is a consequence of the stop request, not a link fault.
                return;
            }
            catch (Exception ex)
            {
                _gRpcLog.Warning(ex, $"[gRpc] 视频推送流链路异常, 目标: {_target.Config.Name}, 地址: {_grpcUrl}, {ReconnectDelayMs / 1000}秒后重连: {ex.Message}.");

                try
                {
                    await Task.Delay(ReconnectDelayMs, stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    // Stop requested while waiting to reconnect: leave quietly.
                    return;
                }
            }
        }
    }

    /// <summary>
    /// Opens one gRPC channel and one UploadVideoStream call, then forwards queued frames
    /// until the queue is completed. Any failure is propagated to the caller.
    /// </summary>
    /// <param name="stoppingToken">Cancels both the queue read and the gRPC call.</param>
    private async Task StreamFramesAsync(CancellationToken stoppingToken)
    {
        // 1. Create the channel.
        using var channel = GrpcChannel.ForAddress(_grpcUrl);
        var client = new GatewayProvider.GatewayProviderClient(channel);

        // 2. Open the request stream (UploadVideoStream is declared in the proto contract).
        using var call = client.UploadVideoStream(cancellationToken: stoppingToken);
        _gRpcLog.Information($"[gRpc] 已开启视频推送流, 目标: {_target.Config.Name}, 地址: {_grpcUrl}");

        // 3. Core transfer loop: read frames from the in-memory queue.
        await foreach (var payload in _target.Channel.Reader.ReadAllAsync(stoppingToken))
        {
            // Keep the pipeline flowing: drop backlog frames that are already too old.
            var ageMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - payload.CaptureTimestamp;
            if (ageMs > StaleFrameThresholdMs)
            {
                continue;
            }

            // Map the business DTO onto the generated gRPC request message.
            var request = new VideoFrameRequest
            {
                CameraId = payload.CameraId ?? "0",
                CaptureTimestamp = payload.CaptureTimestamp,
                OriginalWidth = payload.OriginalWidth,
                OriginalHeight = payload.OriginalHeight,
                TargetWidth = payload.TargetWidth,
                TargetHeight = payload.TargetHeight,
                HasOriginalImage = payload.HasOriginalImage,
                HasTargetImage = payload.HasTargetImage,

                // byte[] -> ByteString. CopyFrom duplicates the buffer, so the message
                // does not alias the payload's arrays.
                OriginalImageBytes = payload.OriginalImageBytes != null
                    ? ByteString.CopyFrom(payload.OriginalImageBytes)
                    : ByteString.Empty,
                TargetImageBytes = payload.TargetImageBytes != null
                    ? ByteString.CopyFrom(payload.TargetImageBytes)
                    : ByteString.Empty
            };

            // A missing subscriber list is treated as "no subscribers" rather than
            // letting AddRange(null) throw and force a reconnect for one bad frame.
            if (payload.SubscriberIds != null)
            {
                request.SubscriberIds.AddRange(payload.SubscriberIds);
            }

            // Diagnostics travel as map<string, string>; null values become empty strings.
            if (payload.Diagnostics != null)
            {
                foreach (var entry in payload.Diagnostics)
                {
                    request.Diagnostics.Add(entry.Key, entry.Value?.ToString() ?? "");
                }
            }

            // 4. Send the frame to the remote service.
            await call.RequestStream.WriteAsync(request);
        }

        // Queue completed: close the request stream gracefully.
        // NOTE(review): the call's response is never awaited; if UploadVideoStream is a
        // client-streaming call, disposing it right after CompleteAsync may cancel it before
        // the server replies — confirm against the proto definition.
        await call.RequestStream.CompleteAsync();
    }
}