add - broker设备接入校验

dev
Wen JY 1 year ago
parent a36ab5537e
commit 92b2a45cfd

@ -9,7 +9,8 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hw-mqtt-broker</artifactId>
<!-- <artifactId>hw-mqtt-broker</artifactId>-->
<artifactId>ruoyi-modules-mqttbroker</artifactId>
<description>
海威物联网平台Broker模块

@ -1,9 +1,23 @@
package com.hw.mqtt.auth;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.hw.mqtt.domain.DeviceInfoDto;
import com.hw.mqtt.enums.RedisKeys;
import com.ruoyi.common.core.utils.StringUtils;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.tio.core.ChannelContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* mqtt tcpwebsocket
* @author WenJY
@ -13,10 +27,103 @@ import org.tio.core.ChannelContext;
@Configuration(proxyBeanMethods = false)
public class MqttAuthHandler implements IMqttServerAuthHandler {
private final Logger logger = LoggerFactory.getLogger(MqttAuthHandler.class);
/**
*
*/
private List<DeviceInfoDto> deviceInfoDtos;
private final StringRedisTemplate redisTemplate;
public MqttAuthHandler(List<DeviceInfoDto> deviceInfoDtos, StringRedisTemplate redisTemplate) {
this.deviceInfoDtos = deviceInfoDtos;
this.redisTemplate = redisTemplate;
this.InitDeviceInfoListByRedis();
}
/**
* Redis
*/
private void InitDeviceInfoListByRedis(){
try{
String jsonStr = redisTemplate.opsForValue().get(RedisKeys.CLIENT_DEVICE_INFO.getKey());
if(StringUtils.isEmpty(jsonStr)){
logger.warn("通过Redis获取设备信息为空");
return;
}
this.deviceInfoDtos = JSONArray.parseArray(jsonStr, DeviceInfoDto.class);
}catch (Exception ex){
logger.error("通过Redis获取设备信息方法处理异常"+ex.getMessage());
}
}
@Override
public boolean authenticate(ChannelContext context, String uniqueId, String clientId, String userName, String password) {
// 客户端认证逻辑实现
try{
if(deviceInfoDtos!=null){
Optional<DeviceInfoDto> optionalDeviceInfoDto = deviceInfoDtos.stream().distinct().filter(x -> x.getDeviceCode().equals(clientId)).findFirst();
//判断本地集合中是否包含该设备信息如果不包含再次读取Redis并初始化本地集合
if (optionalDeviceInfoDto.isPresent()) {
DeviceInfoDto deviceInfo = optionalDeviceInfoDto.get();
return checkDeviceInfo(deviceInfo,clientId,userName,password);
}
}
return NoDeviceInfoEvent(clientId,userName,password);
}catch (Exception ex){
logger.error("客户端认证逻辑处理异常:"+ex.getMessage());
}
return false;
}
/**
*
* @param clientId
* @param userName
* @param password
* @return
*/
private boolean NoDeviceInfoEvent(String clientId,String userName,String password){
DeviceInfoDto deviceInfo = GetDeviceInfoByRedis(clientId);
if (deviceInfo != null) {
return checkDeviceInfo(deviceInfo,clientId,userName,password);
}else {
logger.warn("未获取到接入客户端的设备信息,禁止该设备接入");
return false;
}
}
/**
*
* @param deviceInfo
* @param clientId
* @param userName
* @param passwd
* @return
*/
private boolean checkDeviceInfo(DeviceInfoDto deviceInfo,String clientId,String userName,String passwd){
if(Objects.equals(userName, deviceInfo.getUserName()) && Objects.equals(passwd, deviceInfo.getPassword())){
return true;
}else {
logger.warn("接入设备:"+clientId+";账号或密码错误,禁止该设备接入");
return false;
}
}
/**
* RedisclientId
* @param clientId
* @return
*/
private DeviceInfoDto GetDeviceInfoByRedis(String clientId){
this.InitDeviceInfoListByRedis();
if(deviceInfoDtos!=null){
Optional<DeviceInfoDto> optionalDeviceInfoDto = deviceInfoDtos.stream().distinct().filter(x -> x.getDeviceCode().equals(clientId)).findFirst();
return optionalDeviceInfoDto.orElse(null);
}else {
return null;
}
}
}

@ -0,0 +1,29 @@
package com.hw.mqtt.domain;
import lombok.Data;
/**
*
* @author Wen JY
* @description: TODO
* @date 2023-09-13 14:52:18
* @version: 1.0
*/
@Data
public class DeviceInfoDto {
/**
* ClientId
*/
private String deviceCode;
/**
*
*/
private String userName;
/**
*
*/
private String password;
}

@ -0,0 +1,32 @@
package com.hw.mqtt.enums;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
*
* @author Wen JY
* @description: TODO
* @date 2023-09-13 13:42:37
* @version: 1.0
*/
@Getter
@RequiredArgsConstructor
public enum ConnectStatus {
/**
*
*/
connct(1),
/**
*
*/
disconnect(0);
private final int key;
public int getKey(){
return this.key;
}
}

@ -0,0 +1,37 @@
package com.hw.mqtt.enums;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
*
* @author Wen JY
* @description: TODO
* @date 2023-09-13 13:30:38
* @version: 1.0
*/
@Getter
@RequiredArgsConstructor
public enum DeviceType {
/**
* /
*/
edge(1),
/**
* /
*/
sensor(2),
/**
*
*/
device(3);
private final int key;
public int getKey(){
return this.key;
}
}

@ -39,6 +39,11 @@ public enum RedisKeys {
*
*/
MESSAGE_STORE_RETAIN("mqtt:messages:retain:"),
/**
*
*/
CLIENT_DEVICE_INFO("hw_device_info"),
;

@ -17,6 +17,8 @@
package com.hw.mqtt.listener;
import com.alibaba.fastjson2.JSONArray;
import com.hw.mqtt.enums.ConnectStatus;
import com.hw.mqtt.enums.DeviceType;
import com.hw.mqtt.enums.RedisKeys;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
@ -60,14 +62,14 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
public void online(ChannelContext context, String clientId, String username) {
logger.info("Mqtt clientId:{} username:{} online.", clientId, username);
redisTemplate.opsForSet().add(getRedisKey(), clientId);
pushConnectStatus(clientId,1);
pushConnectStatus(clientId,ConnectStatus.connct);
}
@Override
public void offline(ChannelContext context, String clientId, String username, String reason) {
logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason);
redisTemplate.opsForSet().remove(getRedisKey(), clientId);
pushConnectStatus(clientId,2);
pushConnectStatus(clientId,ConnectStatus.disconnect);
}
/**
@ -96,19 +98,19 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
* @param clientId
* @param connectStatus
*/
public void pushConnectStatus(String clientId,Integer connectStatus){
public void pushConnectStatus(String clientId, ConnectStatus connectStatus){
Map<String,Object> entityMap = new HashMap<>();
entityMap.put("msg","设备设备连接状态信息");
entityMap.put("deviceType","edge");
entityMap.put("deviceType", DeviceType.edge.getKey());
entityMap.put("deviceCode",clientId);
entityMap.put("connectStatus",connectStatus);
entityMap.put("connectStatus",connectStatus.getKey());
entityMap.put("statusTime",System.currentTimeMillis());
String jsonString = JSONArray.toJSONString(entityMap);
boolean result = mqttServerTemplate.publishAll("/device/status/v1", jsonString.getBytes(StandardCharsets.UTF_8));
if(result){
logger.info("客户端:"+clientId+""+ (connectStatus == 1 ? "连接" :"断开") +"状态推送成功");
logger.info("客户端:"+clientId+""+ (connectStatus == ConnectStatus.connct ? "连接" :"断开") +"状态推送成功");
}else {
logger.info("客户端:"+clientId+""+ (connectStatus == 1 ? "连接" :"断开") +"状态推送失败");
logger.info("客户端:"+clientId+""+ (connectStatus == ConnectStatus.connct ? "连接" :"断开") +"状态推送失败");
}
}
}

@ -1,6 +1,12 @@
# Tomcat
#Tomcat端口
server:
port: 9604
port: 9605
# broker监听端口
mqtt:
server:
port: 1883 # MQTT端口默认1883
web-port: 8083 # http、websocket 端口默认8083
# Spring
spring:
@ -14,10 +20,10 @@ spring:
nacos:
discovery:
# 服务注册地址
server-addr: 127.0.0.1:8848
server-addr: localhost:8848
config:
# 配置中心地址
server-addr: 127.0.0.1:8848
server-addr: localhost:8848
# 配置文件格式
file-extension: yml
# 共享配置

Loading…
Cancel
Save