using Ayay.SerilogLogs; using Grpc.Core; using Grpc.Net.Client; using Microsoft.Extensions.Hosting; using Serilog; using SHH.CameraSdk; using SHH.Contracts; using SHH.Contracts.Grpc; using System.Collections.Concurrent; namespace SHH.CameraService; /// /// 设备状态监控工作者 (gRpc 版) /// 职责:监控相机状态并在状态变更或心跳周期内,通过 gRpc 批量上报至所有配置的端点 /// public class DeviceStatusHandler : BackgroundService { private ILogger _gRpcLog = Log.ForContext("SourceContext", LogModules.gRpc); private readonly CameraManager _manager; private readonly ServiceConfig _config; // 状态存储:CameraId -> 状态载荷 private readonly ConcurrentDictionary _stateStore = new(); private volatile bool _isDirty = false; private long _lastSendTick = 0; public DeviceStatusHandler( CameraManager manager, ServiceConfig config) { _manager = manager; _config = config; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { // 1. 初始化本地状态缓存 foreach (var dev in _manager.GetAllDevices()) { UpdateLocalState(dev.Id, false, "Service Init"); } // 2. 订阅 SDK 状态变更事件 _manager.OnDeviceStatusChanged += OnSdkStatusChanged; _gRpcLog.Information($"[gRpc] 状态上报已启动,配置节点数: {_config.CommandEndpoints.Count}"); // 3. 定时循环 (1秒1次检查) var timer = new PeriodicTimer(TimeSpan.FromSeconds(1)); try { while (await timer.WaitForNextTickAsync(stoppingToken)) { await CheckAndBroadcastAsync(stoppingToken); } } catch (OperationCanceledException) { /* 正常退出 */ } catch (Exception ex) { _gRpcLog.Error($"[gRpc] 状态上报运行异常"); } finally { _manager.OnDeviceStatusChanged -= OnSdkStatusChanged; } } /// /// SDK 状态变更回调 /// private void OnSdkStatusChanged(long deviceId, bool isOnline, string reason) { UpdateLocalState(deviceId, isOnline, reason); _isDirty = true; } private void UpdateLocalState(long deviceId, bool isOnline, string reason) { var evt = new StatusEventPayload { CameraId = deviceId.ToString(), IsOnline = isOnline, Reason = reason, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }; _stateStore[deviceId.ToString()] = evt; } /// /// 执行广播逻辑 /// private async Task CheckAndBroadcastAsync(CancellationToken ct) { long now = Environment.TickCount64; // 策略: 有变更(Dirty) 或 超过 2 秒(强制心跳) bool shouldSend = _isDirty || (now - _lastSendTick > 2000); if (shouldSend && _config.CommandEndpoints.Any()) { // 1. 构建 gRpc 请求包 var request = new StatusBatchRequest { Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }; // 转换内存中的状态快照为 Protobuf 列表 foreach (var item in _stateStore.Values) { request.Items.Add(new StatusEventItem { CameraId = item.CameraId, IsOnline = item.IsOnline, Reason = item.Reason, }); } // 2. 遍历所有端点进行发送 foreach (var endpoint in _config.CommandEndpoints) { try { string grpcUrl = endpoint.Uri.Replace("tcp://", "http://").Trim(); // --- 增加以下诊断代码 --- using var channel = GrpcChannel.ForAddress(grpcUrl); var client = new GatewayProvider.GatewayProviderClient(channel); // 获取 gRpc 内部生成的服务全称 // 这就是客户端尝试调用的真实路径:/包名.服务名/方法名 var serviceName = client.GetType().DeclaringType?.Name ?? "Unknown"; _gRpcLog.Debug("[gRpc] 准备调用端点: {Url}", grpcUrl); _gRpcLog.Debug("[gRpc] 客户端契约服务名: {Service}", serviceName); // 执行调用 var response = await client.ReportStatusBatchAsync(request, deadline: DateTime.UtcNow.AddSeconds(2), cancellationToken: ct); if (response.Success) { _gRpcLog.Information("[gRpc] 设备状态上报成功, 共计: {Count} 个, Url: {Url}", request.Items.Count, grpcUrl); _gRpcLog.Debug("[gRpc] 设备状态上报成功: {Url} Items:{Items}", grpcUrl, request.Items); _isDirty = false; _lastSendTick = Environment.TickCount64; } } catch (RpcException ex) { // 这里是关键:打印 RpcException 的详细状态 _gRpcLog.Error("[gRpc] StatusCode: {Code}, Detail: {Detail}", ex.StatusCode, ex.Status.Detail); // 如果是 Unimplemented,通常意味着路径不对 if (ex.StatusCode == StatusCode.Unimplemented) { _gRpcLog.Error("[gRpc] 请检查服务端是否注册了名为 'GatewayProvider' 的服务,且其 package 声明与客户端一致。"); } } catch (Exception ex) { _gRpcLog.Error("[gRpc] 非 RPC 异常: {Msg}", ex.Message); } } } } }