From a36ab5537e75288957f25ba16372d7f04bf2c637 Mon Sep 17 00:00:00 2001 From: Wen JY Date: Mon, 11 Sep 2023 16:49:24 +0800 Subject: [PATCH] =?UTF-8?q?change=20-=20broker=E9=9B=86=E7=BE=A4=E8=AE=A2?= =?UTF-8?q?=E9=98=85Redis=E5=AE=9E=E7=8E=B0=E6=B6=88=E6=81=AF=E4=B8=8B?= =?UTF-8?q?=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ruoyi-modules/hw-mqtt-broker/pom.xml | 17 +- .../hw/mqtt/config/RedisListenerConfig.java | 35 +++ .../java/com/hw/mqtt/domain/AjaxResult.java | 225 ------------------ .../java/com/hw/mqtt/domain/ServerNode.java | 25 -- .../listener/MqttConnectStatusListener.java | 17 +- .../mqtt/listener/RedisMessageListener.java | 56 +++++ .../hw/mqtt/service/IMqttBrokerService.java | 62 ----- .../service/impl/MqttBrokerServiceImpl.java | 96 -------- .../java/com/hw/mqtt/task/PublishAllTask.java | 27 --- .../main/java/com/hw/mqtt/util/RedisUtil.java | 40 ---- 10 files changed, 103 insertions(+), 497 deletions(-) create mode 100644 ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/config/RedisListenerConfig.java delete mode 100644 ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/AjaxResult.java delete mode 100644 ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/ServerNode.java create mode 100644 ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/RedisMessageListener.java delete mode 100644 ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/IMqttBrokerService.java delete mode 100644 ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/impl/MqttBrokerServiceImpl.java delete mode 100644 ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/task/PublishAllTask.java delete mode 100644 ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/util/RedisUtil.java diff --git a/ruoyi-modules/hw-mqtt-broker/pom.xml b/ruoyi-modules/hw-mqtt-broker/pom.xml index e045d88..77ac85e 100644 --- a/ruoyi-modules/hw-mqtt-broker/pom.xml +++ b/ruoyi-modules/hw-mqtt-broker/pom.xml @@ -9,7 +9,7 @@ 4.0.0 - rouyi-modules-mqttBroker + hw-mqtt-broker 海威物联网平台Broker模块 @@ -22,11 +22,10 @@ - - + + org.springframework.boot + spring-boot-starter-data-redis + @@ -47,12 +46,6 @@ 2.1.0 - - net.dreamlu - mica-redis - 2.7.9 - - net.dreamlu mica-logging diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/config/RedisListenerConfig.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/config/RedisListenerConfig.java new file mode 100644 index 0000000..f9884a2 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/config/RedisListenerConfig.java @@ -0,0 +1,35 @@ +package com.hw.mqtt.config; + +import com.hw.mqtt.enums.RedisKeys; +import com.hw.mqtt.listener.RedisMessageListener; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; + +/** + * @author Wen JY + * @description: TODO + * @date 2023-09-11 15:35:20 + * @version: 1.0 + */ +@Configuration +public class RedisListenerConfig { + @Bean + RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, + MessageListenerAdapter msgIngoListenerAdapter) { + + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + // 可以添加多个 messageListener,配置不同的交换机 + container.addMessageListener(msgIngoListenerAdapter, new PatternTopic(RedisKeys.REDIS_CHANNEL_DOWN.getKey())); + return container; + } + + @Bean + MessageListenerAdapter msgIngoListenerAdapter(RedisMessageListener receiver) { + return new MessageListenerAdapter(receiver, "deviceCommand"); + } +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/AjaxResult.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/AjaxResult.java deleted file mode 100644 index 763eb0e..0000000 --- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/AjaxResult.java +++ /dev/null @@ -1,225 +0,0 @@ -package com.hw.mqtt.domain; -import java.util.HashMap; -import java.util.Objects; -/** - * @author Wen JY - * @description: TODO 操作消息提醒 - * @date 2023-08-17 13:33:31 - * @version: 1.0 - */ -public class AjaxResult extends HashMap -{ - private static final long serialVersionUID = 1L; - - /** 状态码 */ - public static final String CODE_TAG = "code"; - - /** 返回内容 */ - public static final String MSG_TAG = "msg"; - - /** 数据对象 */ - public static final String DATA_TAG = "data"; - - /** - * 状态类型 - */ - public enum Type - { - /** 成功 */ - SUCCESS(0), - /** 警告 */ - WARN(301), - /** 错误 */ - ERROR(500); - private final int value; - - Type(int value) - { - this.value = value; - } - - public int value() - { - return this.value; - } - } - - /** - * 初始化一个新创建的 AjaxResult 对象,使其表示一个空消息。 - */ - public AjaxResult() - { - } - - /** - * 初始化一个新创建的 AjaxResult 对象 - * - * @param type 状态类型 - * @param msg 返回内容 - */ - public AjaxResult(Type type, String msg) - { - super.put(CODE_TAG, type.value); - super.put(MSG_TAG, msg); - } - - /** - * 初始化一个新创建的 AjaxResult 对象 - * - * @param type 状态类型 - * @param msg 返回内容 - * @param data 数据对象 - */ - public AjaxResult(Type type, String msg, Object data) - { - super.put(CODE_TAG, type.value); - super.put(MSG_TAG, msg); - if (isNotNull(data)) - { - super.put(DATA_TAG, data); - } - } - - /** - * 返回成功消息 - * - * @return 成功消息 - */ - public static AjaxResult success() - { - return AjaxResult.success("操作成功"); - } - - /** - * 返回成功数据 - * - * @return 成功消息 - */ - public static AjaxResult success(Object data) - { - return AjaxResult.success("操作成功", data); - } - - /** - * 返回成功消息 - * - * @param msg 返回内容 - * @return 成功消息 - */ - public static AjaxResult success(String msg) - { - return AjaxResult.success(msg, null); - } - - /** - * 返回成功消息 - * - * @param msg 返回内容 - * @param data 数据对象 - * @return 成功消息 - */ - public static AjaxResult success(String msg, Object data) - { - return new AjaxResult(Type.SUCCESS, msg, data); - } - - /** - * 返回警告消息 - * - * @param msg 返回内容 - * @return 警告消息 - */ - public static AjaxResult warn(String msg) - { - return AjaxResult.warn(msg, null); - } - - /** - * 返回警告消息 - * - * @param msg 返回内容 - * @param data 数据对象 - * @return 警告消息 - */ - public static AjaxResult warn(String msg, Object data) - { - return new AjaxResult(Type.WARN, msg, data); - } - - /** - * 返回错误消息 - * - * @return - */ - public static AjaxResult error() - { - return AjaxResult.error("操作失败"); - } - - /** - * 返回错误消息 - * - * @param msg 返回内容 - * @return 警告消息 - */ - public static AjaxResult error(String msg) - { - return AjaxResult.error(msg, null); - } - - /** - * 返回错误消息 - * - * @param msg 返回内容 - * @param data 数据对象 - * @return 警告消息 - */ - public static AjaxResult error(String msg, Object data) - { - return new AjaxResult(Type.ERROR, msg, data); - } - - /** - * 是否为成功消息 - * - * @return 结果 - */ - public boolean isSuccess() - { - return !isError(); - } - - /** - * 是否为错误消息 - * - * @return 结果 - */ - public boolean isError() - { - return Objects.equals(Type.ERROR.value, this.get(CODE_TAG)); - } - - /** - * 方便链式调用 - * - * @param key 键 - * @param value 值 - * @return 数据对象 - */ - @Override - public AjaxResult put(String key, Object value) - { - super.put(key, value); - return this; - } - - public static boolean isNotNull(Object object) - { - return !isNull(object); - } - - public static boolean isNull(Object object) - { - return object == null; - } -} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/ServerNode.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/ServerNode.java deleted file mode 100644 index 767ed5a..0000000 --- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/ServerNode.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.hw.mqtt.domain; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -/** - * @author WenJY - * @date 2023年03月14日 13:46 - */ -@Data -@NoArgsConstructor -@AllArgsConstructor -public class ServerNode { - - /** - * 节点名称 - */ - private String name; - /** - * ip:port - */ - private String peerHost; - -} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java index 461c4d2..d399789 100644 --- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java @@ -17,23 +17,20 @@ package com.hw.mqtt.listener; import com.alibaba.fastjson2.JSONArray; -import com.hw.mqtt.domain.AjaxResult; import com.hw.mqtt.enums.RedisKeys; import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener; import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate; -import net.dreamlu.mica.redis.cache.MicaRedisCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.SmartInitializingSingleton; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import org.tio.core.ChannelContext; import java.nio.charset.StandardCharsets; -import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -50,26 +47,26 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm private static final Logger logger = LoggerFactory.getLogger(MqttConnectStatusListener.class); private final ApplicationContext context; - private final MicaRedisCache redisCache; + private final StringRedisTemplate redisTemplate; private MqttServerCreator serverCreator; private MqttServerTemplate mqttServerTemplate; - public MqttConnectStatusListener(ApplicationContext context, MicaRedisCache redisCache) { + public MqttConnectStatusListener(ApplicationContext context, StringRedisTemplate redisTemplate) { this.context = context; - this.redisCache = redisCache; + this.redisTemplate = redisTemplate; } @Override public void online(ChannelContext context, String clientId, String username) { logger.info("Mqtt clientId:{} username:{} online.", clientId, username); - redisCache.sAdd(getRedisKey(), clientId); + redisTemplate.opsForSet().add(getRedisKey(), clientId); pushConnectStatus(clientId,1); } @Override public void offline(ChannelContext context, String clientId, String username, String reason) { logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason); - redisCache.sRem(getRedisKey(), clientId); + redisTemplate.opsForSet().remove(getRedisKey(), clientId); pushConnectStatus(clientId,2); } @@ -91,7 +88,7 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm @Override public void destroy() throws Exception { // 停机时删除集合 - redisCache.del(getRedisKey()); + redisTemplate.opsForSet().remove(getRedisKey()); } /** diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/RedisMessageListener.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/RedisMessageListener.java new file mode 100644 index 0000000..45a12f7 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/RedisMessageListener.java @@ -0,0 +1,56 @@ +package com.hw.mqtt.listener; + +import com.alibaba.fastjson.parser.Feature; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.ruoyi.common.core.utils.StringUtils; +import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; + +/** + * @author Wen JY + * @description: TODO + * @date 2023-09-11 15:28:18 + * @version: 1.0 + */ +@Component +public class RedisMessageListener { + + private static final Logger logger = LoggerFactory.getLogger(RedisMessageListener.class); + + @Autowired + private MqttServerTemplate mqttServerTemplate; + + /** + * 订阅设备控制信息、发布设备指令 + * @param message + */ + public void deviceCommand(String message){ + try { + + if(StringUtils.isEmpty(message)){ + logger.warn("Redis订阅内容为空!!!"); + return; + } + logger.info("Redis订阅内容:"+message); + JSONObject jsonObject = JSONObject.parseObject(message); + String topic = jsonObject.get("Topic").toString(); + String payload = jsonObject.get("Payload").toString(); + boolean publishResult = mqttServerTemplate.publishAll(topic, payload.getBytes(StandardCharsets.UTF_8)); + if(publishResult){ + logger.info("Topic:"+topic+";Payload:"+payload+";消息发布成功"); + }else { + logger.info("Topic:"+topic+";Payload:"+payload+";消息发布失败!!!"); + } + }catch (Exception e){ + logger.error("设备控制指令下发异常:"+e.getMessage()); + } + } +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/IMqttBrokerService.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/IMqttBrokerService.java deleted file mode 100644 index fb089d6..0000000 --- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/IMqttBrokerService.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hw.mqtt.service; - - -import java.util.List; - -/** - * mqtt broker 服务 - * - * @author L.cm - */ -public interface IMqttBrokerService { - - /** - * 获取在线客户端数量 - * @author WenJY - * @date 2023-03-14 13:51 - * @return long - */ - long getOnlineClientSize(); - - /** - * 获取所有在线的客户端 - * @author WenJY - * @date 2023-03-14 13:51 - * @return java.util.List - */ - List getOnlineClients(); - - /** - * 向指定主题发送消息 - * @author WenJY - * @date 2023-03-14 13:50 - * @param topic - * @param payload - * @return boolean - */ - boolean publish(String topic,String payload); - - /** - * 主动关闭指定客户端连接 - * @author WenJY - * @date 2023-03-18 9:23 - * @param clientId - */ - boolean closeClientById(String clientId); -} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/impl/MqttBrokerServiceImpl.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/impl/MqttBrokerServiceImpl.java deleted file mode 100644 index b516fc5..0000000 --- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/impl/MqttBrokerServiceImpl.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hw.mqtt.service.impl; - -import com.hw.mqtt.enums.RedisKeys; -import com.hw.mqtt.service.IMqttBrokerService; -import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate; -import net.dreamlu.mica.core.utils.StringPool; -import net.dreamlu.mica.redis.cache.MicaRedisCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; - -/** - * mqtt broker 服务 - * - * @author L.cm - */ -@Service -public class MqttBrokerServiceImpl implements IMqttBrokerService { - private static final Logger logger = LoggerFactory.getLogger(MqttBrokerServiceImpl.class); - @Autowired private MicaRedisCache redisCache; - @Autowired private MqttServerTemplate server; - - @Override - public long getOnlineClientSize() { - Set keySet = redisCache.scan(RedisKeys.CONNECT_STATUS.getKey(StringPool.STAR)); - if (keySet.isEmpty()) { - return 0L; - } - long result = 0; - for (String redisKey : keySet) { - Long count = redisCache.getSetOps().size(redisKey); - if (count != null) { - result += count; - } - } - return result; - } - - @Override - public List getOnlineClients() { - Set keySet = redisCache.scan(RedisKeys.CONNECT_STATUS.getKey(StringPool.STAR)); - if (keySet.isEmpty()) { - return Collections.emptyList(); - } - List clientList = new ArrayList<>(); - for (String redisKey : keySet) { - Set members = redisCache.sMembers(redisKey); - if (members != null && !members.isEmpty()) { - clientList.addAll(members); - } - } - return clientList; - } - - @Override - public boolean publish(String topic, String payload) { - boolean result = server.publishAll(topic, payload.getBytes(StandardCharsets.UTF_8)); - logger.info("Mqtt publishAll result:{};topic:{};payload:{}", result, topic, payload); - return result; - } - - @Override - public boolean closeClientById(String clientId) { - try{ - server.close(clientId); - }catch (Exception ex){ - return false; - } - return true; - } - - -} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/task/PublishAllTask.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/task/PublishAllTask.java deleted file mode 100644 index ac33ad8..0000000 --- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/task/PublishAllTask.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.hw.mqtt.task; - -import net.dreamlu.iot.mqtt.core.server.MqttServer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Service; - -import java.nio.charset.StandardCharsets; - -/** - * 自定义任务:心跳下发 - * @author WenJY - * @date 2023-03-14 12:16 - * @param null - * @return null - */ -@Service -public class PublishAllTask { - @Autowired - private MqttServer mqttServer; - - @Scheduled(fixedDelay = 1000) - public void run() { - mqttServer.publishAll("/test/heartbeat", "心跳指令无需处理".getBytes(StandardCharsets.UTF_8)); - } - -} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/util/RedisUtil.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/util/RedisUtil.java deleted file mode 100644 index 2404422..0000000 --- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/util/RedisUtil.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hw.mqtt.util; - -import net.dreamlu.mica.core.utils.CharPool; - -/** - * redis 工具 - * - * @author L.cm - */ -public class RedisUtil { - - /** - * 转换成 redis 的 pattern 规则 - * - * @return pattern - */ - public static String getTopicPattern(String topicFilter) { - // mqtt 分享主题 $share/{ShareName}/{filter} - return topicFilter - .replace(CharPool.PLUS, CharPool.STAR) - .replace(CharPool.HASH, CharPool.STAR); - } - -}