diff --git a/ruoyi-modules/hw-data-process/src/main/java/com/ruoyi/dataprocess/mqtt/client/config/MqttConfiguration.java b/ruoyi-modules/hw-data-process/src/main/java/com/ruoyi/dataprocess/mqtt/client/config/MqttConfiguration.java index 089beb3..461015f 100644 --- a/ruoyi-modules/hw-data-process/src/main/java/com/ruoyi/dataprocess/mqtt/client/config/MqttConfiguration.java +++ b/ruoyi-modules/hw-data-process/src/main/java/com/ruoyi/dataprocess/mqtt/client/config/MqttConfiguration.java @@ -41,7 +41,6 @@ import java.nio.charset.StandardCharsets; // clean-session: true # mqtt clean session,默认:true // use-ssl: false # 是否启用 ssl,默认:false -//todo:read-buffer-size @Configuration public class MqttConfiguration { private static final Logger logger = LoggerFactory.getLogger(MqttConfiguration.class); @@ -79,13 +78,13 @@ public class MqttConfiguration { @Value("${mqtt.client.imagePatterns}") String imagePatterns; - @Autowired - private IDataProcessService dataProcessService; +// @Autowired +// private IDataProcessService dataProcessService; +// +// @Autowired +// private IDeviceStatusService deviceStatusService; - @Autowired - private IDeviceStatusService deviceStatusService; - - @Bean//注入spring +/* @Bean//注入spring public MqttClientCustomizer mqttClientCustomizer() { return new MqttClientCustomizer() { @Override @@ -97,7 +96,7 @@ public class MqttConfiguration { .username(username) .password(password) .timeout(timeOut) - .keepAliveSecs(keepAlive) + .keepAliveSecs(keepAlive)//心跳时间(Keep Alive (s),如果用户不希望框架层面做心跳相关工作,请把此值设为0或负数) // 如果包体过大,建议将此参数设置和 maxBytesInMessage 一样大 // .readBufferSize(1024 * 10) // 最大包体长度,如果包体过大需要设置此参数 @@ -105,12 +104,12 @@ public class MqttConfiguration { // .version(MqttVersion.MQTT_5) // 连接监听 .connectListener(new MqttClientConnectListener()) - .willMessage(builder -> { - builder.topic("/test/offline") - .messageText("down") - .retain(false) - .qos(MqttQoS.AT_MOST_ONCE); // 遗嘱消息 - }) +// .willMessage(builder -> { +// builder.topic("/test/offline") +// .messageText("down") +// .retain(false) +// .qos(MqttQoS.AT_MOST_ONCE); // 遗嘱消息 +// }) // 同步连接,也可以使用 connect() 异步(可以避免 broker 没启动照成启动卡住),但是下面的订阅和发布可能还没连接成功。 .connectSync(); String[] topicArr = {dataTopicFilter, deviceStatusTopic}; @@ -152,7 +151,7 @@ public class MqttConfiguration { // client.subQos0(dataTopic,new MqttClientMessageListener()); } }; - } + }*/ public String getIp() { @@ -250,4 +249,21 @@ public class MqttConfiguration { public void setImagePatterns(String imagePatterns) { this.imagePatterns = imagePatterns; } + + + public String getImageDomain() { + return imageDomain; + } + + public void setImageDomain(String imageDomain) { + this.imageDomain = imageDomain; + } + + public String getImagePrefix() { + return imagePrefix; + } + + public void setImagePrefix(String imagePrefix) { + this.imagePrefix = imagePrefix; + } } diff --git a/ruoyi-modules/hw-data-process/src/main/java/com/ruoyi/dataprocess/mqtt/client/listener/MqttClientSubscribeListener.java b/ruoyi-modules/hw-data-process/src/main/java/com/ruoyi/dataprocess/mqtt/client/listener/MqttClientSubscribeListener.java new file mode 100644 index 0000000..a3304da --- /dev/null +++ b/ruoyi-modules/hw-data-process/src/main/java/com/ruoyi/dataprocess/mqtt/client/listener/MqttClientSubscribeListener.java @@ -0,0 +1,83 @@ +package com.ruoyi.dataprocess.mqtt.client.listener; + +import com.ruoyi.dataprocess.mqtt.client.config.MqttConfiguration; +import com.ruoyi.dataprocess.service.IDataProcessService; +import com.ruoyi.dataprocess.service.IDeviceStatusService; +import net.dreamlu.iot.mqtt.codec.ByteBufferUtil; +import net.dreamlu.iot.mqtt.codec.MqttQoS; +import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +/** + * 客户端消息监听 + * + * @author L.cm + */ +@Service +public class MqttClientSubscribeListener { + private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class); + + public static final String TOPIC_TYPE_DATA_POSTFIX = "data"; + public static final String TOPIC_TYPE_COMMAND_POSTFIX = "command"; + public static final String TOPIC_TYPE_REPLY_POSTFIX = "reply"; + + @Autowired + private IDataProcessService dataProcessService; + + @Autowired + private IDeviceStatusService deviceStatusService; + + @Autowired + private MqttConfiguration mqttConfiguration; + + @MqttClientSubscribe(value = "/v1/#", qos = MqttQoS.AT_MOST_ONCE) + public void subQos0(String topic, ByteBuffer payload) { + String payloadString = new String(payload.array(), StandardCharsets.UTF_8); + Long start = System.currentTimeMillis(); + String imagePath = mqttConfiguration.getImagePath(); + String imagePatterns = mqttConfiguration.getImagePatterns(); + String imageDomain = mqttConfiguration.getImageDomain(); + String imagePrefix = mqttConfiguration.getImagePrefix(); + logger.info("topic:{},start:{}ms", topic, start); + try { + if (topic.endsWith(TOPIC_TYPE_DATA_POSTFIX)) { + int processDataCount = dataProcessService.processBusinessData(payloadString, + imagePath, imagePatterns, imageDomain, imagePrefix); + Long end = System.currentTimeMillis(); + logger.info("Process Data start:{}ms,end:{}ms,Spend Time:{}ms,Data Count:{}", start, end, end - start, processDataCount); + } + } catch (Exception e) { + e.printStackTrace(); + logger.error("Error processing business data,topic:{},start:{},error:{}", topic, start, e.getMessage()); + } + } + + @MqttClientSubscribe(value = "/device/status/v1", qos = MqttQoS.AT_MOST_ONCE) + public void subQos1(String topic, ByteBuffer payload) { + String payloadString = new String(payload.array(), StandardCharsets.UTF_8); + Long start = System.currentTimeMillis(); + String clientId = mqttConfiguration.getClientId(); + logger.info("topic:{},start:{}ms", topic, start); + try { + deviceStatusService.handleDeviceStatus(payloadString, clientId); + } catch (Exception e) { + e.printStackTrace(); + logger.error("Error processing business data,topic:{},start:{},error:{}", topic, start, e.getMessage()); + } + } +// +// @MqttClientSubscribe("/sys/${productKey}/${deviceName}/thing/sub/register") +// public void thingSubRegister(String topic, byte[] payload) { +// // 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 + +// // 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。 +// logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8)); +// } + +} + diff --git a/ruoyi-modules/hw-data-process/src/main/java/com/ruoyi/dataprocess/service/impl/DataProcessServiceImpl.java b/ruoyi-modules/hw-data-process/src/main/java/com/ruoyi/dataprocess/service/impl/DataProcessServiceImpl.java index 3c4cd12..793c01b 100644 --- a/ruoyi-modules/hw-data-process/src/main/java/com/ruoyi/dataprocess/service/impl/DataProcessServiceImpl.java +++ b/ruoyi-modules/hw-data-process/src/main/java/com/ruoyi/dataprocess/service/impl/DataProcessServiceImpl.java @@ -161,7 +161,6 @@ public class DataProcessServiceImpl implements IDataProcessService { if (type != null) { extension = StringUtils.isNotBlank(ImageUtils.getImageType(type, imagePatternArr)) ? ImageUtils.getImageType(type, imagePatternArr) : extension; - System.out.println("extension: " + extension); } value = getImageFileName(imagePath, imageDomain, imagePrefix, valueStr, deviceId, extension); if (value == null) continue;