using System;
using System.Security.Authentication;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;

namespace SlnMesnac.Mqtt
{
    /// <summary>
    /// MQTT client wrapper: connects to a broker over TLS, subscribes and
    /// unsubscribes topics, publishes text messages and logs every received message.
    /// </summary>
    /// <remarks>
    /// All public operations are fire-and-forget: they return as soon as the
    /// underlying asynchronous call has been started, and report success or
    /// failure only through the injected logger. They never throw to the caller.
    /// </remarks>
    public class MqttClient
    {
        private readonly ILogger _logger;

        // Set by Connect; null until the first Connect call.
        private IMqttClient _client;

        /// <summary>
        /// Creates the wrapper.
        /// </summary>
        /// <param name="logger">Logger that receives all status and error messages.</param>
        /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
        public MqttClient(ILogger logger)
        {
            // Fail fast: every method logs, so a null logger would otherwise
            // surface later as a NullReferenceException inside an error handler.
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }

        /// <summary>
        /// Connects to the MQTT broker. The outcome is reported through the logger.
        /// </summary>
        /// <param name="ip">Broker host name or IP address.</param>
        /// <param name="port">Broker TCP port.</param>
        /// <param name="clientId">MQTT client identifier.</param>
        /// <param name="username">User name for authentication.</param>
        /// <param name="password">Password for authentication.</param>
        public void Connect(string ip, int port, string clientId, string username, string password)
        {
            // The helper catches and logs every exception, so discarding the task is safe.
            _ = ConnectCoreAsync(ip, port, clientId, username, password);
        }

        /// <summary>
        /// Disconnects from the broker. The outcome is reported through the logger.
        /// </summary>
        public void DisConnect()
        {
            _ = DisconnectCoreAsync();
        }

        /// <summary>
        /// Subscribes to a topic. The outcome is reported through the logger.
        /// </summary>
        /// <param name="topic">Topic filter to subscribe to.</param>
        public void SubscriptionAsync(string topic)
        {
            _ = SubscribeCoreAsync(topic);
        }

        /// <summary>
        /// Cancels the subscription of a topic. The outcome is reported through the logger.
        /// </summary>
        /// <param name="topic">Topic filter to unsubscribe from.</param>
        public void Unsubscribe(string topic)
        {
            _ = UnsubscribeCoreAsync(topic);
        }

        /// <summary>
        /// Publishes a text message. The outcome is reported through the logger.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="message">Message payload.</param>
        public void Publish(string topic, string message)
        {
            _ = PublishCoreAsync(topic, message);
        }

        // Builds the options, replaces any previous client and awaits the connect handshake.
        private async Task ConnectCoreAsync(string ip, int port, string clientId, string username, string password)
        {
            try
            {
                MqttClientOptions options = new MqttClientOptionsBuilder()
                    .WithTcpServer(ip, port)
                    .WithClientId(clientId)
                    .WithCredentials(username, password)
                    .WithTls(o =>
                    {
                        // NOTE(security): accepting every server certificate disables TLS
                        // server authentication and allows man-in-the-middle attacks.
                        // Kept as-is for compatibility with brokers that use self-signed
                        // certificates; replace with real validation where possible.
                        o.CertificateValidationHandler = _ => true;
                        o.SslProtocol = SslProtocols.Tls12;
                    })
                    .Build();

                // A repeated Connect must not leak the old client or leave its
                // message handler attached (which would log each message twice).
                ReleaseClient();

                IMqttClient client = new MqttFactory().CreateMqttClient();
                client.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceived;
                _client = client;

                MqttClientConnectResult result = await client.ConnectAsync(options);
                if (result == null)
                {
                    return;
                }

                if (result.ResultCode == MQTTnet.Client.MqttClientConnectResultCode.Success)
                {
                    _logger.LogInformation($"连接服务器成功{ip}:{port}");
                }
                else
                {
                    // A rejected connection is an error, and the result code says why.
                    _logger.LogError($"连接服务器失败,结果码:{result.ResultCode}");
                }
            }
            catch (Exception ex)
            {
                // The exception must be the FIRST argument; passed last it is treated
                // as a format argument and silently dropped from the log.
                _logger.LogError(ex, "连接服务器异常");
            }
        }

        // Awaits the disconnect so that success is only logged once it has really happened.
        private async Task DisconnectCoreAsync()
        {
            try
            {
                await RequireClient().DisconnectAsync();
                _logger.LogInformation("断开连接");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "断开连接异常");
            }
        }

        // Builds the subscribe options for a single topic and awaits the subscription.
        private async Task SubscribeCoreAsync(string topic)
        {
            try
            {
                var subscribeOptions = new MqttFactory().CreateSubscribeOptionsBuilder()
                    .WithTopicFilter(f => { f.WithTopic(topic); })
                    .Build();

                await RequireClient().SubscribeAsync(subscribeOptions, CancellationToken.None);
                _logger.LogInformation($"订阅主题:{topic}");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "订阅主题异常");
            }
        }

        // Awaits the unsubscribe so that failures are logged instead of being lost.
        private async Task UnsubscribeCoreAsync(string topic)
        {
            try
            {
                await RequireClient().UnsubscribeAsync(topic);
                _logger.LogInformation($"取消订阅,主题:{topic}");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "取消订阅异常");
            }
        }

        // Awaits the publish; without the await the catch block could never see a failure.
        private async Task PublishCoreAsync(string topic, string message)
        {
            try
            {
                var msg = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(message)
                    .Build();

                await RequireClient().PublishAsync(msg, CancellationToken.None);
                _logger.LogInformation($"向服务端推送成功,主题:{topic};内容:{message}");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "向服务端推送消息异常");
            }
        }

        // Returns the current client, or throws a descriptive error when Connect has not been called.
        private IMqttClient RequireClient()
        {
            return _client ?? throw new InvalidOperationException("MQTT客户端尚未初始化,请先调用Connect");
        }

        // Detaches the message handler from the current client (if any) and disposes it.
        private void ReleaseClient()
        {
            IMqttClient previous = _client;
            if (previous == null)
            {
                return;
            }

            _client = null;
            try
            {
                previous.ApplicationMessageReceivedAsync -= MqttClient_ApplicationMessageReceived;
                previous.Dispose();
            }
            catch (Exception ex)
            {
                // A failing cleanup must not prevent the new connection attempt.
                _logger.LogWarning(ex, "释放旧的MQTT客户端异常");
            }
        }

        // Logs topic and UTF-8 decoded payload of every received message.
        private Task MqttClient_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs)
        {
            try
            {
                // Guard against a null payload: Encoding.GetString throws
                // ArgumentNullException when given a null byte array.
                byte[] payload = eventArgs.ApplicationMessage.Payload;
                string content = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

                var info = $"接收到主题:{eventArgs.ApplicationMessage.Topic}的消息,内容:{content}";
                _logger.LogInformation(info);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "处理接收消息异常");
            }

            // Nothing is awaited here, so return a completed task instead of using async.
            return Task.CompletedTask;
        }
    }
}