using System;
using NetMQ;
using NetMQ.Sockets;
using SHH.Contracts;

namespace SHH.NetMQ
{
    /// <summary>
    /// Video forwarding client (pusher side of a PUSH socket).
    /// Pushes frames actively, relies on NetMQ for reconnection, and never
    /// blocks the caller: frames that cannot be queued immediately are dropped.
    /// </summary>
    public class ForwarderClient : IDisposable
    {
        /// <summary>Maximum number of outgoing messages queued before sends start failing.</summary>
        private const int HWM_LIMIT = 50;

        /// <summary>Guards every access to <see cref="_pushSocket"/> and <see cref="_isInitialized"/> writes.</summary>
        private readonly object _lock = new object();

        private PushSocket _pushSocket;

        /// <summary>True only while <see cref="_pushSocket"/> is connected and not disposed.</summary>
        private volatile bool _isInitialized;

        /// <summary>
        /// Creates the client and connects it to <paramref name="remoteAddress"/>.
        /// A null/empty address or a failed connect leaves the client in an inert
        /// state in which <see cref="Push"/> is a no-op; no exception is thrown.
        /// </summary>
        /// <param name="remoteAddress">NetMQ endpoint to connect to.</param>
        public ForwarderClient(string remoteAddress)
        {
            if (string.IsNullOrEmpty(remoteAddress))
            {
                return;
            }

            var socket = new PushSocket();
            try
            {
                // 1. Anti-backlog: cap the outgoing queue so stale frames cannot pile up.
                socket.Options.SendHighWatermark = HWM_LIMIT;

                // NetMQ handles reconnection in the background; no manual intervention needed.
                socket.Connect(remoteAddress);

                _pushSocket = socket;
                _isInitialized = true;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"[Client Error] 连接失败: {ex.Message}");

                // Release the native socket instead of leaking it when setup fails.
                socket.Dispose();
                _pushSocket = null;
                _isInitialized = false;
            }
        }

        /// <summary>
        /// Sends <paramref name="payload"/> as a three-frame message
        /// (metadata JSON, original image bytes, target image bytes) without blocking.
        /// Does nothing when the client is not connected, has been disposed,
        /// or <paramref name="payload"/> is null.
        /// </summary>
        /// <param name="payload">
        /// Frame to forward. Its <c>DispatchTime</c> is stamped with the current
        /// local time when it is still <see cref="DateTime.MinValue"/>.
        /// </param>
        public void Push(VideoPayload payload)
        {
            // Cheap fast path; the flag is re-checked under the lock below.
            if (!_isInitialized || payload == null)
            {
                return;
            }

            if (payload.DispatchTime == DateTime.MinValue)
            {
                // NOTE(review): local time is kept for wire compatibility with existing
                // receivers; consider UTC if both ends can be changed together.
                payload.DispatchTime = DateTime.Now;
            }

            var msg = new NetMQMessage();
            msg.Append(payload.GetMetadataJson());
            msg.Append(payload.OriginalImageBytes ?? Array.Empty<byte>());
            msg.Append(payload.TargetImageBytes ?? Array.Empty<byte>());

            lock (_lock)
            {
                // Dispose may have run between the fast-path check and acquiring the lock.
                if (!_isInitialized || _pushSocket == null)
                {
                    return;
                }

                // 2. Non-blocking push: with a zero timeout the call returns false right
                // away once the send buffer is full (peer down / network broken), so the
                // capturing side is never stalled. The frame is intentionally dropped.
                _pushSocket.TrySendMultipartMessage(TimeSpan.Zero, msg);
            }
        }

        /// <summary>
        /// Disposes the underlying socket. Safe to call more than once; any
        /// subsequent <see cref="Push"/> becomes a no-op.
        /// </summary>
        public void Dispose()
        {
            lock (_lock)
            {
                _isInitialized = false;
                _pushSocket?.Dispose();
                _pushSocket = null;
            }
        }
    }
}