using System;
using System.Security.Authentication;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;

namespace HighWayIot.Mqtt
{
    /// <summary>
    /// Singleton wrapper around an MQTTnet <see cref="IMqttClient"/>.
    /// Every outcome (success, failure, received message) is reported as a
    /// human-readable string through <see cref="PrintMessageReceivedEvent"/>.
    /// </summary>
    /// <remarks>
    /// The public methods keep their original <c>void</c> signatures, so they are
    /// fire-and-forget from the caller's point of view. Internally each one awaits
    /// the underlying MQTT operation and routes any exception to the event instead
    /// of letting it escape unobserved.
    /// </remarks>
    public sealed class MqttClient
    {
        /// <summary>
        /// Signature of the callback that receives status and message text.
        /// </summary>
        /// <param name="message">The text to display or log.</param>
        public delegate void PrintMessageReceived(string message);

        /// <summary>
        /// Raised for connection status changes, operation results, errors and
        /// every application message received from the broker.
        /// </summary>
        public event PrintMessageReceived PrintMessageReceivedEvent;

        // The client created by the most recent Connect call; null until then.
        private IMqttClient client;

        // Lazy<T> needs its type argument; the non-generic spelling does not compile.
        private static readonly Lazy<MqttClient> lazy = new Lazy<MqttClient>(() => new MqttClient());

        /// <summary>
        /// Gets the single shared instance, created on first access.
        /// </summary>
        public static MqttClient Instance
        {
            get { return lazy.Value; }
        }

        private MqttClient()
        {
        }

        /// <summary>
        /// Connects to the MQTT broker over TLS 1.2 and starts listening for messages.
        /// Calling it again replaces (and disposes) the previously created client.
        /// </summary>
        /// <param name="ip">Broker host name or IP address.</param>
        /// <param name="port">Broker TCP port.</param>
        /// <param name="clientId">MQTT client identifier.</param>
        /// <param name="username">User name sent to the broker.</param>
        /// <param name="password">Password sent to the broker.</param>
        public async void Connect(string ip, int port, string clientId, string username, string password)
        {
            try
            {
                MqttClientOptions options = new MqttClientOptionsBuilder()
                    .WithTcpServer(ip, port)
                    .WithClientId(clientId)
                    .WithCredentials(username, password)
                    .WithTls(o =>
                    {
                        // NOTE(review): accepting every certificate disables server
                        // authentication (man-in-the-middle risk). Kept as-is because
                        // existing deployments may rely on self-signed brokers.
                        o.CertificateValidationHandler = _ => true;
                        o.SslProtocol = SslProtocols.Tls12;
                    })
                    .Build();

                // Only tear down the old client once the new options are known to be
                // valid; otherwise a bad argument would kill a working connection.
                ReleasePreviousClient();

                IMqttClient created = new MqttFactory().CreateMqttClient();
                created.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceived;
                client = created;

                MqttClientConnectResult result = await created.ConnectAsync(options);
                if (result != null)
                {
                    if (result.ResultCode == MqttClientConnectResultCode.Success)
                    {
                        PrintMessageReceivedEvent?.Invoke($"连接服务器成功{ip}:{port}");
                    }
                    else
                    {
                        PrintMessageReceivedEvent?.Invoke("连接服务器失败");
                    }
                }
            }
            catch (Exception ex)
            {
                PrintMessageReceivedEvent?.Invoke($"连接服务器异常:{ex.Message}");
            }
        }

        /// <summary>
        /// Disconnects from the broker. The result is reported only after the
        /// disconnect has actually completed (or failed).
        /// </summary>
        public async void DisConnect()
        {
            try
            {
                await RequireClient().DisconnectAsync();
                PrintMessageReceivedEvent?.Invoke("断开连接");
            }
            catch (Exception ex)
            {
                PrintMessageReceivedEvent?.Invoke($"断开连接异常:{ex.Message}");
            }
        }

        /// <summary>
        /// Subscribes to a topic.
        /// </summary>
        /// <param name="topic">Topic filter to subscribe to.</param>
        public async void SubscriptionAsync(string topic)
        {
            try
            {
                var subscribeOptions = new MqttFactory().CreateSubscribeOptionsBuilder()
                    .WithTopicFilter(f => { f.WithTopic(topic); })
                    .Build();

                await RequireClient().SubscribeAsync(subscribeOptions, CancellationToken.None);
                PrintMessageReceivedEvent?.Invoke($"订阅主题:{topic}");
            }
            catch (Exception ex)
            {
                PrintMessageReceivedEvent?.Invoke($"订阅主题异常:{ex.Message}");
            }
        }

        /// <summary>
        /// Cancels the subscription for a topic. The result is reported only after
        /// the broker round-trip has completed (or failed).
        /// </summary>
        /// <param name="topic">Topic filter to unsubscribe from.</param>
        public async void Unsubscribe(string topic)
        {
            try
            {
                await RequireClient().UnsubscribeAsync(topic);
                PrintMessageReceivedEvent?.Invoke($"取消订阅,主题:{topic}");
            }
            catch (Exception ex)
            {
                PrintMessageReceivedEvent?.Invoke($"取消订阅异常:{ex.Message}");
            }
        }

        /// <summary>
        /// Publishes a text message to a topic. Success is reported only after the
        /// publish has completed; asynchronous failures are reported as errors
        /// instead of being dropped.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="message">Payload text.</param>
        public async void Publish(string topic, string message)
        {
            try
            {
                MqttApplicationMessage outgoing = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(message)
                    .Build();

                await RequireClient().PublishAsync(outgoing, CancellationToken.None);
                PrintMessageReceivedEvent?.Invoke($"向服务端推送成功,主题:{topic};内容:{message}");
            }
            catch (Exception ex)
            {
                PrintMessageReceivedEvent?.Invoke($"向服务端推送消息异常:{ex.Message}");
            }
        }

        /// <summary>
        /// Returns the current client, or throws a descriptive exception when
        /// <see cref="Connect"/> has not been called yet. Callers invoke this inside
        /// their try block so the error is reported through the event.
        /// </summary>
        private IMqttClient RequireClient()
        {
            IMqttClient current = client;
            if (current == null)
            {
                throw new InvalidOperationException("尚未连接服务器,请先调用 Connect");
            }

            return current;
        }

        /// <summary>
        /// Detaches the receive handler from, and disposes, the client created by an
        /// earlier <see cref="Connect"/> call so that it neither leaks nor keeps
        /// raising duplicate events.
        /// </summary>
        private void ReleasePreviousClient()
        {
            IMqttClient previous = client;
            if (previous == null)
            {
                return;
            }

            client = null;
            previous.ApplicationMessageReceivedAsync -= MqttClient_ApplicationMessageReceived;
            previous.Dispose();
        }

        /// <summary>
        /// Handles an incoming application message by forwarding its topic and
        /// UTF-8 decoded payload to <see cref="PrintMessageReceivedEvent"/>.
        /// </summary>
        /// <param name="eventArgs">Event data carrying the received message.</param>
        /// <returns>An already completed task; no asynchronous work is done here.</returns>
        private Task MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs)
        {
            MqttApplicationMessage received = eventArgs.ApplicationMessage;

            // A message with no payload arrives as null; GetString(null) would throw.
            byte[] payload = received.Payload;
            string text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

            PrintMessageReceivedEvent?.Invoke($"接收到主题:{received.Topic}的消息,内容:{text}");
            return Task.CompletedTask;
        }
    }
}