using System;
using System.Security.Authentication;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;

namespace FileDataUpload.Mqtt
{
    /// <summary>
    /// Process-wide singleton wrapper around an MQTTnet <see cref="IMqttClient"/>.
    /// Every operation reports its outcome (success or failure) as a text message
    /// through <see cref="PrintMessageReceivedEvent"/> instead of returning a value.
    /// </summary>
    /// <remarks>
    /// The public methods keep their original <c>void</c> signatures. They are
    /// implemented as <c>async void</c> with a catch-all so that the underlying
    /// MQTT operation is actually awaited and failures are reported through the
    /// event rather than being lost as unobserved task exceptions.
    /// </remarks>
    public sealed class MqttClient
    {
        /// <summary>Signature of the callback that receives status and payload messages.</summary>
        /// <param name="message">Human-readable text describing what happened.</param>
        public delegate void PrintMessageReceived(string message);

        /// <summary>Raised for every status message and every received application message.</summary>
        public event PrintMessageReceived PrintMessageReceivedEvent;

        // Lazy<T> must be closed over the singleton type; the non-generic form does not exist.
        private static readonly Lazy<MqttClient> lazy = new Lazy<MqttClient>(() => new MqttClient());

        // Underlying MQTTnet client; null until Connect has been called.
        private IMqttClient client;

        /// <summary>Gets the single shared instance, created on first access.</summary>
        public static MqttClient Instance
        {
            get { return lazy.Value; }
        }

        private MqttClient()
        {
        }

        /// <summary>
        /// Creates a new underlying client and connects it to the broker over TLS 1.2.
        /// A client left over from an earlier call is detached and disposed first.
        /// </summary>
        /// <param name="ip">Broker host name or IP address.</param>
        /// <param name="port">Broker TCP port.</param>
        /// <param name="clientId">MQTT client identifier.</param>
        /// <param name="username">User name sent in the credentials.</param>
        /// <param name="password">Password sent in the credentials.</param>
        public async void Connect(string ip, int port, string clientId, string username, string password)
        {
            try
            {
                MqttClientOptions options = new MqttClientOptionsBuilder()
                    .WithTcpServer(ip, port)
                    .WithClientId(clientId)
                    .WithCredentials(username, password)
                    .WithTls(o =>
                    {
                        // SECURITY NOTE(review): this accepts ANY server certificate, so the
                        // TLS channel is not authenticated. Kept as-is because the broker may
                        // use a self-signed certificate; replace with real validation if possible.
                        o.CertificateValidationHandler = _ => true;
                        o.SslProtocol = SslProtocols.Tls12;
                    })
                    .Build();

                // Drop the previous client so repeated Connect calls neither leak it
                // nor leave its message handler attached.
                ReleaseClient();

                IMqttClient created = new MqttFactory().CreateMqttClient();
                created.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceived;
                client = created;

                MqttClientConnectResult result = await created.ConnectAsync(options);
                if (result != null)
                {
                    if (result.ResultCode == MQTTnet.Client.MqttClientConnectResultCode.Success)
                    {
                        Report($"连接服务器成功{ip}:{port}");
                    }
                    else
                    {
                        Report($"连接服务器失败");
                    }
                }
            }
            catch (Exception ex)
            {
                Report($"连接服务器异常:{ex.Message}");
            }
        }

        /// <summary>
        /// Disconnects from the broker and reports the outcome once the disconnect has completed.
        /// </summary>
        public async void DisConnect()
        {
            try
            {
                await RequireClient().DisconnectAsync();
                Report($"断开连接");
            }
            catch (Exception ex)
            {
                Report($"断开连接异常:{ex.Message}");
            }
        }

        /// <summary>Subscribes to a single topic and reports the outcome.</summary>
        /// <param name="topic">Topic filter to subscribe to.</param>
        public async void SubscriptionAsync(string topic)
        {
            try
            {
                var subscribeOptions = new MqttFactory().CreateSubscribeOptionsBuilder()
                    .WithTopicFilter(f => { f.WithTopic(topic); })
                    .Build();

                await RequireClient().SubscribeAsync(subscribeOptions, CancellationToken.None);
                Report($"订阅主题:{topic}");
            }
            catch (Exception ex)
            {
                Report($"订阅主题异常:{ex.Message}");
            }
        }

        /// <summary>
        /// Unsubscribes from a topic and reports the outcome once the broker request has completed.
        /// </summary>
        /// <param name="topic">Topic to unsubscribe from.</param>
        public async void Unsubscribe(string topic)
        {
            try
            {
                await RequireClient().UnsubscribeAsync(topic);
                Report($"取消订阅,主题:{topic}");
            }
            catch (Exception ex)
            {
                Report($"取消订阅异常:{ex.Message}");
            }
        }

        /// <summary>
        /// Publishes a text message to a topic. Success is reported only after the
        /// publish task has completed; any failure is reported with its exception message.
        /// </summary>
        /// <param name="topic">Destination topic.</param>
        /// <param name="message">Text payload to send.</param>
        public async void Publish(string topic, string message)
        {
            try
            {
                var msg = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(message)
                    .Build();

                await RequireClient().PublishAsync(msg, CancellationToken.None);
                Report($"向服务端推送成功,主题:{topic};内容:{message}");
            }
            catch (Exception ex)
            {
                Report($"向服务端推送消息异常:{ex.Message}");
            }
        }

        /// <summary>
        /// Handler attached to the client's received-message event: decodes the payload
        /// as UTF-8 text and forwards topic and content through <see cref="PrintMessageReceivedEvent"/>.
        /// </summary>
        /// <param name="eventArgs">Event data carrying the received application message.</param>
        /// <returns>An already-completed task; the handler does no asynchronous work.</returns>
        private Task MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs)
        {
            // A message may carry no payload; Encoding.GetString(null) would throw.
            byte[] payload = eventArgs.ApplicationMessage.Payload;
            string content = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

            var info = $"接收到主题:{eventArgs.ApplicationMessage.Topic}的消息,内容:{content}";
            Report(info);
            return Task.CompletedTask;
        }

        /// <summary>Raises <see cref="PrintMessageReceivedEvent"/> if anyone is subscribed.</summary>
        private void Report(string message)
        {
            PrintMessageReceivedEvent?.Invoke(message);
        }

        /// <summary>
        /// Returns the current client, or throws a descriptive exception when
        /// <see cref="Connect"/> has not been called yet (instead of a bare NullReferenceException).
        /// </summary>
        private IMqttClient RequireClient()
        {
            IMqttClient current = client;
            if (current == null)
            {
                throw new InvalidOperationException("客户端尚未初始化,请先调用 Connect");
            }

            return current;
        }

        /// <summary>Detaches the message handler from the current client, disposes it and clears the field.</summary>
        private void ReleaseClient()
        {
            IMqttClient previous = client;
            if (previous == null)
            {
                return;
            }

            client = null;
            previous.ApplicationMessageReceivedAsync -= MqttClient_ApplicationMessageReceived;
            previous.Dispose();
        }
    }
}