Merge remote-tracking branch 'origin/master'

dev
夜笙歌 1 year ago
commit 56936fcff2

@ -41,7 +41,6 @@ import java.nio.charset.StandardCharsets;
// clean-session: true # mqtt clean session默认true
// use-ssl: false # 是否启用 ssl默认false
//todo:read-buffer-size
@Configuration
public class MqttConfiguration {
private static final Logger logger = LoggerFactory.getLogger(MqttConfiguration.class);
@ -79,13 +78,13 @@ public class MqttConfiguration {
@Value("${mqtt.client.imagePatterns}")
String imagePatterns;
@Autowired
private IDataProcessService dataProcessService;
// @Autowired
// private IDataProcessService dataProcessService;
//
// @Autowired
// private IDeviceStatusService deviceStatusService;
@Autowired
private IDeviceStatusService deviceStatusService;
@Bean//注入spring
/* @Bean//注入spring
public MqttClientCustomizer mqttClientCustomizer() {
return new MqttClientCustomizer() {
@Override
@ -97,7 +96,7 @@ public class MqttConfiguration {
.username(username)
.password(password)
.timeout(timeOut)
.keepAliveSecs(keepAlive)
.keepAliveSecs(keepAlive)//心跳时间Keep Alive (s)如果用户不希望框架层面做心跳相关工作请把此值设为0或负数
// 如果包体过大,建议将此参数设置和 maxBytesInMessage 一样大
// .readBufferSize(1024 * 10)
// 最大包体长度,如果包体过大需要设置此参数
@ -105,12 +104,12 @@ public class MqttConfiguration {
// .version(MqttVersion.MQTT_5)
// 连接监听
.connectListener(new MqttClientConnectListener())
.willMessage(builder -> {
builder.topic("/test/offline")
.messageText("down")
.retain(false)
.qos(MqttQoS.AT_MOST_ONCE); // 遗嘱消息
})
// .willMessage(builder -> {
// builder.topic("/test/offline")
// .messageText("down")
// .retain(false)
// .qos(MqttQoS.AT_MOST_ONCE); // 遗嘱消息
// })
// 同步连接,也可以使用 connect() 异步(可以避免 broker 没启动照成启动卡住),但是下面的订阅和发布可能还没连接成功。
.connectSync();
String[] topicArr = {dataTopicFilter, deviceStatusTopic};
@ -152,7 +151,7 @@ public class MqttConfiguration {
// client.subQos0(dataTopic,new MqttClientMessageListener());
}
};
}
}*/
public String getIp() {
@ -250,4 +249,21 @@ public class MqttConfiguration {
public void setImagePatterns(String imagePatterns) {
this.imagePatterns = imagePatterns;
}
public String getImageDomain() {
return imageDomain;
}
public void setImageDomain(String imageDomain) {
this.imageDomain = imageDomain;
}
public String getImagePrefix() {
return imagePrefix;
}
public void setImagePrefix(String imagePrefix) {
this.imagePrefix = imagePrefix;
}
}

@ -0,0 +1,83 @@
package com.ruoyi.dataprocess.mqtt.client.listener;
import com.ruoyi.dataprocess.mqtt.client.config.MqttConfiguration;
import com.ruoyi.dataprocess.service.IDataProcessService;
import com.ruoyi.dataprocess.service.IDeviceStatusService;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
*
*
* @author L.cm
*/
@Service
public class MqttClientSubscribeListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class);
public static final String TOPIC_TYPE_DATA_POSTFIX = "data";
public static final String TOPIC_TYPE_COMMAND_POSTFIX = "command";
public static final String TOPIC_TYPE_REPLY_POSTFIX = "reply";
@Autowired
private IDataProcessService dataProcessService;
@Autowired
private IDeviceStatusService deviceStatusService;
@Autowired
private MqttConfiguration mqttConfiguration;
@MqttClientSubscribe(value = "/v1/#", qos = MqttQoS.AT_MOST_ONCE)
public void subQos0(String topic, ByteBuffer payload) {
String payloadString = new String(payload.array(), StandardCharsets.UTF_8);
Long start = System.currentTimeMillis();
String imagePath = mqttConfiguration.getImagePath();
String imagePatterns = mqttConfiguration.getImagePatterns();
String imageDomain = mqttConfiguration.getImageDomain();
String imagePrefix = mqttConfiguration.getImagePrefix();
logger.info("topic:{},start:{}ms", topic, start);
try {
if (topic.endsWith(TOPIC_TYPE_DATA_POSTFIX)) {
int processDataCount = dataProcessService.processBusinessData(payloadString,
imagePath, imagePatterns, imageDomain, imagePrefix);
Long end = System.currentTimeMillis();
logger.info("Process Data start:{}ms,end:{}ms,Spend Time:{}ms,Data Count:{}", start, end, end - start, processDataCount);
}
} catch (Exception e) {
e.printStackTrace();
logger.error("Error processing business data,topic:{},start:{},error:{}", topic, start, e.getMessage());
}
}
@MqttClientSubscribe(value = "/device/status/v1", qos = MqttQoS.AT_MOST_ONCE)
public void subQos1(String topic, ByteBuffer payload) {
String payloadString = new String(payload.array(), StandardCharsets.UTF_8);
Long start = System.currentTimeMillis();
String clientId = mqttConfiguration.getClientId();
logger.info("topic:{},start:{}ms", topic, start);
try {
deviceStatusService.handleDeviceStatus(payloadString, clientId);
} catch (Exception e) {
e.printStackTrace();
logger.error("Error processing business data,topic:{},start:{},error:{}", topic, start, e.getMessage());
}
}
//
// @MqttClientSubscribe("/sys/${productKey}/${deviceName}/thing/sub/register")
// public void thingSubRegister(String topic, byte[] payload) {
// // 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 +
// // 注意mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。
// logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
// }
}

@ -161,7 +161,6 @@ public class DataProcessServiceImpl implements IDataProcessService {
if (type != null) {
extension = StringUtils.isNotBlank(ImageUtils.getImageType(type, imagePatternArr))
? ImageUtils.getImageType(type, imagePatternArr) : extension;
System.out.println("extension: " + extension);
}
value = getImageFileName(imagePath, imageDomain, imagePrefix, valueStr, deviceId, extension);
if (value == null) continue;

Loading…
Cancel
Save