diff --git a/ruoyi-modules/hw-mqtt-broker/pom.xml b/ruoyi-modules/hw-mqtt-broker/pom.xml
index 77ac85e..83d07a3 100644
--- a/ruoyi-modules/hw-mqtt-broker/pom.xml
+++ b/ruoyi-modules/hw-mqtt-broker/pom.xml
@@ -9,7 +9,8 @@
4.0.0
- hw-mqtt-broker
+
+ ruoyi-modules-mqttbroker
海威物联网平台Broker模块
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttAuthHandler.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttAuthHandler.java
index c69d843..c9d5611 100644
--- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttAuthHandler.java
+++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttAuthHandler.java
@@ -1,9 +1,23 @@
package com.hw.mqtt.auth;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.hw.mqtt.domain.DeviceInfoDto;
+import com.hw.mqtt.enums.RedisKeys;
+import com.ruoyi.common.core.utils.StringUtils;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.core.StringRedisTemplate;
import org.tio.core.ChannelContext;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
/**
* mqtt tcp、websocket 认证
* @author WenJY
@@ -13,10 +27,103 @@ import org.tio.core.ChannelContext;
@Configuration(proxyBeanMethods = false)
public class MqttAuthHandler implements IMqttServerAuthHandler {
+ private final Logger logger = LoggerFactory.getLogger(MqttAuthHandler.class);
+
+ /**
+ * 存储设备信息,设备接入校验
+ */
+ private List deviceInfoDtos;
+
+ private final StringRedisTemplate redisTemplate;
+
+ public MqttAuthHandler(List deviceInfoDtos, StringRedisTemplate redisTemplate) {
+ this.deviceInfoDtos = deviceInfoDtos;
+ this.redisTemplate = redisTemplate;
+ this.InitDeviceInfoListByRedis();
+ }
+
+ /**
+ * 通过Redis初始化本地设备信息集合,只在该类初始化时执行
+ */
+ private void InitDeviceInfoListByRedis(){
+ try{
+ String jsonStr = redisTemplate.opsForValue().get(RedisKeys.CLIENT_DEVICE_INFO.getKey());
+ if(StringUtils.isEmpty(jsonStr)){
+ logger.warn("通过Redis获取设备信息为空");
+ return;
+ }
+ this.deviceInfoDtos = JSONArray.parseArray(jsonStr, DeviceInfoDto.class);
+ }catch (Exception ex){
+ logger.error("通过Redis获取设备信息方法处理异常:"+ex.getMessage());
+ }
+ }
+
@Override
public boolean authenticate(ChannelContext context, String uniqueId, String clientId, String userName, String password) {
// 客户端认证逻辑实现
- return true;
+ try{
+ if(deviceInfoDtos!=null){
+ Optional optionalDeviceInfoDto = deviceInfoDtos.stream().distinct().filter(x -> x.getDeviceCode().equals(clientId)).findFirst();
+ //判断本地集合中是否包含该设备信息,如果不包含再次读取Redis并初始化本地集合
+ if (optionalDeviceInfoDto.isPresent()) {
+ DeviceInfoDto deviceInfo = optionalDeviceInfoDto.get();
+ return checkDeviceInfo(deviceInfo,clientId,userName,password);
+ }
+ }
+ return NoDeviceInfoEvent(clientId,userName,password);
+ }catch (Exception ex){
+ logger.error("客户端认证逻辑处理异常:"+ex.getMessage());
+ }
+ return false;
+ }
+
+ /**
+ * 通过本地集合未获取到设备信息时重新获取进行校验
+ * @param clientId
+ * @param userName
+ * @param password
+ * @return
+ */
+ private boolean NoDeviceInfoEvent(String clientId,String userName,String password){
+ DeviceInfoDto deviceInfo = GetDeviceInfoByRedis(clientId);
+ if (deviceInfo != null) {
+ return checkDeviceInfo(deviceInfo,clientId,userName,password);
+ }else {
+ logger.warn("未获取到接入客户端的设备信息,禁止该设备接入");
+ return false;
+ }
+ }
+
+ /**
+ * 校验设备接入信息
+ * @param deviceInfo
+ * @param clientId
+ * @param userName
+ * @param passwd
+ * @return
+ */
+ private boolean checkDeviceInfo(DeviceInfoDto deviceInfo,String clientId,String userName,String passwd){
+ if(Objects.equals(userName, deviceInfo.getUserName()) && Objects.equals(passwd, deviceInfo.getPassword())){
+ return true;
+ }else {
+ logger.warn("接入设备:"+clientId+";账号或密码错误,禁止该设备接入");
+ return false;
+ }
+ }
+
+ /**
+ * 再次读取Redis重新加载本地设备信息集合内容获取设备信息,过滤出指定clientId的设备信息
+ * @param clientId
+ * @return
+ */
+ private DeviceInfoDto GetDeviceInfoByRedis(String clientId){
+ this.InitDeviceInfoListByRedis();
+ if(deviceInfoDtos!=null){
+ Optional optionalDeviceInfoDto = deviceInfoDtos.stream().distinct().filter(x -> x.getDeviceCode().equals(clientId)).findFirst();
+ return optionalDeviceInfoDto.orElse(null);
+ }else {
+ return null;
+ }
}
}
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/DeviceInfoDto.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/DeviceInfoDto.java
new file mode 100644
index 0000000..c03e5e6
--- /dev/null
+++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/DeviceInfoDto.java
@@ -0,0 +1,29 @@
+package com.hw.mqtt.domain;
+
+import lombok.Data;
+
+/**
+ * 设备基础信息
+ * @author Wen JY
+ * @description: TODO
+ * @date 2023-09-13 14:52:18
+ * @version: 1.0
+ */
+@Data
+public class DeviceInfoDto {
+
+ /**
+ * 设备编号,ClientId
+ */
+ private String deviceCode;
+
+ /**
+ * 用户名
+ */
+ private String userName;
+
+ /**
+ * 密码
+ */
+ private String password;
+}
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/enums/ConnectStatus.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/enums/ConnectStatus.java
new file mode 100644
index 0000000..84d68a9
--- /dev/null
+++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/enums/ConnectStatus.java
@@ -0,0 +1,32 @@
+package com.hw.mqtt.enums;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * 设备连接状态枚举
+ * @author Wen JY
+ * @description: TODO
+ * @date 2023-09-13 13:42:37
+ * @version: 1.0
+ */
+@Getter
+@RequiredArgsConstructor
+public enum ConnectStatus {
+
+ /**
+ * 设备连接
+ */
+ connct(1),
+
+ /**
+ * 设备断开
+ */
+ disconnect(0);
+
+ private final int key;
+
+ public int getKey(){
+ return this.key;
+ }
+}
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/enums/DeviceType.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/enums/DeviceType.java
new file mode 100644
index 0000000..399fc12
--- /dev/null
+++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/enums/DeviceType.java
@@ -0,0 +1,37 @@
+package com.hw.mqtt.enums;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * 设备类型枚举
+ * @author Wen JY
+ * @description: TODO
+ * @date 2023-09-13 13:30:38
+ * @version: 1.0
+ */
+@Getter
+@RequiredArgsConstructor
+public enum DeviceType {
+
+ /**
+ * 网关/基站
+ */
+ edge(1),
+
+ /**
+ * 网关子设备/传感器
+ */
+ sensor(2),
+
+ /**
+ * 直连设备
+ */
+ device(3);
+
+ private final int key;
+
+ public int getKey(){
+ return this.key;
+ }
+}
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/enums/RedisKeys.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/enums/RedisKeys.java
index e4fdb0b..f24bc8f 100644
--- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/enums/RedisKeys.java
+++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/enums/RedisKeys.java
@@ -39,6 +39,11 @@ public enum RedisKeys {
* 保留消息存储
*/
MESSAGE_STORE_RETAIN("mqtt:messages:retain:"),
+
+ /**
+ * 设备信息
+ */
+ CLIENT_DEVICE_INFO("hw_device_info"),
;
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java
index d399789..56a31b0 100644
--- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java
+++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java
@@ -17,6 +17,8 @@
package com.hw.mqtt.listener;
import com.alibaba.fastjson2.JSONArray;
+import com.hw.mqtt.enums.ConnectStatus;
+import com.hw.mqtt.enums.DeviceType;
import com.hw.mqtt.enums.RedisKeys;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
@@ -60,14 +62,14 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
public void online(ChannelContext context, String clientId, String username) {
logger.info("Mqtt clientId:{} username:{} online.", clientId, username);
redisTemplate.opsForSet().add(getRedisKey(), clientId);
- pushConnectStatus(clientId,1);
+ pushConnectStatus(clientId,ConnectStatus.connct);
}
@Override
public void offline(ChannelContext context, String clientId, String username, String reason) {
logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason);
redisTemplate.opsForSet().remove(getRedisKey(), clientId);
- pushConnectStatus(clientId,2);
+ pushConnectStatus(clientId,ConnectStatus.disconnect);
}
/**
@@ -96,19 +98,19 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
* @param clientId
* @param connectStatus
*/
- public void pushConnectStatus(String clientId,Integer connectStatus){
+ public void pushConnectStatus(String clientId, ConnectStatus connectStatus){
Map entityMap = new HashMap<>();
entityMap.put("msg","设备设备连接状态信息");
- entityMap.put("deviceType","edge");
+ entityMap.put("deviceType", DeviceType.edge.getKey());
entityMap.put("deviceCode",clientId);
- entityMap.put("connectStatus",connectStatus);
+ entityMap.put("connectStatus",connectStatus.getKey());
entityMap.put("statusTime",System.currentTimeMillis());
String jsonString = JSONArray.toJSONString(entityMap);
boolean result = mqttServerTemplate.publishAll("/device/status/v1", jsonString.getBytes(StandardCharsets.UTF_8));
if(result){
- logger.info("客户端:"+clientId+";"+ (connectStatus == 1 ? "连接" :"断开") +"状态推送成功");
+ logger.info("客户端:"+clientId+";"+ (connectStatus == ConnectStatus.connct ? "连接" :"断开") +"状态推送成功");
}else {
- logger.info("客户端:"+clientId+";"+ (connectStatus == 1 ? "连接" :"断开") +"状态推送失败");
+ logger.info("客户端:"+clientId+";"+ (connectStatus == ConnectStatus.connct ? "连接" :"断开") +"状态推送失败");
}
}
}
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/resources/bootstrap.yml b/ruoyi-modules/hw-mqtt-broker/src/main/resources/bootstrap.yml
index 007734c..4ba1e7d 100644
--- a/ruoyi-modules/hw-mqtt-broker/src/main/resources/bootstrap.yml
+++ b/ruoyi-modules/hw-mqtt-broker/src/main/resources/bootstrap.yml
@@ -1,6 +1,12 @@
-# Tomcat
+#Tomcat端口
server:
- port: 9604
+ port: 9605
+
+# broker监听端口
+mqtt:
+ server:
+ port: 1883 # MQTT端口,默认:1883
+ web-port: 8083 # http、websocket 端口,默认:8083
# Spring
spring:
@@ -14,10 +20,10 @@ spring:
nacos:
discovery:
# 服务注册地址
- server-addr: 127.0.0.1:8848
+ server-addr: localhost:8848
config:
# 配置中心地址
- server-addr: 127.0.0.1:8848
+ server-addr: localhost:8848
# 配置文件格式
file-extension: yml
# 共享配置