diff --git a/ruoyi-modules/hw-mqtt-broker/pom.xml b/ruoyi-modules/hw-mqtt-broker/pom.xml
index e045d88..77ac85e 100644
--- a/ruoyi-modules/hw-mqtt-broker/pom.xml
+++ b/ruoyi-modules/hw-mqtt-broker/pom.xml
@@ -9,7 +9,7 @@
4.0.0
- rouyi-modules-mqttBroker
+ hw-mqtt-broker
海威物联网平台Broker模块
@@ -22,11 +22,10 @@
-
-
+
+ org.springframework.boot
+ spring-boot-starter-data-redis
+
@@ -47,12 +46,6 @@
2.1.0
-
- net.dreamlu
- mica-redis
- 2.7.9
-
-
net.dreamlu
mica-logging
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/config/RedisListenerConfig.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/config/RedisListenerConfig.java
new file mode 100644
index 0000000..f9884a2
--- /dev/null
+++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/config/RedisListenerConfig.java
@@ -0,0 +1,35 @@
+package com.hw.mqtt.config;
+
+import com.hw.mqtt.enums.RedisKeys;
+import com.hw.mqtt.listener.RedisMessageListener;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.listener.PatternTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
+
+/**
+ * @author Wen JY
+ * @description: TODO
+ * @date 2023-09-11 15:35:20
+ * @version: 1.0
+ */
+@Configuration
+public class RedisListenerConfig {
+ @Bean
+ RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
+ MessageListenerAdapter msgIngoListenerAdapter) {
+
+ RedisMessageListenerContainer container = new RedisMessageListenerContainer();
+ container.setConnectionFactory(connectionFactory);
+ // 可以添加多个 messageListener,配置不同的交换机
+ container.addMessageListener(msgIngoListenerAdapter, new PatternTopic(RedisKeys.REDIS_CHANNEL_DOWN.getKey()));
+ return container;
+ }
+
+ @Bean
+ MessageListenerAdapter msgIngoListenerAdapter(RedisMessageListener receiver) {
+ return new MessageListenerAdapter(receiver, "deviceCommand");
+ }
+}
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/AjaxResult.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/AjaxResult.java
deleted file mode 100644
index 763eb0e..0000000
--- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/AjaxResult.java
+++ /dev/null
@@ -1,225 +0,0 @@
-package com.hw.mqtt.domain;
-import java.util.HashMap;
-import java.util.Objects;
-/**
- * @author Wen JY
- * @description: TODO 操作消息提醒
- * @date 2023-08-17 13:33:31
- * @version: 1.0
- */
-public class AjaxResult extends HashMap
-{
- private static final long serialVersionUID = 1L;
-
- /** 状态码 */
- public static final String CODE_TAG = "code";
-
- /** 返回内容 */
- public static final String MSG_TAG = "msg";
-
- /** 数据对象 */
- public static final String DATA_TAG = "data";
-
- /**
- * 状态类型
- */
- public enum Type
- {
- /** 成功 */
- SUCCESS(0),
- /** 警告 */
- WARN(301),
- /** 错误 */
- ERROR(500);
- private final int value;
-
- Type(int value)
- {
- this.value = value;
- }
-
- public int value()
- {
- return this.value;
- }
- }
-
- /**
- * 初始化一个新创建的 AjaxResult 对象,使其表示一个空消息。
- */
- public AjaxResult()
- {
- }
-
- /**
- * 初始化一个新创建的 AjaxResult 对象
- *
- * @param type 状态类型
- * @param msg 返回内容
- */
- public AjaxResult(Type type, String msg)
- {
- super.put(CODE_TAG, type.value);
- super.put(MSG_TAG, msg);
- }
-
- /**
- * 初始化一个新创建的 AjaxResult 对象
- *
- * @param type 状态类型
- * @param msg 返回内容
- * @param data 数据对象
- */
- public AjaxResult(Type type, String msg, Object data)
- {
- super.put(CODE_TAG, type.value);
- super.put(MSG_TAG, msg);
- if (isNotNull(data))
- {
- super.put(DATA_TAG, data);
- }
- }
-
- /**
- * 返回成功消息
- *
- * @return 成功消息
- */
- public static AjaxResult success()
- {
- return AjaxResult.success("操作成功");
- }
-
- /**
- * 返回成功数据
- *
- * @return 成功消息
- */
- public static AjaxResult success(Object data)
- {
- return AjaxResult.success("操作成功", data);
- }
-
- /**
- * 返回成功消息
- *
- * @param msg 返回内容
- * @return 成功消息
- */
- public static AjaxResult success(String msg)
- {
- return AjaxResult.success(msg, null);
- }
-
- /**
- * 返回成功消息
- *
- * @param msg 返回内容
- * @param data 数据对象
- * @return 成功消息
- */
- public static AjaxResult success(String msg, Object data)
- {
- return new AjaxResult(Type.SUCCESS, msg, data);
- }
-
- /**
- * 返回警告消息
- *
- * @param msg 返回内容
- * @return 警告消息
- */
- public static AjaxResult warn(String msg)
- {
- return AjaxResult.warn(msg, null);
- }
-
- /**
- * 返回警告消息
- *
- * @param msg 返回内容
- * @param data 数据对象
- * @return 警告消息
- */
- public static AjaxResult warn(String msg, Object data)
- {
- return new AjaxResult(Type.WARN, msg, data);
- }
-
- /**
- * 返回错误消息
- *
- * @return
- */
- public static AjaxResult error()
- {
- return AjaxResult.error("操作失败");
- }
-
- /**
- * 返回错误消息
- *
- * @param msg 返回内容
- * @return 警告消息
- */
- public static AjaxResult error(String msg)
- {
- return AjaxResult.error(msg, null);
- }
-
- /**
- * 返回错误消息
- *
- * @param msg 返回内容
- * @param data 数据对象
- * @return 警告消息
- */
- public static AjaxResult error(String msg, Object data)
- {
- return new AjaxResult(Type.ERROR, msg, data);
- }
-
- /**
- * 是否为成功消息
- *
- * @return 结果
- */
- public boolean isSuccess()
- {
- return !isError();
- }
-
- /**
- * 是否为错误消息
- *
- * @return 结果
- */
- public boolean isError()
- {
- return Objects.equals(Type.ERROR.value, this.get(CODE_TAG));
- }
-
- /**
- * 方便链式调用
- *
- * @param key 键
- * @param value 值
- * @return 数据对象
- */
- @Override
- public AjaxResult put(String key, Object value)
- {
- super.put(key, value);
- return this;
- }
-
- public static boolean isNotNull(Object object)
- {
- return !isNull(object);
- }
-
- public static boolean isNull(Object object)
- {
- return object == null;
- }
-}
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/ServerNode.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/ServerNode.java
deleted file mode 100644
index 767ed5a..0000000
--- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/ServerNode.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.hw.mqtt.domain;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/**
- * @author WenJY
- * @date 2023年03月14日 13:46
- */
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class ServerNode {
-
- /**
- * 节点名称
- */
- private String name;
- /**
- * ip:port
- */
- private String peerHost;
-
-}
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java
index 461c4d2..d399789 100644
--- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java
+++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java
@@ -17,23 +17,20 @@
package com.hw.mqtt.listener;
import com.alibaba.fastjson2.JSONArray;
-import com.hw.mqtt.domain.AjaxResult;
import com.hw.mqtt.enums.RedisKeys;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
-import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
+import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
-import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -50,26 +47,26 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
private static final Logger logger = LoggerFactory.getLogger(MqttConnectStatusListener.class);
private final ApplicationContext context;
- private final MicaRedisCache redisCache;
+ private final StringRedisTemplate redisTemplate;
private MqttServerCreator serverCreator;
private MqttServerTemplate mqttServerTemplate;
- public MqttConnectStatusListener(ApplicationContext context, MicaRedisCache redisCache) {
+ public MqttConnectStatusListener(ApplicationContext context, StringRedisTemplate redisTemplate) {
this.context = context;
- this.redisCache = redisCache;
+ this.redisTemplate = redisTemplate;
}
@Override
public void online(ChannelContext context, String clientId, String username) {
logger.info("Mqtt clientId:{} username:{} online.", clientId, username);
- redisCache.sAdd(getRedisKey(), clientId);
+ redisTemplate.opsForSet().add(getRedisKey(), clientId);
pushConnectStatus(clientId,1);
}
@Override
public void offline(ChannelContext context, String clientId, String username, String reason) {
logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason);
- redisCache.sRem(getRedisKey(), clientId);
+ redisTemplate.opsForSet().remove(getRedisKey(), clientId);
pushConnectStatus(clientId,2);
}
@@ -91,7 +88,7 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
@Override
public void destroy() throws Exception {
// 停机时删除集合
- redisCache.del(getRedisKey());
+ redisTemplate.opsForSet().remove(getRedisKey());
}
/**
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/RedisMessageListener.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/RedisMessageListener.java
new file mode 100644
index 0000000..45a12f7
--- /dev/null
+++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/RedisMessageListener.java
@@ -0,0 +1,56 @@
+package com.hw.mqtt.listener;
+
+import com.alibaba.fastjson.parser.Feature;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.ruoyi.common.core.utils.StringUtils;
+import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * @author Wen JY
+ * @description: TODO
+ * @date 2023-09-11 15:28:18
+ * @version: 1.0
+ */
+@Component
+public class RedisMessageListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(RedisMessageListener.class);
+
+ @Autowired
+ private MqttServerTemplate mqttServerTemplate;
+
+ /**
+ * 订阅设备控制信息、发布设备指令
+ * @param message
+ */
+ public void deviceCommand(String message){
+ try {
+
+ if(StringUtils.isEmpty(message)){
+ logger.warn("Redis订阅内容为空!!!");
+ return;
+ }
+ logger.info("Redis订阅内容:"+message);
+ JSONObject jsonObject = JSONObject.parseObject(message);
+ String topic = jsonObject.get("Topic").toString();
+ String payload = jsonObject.get("Payload").toString();
+ boolean publishResult = mqttServerTemplate.publishAll(topic, payload.getBytes(StandardCharsets.UTF_8));
+ if(publishResult){
+ logger.info("Topic:"+topic+";Payload:"+payload+";消息发布成功");
+ }else {
+ logger.info("Topic:"+topic+";Payload:"+payload+";消息发布失败!!!");
+ }
+ }catch (Exception e){
+ logger.error("设备控制指令下发异常:"+e.getMessage());
+ }
+ }
+}
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/IMqttBrokerService.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/IMqttBrokerService.java
deleted file mode 100644
index fb089d6..0000000
--- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/IMqttBrokerService.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.hw.mqtt.service;
-
-
-import java.util.List;
-
-/**
- * mqtt broker 服务
- *
- * @author L.cm
- */
-public interface IMqttBrokerService {
-
- /**
- * 获取在线客户端数量
- * @author WenJY
- * @date 2023-03-14 13:51
- * @return long
- */
- long getOnlineClientSize();
-
- /**
- * 获取所有在线的客户端
- * @author WenJY
- * @date 2023-03-14 13:51
- * @return java.util.List
- */
- List getOnlineClients();
-
- /**
- * 向指定主题发送消息
- * @author WenJY
- * @date 2023-03-14 13:50
- * @param topic
- * @param payload
- * @return boolean
- */
- boolean publish(String topic,String payload);
-
- /**
- * 主动关闭指定客户端连接
- * @author WenJY
- * @date 2023-03-18 9:23
- * @param clientId
- */
- boolean closeClientById(String clientId);
-}
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/impl/MqttBrokerServiceImpl.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/impl/MqttBrokerServiceImpl.java
deleted file mode 100644
index b516fc5..0000000
--- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/impl/MqttBrokerServiceImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.hw.mqtt.service.impl;
-
-import com.hw.mqtt.enums.RedisKeys;
-import com.hw.mqtt.service.IMqttBrokerService;
-import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
-import net.dreamlu.mica.core.utils.StringPool;
-import net.dreamlu.mica.redis.cache.MicaRedisCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-/**
- * mqtt broker 服务
- *
- * @author L.cm
- */
-@Service
-public class MqttBrokerServiceImpl implements IMqttBrokerService {
- private static final Logger logger = LoggerFactory.getLogger(MqttBrokerServiceImpl.class);
- @Autowired private MicaRedisCache redisCache;
- @Autowired private MqttServerTemplate server;
-
- @Override
- public long getOnlineClientSize() {
- Set keySet = redisCache.scan(RedisKeys.CONNECT_STATUS.getKey(StringPool.STAR));
- if (keySet.isEmpty()) {
- return 0L;
- }
- long result = 0;
- for (String redisKey : keySet) {
- Long count = redisCache.getSetOps().size(redisKey);
- if (count != null) {
- result += count;
- }
- }
- return result;
- }
-
- @Override
- public List getOnlineClients() {
- Set keySet = redisCache.scan(RedisKeys.CONNECT_STATUS.getKey(StringPool.STAR));
- if (keySet.isEmpty()) {
- return Collections.emptyList();
- }
- List clientList = new ArrayList<>();
- for (String redisKey : keySet) {
- Set members = redisCache.sMembers(redisKey);
- if (members != null && !members.isEmpty()) {
- clientList.addAll(members);
- }
- }
- return clientList;
- }
-
- @Override
- public boolean publish(String topic, String payload) {
- boolean result = server.publishAll(topic, payload.getBytes(StandardCharsets.UTF_8));
- logger.info("Mqtt publishAll result:{};topic:{};payload:{}", result, topic, payload);
- return result;
- }
-
- @Override
- public boolean closeClientById(String clientId) {
- try{
- server.close(clientId);
- }catch (Exception ex){
- return false;
- }
- return true;
- }
-
-
-}
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/task/PublishAllTask.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/task/PublishAllTask.java
deleted file mode 100644
index ac33ad8..0000000
--- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/task/PublishAllTask.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package com.hw.mqtt.task;
-
-import net.dreamlu.iot.mqtt.core.server.MqttServer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Service;
-
-import java.nio.charset.StandardCharsets;
-
-/**
- * 自定义任务:心跳下发
- * @author WenJY
- * @date 2023-03-14 12:16
- * @param null
- * @return null
- */
-@Service
-public class PublishAllTask {
- @Autowired
- private MqttServer mqttServer;
-
- @Scheduled(fixedDelay = 1000)
- public void run() {
- mqttServer.publishAll("/test/heartbeat", "心跳指令无需处理".getBytes(StandardCharsets.UTF_8));
- }
-
-}
diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/util/RedisUtil.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/util/RedisUtil.java
deleted file mode 100644
index 2404422..0000000
--- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/util/RedisUtil.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.hw.mqtt.util;
-
-import net.dreamlu.mica.core.utils.CharPool;
-
-/**
- * redis 工具
- *
- * @author L.cm
- */
-public class RedisUtil {
-
- /**
- * 转换成 redis 的 pattern 规则
- *
- * @return pattern
- */
- public static String getTopicPattern(String topicFilter) {
- // mqtt 分享主题 $share/{ShareName}/{filter}
- return topicFilter
- .replace(CharPool.PLUS, CharPool.STAR)
- .replace(CharPool.HASH, CharPool.STAR);
- }
-
-}