You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

124 lines
4.8 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Server;
using System;
using System.Text;
namespace ZJ_BYD.Untils
{
/// <summary>
/// Static helpers for hosting an MQTT broker in-process and for connecting the
/// application-wide MQTT client (<c>Program.mqttClient</c>) to a broker.
/// </summary>
public class MqttHelper
{
    #region Server

    /// <summary>
    /// Broker instance created by <see cref="OpenMqttServer"/>. It is kept in a static field
    /// so that a later call can see that a broker is already running and so that the
    /// instance stays referenced after the method returns.
    /// </summary>
    private static IMqttServer _mqttServer;

    /// <summary>
    /// Creates an MQTT broker on the default endpoint using the given port and starts it.
    /// All subscriptions and all published messages are accepted. If a broker created by a
    /// previous call is already started, the call does nothing.
    /// </summary>
    /// <remarks>
    /// The method does not wait for the start-up to finish; the success or failure message
    /// is written to the log once the start task has completed.
    /// </remarks>
    /// <param name="port">TCP port of the default endpoint.</param>
    public static void OpenMqttServer(int port)
    {
        var existingServer = _mqttServer;
        if (existingServer != null && existingServer.IsStarted)
        {
            return;
        }

        var server = new MqttFactory().CreateMqttServer();
        var optionsBuilder = new MqttServerOptionsBuilder()
            .WithDefaultEndpoint()
            .WithDefaultEndpointPort(port)
            .WithSubscriptionInterceptor(b =>
            {
                LogHelper.WriteLog($"用户{b.ClientId}已订阅!");
                b.AcceptSubscription = true;
            })
            .WithApplicationMessageInterceptor(c =>
            {
                c.AcceptPublish = true;
            });

        server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected);
        server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected);
        server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServer_ApplicationMessageReceived);
        server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServer_UnSubscribedTopic);

        // Publish the instance before starting it so that the guard above works on the next call.
        _mqttServer = server;

        // StartAsync is asynchronous: inspect the result only after the task has completed,
        // and surface any start-up exception in the log instead of losing it.
        server.StartAsync(optionsBuilder.Build()).ContinueWith(startTask =>
        {
            if (startTask.IsFaulted || startTask.IsCanceled || !server.IsStarted)
            {
                var reason = startTask.Exception != null ? startTask.Exception.GetBaseException() : null;
                LogHelper.WriteLog(reason == null ? "消息服务启动失败!" : $"消息服务启动失败!{reason}");
            }
            else
            {
                LogHelper.WriteLog("消息服务启动成功!");
            }
        }, System.Threading.Tasks.TaskScheduler.Default);
    }

    /// <summary>
    /// Logs that a client has unsubscribed from a topic.
    /// </summary>
    /// <param name="obj">Event data; only the client id is used.</param>
    private static void OnMqttServer_UnSubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs obj)
    {
        LogHelper.WriteLog($"{DateTime.Now}>>用户{obj.ClientId}已取消订阅!");
    }

    /// <summary>
    /// Handles a message received from a client by decoding its payload as UTF-8 text.
    /// </summary>
    /// <param name="obj">Event data carrying the received application message.</param>
    private static void OnMqttServer_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs obj)
    {
        // Encoding.GetString throws ArgumentNullException for a null buffer, so treat a
        // missing payload (e.g. a message without a body) as empty text.
        var payload = obj.ApplicationMessage?.Payload;
        var content = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

        // The decoded text is not consumed yet. Topic, QoS level and retain flag are
        // available on obj.ApplicationMessage if the message needs to be logged or displayed.
    }

    /// <summary>
    /// Logs that a client has disconnected.
    /// </summary>
    /// <param name="obj">Event data; only the client id is used.</param>
    private static void OnMqttServerClientDisconnected(MqttServerClientDisconnectedEventArgs obj)
    {
        LogHelper.WriteLog($"{DateTime.Now}>>用户{obj.ClientId}已断开!");
    }

    /// <summary>
    /// Logs that a client has connected.
    /// </summary>
    /// <param name="obj">Event data; only the client id is used.</param>
    private static void OnMqttServerClientConnected(MqttServerClientConnectedEventArgs obj)
    {
        LogHelper.WriteLog($"{DateTime.Now}>>用户{obj.ClientId}已连接");
    }

    #endregion

    #region Client

    /// <summary>
    /// Creates a new MQTT client, stores it in <c>Program.mqttClient</c> and connects it to
    /// the broker over TCP with a clean session. A non-success result code or any exception
    /// is written to the log; nothing is thrown to the caller.
    /// </summary>
    /// <remarks>
    /// The method is <c>async void</c>, so callers cannot await it or observe completion.
    /// The signature is kept for compatibility with existing callers.
    /// </remarks>
    /// <param name="ip">Client identifier sent to the broker.</param>
    /// <param name="serverIp">Host name or IP address of the broker.</param>
    /// <param name="port">TCP port of the broker.</param>
    public static async void Connect(string ip, string serverIp, int port)
    {
        try
        {
            Program.mqttClient = new MqttFactory().CreateMqttClient();

            // NOTE(review): the credentials are hard-coded; consider moving them to configuration.
            // NOTE(review): the communication timeout is 5000 *seconds*; confirm that
            // milliseconds were not intended.
            var clientOptions = new MqttClientOptionsBuilder()
                .WithClientId(ip)
                .WithTcpServer(serverIp, port)
                .WithCredentials("admin", "123456")
                .WithKeepAlivePeriod(TimeSpan.FromSeconds(100))
                .WithCommunicationTimeout(TimeSpan.FromSeconds(5000))
                .WithCleanSession()
                .Build();

            var result = await Program.mqttClient.ConnectAsync(clientOptions);
            if (result.ResultCode != MqttClientConnectResultCode.Success)
            {
                LogHelper.WriteLog($"{DateTime.Now}>>连接MQTT服务失败!");
            }
        }
        catch (Exception ex)
        {
            // Catch everything: an exception escaping an async void method cannot be
            // handled by the caller.
            LogHelper.WriteLog($"{DateTime.Now}>>连接MQTT服务失败{ex}");
        }
    }

    #endregion
}
}