You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

124 lines
4.8 KiB
C#

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Server;
using System;
using System.Text;
namespace ZJ_BYD.Untils
{
public class MqttHelper
{
#region 服务端
public static void OpenMqttServer(int port)
{
IMqttServer mqttServer = null; //MQTT服务端实例
if (mqttServer != null && mqttServer.IsStarted)
{
return;
}
mqttServer = new MqttFactory().CreateMqttServer();
var optionsBuilder = new MqttServerOptionsBuilder().WithDefaultEndpoint()
.WithDefaultEndpointPort(port).WithSubscriptionInterceptor(b =>
{
LogHelper.WriteLog($"用户{b.ClientId}已订阅!");
b.AcceptSubscription = true;
}).WithApplicationMessageInterceptor(c =>
{
c.AcceptPublish = true;
});
mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected);
mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected);
mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServer_ApplicationMessageReceived);
mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServer_UnSubscribedTopic);
mqttServer.StartAsync(optionsBuilder.Build());
if (!mqttServer.IsStarted)
{
LogHelper.WriteLog("消息服务启动失败!");
return;
}
else
{
LogHelper.WriteLog("消息服务启动成功!");
}
}
/// <summary>
/// 取消订阅
/// </summary>
/// <param name="obj"></param>
private static void OnMqttServer_UnSubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs obj)
{
LogHelper.WriteLog($"{DateTime.Now}>>用户{obj.ClientId}已取消订阅!");
}
/// <summary>
/// 接收客户端消息
/// </summary>
/// <param name="obj"></param>
private static void OnMqttServer_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs obj)
{
var content = Encoding.UTF8.GetString(obj.ApplicationMessage.Payload);
//var topic = obj.ApplicationMessage.Topic;
//var qos = obj.ApplicationMessage.QualityOfServiceLevel.ToString();
//var retained = obj.ApplicationMessage.Retain.ToString();
//txtMsg.AppendText($"{DateTime.Now}>>接收到用户{obj.ClientId}发送的消息:{content}\r\n");
}
/// <summary>
/// 客户端断开连接
/// </summary>
/// <param name="obj"></param>
private static void OnMqttServerClientDisconnected(MqttServerClientDisconnectedEventArgs obj)
{
LogHelper.WriteLog($"{DateTime.Now}>>用户{obj.ClientId}已断开!");
}
/// <summary>
/// 客户端连接
/// </summary>
/// <param name="obj"></param>
private static void OnMqttServerClientConnected(MqttServerClientConnectedEventArgs obj)
{
LogHelper.WriteLog($"{DateTime.Now}>>用户{obj.ClientId}已连接");
}
#endregion
#region 客户端
/// <summary>
/// 连接服务
/// </summary>
/// <param name="ip">客户端标识</param>
/// <param name="serverIp"></param>
/// <param name="port"></param>
/// <returns></returns>
public static async void Connect(string ip, string serverIp, int port)
{
try
{
Program.mqttClient = new MqttFactory().CreateMqttClient();
var optionBuild = new MqttClientOptionsBuilder().WithClientId(ip)
.WithTcpServer(serverIp, port)
.WithCredentials("admin", "123456")
.WithKeepAlivePeriod(TimeSpan.FromSeconds(100))
.WithCommunicationTimeout(TimeSpan.FromSeconds(5000))
.WithCleanSession().Build();
var result = await Program.mqttClient.ConnectAsync(optionBuild);
if (result.ResultCode != MqttClientConnectResultCode.Success)
{
LogHelper.WriteLog($"{DateTime.Now}>>连接MQTT服务失败!");
}
}
catch (Exception ex)
{
LogHelper.WriteLog($"{DateTime.Now}>>连接MQTT服务失败{ex}");
}
}
#endregion
}
}