添加契约和网络传输类库

This commit is contained in:
2025-12-29 08:09:14 +08:00
parent 231247c80f
commit 8cd36f44ac
14 changed files with 748 additions and 0 deletions

View File

@@ -0,0 +1,71 @@
using System;
using NetMQ;
using NetMQ.Sockets;
using SHH.Contracts;
namespace SHH.NetMQ
{
/// <summary>
/// 视频分发服务端 (Publisher)
/// 特性:非阻塞、防内存溢出
/// </summary>
public class DistributorServer : IDisposable
{
private PublisherSocket _pubSocket;
private readonly object _lock = new object();
// 配置:高水位限制 (HWM)
// 假设 25fps设置 50 意味着内存只缓存 2 秒的视频。
// 如果断网超过 2 秒,新来的视频帧直接丢弃,优先保证恢复后的实时性。
private const int HWM_LIMIT = 50;
public DistributorServer(string connectionString)
{
_pubSocket = new PublisherSocket();
// 1. 设置发送缓冲区大小 (防爆内存关键)
_pubSocket.Options.SendHighWatermark = HWM_LIMIT;
// 2. 绑定地址 (如 tcp://*:5555)
_pubSocket.Bind(connectionString);
}
public void Broadcast(VideoPayload payload)
{
if (payload == null) return;
// 补充发送时间
payload.DispatchTime = DateTime.Now;
// 准备数据帧
string jsonMeta = payload.GetMetadataJson();
byte[] rawBytes = payload.OriginalImageBytes ?? new byte[0];
byte[] targetBytes = payload.TargetImageBytes ?? new byte[0];
// 使用 NetMQMessage 封装多帧消息
// 这样比手动调三次 Send 更容易管理原子性
var msg = new NetMQMessage();
msg.Append(jsonMeta); // 第1帧
msg.Append(rawBytes); // 第2帧
msg.Append(targetBytes); // 第3帧
lock (_lock)
{
// 3. 非阻塞发送 (核心防卡死代码)
// TimeSpan.Zero 表示:如果缓冲区满了或者发不出去,立即放弃,不等待,返回 false
// 这样你的主线程海康SDK回调永远不会被卡住
bool sent = _pubSocket.TrySendMultipartMessage(TimeSpan.Zero, msg);
if (!sent)
{
// 这里可以打个日志Console.WriteLine("警告:网络拥堵,丢帧中...");
}
}
}
public void Dispose()
{
_pubSocket?.Dispose();
}
}
}

View File

@@ -0,0 +1,69 @@
using System;
using NetMQ;
using NetMQ.Sockets;
using SHH.Contracts;
namespace SHH.NetMQ
{
/// <summary>
/// 视频转发客户端 (Pusher)
/// 特性:主动推送、断线重连、非阻塞
/// </summary>
public class ForwarderClient : IDisposable
{
private PushSocket _pushSocket;
private readonly object _lock = new object();
private bool _isInitialized = false;
// 同样设置 50 帧的缓存限制
private const int HWM_LIMIT = 50;
public ForwarderClient(string remoteAddress)
{
if (string.IsNullOrEmpty(remoteAddress)) return;
_pushSocket = new PushSocket();
// 1. 防堆积设置
_pushSocket.Options.SendHighWatermark = HWM_LIMIT;
try
{
// NetMQ 会自动在后台处理重连,无需人工干预
_pushSocket.Connect(remoteAddress);
_isInitialized = true;
}
catch (Exception ex)
{
Console.WriteLine($"[Client Error] 连接失败: {ex.Message}");
_isInitialized = false;
}
}
public void Push(VideoPayload payload)
{
if (!_isInitialized || payload == null) return;
if (payload.DispatchTime == DateTime.MinValue)
payload.DispatchTime = DateTime.Now;
var msg = new NetMQMessage();
msg.Append(payload.GetMetadataJson());
msg.Append(payload.OriginalImageBytes ?? new byte[0]);
msg.Append(payload.TargetImageBytes ?? new byte[0]);
lock (_lock)
{
// 2. 非阻塞推送
// 如果对方挂了,或者网络断了,缓冲区满后这里的 TrySend 会立即返回 false
// 保证 SDK 采集不受影响
bool sent = _pushSocket.TrySendMultipartMessage(TimeSpan.Zero, msg);
}
}
public void Dispose()
{
_pushSocket?.Dispose();
}
}
}

View File

@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<NoWarn>$(NoWarn);NU1701</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NetMQ" Version="4.0.2.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\SHH.Contracts\SHH.Contracts.csproj" />
</ItemGroup>
</Project>