Merge remote-tracking branch 'origin/master'

dev
夜笙歌 1 year ago
commit 44bc09806c

@ -89,6 +89,8 @@ public class SysUser extends BaseEntity
/** 角色ID */
private Long roleId;
private Long tenantId;//租户ID
public SysUser()
{
@ -296,6 +298,15 @@ public class SysUser extends BaseEntity
{
this.roleId = roleId;
}
public Long getTenantId() {
return tenantId;
}
public void setTenantId(Long tenantId) {
this.tenantId = tenantId;
}
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)

@ -9,7 +9,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rouyi-modules-mqttBroker</artifactId>
<artifactId>hw-mqtt-broker</artifactId>
<description>
海威物联网平台Broker模块
@ -22,11 +22,10 @@
<dependencies>
<!-- SpringCloud Alibaba Nacos -->
<!--<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- SpringCloud Alibaba Nacos Config -->
<dependency>
@ -47,12 +46,6 @@
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-redis</artifactId>
<version>2.7.9</version>
</dependency>
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-logging</artifactId>

@ -0,0 +1,35 @@
package com.hw.mqtt.config;
import com.hw.mqtt.enums.RedisKeys;
import com.hw.mqtt.listener.RedisMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
* @author Wen JY
* @description: TODO
* @date 2023-09-11 15:35:20
* @version: 1.0
*/
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter msgIngoListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多个 messageListener配置不同的交换机
container.addMessageListener(msgIngoListenerAdapter, new PatternTopic(RedisKeys.REDIS_CHANNEL_DOWN.getKey()));
return container;
}
@Bean
MessageListenerAdapter msgIngoListenerAdapter(RedisMessageListener receiver) {
return new MessageListenerAdapter(receiver, "deviceCommand");
}
}

@ -1,225 +0,0 @@
package com.hw.mqtt.domain;
import java.util.HashMap;
import java.util.Objects;
/**
* @author Wen JY
* @description: TODO
* @date 2023-08-17 13:33:31
* @version: 1.0
*/
public class AjaxResult extends HashMap<String, Object>
{
private static final long serialVersionUID = 1L;
/** 状态码 */
public static final String CODE_TAG = "code";
/** 返回内容 */
public static final String MSG_TAG = "msg";
/** 数据对象 */
public static final String DATA_TAG = "data";
/**
*
*/
public enum Type
{
/** 成功 */
SUCCESS(0),
/** 警告 */
WARN(301),
/** 错误 */
ERROR(500);
private final int value;
Type(int value)
{
this.value = value;
}
public int value()
{
return this.value;
}
}
/**
* AjaxResult 使
*/
public AjaxResult()
{
}
/**
* AjaxResult
*
* @param type
* @param msg
*/
public AjaxResult(Type type, String msg)
{
super.put(CODE_TAG, type.value);
super.put(MSG_TAG, msg);
}
/**
* AjaxResult
*
* @param type
* @param msg
* @param data
*/
public AjaxResult(Type type, String msg, Object data)
{
super.put(CODE_TAG, type.value);
super.put(MSG_TAG, msg);
if (isNotNull(data))
{
super.put(DATA_TAG, data);
}
}
/**
*
*
* @return
*/
public static AjaxResult success()
{
return AjaxResult.success("操作成功");
}
/**
*
*
* @return
*/
public static AjaxResult success(Object data)
{
return AjaxResult.success("操作成功", data);
}
/**
*
*
* @param msg
* @return
*/
public static AjaxResult success(String msg)
{
return AjaxResult.success(msg, null);
}
/**
*
*
* @param msg
* @param data
* @return
*/
public static AjaxResult success(String msg, Object data)
{
return new AjaxResult(Type.SUCCESS, msg, data);
}
/**
*
*
* @param msg
* @return
*/
public static AjaxResult warn(String msg)
{
return AjaxResult.warn(msg, null);
}
/**
*
*
* @param msg
* @param data
* @return
*/
public static AjaxResult warn(String msg, Object data)
{
return new AjaxResult(Type.WARN, msg, data);
}
/**
*
*
* @return
*/
public static AjaxResult error()
{
return AjaxResult.error("操作失败");
}
/**
*
*
* @param msg
* @return
*/
public static AjaxResult error(String msg)
{
return AjaxResult.error(msg, null);
}
/**
*
*
* @param msg
* @param data
* @return
*/
public static AjaxResult error(String msg, Object data)
{
return new AjaxResult(Type.ERROR, msg, data);
}
/**
*
*
* @return
*/
public boolean isSuccess()
{
return !isError();
}
/**
*
*
* @return
*/
public boolean isError()
{
return Objects.equals(Type.ERROR.value, this.get(CODE_TAG));
}
/**
* 便
*
* @param key
* @param value
* @return
*/
@Override
public AjaxResult put(String key, Object value)
{
super.put(key, value);
return this;
}
public static boolean isNotNull(Object object)
{
return !isNull(object);
}
public static boolean isNull(Object object)
{
return object == null;
}
}

@ -1,25 +0,0 @@
package com.hw.mqtt.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author WenJY
* @date 20230314 13:46
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerNode {
/**
*
*/
private String name;
/**
* ip:port
*/
private String peerHost;
}

@ -17,23 +17,20 @@
package com.hw.mqtt.listener;
import com.alibaba.fastjson2.JSONArray;
import com.hw.mqtt.domain.AjaxResult;
import com.hw.mqtt.enums.RedisKeys;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@ -50,26 +47,26 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
private static final Logger logger = LoggerFactory.getLogger(MqttConnectStatusListener.class);
private final ApplicationContext context;
private final MicaRedisCache redisCache;
private final StringRedisTemplate redisTemplate;
private MqttServerCreator serverCreator;
private MqttServerTemplate mqttServerTemplate;
public MqttConnectStatusListener(ApplicationContext context, MicaRedisCache redisCache) {
public MqttConnectStatusListener(ApplicationContext context, StringRedisTemplate redisTemplate) {
this.context = context;
this.redisCache = redisCache;
this.redisTemplate = redisTemplate;
}
@Override
public void online(ChannelContext context, String clientId, String username) {
logger.info("Mqtt clientId:{} username:{} online.", clientId, username);
redisCache.sAdd(getRedisKey(), clientId);
redisTemplate.opsForSet().add(getRedisKey(), clientId);
pushConnectStatus(clientId,1);
}
@Override
public void offline(ChannelContext context, String clientId, String username, String reason) {
logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason);
redisCache.sRem(getRedisKey(), clientId);
redisTemplate.opsForSet().remove(getRedisKey(), clientId);
pushConnectStatus(clientId,2);
}
@ -91,7 +88,7 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
@Override
public void destroy() throws Exception {
// 停机时删除集合
redisCache.del(getRedisKey());
redisTemplate.opsForSet().remove(getRedisKey());
}
/**

@ -0,0 +1,56 @@
package com.hw.mqtt.listener;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.core.utils.StringUtils;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* @author Wen JY
* @description: TODO
* @date 2023-09-11 15:28:18
* @version: 1.0
*/
@Component
public class RedisMessageListener {
private static final Logger logger = LoggerFactory.getLogger(RedisMessageListener.class);
@Autowired
private MqttServerTemplate mqttServerTemplate;
/**
*
* @param message
*/
public void deviceCommand(String message){
try {
if(StringUtils.isEmpty(message)){
logger.warn("Redis订阅内容为空");
return;
}
logger.info("Redis订阅内容"+message);
JSONObject jsonObject = JSONObject.parseObject(message);
String topic = jsonObject.get("Topic").toString();
String payload = jsonObject.get("Payload").toString();
boolean publishResult = mqttServerTemplate.publishAll(topic, payload.getBytes(StandardCharsets.UTF_8));
if(publishResult){
logger.info("Topic:"+topic+";Payload:"+payload+";消息发布成功");
}else {
logger.info("Topic:"+topic+";Payload:"+payload+";消息发布失败!!!");
}
}catch (Exception e){
logger.error("设备控制指令下发异常:"+e.getMessage());
}
}
}

@ -1,62 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hw.mqtt.service;
import java.util.List;
/**
* mqtt broker
*
* @author L.cm
*/
public interface IMqttBrokerService {
/**
* 线
* @author WenJY
* @date 2023-03-14 13:51
* @return long
*/
long getOnlineClientSize();
/**
* 线
* @author WenJY
* @date 2023-03-14 13:51
* @return java.util.List<java.lang.String>
*/
List<String> getOnlineClients();
/**
*
* @author WenJY
* @date 2023-03-14 13:50
* @param topic
* @param payload
* @return boolean
*/
boolean publish(String topic,String payload);
/**
*
* @author WenJY
* @date 2023-03-18 9:23
* @param clientId
*/
boolean closeClientById(String clientId);
}

@ -1,96 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hw.mqtt.service.impl;
import com.hw.mqtt.enums.RedisKeys;
import com.hw.mqtt.service.IMqttBrokerService;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import net.dreamlu.mica.core.utils.StringPool;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
* mqtt broker
*
* @author L.cm
*/
@Service
public class MqttBrokerServiceImpl implements IMqttBrokerService {
private static final Logger logger = LoggerFactory.getLogger(MqttBrokerServiceImpl.class);
@Autowired private MicaRedisCache redisCache;
@Autowired private MqttServerTemplate server;
@Override
public long getOnlineClientSize() {
Set<String> keySet = redisCache.scan(RedisKeys.CONNECT_STATUS.getKey(StringPool.STAR));
if (keySet.isEmpty()) {
return 0L;
}
long result = 0;
for (String redisKey : keySet) {
Long count = redisCache.getSetOps().size(redisKey);
if (count != null) {
result += count;
}
}
return result;
}
@Override
public List<String> getOnlineClients() {
Set<String> keySet = redisCache.scan(RedisKeys.CONNECT_STATUS.getKey(StringPool.STAR));
if (keySet.isEmpty()) {
return Collections.emptyList();
}
List<String> clientList = new ArrayList<>();
for (String redisKey : keySet) {
Set<String> members = redisCache.sMembers(redisKey);
if (members != null && !members.isEmpty()) {
clientList.addAll(members);
}
}
return clientList;
}
@Override
public boolean publish(String topic, String payload) {
boolean result = server.publishAll(topic, payload.getBytes(StandardCharsets.UTF_8));
logger.info("Mqtt publishAll result:{};topic:{};payload:{}", result, topic, payload);
return result;
}
@Override
public boolean closeClientById(String clientId) {
try{
server.close(clientId);
}catch (Exception ex){
return false;
}
return true;
}
}

@ -1,27 +0,0 @@
package com.hw.mqtt.task;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/**
*
* @author WenJY
* @date 2023-03-14 12:16
* @param null
* @return null
*/
@Service
public class PublishAllTask {
@Autowired
private MqttServer mqttServer;
@Scheduled(fixedDelay = 1000)
public void run() {
mqttServer.publishAll("/test/heartbeat", "心跳指令无需处理".getBytes(StandardCharsets.UTF_8));
}
}

@ -1,40 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hw.mqtt.util;
import net.dreamlu.mica.core.utils.CharPool;
/**
* redis
*
* @author L.cm
*/
public class RedisUtil {
/**
* redis pattern
*
* @return pattern
*/
public static String getTopicPattern(String topicFilter) {
// mqtt 分享主题 $share/{ShareName}/{filter}
return topicFilter
.replace(CharPool.PLUS, CharPool.STAR)
.replace(CharPool.HASH, CharPool.STAR);
}
}

@ -23,6 +23,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<result property="updateBy" column="update_by" />
<result property="updateTime" column="update_time" />
<result property="remark" column="remark" />
<result property="tenantId" column="tenant_id" />
<association property="dept" column="dept_id" javaType="SysDept" resultMap="deptResult" />
<collection property="roles" javaType="java.util.List" resultMap="RoleResult" />
</resultMap>
@ -49,7 +50,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<sql id="selectUserVo">
select u.user_id, u.dept_id, u.user_name, u.nick_name, u.email, u.avatar, u.phonenumber, u.password, u.sex, u.status, u.del_flag, u.login_ip, u.login_date, u.create_by, u.create_time, u.remark,
d.dept_id, d.parent_id, d.ancestors, d.dept_name, d.order_num, d.leader, d.status as dept_status,
r.role_id, r.role_name, r.role_key, r.role_sort, r.data_scope, r.status as role_status
r.role_id, r.role_name, r.role_key, r.role_sort, r.data_scope, r.status as role_status,u.tenant_id
from sys_user u
left join sys_dept d on u.dept_id = d.dept_id
left join sys_user_role ur on u.user_id = ur.user_id
@ -57,7 +58,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
</sql>
<select id="selectUserList" parameterType="SysUser" resultMap="SysUserResult">
select u.user_id, u.dept_id, u.nick_name, u.user_name, u.email, u.avatar, u.phonenumber, u.sex, u.status, u.del_flag, u.login_ip, u.login_date, u.create_by, u.create_time, u.remark, d.dept_name, d.leader from sys_user u
select u.user_id, u.dept_id, u.nick_name, u.user_name, u.email, u.avatar, u.phonenumber, u.sex, u.status, u.del_flag, u.login_ip, u.login_date, u.create_by, u.create_time, u.remark, d.dept_name, d.leader,u.tenant_id from sys_user u
left join sys_dept d on u.dept_id = d.dept_id
where u.del_flag = '0'
<if test="userId != null and userId != 0">
@ -86,7 +87,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
</select>
<select id="selectAllocatedList" parameterType="SysUser" resultMap="SysUserResult">
select distinct u.user_id, u.dept_id, u.user_name, u.nick_name, u.email, u.phonenumber, u.status, u.create_time
select distinct u.user_id, u.dept_id, u.user_name, u.nick_name, u.email, u.phonenumber, u.status, u.create_time,u.tenant_id
from sys_user u
left join sys_dept d on u.dept_id = d.dept_id
left join sys_user_role ur on u.user_id = ur.user_id
@ -103,7 +104,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
</select>
<select id="selectUnallocatedList" parameterType="SysUser" resultMap="SysUserResult">
select distinct u.user_id, u.dept_id, u.user_name, u.nick_name, u.email, u.phonenumber, u.status, u.create_time
select distinct u.user_id, u.dept_id, u.user_name, u.nick_name, u.email, u.phonenumber, u.status, u.create_time,u.tenant_id
from sys_user u
left join sys_dept d on u.dept_id = d.dept_id
left join sys_user_role ur on u.user_id = ur.user_id

Loading…
Cancel
Save